Module modules.telemetry.data

Expand source code
# Contains the status data object class
__author__ = "Matteo Golin"

import json

# Imports
import logging
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import TypeAlias
from modules.telemetry.parsing_utils import ParsedBlock

# Aliases
OutputFormat: TypeAlias = dict[str, dict[str, dict[str, dict[str, str]]]]

logger = logging.getLogger(__name__)


@dataclass
class TelemetryDataPacket:
    """A generic block object to store information for telemetry data
    All stored values must be updated at once!"""

    mission_time: list[int] = field(default_factory=list[int])
    stored_values: dict[str, list[int]] = field(default_factory=dict)

    def update(self, data: dict[str, int], buffer_size: int) -> None:
        """Updates the stored values with the given data
        Args:
            data (dict[str, int]) : Dictionary of data to update containing mission_time and stored_values squashed
            buffer_size (int) : Size of the telemetry buffer"""

        # Ensure you are not half updating the packet
        # As this can cause the arrays to become out of sync and meaningless.
        if dict(self).keys() != data.keys():
            logger.error("Block must be updated using a full set of values at the same time!")
            logger.debug(f"Tried updating {dict(self).keys()} using {data.keys()}!")
            return

        # Updates stored values with new values
        for key in data.keys():
            if key == "mission_time":
                self.mission_time.append(data["mission_time"])
            else:
                self.stored_values[key].append(data[key])

        # Ensure buffer sizes are kept
        # Note: This uses a while loop incase buffer size shrinks drastically
        while len(self.mission_time) > buffer_size:
            self.mission_time.pop(0)
            for key in self.stored_values.keys():
                self.stored_values[key].pop(0)

    def clear(self) -> None:
        """Clears all stored values"""
        self.mission_time = []
        self.stored_values = {key: [] for key in self.stored_values.keys()}

    def __str__(self):
        """Returns a string representation of the TelemetryDataPacket"""
        return f"{self.__class__.__name__} -> time: {self.mission_time} ms, {self.stored_values}"

    def __iter__(self):
        """Returns an interator containing all the stored values"""
        yield "mission_time", self.mission_time
        for key in self.stored_values.keys():
            yield key, self.stored_values[key]


class TelemetryData:
    """Contains the output specification for the telemetry data block"""

    def __init__(self, telemetry_buffer_size: int = 20):
        """
        Initializes the telemetry data object.
        Args:
             telemetry_buffer_size: The size of the data buffer.
        """

        logger.debug(f"Initializing TelemetryData[{telemetry_buffer_size}]")
        self.buffer_size: int = telemetry_buffer_size
        self.decoder: list[dict[int, dict[str, str]]] = [{} for _ in range(5)]

        self.last_mission_time: int = -1
        self.output_blocks: dict[str, TelemetryDataPacket] = {}
        self.update_buffer: dict[str, dict[str, float | int | str | None]] = {}

        # Read packet definition file
        filepath = os.path.join(Path(__file__).parents[0], "telemetry_packet.json")
        with open(filepath, "r") as file:
            output_format: OutputFormat = dict(json.load(file))

        # Generate telemetry data packet from output specification
        for key in output_format.keys():
            telemetry_keys: list[str] = list(output_format[key].keys())
            self.output_blocks[key] = TelemetryDataPacket(stored_values={key: [] for key in telemetry_keys})
            self.update_buffer[key] = {key: None for key in telemetry_keys}

        # Generate extremely efficient access decoder matrix
        #                                        = {INPUT: OUTPUT}     "dataPacketBlockName.storedValueVariable"
        # decoder[packet_version][block_subtype] = {"gps_sats_in_use": "sats_in_use.gps_sats_in_use"}
        for data_packet in output_format.keys():
            for stored_value in output_format[data_packet].keys():
                for version in output_format[data_packet][stored_value].keys():
                    for block in output_format[data_packet][stored_value][version].keys():
                        input_key: str = output_format[data_packet][stored_value][version][block]
                        output_key: str = f"{data_packet}.{stored_value}"

                        existing: dict[str, str] = self.decoder[int(version)].get(int(block), {})
                        existing[input_key] = output_key
                        self.decoder[int(version)][int(block)] = existing

    def update_telemetry(self, packet_version: int, blocks: list[ParsedBlock]) -> None:
        """Updates telemetry object from given parsed blocks
        Args:
            packet_version (int): The packet encoding version
            blocks (list[ParsedBlock]): A list of parsed block objects"""

        # Extract block data
        for block in blocks:
            block_num: int = block.block_header.message_subtype
            block_decode: dict[str, str] = self.decoder[packet_version][block_num]
            data: dict[str, int | dict[str, int]] = block.block_contents

            logger.debug(f"{block}")

            try:
                # Update last mission time
                if data["mission_time"] > self.last_mission_time:  # type: ignore
                    self.last_mission_time = data["mission_time"]  # type: ignore

                # Grab input values and put them in update buffer (to fill output packets)
                for key in block_decode.keys():
                    destinationBlock: str = block_decode[key].split(".")[0]
                    destinationValue: str = block_decode[key].split(".")[1]
                    # Extract data and associated mission time to buffer
                    self.update_buffer[destinationBlock]["mission_time"] = data["mission_time"]  # type: ignore
                    if "." in key:
                        datakey: str = key.split(".")[0]
                        datasubkey: str = key.split(".")[1]
                        self.update_buffer[destinationBlock][destinationValue] = data[datakey][  # type: ignore
                            datasubkey
                        ]
                    else:
                        self.update_buffer[destinationBlock][destinationValue] = data[key]  # type: ignore

                # Check if we filled any packet during this block extraction
                for key in self.update_buffer.keys():
                    if None not in self.update_buffer[key].values():
                        # Let's update packet
                        self.output_blocks[key].update(self.update_buffer[key], self.buffer_size)  # type: ignore
                        # Clear packets buffer
                        for subkey in self.update_buffer[key].keys():
                            self.update_buffer[key][subkey] = None
            except KeyError as e:
                logger.error(f"Telemetry parsed block data issue. Missing key {e}")

    def update_buffer_size(self, new_buffer_size: int = 20) -> None:
        """Allows updating the telemetry buffer size without recreating object"""
        self.buffer_size = new_buffer_size

    def clear(self) -> None:
        """Clears the telemetry output data packet entirely"""
        self.last_mission_time = -1
        # Clear buffer
        for dest_block in self.update_buffer.keys():
            for dest_value in self.update_buffer[dest_block].keys():
                self.update_buffer[dest_block][dest_value] = None
        # Clear packet blocks
        for block in self.output_blocks.values():
            block.clear()

    def __iter__(self):
        """Returns an iterator containing all the packets"""
        yield "last_mission_time", self.last_mission_time
        for key in self.output_blocks.keys():
            yield key, dict(self.output_blocks[key])

Classes

class TelemetryData (telemetry_buffer_size: int = 20)

Contains the output specification for the telemetry data block

Initializes the telemetry data object.

Args

telemetry_buffer_size
The size of the data buffer.
Expand source code
class TelemetryData:
    """Contains the output specification for the telemetry data block"""

    def __init__(self, telemetry_buffer_size: int = 20):
        """
        Initializes the telemetry data object.
        Args:
             telemetry_buffer_size: The size of the data buffer.
        """

        logger.debug(f"Initializing TelemetryData[{telemetry_buffer_size}]")
        self.buffer_size: int = telemetry_buffer_size
        self.decoder: list[dict[int, dict[str, str]]] = [{} for _ in range(5)]

        self.last_mission_time: int = -1
        self.output_blocks: dict[str, TelemetryDataPacket] = {}
        self.update_buffer: dict[str, dict[str, float | int | str | None]] = {}

        # Read packet definition file
        filepath = os.path.join(Path(__file__).parents[0], "telemetry_packet.json")
        with open(filepath, "r") as file:
            output_format: OutputFormat = dict(json.load(file))

        # Generate telemetry data packet from output specification
        for key in output_format.keys():
            telemetry_keys: list[str] = list(output_format[key].keys())
            self.output_blocks[key] = TelemetryDataPacket(stored_values={key: [] for key in telemetry_keys})
            self.update_buffer[key] = {key: None for key in telemetry_keys}

        # Generate extremely efficient access decoder matrix
        #                                        = {INPUT: OUTPUT}     "dataPacketBlockName.storedValueVariable"
        # decoder[packet_version][block_subtype] = {"gps_sats_in_use": "sats_in_use.gps_sats_in_use"}
        for data_packet in output_format.keys():
            for stored_value in output_format[data_packet].keys():
                for version in output_format[data_packet][stored_value].keys():
                    for block in output_format[data_packet][stored_value][version].keys():
                        input_key: str = output_format[data_packet][stored_value][version][block]
                        output_key: str = f"{data_packet}.{stored_value}"

                        existing: dict[str, str] = self.decoder[int(version)].get(int(block), {})
                        existing[input_key] = output_key
                        self.decoder[int(version)][int(block)] = existing

    def update_telemetry(self, packet_version: int, blocks: list[ParsedBlock]) -> None:
        """Updates telemetry object from given parsed blocks
        Args:
            packet_version (int): The packet encoding version
            blocks (list[ParsedBlock]): A list of parsed block objects"""

        # Extract block data
        for block in blocks:
            block_num: int = block.block_header.message_subtype
            block_decode: dict[str, str] = self.decoder[packet_version][block_num]
            data: dict[str, int | dict[str, int]] = block.block_contents

            logger.debug(f"{block}")

            try:
                # Update last mission time
                if data["mission_time"] > self.last_mission_time:  # type: ignore
                    self.last_mission_time = data["mission_time"]  # type: ignore

                # Grab input values and put them in update buffer (to fill output packets)
                for key in block_decode.keys():
                    destinationBlock: str = block_decode[key].split(".")[0]
                    destinationValue: str = block_decode[key].split(".")[1]
                    # Extract data and associated mission time to buffer
                    self.update_buffer[destinationBlock]["mission_time"] = data["mission_time"]  # type: ignore
                    if "." in key:
                        datakey: str = key.split(".")[0]
                        datasubkey: str = key.split(".")[1]
                        self.update_buffer[destinationBlock][destinationValue] = data[datakey][  # type: ignore
                            datasubkey
                        ]
                    else:
                        self.update_buffer[destinationBlock][destinationValue] = data[key]  # type: ignore

                # Check if we filled any packet during this block extraction
                for key in self.update_buffer.keys():
                    if None not in self.update_buffer[key].values():
                        # Let's update packet
                        self.output_blocks[key].update(self.update_buffer[key], self.buffer_size)  # type: ignore
                        # Clear packets buffer
                        for subkey in self.update_buffer[key].keys():
                            self.update_buffer[key][subkey] = None
            except KeyError as e:
                logger.error(f"Telemetry parsed block data issue. Missing key {e}")

    def update_buffer_size(self, new_buffer_size: int = 20) -> None:
        """Allows updating the telemetry buffer size without recreating object"""
        self.buffer_size = new_buffer_size

    def clear(self) -> None:
        """Clears the telemetry output data packet entirely"""
        self.last_mission_time = -1
        # Clear buffer
        for dest_block in self.update_buffer.keys():
            for dest_value in self.update_buffer[dest_block].keys():
                self.update_buffer[dest_block][dest_value] = None
        # Clear packet blocks
        for block in self.output_blocks.values():
            block.clear()

    def __iter__(self):
        """Returns an iterator containing all the packets"""
        yield "last_mission_time", self.last_mission_time
        for key in self.output_blocks.keys():
            yield key, dict(self.output_blocks[key])

Methods

def clear(self) ‑> None

Clears the telemetry output data packet entirely

Expand source code
def clear(self) -> None:
    """Clears the telemetry output data packet entirely"""
    self.last_mission_time = -1
    # Clear buffer
    for dest_block in self.update_buffer.keys():
        for dest_value in self.update_buffer[dest_block].keys():
            self.update_buffer[dest_block][dest_value] = None
    # Clear packet blocks
    for block in self.output_blocks.values():
        block.clear()
def update_buffer_size(self, new_buffer_size: int = 20) ‑> None

Allows updating the telemetry buffer size without recreating object

Expand source code
def update_buffer_size(self, new_buffer_size: int = 20) -> None:
    """Allows updating the telemetry buffer size without recreating object"""
    self.buffer_size = new_buffer_size
def update_telemetry(self, packet_version: int, blocks: list[ParsedBlock]) ‑> None

Updates telemetry object from given parsed blocks

Args

packet_version : int
The packet encoding version
blocks : list[ParsedBlock]
A list of parsed block objects
Expand source code
def update_telemetry(self, packet_version: int, blocks: list[ParsedBlock]) -> None:
    """Updates telemetry object from given parsed blocks
    Args:
        packet_version (int): The packet encoding version
        blocks (list[ParsedBlock]): A list of parsed block objects"""

    # Extract block data
    for block in blocks:
        block_num: int = block.block_header.message_subtype
        block_decode: dict[str, str] = self.decoder[packet_version][block_num]
        data: dict[str, int | dict[str, int]] = block.block_contents

        logger.debug(f"{block}")

        try:
            # Update last mission time
            if data["mission_time"] > self.last_mission_time:  # type: ignore
                self.last_mission_time = data["mission_time"]  # type: ignore

            # Grab input values and put them in update buffer (to fill output packets)
            for key in block_decode.keys():
                destinationBlock: str = block_decode[key].split(".")[0]
                destinationValue: str = block_decode[key].split(".")[1]
                # Extract data and associated mission time to buffer
                self.update_buffer[destinationBlock]["mission_time"] = data["mission_time"]  # type: ignore
                if "." in key:
                    datakey: str = key.split(".")[0]
                    datasubkey: str = key.split(".")[1]
                    self.update_buffer[destinationBlock][destinationValue] = data[datakey][  # type: ignore
                        datasubkey
                    ]
                else:
                    self.update_buffer[destinationBlock][destinationValue] = data[key]  # type: ignore

            # Check if we filled any packet during this block extraction
            for key in self.update_buffer.keys():
                if None not in self.update_buffer[key].values():
                    # Let's update packet
                    self.output_blocks[key].update(self.update_buffer[key], self.buffer_size)  # type: ignore
                    # Clear packets buffer
                    for subkey in self.update_buffer[key].keys():
                        self.update_buffer[key][subkey] = None
        except KeyError as e:
            logger.error(f"Telemetry parsed block data issue. Missing key {e}")
class TelemetryDataPacket (mission_time: list[int] = <factory>, stored_values: dict[str, list[int]] = <factory>)

A generic block object to store information for telemetry data All stored values must be updated at once!

Expand source code
@dataclass
class TelemetryDataPacket:
    """A generic block object to store information for telemetry data
    All stored values must be updated at once!"""

    mission_time: list[int] = field(default_factory=list[int])
    stored_values: dict[str, list[int]] = field(default_factory=dict)

    def update(self, data: dict[str, int], buffer_size: int) -> None:
        """Updates the stored values with the given data
        Args:
            data (dict[str, int]) : Dictionary of data to update containing mission_time and stored_values squashed
            buffer_size (int) : Size of the telemetry buffer"""

        # Ensure you are not half updating the packet
        # As this can cause the arrays to become out of sync and meaningless.
        if dict(self).keys() != data.keys():
            logger.error("Block must be updated using a full set of values at the same time!")
            logger.debug(f"Tried updating {dict(self).keys()} using {data.keys()}!")
            return

        # Updates stored values with new values
        for key in data.keys():
            if key == "mission_time":
                self.mission_time.append(data["mission_time"])
            else:
                self.stored_values[key].append(data[key])

        # Ensure buffer sizes are kept
        # Note: This uses a while loop incase buffer size shrinks drastically
        while len(self.mission_time) > buffer_size:
            self.mission_time.pop(0)
            for key in self.stored_values.keys():
                self.stored_values[key].pop(0)

    def clear(self) -> None:
        """Clears all stored values"""
        self.mission_time = []
        self.stored_values = {key: [] for key in self.stored_values.keys()}

    def __str__(self):
        """Returns a string representation of the TelemetryDataPacket"""
        return f"{self.__class__.__name__} -> time: {self.mission_time} ms, {self.stored_values}"

    def __iter__(self):
        """Returns an interator containing all the stored values"""
        yield "mission_time", self.mission_time
        for key in self.stored_values.keys():
            yield key, self.stored_values[key]

Class variables

var mission_time : list[int]
var stored_values : dict[str, list[int]]

Methods

def clear(self) ‑> None

Clears all stored values

Expand source code
def clear(self) -> None:
    """Clears all stored values"""
    self.mission_time = []
    self.stored_values = {key: [] for key in self.stored_values.keys()}
def update(self, data: dict[str, int], buffer_size: int) ‑> None

Updates the stored values with the given data

Args

data (dict[str, int]) : Dictionary of data to update containing mission_time and stored_values squashed buffer_size (int) : Size of the telemetry buffer

Expand source code
def update(self, data: dict[str, int], buffer_size: int) -> None:
    """Updates the stored values with the given data
    Args:
        data (dict[str, int]) : Dictionary of data to update containing mission_time and stored_values squashed
        buffer_size (int) : Size of the telemetry buffer"""

    # Ensure you are not half updating the packet
    # As this can cause the arrays to become out of sync and meaningless.
    if dict(self).keys() != data.keys():
        logger.error("Block must be updated using a full set of values at the same time!")
        logger.debug(f"Tried updating {dict(self).keys()} using {data.keys()}!")
        return

    # Updates stored values with new values
    for key in data.keys():
        if key == "mission_time":
            self.mission_time.append(data["mission_time"])
        else:
            self.stored_values[key].append(data[key])

    # Ensure buffer sizes are kept
    # Note: This uses a while loop incase buffer size shrinks drastically
    while len(self.mission_time) > buffer_size:
        self.mission_time.pop(0)
        for key in self.stored_values.keys():
            self.stored_values[key].pop(0)