Source code for ska_telmodel._common

import importlib
import inspect
import re
from typing import Any, Callable, Optional, Tuple

import schema

#: MID SKA, 001-133.
MID_SKA = schema.Regex(r"^SKA((?!000)0[0-9][0-9]|1[0-2][0-9]|13[0-3])$")
#: MID Meerkat, 000-063.
MID_MKT = schema.Regex(r"^MKT0([0-5][0-9]|6[0-3])$")


[docs] class TMSchema(schema.Schema): """Wrapper on top of schema.Schema for incremental schema build-up.""" def __init__( self, schema: Any = None, error=None, ignore_extra_keys: bool = False, name: str = None, description: str = None, as_reference: bool = False, version: str = None, strict: bool = False, ): """ :param schema: Schema data (can be dictionary, list, value, see `schema`) :param error: Error message to show (see `schema`) :param ignore_extra_keys: Allows extra keys in non-strict modes :param name: Name to use in error messages :param description: Description to show in documentation :param as_reference: Generate separate sub-schema in JSON + documentation? :param version: Version of the schema :param strict: Strict mode? """ self._version = version self._strict = strict self._raw_name = name if schema is None: schema = {} if not strict: ignore_extra_keys = True if version is not None: version_num = version.rsplit("/", 1)[1] name = name.replace("/", "_") + f" {version_num}" if as_reference: # Strictly speaking blanks and hyphens don't work either # (jsonschema refuses them), but they are used so often # that we just replace them. name = name.replace(" ", "_").replace("-", "_").replace(",", "_") if not re.fullmatch(r"[a-zA-Z][a-zA-Z0-9_\. ]*", name): raise ValueError( f"Schema name is not a valid identifier: {name}" ) super(TMSchema, self).__init__( schema=schema, error=error, ignore_extra_keys=ignore_extra_keys, name=name, description=description, as_reference=as_reference, ) @classmethod def new(cls, name: str, version: str, strict: bool, **kwargs): return TMSchema(name=name, version=version, strict=strict, **kwargs) @property def raw_name(self): return self._raw_name @property def version(self): return self._version @property def strict(self): return self._strict def add_field( self, name: str, check: Any, check_strict: Any = None, description: str = None, optional: bool = False, default: Any = None, ): # Description + optional get indicated on the name if description is not None: name = schema.Literal( name, description=inspect.cleandoc(description) ) if optional: name = schema.Optional(name, default=default) # And together basic type check and strict checks if check_strict is not None and self._strict: check = schema.And(check, check_strict) # Add to schema self._schema[name] = check def add_opt_field( self, name: str, check: Any, check_strict: Any = None, description: str = None, default: Any = None, ): return self.add_field( name, check, check_strict, description, True, default ) def update(self, dct): self._schema.update( dct._schema if isinstance(dct, TMSchema) else dict(dct) ) def _unwrap_key(self, key: Any): if isinstance(key, schema.Literal): key = key.schema if isinstance(key, schema.Optional): key = key.schema if isinstance(key, schema.Literal): key = key.schema return key def __contains__(self, name: str): if name in self._schema: return True for key, _ in self._schema.items(): if self._unwrap_key(key) == name: return True return False def __getitem__(self, name: str): if name in self._schema: return self._schema[name] keys = [] for key, item in self._schema.items(): key = self._unwrap_key(key) if key == name: return item keys.append(key) raise IndexError( f"Key {name} was not found in schema! " f'Options: {",".join(repr(key) for key in keys)}' ) def __delitem__(self, name: str): if name in self._schema: del self._schema[name] return keys = [] for full_key, _ in self._schema.items(): key = self._unwrap_key(full_key) if key == name: del self._schema[full_key] return keys.append(key) raise IndexError( f"Key {name} was not found in schema! " f'Options: {",".join(repr(key) for key in keys)}' )
[docs] def is_field_optional(self, name: str) -> Optional[bool]: """ Checks whether the field with the given name is optional. Returns `None` if the field does not exist :param name: Name of the field :returns: bool """ for key in self._schema.keys(): optional = False if isinstance(key, schema.Optional): key = key.key optional = True if isinstance(key, schema.Literal): key = key.schema if key == name: return optional return None
[docs] def find_field_recursive(self, name: str) -> Optional["TMSchema"]: """ Recursively finds a field of the given name in the schema If the key exists multiple times, an arbitrary item will get returned. Note that to be returned by this function, the field must be in a `TMSchema` - if the schema is specified as a dictionary, it won't be found. :param name: Name of the field to look for :returns: A schema containing the given key """ # Does exist? if name in self: return self # Recurse for _, item in self._schema.items(): # Nested schema? Check values if isinstance(item, TMSchema): parent = item.find_field_recursive(name) if parent is not None: return parent # List? Check values elif isinstance(item, list): for obj in item: if isinstance(obj, TMSchema): parent = obj.find_field_recursive(name) if parent is not None: return parent # Dictionary? Check values elif isinstance(item, dict): for obj in item.values(): if isinstance(obj, TMSchema): parent = obj.find_field_recursive(name) if parent is not None: return parent return None
[docs] def stringify_keys_recursive(self) -> "TMSchema": """Returns a schema where all keys are strings. For :py:mod:`schema` it is valid to have a type as key in the schema (e.g. ``str``), which stands for an arbitrary key. However, this will not work when generating documentation, so it needs to be replaced by a string in those cases. :returns: Schema with keys renamed as appropriate """ def stringify_key(key): # Unwrap Literal & Optional if isinstance(key, schema.Literal): return schema.Literal( stringify_key(key.schema), description=key.description ) if isinstance(key, schema.Optional): return schema.Optional(stringify_key(key.schema)) # String? if isinstance(key, str): return key elif isinstance(key, schema.Regex): return f"(string matching {key})" elif isinstance(key, type): return f"(any {key.__name__})" else: raise ValueError(f"Unexpected key encountered: {key}") def maybe_recurse(v: Any): if isinstance(v, TMSchema): return v.stringify_keys_recursive() if isinstance(v, dict): return { stringify_key(k): maybe_recurse(_v) for k, _v in v.items() } if isinstance(v, schema.Or): new_args = [maybe_recurse(_v) for _v in v.args] return schema.Or( *new_args, ignore_extra_keys=v._ignore_extra_keys ) return v return TMSchema( maybe_recurse(self._schema), error=self._error, ignore_extra_keys=self.ignore_extra_keys, name=self.raw_name, description=self.description, as_reference=self.as_reference, version=self.version, strict=self.strict, )
[docs] def mk_if(cond: bool) -> Callable[[Any], Any]: """Generate schema combinator to conditionally activate a part.""" return (lambda x: x) if cond else (lambda x: schema.And())
def get_channel_map_schema( elem_type: Any, version: int, strict: bool ) -> schema.Schema: elem_schema = schema.Schema(elem_type) def valid_channel_map_entry(entry): if strict and any([not elem_schema.is_valid(e) for e in entry[1:]]): return False return isinstance(entry[0], int) return [valid_channel_map_entry]
[docs] def get_unique_id_schema( strict: bool, type_re: str = r"[a-z0-9]+" ) -> schema.Schema: """Return schema for unique identifier. :param type_re: Restricts ID type(s) to accept. """ if strict: return schema.Regex( r"^" + type_re + r"\-[a-z0-9]+\-[0-9]{8}\-[a-z0-9]+$" ) else: return str
[docs] def interface_uri(prefix: str, *versions: int) -> str: """Make an URI from the given prefix and versions :param prefix: Schema URI prefix. Must end in '/' :param versions: Components of the version """ assert prefix[-1] == "/" return f"{prefix}{'.'.join(str(v) for v in versions)}"
[docs] def split_interface_version(version: str) -> Tuple[int, int]: """Extracts version number from interface URI :param version: Version string :returns: (major version, minor version) tuple """ # get the string with the interface semantic version (X.Y) version_num = version.rsplit("/", 1)[1] (major_version, minor_version) = version_num.split(".") return int(major_version), int(minor_version)
def lookup_module(module, version: str): # Extract version major, minor = split_interface_version(version) # Attempt to import submodule return importlib.import_module(f"{module.__name__}.v{major}_{minor}") def lookup_schema(module, version: str, strict: int, schema_fn_name, **kwargs): # Get the module try: mod = lookup_module(module, version) except ModuleNotFoundError: raise schema.SchemaError( f"No matching schema version {version} in {module.__name__}!" ) # Find function to call try: fn = getattr(mod, schema_fn_name) except AttributeError: raise schema.SchemaError( f"Found no {schema_fn_name} to call for schema version {version} " f"in {module.__name__}!" ) # Invoke return fn(version, strict, **kwargs) def lookup_example(module, version: str, example_fn_name, *args, **kwargs): # Get the module try: mod = lookup_module(module, version) except ModuleNotFoundError: raise ValueError( f"No matching example version {version} in {module.__name__}!" ) # Find function to call try: fn = getattr(mod, example_fn_name) except AttributeError: raise ValueError( f"Found no {example_fn_name} to call for example version " f"{version} in {module.__name__}!" ) # Invoke return fn(version, *args, **kwargs)