# Source code for ska_telmodel.csp.validators

from itertools import repeat

from .common_schema import (
    IPV4_REGEX_PATTERN,
    MAX_CHANNELS_PER_FSP,
    MAX_CHANNELS_PER_STREAM,
    MAX_CORR_CHANNELS,
    MAX_STREAMS_PER_FSP,
)

"""
csp.validators module defines constants and functions for
validating CSP fields in schemas.
"""

# Largest integration factor accepted by validate_integration_factor.
MAX_NO_INTEGRATION_FACTOR = 10


def validate_integration_factor(integration_factor: int) -> bool:
    """Checks if the integration_factor is valid.

    A value is valid when it lies in the inclusive range
    1..MAX_NO_INTEGRATION_FACTOR.

    :param integration_factor: Integration time for correlation products
    :returns: True if integration_factor is valid
    """
    return 1 <= integration_factor <= MAX_NO_INTEGRATION_FACTOR
def validate_output_host( entry: list[int, str], start_channel_upper_bound: int ) -> bool: # Ensure only two entries in each host table entry if len(entry) != 2: return False # Validate channel ID if entry[0] < 0 or entry[0] > start_channel_upper_bound: return False # Check IP address pattern lies between 0.0.0.0 -> 255.255.255.255 if not IPV4_REGEX_PATTERN.fullmatch(entry[1]): return False return True def validate_output_link_map( entry: list[int], start_channel_upper_bound: int ) -> bool: # Ensure only two entries in output_link_map entry if len(entry) != 2: return False # Validate channel ID if entry[0] < 0 or entry[0] > start_channel_upper_bound: return False # Validate link ID if entry[1] != 1: return False return True def validate_output_port_entry( entry: list[int], start_channel_upper_bound: int ) -> bool: # Ensure only two entries in output_port entry if len(entry) != 2: return False # Validate start channel ID if entry[0] < 0 or entry[0] > start_channel_upper_bound: return False # Validate port number if entry[1] < 0 or entry[1] > 65535: return False return True def validate_output_port( mapping: list[list[int]], major_version: int, minor_version: int ) -> bool: if (major_version, minor_version) >= (4, 0): start_channel_upper_bound = MAX_CORR_CHANNELS else: start_channel_upper_bound = MAX_CHANNELS_PER_FSP - 1 # Validate must have at least 1 mapping if len(mapping) == 0: return False if (major_version, minor_version) < (4, 1): # Validate number of fsp streams if len(mapping) > MAX_STREAMS_PER_FSP: return False # Validate each mapping is a valid entry if False in list( map( validate_output_port_entry, mapping, repeat(start_channel_upper_bound), ) ): return False # AA0.5/AA1 # Validate start channels must be in increments of 20 # Validate channels must be in ascending order expected_channel = 0 for index, value in enumerate(mapping): if index == 0 and (major_version, minor_version) >= (4, 0): expected_channel = value[0] if value[0] != expected_channel: 
return False expected_channel = expected_channel + MAX_CHANNELS_PER_STREAM return True def channel_ids_in_asc_order( channel_map: list[list[int]] | list[list[int, str]] ) -> bool: previous_value = None for index, value in enumerate(channel_map): if index == 0: previous_value = value[0] continue if value[0] <= previous_value: return False return True def processing_regions_have_unique_fsp_ids( processing_regions: list[dict], ) -> bool: known_fsp_ids = [] for processing_region in processing_regions: fsp_ids = processing_region.get("fsp_ids", []) for fsp_id in fsp_ids: if fsp_id in known_fsp_ids: return False else: known_fsp_ids.append(fsp_id) return True def timing_beams_start_with_start_value( start_property: str, map_property: str, processing_regions: list[dict], ) -> bool: for processing_region in processing_regions: start_property_value = processing_region[start_property] timing_beams = processing_region.get("timing_beams", []) for timing_beam in timing_beams: if map_property in timing_beam: if timing_beam[map_property][0][0] != start_property_value: return False return True