Source code for ska_telmodel.csp.low_version

from functools import partial

from ska_telmodel.csp.version import check_csp_interface_version

from .._common import interface_uri  # , split_interface_version

LOWCSP_ASSIGNRESOURCES_PREFIX = (
    "https://schema.skao.int/ska-low-csp-assignresources/"
)
LOWCSP_CONFIGURE_PREFIX = "https://schema.skao.int/ska-low-csp-configure/"
LOWCSP_SCAN_PREFIX = "https://schema.skao.int/ska-low-csp-scan/"
LOWCSP_ENDSCAN_PREFIX = "https://schema.skao.int/ska-low-csp-endscan/"
LOWCSP_RELEASERESOURCES_PREFIX = (
    "https://schema.skao.int/ska-low-csp-releaseresources/"
)
# Functions to create interface URI for each subarray command, each function
# taking a list argument containing major,minor version integers to be
# incorporated into the URI
lowcsp_assignresources_uri = partial(
    interface_uri, LOWCSP_ASSIGNRESOURCES_PREFIX
)
lowcsp_configure_uri = partial(interface_uri, LOWCSP_CONFIGURE_PREFIX)
lowcsp_releaseresources_uri = partial(
    interface_uri, LOWCSP_RELEASERESOURCES_PREFIX
)
lowcsp_scan_uri = partial(interface_uri, LOWCSP_SCAN_PREFIX)

CompatibilityMap = {
    "2.0": {"lowcbf": "0.1", "pss": "0.0", "pst": "2.4"},
    "3.0": {"lowcbf": "0.2", "pss": "0.0", "pst": "2.4"},
    "3.1": {"lowcbf": "0.2", "pss": "0.0", "pst": "2.5"},
    "3.2": {"lowcbf": "0.2", "pss": "0.0", "pst": "2.5"},
    "4.0": {"lowcbf": "1.0", "pss": "0.0", "pst": "2.5"},  # nakshatra
    "5.0": {"lowcbf": "0.2", "pss": "0.0", "pst": "3.0"},
    "6.0": {"lowcbf": "0.2", "pss": "1.2", "pst": "3.0"},
}  # note: v4.0 is not used by CBF device yet

_ALLOWED_LOW_URI_PREFIXES = [
    LOWCSP_ASSIGNRESOURCES_PREFIX,
    LOWCSP_CONFIGURE_PREFIX,
    LOWCSP_SCAN_PREFIX,
    LOWCSP_RELEASERESOURCES_PREFIX,
]

MAX_LOW_PSS_BEAMS = 500


[docs] def get_csp_subsystem_version(version: str, subsystem: str) -> str: """ Using the CompatibilityMap evaluates the subsystem command interface number. :param version: the CSP command interface URI :param subsystem: the name of the subsystem :return: the version number of the subsystem command interface. """ version_num = check_csp_interface_version( version, _ALLOWED_LOW_URI_PREFIXES ) subsystem_version = CompatibilityMap[version_num][subsystem] subsystem_uri = get_subsystem_uri(version, subsystem) subsystem_uri = subsystem_uri + subsystem_version return subsystem_version, subsystem_uri
def get_subsystem_uri(version: str, subsystem: str) -> str: subsystem_uri = "" if subsystem == "pss": subsystem_uri = "https://schema.skao.int/ska-pss-configure/" elif subsystem == "pst": subsystem_uri = "https://schema.skao.int/ska-pst-configure/" elif subsystem == "lowcbf": if "configure" in version: version = version.replace("csp-configure", "cbf-configurescan") else: version = version.replace("csp", "cbf") subsystem_uri = "/".join(version.rsplit("/", 1)[:-1]) + "/" else: raise ValueError( f"Subsystem {subsystem} name not available at csp level" ) return subsystem_uri