Module modules.telemetry.parsing_utils

Functions

def from_approved_callsign(pkt_hdr: PacketHeader,
approved_callsigns: dict[str, str]) ‑> bool
Expand source code
def from_approved_callsign(pkt_hdr: PacketHeader, approved_callsigns: dict[str, str]) -> bool:
    """Checks whether the call sign is recognized"""

    # Ensure packet is from an approved call sign
    if pkt_hdr.callsign in approved_callsigns:
        logger.debug(f"Incoming packet from {pkt_hdr.callsign} ({approved_callsigns.get(pkt_hdr.callsign)})")
        return True
    for callsign in approved_callsigns:
        if pkt_hdr.callsign.startswith(callsign):
            logger.debug(f"Incoming packet from {pkt_hdr.callsign} ({approved_callsigns.get(pkt_hdr.callsign)})")
            return True

    logger.warning(f"Incoming packet from unauthorized call sign {pkt_hdr.callsign}")
    return False

Checks whether the call sign is recognized

def parse_blocks(packet_header: PacketHeader,
encoded_blocks: bytes) ‑> List[Block]
Expand source code
def parse_blocks(packet_header: PacketHeader, encoded_blocks: bytes) -> List[Block]:
    """
    Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string.
    """

    # List of parsed blocks
    parsed_blocks: list[Block] = []

    # Parse through all encoded_blocks
    while len(encoded_blocks) > 0:
        # Parse block header
        logger.debug(f"encoded_blocks: {encoded_blocks}")
        logger.debug(f"Block header: {encoded_blocks[:BLOCK_HEADER_LENGTH]}")

        # Catch invalid block headers field values by skipping packet
        try:
            block_header = parse_block_header(encoded_blocks[:BLOCK_HEADER_LENGTH])
        except InvalidHeaderFieldValueError as e:
            logger.error(f"{e}, skipping rest of packet")
            return parsed_blocks

        logger.debug(block_header)

        block_len = get_block_class(block_header.type).size()
        block_contents = encoded_blocks[BLOCK_HEADER_LENGTH : BLOCK_HEADER_LENGTH + block_len]

        data_block = None
        try:
            data_block = parse_block_contents(packet_header, block_header, block_contents)
        except InvalidBlockContents as e:
            logger.error(f"{e}")

        if data_block is not None:
            logger.debug(data_block)
            parsed_blocks.append(data_block)
        # Remove the data we processed from the whole set, and move onto the next data block
        encoded_blocks = encoded_blocks[BLOCK_HEADER_LENGTH + block_len :]

    return parsed_blocks

Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string.

def parse_rn2483_transmission(data: str,
config: Config) ‑> ParsedTransmission | None
Expand source code
def parse_rn2483_transmission(data: str, config: Config) -> Optional[ParsedTransmission]:
    """
    Parses RN2483 Packets and extracts our telemetry payload blocks, returns parsed transmission object if packet
    is valid.
    """

    # Extract the packet header
    data = data.strip()  # Sometimes some extra whitespace
    packet_bytes = bytes.fromhex(data)
    logger.debug(f"Full data string: {data}")

    # Catch unsupported encoding versions by skipping packet
    try:
        packet_header: PacketHeader = parse_packet_header(packet_bytes[:PACKET_HEADER_LENGTH])
    except InvalidHeaderFieldValueError as e:
        logger.error(f"{e}, skipping packet")
        return

    logger.debug(packet_header)
    # We can keep unauthorized callsigns but we'll log them as warnings
    from_approved_callsign(packet_header, config.approved_callsigns)
    blocks = packet_bytes[PACKET_HEADER_LENGTH:]  # Remove the packet header

    return ParsedTransmission(packet_header, parse_blocks(packet_header, blocks))

Parses RN2483 Packets and extracts our telemetry payload blocks, returns parsed transmission object if packet is valid.

Classes

class ParsedTransmission (packet_header: PacketHeader,
blocks: List[Block])
Expand source code
@dataclass
class ParsedTransmission:
    """Parsed transmission data from the telemetry process."""

    packet_header: PacketHeader
    blocks: List[Block]

Parsed transmission data from the telemetry process.

Instance variables

var blocks : List[Block]
var packet_headerPacketHeader