Source code for ska_telmodel.csp.config

import logging

from ska_telmodel._common import split_interface_version

from .. import channel_map

# Shared logger for the CSP configuration helpers below; warnings about
# missing offsets or receive addresses are reported on this channel.
LOGGER = logging.getLogger("ska_telmodel.csp")


def get_fsp_channel_offset(csp_config_in: dict) -> int:
    """
    Report the ID of the first channel within an FSP.

    The configuration is accepted but not inspected: the result is the
    same constant for every input.

    :param csp_config_in: CSP/CBF configuration (currently unused)
    :returns: First channel ID within an FSP, always ``0``
    """
    # NOTE(review): the original carried a "Fixed?" remark here, i.e. the
    # author was unsure whether this value is constant - confirm.
    first_channel_id = 0
    return first_channel_id
def get_fsp_output_channel_offset(
    fsp_config: dict, fsp_id: str, fsp_ch_offset: str
) -> int:
    """
    Obtain the output channel offset of a single FSP.

    The offset stored in the FSP configuration is preferred. When the
    configuration carries none, it is reconstructed from the FSP's ID and
    a warning is logged.

    :param fsp_config: FSP configuration structure
    :param fsp_id: Name of the field in ``fsp_config`` holding the FSP ID
    :param fsp_ch_offset: Name of the field in ``fsp_config`` holding the
        FSP channel offset
    :returns: Stored channel offset if present, otherwise
        ``14880 * (FSP ID - 1)``
    """
    if fsp_ch_offset not in fsp_config:
        # No explicit offset: derive one from the FSP ID, where FSP 1
        # starts at channel 0.
        # NOTE(review): 14880 is presumably the number of output channels
        # per FSP - confirm.
        channels_per_fsp = 14880
        reconstructed = channels_per_fsp * (fsp_config[fsp_id] - 1)
        LOGGER.warning(
            "FSP lacks output channel offset, using "
            f"{reconstructed}: {fsp_config}"
        )
        return reconstructed

    # The configuration states the offset explicitly
    return fsp_config[fsp_ch_offset]
def add_receive_addresses(
    scan_type: str,
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
    telescope_branch: str = "mid",
) -> dict:
    """
    Add SDP receive addresses into a CSP configuration.

    Visibility addresses are added by the Mid or Low CBF routine,
    depending on ``telescope_branch``; any other branch value skips that
    step. Pulsar search and pulsar timing addresses are then added in
    that order, regardless of branch.

    :param scan_type: Scan type (not used by this function)
    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :param telescope_branch: ``"mid"`` or ``"low"``, compared
        case-insensitively
    :returns: New CSP configuration
    """
    branch = telescope_branch.lower()

    # Visibilities: handled by the CBF of the selected telescope
    if branch == "low":
        csp_config = add_lowcbf_visibility_receive_addresses(
            csp_config, scan_receive_addrs, sdp_interface_version
        )
    elif branch == "mid":
        csp_config = add_midcbf_visibility_receive_addresses(
            csp_config,
            scan_receive_addrs,
            csp_interface_version,
            sdp_interface_version,
        )

    # Pulsar search first, then pulsar timing; both share one signature
    for add_pulsar_addresses in (
        add_pss_receive_addresses,
        add_pst_receive_addresses,
    ):
        csp_config = add_pulsar_addresses(
            csp_config,
            scan_receive_addrs,
            csp_interface_version,
            sdp_interface_version,
        )

    return csp_config
def add_lowcbf_visibility_receive_addresses(
    csp_config: dict, scan_receive_addrs: dict, sdp_interface_version
) -> dict:
    """
    Add SDP visibility receive addresses into a Low CBF configuration.

    For every station beam under ``vis.stn_beams``, the SDP beam with
    function ``"visibilities"`` and a matching ``visibility_beam_id``
    (taken as 1 when absent) is looked up. Its ``host``, ``mac`` and
    ``port`` entries, where present, are copied onto the station beam.
    The configuration is modified in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param sdp_interface_version: SDP interface version
    :returns: New CSP configuration (returned unchanged for SDP interface
        versions up to 0.3)
    """
    # Low CBF settings live under "lowcbf" if that key exists; otherwise
    # the dictionary passed in is itself treated as the CBF configuration.
    lowcbf_section = csp_config.get("lowcbf", csp_config)

    # SDP field name -> station beam field name (identical on both sides)
    field_names = {name: name for name in ("host", "mac", "port")}

    major, minor = split_interface_version(sdp_interface_version)
    if major == 0 and minor <= 3:
        LOGGER.warning(
            "SDP interface version is <= 0.3. Returning csp_config unchanged."
        )
        return csp_config

    station_beams = lowcbf_section.get("vis", {}).get("stn_beams", [])
    for station_beam in station_beams:
        wanted_id = station_beam["stn_beam_id"]

        # Every SDP visibility beam addressed to this station beam
        candidates = [
            sdp_beam
            for sdp_beam in scan_receive_addrs.values()
            if sdp_beam.get("function") == "visibilities"
            and sdp_beam.get("visibility_beam_id", 1) == wanted_id
        ]
        if not candidates:
            LOGGER.warning(
                "No visibility beam found in SDP receive "
                "addresses matching CBF's station beam!"
            )
            continue

        # Should several beams match, the last one takes precedence
        chosen_beam = candidates[-1]

        # Copy over whichever address fields SDP supplied
        for sdp_field, cbf_field in field_names.items():
            if sdp_field in chosen_beam:
                station_beam[cbf_field] = chosen_beam[sdp_field]

    return csp_config
def add_midcbf_visibility_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP visibility receive addresses into mid-cbf configuration

    The SDP channel maps (``host``, ``port`` and, for older CSP
    interfaces, ``mac``) are split at the channel offsets of the CBF's
    FSPs (CSP major version <= 3) or at the start channels of its
    correlation processing regions (CSP major version >= 4). Each piece is
    stored on the FSP / processing region that owns that offset. The
    configuration is modified in place.

    The pieces are matched to FSPs / processing regions by ascending
    offset, so the result does not depend on the order in which they are
    listed in the configuration.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: New CSP configuration (unchanged if the SDP interface
        supports multiple beams but no visibility beam is present)
    """

    # CSP: Get major version of the interface
    csp_major, _ = split_interface_version(csp_interface_version)

    # Find CBF configuration
    if "cbf" in csp_config:  # version 1.0 (ADR-18)
        cbf_config = csp_config["cbf"]
    elif "midcbf" in csp_config:  # version 4.0 (ADR-99)
        cbf_config = csp_config["midcbf"]
    else:
        cbf_config = csp_config

    # Determine applicable field names. MAPS_TO_SPLIT maps the SDP channel
    # map name to the field it is written to on the CBF side.
    FSP = "fsp"
    PR = "processing_regions"
    COR = "correlation"
    if csp_major in (0, 1):
        fsp_id = "fspID"
        fsp_ch_offset = "fspChannelOffset"
        MAPS_TO_SPLIT = {
            "host": "outputHost",
            "mac": "outputMac",
            "port": "outputPort",
        }
    elif csp_major == 2:
        fsp_id = "fsp_id"
        fsp_ch_offset = "channel_offset"
        MAPS_TO_SPLIT = {
            "host": "output_host",
            "mac": "output_mac",
            "port": "output_port",
        }
    elif csp_major == 3:
        fsp_id = "fsp_id"
        fsp_ch_offset = "channel_offset"
        MAPS_TO_SPLIT = {
            "host": "output_host",
            "port": "output_port",
        }
    else:
        start_channel_id = "sdp_start_channel_id"
        MAPS_TO_SPLIT = {
            "host": "output_host",
            "port": "output_port",
        }

    # SDP: Get major and minor version of the interface, find
    # applicable beam if we have a version that supports multiple
    # beams.
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if sdp_major > 0 or sdp_minor > 3:
        # Find the visibility beam. For CBF mid we assume there to be only
        # one; should there be several, the last one encountered is used.
        found_beam = None
        for beam in scan_receive_addrs.values():
            if beam.get("function") == "visibilities":
                found_beam = beam
        if found_beam is None:
            LOGGER.warning("No visibility beam found in receive addresses!")
            return csp_config
        scan_receive_addrs = found_beam

    if csp_major <= 3:
        # Collect fsps, adding output offset as applicable
        for fsp in cbf_config[FSP]:
            # pylint: disable-next=possibly-used-before-assignment
            if fsp_ch_offset not in fsp:
                fsp[fsp_ch_offset] = get_fsp_output_channel_offset(
                    fsp,
                    # pylint: disable-next=possibly-used-before-assignment
                    fsp_id,
                    fsp_ch_offset,
                )

        # Sort fsps by output channel offset, determining channel groups.
        # The sorted list holds the very same dict objects as the
        # configuration, so writing to them updates the configuration.
        fsps_sorted = sorted(
            cbf_config[FSP], key=lambda fsp: fsp[fsp_ch_offset]
        )
        fsp_offsets = [fsp[fsp_ch_offset] for fsp in fsps_sorted]
        ch_offset = get_fsp_channel_offset(cbf_config)

        for in_map_name, out_map_name in MAPS_TO_SPLIT.items():
            # Split map, if present
            if in_map_name not in scan_receive_addrs:
                continue
            in_map = scan_receive_addrs[in_map_name]
            if len(in_map) > 0:
                # Last group ends just past the last channel of the map
                groups = fsp_offsets + [in_map[-1][0] + 1]
                split_map = channel_map.split_channel_map_at(
                    in_map, groups, ch_offset
                )
            else:
                split_map = [[] for _ in fsps_sorted]

            # Add to FSP sections. The groups were built from the sorted
            # offsets, so the pieces (expected to come back one per group,
            # in group order) must be paired with the sorted FSPs rather
            # than with the configuration order.
            for fsp, new_map in zip(fsps_sorted, split_map):
                fsp[out_map_name] = new_map

    else:
        # Sort processing regions by start channel, determining channel
        # groups. As above, the sorted list shares its dict objects with
        # the configuration.
        prs_sorted = sorted(
            cbf_config[COR][PR],
            # pylint: disable-next=used-before-assignment
            key=lambda pr: pr[start_channel_id],
        )
        pr_offsets = [pr[start_channel_id] for pr in prs_sorted]

        for in_map_name, out_map_name in MAPS_TO_SPLIT.items():
            # Split map (required to be present for this interface)
            in_map = scan_receive_addrs[in_map_name]
            if len(in_map) > 0:
                groups = pr_offsets + [in_map[-1][0] + 1]
                split_map = channel_map.split_channel_map_at(in_map, groups)
            else:
                split_map = [[] for _ in prs_sorted]

            # Add to processing regions, paired by ascending start channel
            # for the same reason as for the FSPs above.
            for pr, new_map in zip(prs_sorted, split_map):
                pr[out_map_name] = new_map

    return csp_config
def add_pss_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP receive addresses into the pulsar search (PSS) configuration.

    Each PSS beam is matched, via its ``beam_id``, with the first SDP beam
    whose function is ``"pulsar search"`` and whose ``search_beam_id`` is
    equal. The beam then receives ``dest_host`` and ``dest_port`` taken
    from the first entry of that SDP beam's host and port maps. Beams with
    no usable match are left untouched and a warning is logged. The
    configuration is modified in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: New CSP configuration (unchanged for CSP interfaces before
        2.1 or SDP interfaces before 0.4)
    """
    # Both interfaces must be recent enough
    csp_major, csp_minor = split_interface_version(csp_interface_version)
    if (csp_major, csp_minor) < (2, 1):
        return csp_config
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) < (0, 4):
        return csp_config

    def apply_receive_addrs(beam):
        """Fill in destination host and port of one PSS beam, in place."""
        beam_id = beam["beam_id"]

        # First SDP pulsar search beam addressed to this PSS beam
        match = next(
            (
                sdp_beam
                for sdp_beam in scan_receive_addrs.values()
                if sdp_beam.get("function") == "pulsar search"
                and sdp_beam.get("search_beam_id") == beam_id
            ),
            None,
        )
        host_map = None
        port_map = None
        if match is not None:
            host_map = match.get("host")
            port_map = match.get("port")

        # A match without a host map counts as "not found"
        if host_map is None:
            LOGGER.warning(
                "Could not find receive addresses for PSS search beam %s!",
                beam_id,
            )
            return

        # Take the value of the first entry of each map.
        # NOTE(review): a matched beam that has "host" but no "port" fails
        # on the second line - confirm SDP always supplies both.
        beam["dest_host"] = host_map[0][1]
        beam["dest_port"] = port_map[0][1]

    pss_section = csp_config.get("pss", {})
    if "cheetah" in pss_section:
        # v1.2+: beams nested under cheetah[]
        for cheetah in pss_section["cheetah"]:
            for beam in cheetah.get("beam", []):
                apply_receive_addrs(beam)
    elif "beam" in pss_section:
        # v<1.2: flat beam list
        for beam in pss_section["beam"]:
            apply_receive_addrs(beam)

    return csp_config
def add_pst_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP receive addresses into the pulsar timing (PST) configuration.

    The first SDP beam with function ``"pulsar timing"`` provides the
    destination: ``pst.scan.destination_address`` is set to a two-element
    list holding the first host map value and the first port map value.
    The configuration is modified in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: New CSP configuration (unchanged for CSP interfaces before
        2.2, SDP interfaces before 0.4, a missing ``pst.scan`` section, or
        when no pulsar timing address is found)
    """
    # CSP must be new enough ...
    csp_major, csp_minor = split_interface_version(csp_interface_version)
    if (csp_major, csp_minor) < (2, 2):
        return csp_config

    # ... the PST section must actually be populated ...
    if "pst" not in csp_config or "scan" not in csp_config["pst"]:
        return csp_config

    # ... and SDP must be new enough as well
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) < (0, 4):
        return csp_config

    # There is only one receive address for PST, so the first pulsar
    # timing beam is all we need
    timing_beam = next(
        (
            sdp_beam
            for sdp_beam in scan_receive_addrs.values()
            if sdp_beam.get("function") == "pulsar timing"
        ),
        None,
    )
    host_map = None
    port_map = None
    if timing_beam is not None:
        host_map = timing_beam.get("host")
        port_map = timing_beam.get("port")

    # A beam without a host map counts as "not found"
    if host_map is None:
        LOGGER.warning("Could not find receive addresses for pulsar timing!")
        return csp_config

    # Set
    # NOTE(review): a beam that has "host" but no "port" fails here -
    # confirm SDP always supplies both.
    csp_config["pst"]["scan"]["destination_address"] = [
        host_map[0][1],
        port_map[0][1],
    ]
    return csp_config