Module modules.serial.serial_rn2483_emulator

Expand source code
# Emulates the RN2483 LoRa Radio Module
# Outputs emulated radio payloads from the CU-InSpace rocket
#
# Authors:
# Thomas Selwyn (Devil)

import random
import struct
import time
from queue import Queue
from multiprocessing import Process
from datetime import datetime


class SerialRN2483Emulator(Process):
    def __init__(self, serial_status: Queue[str], radio_signal_report: Queue[str], rn2483_radio_payloads: Queue[str]):
        super().__init__()

        self.serial_status: Queue[str] = serial_status

        self.rn2483_radio_payloads: Queue[str] = rn2483_radio_payloads
        self.radio_signal_report: Queue[str] = radio_signal_report

        # Emulation Variables
        self.altitude: float = 0
        self.temp: float = 22
        self.going_up: bool = True
        self.startup_time: datetime = datetime.now()

        self.run()

    def run(self):
        self.serial_status.put("rn2483_connected True")
        self.serial_status.put("rn2483_port test")
        self.radio_signal_report.put("snr 30")
        # self.radio_signal_report.put("rssi -55")
        while True:
            self.tester()
            time.sleep(random.uniform(0, 2000) / 100000)

    def tester(self):
        """Generates test data to give to the telemetry process"""
        random_alternation = int(random.uniform(0, 1000))
        if self.going_up:
            self.temp += random_alternation / 500
        else:
            self.temp -= random_alternation / 500

        if self.temp > 100:
            self.going_up = False
        elif self.temp < 20:
            self.going_up = True

        self.altitude += random.uniform(0, 4)

        packet_call_sign = b"Devils".hex()
        six_byte_spacer = b"      ".hex()  # Should be packet header data!!!
        packet_header = f"{packet_call_sign}{six_byte_spacer}"
        block_header = "840C0000"  # Should be struct generated header data!!!

        offset = datetime.now() - self.startup_time

        # self.rn2483_radio_payloads.put(f"{packet_header}{block_header}{'E01F00008D540100BC57FF0010FEFFFF'}")
        formatted_secs = int(offset.total_seconds() * 1000)
        formatted_temp = int(87181 + self.temp * 50)
        formatted_temp2 = int(self.temp * 1000)
        formatted_alt = int(self.altitude * 1000)
        byte_contents = struct.pack("<Iiii", formatted_secs, formatted_temp, formatted_temp2, formatted_alt)
        self.rn2483_radio_payloads.put(f"{packet_header}{block_header}{byte_contents.hex().upper()}")

Classes

class SerialRN2483Emulator (serial_status: queue.Queue[str], radio_signal_report: queue.Queue[str], rn2483_radio_payloads: queue.Queue[str])

Process objects represent activity that is run in a separate process

The class is analogous to threading.Thread

Expand source code
class SerialRN2483Emulator(Process):
    def __init__(self, serial_status: Queue[str], radio_signal_report: Queue[str], rn2483_radio_payloads: Queue[str]):
        super().__init__()

        self.serial_status: Queue[str] = serial_status

        self.rn2483_radio_payloads: Queue[str] = rn2483_radio_payloads
        self.radio_signal_report: Queue[str] = radio_signal_report

        # Emulation Variables
        self.altitude: float = 0
        self.temp: float = 22
        self.going_up: bool = True
        self.startup_time: datetime = datetime.now()

        self.run()

    def run(self):
        self.serial_status.put("rn2483_connected True")
        self.serial_status.put("rn2483_port test")
        self.radio_signal_report.put("snr 30")
        # self.radio_signal_report.put("rssi -55")
        while True:
            self.tester()
            time.sleep(random.uniform(0, 2000) / 100000)

    def tester(self):
        """Generates test data to give to the telemetry process"""
        random_alternation = int(random.uniform(0, 1000))
        if self.going_up:
            self.temp += random_alternation / 500
        else:
            self.temp -= random_alternation / 500

        if self.temp > 100:
            self.going_up = False
        elif self.temp < 20:
            self.going_up = True

        self.altitude += random.uniform(0, 4)

        packet_call_sign = b"Devils".hex()
        six_byte_spacer = b"      ".hex()  # Should be packet header data!!!
        packet_header = f"{packet_call_sign}{six_byte_spacer}"
        block_header = "840C0000"  # Should be struct generated header data!!!

        offset = datetime.now() - self.startup_time

        # self.rn2483_radio_payloads.put(f"{packet_header}{block_header}{'E01F00008D540100BC57FF0010FEFFFF'}")
        formatted_secs = int(offset.total_seconds() * 1000)
        formatted_temp = int(87181 + self.temp * 50)
        formatted_temp2 = int(self.temp * 1000)
        formatted_alt = int(self.altitude * 1000)
        byte_contents = struct.pack("<Iiii", formatted_secs, formatted_temp, formatted_temp2, formatted_alt)
        self.rn2483_radio_payloads.put(f"{packet_header}{block_header}{byte_contents.hex().upper()}")

Ancestors

  • multiprocessing.context.Process
  • multiprocessing.process.BaseProcess

Methods

def run(self)

Method to be run in sub-process; can be overridden in sub-class

Expand source code
def run(self):
    self.serial_status.put("rn2483_connected True")
    self.serial_status.put("rn2483_port test")
    self.radio_signal_report.put("snr 30")
    # self.radio_signal_report.put("rssi -55")
    while True:
        self.tester()
        time.sleep(random.uniform(0, 2000) / 100000)
def tester(self)

Generates test data to give to the telemetry process

Expand source code
def tester(self):
    """Generates test data to give to the telemetry process"""
    random_alternation = int(random.uniform(0, 1000))
    if self.going_up:
        self.temp += random_alternation / 500
    else:
        self.temp -= random_alternation / 500

    if self.temp > 100:
        self.going_up = False
    elif self.temp < 20:
        self.going_up = True

    self.altitude += random.uniform(0, 4)

    packet_call_sign = b"Devils".hex()
    six_byte_spacer = b"      ".hex()  # Should be packet header data!!!
    packet_header = f"{packet_call_sign}{six_byte_spacer}"
    block_header = "840C0000"  # Should be struct generated header data!!!

    offset = datetime.now() - self.startup_time

    # self.rn2483_radio_payloads.put(f"{packet_header}{block_header}{'E01F00008D540100BC57FF0010FEFFFF'}")
    formatted_secs = int(offset.total_seconds() * 1000)
    formatted_temp = int(87181 + self.temp * 50)
    formatted_temp2 = int(self.temp * 1000)
    formatted_alt = int(self.altitude * 1000)
    byte_contents = struct.pack("<Iiii", formatted_secs, formatted_temp, formatted_temp2, formatted_alt)
    self.rn2483_radio_payloads.put(f"{packet_header}{block_header}{byte_contents.hex().upper()}")