Module modules.telemetry.replay

Replays radio packets from the mission file, outputting them as replay payloads.

Classes

class LogfileIterator (replay_path: pathlib.Path)
Expand source code
class LogfileIterator:
    def __init__(self, replay_path: Path):
        self.replay_path = replay_path

    def __iter__(self):
        with open(self.replay_path, "rb") as file:
            callsign = file.read(9)
            if len(callsign) < 9:
                return
            next_packet = bytearray()

            while True:
                # Look for the header
                data = file.read(9)
                if len(data) < 9:
                    next_packet.extend(data)
                    yield next_packet.hex()
                    break

                # Start a new packet
                if data == callsign:
                    yield next_packet.hex()
                    next_packet = bytearray()

                next_packet.extend(data)
class LogfileReplay (replay_payloads: queue.Queue[str],
replay_input: queue.Queue[str],
replay_speed: float,
replay_path: pathlib.Path)
Expand source code
class LogfileReplay(TelemetryReplay):
    def __init__(
        self,
        replay_payloads: Queue[str],
        replay_input: Queue[str],
        replay_speed: float,
        replay_path: Path,
    ):
        super().__init__(replay_payloads, replay_input, replay_speed, replay_path)

    def run(self):
        """Run the mission until completion."""
        for packet in LogfileIterator(self.replay_path):
            if self.speed > 0:
                self.replay_payloads.put(packet)

            if not self.replay_input.empty():
                self.parse_input_command(self.replay_input.get())
            sleep(0.052)

This class replays telemetry data from a mission file.

Ancestors

Inherited members

class TelemetryReplay (replay_payloads: queue.Queue[str],
replay_input: queue.Queue[str],
replay_speed: float,
replay_path: pathlib.Path)
Expand source code
class TelemetryReplay:
    """
    This class replays telemetry data from a mission file.
    """

    def __init__(
        self,
        replay_payloads: Queue[str],
        replay_input: Queue[str],
        replay_speed: float,
        replay_path: Path,
    ):
        super().__init__()

        # Replay buffers (Input and output)
        self.replay_payloads: Queue[str] = replay_payloads
        self.replay_input: Queue[str] = replay_input

        # Misc replay
        self.replay_path: Path = replay_path

        # Loop data
        self.last_loop_time: int = int(time() * 1000)
        self.total_time_offset: int = 0
        self.speed: float = replay_speed

    def run(self):
        """Run the mission until completion."""
        # TODO: fix replay speed

        # Replay raw radio transmission file
        with open(self.replay_path, "r") as file:
            for line in file:
                if self.speed > 0:
                    self.replay_payloads.put(line)

                if not self.replay_input.empty():
                    self.parse_input_command(self.replay_input.get())
                sleep(0.052)

    def parse_input_command(self, data: str) -> None:
        cmd_list = data.split(" ")
        match cmd_list[0]:
            case "speed":
                self.speed = float(cmd_list[1])
                # Reset loop time so resuming playback doesn't skip the time it was paused
                self.last_loop_time = int(time() * 1000)
            case _:
                raise NotImplementedError(f"Replay command of {cmd_list} invalid.")

This class replays telemetry data from a mission file.

Subclasses

Methods

def parse_input_command(self, data: str) ‑> None
Expand source code
def parse_input_command(self, data: str) -> None:
    cmd_list = data.split(" ")
    match cmd_list[0]:
        case "speed":
            self.speed = float(cmd_list[1])
            # Reset loop time so resuming playback doesn't skip the time it was paused
            self.last_loop_time = int(time() * 1000)
        case _:
            raise NotImplementedError(f"Replay command of {cmd_list} invalid.")
def run(self)
Expand source code
def run(self):
    """Run the mission until completion."""
    # TODO: fix replay speed

    # Replay raw radio transmission file
    with open(self.replay_path, "r") as file:
        for line in file:
            if self.speed > 0:
                self.replay_payloads.put(line)

            if not self.replay_input.empty():
                self.parse_input_command(self.replay_input.get())
            sleep(0.052)

Run the mission until completion.