"""
Support for validating and generating examples for SKA telescope model
schemas.
"""
import logging
import os
import re
from typing import Callable
from typing import Optional as OptType
from typing import Tuple
import jsonschema
from schema import And, Literal, Optional, Or, Schema, SchemaError
from ska_telmodel.data.schema_cache import SchemaCache
from . import csp, lowcbf, mccs, midcbf, pss, pst, sdp, tmc
from ._common import TMSchema
from .schemas import layout
# Logger shared by the functions in this module.
_LOGGER = logging.getLogger("ska_telmodel")
# Legacy and current domain prefixes of interface URIs. The look-up
# functions below retry a URI that starts with OLD_PREFIX after
# substituting NEW_PREFIX for it.
OLD_PREFIX = "https://schema.skatelescope.org/"
NEW_PREFIX = "https://schema.skao.int/"
# Base URIs that are tried, in list order, when fetching published JSON
# schema documents.
DEFAULT_JSONSCHEMA_BASE_URIS = [
"https://schema.skao.int",
"https://ska-telescope.gitlab.io/ska-telmodel/json_schema",
]
# Environment override for the base URIs.
# NOTE(review): os.getenv returns a plain string when the variable is set,
# while the fallback is a list, so the type of this constant depends on the
# environment. _fetch_schema_from_cache iterates
# DEFAULT_JSONSCHEMA_BASE_URIS rather than this value -- confirm whether the
# override is meant to take effect there and which separator it should use.
JSONSCHEMA_BASE_URIS = os.getenv(
"SKA_TELMODEL_JSONSCHEMA_BASE_URIS", DEFAULT_JSONSCHEMA_BASE_URIS
)
# Map interface URI prefixes to the function that generates the schema
# for them. schema_by_uri walks this mapping in insertion order and uses
# the first key the URI starts with, so the order of entries is
# significant whenever one prefix is itself the start of another.
# Each generator is called as fn(uri, strict, **kwargs).
URI_SCHEMA_MAP = {
# delay model schemas
layout.LAYOUT_PREFIX: layout.get_layout_schema,
layout.RECEPTOR_PREFIX: layout.get_receptor_schema,
layout.FIXED_DELAY_PREFIX: layout.get_fixed_delay_schema,
layout.LOCATION_PREFIX: layout.get_location_schema,
layout.LOCAL_LOCATION_PREFIX: layout.get_local_position_schema,
layout.GEODETIC_LOCATION_PREFIX: layout.get_geodetic_position_schema,
layout.GEOCENTRIC_LOCATION_PREFIX: layout.get_geocentric_position_schema,
# CSP schemas
csp.CSP_ASSIGNRESOURCES_PREFIX: csp.get_csp_assignresources_schema,
csp.CSP_CONFIG_PREFIX: csp.get_csp_config_schema,
csp.CSP_CONFIGSCAN_PREFIX: csp.get_csp_config_schema,
csp.CSP_SCAN_PREFIX: csp.get_csp_scan_schema,
csp.CSP_ENDSCAN_PREFIX: csp.get_csp_endscan_schema,
csp.CSP_RELEASERESOURCES_PREFIX: csp.get_csp_releaseresources_schema,
csp.CSP_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_schema,
csp.CSP_MID_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_schema,
csp.CSP_LOW_DELAYMODEL_PREFIX: csp.get_csp_low_delaymodel_schema,
# LOWCSP schemas
csp.LOWCSP_ASSIGNRESOURCES_PREFIX: csp.get_low_csp_assignresources_schema,
csp.LOWCSP_CONFIGURE_PREFIX: csp.get_low_csp_configure_schema,
csp.LOWCSP_SCAN_PREFIX: csp.get_low_csp_scan_schema,
csp.LOWCSP_RELEASERESOURCES_PREFIX: (
csp.get_low_csp_releaseresources_schema
),
# PSS schemas
pss.PSS_CONFIGURE_PREFIX: pss.get_pss_configure_schema,
pss.PSS_CHEETAH_CONFIGURE_PREFIX: pss.get_pss_cheetah_configure_schema,
# PST schemas
pst.PST_CONFIGURE_PREFIX: pst.get_pst_config_schema,
# SDP schemas
sdp.SDP_ASSIGNRES_PREFIX: sdp.get_sdp_assignres_schema,
sdp.SDP_RELEASERES_PREFIX: sdp.get_sdp_releaseres_schema,
sdp.SDP_CONFIGURE_PREFIX: sdp.get_sdp_configure_schema,
sdp.SDP_SCAN_PREFIX: sdp.get_sdp_scan_schema,
sdp.SDP_RECVADDRS_PREFIX: sdp.get_sdp_recvaddrs_schema,
# MCCS schemas
mccs.MCCS_ASSIGNEDRES_PREFIX: mccs.get_mccs_assignedres_schema,
mccs.MCCS_ASSIGNRES_PREFIX: mccs.get_mccs_assignres_schema,
mccs.MCCS_CONFIGURE_PREFIX: mccs.get_mccs_configure_schema,
mccs.MCCS_SCAN_PREFIX: mccs.get_mccs_scan_schema,
mccs.MCCS_RELEASERES_PREFIX: mccs.get_mccs_releaseres_schema,
mccs.ANTENNA_PROPERTIES_PREFIX: mccs.get_antenna_properties_schema,
mccs.ANTENNA_GEOMETRY_PREFIX: mccs.get_antenna_geometry_schema,
mccs.ANTENNA_FEATURES_PREFIX: mccs.get_antenna_features_schema,
mccs.ANTENNA_PREFIX: mccs.get_antenna_schema,
mccs.STATION_PROPERTIES_PREFIX: mccs.get_station_properties_schema,
mccs.STATION_GEOMETRY_PREFIX: mccs.get_station_geometry_schema,
mccs.STATION_FEATURES_PREFIX: mccs.get_station_features_schema,
mccs.STATION_PREFIX: mccs.get_station_schema,
# TMC low schemas
tmc.LOW_TMC_RELEASERESOURCES_PREFIX: tmc.get_low_tmc_releaseresources_schema, # noqa: E501
tmc.LOW_TMC_ASSIGNRESOURCES_PREFIX: tmc.get_low_tmc_assignresources_schema,
tmc.LOW_TMC_CONFIGURE_PREFIX: tmc.get_low_tmc_configure_schema,
tmc.LOW_TMC_SCAN_PREFIX: tmc.get_low_tmc_scan_schema,
tmc.TMC_ASSIGNEDRES_PREFIX: tmc.get_tmc_assignedres_schema,
# TMC mid schemas
tmc.TMC_SCAN_PREFIX: tmc.get_tmc_scan_schema,
tmc.TMC_ASSIGNRESOURCES_PREFIX: tmc.get_tmc_assignresources_schema,
tmc.TMC_CONFIGURE_PREFIX: tmc.get_tmc_configure_schema,
tmc.TMC_RELEASERESOURCES_PREFIX: tmc.get_tmc_releaseresources_schema,
# LOWCBF schemas
lowcbf.LOWCBF_ASSIGNRESOURCES_PREFIX: (
lowcbf.get_lowcbf_assignresources_schema
),
lowcbf.LOWCBF_CONFIGURESCAN_PREFIX: lowcbf.get_lowcbf_configurescan_schema,
lowcbf.LOWCBF_SCAN_PREFIX: lowcbf.get_lowcbf_scan_schema,
lowcbf.LOWCBF_RELEASERESOURCES_PREFIX: (
lowcbf.get_lowcbf_releaseresources_schema
),
# MIDCBF schemas
midcbf.MIDCBF_INITSYSPARAM_PREFIX: midcbf.get_midcbf_initsysparam_schema,
}
# Map interface URI prefixes to the function that generates an example
# document for them. example_by_uri walks this mapping in insertion order
# and uses the first key the URI starts with, so the order of entries is
# significant whenever one prefix is itself the start of another.
# Each generator is called as fn(uri, *args).
URI_EXAMPLE_MAP = {
# delay model schemas
layout.LAYOUT_PREFIX: layout.get_layout_example,
layout.RECEPTOR_PREFIX: layout.get_receptor_example,
layout.FIXED_DELAY_PREFIX: layout.get_fixed_delay_example,
layout.LOCATION_PREFIX: layout.get_location_example,
layout.LOCAL_LOCATION_PREFIX: layout.get_local_position_example,
layout.GEODETIC_LOCATION_PREFIX: layout.get_geodetic_position_example,
layout.GEOCENTRIC_LOCATION_PREFIX: layout.get_geocentric_position_example,
# CSP schemas
csp.CSP_ASSIGNRESOURCES_PREFIX: csp.get_csp_assignresources_example,
csp.CSP_CONFIG_PREFIX: csp.get_csp_config_example,
csp.CSP_CONFIGSCAN_PREFIX: csp.get_csp_config_example,
csp.CSP_SCAN_PREFIX: csp.get_csp_scan_example,
csp.CSP_ENDSCAN_PREFIX: csp.get_csp_endscan_example,
csp.CSP_RELEASERESOURCES_PREFIX: csp.get_csp_releaseresources_example,
csp.CSP_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_example,
csp.CSP_MID_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_example,
csp.CSP_LOW_DELAYMODEL_PREFIX: csp.get_csp_low_delaymodel_example,
# LOWCSP schemas
csp.LOWCSP_ASSIGNRESOURCES_PREFIX: csp.get_low_csp_assignresources_example,
csp.LOWCSP_CONFIGURE_PREFIX: csp.get_low_csp_configure_example,
csp.LOWCSP_SCAN_PREFIX: csp.get_low_csp_scan_example,
csp.LOWCSP_RELEASERESOURCES_PREFIX: (
csp.get_low_csp_releaseresources_example
),
# PSS schemas
pss.PSS_CONFIGURE_PREFIX: pss.get_pss_configure_example,
pss.PSS_CHEETAH_CONFIGURE_PREFIX: pss.get_pss_cheetah_configure_example,
# PST schemas
pst.PST_CONFIGURE_PREFIX: pst.get_pst_config_example,
# SDP schemas
sdp.SDP_ASSIGNRES_PREFIX: sdp.get_sdp_assignres_example,
sdp.SDP_RELEASERES_PREFIX: sdp.get_sdp_releaseres_example,
sdp.SDP_CONFIGURE_PREFIX: sdp.get_sdp_configure_example,
sdp.SDP_SCAN_PREFIX: sdp.get_sdp_scan_example,
sdp.SDP_RECVADDRS_PREFIX: sdp.get_sdp_recvaddrs_example,
# MCCS schemas
mccs.MCCS_ASSIGNEDRES_PREFIX: mccs.get_mccs_assignedres_example,
mccs.MCCS_ASSIGNRES_PREFIX: mccs.get_mccs_assignres_example,
mccs.MCCS_CONFIGURE_PREFIX: mccs.get_mccs_configure_example,
mccs.MCCS_SCAN_PREFIX: mccs.get_mccs_scan_example,
mccs.MCCS_RELEASERES_PREFIX: mccs.get_mccs_releaseres_example,
mccs.ANTENNA_PROPERTIES_PREFIX: mccs.get_antenna_properties_example,
mccs.ANTENNA_GEOMETRY_PREFIX: mccs.get_antenna_geometry_example,
mccs.ANTENNA_FEATURES_PREFIX: mccs.get_antenna_features_example,
mccs.ANTENNA_PREFIX: mccs.get_antenna_example,
mccs.STATION_PROPERTIES_PREFIX: mccs.get_station_properties_example,
mccs.STATION_GEOMETRY_PREFIX: mccs.get_station_geometry_example,
mccs.STATION_FEATURES_PREFIX: mccs.get_station_features_example,
mccs.STATION_PREFIX: mccs.get_station_example,
# TMC low schemas
tmc.LOW_TMC_ASSIGNRESOURCES_PREFIX: tmc.get_low_tmc_assignresources_example, # noqa: E501
tmc.LOW_TMC_CONFIGURE_PREFIX: tmc.get_low_tmc_configure_example,
tmc.LOW_TMC_SCAN_PREFIX: tmc.get_low_tmc_scan_example,
tmc.LOW_TMC_RELEASERESOURCES_PREFIX: tmc.get_low_tmc_releaseresources_example, # noqa: E501
tmc.TMC_ASSIGNEDRES_PREFIX: tmc.get_tmc_assignedres_example,
# TMC mid schemas
tmc.TMC_SCAN_PREFIX: tmc.get_tmc_scan_example,
tmc.TMC_ASSIGNRESOURCES_PREFIX: tmc.get_tmc_assignresources_example,
tmc.TMC_CONFIGURE_PREFIX: tmc.get_tmc_configure_example,
tmc.TMC_RELEASERESOURCES_PREFIX: tmc.get_tmc_releaseresources_example,
# LOWCBF schemas
lowcbf.LOWCBF_ASSIGNRESOURCES_PREFIX: (
lowcbf.get_lowcbf_assignresources_example
),
lowcbf.LOWCBF_CONFIGURESCAN_PREFIX: (
lowcbf.get_lowcbf_configurescan_example
),
lowcbf.LOWCBF_SCAN_PREFIX: lowcbf.get_lowcbf_scan_example,
lowcbf.LOWCBF_RELEASERESOURCES_PREFIX: (
lowcbf.get_lowcbf_releaseresources_example
),
# MIDCBF schemas
midcbf.MIDCBF_INITSYSPARAM_PREFIX: midcbf.get_midcbf_initsysparam_example,
}
[docs]
def schema_by_uri(version: str, strict: int = 1, **kwargs) -> Schema:
    """
    Find and build the interface schema that matches an interface URI.

    The URI is compared against the keys of ``URI_SCHEMA_MAP`` in
    insertion order and the generator registered for the first key the
    URI starts with is called. A URI on the legacy domain
    (``OLD_PREFIX``) that has no direct match is tried a second time
    with ``NEW_PREFIX`` substituted for the legacy domain.

    :param version: Interface URI
    :param strict: Strictness level, forwarded to the schema generator
    :param kwargs: Extra keyword arguments forwarded to the generator
    :returns: Interface schema
    :raises ValueError: If no registered prefix matches the URI
    """
    # URIs to try, in order: the one given, then its new-domain rewrite
    candidates = [version]
    if version.startswith(OLD_PREFIX):
        candidates.append(NEW_PREFIX + version[len(OLD_PREFIX):])

    for uri in candidates:
        for known_prefix, build_schema in URI_SCHEMA_MAP.items():
            if uri.startswith(known_prefix):
                return build_schema(uri, strict, **kwargs)

    # Nothing matched, neither directly nor after rewriting the domain
    raise ValueError(f"Unknown schema URI kind: {version}!")
[docs]
def example_by_uri(version: str, *args) -> dict:
    """
    Generate an example document for the schema named by an interface URI.

    The URI is compared against the keys of ``URI_EXAMPLE_MAP`` in
    insertion order and the generator registered for the first key the
    URI starts with is called. A URI on the legacy domain
    (``OLD_PREFIX``) that has no direct match is tried a second time
    with ``NEW_PREFIX`` substituted for the legacy domain.

    :param version: Interface URI
    :param args: Extra parameters depending on interface (strings)
    :returns: Dictionary
    :raises ValueError: If no registered prefix matches the URI
    """
    # URIs to try, in order: the one given, then its new-domain rewrite
    candidates = [version]
    if version.startswith(OLD_PREFIX):
        candidates.append(NEW_PREFIX + version[len(OLD_PREFIX):])

    for uri in candidates:
        for known_prefix, build_example in URI_EXAMPLE_MAP.items():
            if uri.startswith(known_prefix):
                return build_example(uri, *args)

    # Nothing matched, neither directly nor after rewriting the domain
    raise ValueError(f"Unknown schema URI kind for example: {version}!")
[docs]
class SchemaUri:
    """Convenience class for taking an interface URI apart.

    For ``https://schema.skao.int/ska-csp-configure/2.3`` the pieces are:

    * ``prefix``: ``https://schema.skao.int/ska-csp-configure/``
    * ``version``: ``2.3``
    * ``prefix_before_version``: ``ska-csp-configure``
    * ``major_minor``: ``(2, 3)``

    :param version: Interface URI
    """

    def __init__(self, version: str):
        """
        Split the URI once at construction time.

        :param version: Interface URI
        """
        self._version = version
        # [everything before the last "/", last path segment]
        self._parts = version.rsplit("/", 1)
        # Split at the last two "/": the entry at index 1 is the path
        # segment directly in front of the version when the URI has at
        # least two slashes.
        self._nexted_parts = version.rsplit("/", 2)

    @property
    def prefix(self) -> str:
        """
        Everything up to and including the last ``/`` of the URI.

        :returns: prefix
        """
        head = self._parts[0]
        return f"{head}/"

    @property
    def version(self) -> str:
        """
        The last path segment of the URI, i.e. the version string.

        :returns: version
        """
        return self._parts[1]

    @property
    def prefix_before_version(self) -> str:
        """
        The path segment that directly precedes the version.

        :returns: name segment in front of the version
        """
        return self._nexted_parts[1]

    @property
    def major_minor(self) -> Tuple[int, int]:
        """
        The first two dot-separated fields of the version as integers.

        :returns: tuple of major and minor versions
        """
        fields = self.version.split(".")
        major = int(fields[0])
        minor = int(fields[1])
        return major, minor

    def __repr__(self) -> str:
        return self._version
[docs]
def validate(
    version: OptType[str],
    config: dict,
    strictness: int = 1,
    jsonschema_fallback=False,
):
    """Validate a dictionary against its interface schema.

    The schema to check against is determined from the interface URI.

    :param version: Interface URI with version. If ``None``, the
        ``interface`` entry of ``config`` is used instead.
    :param config: Dictionary to validate
    :param strictness: Strictness level (0: permissive warnings,
        1: permissive errors + strict warnings, 2: strict errors).
        Note that with strictness level 2, a lot of generally harmless
        schema violations will cause an exception to be raised. This is
        generally inadvisable in production consumer code ("be liberal
        in what you accept"!). Therefore any level above 1 is reduced
        to 1 unless the ``PYTEST_VERSION`` environment variable is set
        or ``SKA_TELMODEL_ALLOW_STRICT_VALIDATION`` is set to a
        non-empty value.
    :param jsonschema_fallback: Validation fallback strategy, ``False``
        by default. Validation is always attempted first with the
        locally generated schema. If that fails with a ``SchemaError``
        and this flag is set, the object is validated against the
        published JSON schema document for the interface instead.
    :raises ValueError: If local validation fails and no fallback was
        requested, or if the fallback validation against the published
        JSON schema fails.
    :raises AssertionError: If ``version`` is ``None`` and ``config``
        holds no string under ``interface``.
    """
    # Strict errors are only honoured in a test run or on explicit opt-in
    strict_errors_allowed = os.environ.get(
        "PYTEST_VERSION"
    ) is not None or os.environ.get(
        "SKA_TELMODEL_ALLOW_STRICT_VALIDATION", False
    )
    if strictness > 1 and not strict_errors_allowed:
        strictness = 1

    # Fall back to the URI embedded in the document itself
    if version is None:
        version = config.get("interface")
        assert isinstance(
            version, str
        ), f"Invalid 'interface' value: {version}"

    try:
        return _validate_schema(
            config,
            strictness,
            lambda strict: schema_by_uri(version, strict),
        )
    except SchemaError as e:
        if not jsonschema_fallback:
            raise ValueError(e) from None
        return _validate_schema_using_document_url(config, version)
def _validate_schema(
    obj: dict, strictness: int, make_schema: Callable, **kwargs
):
    """
    Run the permissive check and then the strict check on an object.

    :param obj: Object to validate
    :param strictness: Strictness level (0: permissive warnings,
       1: permissive errors + strict warnings, 2: strict errors).
       DO NOT USE STRICTNESS 2 IN PRODUCTION!
    :param make_schema: Schema generator, called as
       ``make_schema(strict=<bool>, **kwargs)``
    :param kwargs: Additional parameters to generator
    :return: None
    :raise: `SchemaError` if the permissive check fails at strictness
       1 or above, or if the strict check fails at strictness 2 or
       above. Failures below those levels are logged as warnings.
    """

    def _drop_descriptions(node):
        # Rebuild the schema tree without descriptions so that error
        # messages do not become unwieldy. The order of the isinstance
        # checks is kept deliberately: some of these classes may be
        # subclasses of one another.
        if isinstance(node, dict):
            return {
                _drop_descriptions(key): _drop_descriptions(value)
                for key, value in node.items()
            }
        if isinstance(node, list):
            return [_drop_descriptions(item) for item in node]
        if isinstance(node, Optional):
            return Optional(_drop_descriptions(node.schema))
        if isinstance(node, TMSchema):
            return TMSchema(
                schema=_drop_descriptions(node.schema),
                error=node._error,
                ignore_extra_keys=node.ignore_extra_keys,
                name=node.raw_name,
                as_reference=node.as_reference,
                version=node.version,
                strict=node.strict,
            )
        if isinstance(node, Literal):
            # A Literal only wraps a value together with its description
            return node.schema
        for combinator in (Or, And):
            if isinstance(node, combinator):
                return combinator(
                    *[_drop_descriptions(arg) for arg in node.args]
                )
        return node

    def _check(strict):
        # Generate the schema, strip descriptions, validate the object
        stripped = _drop_descriptions(make_schema(strict=strict, **kwargs))
        stripped.validate(obj)

    # Permissive check: an error from level 1 upwards, else a warning
    try:
        _check(False)
    except SchemaError as e:
        text = f"Validation {e}"
        if strictness >= 1:
            raise SchemaError(text)
        _LOGGER.warning(text)

    # Level 0 never looks at the strict schema
    if strictness == 0:
        return

    # Strict check: an error from level 2 upwards, else a warning
    try:
        _check(True)
    except SchemaError as e:
        text = f"Strict validation {e}"
        if strictness >= 2:
            raise SchemaError(text) from None
        _LOGGER.warning(text)
def _fetch_schema_from_cache(version_path: str) -> dict:
    """
    Fetch a JSON schema document, trying each default base URI in turn.

    A ``SchemaCache`` is created for every entry of
    ``DEFAULT_JSONSCHEMA_BASE_URIS`` and asked for ``version_path``; the
    first one that answers without raising wins. Failures of individual
    base URIs are logged at INFO level and the next one is tried.

    :param version_path: Version path
    :return: Schema
    :raises ValueError: If no base URI could provide the schema.
        Previously this case fell through and implicitly returned
        ``None``, which then surfaced as an unrelated error inside the
        JSON schema validator.
    """
    # NOTE(review): the SKA_TELMODEL_JSONSCHEMA_BASE_URIS override held in
    # JSONSCHEMA_BASE_URIS is not consulted here -- confirm whether it
    # should be.
    last_error = None
    for uri in DEFAULT_JSONSCHEMA_BASE_URIS:
        try:
            return SchemaCache(uri).get_schema(version_path)
        except Exception as err:
            # Best effort: remember the failure and try the next base URI
            _LOGGER.info(err)
            last_error = err

    raise ValueError(
        f"Could not fetch JSON schema '{version_path}' from any of "
        f"{DEFAULT_JSONSCHEMA_BASE_URIS}: {last_error}"
    ) from last_error
# Matches the first single-quoted token in a string (non-greedy); used to
# pull the offending name out of a jsonschema validation error message.
_QUOTED_VALUE_PATTERN = re.compile(r"'(.*?)'")
[docs]
def _validate_schema_using_document_url(obj: dict, version: str) -> None:
    """
    Validate a configuration object against its published JSON schema.

    The schema document is obtained through ``_fetch_schema_from_cache``
    using the last two path segments of ``version``
    (``<name>/<version>``), and the object is then checked with
    ``jsonschema.validate``.

    :param obj: Object to validate
    :param version: Interface URI with version
    :return: None
    :raises ValueError: If the object does not conform to the schema, or
        if the fetched schema document is itself not a valid JSON schema.
    """
    version_uri = SchemaUri(version)
    version_path = (
        f"{version_uri.prefix_before_version}/{version_uri.version}"
    )
    schema = _fetch_schema_from_cache(version_path)

    try:
        jsonschema.validate(instance=obj, schema=schema)
    except jsonschema.exceptions.ValidationError as schemaerr:
        major, minor = version_uri.major_minor
        # e.g. "ska-csp-configure" -> "Csp Configure"
        path_str = " ".join(
            version_uri.prefix_before_version.split("-")[1:]
        ).title()
        # Not every validation message contains a quoted token (range or
        # length violations do not); fall back to the full message then
        # instead of failing on a missing match.
        quoted = _QUOTED_VALUE_PATTERN.search(schemaerr.message)
        if quoted is not None:
            detail = f"Wrong key {quoted.group(1)}"
        else:
            detail = schemaerr.message
        # NOTE(review): there is no separator between "validation" and the
        # quoted interface name; kept as-is because callers may match on
        # the exact text.
        message = "JSON schema validation"
        message += f"'{path_str} {major}.{minor}': {detail}"
        if schemaerr.instance is not None:
            message += f" in {str(schemaerr.instance)}"
        raise ValueError(message) from None
    except jsonschema.exceptions.SchemaError as err:
        raise ValueError(f"JSON schema validation {err}") from None