import json
import logging
import os
import pathlib
import re
import shutil
import typing
import urllib.parse
from collections.abc import Iterable, Iterator

import yaml
from toolz import itertoolz

try:
    from yaml import CDumper as Dumper
    from yaml import CLoader as Loader
except ImportError:  # pragma: no cover
    from yaml import Dumper, Loader  # noqa: F401

try:
    import tomllib
except ImportError:  # pragma: no cover
    import tomli as tomllib

from ska_telmodel.constants import NEW_PREFIX
from ska_telmodel.data.large_files import (
    is_large_file_cached,
    large_file_search,
)

from .alias import AliasResolver
from .backend import TELMODEL_BACKENDS, TMDataBackend
from .large_files import large_file_download
from .sources import DEFAULT_SOURCES
# Module-level logger, named after this module.
LOGGING = logging.getLogger(__name__)
[docs]
class TMData(object):
"""Represents a tree of telescope model data.
Data is retrieved from specified ``sources`` (or using default
sources if not passed). Depending on backend, this might cause
data to be loaded from remote locations, such as the SKAO central
artefact repository or Gitlab.
Objects of this class provide a hierarchical
``dict``/``h5py``-like interface. For instance, you can print all
objects with keys starting with ``instrument/layout`` as follows::
layouts = tmdata['instrument/layout']
for key in layouts:
print(f"Data for {key}: ", layouts[key].get())
This works because :py:meth:`__getitem__` will redirect to
:py:meth:`get_subtree` or :py:meth:`get` depending on whether
a valid key is passed (i.e. it has an extension).
The :py:class:`TMObject` object can then be used to access
the underlying telescope model data.
:param source_uris: List of telescope model data sources.
If not passed, defaults to ``SKA_TELMODEL_SOURCES``
enviroment variable, then in-built :py:const:`DEFAULT_SOURCES`.
:param prefix: Key prefix for sub-tree selection
:param update: Update cached data sources (if any)
:param backend_pars: Extra parameters to specific backend (types)
:param yield_hidden: If set, ``__iter__`` will yield hidden files
"""
ALIASES_FILENAME = ".tmaliases"
def __init__(
self,
source_uris: list[str] = None,
prefix: str = "",
update: bool = False,
backend_pars: dict = {},
yield_hidden: bool = False,
):
if prefix != "" and not TMDataBackend.valid_prefix(prefix):
raise ValueError(f"Invalid telescope model data prefix: {prefix}")
self._prefix = prefix
# Constructing from another Data object?
if isinstance(source_uris, TMData):
data = source_uris
self._source_uris = data._source_uris
self._sources = data._sources
else:
# Default to
if not source_uris:
source_uris = os.getenv("SKA_TELMODEL_SOURCES")
if source_uris:
source_uris = source_uris.split(",")
else:
source_uris = DEFAULT_SOURCES
# Otherwise construct
self._source_uris = source_uris
self._sources = []
for uri in source_uris:
# Extract backend type
parsed = urllib.parse.urlparse(uri)
# Instantiate
backend_cls = TELMODEL_BACKENDS[parsed.scheme or "file"]
backend = backend_cls(
uri, update, **backend_pars.get(parsed.scheme, {})
)
self._sources.append(backend)
self.yield_hidden = yield_hidden
# Construct key alias resolvers
self.aliases = list()
for s in self._sources:
aliases_dictionary = dict()
if s.exists(self.ALIASES_FILENAME):
with s.open(self.ALIASES_FILENAME) as f:
aliases_dictionary = json.load(f)
self.aliases.append(AliasResolver())
for k, v in aliases_dictionary.items():
self.aliases[-1][k] = v
def __iter__(self) -> Iterable[str]:
# Append '/' to string to remove
full_prefix = self._prefix
if full_prefix:
full_prefix += "/"
last_key = None
for key in itertoolz.merge_sorted(
*(source.list_keys(self._prefix) for source in self._sources)
):
assert key.startswith(full_prefix)
if not self.yield_hidden and TMDataBackend.is_hidden_file(key):
continue
# De-duplicate
if last_key is None or key != last_key:
last_key = key
if key.endswith(".link"):
key = key[:-5]
yield key[len(full_prefix) :]
[docs]
def get_sources(self, pinned: bool = False) -> list[str]:
"""Returns list of source URIs
:param pinned: Attempt to return URIs that will continue
to refer to this specific version of telescope model data.
E.g. for GitLab URIs, this replaces tags or branches by
the concrete commit hash.
:returns: list of sources
"""
return [src.get_uri(pinned) for src in self._sources]
[docs]
def get(self, key: str) -> "TMObject":
"""
Returns the telescope model object with the given key
:param key: Key to retrieve. Must be a valid telescope
model key (i.e. have a file type extension)
:returns: :py:class:`TMObject` object
:raises: ``KeyError`` if object doesn't exist
"""
# Compose path, check that it is valid
if key.endswith(".link"):
large_file = True
key = key[:-5]
else:
large_file = False
if self._prefix:
full_path = self._prefix + "/" + key
else:
full_path = key
if not TMDataBackend.valid_key(full_path):
raise ValueError(f"Invalid telescope model data key: {full_path}")
# Find source
for source, alias in zip(
reversed(self._sources), reversed(self.aliases)
):
# Does it exist?
key = alias.resolve(full_path)
if source.exists(key):
return TMObject(source, key, large_file)
elif source.exists(key + ".link"):
return TMObject(source, key, True)
raise KeyError(f"No telescope model data with key {full_path} exists!")
[docs]
@classmethod
def get_schema_uri(cls, key: str) -> str | None:
"""Returns the schema URI corresponding to the input key.
:param key: Key to query
:returns: Corresponding schema URI from schema-path maps
"""
pattern = r"^(ska-(?:[a-z]+-)+[a-z]+)/v([1-9][0-9]*)/"
match = re.match(pattern, key)
if not match:
return None
name = match.group(1)
major_version = match.group(2)
minimum_version = f"{major_version}.0"
schema_uri = f"{NEW_PREFIX}{name}/{minimum_version}"
return schema_uri
[docs]
def get_subtree(self, prefix: str) -> "TMData":
"""
Returns clone of :py:class:`TMData` object with given prefix
Note that no checking is done whether any keys with
the given prefix exist.
:param prefix: Prefix to narrow scope to.
Must be a valid telescope model prefix
:returns: :py:class:`TMData` object using prefix
"""
# Compose path, check that it is valid
if self._prefix:
full_path = self._prefix + "/" + prefix
else:
full_path = prefix
if not TMDataBackend.valid_prefix(full_path):
raise ValueError(
f"Invalid telescope model data prefix: {full_path}"
)
# Make Data object for subtree. Note that we do *not* check
# whether the path exists.
return TMData(self, full_path)
def __getitem__(self, key_or_prefix: str):
if not key_or_prefix:
raise KeyError("Empty key/prefix not allowed!")
# A key?
if TMDataBackend.valid_key(key_or_prefix):
return self.get(key_or_prefix)
# Otherwise assume we are constructing a subtree
return self.get_subtree(key_or_prefix)
def __contains__(self, key: str):
"""
Check whether a certain key exists in any source.
:param key: Key to check for
"""
if self._prefix:
full_path = self._prefix + "/" + key
else:
full_path = key
for source in self._sources:
if source.exists(full_path) or source.exists(full_path + ".link"):
return True
return False
class TMObject(object):
    """Represents a telescope model data object.

    Provides a number of ways to access the data.

    :param source: Backend to use to retrieve object data
    :param key: Key associated with object
    :param large_file: Whether the backend only holds a ``<key>.link``
        file pointing at the data, instead of the data itself
    """

    def __init__(
        self, source: "TMDataBackend", key: str, large_file: bool = False
    ):
        self._source = source
        self._key = key
        self._large_file = large_file
        # Lazily filled by the ``is_cached`` / ``size`` properties
        self._cached = None
        self._size = None

    @property
    def source(self) -> "TMDataBackend":
        """Get the Data Backend for this object"""
        return self._source

    @property
    def is_large_file(self) -> bool:
        """Whether this object is accessed through a ``.link`` file"""
        return self._large_file

    @property
    def is_cached(self) -> bool:
        """Whether the object counts as cached.

        Always true for objects that are not large files. For large
        files this is the answer of ``is_large_file_cached`` for the
        ``file_hash`` of the link. The result is remembered.
        """
        if self._cached is not None:
            return self._cached  # pragma: no cover

        if self._large_file:
            toml = self.get_link_contents()
            self._cached, _ = is_large_file_cached(toml["file_hash"])
        else:
            self._cached = True
        return self._cached

    @property
    def size(self) -> int:
        """Size of the object data. The result is remembered.

        For large files the size is taken from the cached file if
        there is one, otherwise from ``large_file_search``. For other
        objects it is the ``size`` entry of the backend's metadata,
        falling back to seeking to the end of the opened object.
        """
        if self._size is not None:
            return self._size  # pragma: no cover

        if self.is_large_file:
            toml = self.get_link_contents()
            is_local, path = is_large_file_cached(toml["file_hash"])
            if is_local:
                self._size = path.stat().st_size
            else:
                _, _, self._size = large_file_search(toml["file_hash"], "", "")
        else:
            meta = self._source.meta(self._key)
            if meta is not None:
                self._size = meta.get("size", None)

        if self._size is None:  # pragma: no cover
            # This should never trigger, and is a fallback
            LOGGING.info("file has no size, using fallback")
            with self.open() as file:
                file.seek(0, os.SEEK_END)
                self._size = file.tell()
        return self._size

    def get(self) -> bytes:
        """Access data at given key as raw bytes

        :returns: Raw object data
        """
        if self._large_file:
            LOGGING.debug(
                "Reading large file directly into memory, "
                "consider using `.open` instead"
            )
            with self._get_from_link().open("rb") as file:
                return file.read()
        return self._source.get(self._key)

    def get_dict(self, **kwargs) -> dict:
        """Access object as a dictionary

        Will only work if the key ends with a known extension --
        e.g. ``.json`` or ``.yaml``.

        :param kwargs: Extra parameters to ``[json/yaml].load``. For
            YAML a ``Loader`` may be given to replace the default one.
        :returns: Parsed dictionary
        :raises: ``ValueError`` if the extension is not supported
        """
        # Determine type by extension
        ext = pathlib.Path(self._key).suffix
        if ext == ".json":
            with self.open() as f:
                return json.load(f, **kwargs)
        if ext == ".yaml":
            # Only default the loader, so that a caller-provided
            # ``Loader`` no longer clashes with a hard-coded keyword.
            # NOTE(review): the default is a full (non-safe) YAML
            # loader. If sources can hold untrusted data, consider
            # passing ``Loader=yaml.SafeLoader`` here.
            kwargs.setdefault("Loader", Loader)
            with self.open() as f:
                return yaml.load(f, **kwargs)
        raise ValueError(f"Cannot deserialise object with suffix {ext}!")

    def open(self) -> typing.IO[bytes]:
        """Access object data as a read-only file object

        The caller is responsible for closing the returned object,
        e.g. by using it in a ``with`` statement.

        :returns: File-like object
        """
        if self._large_file:
            return self._get_from_link().open("rb")
        return self._source.open(self._key)

    def copy(self, dest: str):
        """Copy object data to a file.

        :param dest: Path of destination file
        """
        if self._large_file:
            return shutil.copy(self._get_from_link(), dest)
        return self._source.copy(self._key, dest)

    def _get_from_link(self):
        """Return the path ``large_file_download`` gives for the link's
        ``file_hash``."""
        file_hash = self.get_link_contents()["file_hash"]
        return large_file_download(file_hash)

    def get_link_contents(self):
        """Parse the ``<key>.link`` file of a large file object.

        :returns: Parsed TOML contents of the link file, or an empty
            dictionary if this object is not a large file
        """
        if not self._large_file:  # pragma: no cover
            return {}
        with self._source.open(f"{self._key}.link") as file:
            return tomllib.load(file)