Source code for ska_telmodel.schema

"""
Support for validating and generating examples for SKA telescope model
schemas.

"""
import logging
import os
import re
from typing import Callable
from typing import Optional as OptType
from typing import Tuple

import jsonschema
from schema import And, Literal, Optional, Or, Schema, SchemaError

from ska_telmodel.data.schema_cache import SchemaCache

from . import csp, lowcbf, mccs, midcbf, pss, pst, sdp, tmc
from ._common import TMSchema
from .schemas import layout

_LOGGER = logging.getLogger("ska_telmodel")
OLD_PREFIX = "https://schema.skatelescope.org/"
NEW_PREFIX = "https://schema.skao.int/"


DEFAULT_JSONSCHEMA_BASE_URIS = [
    "https://schema.skao.int",
    "https://ska-telescope.gitlab.io/ska-telmodel/json_schema",
]

JSONSCHEMA_BASE_URIS = os.getenv(
    "SKA_TELMODEL_JSONSCHEMA_BASE_URIS", DEFAULT_JSONSCHEMA_BASE_URIS
)


# Map interface prefixes to the function that can be used to generate
# the schema for them
URI_SCHEMA_MAP = {
    # delay model schemas
    layout.LAYOUT_PREFIX: layout.get_layout_schema,
    layout.RECEPTOR_PREFIX: layout.get_receptor_schema,
    layout.FIXED_DELAY_PREFIX: layout.get_fixed_delay_schema,
    layout.LOCATION_PREFIX: layout.get_location_schema,
    layout.LOCAL_LOCATION_PREFIX: layout.get_local_position_schema,
    layout.GEODETIC_LOCATION_PREFIX: layout.get_geodetic_position_schema,
    layout.GEOCENTRIC_LOCATION_PREFIX: layout.get_geocentric_position_schema,
    # CSP schemas
    csp.CSP_ASSIGNRESOURCES_PREFIX: csp.get_csp_assignresources_schema,
    csp.CSP_CONFIG_PREFIX: csp.get_csp_config_schema,
    csp.CSP_CONFIGSCAN_PREFIX: csp.get_csp_config_schema,
    csp.CSP_SCAN_PREFIX: csp.get_csp_scan_schema,
    csp.CSP_ENDSCAN_PREFIX: csp.get_csp_endscan_schema,
    csp.CSP_RELEASERESOURCES_PREFIX: csp.get_csp_releaseresources_schema,
    csp.CSP_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_schema,
    csp.CSP_MID_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_schema,
    csp.CSP_LOW_DELAYMODEL_PREFIX: csp.get_csp_low_delaymodel_schema,
    # LOWCSP schemas
    csp.LOWCSP_ASSIGNRESOURCES_PREFIX: csp.get_low_csp_assignresources_schema,
    csp.LOWCSP_CONFIGURE_PREFIX: csp.get_low_csp_configure_schema,
    csp.LOWCSP_SCAN_PREFIX: csp.get_low_csp_scan_schema,
    csp.LOWCSP_RELEASERESOURCES_PREFIX: (
        csp.get_low_csp_releaseresources_schema
    ),
    # PSS schemas
    pss.PSS_CONFIGURE_PREFIX: pss.get_pss_configure_schema,
    pss.PSS_CHEETAH_CONFIGURE_PREFIX: pss.get_pss_cheetah_configure_schema,
    # PST schemas
    pst.PST_CONFIGURE_PREFIX: pst.get_pst_config_schema,
    # SDP schemas
    sdp.SDP_ASSIGNRES_PREFIX: sdp.get_sdp_assignres_schema,
    sdp.SDP_RELEASERES_PREFIX: sdp.get_sdp_releaseres_schema,
    sdp.SDP_CONFIGURE_PREFIX: sdp.get_sdp_configure_schema,
    sdp.SDP_SCAN_PREFIX: sdp.get_sdp_scan_schema,
    sdp.SDP_RECVADDRS_PREFIX: sdp.get_sdp_recvaddrs_schema,
    # MCCS schemas
    mccs.MCCS_ASSIGNEDRES_PREFIX: mccs.get_mccs_assignedres_schema,
    mccs.MCCS_ASSIGNRES_PREFIX: mccs.get_mccs_assignres_schema,
    mccs.MCCS_CONFIGURE_PREFIX: mccs.get_mccs_configure_schema,
    mccs.MCCS_SCAN_PREFIX: mccs.get_mccs_scan_schema,
    mccs.MCCS_RELEASERES_PREFIX: mccs.get_mccs_releaseres_schema,
    mccs.ANTENNA_PROPERTIES_PREFIX: mccs.get_antenna_properties_schema,
    mccs.ANTENNA_GEOMETRY_PREFIX: mccs.get_antenna_geometry_schema,
    mccs.ANTENNA_FEATURES_PREFIX: mccs.get_antenna_features_schema,
    mccs.ANTENNA_PREFIX: mccs.get_antenna_schema,
    mccs.STATION_PROPERTIES_PREFIX: mccs.get_station_properties_schema,
    mccs.STATION_GEOMETRY_PREFIX: mccs.get_station_geometry_schema,
    mccs.STATION_FEATURES_PREFIX: mccs.get_station_features_schema,
    mccs.STATION_PREFIX: mccs.get_station_schema,
    # TMC schemas
    tmc.LOW_TMC_RELEASERESOURCES_PREFIX: tmc.get_low_tmc_releaseresources_schema,  # noqa: E501
    tmc.LOW_TMC_ASSIGNRESOURCES_PREFIX: tmc.get_low_tmc_assignresources_schema,
    tmc.LOW_TMC_CONFIGURE_PREFIX: tmc.get_low_tmc_configure_schema,
    tmc.LOW_TMC_SCAN_PREFIX: tmc.get_low_tmc_scan_schema,
    tmc.TMC_ASSIGNEDRES_PREFIX: tmc.get_tmc_assignedres_schema,
    # TMC mid schemas
    tmc.TMC_SCAN_PREFIX: tmc.get_tmc_scan_schema,
    tmc.TMC_ASSIGNRESOURCES_PREFIX: tmc.get_tmc_assignresources_schema,
    tmc.TMC_CONFIGURE_PREFIX: tmc.get_tmc_configure_schema,
    tmc.TMC_RELEASERESOURCES_PREFIX: tmc.get_tmc_releaseresources_schema,
    # LOWCBF schemas
    lowcbf.LOWCBF_ASSIGNRESOURCES_PREFIX: (
        lowcbf.get_lowcbf_assignresources_schema
    ),
    lowcbf.LOWCBF_CONFIGURESCAN_PREFIX: lowcbf.get_lowcbf_configurescan_schema,
    lowcbf.LOWCBF_SCAN_PREFIX: lowcbf.get_lowcbf_scan_schema,
    lowcbf.LOWCBF_RELEASERESOURCES_PREFIX: (
        lowcbf.get_lowcbf_releaseresources_schema
    ),
    # MIDCBF schemas
    midcbf.MIDCBF_INITSYSPARAM_PREFIX: midcbf.get_midcbf_initsysparam_schema,
}

# As above, but for example generation
URI_EXAMPLE_MAP = {
    # delay model schemas
    layout.LAYOUT_PREFIX: layout.get_layout_example,
    layout.RECEPTOR_PREFIX: layout.get_receptor_example,
    layout.FIXED_DELAY_PREFIX: layout.get_fixed_delay_example,
    layout.LOCATION_PREFIX: layout.get_location_example,
    layout.LOCAL_LOCATION_PREFIX: layout.get_local_position_example,
    layout.GEODETIC_LOCATION_PREFIX: layout.get_geodetic_position_example,
    layout.GEOCENTRIC_LOCATION_PREFIX: layout.get_geocentric_position_example,
    # CSP schemas
    csp.CSP_ASSIGNRESOURCES_PREFIX: csp.get_csp_assignresources_example,
    csp.CSP_CONFIG_PREFIX: csp.get_csp_config_example,
    csp.CSP_CONFIGSCAN_PREFIX: csp.get_csp_config_example,
    csp.CSP_SCAN_PREFIX: csp.get_csp_scan_example,
    csp.CSP_ENDSCAN_PREFIX: csp.get_csp_endscan_example,
    csp.CSP_RELEASERESOURCES_PREFIX: csp.get_csp_releaseresources_example,
    csp.CSP_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_example,
    csp.CSP_MID_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_example,
    csp.CSP_LOW_DELAYMODEL_PREFIX: csp.get_csp_low_delaymodel_example,
    csp.LOWCSP_ASSIGNRESOURCES_PREFIX: csp.get_low_csp_assignresources_example,
    csp.LOWCSP_CONFIGURE_PREFIX: csp.get_low_csp_configure_example,
    csp.LOWCSP_SCAN_PREFIX: csp.get_low_csp_scan_example,
    csp.LOWCSP_RELEASERESOURCES_PREFIX: (
        csp.get_low_csp_releaseresources_example
    ),
    # PSS schemas
    pss.PSS_CONFIGURE_PREFIX: pss.get_pss_configure_example,
    pss.PSS_CHEETAH_CONFIGURE_PREFIX: pss.get_pss_cheetah_configure_example,
    # PST schemas
    pst.PST_CONFIGURE_PREFIX: pst.get_pst_config_example,
    # SDP schemas
    sdp.SDP_ASSIGNRES_PREFIX: sdp.get_sdp_assignres_example,
    sdp.SDP_RELEASERES_PREFIX: sdp.get_sdp_releaseres_example,
    sdp.SDP_CONFIGURE_PREFIX: sdp.get_sdp_configure_example,
    sdp.SDP_SCAN_PREFIX: sdp.get_sdp_scan_example,
    sdp.SDP_RECVADDRS_PREFIX: sdp.get_sdp_recvaddrs_example,
    # MCCS schemas
    mccs.MCCS_ASSIGNEDRES_PREFIX: mccs.get_mccs_assignedres_example,
    mccs.MCCS_ASSIGNRES_PREFIX: mccs.get_mccs_assignres_example,
    mccs.MCCS_CONFIGURE_PREFIX: mccs.get_mccs_configure_example,
    mccs.MCCS_SCAN_PREFIX: mccs.get_mccs_scan_example,
    mccs.MCCS_RELEASERES_PREFIX: mccs.get_mccs_releaseres_example,
    mccs.ANTENNA_PROPERTIES_PREFIX: mccs.get_antenna_properties_example,
    mccs.ANTENNA_GEOMETRY_PREFIX: mccs.get_antenna_geometry_example,
    mccs.ANTENNA_FEATURES_PREFIX: mccs.get_antenna_features_example,
    mccs.ANTENNA_PREFIX: mccs.get_antenna_example,
    mccs.STATION_PROPERTIES_PREFIX: mccs.get_station_properties_example,
    mccs.STATION_GEOMETRY_PREFIX: mccs.get_station_geometry_example,
    mccs.STATION_FEATURES_PREFIX: mccs.get_station_features_example,
    mccs.STATION_PREFIX: mccs.get_station_example,
    # TMC schemas
    tmc.LOW_TMC_ASSIGNRESOURCES_PREFIX: tmc.get_low_tmc_assignresources_example,  # noqa: E501
    tmc.LOW_TMC_CONFIGURE_PREFIX: tmc.get_low_tmc_configure_example,
    tmc.LOW_TMC_SCAN_PREFIX: tmc.get_low_tmc_scan_example,
    tmc.LOW_TMC_RELEASERESOURCES_PREFIX: tmc.get_low_tmc_releaseresources_example,  # noqa: E501
    tmc.TMC_ASSIGNEDRES_PREFIX: tmc.get_tmc_assignedres_example,
    # TMC mid schemas
    tmc.TMC_SCAN_PREFIX: tmc.get_tmc_scan_example,
    tmc.TMC_ASSIGNRESOURCES_PREFIX: tmc.get_tmc_assignresources_example,
    tmc.TMC_CONFIGURE_PREFIX: tmc.get_tmc_configure_example,
    tmc.TMC_RELEASERESOURCES_PREFIX: tmc.get_tmc_releaseresources_example,
    # LOWCBF schemas
    lowcbf.LOWCBF_ASSIGNRESOURCES_PREFIX: (
        lowcbf.get_lowcbf_assignresources_example
    ),
    lowcbf.LOWCBF_CONFIGURESCAN_PREFIX: (
        lowcbf.get_lowcbf_configurescan_example
    ),
    lowcbf.LOWCBF_SCAN_PREFIX: lowcbf.get_lowcbf_scan_example,
    lowcbf.LOWCBF_RELEASERESOURCES_PREFIX: (
        lowcbf.get_lowcbf_releaseresources_example
    ),
    # MIDCBF schemas
    midcbf.MIDCBF_INITSYSPARAM_PREFIX: midcbf.get_midcbf_initsysparam_example,
}


[docs] def schema_by_uri(version: str, strict: int = 1, **kwargs) -> Schema: """ Looks up interface schema based on interface identifier :param version: Interface URI :param strict: Strictness level :returns: Interface schema """ # Perform look-up for prefix, schema_fn in URI_SCHEMA_MAP.items(): if version.startswith(prefix): return schema_fn(version, strict, **kwargs) # Check whether there's a match if we convert an old domain prefix if version.startswith(OLD_PREFIX): version2 = NEW_PREFIX + version[len(OLD_PREFIX) :] for prefix, schema_fn in URI_SCHEMA_MAP.items(): if version2.startswith(prefix): return schema_fn(version2, strict, **kwargs) # Otherwise - unknown... raise ValueError(f"Unknown schema URI kind: {version}!")
[docs] def example_by_uri(version: str, *args) -> dict: """ Generates an example for a particular schema :param version: Interface URI :param args: Extra parameters depending on interface (strings) :returns: Dictionary """ # Perform lookup for prefix, example_fn in URI_EXAMPLE_MAP.items(): if version.startswith(prefix): return example_fn(version, *args) # Check whether there's a match if we convert an old domain prefix if version.startswith(OLD_PREFIX): version2 = NEW_PREFIX + version[len(OLD_PREFIX) :] for prefix, example_fn in URI_EXAMPLE_MAP.items(): if version2.startswith(prefix): return example_fn(version2, *args) # Otherwise - unknown... raise ValueError(f"Unknown schema URI kind for example: {version}!")
[docs] class SchemaUri: """Convenience class for manipulating version URIs. :param version: Interface URI """ def __init__(self, version: str): """ Construct instance. :param version: Interface URI """ self._version = version self._parts = version.rsplit("/", maxsplit=1) self._nexted_parts = version.rsplit("/", maxsplit=2) @property def prefix(self) -> str: """ Get the prefix. :returns: prefix """ return self._parts[0] + "/" @property def version(self) -> str: """ Get the version. :returns: version """ return self._parts[1] @property def prefix_before_version(self) -> str: """ Get the version without the prefix. :returns: version """ return self._nexted_parts[1] @property def major_minor(self) -> Tuple[int, int]: """ Get the major and minor parts of the version. :returns: tuple of major and minor versions """ parts = self.version.split(".") return int(parts[0]), int(parts[1]) def __repr__(self) -> str: return self._version
[docs] def validate( version: OptType[str], config: dict, strictness: int = 1, jsonschema_fallback=False, ): """Validate a dictionary against schema Will automatically determine the schema to check against :param version: Interface with version :param config: Dictionary to validate :param strictness: Strictness level (0: permissive warnings, 1: permissive errors + strict warnings, 2: strict errors). Note that with strictness level 2, a lot of generally harmless schema violations will cause an exception to be raised. This is generally inadvisable in production consumer code ("be liberal in what you accept"!).Therefore, strictness 2 checks will only be allowed to raise errors when running in testing mode (either PYTEST_VERSION or SKA_TELMODEL_ALLOW_STRICT_VALIDATION) is set. :param jsonschema_fallback: Schema Validation Strategy. The system uses a configurable fallback mechanism controlled by the ``jsonschema_fallback`` flag. by default, this flag is set to ``False``. validation process is below. in what you accept"!). First attempts validation using the local TMSchema object. If local validation fails with SchemaError, automatically falls back to external schema validation. External validation uses the provided document URL. :raises ValueError: Raised if the object fails permissive checks at strictness level 1. At strictness level 2, raised if the object fails any schema check 3.Raised if fails external validation using documented URL. """ # Disable strictness 2 if called outside of a pytest run. if ( strictness > 1 and os.environ.get("PYTEST_VERSION") is None and not os.environ.get("SKA_TELMODEL_ALLOW_STRICT_VALIDATION", False) ): strictness = 1 # Take from dictionary if possible if version is None: version = config.get("interface") assert isinstance( version, str ), f"Invalid 'interface' value: {version}" def make_schema(strict): return schema_by_uri(version, strict) try: return _validate_schema(config, strictness, make_schema) except SchemaError as e: if jsonschema_fallback: return _validate_schema_using_document_url(config, version) raise ValueError(e) from None
def _validate_schema( obj: dict, strictness: int, make_schema: Callable, **kwargs ): """ Validate a configuration object :param obj: Object to validate :param strictness: Strictness level (0: permissive warnings, 1: permissive errors + strict warnings, 2: strict errors). DO NOT USE STRICTNESS 2 IN PRODUCTION! :param make_schema: Schema generator :param kwargs: Additional parameters to generator :return: None :raise: `ValueError` on schema validation failure. """ # Strip description from schemas to prevent error messages # becoming unwieldy def strip_description(schema): if isinstance(schema, dict): return { strip_description(k): strip_description(v) for k, v in schema.items() } if isinstance(schema, list): return [strip_description(v) for v in schema] if isinstance(schema, Optional): return Optional(strip_description(schema.schema)) if isinstance(schema, TMSchema): s = TMSchema( schema=strip_description(schema.schema), error=schema._error, ignore_extra_keys=schema.ignore_extra_keys, name=schema.raw_name, as_reference=schema.as_reference, version=schema.version, strict=schema.strict, ) return s if isinstance(schema, Literal): return schema.schema if isinstance(schema, Or): return Or(*[strip_description(a) for a in schema.args]) if isinstance(schema, And): return And(*[strip_description(a) for a in schema.args]) return schema # Do permissive check try: permissive_schema = make_schema(strict=False, **kwargs) permissive_schema = strip_description(permissive_schema) permissive_schema.validate(obj) except SchemaError as e: message = f"Validation {e}" if strictness >= 1: raise SchemaError(message) _LOGGER.warning(message) if strictness == 0: return # Do strict check try: strict_schema = make_schema(strict=True, **kwargs) strict_schema = strip_description(strict_schema) strict_schema.validate(obj) except SchemaError as e: message = f"Strict validation {e}" if strictness >= 2: raise SchemaError(message) from None _LOGGER.warning(message) def _fetch_schema_from_cache(version_path: str) -> dict: """ Fetch a schema from the cache :param version_path: Version path :return: Schema """ for uri in DEFAULT_JSONSCHEMA_BASE_URIS: try: schema_cache = SchemaCache(uri) schema = schema_cache.get_schema(version_path) return schema except Exception as err: _LOGGER.info(err) _QUOTED_VALUE_PATTERN = re.compile(r"'(.*?)'")
[docs] def _validate_schema_using_document_url(obj: dict, version: str) -> None: """ Validate a configuration object efficiently using cached validators. Implements an efficient caching mechanism for external schema documents. - Automatically caches schemas fetched from external URLs. - Used LRU (Least Recently Used) cache strategy. - Added Configurable cache size to manage memory usage. Args: obj: Object to validate version: Schema version string Returns: None Raises: ValueError: On schema validation failure. """ try: version_uri = SchemaUri(version) version_path = ( f"{version_uri.prefix_before_version}/{version_uri.version}" ) schema = _fetch_schema_from_cache(version_path) jsonschema.validate(instance=obj, schema=schema) except jsonschema.exceptions.ValidationError as schemaerr: major, minor = version_uri.major_minor path_str = " ".join( version_uri.prefix_before_version.split("-")[1:] ).title() quoted_value = _QUOTED_VALUE_PATTERN.search(schemaerr.message).group(1) message = "JSON schema validation" message += f"'{path_str} {major}.{minor}': Wrong key {quoted_value}" if schemaerr.instance is not None: message += f" in {str(schemaerr.instance)}" raise ValueError(message) from None except jsonschema.exceptions.SchemaError as err: raise ValueError(f"JSON schema validation {err}") from None