Module modules.telemetry.data

Functions

def default_telemetry_dict(last_mission_time: int) ‑> dict[str, typing.Any]
Expand source code
def default_telemetry_dict(last_mission_time: int) -> dict[str, Any]:
    """Creates an empty telemetry packet in the format expected by the frontend

    Returns:
        dict[str, Any]: An empty telemetry packet
    """
    return {
        "last_mission_time": last_mission_time,
        "altitude_sea_level": {
            "mission_time": [],
            "metres": [],
            "feet": []
        },
        "altitude_launch_level": {
            "mission_time": [],
            "metres": [],
            "feet": []
        },
        "linear_acceleration": {
            "mission_time": [],
            "x": [],
            "y": [],
            "z": [],
            "magnitude": [],
        },
        "angular_velocity": {
            "mission_time": [],
            "x": [],
            "y": [],
            "z": [],
            "magnitude": [],
        },
        "temperature": {
            "mission_time": [],
            "celsius": []
        },
        "pressure": {
            "mission_time": [],
            "pascals": []
        },
        "humidity": {
            "mission_time": [],
            "percentage": []
        },
        "gnss": {
            "mission_time": [],
            "latitude": [],
            "longitude": []
        },
        "sats_in_use": {
            "mission_time": [],
            "gps": [[], []],
            "glonass": [[], []]
        },
        "voltage": {
            "mission_time": []
        },
        "magnetic_field": {
            "mission_time": [],
            "x": [],
            "y": [],
            "z": [],
            "magnitude": [],
        }
    }

Creates an empty telemetry packet in the format expected by the frontend

Returns

dict[str, Any]
An empty telemetry packet

Classes

class TelemetryBuffer (telemetry_buffer_size: int = 20)
Expand source code
class TelemetryBuffer:
    """Contains the output specification for the telemetry data block"""

    def __init__(self, telemetry_buffer_size: int = 20):
        """
        Initializes the telemetry data object.
        Args:
             telemetry_buffer_size: The size of the data buffer.
        """

        logger.debug(f"Initializing TelemetryBuffer[{telemetry_buffer_size}]")
        self.buffer_size: int = telemetry_buffer_size
        self.last_mission_time: int = -1
        self.output_blocks: defaultdict[str, list[Block]] = defaultdict(list)

    def add(self, blocks: list[Block]) -> None:
        """Updates telemetry object from given parsed blocks
        Args:
            packet_version (int): The packet encoding version
            blocks (list[ParsedBlock]): A list of parsed block objects"""
        for block in blocks:
            if block is None:
                logger.warning("Received None block, skipping")
                continue
            if isinstance(block, TimedBlock):
                if block.measurement_time > self.last_mission_time:
                    self.last_mission_time = block.measurement_time

            block_buffer = self.output_blocks[type(block).__name__]
            block_buffer.append(block)
            # keep up to buffer_size blocks in the buffer, remove oldest blocks if buffer is full
            while len(block_buffer) > self.buffer_size:
                self.output_blocks[type(block).__name__].pop(0)

    def update_buffer_size(self, new_buffer_size: int = 20) -> None:
        """Allows updating the telemetry buffer size without recreating object"""
        self.buffer_size = new_buffer_size

    def clear(self) -> None:
        """Clears the telemetry output data packet entirely"""
        self.last_mission_time = -1
        self.output_blocks = defaultdict(list)

    def get(self) -> dict[str, Any]:
        """Returns the buffered telemetry data in the expected format"""
        output = default_telemetry_dict(self.last_mission_time)
        for block_buffer in self.output_blocks.values():
            for block in block_buffer:
                # Don't actually remove blocks when they've been output,
                # re-transmit until they get pushed out of the buffer by new data
                block.output_formatted(output)

        return output

Contains the output specification for the telemetry data block

Initializes the telemetry data object.

Args

telemetry_buffer_size
The size of the data buffer.

Methods

def add(self,
blocks: list[Block]) ‑> None
Expand source code
def add(self, blocks: list[Block]) -> None:
    """Updates telemetry object from given parsed blocks
    Args:
        packet_version (int): The packet encoding version
        blocks (list[ParsedBlock]): A list of parsed block objects"""
    for block in blocks:
        if block is None:
            logger.warning("Received None block, skipping")
            continue
        if isinstance(block, TimedBlock):
            if block.measurement_time > self.last_mission_time:
                self.last_mission_time = block.measurement_time

        block_buffer = self.output_blocks[type(block).__name__]
        block_buffer.append(block)
        # keep up to buffer_size blocks in the buffer, remove oldest blocks if buffer is full
        while len(block_buffer) > self.buffer_size:
            self.output_blocks[type(block).__name__].pop(0)

Updates telemetry object from given parsed blocks

Args

packet_version : int
The packet encoding version
blocks : list[ParsedBlock]
A list of parsed block objects
def clear(self) ‑> None
Expand source code
def clear(self) -> None:
    """Clears the telemetry output data packet entirely"""
    self.last_mission_time = -1
    self.output_blocks = defaultdict(list)

Clears the telemetry output data packet entirely

def get(self) ‑> dict[str, typing.Any]
Expand source code
def get(self) -> dict[str, Any]:
    """Returns the buffered telemetry data in the expected format"""
    output = default_telemetry_dict(self.last_mission_time)
    for block_buffer in self.output_blocks.values():
        for block in block_buffer:
            # Don't actually remove blocks when they've been output,
            # re-transmit until they get pushed out of the buffer by new data
            block.output_formatted(output)

    return output

Returns the buffered telemetry data in the expected format

def update_buffer_size(self, new_buffer_size: int = 20) ‑> None
Expand source code
def update_buffer_size(self, new_buffer_size: int = 20) -> None:
    """Allows updating the telemetry buffer size without recreating object"""
    self.buffer_size = new_buffer_size

Allows updating the telemetry buffer size without recreating object