Module modules.telemetry.telemetry

Telemetry to parse radio packets, keep history and to log everything. Incoming information comes from rn2483_radio_payloads in payload format. Outputs information to telemetry_json_output in friendly JSON for UI.

Functions

def shutdown_sequence(signum: int, stack_frame: frame) ‑> None
Expand source code
def shutdown_sequence(signum: int, stack_frame: FrameType) -> None:
    """Kills all children before terminating. Acts as a signal handler for Telemetry class when receiving SIGTERM."""
    for child in active_children():
        child.terminate()
    exit(0)

Kills all children before terminating. Acts as a signal handler for Telemetry class when receiving SIGTERM.

Classes

class Telemetry (serial_status: queue.Queue[str],
rn2483_radio_payloads: queue.Queue[typing.Any],
rn2483_radio_input: queue.Queue[str],
radio_signal_report: queue.Queue[str],
telemetry_json_output: queue.Queue[dict[str, typing.Any]],
telemetry_ws_commands: queue.Queue[list[str]],
config: Config,
version: str)
Expand source code
class Telemetry:
    def __init__(
        self,
        serial_status: Queue[str],
        rn2483_radio_payloads: Queue[Any],
        rn2483_radio_input: Queue[str],
        radio_signal_report: Queue[str],
        telemetry_json_output: Queue[JSON],
        telemetry_ws_commands: Queue[list[str]],
        config: Config,
        version: str,
    ):
        super().__init__()
        # Multiprocessing Queues to communicate with SerialManager and WebSocketHandler processes
        self.serial_status: Queue[str] = serial_status
        self.rn2483_radio_payloads: Queue[str] = rn2483_radio_payloads
        self.rn2483_radio_input: Queue[str] = rn2483_radio_input
        self.radio_signal_report: Queue[str] = radio_signal_report
        self.telemetry_json_output: Queue[JSON] = telemetry_json_output
        self.telemetry_ws_commands: Queue[list[str]] = telemetry_ws_commands

        self.config = config
        self.version = version

        # Telemetry Status holds the current status of the telemetry backend
        # Telemetry Data holds the last few copies of received data blocks stored under the subtype name as a key.
        self.status: TelemetryStatus = TelemetryStatus()
        self.telemetry_data: TelemetryBuffer = TelemetryBuffer(self.config.telemetry_buffer_size)

        # Mission File System
        self.missions_dir = Path.cwd().joinpath("missions")
        self.missions_dir.mkdir(parents=True, exist_ok=True)
        self.mission_path: Path | None = None

        # Mission Recording
        self.mission_recording_file: TextIOWrapper[BufferedWriter] | None = None

        # Replay System
        self.replay: Process | None = None
        self.replay_input: Queue[str] = mp.Queue()  # type:ignore
        self.replay_output: Queue[str] = mp.Queue()  # type:ignore

        # Handle program closing to ensure no orphan processes
        signal(SIGTERM, shutdown_sequence)  # type:ignore

        # Start Telemetry
        self.update_websocket()
        self.run()

    def run(self):
        while True:
            # Sleep for 1 ms
            sleep(0.001)

            while not self.telemetry_ws_commands.empty():
                try:
                    # Parse websocket command into an enum
                    commands: list[str] = self.telemetry_ws_commands.get()
                    command = wsc.parse(commands, wsc.WebsocketCommand)
                    parameters = commands  # Remaining items in the commands list are parameters
                    self.execute_command(command, parameters)
                except AttributeError as e:
                    logger.error(e)
                except wsc.WebsocketCommandNotFound as e:
                    logger.error(e)

            while not self.radio_signal_report.empty():
                # TODO set radio SNR
                logger.info(f"SIGNAL DATA {self.radio_signal_report.get()}")

            while not self.serial_status.empty():
                x = self.serial_status.get().split(" ", maxsplit=1)
                logger.debug(f"serial_status: {x}")
                self.parse_serial_status(command=x[0], data=x[1])
                self.update_websocket()

            # Switch data queues between replay and radio depending on mission state
            match self.status.mission.state:
                case MissionState.RECORDED:
                    while not self.replay_output.empty():
                        self.process_transmission(self.replay_output.get())
                        self.update_websocket()
                case _:
                    while not self.rn2483_radio_payloads.empty():
                        self.process_transmission(self.rn2483_radio_payloads.get())
                        self.update_websocket()

    def update_websocket(self) -> None:
        """Updates the websocket with the latest packet using the JSON output process."""
        websocket_response = {
            "org": self.config.organization,
            "rocket": self.config.rocket_name,
            "version": self.version,
            "status": dict(self.status),
            "telemetry": self.telemetry_data.get(),
        }
        self.telemetry_json_output.put(websocket_response)

    def reset_data(self) -> None:
        """Resets all live data on the telemetry backend to a default state."""
        self.status = TelemetryStatus()
        self.telemetry_data.clear()

    def parse_serial_status(self, command: str, data: str) -> None:
        """Parses the serial managers status output"""
        match command:
            case "serial_ports":
                self.status.serial.available_ports = literal_eval(data)
            case "rn2483_connected":
                self.status.rn2483_radio.connected = bool(data)
            case "rn2483_port":
                if self.status.mission.state != MissionState.DNE:
                    self.reset_data()
                self.status.rn2483_radio.connected_port = data

                match self.status.rn2483_radio.connected_port:
                    case "":
                        self.status.mission.state = MissionState.DNE
                    case _:
                        self.status.mission.state = MissionState.LIVE
            case _:
                return None

    def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None:
        """Executes the passed websocket command."""

        WSCommand = wsc.WebsocketCommand
        match command:
            case WSCommand.UPDATE:
                self.status.replay.update_mission_list()

            # Replay commands
            case WSCommand.REPLAY.value.PLAY:
                if not parameters:
                    raise ReplayPlaybackError
                mission_name = " ".join(parameters)
                try:
                    self.play_mission(mission_name)
                except MissionNotFoundError as e:
                    logger.error(e.message)
                except ReplayPlaybackError as e:
                    logger.error(e.message)
            case WSCommand.REPLAY.value.PAUSE:
                self.set_replay_speed(0.0)
            case WSCommand.REPLAY.value.RESUME:
                self.set_replay_speed(self.status.replay.last_played_speed)
            case WSCommand.REPLAY.value.SPEED:
                self.set_replay_speed(float(parameters[0]))
            case WSCommand.REPLAY.value.STOP:
                self.stop_replay()

            # Record commands
            case WSCommand.RECORD.value.STOP:
                self.stop_recording()
            case WSCommand.RECORD.value.START:
                # If there is no mission name, use the default
                mission_name = None if not parameters else " ".join(parameters)
                try:
                    self.start_recording(mission_name)
                except AlreadyRecordingError as e:
                    logger.error(e.message)
                except ReplayPlaybackError as e:
                    logger.error(e.message)
            case _:
                raise NotImplementedError(f"Command {command} not implemented.")

        self.update_websocket()

    def set_replay_speed(self, speed: float):
        """Set the playback speed of the replay system."""
        try:
            speed = 0.0 if float(speed) < 0 else float(speed)
        except ValueError:
            speed = 0.0

        # Keeps last played speed updated while preventing it from hitting 0 if past speed is 0
        self.status.replay.last_played_speed = self.status.replay.speed if self.status.replay.speed != 0.0 else 1
        self.status.replay.speed = speed

        # Set replay status based on speed
        # If mission is not recorded, replay should be in DNE state.
        # if else, set to pause/playing based on speed
        if self.status.mission.state != MissionState.RECORDED:
            self.status.replay.state = ReplayState.DNE
        elif speed == 0.0:
            self.status.replay.state = ReplayState.PAUSED
            self.replay_input.put(f"speed {speed}")
        else:
            self.status.replay.state = ReplayState.PLAYING
            self.replay_input.put(f"speed {speed}")

    def stop_replay(self) -> None:
        """Stops the replay."""

        logger.info("REPLAY STOP")

        if self.replay is not None:
            self.replay.terminate()
        self.replay = None

        # Empty replay output
        self.replay_output: Queue[str] = mp.Queue()  # type:ignore
        self.reset_data()

    def play_mission(self, mission_name: str) -> None:
        """Plays the desired mission recording."""

        # Ensure not doing anything silly
        if self.status.mission.recording:
            raise AlreadyRecordingError

        mission_file = self.missions_dir.joinpath(f"{mission_name}.{MISSION_EXTENSION}")
        if mission_file not in self.status.replay.mission_files_list:
            raise MissionNotFoundError(mission_name)

        # Set output data to current mission
        self.status.mission.name = mission_name

        # We are not to record when replaying missions
        self.status.mission.state = MissionState.RECORDED
        self.status.mission.recording = False

        # Replay system
        if self.replay is None:
            self.replay = Process(
                target=TelemetryReplay(
                    self.replay_output,
                    self.replay_input,
                    self.status.replay.speed,
                    mission_file,
                ).run
            )
            self.replay.start()

        self.set_replay_speed(
            speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1
        )

        logger.info(f"REPLAY {mission_name} PLAYING")

    def start_recording(self, mission_name: str | None = None) -> None:
        """Starts recording the current mission. If no mission name is given, the recording epoch is used."""
        # TODO
        self.status.mission.recording = True
        self.mission_path = self.missions_dir.joinpath(f"{mission_name or 'default'}.{MISSION_EXTENSION}")
        # This long line creates a BufferedWriter object that can write plaintext
        self.mission_recording_file = TextIOWrapper(
            BufferedWriter(open(self.mission_path, "wb+", 0)), line_buffering=False, write_through=True
        )
        logger.info(f"Starting to record to {self.mission_path}")

    def stop_recording(self) -> None:
        """Stops the current recording."""

        self.status.mission.recording = False
        if self.mission_recording_file:
            self.mission_recording_file.close()
        self.mission_recording_file = None
        logger.info("Recording stopped")
        # TODO

    def process_transmission(self, data: str) -> None:
        """Processes the incoming radio transmission data."""
        # Always write data to file when recording, even if it can't be parsed correctly
        if self.status.mission.recording and self.mission_recording_file:
            logger.info(f"Recording: {data}")
            self.mission_recording_file.write(f"{data}\n")

        try:
            # Parse the transmission, if result is not null, update telemetry data
            parsed: ParsedTransmission | None = parse_rn2483_transmission(data, self.config)
            if parsed and parsed.blocks:
                # Updates the telemetry buffer with the latest block data and latest mission time
                self.telemetry_data.add(parsed.blocks)
        except Exception as e:
            print(e)
            logger.error(e)

Methods

def execute_command(self, command: enum.Enum, parameters: list[str]) ‑> None
Expand source code
def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None:
    """Executes the passed websocket command."""

    WSCommand = wsc.WebsocketCommand
    match command:
        case WSCommand.UPDATE:
            self.status.replay.update_mission_list()

        # Replay commands
        case WSCommand.REPLAY.value.PLAY:
            if not parameters:
                raise ReplayPlaybackError
            mission_name = " ".join(parameters)
            try:
                self.play_mission(mission_name)
            except MissionNotFoundError as e:
                logger.error(e.message)
            except ReplayPlaybackError as e:
                logger.error(e.message)
        case WSCommand.REPLAY.value.PAUSE:
            self.set_replay_speed(0.0)
        case WSCommand.REPLAY.value.RESUME:
            self.set_replay_speed(self.status.replay.last_played_speed)
        case WSCommand.REPLAY.value.SPEED:
            self.set_replay_speed(float(parameters[0]))
        case WSCommand.REPLAY.value.STOP:
            self.stop_replay()

        # Record commands
        case WSCommand.RECORD.value.STOP:
            self.stop_recording()
        case WSCommand.RECORD.value.START:
            # If there is no mission name, use the default
            mission_name = None if not parameters else " ".join(parameters)
            try:
                self.start_recording(mission_name)
            except AlreadyRecordingError as e:
                logger.error(e.message)
            except ReplayPlaybackError as e:
                logger.error(e.message)
        case _:
            raise NotImplementedError(f"Command {command} not implemented.")

    self.update_websocket()

Executes the passed websocket command.

def parse_serial_status(self, command: str, data: str) ‑> None
Expand source code
def parse_serial_status(self, command: str, data: str) -> None:
    """Parses the serial managers status output"""
    match command:
        case "serial_ports":
            self.status.serial.available_ports = literal_eval(data)
        case "rn2483_connected":
            self.status.rn2483_radio.connected = bool(data)
        case "rn2483_port":
            if self.status.mission.state != MissionState.DNE:
                self.reset_data()
            self.status.rn2483_radio.connected_port = data

            match self.status.rn2483_radio.connected_port:
                case "":
                    self.status.mission.state = MissionState.DNE
                case _:
                    self.status.mission.state = MissionState.LIVE
        case _:
            return None

Parses the serial managers status output

def play_mission(self, mission_name: str) ‑> None
Expand source code
def play_mission(self, mission_name: str) -> None:
    """Plays the desired mission recording."""

    # Ensure not doing anything silly
    if self.status.mission.recording:
        raise AlreadyRecordingError

    mission_file = self.missions_dir.joinpath(f"{mission_name}.{MISSION_EXTENSION}")
    if mission_file not in self.status.replay.mission_files_list:
        raise MissionNotFoundError(mission_name)

    # Set output data to current mission
    self.status.mission.name = mission_name

    # We are not to record when replaying missions
    self.status.mission.state = MissionState.RECORDED
    self.status.mission.recording = False

    # Replay system
    if self.replay is None:
        self.replay = Process(
            target=TelemetryReplay(
                self.replay_output,
                self.replay_input,
                self.status.replay.speed,
                mission_file,
            ).run
        )
        self.replay.start()

    self.set_replay_speed(
        speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1
    )

    logger.info(f"REPLAY {mission_name} PLAYING")

Plays the desired mission recording.

def process_transmission(self, data: str) ‑> None
Expand source code
def process_transmission(self, data: str) -> None:
    """Processes the incoming radio transmission data."""
    # Always write data to file when recording, even if it can't be parsed correctly
    if self.status.mission.recording and self.mission_recording_file:
        logger.info(f"Recording: {data}")
        self.mission_recording_file.write(f"{data}\n")

    try:
        # Parse the transmission, if result is not null, update telemetry data
        parsed: ParsedTransmission | None = parse_rn2483_transmission(data, self.config)
        if parsed and parsed.blocks:
            # Updates the telemetry buffer with the latest block data and latest mission time
            self.telemetry_data.add(parsed.blocks)
    except Exception as e:
        print(e)
        logger.error(e)

Processes the incoming radio transmission data.

def reset_data(self) ‑> None
Expand source code
def reset_data(self) -> None:
    """Resets all live data on the telemetry backend to a default state."""
    self.status = TelemetryStatus()
    self.telemetry_data.clear()

Resets all live data on the telemetry backend to a default state.

def run(self)
Expand source code
def run(self):
    while True:
        # Sleep for 1 ms
        sleep(0.001)

        while not self.telemetry_ws_commands.empty():
            try:
                # Parse websocket command into an enum
                commands: list[str] = self.telemetry_ws_commands.get()
                command = wsc.parse(commands, wsc.WebsocketCommand)
                parameters = commands  # Remaining items in the commands list are parameters
                self.execute_command(command, parameters)
            except AttributeError as e:
                logger.error(e)
            except wsc.WebsocketCommandNotFound as e:
                logger.error(e)

        while not self.radio_signal_report.empty():
            # TODO set radio SNR
            logger.info(f"SIGNAL DATA {self.radio_signal_report.get()}")

        while not self.serial_status.empty():
            x = self.serial_status.get().split(" ", maxsplit=1)
            logger.debug(f"serial_status: {x}")
            self.parse_serial_status(command=x[0], data=x[1])
            self.update_websocket()

        # Switch data queues between replay and radio depending on mission state
        match self.status.mission.state:
            case MissionState.RECORDED:
                while not self.replay_output.empty():
                    self.process_transmission(self.replay_output.get())
                    self.update_websocket()
            case _:
                while not self.rn2483_radio_payloads.empty():
                    self.process_transmission(self.rn2483_radio_payloads.get())
                    self.update_websocket()
def set_replay_speed(self, speed: float)
Expand source code
def set_replay_speed(self, speed: float):
    """Set the playback speed of the replay system."""
    try:
        speed = 0.0 if float(speed) < 0 else float(speed)
    except ValueError:
        speed = 0.0

    # Keeps last played speed updated while preventing it from hitting 0 if past speed is 0
    self.status.replay.last_played_speed = self.status.replay.speed if self.status.replay.speed != 0.0 else 1
    self.status.replay.speed = speed

    # Set replay status based on speed
    # If mission is not recorded, replay should be in DNE state.
    # if else, set to pause/playing based on speed
    if self.status.mission.state != MissionState.RECORDED:
        self.status.replay.state = ReplayState.DNE
    elif speed == 0.0:
        self.status.replay.state = ReplayState.PAUSED
        self.replay_input.put(f"speed {speed}")
    else:
        self.status.replay.state = ReplayState.PLAYING
        self.replay_input.put(f"speed {speed}")

Set the playback speed of the replay system.

def start_recording(self, mission_name: str | None = None) ‑> None
Expand source code
def start_recording(self, mission_name: str | None = None) -> None:
    """Starts recording the current mission. If no mission name is given, the recording epoch is used."""
    # TODO
    self.status.mission.recording = True
    self.mission_path = self.missions_dir.joinpath(f"{mission_name or 'default'}.{MISSION_EXTENSION}")
    # This long line creates a BufferedWriter object that can write plaintext
    self.mission_recording_file = TextIOWrapper(
        BufferedWriter(open(self.mission_path, "wb+", 0)), line_buffering=False, write_through=True
    )
    logger.info(f"Starting to record to {self.mission_path}")

Starts recording the current mission. If no mission name is given, the recording epoch is used.

def stop_recording(self) ‑> None
Expand source code
def stop_recording(self) -> None:
    """Stops the current recording."""

    self.status.mission.recording = False
    if self.mission_recording_file:
        self.mission_recording_file.close()
    self.mission_recording_file = None
    logger.info("Recording stopped")
    # TODO

Stops the current recording.

def stop_replay(self) ‑> None
Expand source code
def stop_replay(self) -> None:
    """Stops the replay."""

    logger.info("REPLAY STOP")

    if self.replay is not None:
        self.replay.terminate()
    self.replay = None

    # Empty replay output
    self.replay_output: Queue[str] = mp.Queue()  # type:ignore
    self.reset_data()

Stops the replay.

def update_websocket(self) ‑> None
Expand source code
def update_websocket(self) -> None:
    """Updates the websocket with the latest packet using the JSON output process."""
    websocket_response = {
        "org": self.config.organization,
        "rocket": self.config.rocket_name,
        "version": self.version,
        "status": dict(self.status),
        "telemetry": self.telemetry_data.get(),
    }
    self.telemetry_json_output.put(websocket_response)

Updates the websocket with the latest packet using the JSON output process.