import logging
from ska_telmodel._common import split_interface_version
from .. import channel_map
LOGGER = logging.getLogger("ska_telmodel.csp")
[docs]
def get_fsp_channel_offset(csp_config_in: dict) -> int:
    """
    Determines first channel ID within an FSP

    :param csp_config_in: CSP/CBF configuration (currently not consulted)
    :returns: First channel ID, which is always zero at the moment
    """
    # The value is hard-coded for now; the original author left the
    # open question "Fixed?" here, so this may become configurable.
    first_channel_id = 0
    return first_channel_id
[docs]
def get_fsp_output_channel_offset(
    fsp_config: dict,
    fsp_id: str,
    fsp_ch_offset: str,
    channels_per_fsp: int = 14880,
) -> int:
    """
    Determines the FSP output channel offset. Either read from the
    dictionary or reconstructed from the FSP ID.

    :param fsp_config: FSP configuration structure
    :param fsp_id: Name of the field holding the FSP ID
    :param fsp_ch_offset: Name of FSP channel offset field
    :param channels_per_fsp: Spacing between the offsets of consecutive
        FSP IDs, only used by the fallback calculation
    :returns: The configured offset if present, otherwise
        ``channels_per_fsp * (FSP ID - 1)``
    :raises KeyError: if neither the offset field nor the FSP ID field
        is present in ``fsp_config``
    """
    # Return channel offset if it is set
    if fsp_ch_offset in fsp_config:
        return fsp_config[fsp_ch_offset]

    # Otherwise fall back to a calculation from the FSP ID. The "- 1"
    # makes FSP ID 1 map to offset zero.
    fallback_offset = channels_per_fsp * (fsp_config[fsp_id] - 1)
    LOGGER.warning(
        "FSP lacks output channel offset, using %s: %s",
        fallback_offset,
        fsp_config,
    )
    return fallback_offset
[docs]
def add_receive_addresses(
    scan_type: str,
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
    telescope_branch: str = "mid",
) -> dict:
    """
    Add SDP receive addresses into CSP configuration

    Visibility addresses are added for the selected telescope branch,
    followed by the pulsar search (PSS) and pulsar timing (PST)
    addresses, which are handled the same way for every branch.

    :param scan_type: Scan type (not used by this function)
    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :param telescope_branch: ``"mid"`` or ``"low"`` (case-insensitive);
        for any other value no visibility addresses are added
    :returns: New CSP configuration
    """
    branch = telescope_branch.lower()

    # Visibility receive addresses depend on the correlator in use
    if branch == "low":
        csp_config = add_lowcbf_visibility_receive_addresses(
            csp_config, scan_receive_addrs, sdp_interface_version
        )
    elif branch == "mid":
        csp_config = add_midcbf_visibility_receive_addresses(
            csp_config,
            scan_receive_addrs,
            csp_interface_version,
            sdp_interface_version,
        )

    # Pulsar search first, then pulsar timing
    for add_beam_addresses in (
        add_pss_receive_addresses,
        add_pst_receive_addresses,
    ):
        csp_config = add_beam_addresses(
            csp_config,
            scan_receive_addrs,
            csp_interface_version,
            sdp_interface_version,
        )
    return csp_config
[docs]
def add_lowcbf_visibility_receive_addresses(
    csp_config: dict, scan_receive_addrs: dict, sdp_interface_version
) -> dict:
    """
    Add SDP visibility receive addresses into low-cbf configuration

    For every station beam listed under ``vis.stn_beams`` the matching
    SDP visibility beam is looked up and its ``host``, ``mac`` and
    ``port`` entries (where present) are copied into the station beam.
    The configuration is updated in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param sdp_interface_version: SDP interface version
    :returns: New CSP configuration
    """
    # Low CBF section, or the configuration itself if there is none
    lowcbf = csp_config.get("lowcbf", csp_config)

    # Fields copied verbatim (same name on the SDP and the CBF side)
    address_fields = ("host", "mac", "port")

    # SDP interfaces up to 0.3 are left alone
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if sdp_major == 0 and sdp_minor <= 3:
        LOGGER.warning(
            "SDP interface version is <= 0.3. Returning csp_config unchanged."
        )
        return csp_config

    station_beams = lowcbf.get("vis", {}).get("stn_beams", [])
    for station_beam in station_beams:
        wanted_id = station_beam["stn_beam_id"]

        # SDP beams without an explicit visibility beam ID count as beam 1
        candidates = [
            sdp_beam
            for sdp_beam in scan_receive_addrs.values()
            if sdp_beam.get("function") == "visibilities"
            and sdp_beam.get("visibility_beam_id", 1) == wanted_id
        ]
        if not candidates:
            LOGGER.warning(
                "No visibility beam found in SDP receive "
                "addresses matching CBF's station beam!"
            )
            continue

        # If several SDP beams match, the last one wins
        matching_beam = candidates[-1]

        # Add the output addresses as applicable
        for field in address_fields:
            if field in matching_beam:
                station_beam[field] = matching_beam[field]
    return csp_config
[docs]
def _split_maps_over_fsps(
    cbf_config: dict, receive_addrs: dict, csp_major: int
) -> None:
    """Split the SDP channel maps at the FSP offsets (CSP major <= 3)."""
    # Determine applicable field names
    if csp_major in (0, 1):
        fsp_id = "fspID"
        fsp_ch_offset = "fspChannelOffset"
        maps_to_split = {
            "host": "outputHost",
            "mac": "outputMac",
            "port": "outputPort",
        }
    elif csp_major == 2:
        fsp_id = "fsp_id"
        fsp_ch_offset = "channel_offset"
        maps_to_split = {
            "host": "output_host",
            "mac": "output_mac",
            "port": "output_port",
        }
    else:
        fsp_id = "fsp_id"
        fsp_ch_offset = "channel_offset"
        maps_to_split = {
            "host": "output_host",
            "port": "output_port",
        }

    # Collect fsps, adding output offset as applicable
    fsps = cbf_config["fsp"]
    for fsp in fsps:
        if fsp_ch_offset not in fsp:
            fsp[fsp_ch_offset] = get_fsp_output_channel_offset(
                fsp, fsp_id, fsp_ch_offset
            )

    # Sort fsps by output channel offset, determining channel groups
    fsps_sorted = sorted(fsps, key=lambda fsp: fsp[fsp_ch_offset])
    fsp_offsets = [fsp[fsp_ch_offset] for fsp in fsps_sorted]
    ch_offset = get_fsp_channel_offset(cbf_config)

    for in_map_name, out_map_name in maps_to_split.items():
        # Split map, if present
        if in_map_name not in receive_addrs:
            continue
        in_map = receive_addrs[in_map_name]
        if in_map:
            # Close the last group one past the first element of the
            # final map entry (presumably its start channel)
            groups = fsp_offsets + [in_map[-1][0] + 1]
            split_map = channel_map.split_channel_map_at(
                in_map, groups, ch_offset
            )
        else:
            split_map = [[] for _ in fsps_sorted]

        # Add to FSP sections. The pieces follow the sorted offsets, so
        # they must be paired with the sorted FSP list, not with the
        # configuration order.
        for fsp, new_map in zip(fsps_sorted, split_map):
            fsp[out_map_name] = new_map


def _split_maps_over_processing_regions(
    cbf_config: dict, receive_addrs: dict
) -> None:
    """Split the SDP channel maps at the processing region starts."""
    start_channel_id = "sdp_start_channel_id"
    maps_to_split = {
        "host": "output_host",
        "port": "output_port",
    }

    # Sort processing regions by start channel, determining channel groups
    regions = cbf_config["correlation"]["processing_regions"]
    regions_sorted = sorted(regions, key=lambda pr: pr[start_channel_id])
    region_offsets = [pr[start_channel_id] for pr in regions_sorted]

    for in_map_name, out_map_name in maps_to_split.items():
        # NOTE(review): unlike the FSP path, a map missing from the
        # receive addresses raises KeyError here instead of being
        # skipped - confirm whether that is intended.
        in_map = receive_addrs[in_map_name]
        if in_map:
            groups = region_offsets + [in_map[-1][0] + 1]
            split_map = channel_map.split_channel_map_at(in_map, groups)
        else:
            split_map = [[] for _ in regions_sorted]

        # Add to processing regions, paired in sorted order so every
        # region gets the piece starting at its own start channel
        for region, new_map in zip(regions_sorted, split_map):
            region[out_map_name] = new_map


def add_midcbf_visibility_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP visibility receive addresses into mid-cbf configuration

    The SDP channel maps (``host``, ``port`` and, for CSP major
    versions up to 2, ``mac``) are split at the channel offset of every
    FSP (CSP major version <= 3) or at the start channel of every
    correlation processing region (newer versions). Each piece is
    written into the section it belongs to. The configuration is
    updated in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: New CSP configuration
    """
    # CSP: only the major version of the interface matters here
    csp_major, _ = split_interface_version(csp_interface_version)

    # Find CBF configuration
    if "cbf" in csp_config:
        # version 1.0 (ADR-18)
        cbf_config = csp_config["cbf"]
    elif "midcbf" in csp_config:
        # version 4.0 (ADR-99)
        cbf_config = csp_config["midcbf"]
    else:
        cbf_config = csp_config

    # SDP: Get major and minor version of the interface, find
    # applicable beam if we have a version that supports multiple
    # beams.
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if sdp_major > 0 or sdp_minor > 3:
        # For CBF mid we assume there to be only one visibility beam;
        # should there be several, the last one is used.
        visibility_beams = [
            beam
            for beam in scan_receive_addrs.values()
            if beam.get("function") == "visibilities"
        ]
        if not visibility_beams:
            LOGGER.warning("No visibility beam found in receive addresses!")
            return csp_config
        scan_receive_addrs = visibility_beams[-1]

    if csp_major <= 3:
        _split_maps_over_fsps(cbf_config, scan_receive_addrs, csp_major)
    else:
        _split_maps_over_processing_regions(cbf_config, scan_receive_addrs)
    return csp_config
[docs]
def add_pss_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP receive addresses into pulsar search configuration

    Every PSS beam gets ``dest_host`` and ``dest_port`` set from the
    first SDP "pulsar search" beam whose ``search_beam_id`` equals the
    beam's ``beam_id``. Beams without a usable match are left alone
    and a warning is logged. Nothing is done for CSP interfaces older
    than 2.1 or SDP interfaces older than 0.4. The configuration is
    updated in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: New CSP configuration
    """
    csp_major, csp_minor = split_interface_version(csp_interface_version)
    if (csp_major, csp_minor) < (2, 1):
        return csp_config
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) < (0, 4):
        return csp_config

    def apply_receive_addrs(beam):
        beam_id = beam["beam_id"]

        # First SDP pulsar search beam addressed to this PSS beam
        sdp_beam = next(
            (
                candidate
                for candidate in scan_receive_addrs.values()
                if candidate.get("function") == "pulsar search"
                and candidate.get("search_beam_id") == beam_id
            ),
            {},
        )
        host_map = sdp_beam.get("host")
        port_map = sdp_beam.get("port")

        # Both maps must be present and non-empty; otherwise indexing
        # them below would fail instead of reporting the problem
        if not host_map or not port_map:
            LOGGER.warning(
                "Could not find receive addresses for PSS search beam %s!",
                beam_id,
            )
            return

        # Use the second element of the first entry of each map
        # (presumably [start_channel, value] pairs - TODO confirm)
        beam["dest_host"] = host_map[0][1]
        beam["dest_port"] = port_map[0][1]

    pss_config = csp_config.get("pss", {})
    if "cheetah" in pss_config:
        # v1.2+: beams nested under cheetah[]
        for cheetah in pss_config["cheetah"]:
            for beam in cheetah.get("beam", []):
                apply_receive_addrs(beam)
    elif "beam" in pss_config:
        # v<1.2: flat beam list
        for beam in pss_config["beam"]:
            apply_receive_addrs(beam)
    return csp_config
[docs]
def add_pst_receive_addresses(
    csp_config: dict,
    scan_receive_addrs: dict,
    csp_interface_version: str,
    sdp_interface_version: str,
) -> dict:
    """
    Add SDP receive addresses into pulsar timing configuration

    Sets ``pst.scan.destination_address`` to ``[host, port]`` taken
    from the first SDP "pulsar timing" beam. Nothing is done for CSP
    interfaces older than 2.2, SDP interfaces older than 0.4, or when
    the configuration has no ``pst.scan`` section. The configuration
    is updated in place.

    :param csp_config: CSP input configuration
    :param scan_receive_addrs: SDP receive addresses for scan
    :param csp_interface_version: CSP interface version to assume
    :param sdp_interface_version: SDP interface version to assume
    :returns: New CSP configuration
    """
    # Check that CSP and SDP versions are new enough, and the PST
    # section is actually populated
    csp_major, csp_minor = split_interface_version(csp_interface_version)
    if (
        (csp_major, csp_minor) < (2, 2)
        or "pst" not in csp_config
        or "scan" not in csp_config["pst"]
    ):
        return csp_config
    sdp_major, sdp_minor = split_interface_version(sdp_interface_version)
    if (sdp_major, sdp_minor) < (0, 4):
        return csp_config

    # There is only one receive address for PST, therefore we only
    # need to find one PST beam
    pst_beam = next(
        (
            sdp_beam
            for sdp_beam in scan_receive_addrs.values()
            if sdp_beam.get("function") == "pulsar timing"
        ),
        {},
    )
    host_map = pst_beam.get("host")
    port_map = pst_beam.get("port")

    # Both maps must be present and non-empty; otherwise indexing them
    # below would fail instead of reporting the problem
    if not host_map or not port_map:
        LOGGER.warning("Could not find receive addresses for pulsar timing!")
        return csp_config

    # Use the second element of the first entry of each map
    # (presumably [start_channel, value] pairs - TODO confirm)
    csp_config["pst"]["scan"]["destination_address"] = [
        host_map[0][1],
        port_map[0][1],
    ]
    return csp_config