Module modules.serial.serial_rn2483_radio

Process for controlling the setup of the RN2483 radio and reading its received messages.

Expand source code
"""Process for controlling the setup of the RN2483 radio and reading its received messages."""

import time
import logging
from queue import Queue
from serial import SerialException
from modules.misc.config import RadioParameters
from modules.serial.rn2483_radio import RN2483Radio

logger = logging.getLogger(__name__)


def rn2483_radio_process(
    serial_status: Queue[str],
    radio_signal_report: Queue[int],
    rn2483_radio_input: Queue[str],
    rn2483_radio_payloads: Queue[str],
    serial_port: str,
    settings: RadioParameters,
):
    """Runs the primary logic for connecting to and reading from the RN2483 radio.

    The function first configures the radio, retrying every 3 seconds for as
    long as setup raises ``SerialException``, and then loops forever servicing
    queued commands and forwarding received messages. It does not return
    normally.

    Args:
        serial_status: Queue that receives the status strings
            ``"rn2483_connected <True|False>"`` and ``"rn2483_port <port|null>"``.
        radio_signal_report: Queue that receives the result of
            ``radio.signal_report()`` for every ``"radio get snr"`` command.
        rn2483_radio_input: Queue of command strings to service. Only
            ``"radio get snr"`` is implemented; anything else is logged as an error.
        rn2483_radio_payloads: Queue that receives every non-``None`` message
            returned by ``radio.receive()``.
        serial_port: Port identifier passed to ``RN2483Radio`` and reported on
            ``serial_status``.
        settings: Radio parameters passed to ``radio.setup()``.

    Note:
        Only the setup call is guarded against ``SerialException``; an exception
        raised while constructing the radio or inside the receive loop
        propagates to the caller.
    """
    radio = RN2483Radio(serial_port)

    logger.info("RN2483 Radio: Connected to %s", serial_port)
    serial_status.put("rn2483_connected True")
    serial_status.put(f"rn2483_port {serial_port}")

    # Set up radio
    setup_failed = False
    while True:
        try:
            radio.setup(settings)
        except SerialException:
            setup_failed = True
            serial_status.put("rn2483_connected False")
            serial_status.put("rn2483_port null")
            logger.error("RN2483 Radio: Error communicating with serial device.")
            time.sleep(3)
        else:
            logger.debug("Radio initialization worked.")
            break

    if setup_failed:
        # A failed attempt published "disconnected"; now that setup has
        # succeeded, publish the connected state again so listeners are not
        # left with a stale status.
        serial_status.put("rn2483_connected True")
        serial_status.put(f"rn2483_port {serial_port}")

    # Get transmissions
    while True:
        # Drain every pending command before going back to receiving.
        while not rn2483_radio_input.empty():
            command_string = rn2483_radio_input.get()
            if command_string == "radio get snr":
                radio_signal_report.put(radio.signal_report())
            else:
                logger.error("Radio command '%s' is not implemented.", command_string)

        # TODO: LIMIT TO ONLY AFTER THE ENTIRE BATCH IS DONE.
        # TODO: AFTER SENDING A COMMAND TO RADIO RECALL SET_RX_MODE()

        # Put serial message in data queue for telemetry
        message = radio.receive()
        if message is not None:
            logger.info("Received: %s", message)
            rn2483_radio_payloads.put(message)

Functions

def rn2483_radio_process(serial_status: queue.Queue[str], radio_signal_report: queue.Queue[int], rn2483_radio_input: queue.Queue[str], rn2483_radio_payloads: queue.Queue[str], serial_port: str, settings: RadioParameters)

Runs the primary logic for connecting to and reading from the RN2483 radio.

Expand source code
def rn2483_radio_process(
    serial_status: Queue[str],
    radio_signal_report: Queue[int],
    rn2483_radio_input: Queue[str],
    rn2483_radio_payloads: Queue[str],
    serial_port: str,
    settings: RadioParameters,
):
    """Runs the primary logic for connecting to and reading from the RN2483 radio.

    The function first configures the radio, retrying every 3 seconds for as
    long as setup raises ``SerialException``, and then loops forever servicing
    queued commands and forwarding received messages. It does not return
    normally.

    Args:
        serial_status: Queue that receives the status strings
            ``"rn2483_connected <True|False>"`` and ``"rn2483_port <port|null>"``.
        radio_signal_report: Queue that receives the result of
            ``radio.signal_report()`` for every ``"radio get snr"`` command.
        rn2483_radio_input: Queue of command strings to service. Only
            ``"radio get snr"`` is implemented; anything else is logged as an error.
        rn2483_radio_payloads: Queue that receives every non-``None`` message
            returned by ``radio.receive()``.
        serial_port: Port identifier passed to ``RN2483Radio`` and reported on
            ``serial_status``.
        settings: Radio parameters passed to ``radio.setup()``.

    Note:
        Only the setup call is guarded against ``SerialException``; an exception
        raised while constructing the radio or inside the receive loop
        propagates to the caller.
    """
    radio = RN2483Radio(serial_port)

    logger.info("RN2483 Radio: Connected to %s", serial_port)
    serial_status.put("rn2483_connected True")
    serial_status.put(f"rn2483_port {serial_port}")

    # Set up radio
    setup_failed = False
    while True:
        try:
            radio.setup(settings)
        except SerialException:
            setup_failed = True
            serial_status.put("rn2483_connected False")
            serial_status.put("rn2483_port null")
            logger.error("RN2483 Radio: Error communicating with serial device.")
            time.sleep(3)
        else:
            logger.debug("Radio initialization worked.")
            break

    if setup_failed:
        # A failed attempt published "disconnected"; now that setup has
        # succeeded, publish the connected state again so listeners are not
        # left with a stale status.
        serial_status.put("rn2483_connected True")
        serial_status.put(f"rn2483_port {serial_port}")

    # Get transmissions
    while True:
        # Drain every pending command before going back to receiving.
        while not rn2483_radio_input.empty():
            command_string = rn2483_radio_input.get()
            if command_string == "radio get snr":
                radio_signal_report.put(radio.signal_report())
            else:
                logger.error("Radio command '%s' is not implemented.", command_string)

        # TODO: LIMIT TO ONLY AFTER THE ENTIRE BATCH IS DONE.
        # TODO: AFTER SENDING A COMMAND TO RADIO RECALL SET_RX_MODE()

        # Put serial message in data queue for telemetry
        message = radio.receive()
        if message is not None:
            logger.info("Received: %s", message)
            rn2483_radio_payloads.put(message)