from itertools import repeat
from .common_schema import (
IPV4_REGEX_PATTERN,
MAX_CHANNELS_PER_FSP,
MAX_CHANNELS_PER_STREAM,
MAX_CORR_CHANNELS,
MAX_STREAMS_PER_FSP,
)
"""
csp.validators module defines constants and functions for
validating CSP fields in schemas.
"""
# Largest value accepted by validate_integration_factor; the valid range
# is 1..MAX_NO_INTEGRATION_FACTOR inclusive.
MAX_NO_INTEGRATION_FACTOR = 10
[docs]
def validate_integration_factor(integration_factor: int) -> bool:
    """Tell whether an integration factor lies in the accepted range.

    The accepted range is 1 to MAX_NO_INTEGRATION_FACTOR, both inclusive.

    :param integration_factor: Integration time for correlation products
    :returns: True if integration_factor is valid, False otherwise
    """
    at_least_minimum = 1 <= integration_factor
    return at_least_minimum and integration_factor <= MAX_NO_INTEGRATION_FACTOR
def validate_output_host(
    entry: list[int, str], start_channel_upper_bound: int
) -> bool:
    """Validate a single output host table entry.

    An entry is valid when it has exactly two items, its first item (the
    channel ID) is within 0..start_channel_upper_bound inclusive, and its
    second item (the host address) fully matches IPV4_REGEX_PATTERN.

    :param entry: two-item host table entry, channel ID then IP address
    :param start_channel_upper_bound: largest channel ID accepted
    :returns: True if the entry is valid, False otherwise
    """
    # A host table entry is exactly a (channel ID, address) pair.
    if len(entry) != 2:
        return False

    channel_id = entry[0]
    channel_out_of_range = (
        channel_id < 0 or channel_id > start_channel_upper_bound
    )
    if channel_out_of_range:
        return False

    # The address must match the IPv4 pattern in full, not just a prefix.
    return bool(IPV4_REGEX_PATTERN.fullmatch(entry[1]))
def validate_output_link_map(
    entry: list[int], start_channel_upper_bound: int
) -> bool:
    """Validate a single output_link_map entry.

    An entry is valid when it has exactly two items, its first item (the
    channel ID) is within 0..start_channel_upper_bound inclusive, and its
    second item (the link ID) equals 1.

    :param entry: two-item entry, channel ID then link ID
    :param start_channel_upper_bound: largest channel ID accepted
    :returns: True if the entry is valid, False otherwise
    """
    # An output_link_map entry is exactly a (channel ID, link ID) pair.
    if len(entry) != 2:
        return False

    channel_id = entry[0]
    if channel_id < 0:
        return False
    if channel_id > start_channel_upper_bound:
        return False

    # 1 is the only link ID accepted.
    link_id = entry[1]
    return link_id == 1
def validate_output_port_entry(
    entry: list[int], start_channel_upper_bound: int
) -> bool:
    """Validate a single output_port entry.

    An entry is valid when it has exactly two items, its first item (the
    start channel ID) is within 0..start_channel_upper_bound inclusive,
    and its second item (the port number) is within 0..65535 inclusive.

    :param entry: two-item entry, start channel ID then port number
    :param start_channel_upper_bound: largest start channel ID accepted
    :returns: True if the entry is valid, False otherwise
    """
    # An output_port entry is exactly a (start channel ID, port) pair.
    if len(entry) != 2:
        return False

    start_channel = entry[0]
    channel_out_of_range = (
        start_channel < 0 or start_channel > start_channel_upper_bound
    )
    if channel_out_of_range:
        return False

    # The port is only inspected once the channel ID has passed.
    port = entry[1]
    return not (port < 0 or port > 65535)
def validate_output_port(
    mapping: list[list[int]], major_version: int, minor_version: int
) -> bool:
    """Validate a complete output_port mapping for a schema version.

    The mapping is valid when all of the following hold:

    * it contains at least one entry;
    * for versions below 4.1, it has no more than MAX_STREAMS_PER_FSP
      entries;
    * every entry passes validate_output_port_entry, using
      MAX_CORR_CHANNELS as the start channel upper bound from version
      4.0 onwards and MAX_CHANNELS_PER_FSP - 1 before that;
    * start channels increase by exactly MAX_CHANNELS_PER_STREAM from one
      entry to the next, beginning at 0 for versions below 4.0 and at
      the first entry's own start channel from 4.0 onwards.

    :param mapping: list of [start channel ID, port] entries
    :param major_version: major version of the schema being validated
    :param minor_version: minor version of the schema being validated
    :returns: True if the mapping is valid, False otherwise
    """
    version = (major_version, minor_version)
    is_v4_0_or_later = version >= (4, 0)

    channel_upper_bound = (
        MAX_CORR_CHANNELS if is_v4_0_or_later else MAX_CHANNELS_PER_FSP - 1
    )

    # At least one mapping is required.
    if len(mapping) < 1:
        return False

    # The per-FSP stream limit only applies before version 4.1.
    if version < (4, 1) and len(mapping) > MAX_STREAMS_PER_FSP:
        return False

    # Check every entry (no short-circuit) before looking at the ordering.
    entry_results = [
        validate_output_port_entry(entry, channel_upper_bound)
        for entry in mapping
    ]
    if not all(entry_results):
        return False

    # AA0.5/AA1 (per the original note): start channels must advance in
    # steps of MAX_CHANNELS_PER_STREAM, which also makes them ascending.
    expected_channel = mapping[0][0] if is_v4_0_or_later else 0
    for entry in mapping:
        if entry[0] != expected_channel:
            return False
        expected_channel += MAX_CHANNELS_PER_STREAM
    return True
def channel_ids_in_asc_order(
channel_map: list[list[int]] | list[list[int, str]]
) -> bool:
previous_value = None
for index, value in enumerate(channel_map):
if index == 0:
previous_value = value[0]
continue
if value[0] <= previous_value:
return False
return True
def processing_regions_have_unique_fsp_ids(
    processing_regions: list[dict],
) -> bool:
    """Check that no FSP ID appears more than once across all regions.

    Every value listed under the "fsp_ids" key of each processing region
    is considered; a region without that key contributes no IDs. A
    repeat within one region counts as a duplicate as well.

    :param processing_regions: list of processing region dictionaries
    :returns: True if every FSP ID is unique, False on the first repeat
    """
    # Lazily walk all IDs in region order, then in-list order.
    all_fsp_ids = (
        fsp_id
        for region in processing_regions
        for fsp_id in region.get("fsp_ids", [])
    )

    # Equality-based membership on a list, so IDs need not be hashable.
    seen_ids = []
    for fsp_id in all_fsp_ids:
        if fsp_id in seen_ids:
            return False
        seen_ids.append(fsp_id)
    return True
def timing_beams_start_with_start_value(
    start_property: str,
    map_property: str,
    processing_regions: list[dict],
) -> bool:
    """Check that timing beam maps begin at their region's start value.

    For every processing region, each entry under "timing_beams" that
    contains map_property must have the first item of its first map
    entry equal to the region's start_property value. Timing beams
    without map_property, and regions without "timing_beams", are
    skipped.

    :param start_property: key of the start value in a processing region
    :param map_property: key of the map inside a timing beam
    :param processing_regions: list of processing region dictionaries
    :returns: True if every present map starts with the start value
    """
    for region in processing_regions:
        # Read up front, so a region missing this key fails even when it
        # has no timing beams.
        expected_start = region[start_property]

        for beam in region.get("timing_beams", []):
            if map_property not in beam:
                continue
            first_map_entry = beam[map_property][0]
            if first_map_entry[0] != expected_start:
                return False
    return True