Module modules.telemetry.telemetry

Telemetry to parse radio packets, keep history and to log everything. Incoming information comes from rn2483_radio_payloads in payload format. Outputs information to telemetry_json_output in friendly JSON for UI.

Expand source code
"""
Telemetry to parse radio packets, keep history and to log everything.
Incoming information comes from rn2483_radio_payloads in payload format.
Outputs information to telemetry_json_output in friendly JSON for UI.
"""

from io import BufferedWriter
import logging
from ast import literal_eval
from queue import Queue
import multiprocessing as mp
from multiprocessing import Process, active_children
from pathlib import Path
from signal import signal, SIGTERM
from time import sleep
from typing import Any, TypeAlias
from types import FrameType

from modules.telemetry.data import TelemetryData
from modules.telemetry.status import TelemetryStatus, MissionState, ReplayState
import modules.telemetry.websocket_commands as wsc
from modules.misc.config import Config
from modules.telemetry.replay import TelemetryReplay
from modules.telemetry.parsing_utils import parse_rn2483_transmission, ParsedTransmission
from modules.telemetry.errors import MissionNotFoundError, AlreadyRecordingError, ReplayPlaybackError

# Constants
MISSION_EXTENSION: str = "mission"

# Types
JSON: TypeAlias = dict[str, Any]

# Set up logging
logger = logging.getLogger(__name__)


def shutdown_sequence(signum: int, stack_frame: FrameType) -> None:
    """Kills all children before terminating. Acts as a signal handler for Telemetry class when receiving SIGTERM."""
    for child in active_children():
        child.terminate()
    exit(0)


class Telemetry:
    def __init__(
        self,
        serial_status: Queue[str],
        rn2483_radio_payloads: Queue[Any],
        rn2483_radio_input: Queue[str],
        radio_signal_report: Queue[str],
        telemetry_json_output: Queue[JSON],
        telemetry_ws_commands: Queue[list[str]],
        config: Config,
        version: str,
    ):
        super().__init__()
        # Multiprocessing Queues to communicate with SerialManager and WebSocketHandler processes
        self.serial_status: Queue[str] = serial_status
        self.rn2483_radio_payloads: Queue[str] = rn2483_radio_payloads
        self.rn2483_radio_input: Queue[str] = rn2483_radio_input
        self.radio_signal_report: Queue[str] = radio_signal_report
        self.telemetry_json_output: Queue[JSON] = telemetry_json_output
        self.telemetry_ws_commands: Queue[list[str]] = telemetry_ws_commands

        self.config = config
        self.version = version

        # Telemetry Status holds the current status of the telemetry backend
        # Telemetry Data holds the last few copies of received data blocks stored under the subtype name as a key.
        self.status: TelemetryStatus = TelemetryStatus()
        self.telemetry_data: TelemetryData = TelemetryData(self.config.telemetry_buffer_size)

        # Mission File System
        self.missions_dir = Path.cwd().joinpath("missions")
        self.missions_dir.mkdir(parents=True, exist_ok=True)
        self.mission_path: Path | None = None

        # Mission Recording (not in use)
        self.mission_recording_file: BufferedWriter | None = None
        self.mission_recording_buffer: bytearray = bytearray(b"")

        # Replay System
        self.replay = None
        self.replay_input: Queue[str] = mp.Queue()  # type:ignore
        self.replay_output: Queue[str] = mp.Queue()  # type:ignore

        # Handle program closing to ensure no orphan processes
        signal(SIGTERM, shutdown_sequence)  # type:ignore

        # Start Telemetry
        self.update_websocket()
        self.run()

    def run(self):
        while True:
            # Sleep for 1 ms
            sleep(0.001)

            while not self.telemetry_ws_commands.empty():
                try:
                    # Parse websocket command into an enum
                    commands: list[str] = self.telemetry_ws_commands.get()
                    command = wsc.parse(commands, wsc.WebsocketCommand)
                    parameters = commands  # Remaining items in the commands list are parameters
                    self.execute_command(command, parameters)
                except AttributeError as e:
                    logger.error(e)
                except wsc.WebsocketCommandNotFound as e:
                    logger.error(e)

            while not self.radio_signal_report.empty():
                # TODO set radio SNR
                logger.info(f"SIGNAL DATA {self.radio_signal_report.get()}")

            while not self.serial_status.empty():
                x = self.serial_status.get().split(" ", maxsplit=1)
                logger.debug(f"serial_status: {x}")
                self.parse_serial_status(command=x[0], data=x[1])
                self.update_websocket()

            # Switch data queues between replay and radio depending on mission state
            match self.status.mission.state:
                case MissionState.RECORDED:
                    while not self.replay_output.empty():
                        self.process_transmission(self.replay_output.get())
                        self.update_websocket()
                case _:
                    while not self.rn2483_radio_payloads.empty():
                        self.process_transmission(self.rn2483_radio_payloads.get())
                        self.update_websocket()

    def update_websocket(self) -> None:
        """Updates the websocket with the latest packet using the JSON output process."""
        websocket_response = {
            "org": self.config.organization,
            "rocket": self.config.rocket_name,
            "version": self.version,
            "status": dict(self.status),
            "telemetry": dict(self.telemetry_data),
        }
        self.telemetry_json_output.put(websocket_response)

    def reset_data(self) -> None:
        """Resets all live data on the telemetry backend to a default state."""
        self.status = TelemetryStatus()
        self.telemetry_data.clear()

    def parse_serial_status(self, command: str, data: str) -> None:
        """Parses the serial managers status output"""
        match command:
            case "serial_ports":
                self.status.serial.available_ports = literal_eval(data)
            case "rn2483_connected":
                self.status.rn2483_radio.connected = bool(data)
            case "rn2483_port":
                if self.status.mission.state != MissionState.DNE:
                    self.reset_data()
                self.status.rn2483_radio.connected_port = data

                match self.status.rn2483_radio.connected_port:
                    case "":
                        self.status.mission.state = MissionState.DNE
                    case _:
                        self.status.mission.state = MissionState.LIVE
            case _:
                return None

    def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None:
        """Executes the passed websocket command."""

        WSCommand = wsc.WebsocketCommand
        match command:
            case WSCommand.UPDATE:
                self.status.replay.update_mission_list()

            # Replay commands
            case WSCommand.REPLAY.value.PLAY:
                if not parameters:
                    raise ReplayPlaybackError
                mission_name = " ".join(parameters)
                try:
                    self.play_mission(mission_name)
                except MissionNotFoundError as e:
                    logger.error(e.message)
                except ReplayPlaybackError as e:
                    logger.error(e.message)
            case WSCommand.REPLAY.value.PAUSE:
                self.set_replay_speed(0.0)
            case WSCommand.REPLAY.value.RESUME:
                self.set_replay_speed(self.status.replay.last_played_speed)
            case WSCommand.REPLAY.value.SPEED:
                self.set_replay_speed(float(parameters[0]))
            case WSCommand.REPLAY.value.STOP:
                self.stop_replay()

            # Record commands
            case WSCommand.RECORD.value.STOP:
                self.stop_recording()
            case WSCommand.RECORD.value.START:
                # If there is no mission name, use the default
                mission_name = None if not parameters else " ".join(parameters)
                try:
                    self.start_recording(mission_name)
                except AlreadyRecordingError as e:
                    logger.error(e.message)
                except ReplayPlaybackError as e:
                    logger.error(e.message)
            case _:
                raise NotImplementedError(f"Command {command} not implemented.")

        self.update_websocket()

    def set_replay_speed(self, speed: float):
        """Set the playback speed of the replay system."""
        try:
            speed = 0.0 if float(speed) < 0 else float(speed)
        except ValueError:
            speed = 0.0

        # Keeps last played speed updated while preventing it from hitting 0 if past speed is 0
        self.status.replay.last_played_speed = self.status.replay.speed if self.status.replay.speed != 0.0 else 1
        self.status.replay.speed = speed

        # Set replay status based on speed
        # If mission is not recorded, replay should be in DNE state.
        # if else, set to pause/playing based on speed
        if self.status.mission.state != MissionState.RECORDED:
            self.status.replay.state = ReplayState.DNE
        elif speed == 0.0:
            self.status.replay.state = ReplayState.PAUSED
            self.replay_input.put(f"speed {speed}")
        else:
            self.status.replay.state = ReplayState.PLAYING
            self.replay_input.put(f"speed {speed}")

    def stop_replay(self) -> None:
        """Stops the replay."""

        logger.info("REPLAY STOP")

        if self.replay is not None:
            self.replay.terminate()
        self.replay = None

        # Empty replay output
        self.replay_output: Queue[str] = mp.Queue()  # type:ignore
        self.reset_data()

    def play_mission(self, mission_name: str) -> None:
        """Plays the desired mission recording."""

        # Ensure not doing anything silly
        if self.status.mission.recording:
            raise AlreadyRecordingError

        mission_file = self.missions_dir.joinpath(f"{mission_name}.{MISSION_EXTENSION}")
        if mission_file not in self.status.replay.mission_files_list:
            raise MissionNotFoundError(mission_name)

        # Set output data to current mission
        self.status.mission.name = mission_name

        # We are not to record when replaying missions
        self.status.mission.state = MissionState.RECORDED
        self.status.mission.recording = False

        # Replay system
        if self.replay is None:
            self.replay = Process(
                target=TelemetryReplay(
                    self.replay_output,
                    self.replay_input,
                    self.status.replay.speed,
                    mission_file,
                ).run
            )
            self.replay.start()

        self.set_replay_speed(
            speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1
        )

        logger.info(f"REPLAY {mission_name} PLAYING")

    def start_recording(self, mission_name: str | None = None) -> None:
        """Starts recording the current mission. If no mission name is given, the recording epoch is used."""
        # TODO

    def stop_recording(self) -> None:
        """Stops the current recording."""

        logger.info("RECORDING STOP")
        # TODO

    def process_transmission(self, data: str) -> None:
        """Processes the incoming radio transmission data."""

        # Parse the transmission, if result is not null, update telemetry data
        parsed_transmission: ParsedTransmission | None = parse_rn2483_transmission(data, self.config)
        if parsed_transmission and parsed_transmission.blocks:
            # Updates the telemetry buffer with the latest block data and latest mission time
            self.telemetry_data.update_telemetry(parsed_transmission.packet_header.version, parsed_transmission.blocks)

            # TODO UPDATE FOR V1
            # Write data to file when recording
            # if self.status.mission.recording:
            #     logger.debug(f"Recording: {self.status.mission.recording}")
            #     self.mission_recording_buffer += TelemetryDataBlock(block.subtype, data=block).to_bytes()
            #     if len(self.mission_recording_buffer) >= 512:
            #         buffer_length = len(self.mission_recording_buffer)
            #         self.recording_write_bytes(buffer_length - (buffer_length % 512))

Functions

def shutdown_sequence(signum: int, stack_frame: frame) ‑> None

Kills all children before terminating. Acts as a signal handler for Telemetry class when receiving SIGTERM.

Expand source code
def shutdown_sequence(signum: int, stack_frame: FrameType) -> None:
    """Kills all children before terminating. Acts as a signal handler for Telemetry class when receiving SIGTERM."""
    for child in active_children():
        child.terminate()
    exit(0)

Classes

class Telemetry (serial_status: queue.Queue[str], rn2483_radio_payloads: queue.Queue[typing.Any], rn2483_radio_input: queue.Queue[str], radio_signal_report: queue.Queue[str], telemetry_json_output: queue.Queue[dict[str, typing.Any]], telemetry_ws_commands: queue.Queue[list[str]], config: Config, version: str)
Expand source code
class Telemetry:
    def __init__(
        self,
        serial_status: Queue[str],
        rn2483_radio_payloads: Queue[Any],
        rn2483_radio_input: Queue[str],
        radio_signal_report: Queue[str],
        telemetry_json_output: Queue[JSON],
        telemetry_ws_commands: Queue[list[str]],
        config: Config,
        version: str,
    ):
        super().__init__()
        # Multiprocessing Queues to communicate with SerialManager and WebSocketHandler processes
        self.serial_status: Queue[str] = serial_status
        self.rn2483_radio_payloads: Queue[str] = rn2483_radio_payloads
        self.rn2483_radio_input: Queue[str] = rn2483_radio_input
        self.radio_signal_report: Queue[str] = radio_signal_report
        self.telemetry_json_output: Queue[JSON] = telemetry_json_output
        self.telemetry_ws_commands: Queue[list[str]] = telemetry_ws_commands

        self.config = config
        self.version = version

        # Telemetry Status holds the current status of the telemetry backend
        # Telemetry Data holds the last few copies of received data blocks stored under the subtype name as a key.
        self.status: TelemetryStatus = TelemetryStatus()
        self.telemetry_data: TelemetryData = TelemetryData(self.config.telemetry_buffer_size)

        # Mission File System
        self.missions_dir = Path.cwd().joinpath("missions")
        self.missions_dir.mkdir(parents=True, exist_ok=True)
        self.mission_path: Path | None = None

        # Mission Recording (not in use)
        self.mission_recording_file: BufferedWriter | None = None
        self.mission_recording_buffer: bytearray = bytearray(b"")

        # Replay System
        self.replay = None
        self.replay_input: Queue[str] = mp.Queue()  # type:ignore
        self.replay_output: Queue[str] = mp.Queue()  # type:ignore

        # Handle program closing to ensure no orphan processes
        signal(SIGTERM, shutdown_sequence)  # type:ignore

        # Start Telemetry
        self.update_websocket()
        self.run()

    def run(self):
        while True:
            # Sleep for 1 ms
            sleep(0.001)

            while not self.telemetry_ws_commands.empty():
                try:
                    # Parse websocket command into an enum
                    commands: list[str] = self.telemetry_ws_commands.get()
                    command = wsc.parse(commands, wsc.WebsocketCommand)
                    parameters = commands  # Remaining items in the commands list are parameters
                    self.execute_command(command, parameters)
                except AttributeError as e:
                    logger.error(e)
                except wsc.WebsocketCommandNotFound as e:
                    logger.error(e)

            while not self.radio_signal_report.empty():
                # TODO set radio SNR
                logger.info(f"SIGNAL DATA {self.radio_signal_report.get()}")

            while not self.serial_status.empty():
                x = self.serial_status.get().split(" ", maxsplit=1)
                logger.debug(f"serial_status: {x}")
                self.parse_serial_status(command=x[0], data=x[1])
                self.update_websocket()

            # Switch data queues between replay and radio depending on mission state
            match self.status.mission.state:
                case MissionState.RECORDED:
                    while not self.replay_output.empty():
                        self.process_transmission(self.replay_output.get())
                        self.update_websocket()
                case _:
                    while not self.rn2483_radio_payloads.empty():
                        self.process_transmission(self.rn2483_radio_payloads.get())
                        self.update_websocket()

    def update_websocket(self) -> None:
        """Updates the websocket with the latest packet using the JSON output process."""
        websocket_response = {
            "org": self.config.organization,
            "rocket": self.config.rocket_name,
            "version": self.version,
            "status": dict(self.status),
            "telemetry": dict(self.telemetry_data),
        }
        self.telemetry_json_output.put(websocket_response)

    def reset_data(self) -> None:
        """Resets all live data on the telemetry backend to a default state."""
        self.status = TelemetryStatus()
        self.telemetry_data.clear()

    def parse_serial_status(self, command: str, data: str) -> None:
        """Parses the serial managers status output"""
        match command:
            case "serial_ports":
                self.status.serial.available_ports = literal_eval(data)
            case "rn2483_connected":
                self.status.rn2483_radio.connected = bool(data)
            case "rn2483_port":
                if self.status.mission.state != MissionState.DNE:
                    self.reset_data()
                self.status.rn2483_radio.connected_port = data

                match self.status.rn2483_radio.connected_port:
                    case "":
                        self.status.mission.state = MissionState.DNE
                    case _:
                        self.status.mission.state = MissionState.LIVE
            case _:
                return None

    def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None:
        """Executes the passed websocket command."""

        WSCommand = wsc.WebsocketCommand
        match command:
            case WSCommand.UPDATE:
                self.status.replay.update_mission_list()

            # Replay commands
            case WSCommand.REPLAY.value.PLAY:
                if not parameters:
                    raise ReplayPlaybackError
                mission_name = " ".join(parameters)
                try:
                    self.play_mission(mission_name)
                except MissionNotFoundError as e:
                    logger.error(e.message)
                except ReplayPlaybackError as e:
                    logger.error(e.message)
            case WSCommand.REPLAY.value.PAUSE:
                self.set_replay_speed(0.0)
            case WSCommand.REPLAY.value.RESUME:
                self.set_replay_speed(self.status.replay.last_played_speed)
            case WSCommand.REPLAY.value.SPEED:
                self.set_replay_speed(float(parameters[0]))
            case WSCommand.REPLAY.value.STOP:
                self.stop_replay()

            # Record commands
            case WSCommand.RECORD.value.STOP:
                self.stop_recording()
            case WSCommand.RECORD.value.START:
                # If there is no mission name, use the default
                mission_name = None if not parameters else " ".join(parameters)
                try:
                    self.start_recording(mission_name)
                except AlreadyRecordingError as e:
                    logger.error(e.message)
                except ReplayPlaybackError as e:
                    logger.error(e.message)
            case _:
                raise NotImplementedError(f"Command {command} not implemented.")

        self.update_websocket()

    def set_replay_speed(self, speed: float):
        """Set the playback speed of the replay system."""
        try:
            speed = 0.0 if float(speed) < 0 else float(speed)
        except ValueError:
            speed = 0.0

        # Keeps last played speed updated while preventing it from hitting 0 if past speed is 0
        self.status.replay.last_played_speed = self.status.replay.speed if self.status.replay.speed != 0.0 else 1
        self.status.replay.speed = speed

        # Set replay status based on speed
        # If mission is not recorded, replay should be in DNE state.
        # if else, set to pause/playing based on speed
        if self.status.mission.state != MissionState.RECORDED:
            self.status.replay.state = ReplayState.DNE
        elif speed == 0.0:
            self.status.replay.state = ReplayState.PAUSED
            self.replay_input.put(f"speed {speed}")
        else:
            self.status.replay.state = ReplayState.PLAYING
            self.replay_input.put(f"speed {speed}")

    def stop_replay(self) -> None:
        """Stops the replay."""

        logger.info("REPLAY STOP")

        if self.replay is not None:
            self.replay.terminate()
        self.replay = None

        # Empty replay output
        self.replay_output: Queue[str] = mp.Queue()  # type:ignore
        self.reset_data()

    def play_mission(self, mission_name: str) -> None:
        """Plays the desired mission recording."""

        # Ensure not doing anything silly
        if self.status.mission.recording:
            raise AlreadyRecordingError

        mission_file = self.missions_dir.joinpath(f"{mission_name}.{MISSION_EXTENSION}")
        if mission_file not in self.status.replay.mission_files_list:
            raise MissionNotFoundError(mission_name)

        # Set output data to current mission
        self.status.mission.name = mission_name

        # We are not to record when replaying missions
        self.status.mission.state = MissionState.RECORDED
        self.status.mission.recording = False

        # Replay system
        if self.replay is None:
            self.replay = Process(
                target=TelemetryReplay(
                    self.replay_output,
                    self.replay_input,
                    self.status.replay.speed,
                    mission_file,
                ).run
            )
            self.replay.start()

        self.set_replay_speed(
            speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1
        )

        logger.info(f"REPLAY {mission_name} PLAYING")

    def start_recording(self, mission_name: str | None = None) -> None:
        """Starts recording the current mission. If no mission name is given, the recording epoch is used."""
        # TODO

    def stop_recording(self) -> None:
        """Stops the current recording."""

        logger.info("RECORDING STOP")
        # TODO

    def process_transmission(self, data: str) -> None:
        """Processes the incoming radio transmission data."""

        # Parse the transmission, if result is not null, update telemetry data
        parsed_transmission: ParsedTransmission | None = parse_rn2483_transmission(data, self.config)
        if parsed_transmission and parsed_transmission.blocks:
            # Updates the telemetry buffer with the latest block data and latest mission time
            self.telemetry_data.update_telemetry(parsed_transmission.packet_header.version, parsed_transmission.blocks)

            # TODO UPDATE FOR V1
            # Write data to file when recording
            # if self.status.mission.recording:
            #     logger.debug(f"Recording: {self.status.mission.recording}")
            #     self.mission_recording_buffer += TelemetryDataBlock(block.subtype, data=block).to_bytes()
            #     if len(self.mission_recording_buffer) >= 512:
            #         buffer_length = len(self.mission_recording_buffer)
            #         self.recording_write_bytes(buffer_length - (buffer_length % 512))

Methods

def execute_command(self, command: enum.Enum, parameters: list[str]) ‑> None

Executes the passed websocket command.

Expand source code
def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None:
    """Executes the passed websocket command."""

    WSCommand = wsc.WebsocketCommand
    match command:
        case WSCommand.UPDATE:
            self.status.replay.update_mission_list()

        # Replay commands
        case WSCommand.REPLAY.value.PLAY:
            if not parameters:
                raise ReplayPlaybackError
            mission_name = " ".join(parameters)
            try:
                self.play_mission(mission_name)
            except MissionNotFoundError as e:
                logger.error(e.message)
            except ReplayPlaybackError as e:
                logger.error(e.message)
        case WSCommand.REPLAY.value.PAUSE:
            self.set_replay_speed(0.0)
        case WSCommand.REPLAY.value.RESUME:
            self.set_replay_speed(self.status.replay.last_played_speed)
        case WSCommand.REPLAY.value.SPEED:
            self.set_replay_speed(float(parameters[0]))
        case WSCommand.REPLAY.value.STOP:
            self.stop_replay()

        # Record commands
        case WSCommand.RECORD.value.STOP:
            self.stop_recording()
        case WSCommand.RECORD.value.START:
            # If there is no mission name, use the default
            mission_name = None if not parameters else " ".join(parameters)
            try:
                self.start_recording(mission_name)
            except AlreadyRecordingError as e:
                logger.error(e.message)
            except ReplayPlaybackError as e:
                logger.error(e.message)
        case _:
            raise NotImplementedError(f"Command {command} not implemented.")

    self.update_websocket()
def parse_serial_status(self, command: str, data: str) ‑> None

Parses the serial managers status output

Expand source code
def parse_serial_status(self, command: str, data: str) -> None:
    """Parses the serial managers status output"""
    match command:
        case "serial_ports":
            self.status.serial.available_ports = literal_eval(data)
        case "rn2483_connected":
            self.status.rn2483_radio.connected = bool(data)
        case "rn2483_port":
            if self.status.mission.state != MissionState.DNE:
                self.reset_data()
            self.status.rn2483_radio.connected_port = data

            match self.status.rn2483_radio.connected_port:
                case "":
                    self.status.mission.state = MissionState.DNE
                case _:
                    self.status.mission.state = MissionState.LIVE
        case _:
            return None
def play_mission(self, mission_name: str) ‑> None

Plays the desired mission recording.

Expand source code
def play_mission(self, mission_name: str) -> None:
    """Plays the desired mission recording."""

    # Ensure not doing anything silly
    if self.status.mission.recording:
        raise AlreadyRecordingError

    mission_file = self.missions_dir.joinpath(f"{mission_name}.{MISSION_EXTENSION}")
    if mission_file not in self.status.replay.mission_files_list:
        raise MissionNotFoundError(mission_name)

    # Set output data to current mission
    self.status.mission.name = mission_name

    # We are not to record when replaying missions
    self.status.mission.state = MissionState.RECORDED
    self.status.mission.recording = False

    # Replay system
    if self.replay is None:
        self.replay = Process(
            target=TelemetryReplay(
                self.replay_output,
                self.replay_input,
                self.status.replay.speed,
                mission_file,
            ).run
        )
        self.replay.start()

    self.set_replay_speed(
        speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1
    )

    logger.info(f"REPLAY {mission_name} PLAYING")
def process_transmission(self, data: str) ‑> None

Processes the incoming radio transmission data.

Expand source code
def process_transmission(self, data: str) -> None:
    """Processes the incoming radio transmission data."""

    # Parse the transmission, if result is not null, update telemetry data
    parsed_transmission: ParsedTransmission | None = parse_rn2483_transmission(data, self.config)
    if parsed_transmission and parsed_transmission.blocks:
        # Updates the telemetry buffer with the latest block data and latest mission time
        self.telemetry_data.update_telemetry(parsed_transmission.packet_header.version, parsed_transmission.blocks)

        # TODO UPDATE FOR V1
        # Write data to file when recording
        # if self.status.mission.recording:
        #     logger.debug(f"Recording: {self.status.mission.recording}")
        #     self.mission_recording_buffer += TelemetryDataBlock(block.subtype, data=block).to_bytes()
        #     if len(self.mission_recording_buffer) >= 512:
        #         buffer_length = len(self.mission_recording_buffer)
        #         self.recording_write_bytes(buffer_length - (buffer_length % 512))
def reset_data(self) ‑> None

Resets all live data on the telemetry backend to a default state.

Expand source code
def reset_data(self) -> None:
    """Resets all live data on the telemetry backend to a default state."""
    self.status = TelemetryStatus()
    self.telemetry_data.clear()
def run(self)
Expand source code
def run(self):
    while True:
        # Sleep for 1 ms
        sleep(0.001)

        while not self.telemetry_ws_commands.empty():
            try:
                # Parse websocket command into an enum
                commands: list[str] = self.telemetry_ws_commands.get()
                command = wsc.parse(commands, wsc.WebsocketCommand)
                parameters = commands  # Remaining items in the commands list are parameters
                self.execute_command(command, parameters)
            except AttributeError as e:
                logger.error(e)
            except wsc.WebsocketCommandNotFound as e:
                logger.error(e)

        while not self.radio_signal_report.empty():
            # TODO set radio SNR
            logger.info(f"SIGNAL DATA {self.radio_signal_report.get()}")

        while not self.serial_status.empty():
            x = self.serial_status.get().split(" ", maxsplit=1)
            logger.debug(f"serial_status: {x}")
            self.parse_serial_status(command=x[0], data=x[1])
            self.update_websocket()

        # Switch data queues between replay and radio depending on mission state
        match self.status.mission.state:
            case MissionState.RECORDED:
                while not self.replay_output.empty():
                    self.process_transmission(self.replay_output.get())
                    self.update_websocket()
            case _:
                while not self.rn2483_radio_payloads.empty():
                    self.process_transmission(self.rn2483_radio_payloads.get())
                    self.update_websocket()
def set_replay_speed(self, speed: float)

Set the playback speed of the replay system.

Expand source code
def set_replay_speed(self, speed: float):
    """Set the playback speed of the replay system."""
    try:
        speed = 0.0 if float(speed) < 0 else float(speed)
    except ValueError:
        speed = 0.0

    # Keeps last played speed updated while preventing it from hitting 0 if past speed is 0
    self.status.replay.last_played_speed = self.status.replay.speed if self.status.replay.speed != 0.0 else 1
    self.status.replay.speed = speed

    # Set replay status based on speed
    # If mission is not recorded, replay should be in DNE state.
    # if else, set to pause/playing based on speed
    if self.status.mission.state != MissionState.RECORDED:
        self.status.replay.state = ReplayState.DNE
    elif speed == 0.0:
        self.status.replay.state = ReplayState.PAUSED
        self.replay_input.put(f"speed {speed}")
    else:
        self.status.replay.state = ReplayState.PLAYING
        self.replay_input.put(f"speed {speed}")
def start_recording(self, mission_name: Optional[str] = None) ‑> None

Starts recording the current mission. If no mission name is given, the recording epoch is used.

Expand source code
def start_recording(self, mission_name: str | None = None) -> None:
    """Starts recording the current mission. If no mission name is given, the recording epoch is used."""
    # TODO
def stop_recording(self) ‑> None

Stops the current recording.

Expand source code
def stop_recording(self) -> None:
    """Stops the current recording."""

    logger.info("RECORDING STOP")
    # TODO
def stop_replay(self) ‑> None

Stops the replay.

Expand source code
def stop_replay(self) -> None:
    """Stops the replay."""

    logger.info("REPLAY STOP")

    if self.replay is not None:
        self.replay.terminate()
    self.replay = None

    # Empty replay output
    self.replay_output: Queue[str] = mp.Queue()  # type:ignore
    self.reset_data()
def update_websocket(self) ‑> None

Updates the websocket with the latest packet using the JSON output process.

Expand source code
def update_websocket(self) -> None:
    """Updates the websocket with the latest packet using the JSON output process."""
    websocket_response = {
        "org": self.config.organization,
        "rocket": self.config.rocket_name,
        "version": self.version,
        "status": dict(self.status),
        "telemetry": dict(self.telemetry_data),
    }
    self.telemetry_json_output.put(websocket_response)