"""
Support for validating and generating examples for SKA telescope model
schemas.
"""
import logging
from typing import Callable
from typing import Optional as OptType
from typing import Tuple
from schema import And, Literal, Optional, Or, Schema, SchemaError
from . import csp, lowcbf, mccs, midcbf, pst, sdp, tmc
from ._common import TMSchema
from .schemas import layout
_LOGGER = logging.getLogger("ska_telmodel")
OLD_PREFIX = "https://schema.skatelescope.org/"
NEW_PREFIX = "https://schema.skao.int/"
# Map interface prefixes to the function that can be used to generate
# the schema for them
URI_SCHEMA_MAP = {
# delay model schemas
layout.LAYOUT_PREFIX: layout.get_layout_schema,
layout.RECEPTOR_PREFIX: layout.get_receptor_schema,
layout.FIXED_DELAY_PREFIX: layout.get_fixed_delay_schema,
layout.LOCATION_PREFIX: layout.get_location_schema,
layout.LOCAL_LOCATION_PREFIX: layout.get_local_position_schema,
layout.GEODETIC_LOCATION_PREFIX: layout.get_geodetic_position_schema,
layout.GEOCENTRIC_LOCATION_PREFIX: layout.get_geocentric_position_schema,
# CSP schemas
csp.CSP_ASSIGNRESOURCES_PREFIX: csp.get_csp_assignresources_schema,
csp.CSP_CONFIG_PREFIX: csp.get_csp_config_schema,
csp.CSP_CONFIGSCAN_PREFIX: csp.get_csp_config_schema,
csp.CSP_SCAN_PREFIX: csp.get_csp_scan_schema,
csp.CSP_ENDSCAN_PREFIX: csp.get_csp_endscan_schema,
csp.CSP_RELEASERESOURCES_PREFIX: csp.get_csp_releaseresources_schema,
csp.CSP_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_schema,
csp.CSP_MID_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_schema,
csp.CSP_LOW_DELAYMODEL_PREFIX: csp.get_csp_low_delaymodel_schema,
# LOWCSP schemas
csp.LOWCSP_ASSIGNRESOURCES_PREFIX: csp.get_low_csp_assignresources_schema,
csp.LOWCSP_CONFIGURE_PREFIX: csp.get_low_csp_configure_schema,
csp.LOWCSP_SCAN_PREFIX: csp.get_low_csp_scan_schema,
csp.LOWCSP_RELEASERESOURCES_PREFIX: (
csp.get_low_csp_releaseresources_schema
),
# PST schemas
pst.PST_CONFIGURE_PREFIX: pst.get_pst_config_schema,
# SDP schemas
sdp.SDP_ASSIGNRES_PREFIX: sdp.get_sdp_assignres_schema,
sdp.SDP_RELEASERES_PREFIX: sdp.get_sdp_releaseres_schema,
sdp.SDP_CONFIGURE_PREFIX: sdp.get_sdp_configure_schema,
sdp.SDP_SCAN_PREFIX: sdp.get_sdp_scan_schema,
sdp.SDP_RECVADDRS_PREFIX: sdp.get_sdp_recvaddrs_schema,
# MCCS schemas
mccs.MCCS_ASSIGNEDRES_PREFIX: mccs.get_mccs_assignedres_schema,
mccs.MCCS_ASSIGNRES_PREFIX: mccs.get_mccs_assignres_schema,
mccs.MCCS_CONFIGURE_PREFIX: mccs.get_mccs_configure_schema,
mccs.MCCS_SCAN_PREFIX: mccs.get_mccs_scan_schema,
mccs.MCCS_RELEASERES_PREFIX: mccs.get_mccs_releaseres_schema,
mccs.ANTENNA_PROPERTIES_PREFIX: mccs.get_antenna_properties_schema,
mccs.ANTENNA_GEOMETRY_PREFIX: mccs.get_antenna_geometry_schema,
mccs.ANTENNA_FEATURES_PREFIX: mccs.get_antenna_features_schema,
mccs.ANTENNA_PREFIX: mccs.get_antenna_schema,
mccs.STATION_PROPERTIES_PREFIX: mccs.get_station_properties_schema,
mccs.STATION_GEOMETRY_PREFIX: mccs.get_station_geometry_schema,
mccs.STATION_FEATURES_PREFIX: mccs.get_station_features_schema,
mccs.STATION_PREFIX: mccs.get_station_schema,
# TMC schemas
tmc.LOW_TMC_RELEASERESOURCES_PREFIX: tmc.get_low_tmc_releaseresources_schema, # noqa: E501
tmc.LOW_TMC_ASSIGNRESOURCES_PREFIX: tmc.get_low_tmc_assignresources_schema,
tmc.LOW_TMC_CONFIGURE_PREFIX: tmc.get_low_tmc_configure_schema,
tmc.LOW_TMC_SCAN_PREFIX: tmc.get_low_tmc_scan_schema,
tmc.TMC_ASSIGNEDRES_PREFIX: tmc.get_tmc_assignedres_schema,
# TMC mid schemas
tmc.TMC_SCAN_PREFIX: tmc.get_tmc_scan_schema,
tmc.TMC_ASSIGNRESOURCES_PREFIX: tmc.get_tmc_assignresources_schema,
tmc.TMC_CONFIGURE_PREFIX: tmc.get_tmc_configure_schema,
tmc.TMC_RELEASERESOURCES_PREFIX: tmc.get_tmc_releaseresources_schema,
# LOWCBF schemas
lowcbf.LOWCBF_ASSIGNRESOURCES_PREFIX: (
lowcbf.get_lowcbf_assignresources_schema
),
lowcbf.LOWCBF_CONFIGURESCAN_PREFIX: lowcbf.get_lowcbf_configurescan_schema,
lowcbf.LOWCBF_SCAN_PREFIX: lowcbf.get_lowcbf_scan_schema,
lowcbf.LOWCBF_RELEASERESOURCES_PREFIX: (
lowcbf.get_lowcbf_releaseresources_schema
),
# MIDCBF schemas
midcbf.MIDCBF_INITSYSPARAM_PREFIX: midcbf.get_midcbf_initsysparam_schema,
}
# As above, but for example generation
URI_EXAMPLE_MAP = {
# delay model schemas
layout.LAYOUT_PREFIX: layout.get_layout_example,
layout.RECEPTOR_PREFIX: layout.get_receptor_example,
layout.FIXED_DELAY_PREFIX: layout.get_fixed_delay_example,
layout.LOCATION_PREFIX: layout.get_location_example,
layout.LOCAL_LOCATION_PREFIX: layout.get_local_position_example,
layout.GEODETIC_LOCATION_PREFIX: layout.get_geodetic_position_example,
layout.GEOCENTRIC_LOCATION_PREFIX: layout.get_geocentric_position_example,
# CSP schemas
csp.CSP_ASSIGNRESOURCES_PREFIX: csp.get_csp_assignresources_example,
csp.CSP_CONFIG_PREFIX: csp.get_csp_config_example,
csp.CSP_CONFIGSCAN_PREFIX: csp.get_csp_config_example,
csp.CSP_SCAN_PREFIX: csp.get_csp_scan_example,
csp.CSP_ENDSCAN_PREFIX: csp.get_csp_endscan_example,
csp.CSP_RELEASERESOURCES_PREFIX: csp.get_csp_releaseresources_example,
csp.CSP_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_example,
csp.CSP_MID_DELAYMODEL_PREFIX: csp.get_csp_delaymodel_example,
csp.CSP_LOW_DELAYMODEL_PREFIX: csp.get_csp_low_delaymodel_example,
csp.LOWCSP_ASSIGNRESOURCES_PREFIX: csp.get_low_csp_assignresources_example,
csp.LOWCSP_CONFIGURE_PREFIX: csp.get_low_csp_configure_example,
csp.LOWCSP_SCAN_PREFIX: csp.get_low_csp_scan_example,
csp.LOWCSP_RELEASERESOURCES_PREFIX: (
csp.get_low_csp_releaseresources_example
),
# PST schemas
pst.PST_CONFIGURE_PREFIX: pst.get_pst_config_example,
# SDP schemas
sdp.SDP_ASSIGNRES_PREFIX: sdp.get_sdp_assignres_example,
sdp.SDP_RELEASERES_PREFIX: sdp.get_sdp_releaseres_example,
sdp.SDP_CONFIGURE_PREFIX: sdp.get_sdp_configure_example,
sdp.SDP_SCAN_PREFIX: sdp.get_sdp_scan_example,
sdp.SDP_RECVADDRS_PREFIX: sdp.get_sdp_recvaddrs_example,
# MCCS schemas
mccs.MCCS_ASSIGNEDRES_PREFIX: mccs.get_mccs_assignedres_example,
mccs.MCCS_ASSIGNRES_PREFIX: mccs.get_mccs_assignres_example,
mccs.MCCS_CONFIGURE_PREFIX: mccs.get_mccs_configure_example,
mccs.MCCS_SCAN_PREFIX: mccs.get_mccs_scan_example,
mccs.MCCS_RELEASERES_PREFIX: mccs.get_mccs_releaseres_example,
mccs.ANTENNA_PROPERTIES_PREFIX: mccs.get_antenna_properties_example,
mccs.ANTENNA_GEOMETRY_PREFIX: mccs.get_antenna_geometry_example,
mccs.ANTENNA_FEATURES_PREFIX: mccs.get_antenna_features_example,
mccs.ANTENNA_PREFIX: mccs.get_antenna_example,
mccs.STATION_PROPERTIES_PREFIX: mccs.get_station_properties_example,
mccs.STATION_GEOMETRY_PREFIX: mccs.get_station_geometry_example,
mccs.STATION_FEATURES_PREFIX: mccs.get_station_features_example,
mccs.STATION_PREFIX: mccs.get_station_example,
# TMC schemas
tmc.LOW_TMC_ASSIGNRESOURCES_PREFIX: tmc.get_low_tmc_assignresources_example, # noqa: E501
tmc.LOW_TMC_CONFIGURE_PREFIX: tmc.get_low_tmc_configure_example,
tmc.LOW_TMC_SCAN_PREFIX: tmc.get_low_tmc_scan_example,
tmc.LOW_TMC_RELEASERESOURCES_PREFIX: tmc.get_low_tmc_releaseresources_example, # noqa: E501
tmc.TMC_ASSIGNEDRES_PREFIX: tmc.get_tmc_assignedres_example,
# TMC mid schemas
tmc.TMC_SCAN_PREFIX: tmc.get_tmc_scan_example,
tmc.TMC_ASSIGNRESOURCES_PREFIX: tmc.get_tmc_assignresources_example,
tmc.TMC_CONFIGURE_PREFIX: tmc.get_tmc_configure_example,
tmc.TMC_RELEASERESOURCES_PREFIX: tmc.get_tmc_releaseresources_example,
# LOWCBF schemas
lowcbf.LOWCBF_ASSIGNRESOURCES_PREFIX: (
lowcbf.get_lowcbf_assignresources_example
),
lowcbf.LOWCBF_CONFIGURESCAN_PREFIX: (
lowcbf.get_lowcbf_configurescan_example
),
lowcbf.LOWCBF_SCAN_PREFIX: lowcbf.get_lowcbf_scan_example,
lowcbf.LOWCBF_RELEASERESOURCES_PREFIX: (
lowcbf.get_lowcbf_releaseresources_example
),
# MIDCBF schemas
midcbf.MIDCBF_INITSYSPARAM_PREFIX: midcbf.get_midcbf_initsysparam_example,
}
[docs]
def schema_by_uri(version: str, strict: int = 1, **kwargs) -> Schema:
"""
Looks up interface schema based on interface identifier
:param version: Interface URI
:param strict: Strictness level
:returns: Interface schema
"""
# Perform look-up
for prefix, schema_fn in URI_SCHEMA_MAP.items():
if version.startswith(prefix):
return schema_fn(version, strict, **kwargs)
# Check whether there's a match if we convert an old domain prefix
if version.startswith(OLD_PREFIX):
version2 = NEW_PREFIX + version[len(OLD_PREFIX) :]
for prefix, schema_fn in URI_SCHEMA_MAP.items():
if version2.startswith(prefix):
return schema_fn(version2, strict, **kwargs)
# Otherwise - unknown...
raise ValueError(f"Unknown schema URI kind: {version}!")
[docs]
def example_by_uri(version: str, *args) -> dict:
"""
Generates an example for a particular schema
:param version: Interface URI
:param args: Extra parameters depending on interface (strings)
:returns: Dictionary
"""
# Perform lookup
for prefix, example_fn in URI_EXAMPLE_MAP.items():
if version.startswith(prefix):
return example_fn(version, *args)
# Check whether there's a match if we convert an old domain prefix
if version.startswith(OLD_PREFIX):
version2 = NEW_PREFIX + version[len(OLD_PREFIX) :]
for prefix, example_fn in URI_EXAMPLE_MAP.items():
if version2.startswith(prefix):
return example_fn(version2, *args)
# Otherwise - unknown...
raise ValueError(f"Unknown schema URI kind for example: {version}!")
[docs]
class SchemaUri:
"""Convenience class for manipulating version URIs.
:param version: Interface URI
"""
def __init__(self, version: str):
"""
Construct instance.
:param version: Interface URI
"""
self._version = version
self._parts = version.rsplit("/", maxsplit=1)
@property
def prefix(self) -> str:
"""
Get the prefix.
:returns: prefix
"""
return self._parts[0] + "/"
@property
def version(self) -> str:
"""
Get the version.
:returns: version
"""
return self._parts[1]
@property
def major_minor(self) -> Tuple[int, int]:
"""
Get the major and minor parts of the version.
:returns: tuple of major and minor versions
"""
parts = self.version.split(".")
return int(parts[0]), int(parts[1])
def __repr__(self) -> str:
return self._version
[docs]
def validate(version: OptType[str], config: dict, strictness: int = 1):
"""Validate a dictionary against schema
Will automatically determine the schema to check against
:param version: Interface with version
:param config: Dictionary to validate
:param strictness: Strictness level (0: permissive warnings,
1: permissive errors + strict warnings, 2: strict errors).
Note that with strictness level 2, a lot of generally harmless
schema violations will cause an exception to be raised. This is
generally inadvisable in production consumer code ("be liberal
in what you accept"!).
:raises SchemaError: Raised if the object fails permissive checks
at strictness level 1. At strictness level 2, raised if the object
fails any schema check.
"""
# Take from dictionary if possible
if version is None:
version = config.get("interface")
assert isinstance(
version, str
), f"Invalid 'interface' value: {version}"
def make_schema(strict):
return schema_by_uri(version, strict)
return _validate_schema(config, strictness, make_schema)
def _validate_schema(
obj: dict, strictness: int, make_schema: Callable, **kwargs
):
"""
Validate a configuration object
:param obj: Object to validate
:param strictness: Strictness level (0: permissive warnings,
1: permissive errors + strict warnings, 2: strict errors).
DO NOT USE STRICTNESS 2 IN PRODUCTION!
:param make_schema: Schema generator
:param kwargs: Additional parameters to generator
:return: None
:raise: `ValueError` on schema validation failure.
"""
# Strip description from schemas to prevent error messages
# becoming unwieldy
def strip_description(schema):
if isinstance(schema, dict):
return {
strip_description(k): strip_description(v)
for k, v in schema.items()
}
if isinstance(schema, list):
return [strip_description(v) for v in schema]
if isinstance(schema, Optional):
return Optional(strip_description(schema.schema))
if isinstance(schema, TMSchema):
s = TMSchema(
schema=strip_description(schema.schema),
error=schema._error,
ignore_extra_keys=schema.ignore_extra_keys,
name=schema.raw_name,
as_reference=schema.as_reference,
version=schema.version,
strict=schema.strict,
)
return s
if isinstance(schema, Literal):
return schema.schema
if isinstance(schema, Or):
return Or(*[strip_description(a) for a in schema.args])
if isinstance(schema, And):
return And(*[strip_description(a) for a in schema.args])
return schema
# Do permissive check
try:
permissive_schema = make_schema(strict=False, **kwargs)
permissive_schema = strip_description(permissive_schema)
permissive_schema.validate(obj)
except SchemaError as e:
message = f"Validation {e}"
if strictness >= 1:
raise ValueError(message) from None
_LOGGER.warning(message)
if strictness == 0:
return
# Do strict check
try:
strict_schema = make_schema(strict=True, **kwargs)
strict_schema = strip_description(strict_schema)
strict_schema.validate(obj)
except SchemaError as e:
message = f"Strict validation {e}"
if strictness >= 2:
raise ValueError(message) from None
_LOGGER.warning(message)