Module modules.telemetry.telemetry
Telemetry to parse radio packets, keep history and to log everything. Incoming information comes from rn2483_radio_payloads in payload format. Outputs information to telemetry_json_output in friendly JSON for UI.
Functions
def shutdown_sequence(signum: int, stack_frame: frame) ‑> None
-
Expand source code
def shutdown_sequence(signum: int, stack_frame: FrameType) -> None: """Kills all children before terminating. Acts as a signal handler for Telemetry class when receiving SIGTERM.""" for child in active_children(): child.terminate() exit(0)
Kills all children before terminating. Acts as a signal handler for Telemetry class when receiving SIGTERM.
Classes
class Telemetry (serial_status: queue.Queue[str],
rn2483_radio_payloads: queue.Queue[typing.Any],
rn2483_radio_input: queue.Queue[str],
radio_signal_report: queue.Queue[str],
telemetry_json_output: queue.Queue[dict[str, typing.Any]],
telemetry_ws_commands: queue.Queue[list[str]],
config: Config,
version: str)-
Expand source code
class Telemetry: def __init__( self, serial_status: Queue[str], rn2483_radio_payloads: Queue[Any], rn2483_radio_input: Queue[str], radio_signal_report: Queue[str], telemetry_json_output: Queue[JSON], telemetry_ws_commands: Queue[list[str]], config: Config, version: str, ): super().__init__() # Multiprocessing Queues to communicate with SerialManager and WebSocketHandler processes self.serial_status: Queue[str] = serial_status self.rn2483_radio_payloads: Queue[str] = rn2483_radio_payloads self.rn2483_radio_input: Queue[str] = rn2483_radio_input self.radio_signal_report: Queue[str] = radio_signal_report self.telemetry_json_output: Queue[JSON] = telemetry_json_output self.telemetry_ws_commands: Queue[list[str]] = telemetry_ws_commands self.config = config self.version = version # Telemetry Status holds the current status of the telemetry backend # Telemetry Data holds the last few copies of received data blocks stored under the subtype name as a key. self.status: TelemetryStatus = TelemetryStatus() self.telemetry_data: TelemetryBuffer = TelemetryBuffer(self.config.telemetry_buffer_size) # Mission File System self.missions_dir = Path.cwd().joinpath("missions") self.missions_dir.mkdir(parents=True, exist_ok=True) self.mission_path: Path | None = None # Mission Recording self.mission_recording_file: TextIOWrapper[BufferedWriter] | None = None # Replay System self.replay: Process | None = None self.replay_input: Queue[str] = mp.Queue() # type:ignore self.replay_output: Queue[str] = mp.Queue() # type:ignore # Handle program closing to ensure no orphan processes signal(SIGTERM, shutdown_sequence) # type:ignore # Start Telemetry self.update_websocket() self.run() def run(self): while True: # Sleep for 1 ms sleep(0.001) while not self.telemetry_ws_commands.empty(): try: # Parse websocket command into an enum commands: list[str] = self.telemetry_ws_commands.get() command = wsc.parse(commands, wsc.WebsocketCommand) parameters = commands # Remaining items in the commands list are parameters self.execute_command(command, parameters) except AttributeError as e: logger.error(e) except wsc.WebsocketCommandNotFound as e: logger.error(e) while not self.radio_signal_report.empty(): # TODO set radio SNR logger.info(f"SIGNAL DATA {self.radio_signal_report.get()}") while not self.serial_status.empty(): x = self.serial_status.get().split(" ", maxsplit=1) logger.debug(f"serial_status: {x}") self.parse_serial_status(command=x[0], data=x[1]) self.update_websocket() # Switch data queues between replay and radio depending on mission state match self.status.mission.state: case MissionState.RECORDED: while not self.replay_output.empty(): self.process_transmission(self.replay_output.get()) self.update_websocket() case _: while not self.rn2483_radio_payloads.empty(): self.process_transmission(self.rn2483_radio_payloads.get()) self.update_websocket() def update_websocket(self) -> None: """Updates the websocket with the latest packet using the JSON output process.""" websocket_response = { "org": self.config.organization, "rocket": self.config.rocket_name, "version": self.version, "status": dict(self.status), "telemetry": self.telemetry_data.get(), } self.telemetry_json_output.put(websocket_response) def reset_data(self) -> None: """Resets all live data on the telemetry backend to a default state.""" self.status = TelemetryStatus() self.telemetry_data.clear() def parse_serial_status(self, command: str, data: str) -> None: """Parses the serial managers status output""" match command: case "serial_ports": self.status.serial.available_ports = literal_eval(data) case "rn2483_connected": self.status.rn2483_radio.connected = bool(data) case "rn2483_port": if self.status.mission.state != MissionState.DNE: self.reset_data() self.status.rn2483_radio.connected_port = data match self.status.rn2483_radio.connected_port: case "": self.status.mission.state = MissionState.DNE case _: self.status.mission.state = MissionState.LIVE case _: return None def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None: """Executes the passed websocket command.""" WSCommand = wsc.WebsocketCommand match command: case WSCommand.UPDATE: self.status.replay.update_mission_list() # Replay commands case WSCommand.REPLAY.value.PLAY: if not parameters: raise ReplayPlaybackError mission_name = " ".join(parameters) try: self.play_mission(mission_name) except MissionNotFoundError as e: logger.error(e.message) except ReplayPlaybackError as e: logger.error(e.message) case WSCommand.REPLAY.value.PAUSE: self.set_replay_speed(0.0) case WSCommand.REPLAY.value.RESUME: self.set_replay_speed(self.status.replay.last_played_speed) case WSCommand.REPLAY.value.SPEED: self.set_replay_speed(float(parameters[0])) case WSCommand.REPLAY.value.STOP: self.stop_replay() # Record commands case WSCommand.RECORD.value.STOP: self.stop_recording() case WSCommand.RECORD.value.START: # If there is no mission name, use the default mission_name = None if not parameters else " ".join(parameters) try: self.start_recording(mission_name) except AlreadyRecordingError as e: logger.error(e.message) except ReplayPlaybackError as e: logger.error(e.message) case _: raise NotImplementedError(f"Command {command} not implemented.") self.update_websocket() def set_replay_speed(self, speed: float): """Set the playback speed of the replay system.""" try: speed = 0.0 if float(speed) < 0 else float(speed) except ValueError: speed = 0.0 # Keeps last played speed updated while preventing it from hitting 0 if past speed is 0 self.status.replay.last_played_speed = self.status.replay.speed if self.status.replay.speed != 0.0 else 1 self.status.replay.speed = speed # Set replay status based on speed # If mission is not recorded, replay should be in DNE state. # if else, set to pause/playing based on speed if self.status.mission.state != MissionState.RECORDED: self.status.replay.state = ReplayState.DNE elif speed == 0.0: self.status.replay.state = ReplayState.PAUSED self.replay_input.put(f"speed {speed}") else: self.status.replay.state = ReplayState.PLAYING self.replay_input.put(f"speed {speed}") def stop_replay(self) -> None: """Stops the replay.""" logger.info("REPLAY STOP") if self.replay is not None: self.replay.terminate() self.replay = None # Empty replay output self.replay_output: Queue[str] = mp.Queue() # type:ignore self.reset_data() def play_mission(self, mission_name: str) -> None: """Plays the desired mission recording.""" # Ensure not doing anything silly if self.status.mission.recording: raise AlreadyRecordingError mission_file = self.missions_dir.joinpath(f"{mission_name}.{MISSION_EXTENSION}") if mission_file not in self.status.replay.mission_files_list: raise MissionNotFoundError(mission_name) # Set output data to current mission self.status.mission.name = mission_name # We are not to record when replaying missions self.status.mission.state = MissionState.RECORDED self.status.mission.recording = False # Replay system if self.replay is None: self.replay = Process( target=TelemetryReplay( self.replay_output, self.replay_input, self.status.replay.speed, mission_file, ).run ) self.replay.start() self.set_replay_speed( speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1 ) logger.info(f"REPLAY {mission_name} PLAYING") def start_recording(self, mission_name: str | None = None) -> None: """Starts recording the current mission. If no mission name is given, the recording epoch is used.""" # TODO self.status.mission.recording = True self.mission_path = self.missions_dir.joinpath(f"{mission_name or 'default'}.{MISSION_EXTENSION}") # This long line creates a BufferedWriter object that can write plaintext self.mission_recording_file = TextIOWrapper( BufferedWriter(open(self.mission_path, "wb+", 0)), line_buffering=False, write_through=True ) logger.info(f"Starting to record to {self.mission_path}") def stop_recording(self) -> None: """Stops the current recording.""" self.status.mission.recording = False if self.mission_recording_file: self.mission_recording_file.close() self.mission_recording_file = None logger.info("Recording stopped") # TODO def process_transmission(self, data: str) -> None: """Processes the incoming radio transmission data.""" # Always write data to file when recording, even if it can't be parsed correctly if self.status.mission.recording and self.mission_recording_file: logger.info(f"Recording: {data}") self.mission_recording_file.write(f"{data}\n") try: # Parse the transmission, if result is not null, update telemetry data parsed: ParsedTransmission | None = parse_rn2483_transmission(data, self.config) if parsed and parsed.blocks: # Updates the telemetry buffer with the latest block data and latest mission time self.telemetry_data.add(parsed.blocks) except Exception as e: print(e) logger.error(e)
Methods
def execute_command(self, command: enum.Enum, parameters: list[str]) ‑> None
-
Expand source code
def execute_command(self, command: wsc.Enum, parameters: list[str]) -> None: """Executes the passed websocket command.""" WSCommand = wsc.WebsocketCommand match command: case WSCommand.UPDATE: self.status.replay.update_mission_list() # Replay commands case WSCommand.REPLAY.value.PLAY: if not parameters: raise ReplayPlaybackError mission_name = " ".join(parameters) try: self.play_mission(mission_name) except MissionNotFoundError as e: logger.error(e.message) except ReplayPlaybackError as e: logger.error(e.message) case WSCommand.REPLAY.value.PAUSE: self.set_replay_speed(0.0) case WSCommand.REPLAY.value.RESUME: self.set_replay_speed(self.status.replay.last_played_speed) case WSCommand.REPLAY.value.SPEED: self.set_replay_speed(float(parameters[0])) case WSCommand.REPLAY.value.STOP: self.stop_replay() # Record commands case WSCommand.RECORD.value.STOP: self.stop_recording() case WSCommand.RECORD.value.START: # If there is no mission name, use the default mission_name = None if not parameters else " ".join(parameters) try: self.start_recording(mission_name) except AlreadyRecordingError as e: logger.error(e.message) except ReplayPlaybackError as e: logger.error(e.message) case _: raise NotImplementedError(f"Command {command} not implemented.") self.update_websocket()
Executes the passed websocket command.
def parse_serial_status(self, command: str, data: str) ‑> None
-
Expand source code
def parse_serial_status(self, command: str, data: str) -> None: """Parses the serial managers status output""" match command: case "serial_ports": self.status.serial.available_ports = literal_eval(data) case "rn2483_connected": self.status.rn2483_radio.connected = bool(data) case "rn2483_port": if self.status.mission.state != MissionState.DNE: self.reset_data() self.status.rn2483_radio.connected_port = data match self.status.rn2483_radio.connected_port: case "": self.status.mission.state = MissionState.DNE case _: self.status.mission.state = MissionState.LIVE case _: return None
Parses the serial managers status output
def play_mission(self, mission_name: str) ‑> None
-
Expand source code
def play_mission(self, mission_name: str) -> None: """Plays the desired mission recording.""" # Ensure not doing anything silly if self.status.mission.recording: raise AlreadyRecordingError mission_file = self.missions_dir.joinpath(f"{mission_name}.{MISSION_EXTENSION}") if mission_file not in self.status.replay.mission_files_list: raise MissionNotFoundError(mission_name) # Set output data to current mission self.status.mission.name = mission_name # We are not to record when replaying missions self.status.mission.state = MissionState.RECORDED self.status.mission.recording = False # Replay system if self.replay is None: self.replay = Process( target=TelemetryReplay( self.replay_output, self.replay_input, self.status.replay.speed, mission_file, ).run ) self.replay.start() self.set_replay_speed( speed=self.status.replay.last_played_speed if self.status.replay.last_played_speed > 0 else 1 ) logger.info(f"REPLAY {mission_name} PLAYING")
Plays the desired mission recording.
def process_transmission(self, data: str) ‑> None
-
Expand source code
def process_transmission(self, data: str) -> None: """Processes the incoming radio transmission data.""" # Always write data to file when recording, even if it can't be parsed correctly if self.status.mission.recording and self.mission_recording_file: logger.info(f"Recording: {data}") self.mission_recording_file.write(f"{data}\n") try: # Parse the transmission, if result is not null, update telemetry data parsed: ParsedTransmission | None = parse_rn2483_transmission(data, self.config) if parsed and parsed.blocks: # Updates the telemetry buffer with the latest block data and latest mission time self.telemetry_data.add(parsed.blocks) except Exception as e: print(e) logger.error(e)
Processes the incoming radio transmission data.
def reset_data(self) ‑> None
-
Expand source code
def reset_data(self) -> None: """Resets all live data on the telemetry backend to a default state.""" self.status = TelemetryStatus() self.telemetry_data.clear()
Resets all live data on the telemetry backend to a default state.
def run(self)
-
Expand source code
def run(self): while True: # Sleep for 1 ms sleep(0.001) while not self.telemetry_ws_commands.empty(): try: # Parse websocket command into an enum commands: list[str] = self.telemetry_ws_commands.get() command = wsc.parse(commands, wsc.WebsocketCommand) parameters = commands # Remaining items in the commands list are parameters self.execute_command(command, parameters) except AttributeError as e: logger.error(e) except wsc.WebsocketCommandNotFound as e: logger.error(e) while not self.radio_signal_report.empty(): # TODO set radio SNR logger.info(f"SIGNAL DATA {self.radio_signal_report.get()}") while not self.serial_status.empty(): x = self.serial_status.get().split(" ", maxsplit=1) logger.debug(f"serial_status: {x}") self.parse_serial_status(command=x[0], data=x[1]) self.update_websocket() # Switch data queues between replay and radio depending on mission state match self.status.mission.state: case MissionState.RECORDED: while not self.replay_output.empty(): self.process_transmission(self.replay_output.get()) self.update_websocket() case _: while not self.rn2483_radio_payloads.empty(): self.process_transmission(self.rn2483_radio_payloads.get()) self.update_websocket()
def set_replay_speed(self, speed: float)
-
Expand source code
def set_replay_speed(self, speed: float): """Set the playback speed of the replay system.""" try: speed = 0.0 if float(speed) < 0 else float(speed) except ValueError: speed = 0.0 # Keeps last played speed updated while preventing it from hitting 0 if past speed is 0 self.status.replay.last_played_speed = self.status.replay.speed if self.status.replay.speed != 0.0 else 1 self.status.replay.speed = speed # Set replay status based on speed # If mission is not recorded, replay should be in DNE state. # if else, set to pause/playing based on speed if self.status.mission.state != MissionState.RECORDED: self.status.replay.state = ReplayState.DNE elif speed == 0.0: self.status.replay.state = ReplayState.PAUSED self.replay_input.put(f"speed {speed}") else: self.status.replay.state = ReplayState.PLAYING self.replay_input.put(f"speed {speed}")
Set the playback speed of the replay system.
def start_recording(self, mission_name: str | None = None) ‑> None
-
Expand source code
def start_recording(self, mission_name: str | None = None) -> None: """Starts recording the current mission. If no mission name is given, the recording epoch is used.""" # TODO self.status.mission.recording = True self.mission_path = self.missions_dir.joinpath(f"{mission_name or 'default'}.{MISSION_EXTENSION}") # This long line creates a BufferedWriter object that can write plaintext self.mission_recording_file = TextIOWrapper( BufferedWriter(open(self.mission_path, "wb+", 0)), line_buffering=False, write_through=True ) logger.info(f"Starting to record to {self.mission_path}")
Starts recording the current mission. If no mission name is given, the recording epoch is used.
def stop_recording(self) ‑> None
-
Expand source code
def stop_recording(self) -> None: """Stops the current recording.""" self.status.mission.recording = False if self.mission_recording_file: self.mission_recording_file.close() self.mission_recording_file = None logger.info("Recording stopped") # TODO
Stops the current recording.
def stop_replay(self) ‑> None
-
Expand source code
def stop_replay(self) -> None: """Stops the replay.""" logger.info("REPLAY STOP") if self.replay is not None: self.replay.terminate() self.replay = None # Empty replay output self.replay_output: Queue[str] = mp.Queue() # type:ignore self.reset_data()
Stops the replay.
def update_websocket(self) ‑> None
-
Expand source code
def update_websocket(self) -> None: """Updates the websocket with the latest packet using the JSON output process.""" websocket_response = { "org": self.config.organization, "rocket": self.config.rocket_name, "version": self.version, "status": dict(self.status), "telemetry": self.telemetry_data.get(), } self.telemetry_json_output.put(websocket_response)
Updates the websocket with the latest packet using the JSON output process.