Module modules.telemetry.parsing_utils
Expand source code
from dataclasses import dataclass
from typing import List, Optional
import logging
from modules.telemetry.v1.block import (
PacketHeader,
BlockHeader,
DeviceAddress,
UnsupportedEncodingVersionError,
InvalidHeaderFieldValueError,
)
import modules.telemetry.v1.data_block as v1db
from modules.misc.config import Config
MIN_SUPPORTED_VERSION: int = 1
MAX_SUPPORTED_VERSION: int = 1
logger = logging.getLogger(__name__)
# Dataclasses that allow us to structure the telemetry data
@dataclass
class ParsedBlock:
"""Parsed block data from the telemetry process."""
# mission_time: int
block_name: str
block_header: BlockHeader
block_contents: dict[str, int | dict[str, int]]
@dataclass
class ParsedTransmission:
"""Parsed transmission data from the telemetry process."""
packet_header: PacketHeader
blocks: List[ParsedBlock]
# Parsing functions
def parse_rn2483_transmission(data: str, config: Config) -> Optional[ParsedTransmission]:
"""
Parses RN2483 Packets and extracts our telemetry payload blocks, returns parsed transmission object if packet
is valid.
"""
# List of parsed blocks
parsed_blocks: list[ParsedBlock] = []
# Extract the packet header
data = data.strip() # Sometimes some extra whitespace
logger.debug(f"Full data string: {data}")
# TODO Make a generic abstract packet header class to encompass V1 packet header, etc
# Catch unsupported encoding versions by skipping packet
try:
pkt_hdr = PacketHeader.from_hex(data[:32])
except UnsupportedEncodingVersionError as e:
logger.error(f"{e}, skipping packet")
return
# We can keep unauthorized callsigns but we'll log them as warnings
from_approved_callsign(pkt_hdr, config.approved_callsigns)
if len(pkt_hdr) <= 32: # If this packet nothing more than just the header
logger.debug(f"{pkt_hdr}")
blocks = data[32:] # Remove the packet header
# Parse through all blocks
while blocks != "":
# Parse block header
logger.debug(f"Blocks: {blocks}")
logger.debug(f"Block header: {blocks[:8]}")
# Catch invalid block headers field values by skipping packet
try:
block_header = BlockHeader.from_hex(blocks[:8])
except InvalidHeaderFieldValueError as e:
logger.error(f"{e}, skipping packet")
return
# Select block contents
block_len = len(block_header) * 2 # Convert length in bytes to length in hex symbols
block_contents = blocks[8:block_len]
logger.debug(f"Block info: {block_header}")
# Check if message is destined for ground station for processing
if block_header.destination in [DeviceAddress.GROUND_STATION, DeviceAddress.MULTICAST]:
cur_block = parse_radio_block(pkt_hdr.version, block_header, block_contents)
if cur_block:
parsed_blocks.append(cur_block) # Append parsed block to list
else:
logger.warning("Invalid destination address")
# Remove the data we processed from the whole set, and move onto the next data block
blocks = blocks[block_len:]
return ParsedTransmission(pkt_hdr, parsed_blocks)
def from_approved_callsign(pkt_hdr: PacketHeader, approved_callsigns: dict[str, str]) -> bool:
"""Checks whether the call sign is recognized"""
# Ensure packet is from an approved call sign
if pkt_hdr.callsign in approved_callsigns:
logger.debug(f"Incoming packet from {pkt_hdr.callsign} ({approved_callsigns.get(pkt_hdr.callsign)})")
else:
logger.warning(f"Incoming packet from unauthorized call sign {pkt_hdr.callsign}")
return False
return True
def parse_radio_block(pkt_version: int, block_header: BlockHeader, hex_block_contents: str) -> Optional[ParsedBlock]:
"""
Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string.
"""
# Working with hex strings until this point.
# Hex/Bytes Demarcation point
logger.debug(
f"Parsing v{pkt_version} type {block_header.message_type} subtype {block_header.message_subtype} contents: \
{hex_block_contents}"
)
block_bytes: bytes = bytes.fromhex(hex_block_contents)
# Convert message subtype string to enum
try:
block_subtype = v1db.DataBlockSubtype(block_header.message_subtype)
except ValueError:
logger.error(f"Invalid data block subtype {block_header.message_subtype}!")
return
# Use the appropriate parser for the block subtype enum
try:
# TODO Make an interface to support multiple v1/v2/v3 objects
block_contents = v1db.DataBlock.parse(block_subtype, block_bytes)
except NotImplementedError:
logger.warning(
f"Block parsing for type {block_header.message_type}, with subtype {block_header.message_subtype} not \
implemented!"
)
return
except v1db.DataBlockException as e:
logger.error(e)
logger.error(f"Block header: {block_header}")
logger.error(f"Block contents: {hex_block_contents}")
return
block_name = block_subtype.name.lower()
logger.debug(str(block_contents))
# TODO fix at some point
# if block == DataBlockSubtype.STATUS:
# self.status.rocket = jsp.RocketData.from_data_block(block)
# return
return ParsedBlock(block_name, block_header, dict(block_contents)) # type: ignore
Functions
def from_approved_callsign(pkt_hdr: PacketHeader, approved_callsigns: dict[str, str]) ‑> bool
-
Checks whether the call sign is recognized
Expand source code
def from_approved_callsign(pkt_hdr: PacketHeader, approved_callsigns: dict[str, str]) -> bool: """Checks whether the call sign is recognized""" # Ensure packet is from an approved call sign if pkt_hdr.callsign in approved_callsigns: logger.debug(f"Incoming packet from {pkt_hdr.callsign} ({approved_callsigns.get(pkt_hdr.callsign)})") else: logger.warning(f"Incoming packet from unauthorized call sign {pkt_hdr.callsign}") return False return True
def parse_radio_block(pkt_version: int, block_header: BlockHeader, hex_block_contents: str) ‑> Optional[ParsedBlock]
-
Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string.
Expand source code
def parse_radio_block(pkt_version: int, block_header: BlockHeader, hex_block_contents: str) -> Optional[ParsedBlock]: """ Parses telemetry payload blocks from either parsed packets or stored replays. Block contents are a hex string. """ # Working with hex strings until this point. # Hex/Bytes Demarcation point logger.debug( f"Parsing v{pkt_version} type {block_header.message_type} subtype {block_header.message_subtype} contents: \ {hex_block_contents}" ) block_bytes: bytes = bytes.fromhex(hex_block_contents) # Convert message subtype string to enum try: block_subtype = v1db.DataBlockSubtype(block_header.message_subtype) except ValueError: logger.error(f"Invalid data block subtype {block_header.message_subtype}!") return # Use the appropriate parser for the block subtype enum try: # TODO Make an interface to support multiple v1/v2/v3 objects block_contents = v1db.DataBlock.parse(block_subtype, block_bytes) except NotImplementedError: logger.warning( f"Block parsing for type {block_header.message_type}, with subtype {block_header.message_subtype} not \ implemented!" ) return except v1db.DataBlockException as e: logger.error(e) logger.error(f"Block header: {block_header}") logger.error(f"Block contents: {hex_block_contents}") return block_name = block_subtype.name.lower() logger.debug(str(block_contents)) # TODO fix at some point # if block == DataBlockSubtype.STATUS: # self.status.rocket = jsp.RocketData.from_data_block(block) # return return ParsedBlock(block_name, block_header, dict(block_contents)) # type: ignore
def parse_rn2483_transmission(data: str, config: Config) ‑> Optional[ParsedTransmission]
-
Parses RN2483 Packets and extracts our telemetry payload blocks, returns parsed transmission object if packet is valid.
Expand source code
def parse_rn2483_transmission(data: str, config: Config) -> Optional[ParsedTransmission]: """ Parses RN2483 Packets and extracts our telemetry payload blocks, returns parsed transmission object if packet is valid. """ # List of parsed blocks parsed_blocks: list[ParsedBlock] = [] # Extract the packet header data = data.strip() # Sometimes some extra whitespace logger.debug(f"Full data string: {data}") # TODO Make a generic abstract packet header class to encompass V1 packet header, etc # Catch unsupported encoding versions by skipping packet try: pkt_hdr = PacketHeader.from_hex(data[:32]) except UnsupportedEncodingVersionError as e: logger.error(f"{e}, skipping packet") return # We can keep unauthorized callsigns but we'll log them as warnings from_approved_callsign(pkt_hdr, config.approved_callsigns) if len(pkt_hdr) <= 32: # If this packet nothing more than just the header logger.debug(f"{pkt_hdr}") blocks = data[32:] # Remove the packet header # Parse through all blocks while blocks != "": # Parse block header logger.debug(f"Blocks: {blocks}") logger.debug(f"Block header: {blocks[:8]}") # Catch invalid block headers field values by skipping packet try: block_header = BlockHeader.from_hex(blocks[:8]) except InvalidHeaderFieldValueError as e: logger.error(f"{e}, skipping packet") return # Select block contents block_len = len(block_header) * 2 # Convert length in bytes to length in hex symbols block_contents = blocks[8:block_len] logger.debug(f"Block info: {block_header}") # Check if message is destined for ground station for processing if block_header.destination in [DeviceAddress.GROUND_STATION, DeviceAddress.MULTICAST]: cur_block = parse_radio_block(pkt_hdr.version, block_header, block_contents) if cur_block: parsed_blocks.append(cur_block) # Append parsed block to list else: logger.warning("Invalid destination address") # Remove the data we processed from the whole set, and move onto the next data block blocks = blocks[block_len:] return ParsedTransmission(pkt_hdr, parsed_blocks)
Classes
class ParsedBlock (block_name: str, block_header: BlockHeader, block_contents: dict[str, int | dict[str, int]])
-
Parsed block data from the telemetry process.
Expand source code
@dataclass class ParsedBlock: """Parsed block data from the telemetry process.""" # mission_time: int block_name: str block_header: BlockHeader block_contents: dict[str, int | dict[str, int]]
Class variables
var block_contents : dict[str, int | dict[str, int]]
var block_header : BlockHeader
var block_name : str
class ParsedTransmission (packet_header: PacketHeader, blocks: List[ParsedBlock])
-
Parsed transmission data from the telemetry process.
Expand source code
@dataclass class ParsedTransmission: """Parsed transmission data from the telemetry process.""" packet_header: PacketHeader blocks: List[ParsedBlock]
Class variables
var blocks : List[ParsedBlock]
var packet_header : PacketHeader