Source code for ska_telmodel.data.frontend

import json
import logging
import os
import re
import shutil
import typing

import yaml

try:
    from yaml import CDumper as Dumper
    from yaml import CLoader as Loader
except ImportError:  # pragma: no cover
    from yaml import Dumper, Loader  # noqa: F401

try:
    import tomllib
except ImportError:  # pragma: no cover
    import tomli as tomllib

import pathlib
import urllib.parse
from collections.abc import Iterable

from toolz import itertoolz

from ska_telmodel.constants import NEW_PREFIX
from ska_telmodel.data.large_files import (
    is_large_file_cached,
    large_file_search,
)

from .alias import AliasResolver
from .backend import TELMODEL_BACKENDS, TMDataBackend
from .large_files import large_file_download
from .sources import DEFAULT_SOURCES

LOGGING = logging.getLogger(__name__)


[docs] class TMData(object): """Represents a tree of telescope model data. Data is retrieved from specified ``sources`` (or using default sources if not passed). Depending on backend, this might cause data to be loaded from remote locations, such as the SKAO central artefact repository or Gitlab. Objects of this class provide a hierarchical ``dict``/``h5py``-like interface. For instance, you can print all objects with keys starting with ``instrument/layout`` as follows:: layouts = tmdata['instrument/layout'] for key in layouts: print(f"Data for {key}: ", layouts[key].get()) This works because :py:meth:`__getitem__` will redirect to :py:meth:`get_subtree` or :py:meth:`get` depending on whether a valid key is passed (i.e. it has an extension). The :py:class:`TMObject` object can then be used to access the underlying telescope model data. :param source_uris: List of telescope model data sources. If not passed, defaults to ``SKA_TELMODEL_SOURCES`` enviroment variable, then in-built :py:const:`DEFAULT_SOURCES`. :param prefix: Key prefix for sub-tree selection :param update: Update cached data sources (if any) :param backend_pars: Extra parameters to specific backend (types) :param yield_hidden: If set, ``__iter__`` will yield hidden files """ ALIASES_FILENAME = ".tmaliases" def __init__( self, source_uris: list[str] = None, prefix: str = "", update: bool = False, backend_pars: dict = {}, yield_hidden: bool = False, ): if prefix != "" and not TMDataBackend.valid_prefix(prefix): raise ValueError(f"Invalid telescope model data prefix: {prefix}") self._prefix = prefix # Constructing from another Data object? 
if isinstance(source_uris, TMData): data = source_uris self._source_uris = data._source_uris self._sources = data._sources else: # Default to if not source_uris: source_uris = os.getenv("SKA_TELMODEL_SOURCES") if source_uris: source_uris = source_uris.split(",") else: source_uris = DEFAULT_SOURCES # Otherwise construct self._source_uris = source_uris self._sources = [] for uri in source_uris: # Extract backend type parsed = urllib.parse.urlparse(uri) # Instantiate backend_cls = TELMODEL_BACKENDS[parsed.scheme or "file"] backend = backend_cls( uri, update, **backend_pars.get(parsed.scheme, {}) ) self._sources.append(backend) self.yield_hidden = yield_hidden # Construct key alias resolvers self.aliases = list() for s in self._sources: aliases_dictionary = dict() if s.exists(self.ALIASES_FILENAME): with s.open(self.ALIASES_FILENAME) as f: aliases_dictionary = json.load(f) self.aliases.append(AliasResolver()) for k, v in aliases_dictionary.items(): self.aliases[-1][k] = v def __iter__(self) -> Iterable[str]: # Append '/' to string to remove full_prefix = self._prefix if full_prefix: full_prefix += "/" last_key = None for key in itertoolz.merge_sorted( *(source.list_keys(self._prefix) for source in self._sources) ): assert key.startswith(full_prefix) if not self.yield_hidden and TMDataBackend.is_hidden_file(key): continue # De-duplicate if last_key is None or key != last_key: last_key = key if key.endswith(".link"): key = key[:-5] yield key[len(full_prefix) :]
[docs] def get_sources(self, pinned: bool = False) -> list[str]: """Returns list of source URIs :param pinned: Attempt to return URIs that will continue to refer to this specific version of telescope model data. E.g. for GitLab URIs, this replaces tags or branches by the concrete commit hash. :returns: list of sources """ return [src.get_uri(pinned) for src in self._sources]
[docs] def get(self, key: str) -> "TMObject": """ Returns the telescope model object with the given key :param key: Key to retrieve. Must be a valid telescope model key (i.e. have a file type extension) :returns: :py:class:`TMObject` object :raises: ``KeyError`` if object doesn't exist """ # Compose path, check that it is valid if key.endswith(".link"): large_file = True key = key[:-5] else: large_file = False if self._prefix: full_path = self._prefix + "/" + key else: full_path = key if not TMDataBackend.valid_key(full_path): raise ValueError(f"Invalid telescope model data key: {full_path}") # Find source for source, alias in zip( reversed(self._sources), reversed(self.aliases) ): # Does it exist? key = alias.resolve(full_path) if source.exists(key): return TMObject(source, key, large_file) elif source.exists(key + ".link"): return TMObject(source, key, True) raise KeyError(f"No telescope model data with key {full_path} exists!")
[docs] @classmethod def get_schema_uri(cls, key: str) -> str | None: """Returns the schema URI corresponding to the input key. :param key: Key to query :returns: Corresponding schema URI from schema-path maps """ pattern = r"^(ska-(?:[a-z]+-)+[a-z]+)/v([1-9][0-9]*)/" match = re.match(pattern, key) if not match: return None name = match.group(1) major_version = match.group(2) minimum_version = f"{major_version}.0" schema_uri = f"{NEW_PREFIX}{name}/{minimum_version}" return schema_uri
[docs] def get_subtree(self, prefix: str) -> "TMData": """ Returns clone of :py:class:`TMData` object with given prefix Note that no checking is done whether any keys with the given prefix exist. :param prefix: Prefix to narrow scope to. Must be a valid telescope model prefix :returns: :py:class:`TMData` object using prefix """ # Compose path, check that it is valid if self._prefix: full_path = self._prefix + "/" + prefix else: full_path = prefix if not TMDataBackend.valid_prefix(full_path): raise ValueError( f"Invalid telescope model data prefix: {full_path}" ) # Make Data object for subtree. Note that we do *not* check # whether the path exists. return TMData(self, full_path)
def __getitem__(self, key_or_prefix: str): if not key_or_prefix: raise KeyError("Empty key/prefix not allowed!") # A key? if TMDataBackend.valid_key(key_or_prefix): return self.get(key_or_prefix) # Otherwise assume we are constructing a subtree return self.get_subtree(key_or_prefix) def __contains__(self, key: str): """ Check whether a certain key exists in any source. :param key: Key to check for """ if self._prefix: full_path = self._prefix + "/" + key else: full_path = key for source in self._sources: if source.exists(full_path) or source.exists(full_path + ".link"): return True return False
class TMObject(object):
    """Represents a telescope model data object.

    Provides a number of ways to access the data.

    :param source: Backend to use to retrieve object data
    :param key: Key associated with object
    :param large_file: Whether the object is a large file, i.e. its
        data is obtained through a link instead of from the backend
    """

    def __init__(
        self, source: "TMDataBackend", key: str, large_file: bool = False
    ):
        self._source = source
        self._key = key
        self._large_file = large_file
        # Memoised results of ``is_cached`` / ``size`` (None = unknown)
        self._cached = None
        self._size = None

    @property
    def source(self) -> "TMDataBackend":
        """Get the Data Backend for this object"""
        return self._source

    @property
    def is_large_file(self) -> bool:
        """Whether this object was created as a large file"""
        return self._large_file

    @property
    def is_cached(self) -> bool:
        """Whether the object's data is reported as cached.

        Always true for objects that are not large files. For large
        files this is what ``is_large_file_cached`` reports for the
        file hash from the link. The answer is memoised until the
        object fetches the file itself.
        """
        if self._cached is not None:
            return self._cached  # pragma: no cover

        if self._large_file:
            link = self.get_link_contents()
            self._cached, _ = is_large_file_cached(link["file_hash"])
        else:
            self._cached = True
        return self._cached

    @property
    def size(self) -> int:
        """Size of the object's data (memoised after the first query).

        For large files this is the size of the local file if
        ``is_large_file_cached`` reports one, otherwise the third value
        returned by ``large_file_search``. For other objects it is the
        ``size`` entry of the backend's metadata, falling back to
        measuring the opened stream if there is none.
        """
        if self._size is not None:
            return self._size  # pragma: no cover

        if self.is_large_file:
            link = self.get_link_contents()
            is_local, path = is_large_file_cached(link["file_hash"])
            if is_local:
                self._size = path.stat().st_size
            else:
                _, _, self._size = large_file_search(link["file_hash"], "", "")
        else:
            meta = self._source.meta(self._key)
            if meta is not None:
                self._size = meta.get("size", None)
            if self._size is None:  # pragma: no cover
                # This should never trigger, and is a fallback
                LOGGING.info("file has no size, using fallback")
                with self.open() as file:
                    file.seek(0, os.SEEK_END)
                    self._size = file.tell()
        return self._size

    def get(self) -> bytes:
        """Access data at given key as raw bytes

        :returns: Raw object data
        """
        if self._large_file:
            LOGGING.debug(
                "Reading large file directly into memory, "
                "consider using `.open` instead"
            )
            with self._get_from_link().open("rb") as file:
                return file.read()
        return self._source.get(self._key)

    def get_dict(self, **kwargs) -> dict:
        """Access object as a dictionary

        Will only work if the key ends with a known extension --
        ``.json`` or ``.yaml``.

        :param kwargs: Extra parameters to ``[json/yaml].load``
        :returns: Parsed dictionary
        :raises: ``ValueError`` if the key has any other extension
        """
        # Determine type by extension
        ext = pathlib.Path(self._key).suffix
        if ext == ".json":
            with self.open() as f:
                return json.load(f, **kwargs)
        if ext == ".yaml":
            with self.open() as f:
                # NOTE(review): this uses the full (non-"safe") YAML
                # loader on data supplied by the backend -- confirm all
                # sources are trusted, or switch to a safe loader
                return yaml.load(f, Loader=Loader, **kwargs)
        raise ValueError(f"Cannot deserialise object with suffix {ext}!")

    def open(self) -> typing.IO[bytes]:
        """Access object data as a read-only file object

        :returns: File-like object
        """
        if self._large_file:
            return self._get_from_link().open("rb")
        return self._source.open(self._key)

    def copy(self, dest: str):
        """Copy object data to a file.

        :param dest: Path of destination file
        :returns: Result of ``shutil.copy`` for large files, otherwise
            whatever the backend's ``copy`` returns
        """
        if self._large_file:
            return shutil.copy(self._get_from_link(), dest)
        return self._source.copy(self._key, dest)

    def _get_from_link(self):
        """Fetch the large file named by the link.

        :returns: Path object returned by ``large_file_download``
        """
        file_hash = self.get_link_contents()["file_hash"]
        local_path = large_file_download(file_hash)
        # Forget a memoised ``is_cached`` answer: it may predate this
        # fetch, so let the next query check again
        self._cached = None
        return local_path