Module modules.telemetry.replay
Replays radio packets from the mission file, outputting them as replay payloads.
Expand source code
"""Replays radio packets from the mission file, outputting them as replay payloads."""
import logging
from pathlib import Path
from queue import Queue
from time import time, sleep
# Set up logging
logger = logging.getLogger(__name__)
# TODO: This should be adjacent to an RN2483 radio emulator that just "receives" each packet in the mission file at the
# correct mission time.
class TelemetryReplay:
"""
This class replays telemetry data from a mission file.
"""
def __init__(
self,
replay_payloads: Queue[str],
replay_input: Queue[str],
replay_speed: float,
replay_path: Path,
):
super().__init__()
# Replay buffers (Input and output)
self.replay_payloads: Queue[str] = replay_payloads
self.replay_input: Queue[str] = replay_input
# Misc replay
self.replay_path: Path = replay_path
# Loop data
self.last_loop_time: int = int(time() * 1000)
self.total_time_offset: int = 0
self.speed: float = replay_speed
def run(self):
"""Run the mission until completion."""
# TODO: fix replay speed
# Replay raw radio transmission file
with open(self.replay_path, "r") as file:
for line in file:
if self.speed > 0:
self.replay_payloads.put(line)
if not self.replay_input.empty():
self.parse_input_command(self.replay_input.get())
sleep(0.052)
def parse_input_command(self, data: str) -> None:
cmd_list = data.split(" ")
match cmd_list[0]:
case "speed":
self.speed = float(cmd_list[1])
# Reset loop time so resuming playback doesn't skip the time it was paused
self.last_loop_time = int(time() * 1000)
case _:
raise NotImplementedError(f"Replay command of {cmd_list} invalid.")
Classes
class TelemetryReplay (replay_payloads: queue.Queue[str], replay_input: queue.Queue[str], replay_speed: float, replay_path: pathlib.Path)
-
This class replays telemetry data from a mission file.
Expand source code
class TelemetryReplay: """ This class replays telemetry data from a mission file. """ def __init__( self, replay_payloads: Queue[str], replay_input: Queue[str], replay_speed: float, replay_path: Path, ): super().__init__() # Replay buffers (Input and output) self.replay_payloads: Queue[str] = replay_payloads self.replay_input: Queue[str] = replay_input # Misc replay self.replay_path: Path = replay_path # Loop data self.last_loop_time: int = int(time() * 1000) self.total_time_offset: int = 0 self.speed: float = replay_speed def run(self): """Run the mission until completion.""" # TODO: fix replay speed # Replay raw radio transmission file with open(self.replay_path, "r") as file: for line in file: if self.speed > 0: self.replay_payloads.put(line) if not self.replay_input.empty(): self.parse_input_command(self.replay_input.get()) sleep(0.052) def parse_input_command(self, data: str) -> None: cmd_list = data.split(" ") match cmd_list[0]: case "speed": self.speed = float(cmd_list[1]) # Reset loop time so resuming playback doesn't skip the time it was paused self.last_loop_time = int(time() * 1000) case _: raise NotImplementedError(f"Replay command of {cmd_list} invalid.")
Methods
def parse_input_command(self, data: str) ‑> None
-
Expand source code
def parse_input_command(self, data: str) -> None: cmd_list = data.split(" ") match cmd_list[0]: case "speed": self.speed = float(cmd_list[1]) # Reset loop time so resuming playback doesn't skip the time it was paused self.last_loop_time = int(time() * 1000) case _: raise NotImplementedError(f"Replay command of {cmd_list} invalid.")
def run(self)
-
Run the mission until completion.
Expand source code
def run(self): """Run the mission until completion.""" # TODO: fix replay speed # Replay raw radio transmission file with open(self.replay_path, "r") as file: for line in file: if self.speed > 0: self.replay_payloads.put(line) if not self.replay_input.empty(): self.parse_input_command(self.replay_input.get()) sleep(0.052)