Module modules.websocket.websocket

Expand source code
# Tornado websocket for UI communication
# This is PURELY a pass-through of data for connectivity. No format conversion is done here.
# Incoming information comes from telemetry_json_output from telemetry
# Outputs information to connected websocket clients
#
# Authors:
# Thomas Selwyn (Devil)

from __future__ import annotations
import json
from multiprocessing import Queue, Process
from abc import ABC
from typing import Optional, Any
import logging
import os.path
import tornado.gen
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.websocket

# Module-level handle to the websocket commands queue. It is only declared
# here (no value is bound at import time): WebSocketHandler.__init__ assigns
# it, and TornadoWSServer.on_message puts incoming client messages on it.
ws_commands_queue: Queue[Any]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class WebSocketHandler(Process):
    """Runs the Tornado HTTP/websocket server that relays telemetry to clients.

    Constructing an instance immediately starts the server: __init__ calls
    start_websocket_server, which enters the Tornado IO loop and therefore does
    not return while that loop is running.
    """

    def __init__(self, telemetry_json_output: Queue[Any], ws_commands: Queue[Any]):
        """Store the queues and start serving.

        Args:
            telemetry_json_output: Queue polled for telemetry data to broadcast.
            ws_commands: Queue that receives every message sent by a websocket
                client, plus "shutdown" if the listening port cannot be bound.
        """
        super().__init__()
        global ws_commands_queue

        self.telemetry_json_output: Queue[Any] = telemetry_json_output
        # Published at module level so TornadoWSServer.on_message can reach it.
        ws_commands_queue = ws_commands

        # Default to test mode
        # ws_commands_queue.put("serial rn2483_radio connect test")

        self.start_websocket_server()

    def start_websocket_server(self) -> None:
        """Bind the HTTP/websocket server and run the IO loop.

        Serves the websocket endpoint at /websocket and static files from the
        "static" directory under the current working directory. Every 50 ms the
        telemetry queue is polled and any new data is broadcast to all clients.
        """

        wss = tornado.web.Application(
            [
                (r"/websocket", TornadoWSServer),
                (
                    r"/(.*)",
                    tornado.web.StaticFileHandler,
                    {"path": os.path.join(os.getcwd(), "static"), "default_filename": "test.html"},
                ),
            ],
            websocket_ping_interval=5,
            websocket_ping_timeout=10,
        )

        try:
            _ = wss.listen(33845)
            logger.info("HTTP listening on port 33845, accessible at http://localhost:33845")
        except OSError:
            logger.error("Failed to bind to port 33845, ensure there is no other running ground station process!")
            ws_commands_queue.put("shutdown")
            # NOTE(review): the IO loop below is still started after a failed
            # bind; presumably the consumer of "shutdown" stops this process.

        io_loop = tornado.ioloop.IOLoop.current()
        periodic_callback = tornado.ioloop.PeriodicCallback(
            lambda: TornadoWSServer.send_message(self.check_for_messages()), 50
        )

        periodic_callback.start()
        io_loop.start()

    def check_for_messages(self) -> Optional[str]:
        """Drain the telemetry queue and return its newest entry as JSON.

        Only the most recently queued item is kept; older items that were
        waiting on the queue are discarded.

        Returns:
            The JSON-encoded newest item, or None when there was nothing to
            send (previously the string "null" was returned in that case,
            which contradicted the Optional[str] annotation).
        """

        json_data = None
        while not self.telemetry_json_output.empty():
            json_data = self.telemetry_json_output.get()

        if json_data is None:
            return None
        return json.dumps(json_data)


class TornadoWSServer(tornado.websocket.WebSocketHandler, ABC):
    """Websocket endpoint that broadcasts telemetry and forwards client commands.

    Connected handlers are tracked at class level so that send_message can
    broadcast to all of them.
    """

    # Every handler instance that is currently registered as connected.
    clients: set[TornadoWSServer] = set()
    # Most recently broadcast message; replayed to clients when they connect.
    last_msg_send: str = ""

    def open(self) -> None:
        """Register the new client and replay the latest message to it."""
        TornadoWSServer.clients.add(self)
        # Replay to the new client only (existing clients already have it), and
        # skip the empty placeholder that is present before the first broadcast.
        if self.last_msg_send:
            _ = self.write_message(self.last_msg_send)
        logger.info("Client connected")

    def on_close(self) -> None:
        """Forget a client whose websocket has closed."""
        # discard() rather than remove(): a handler that was never registered
        # (or was already dropped by send_message) must not raise KeyError.
        TornadoWSServer.clients.discard(self)
        logger.info("Client disconnected")

    @staticmethod
    def on_message(message: str) -> None:
        """Forward a client message, unmodified, to the commands queue."""
        ws_commands_queue.put(message)

    def check_origin(self, _) -> bool:
        """Authenticates clients from any host origin (_ parameter)."""
        # NOTE(review): accepting every origin disables the cross-origin check;
        # confirm this is intended for the deployment environment.
        return True

    @classmethod
    def send_message(cls, message: str | None) -> None:
        """Broadcast a message to every connected client.

        Args:
            message: Text to send. None and the JSON literal "null" are treated
                as "nothing to send" and ignored.
        """
        if message is None or message == "null":
            return

        cls.last_msg_send = message
        # Iterate over a snapshot so dead clients can be dropped while looping.
        for client in tuple(cls.clients):
            try:
                _ = client.write_message(message)
            except tornado.websocket.WebSocketClosedError:
                # One closed connection must not stop delivery to the others.
                cls.clients.discard(client)

Classes

class TornadoWSServer (application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)

The server which handles websocket connections.

Expand source code
class TornadoWSServer(tornado.websocket.WebSocketHandler, ABC):
    """Websocket endpoint that broadcasts telemetry and forwards client commands.

    Connected handlers are tracked at class level so that send_message can
    broadcast to all of them.
    """

    # Every handler instance that is currently registered as connected.
    clients: set[TornadoWSServer] = set()
    # Most recently broadcast message; replayed to clients when they connect.
    last_msg_send: str = ""

    def open(self) -> None:
        """Register the new client and replay the latest message to it."""
        TornadoWSServer.clients.add(self)
        # Replay to the new client only (existing clients already have it), and
        # skip the empty placeholder that is present before the first broadcast.
        if self.last_msg_send:
            _ = self.write_message(self.last_msg_send)
        logger.info("Client connected")

    def on_close(self) -> None:
        """Forget a client whose websocket has closed."""
        # discard() rather than remove(): a handler that was never registered
        # (or was already dropped by send_message) must not raise KeyError.
        TornadoWSServer.clients.discard(self)
        logger.info("Client disconnected")

    @staticmethod
    def on_message(message: str) -> None:
        """Forward a client message, unmodified, to the commands queue."""
        ws_commands_queue.put(message)

    def check_origin(self, _) -> bool:
        """Authenticates clients from any host origin (_ parameter)."""
        # NOTE(review): accepting every origin disables the cross-origin check;
        # confirm this is intended for the deployment environment.
        return True

    @classmethod
    def send_message(cls, message: str | None) -> None:
        """Broadcast a message to every connected client.

        Args:
            message: Text to send. None and the JSON literal "null" are treated
                as "nothing to send" and ignored.
        """
        if message is None or message == "null":
            return

        cls.last_msg_send = message
        # Iterate over a snapshot so dead clients can be dropped while looping.
        for client in tuple(cls.clients):
            try:
                _ = client.write_message(message)
            except tornado.websocket.WebSocketClosedError:
                # One closed connection must not stop delivery to the others.
                cls.clients.discard(client)

Ancestors

  • tornado.websocket.WebSocketHandler
  • tornado.web.RequestHandler
  • abc.ABC

Class variables

var clients : set[TornadoWSServer]
var last_msg_send : str

Static methods

def on_message(message: str) ‑> None

Handle incoming messages on the WebSocket

This method must be overridden.

Changed in version: 4.5

on_message can be a coroutine.

Expand source code
@staticmethod
def on_message(message: str) -> None:
    """Forward a message received from a websocket client to the commands queue.

    Args:
        message: The message as received; it is queued without modification.
    """
    # Only reading the module-level queue, so no `global` declaration is needed.
    ws_commands_queue.put(message)
def send_message(message: str | None) ‑> None
Expand source code
@classmethod
def send_message(cls, message: str | None) -> None:
    """Broadcast a message to every connected client.

    Args:
        message: Text to send. None and the JSON literal "null" are treated
            as "nothing to send" and ignored.
    """
    if message is None or message == "null":
        return

    cls.last_msg_send = message
    # Iterate over a snapshot so dead clients can be dropped while looping.
    for client in tuple(cls.clients):
        try:
            _ = client.write_message(message)
        except tornado.websocket.WebSocketClosedError:
            # One closed connection must not stop delivery to the others.
            cls.clients.discard(client)

Methods

def check_origin(self, _) ‑> bool

Authenticates clients from any host origin (_ parameter).

Expand source code
def check_origin(self, _) -> bool:
    """Accept the websocket handshake regardless of the requesting origin.

    The origin argument (_) is ignored.

    Returns:
        Always True.
    """
    # NOTE(review): this turns the cross-origin check off entirely; confirm
    # that is intended for the deployment environment.
    accept_any_origin = True
    return accept_any_origin
def on_close(self) ‑> None

Invoked when the WebSocket is closed.

If the connection was closed cleanly and a status code or reason phrase was supplied, these values will be available as the attributes self.close_code and self.close_reason.

Changed in version: 4.0

Added close_code and close_reason attributes.

Expand source code
def on_close(self) -> None:
    """Forget a client whose websocket has closed."""
    # discard() rather than remove(): a handler that is not in the set (for
    # example one that never completed open) must not raise KeyError here.
    TornadoWSServer.clients.discard(self)
    logger.info("Client disconnected")
def open(self) ‑> None

Invoked when a new WebSocket is opened.

The arguments to open are extracted from the tornado.web.URLSpec regular expression, just like the arguments to tornado.web.RequestHandler.get.

open may be a coroutine. on_message will not be called until open has returned.

Changed in version: 5.1

open may be a coroutine.

Expand source code
def open(self) -> None:
    """Register the new client and replay the latest broadcast message to it."""
    TornadoWSServer.clients.add(self)
    # Replay to the new client only (existing clients already received it), and
    # skip the empty placeholder that is present before the first broadcast.
    if self.last_msg_send:
        _ = self.write_message(self.last_msg_send)
    logger.info("Client connected")
class WebSocketHandler (telemetry_json_output: Queue[Any], ws_commands: Queue[Any])

Handles starting the websocket server process.

Expand source code
class WebSocketHandler(Process):
    """Runs the Tornado HTTP/websocket server that relays telemetry to clients.

    Constructing an instance immediately starts the server: __init__ calls
    start_websocket_server, which enters the Tornado IO loop and therefore does
    not return while that loop is running.
    """

    def __init__(self, telemetry_json_output: Queue[Any], ws_commands: Queue[Any]):
        """Store the queues and start serving.

        Args:
            telemetry_json_output: Queue polled for telemetry data to broadcast.
            ws_commands: Queue that receives every message sent by a websocket
                client, plus "shutdown" if the listening port cannot be bound.
        """
        super().__init__()
        global ws_commands_queue

        self.telemetry_json_output: Queue[Any] = telemetry_json_output
        # Published at module level so TornadoWSServer.on_message can reach it.
        ws_commands_queue = ws_commands

        # Default to test mode
        # ws_commands_queue.put("serial rn2483_radio connect test")

        self.start_websocket_server()

    def start_websocket_server(self) -> None:
        """Bind the HTTP/websocket server and run the IO loop.

        Serves the websocket endpoint at /websocket and static files from the
        "static" directory under the current working directory. Every 50 ms the
        telemetry queue is polled and any new data is broadcast to all clients.
        """

        wss = tornado.web.Application(
            [
                (r"/websocket", TornadoWSServer),
                (
                    r"/(.*)",
                    tornado.web.StaticFileHandler,
                    {"path": os.path.join(os.getcwd(), "static"), "default_filename": "test.html"},
                ),
            ],
            websocket_ping_interval=5,
            websocket_ping_timeout=10,
        )

        try:
            _ = wss.listen(33845)
            logger.info("HTTP listening on port 33845, accessible at http://localhost:33845")
        except OSError:
            logger.error("Failed to bind to port 33845, ensure there is no other running ground station process!")
            ws_commands_queue.put("shutdown")
            # NOTE(review): the IO loop below is still started after a failed
            # bind; presumably the consumer of "shutdown" stops this process.

        io_loop = tornado.ioloop.IOLoop.current()
        periodic_callback = tornado.ioloop.PeriodicCallback(
            lambda: TornadoWSServer.send_message(self.check_for_messages()), 50
        )

        periodic_callback.start()
        io_loop.start()

    def check_for_messages(self) -> Optional[str]:
        """Drain the telemetry queue and return its newest entry as JSON.

        Only the most recently queued item is kept; older items that were
        waiting on the queue are discarded.

        Returns:
            The JSON-encoded newest item, or None when there was nothing to
            send (previously the string "null" was returned in that case,
            which contradicted the Optional[str] annotation).
        """

        json_data = None
        while not self.telemetry_json_output.empty():
            json_data = self.telemetry_json_output.get()

        if json_data is None:
            return None
        return json.dumps(json_data)

Ancestors

  • multiprocessing.context.Process
  • multiprocessing.process.BaseProcess

Methods

def check_for_messages(self) ‑> Optional[str]

Returns any JSON data that may be on the telemetry JSON output queue.

Expand source code
def check_for_messages(self) -> Optional[str]:
    """Drain the telemetry queue and return its newest entry as JSON.

    Only the most recently queued item is kept; older items that were waiting
    on the queue are discarded.

    Returns:
        The JSON-encoded newest item, or None when there was nothing to send
        (previously the string "null" was returned in that case, which
        contradicted the Optional[str] annotation).
    """

    json_data = None
    while not self.telemetry_json_output.empty():
        json_data = self.telemetry_json_output.get()

    if json_data is None:
        return None
    return json.dumps(json_data)
def start_websocket_server(self) ‑> None

Starts up the websocket server.

Expand source code
def start_websocket_server(self) -> None:
    """Bind the HTTP/websocket server on port 33845 and run the IO loop.

    Serves the websocket endpoint at /websocket and static files from the
    "static" directory under the current working directory. A periodic
    callback polls the telemetry queue every 50 ms and broadcasts new data.
    If the port cannot be bound, the failure is logged and "shutdown" is put
    on the commands queue; the IO loop is started in either case.
    """

    static_dir = os.path.join(os.getcwd(), "static")
    static_options = {"path": static_dir, "default_filename": "test.html"}
    routes = [
        (r"/websocket", TornadoWSServer),
        (r"/(.*)", tornado.web.StaticFileHandler, static_options),
    ]
    app = tornado.web.Application(
        routes,
        websocket_ping_interval=5,
        websocket_ping_timeout=10,
    )

    try:
        _ = app.listen(33845)
        logger.info("HTTP listening on port 33845, accessible at http://localhost:33845")
    except OSError:
        logger.error("Failed to bind to port 33845, ensure there is no other running ground station process!")
        ws_commands_queue.put("shutdown")

    def relay_latest() -> None:
        # Push whatever is newest on the telemetry queue to all clients.
        TornadoWSServer.send_message(self.check_for_messages())

    loop = tornado.ioloop.IOLoop.current()
    poller = tornado.ioloop.PeriodicCallback(relay_latest, 50)

    poller.start()
    loop.start()