Module modules.serial.rn2483_radio

Wrapper around RN2483 radio module.

See the command data sheet for more information: https://ww1.microchip.com/downloads/en/DeviceDoc/RN2483-LoRa-Technology-Module-Command-Reference-User-Guide-DS40001784G.pdf

Expand source code
"""
Wrapper around RN2483 radio module.

See the command data sheet for more information:
https://ww1.microchip.com/downloads/en/DeviceDoc/RN2483-LoRa-Technology-Module-Command-Reference-User-Guide-DS40001784G.pdf
"""

from typing import Optional
from serial import Serial, EIGHTBITS, PARITY_NONE, SerialException
from modules.misc.config import RadioParameters

RN2483_BAUD: int = 57600  # The baud rate of the RN2483 radio
NUM_GPIO: int = 14  # Number of GPIO pins on the RN2483 module
READ_TIMEOUT: float = 10.0  # Time out for serial read operations

# Radio parameters
MODULATION_MODES: list[str] = ["lora", "fsk"]
POWER_MIN: int = -3
POWER_MAX: int = 16
VALID_SPREADING_FACTORS: list[int] = [7, 8, 9, 10, 11, 12]
VALID_CODING_RATES: list[str] = ["4/5", "4/6", "4/7", "4/8"]
VALID_BANDWIDTHS: list[int] = [125, 250, 500]
SYNC_MIN: int = 0
SYNC_MAX: int = 256
PREAMBLE_MIN: int = 0
PREAMBLE_MAX: int = 65535

# Keywords for parameter setting commands of the RN2483 module over serial
SETTING_KW: dict[str, str] = {
    "modulation": "mod",
    "frequency": "freq",
    "power": "pwr",
    "spread_factor": "sf",
    "coding_rate": "cr",
    "bandwidth": "bw",
    "preamble_len": "prlen",
    "cyclic_redundancy": "crc",
    "iqi": "iqi",
    "sync_word": "sync",
}


# Helper functions
def wait_for_ok(conn: Serial) -> bool:
    """
    Check to see if 'ok' is loaded onto the serial line by the RN2483 radio. If we receive 'ok' then this
    function returns True. If anything else is read from the serial line then this function returns False.

    Arguments:
        conn: Serial connection to an RN2483 radio.
    """
    rv = str(conn.readline())  # Read from serial line
    return ("ok" in rv) or ("4294967245" in rv)


def radio_write(conn: Serial, data: str) -> None:
    """
    Writes data to the RN2483 radio via UART.

    Arguments:
        conn: A serial connection to the RN2483 radio.
        data: The full command or data to be sent to the RN2483 radio.
    """
    data += "\r\n"
    conn.flush()  # Flush the serial port
    conn.write(data.encode("utf-8"))  # Encode command_string as bytes and then transmit over serial port


def radio_write_ok(conn: Serial, data: str) -> bool:
    """
    Writes a command to the radio and waits for a response of 'ok'.

    Arguments:
        conn: A serial connection to the radio.
        data: The full command or data to be sent to the RN2483 radio.
    """
    radio_write(conn, data)
    return wait_for_ok(conn)


class RN2483Radio:
    def __init__(self, serial_port: str):
        self.serial = Serial(
            port=serial_port,
            baudrate=RN2483_BAUD,
            bytesize=EIGHTBITS,
            parity=PARITY_NONE,
            stopbits=1,
        )
        self.serial.timeout = READ_TIMEOUT  # Read timeout

    def init_gpio(self) -> None:
        """Set all GPIO pins to input mode, thereby putting them in a state of high impedance."""

        radio_write(self.serial, "sys set pinmode GPIO0 digout")
        radio_write(self.serial, "sys set pinmode GPIO1 digout")
        radio_write(self.serial, "sys set pinmode GPIO2 digout")
        radio_write(self.serial, "sys set pindig GPIO0 1")
        radio_write(self.serial, "sys set pindig GPIO1 1")
        radio_write(self.serial, "sys set pindig GPIO2 0")

        for i in range(NUM_GPIO):
            radio_write(self.serial, f"sys set pinmode GPIO{i} digin")

    def reset(self) -> bool:
        """
        Performs a software reset on the RN2483 radio.

        Returns:
            True if the reset was successful, false otherwise.
        """
        radio_write(self.serial, "sys reset")
        wait_for_ok(self.serial)
        return "RN2483" in str(self.serial.readline())  # Confirm from the RN2483 radio that the reset was a success

    def configure(self, parameters: RadioParameters) -> None:
        """
        Configures the RN2483 radio with the provided radio parameters.

        Arguments:
            parameters: The parameters to set the radio with.

        Raises:
            SerialException: When a radio parameter could not be set.
        """

        for parameter, value in parameters:
            # Special case where spread factor value must be preceded by sf
            if parameter == "spread_factor":
                value = f"sf{value}"

            # Special case: boolean settings must be specified using on/off terms instead of true/false
            if parameter == "cyclic_redundancy" or parameter == "iqi":
                value = "on" if value else "off"

            if not radio_write_ok(self.serial, f"radio set {SETTING_KW[parameter]} {value}"):
                raise SerialException(f"Could not set parameter '{SETTING_KW[parameter]}' to '{value}'.")

    def setup(self, parameters: RadioParameters) -> None:
        """
        Resets the RN2483 radio, initializes its GPIO pins and sets its parameters to those provided.

        Arguments:
            parameters: The parameters to set up the radio with.

        Raises:
            SerialException: When a radio parameter could not be set.
        """
        self.reset()
        self.configure(parameters)
        if not radio_write_ok(self.serial, "radio set wdt 0"):  # Turn off watch dog timer
            raise SerialException("Could not turn off watchdog timer.")
        # For some reason, initializing GPIO causes issues. We don't need them anyway
        # self.init_gpio()

    def _set_rx_mode(self) -> bool:
        """
        Set the RN2483 radio to receive mode so that it constantly listens for transmissions.

        Returns:
            True setting receive mode worked, false otherwise.
        """
        if not radio_write_ok(self.serial, "mac pause"):  # This command must be passed before any reception can occur
            return False

        # Command radio to go into continuous reception mode
        radio_write(self.serial, "radio rx 0")
        result = str(self.serial.readline())
        if "busy" in result or "ok" in result:
            return True
        return False

    def receive(self) -> Optional[str]:
        """
        Checks for new transmissions on the serial connection.

        Returns:
            A string message that the radio received (in hexadecimal digits), otherwise None.
        """

        # Enter receive mode
        if not self._set_rx_mode():
            return None

        message = str(self.serial.readline())[10:-5]  # Trim off reception indicator

        # Check if message is in hex
        try:
            int(message, 16)
            return message
        except ValueError:
            return None

    def signal_report(self) -> int:
        """
        Gets a signal to noise ratio report from the radio.

        Returns:
            An integer from -128 to 127 representing the signal to noise ratio of the last received packet.
        """
        radio_write(self.serial, "radio get snr")
        return int(self.serial.readline())

Functions

def radio_write(conn: serial.serialposix.Serial, data: str) ‑> None

Writes data to the RN2483 radio via UART.

Arguments

conn: A serial connection to the RN2483 radio. data: The full command or data to be sent to the RN2483 radio.

Expand source code
def radio_write(conn: Serial, data: str) -> None:
    """
    Writes data to the RN2483 radio via UART.

    Arguments:
        conn: A serial connection to the RN2483 radio.
        data: The full command or data to be sent to the RN2483 radio.
    """
    data += "\r\n"
    conn.flush()  # Flush the serial port
    conn.write(data.encode("utf-8"))  # Encode command_string as bytes and then transmit over serial port
def radio_write_ok(conn: serial.serialposix.Serial, data: str) ‑> bool

Writes a command to the radio and waits for a response of 'ok'.

Arguments

conn: A serial connection to the radio. data: The full command or data to be sent to the RN2483 radio.

Expand source code
def radio_write_ok(conn: Serial, data: str) -> bool:
    """
    Writes a command to the radio and waits for a response of 'ok'.

    Arguments:
        conn: A serial connection to the radio.
        data: The full command or data to be sent to the RN2483 radio.
    """
    radio_write(conn, data)
    return wait_for_ok(conn)
def wait_for_ok(conn: serial.serialposix.Serial) ‑> bool

Check to see if 'ok' is loaded onto the serial line by the RN2483 radio. If we receive 'ok' then this function returns True. If anything else is read from the serial line then this function returns False.

Arguments

conn: Serial connection to an RN2483 radio.

Expand source code
def wait_for_ok(conn: Serial) -> bool:
    """
    Check to see if 'ok' is loaded onto the serial line by the RN2483 radio. If we receive 'ok' then this
    function returns True. If anything else is read from the serial line then this function returns False.

    Arguments:
        conn: Serial connection to an RN2483 radio.
    """
    rv = str(conn.readline())  # Read from serial line
    return ("ok" in rv) or ("4294967245" in rv)

Classes

class RN2483Radio (serial_port: str)
Expand source code
class RN2483Radio:
    def __init__(self, serial_port: str):
        self.serial = Serial(
            port=serial_port,
            baudrate=RN2483_BAUD,
            bytesize=EIGHTBITS,
            parity=PARITY_NONE,
            stopbits=1,
        )
        self.serial.timeout = READ_TIMEOUT  # Read timeout

    def init_gpio(self) -> None:
        """Set all GPIO pins to input mode, thereby putting them in a state of high impedance."""

        radio_write(self.serial, "sys set pinmode GPIO0 digout")
        radio_write(self.serial, "sys set pinmode GPIO1 digout")
        radio_write(self.serial, "sys set pinmode GPIO2 digout")
        radio_write(self.serial, "sys set pindig GPIO0 1")
        radio_write(self.serial, "sys set pindig GPIO1 1")
        radio_write(self.serial, "sys set pindig GPIO2 0")

        for i in range(NUM_GPIO):
            radio_write(self.serial, f"sys set pinmode GPIO{i} digin")

    def reset(self) -> bool:
        """
        Performs a software reset on the RN2483 radio.

        Returns:
            True if the reset was successful, false otherwise.
        """
        radio_write(self.serial, "sys reset")
        wait_for_ok(self.serial)
        return "RN2483" in str(self.serial.readline())  # Confirm from the RN2483 radio that the reset was a success

    def configure(self, parameters: RadioParameters) -> None:
        """
        Configures the RN2483 radio with the provided radio parameters.

        Arguments:
            parameters: The parameters to set the radio with.

        Raises:
            SerialException: When a radio parameter could not be set.
        """

        for parameter, value in parameters:
            # Special case where spread factor value must be preceded by sf
            if parameter == "spread_factor":
                value = f"sf{value}"

            # Special case: boolean settings must be specified using on/off terms instead of true/false
            if parameter == "cyclic_redundancy" or parameter == "iqi":
                value = "on" if value else "off"

            if not radio_write_ok(self.serial, f"radio set {SETTING_KW[parameter]} {value}"):
                raise SerialException(f"Could not set parameter '{SETTING_KW[parameter]}' to '{value}'.")

    def setup(self, parameters: RadioParameters) -> None:
        """
        Resets the RN2483 radio, initializes its GPIO pins and sets its parameters to those provided.

        Arguments:
            parameters: The parameters to set up the radio with.

        Raises:
            SerialException: When a radio parameter could not be set.
        """
        self.reset()
        self.configure(parameters)
        if not radio_write_ok(self.serial, "radio set wdt 0"):  # Turn off watch dog timer
            raise SerialException("Could not turn off watchdog timer.")
        # For some reason, initializing GPIO causes issues. We don't need them anyway
        # self.init_gpio()

    def _set_rx_mode(self) -> bool:
        """
        Set the RN2483 radio to receive mode so that it constantly listens for transmissions.

        Returns:
            True setting receive mode worked, false otherwise.
        """
        if not radio_write_ok(self.serial, "mac pause"):  # This command must be passed before any reception can occur
            return False

        # Command radio to go into continuous reception mode
        radio_write(self.serial, "radio rx 0")
        result = str(self.serial.readline())
        if "busy" in result or "ok" in result:
            return True
        return False

    def receive(self) -> Optional[str]:
        """
        Checks for new transmissions on the serial connection.

        Returns:
            A string message that the radio received (in hexadecimal digits), otherwise None.
        """

        # Enter receive mode
        if not self._set_rx_mode():
            return None

        message = str(self.serial.readline())[10:-5]  # Trim off reception indicator

        # Check if message is in hex
        try:
            int(message, 16)
            return message
        except ValueError:
            return None

    def signal_report(self) -> int:
        """
        Gets a signal to noise ratio report from the radio.

        Returns:
            An integer from -128 to 127 representing the signal to noise ratio of the last received packet.
        """
        radio_write(self.serial, "radio get snr")
        return int(self.serial.readline())

Methods

def configure(self, parameters: RadioParameters) ‑> None

Configures the RN2483 radio with the provided radio parameters.

Arguments

parameters: The parameters to set the radio with.

Raises

SerialException
When a radio parameter could not be set.
Expand source code
def configure(self, parameters: RadioParameters) -> None:
    """
    Configures the RN2483 radio with the provided radio parameters.

    Arguments:
        parameters: The parameters to set the radio with.

    Raises:
        SerialException: When a radio parameter could not be set.
    """

    for parameter, value in parameters:
        # Special case where spread factor value must be preceded by sf
        if parameter == "spread_factor":
            value = f"sf{value}"

        # Special case: boolean settings must be specified using on/off terms instead of true/false
        if parameter == "cyclic_redundancy" or parameter == "iqi":
            value = "on" if value else "off"

        if not radio_write_ok(self.serial, f"radio set {SETTING_KW[parameter]} {value}"):
            raise SerialException(f"Could not set parameter '{SETTING_KW[parameter]}' to '{value}'.")
def init_gpio(self) ‑> None

Set all GPIO pins to input mode, thereby putting them in a state of high impedance.

Expand source code
def init_gpio(self) -> None:
    """Set all GPIO pins to input mode, thereby putting them in a state of high impedance."""

    radio_write(self.serial, "sys set pinmode GPIO0 digout")
    radio_write(self.serial, "sys set pinmode GPIO1 digout")
    radio_write(self.serial, "sys set pinmode GPIO2 digout")
    radio_write(self.serial, "sys set pindig GPIO0 1")
    radio_write(self.serial, "sys set pindig GPIO1 1")
    radio_write(self.serial, "sys set pindig GPIO2 0")

    for i in range(NUM_GPIO):
        radio_write(self.serial, f"sys set pinmode GPIO{i} digin")
def receive(self) ‑> Optional[str]

Checks for new transmissions on the serial connection.

Returns

A string message that the radio received (in hexadecimal digits), otherwise None.

Expand source code
def receive(self) -> Optional[str]:
    """
    Checks for new transmissions on the serial connection.

    Returns:
        A string message that the radio received (in hexadecimal digits), otherwise None.
    """

    # Enter receive mode
    if not self._set_rx_mode():
        return None

    message = str(self.serial.readline())[10:-5]  # Trim off reception indicator

    # Check if message is in hex
    try:
        int(message, 16)
        return message
    except ValueError:
        return None
def reset(self) ‑> bool

Performs a software reset on the RN2483 radio.

Returns

True if the reset was successful, false otherwise.

Expand source code
def reset(self) -> bool:
    """
    Performs a software reset on the RN2483 radio.

    Returns:
        True if the reset was successful, false otherwise.
    """
    radio_write(self.serial, "sys reset")
    wait_for_ok(self.serial)
    return "RN2483" in str(self.serial.readline())  # Confirm from the RN2483 radio that the reset was a success
def setup(self, parameters: RadioParameters) ‑> None

Resets the RN2483 radio, initializes its GPIO pins and sets its parameters to those provided.

Arguments

parameters: The parameters to set up the radio with.

Raises

SerialException
When a radio parameter could not be set.
Expand source code
def setup(self, parameters: RadioParameters) -> None:
    """
    Resets the RN2483 radio, initializes its GPIO pins and sets its parameters to those provided.

    Arguments:
        parameters: The parameters to set up the radio with.

    Raises:
        SerialException: When a radio parameter could not be set.
    """
    self.reset()
    self.configure(parameters)
    if not radio_write_ok(self.serial, "radio set wdt 0"):  # Turn off watch dog timer
        raise SerialException("Could not turn off watchdog timer.")
    # For some reason, initializing GPIO causes issues. We don't need them anyway
    # self.init_gpio()
def signal_report(self) ‑> int

Gets a signal to noise ratio report from the radio.

Returns

An integer from -128 to 127 representing the signal to noise ratio of the last received packet.

Expand source code
def signal_report(self) -> int:
    """
    Gets a signal to noise ratio report from the radio.

    Returns:
        An integer from -128 to 127 representing the signal to noise ratio of the last received packet.
    """
    radio_write(self.serial, "radio get snr")
    return int(self.serial.readline())