Module modules.serial.serial_manager
Handles connections, specifying what serial port a radio should use and spawning the serial processes.
Expand source code
"""Handles connections, specifying what serial port a radio should use and spawning the serial processes."""
import glob
import sys
import logging
from queue import Queue
from multiprocessing import Process, active_children
from serial import Serial, SerialException
from modules.misc.config import Config
from modules.serial.serial_rn2483_radio import rn2483_radio_process
from modules.serial.serial_rn2483_emulator import SerialRN2483Emulator
from signal import signal, SIGTERM
from types import FrameType
# Set up logging
logger = logging.getLogger(__name__)
def shutdown_sequence(signum: int, stack_frame: FrameType):
for child in active_children():
child.terminate()
exit(0)
def update_serial_ports(serial_status: Queue[str]) -> list[str]:
"""Finds and updates serial ports on device
:raises EnvironmentError:
On unsupported or unknown platforms
:returns:
A list of the serial ports available on the system
"""
com_ports: list[str] = [""]
if sys.platform.startswith("win"):
com_ports = ["COM%s" % (i + 1) for i in range(256)]
elif sys.platform.startswith("linux") or sys.platform.startswith("cygwin"):
# '/dev/tty[A-Za-z]*'
com_ports = glob.glob("/dev/ttyUSB*")
com_ports += glob.glob("/dev/ACM*")
elif sys.platform.startswith("darwin"):
com_ports = glob.glob("/dev/tty.*")
tested_com_ports: list[str] = []
# Checks ports if they are potential COM ports
for test_port in com_ports:
try:
ser = Serial(test_port)
ser.close()
tested_com_ports.append(test_port)
except (OSError, SerialException):
pass
tested_com_ports.append("test")
serial_status.put(f"serial_ports {tested_com_ports}")
return tested_com_ports
class SerialManager:
def __init__(
self,
serial_status: Queue[str],
serial_ws_commands: Queue[list[str]],
radio_signal_report: Queue[int],
rn2483_radio_input: Queue[str],
rn2483_radio_payloads: Queue[str],
config: Config,
):
self.serial_status: Queue[str] = serial_status
self.serial_ports: list[str] = []
self.serial_ws_commands: Queue[list[str]] = serial_ws_commands
self.radio_signal_report: Queue[int] = radio_signal_report
self.rn2483_radio_input: Queue[str] = rn2483_radio_input
self.rn2483_radio_payloads: Queue[str] = rn2483_radio_payloads
self.rn2483_radio: Process | None = None
self.config = config
# Immediately find serial ports
self.serial_ports = update_serial_ports(self.serial_status)
# Handle program closing to ensure no orphan processes
signal(SIGTERM, shutdown_sequence) # type:ignore
def run(self):
logger.info("Serial manager started.")
while True:
ws_cmd = self.serial_ws_commands.get()
# Parse command
try:
match ws_cmd[0]:
case "rn2483_radio":
self.parse_rn2483_radio_ws(ws_cmd[1:])
case "update":
self.serial_ports = update_serial_ports(self.serial_status)
case _:
logger.error("Serial: Invalid device type.")
except IndexError:
logger.error("Serial: Error parsing ws command")
def parse_rn2483_radio_ws(self, ws_cmd: list[str]) -> None:
"""Parses the websocket commands relating to the RN2483_radio"""
radio_ws_cmd = ws_cmd[0]
if radio_ws_cmd == "connect" and self.rn2483_radio is None:
proposed_serial_port = ws_cmd[1]
if proposed_serial_port == "test":
self.rn2483_radio = Process(
target=SerialRN2483Emulator,
args=(self.serial_status, self.radio_signal_report, self.rn2483_radio_payloads),
daemon=True,
)
else:
self.rn2483_radio = Process(
target=rn2483_radio_process,
args=(
self.serial_status,
self.radio_signal_report,
self.rn2483_radio_input,
self.rn2483_radio_payloads,
proposed_serial_port,
self.config.radio_parameters,
),
daemon=True,
)
# Start the appropriate process (emulator or real radio)
self.rn2483_radio.start()
elif radio_ws_cmd == "connect":
logger.info("Already connected.")
elif radio_ws_cmd == "disconnect" and self.rn2483_radio is not None:
logger.info("Serial: RN2483 Radio terminating")
self.serial_status.put("rn2483_connected False")
self.serial_status.put("rn2483_port null")
self.rn2483_radio.terminate()
self.rn2483_radio = None
elif radio_ws_cmd == "disconnect":
logger.warning("Serial: RN2483 Radio already disconnected.")
Functions
def shutdown_sequence(signum: int, stack_frame: frame)
-
Expand source code
def shutdown_sequence(signum: int, stack_frame: FrameType): for child in active_children(): child.terminate() exit(0)
def update_serial_ports(serial_status: queue.Queue[str]) ‑> list[str]
-
Finds and updates serial ports on device
:raises EnvironmentError: On unsupported or unknown platforms :returns: A list of the serial ports available on the system
Expand source code
def update_serial_ports(serial_status: Queue[str]) -> list[str]: """Finds and updates serial ports on device :raises EnvironmentError: On unsupported or unknown platforms :returns: A list of the serial ports available on the system """ com_ports: list[str] = [""] if sys.platform.startswith("win"): com_ports = ["COM%s" % (i + 1) for i in range(256)] elif sys.platform.startswith("linux") or sys.platform.startswith("cygwin"): # '/dev/tty[A-Za-z]*' com_ports = glob.glob("/dev/ttyUSB*") com_ports += glob.glob("/dev/ACM*") elif sys.platform.startswith("darwin"): com_ports = glob.glob("/dev/tty.*") tested_com_ports: list[str] = [] # Checks ports if they are potential COM ports for test_port in com_ports: try: ser = Serial(test_port) ser.close() tested_com_ports.append(test_port) except (OSError, SerialException): pass tested_com_ports.append("test") serial_status.put(f"serial_ports {tested_com_ports}") return tested_com_ports
Classes
class SerialManager (serial_status: queue.Queue[str], serial_ws_commands: queue.Queue[list[str]], radio_signal_report: queue.Queue[int], rn2483_radio_input: queue.Queue[str], rn2483_radio_payloads: queue.Queue[str], config: Config)
-
Expand source code
class SerialManager: def __init__( self, serial_status: Queue[str], serial_ws_commands: Queue[list[str]], radio_signal_report: Queue[int], rn2483_radio_input: Queue[str], rn2483_radio_payloads: Queue[str], config: Config, ): self.serial_status: Queue[str] = serial_status self.serial_ports: list[str] = [] self.serial_ws_commands: Queue[list[str]] = serial_ws_commands self.radio_signal_report: Queue[int] = radio_signal_report self.rn2483_radio_input: Queue[str] = rn2483_radio_input self.rn2483_radio_payloads: Queue[str] = rn2483_radio_payloads self.rn2483_radio: Process | None = None self.config = config # Immediately find serial ports self.serial_ports = update_serial_ports(self.serial_status) # Handle program closing to ensure no orphan processes signal(SIGTERM, shutdown_sequence) # type:ignore def run(self): logger.info("Serial manager started.") while True: ws_cmd = self.serial_ws_commands.get() # Parse command try: match ws_cmd[0]: case "rn2483_radio": self.parse_rn2483_radio_ws(ws_cmd[1:]) case "update": self.serial_ports = update_serial_ports(self.serial_status) case _: logger.error("Serial: Invalid device type.") except IndexError: logger.error("Serial: Error parsing ws command") def parse_rn2483_radio_ws(self, ws_cmd: list[str]) -> None: """Parses the websocket commands relating to the RN2483_radio""" radio_ws_cmd = ws_cmd[0] if radio_ws_cmd == "connect" and self.rn2483_radio is None: proposed_serial_port = ws_cmd[1] if proposed_serial_port == "test": self.rn2483_radio = Process( target=SerialRN2483Emulator, args=(self.serial_status, self.radio_signal_report, self.rn2483_radio_payloads), daemon=True, ) else: self.rn2483_radio = Process( target=rn2483_radio_process, args=( self.serial_status, self.radio_signal_report, self.rn2483_radio_input, self.rn2483_radio_payloads, proposed_serial_port, self.config.radio_parameters, ), daemon=True, ) # Start the appropriate process (emulator or real radio) self.rn2483_radio.start() elif radio_ws_cmd == "connect": logger.info("Already connected.") elif radio_ws_cmd == "disconnect" and self.rn2483_radio is not None: logger.info("Serial: RN2483 Radio terminating") self.serial_status.put("rn2483_connected False") self.serial_status.put("rn2483_port null") self.rn2483_radio.terminate() self.rn2483_radio = None elif radio_ws_cmd == "disconnect": logger.warning("Serial: RN2483 Radio already disconnected.")
Methods
def parse_rn2483_radio_ws(self, ws_cmd: list[str]) ‑> None
-
Parses the websocket commands relating to the RN2483_radio
Expand source code
def parse_rn2483_radio_ws(self, ws_cmd: list[str]) -> None: """Parses the websocket commands relating to the RN2483_radio""" radio_ws_cmd = ws_cmd[0] if radio_ws_cmd == "connect" and self.rn2483_radio is None: proposed_serial_port = ws_cmd[1] if proposed_serial_port == "test": self.rn2483_radio = Process( target=SerialRN2483Emulator, args=(self.serial_status, self.radio_signal_report, self.rn2483_radio_payloads), daemon=True, ) else: self.rn2483_radio = Process( target=rn2483_radio_process, args=( self.serial_status, self.radio_signal_report, self.rn2483_radio_input, self.rn2483_radio_payloads, proposed_serial_port, self.config.radio_parameters, ), daemon=True, ) # Start the appropriate process (emulator or real radio) self.rn2483_radio.start() elif radio_ws_cmd == "connect": logger.info("Already connected.") elif radio_ws_cmd == "disconnect" and self.rn2483_radio is not None: logger.info("Serial: RN2483 Radio terminating") self.serial_status.put("rn2483_connected False") self.serial_status.put("rn2483_port null") self.rn2483_radio.terminate() self.rn2483_radio = None elif radio_ws_cmd == "disconnect": logger.warning("Serial: RN2483 Radio already disconnected.")
def run(self)
-
Expand source code
def run(self): logger.info("Serial manager started.") while True: ws_cmd = self.serial_ws_commands.get() # Parse command try: match ws_cmd[0]: case "rn2483_radio": self.parse_rn2483_radio_ws(ws_cmd[1:]) case "update": self.serial_ports = update_serial_ports(self.serial_status) case _: logger.error("Serial: Invalid device type.") except IndexError: logger.error("Serial: Error parsing ws command")