Module modules.websocket.websocket
Expand source code
# Tornado websocket for UI communication
# This is PURELY a pass through of data for connectivity. No format conversion is done here.
# Incoming information comes from telemetry_json_output from telemetry
# Outputs information to connected websocket clients
#
# Authors:
# Thomas Selwyn (Devil)
from __future__ import annotations
import json
from multiprocessing import Queue, Process
from abc import ABC
from typing import Optional, Any
import logging
import os.path
import tornado.gen
import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.websocket
# Module-level handle to the websocket command queue. It is only declared here (no value
# is assigned); WebSocketHandler.__init__ binds it so that TornadoWSServer.on_message and
# start_websocket_server can put messages on it.
ws_commands_queue: Queue[Any]
# Module-level logger named after this module
logger = logging.getLogger(__name__)
class WebSocketHandler(Process):
    """Handles starting the websocket server process.

    Constructing an instance stores the two queues and then immediately calls
    ``start_websocket_server``, which blocks inside the Tornado IO loop. The constructor
    therefore only returns once that loop stops, or straight away when the listening
    port cannot be bound.
    """

    def __init__(self, telemetry_json_output: Queue[Any], ws_commands: Queue[Any]):
        """Store the queues and run the websocket server.

        Args:
            telemetry_json_output: Queue polled for data to forward to websocket clients.
            ws_commands: Queue that receives messages sent by clients. It is published
                through the module-level ``ws_commands_queue`` so that
                ``TornadoWSServer`` can reach it.
        """
        super().__init__()

        global ws_commands_queue

        self.telemetry_json_output: Queue[Any] = telemetry_json_output
        ws_commands_queue = ws_commands

        # Default to test mode
        # ws_commands_queue.put("serial rn2483_radio connect test")

        self.start_websocket_server()

    def start_websocket_server(self) -> None:
        """Starts up the websocket server.

        Serves the websocket endpoint at ``/websocket`` and static files from the
        ``static`` directory under the current working directory on port 33845. Every
        50 ms the telemetry queue is polled and the result handed to
        ``TornadoWSServer.send_message``. Blocks until the IO loop stops.

        If the port cannot be bound, the error is logged, ``"shutdown"`` is placed on the
        command queue and the method returns without starting the IO loop.
        """
        wss = tornado.web.Application(
            [
                (r"/websocket", TornadoWSServer),
                (
                    r"/(.*)",
                    tornado.web.StaticFileHandler,
                    {"path": os.path.join(os.getcwd(), "static"), "default_filename": "test.html"},
                ),
            ],
            websocket_ping_interval=5,
            websocket_ping_timeout=10,
        )

        try:
            _ = wss.listen(33845)
        except OSError:
            logger.error("Failed to bind to port 33845, ensure there is no other running ground station process!")
            ws_commands_queue.put("shutdown")
            # Without a listening socket no client can ever connect, so running the IO
            # loop would only leave an idle process polling the telemetry queue.
            return

        logger.info("HTTP listening on port 33845, accessible at http://localhost:33845")

        io_loop = tornado.ioloop.IOLoop.current()
        periodic_callback = tornado.ioloop.PeriodicCallback(
            lambda: TornadoWSServer.send_message(self.check_for_messages()), 50
        )

        periodic_callback.start()
        io_loop.start()

    def check_for_messages(self) -> Optional[str]:
        """Returns any JSON data that may be on the telemetry JSON output queue.

        The queue is drained completely and only the most recently queued item is
        serialised; earlier items are dropped. When the queue is empty the JSON literal
        ``"null"`` is returned, which ``TornadoWSServer.send_message`` ignores.
        """
        json_data = None
        while not self.telemetry_json_output.empty():
            json_data = self.telemetry_json_output.get()
        return json.dumps(json_data)
class TornadoWSServer(tornado.websocket.WebSocketHandler, ABC):
    """The server which handles websocket connections.

    State is kept at class level: ``clients`` holds every open connection and
    ``last_msg_send`` caches the most recent broadcast so that a newly connected client
    can be brought up to date immediately.
    """

    clients: set[TornadoWSServer] = set()
    last_msg_send: str = ""

    def open(self) -> None:
        """Register the new client and send it the most recent broadcast, if there is one."""
        TornadoWSServer.clients.add(self)

        latest = self.last_msg_send
        if latest:
            # Only the newcomer is missing this message; every other client already
            # received it when it was broadcast. Nothing is sent before the first
            # broadcast, while the cache is still the empty string.
            _ = self.write_message(latest)

        logger.info("Client connected")

    def on_close(self) -> None:
        """Forget the client once its connection has closed."""
        # discard() instead of remove(): a client that is not in the set must not turn
        # connection teardown into a KeyError.
        TornadoWSServer.clients.discard(self)
        logger.info("Client disconnected")

    @staticmethod
    def on_message(message: str) -> None:
        """Forward a message received from a client to the command queue, unchanged."""
        ws_commands_queue.put(message)

    def check_origin(self, _) -> bool:
        """Authenticates clients from any host origin (_ parameter)."""
        # NOTE(review): every origin is accepted, so pages served from other hosts can
        # open this websocket too - confirm that this is intended.
        return True

    @classmethod
    def send_message(cls, message: str | None) -> None:
        """Broadcast ``message`` to every connected client and cache it for late joiners.

        Nothing is sent or cached when ``message`` is ``None``, empty, or the JSON
        literal ``"null"``.
        """
        if not message or message == "null":
            return

        cls.last_msg_send = message

        # Iterate over a snapshot so the client set may change while messages are written.
        for client in tuple(cls.clients):
            try:
                _ = client.write_message(message)
            except tornado.websocket.WebSocketClosedError:
                # This connection is already gone; keep serving the remaining clients.
                logger.debug("Skipped a websocket client whose connection is already closed")
Classes
class TornadoWSServer (application: tornado.web.Application, request: tornado.httputil.HTTPServerRequest, **kwargs: Any)
-
The server which handles websocket connections.
Expand source code
class TornadoWSServer(tornado.websocket.WebSocketHandler, ABC):
    """The server which handles websocket connections.

    State is kept at class level: ``clients`` holds every open connection and
    ``last_msg_send`` caches the most recent broadcast so that a newly connected client
    can be brought up to date immediately.
    """

    clients: set[TornadoWSServer] = set()
    last_msg_send: str = ""

    def open(self) -> None:
        """Register the new client and send it the most recent broadcast, if there is one."""
        TornadoWSServer.clients.add(self)

        latest = self.last_msg_send
        if latest:
            # Only the newcomer is missing this message; every other client already
            # received it when it was broadcast. Nothing is sent before the first
            # broadcast, while the cache is still the empty string.
            _ = self.write_message(latest)

        logger.info("Client connected")

    def on_close(self) -> None:
        """Forget the client once its connection has closed."""
        # discard() instead of remove(): a client that is not in the set must not turn
        # connection teardown into a KeyError.
        TornadoWSServer.clients.discard(self)
        logger.info("Client disconnected")

    @staticmethod
    def on_message(message: str) -> None:
        """Forward a message received from a client to the command queue, unchanged."""
        ws_commands_queue.put(message)

    def check_origin(self, _) -> bool:
        """Authenticates clients from any host origin (_ parameter)."""
        # NOTE(review): every origin is accepted, so pages served from other hosts can
        # open this websocket too - confirm that this is intended.
        return True

    @classmethod
    def send_message(cls, message: str | None) -> None:
        """Broadcast ``message`` to every connected client and cache it for late joiners.

        Nothing is sent or cached when ``message`` is ``None``, empty, or the JSON
        literal ``"null"``.
        """
        if not message or message == "null":
            return

        cls.last_msg_send = message

        # Iterate over a snapshot so the client set may change while messages are written.
        for client in tuple(cls.clients):
            try:
                _ = client.write_message(message)
            except tornado.websocket.WebSocketClosedError:
                # This connection is already gone; keep serving the remaining clients.
                logger.debug("Skipped a websocket client whose connection is already closed")
Ancestors
- tornado.websocket.WebSocketHandler
- tornado.web.RequestHandler
- abc.ABC
Class variables
var clients : set[TornadoWSServer]
var last_msg_send : str
Static methods
def on_message(message: str) ‑> None
-
Handle incoming messages on the WebSocket
This method must be overridden.
Changed in version 4.5: on_message can be a coroutine.
Expand source code
@staticmethod
def on_message(message: str) -> None:
    """Forward a message received from a client to the command queue, unchanged."""
    # The queue is only read here, never rebound, so the module-level name is used directly.
    ws_commands_queue.put(message)
def send_message(message: str | None) ‑> None
-
Expand source code
@classmethod
def send_message(cls, message: str | None) -> None:
    """Broadcast ``message`` to every connected client and cache it for late joiners.

    Nothing is sent or cached when ``message`` is ``None``, empty, or the JSON literal
    ``"null"``.
    """
    if not message or message == "null":
        return

    cls.last_msg_send = message

    # Iterate over a snapshot so the client set may change while messages are written.
    for client in tuple(cls.clients):
        try:
            _ = client.write_message(message)
        except tornado.websocket.WebSocketClosedError:
            # This connection is already gone; keep serving the remaining clients.
            logger.debug("Skipped a websocket client whose connection is already closed")
Methods
def check_origin(self, _) ‑> bool
-
Authenticates clients from any host origin (_ parameter).
Expand source code
def check_origin(self, _) -> bool:
    """Accept the connection whatever origin the client reports.

    The origin argument (``_``) is deliberately ignored.
    """
    accept_any_origin = True
    return accept_any_origin
def on_close(self) ‑> None
-
Invoked when the WebSocket is closed.
If the connection was closed cleanly and a status code or reason phrase was supplied, these values will be available as the attributes self.close_code and self.close_reason.
Changed in version 4.0: Added close_code and close_reason attributes.
Expand source code
def on_close(self) -> None:
    """Forget the client once its connection has closed."""
    # discard() instead of remove(): a client that is not in the set must not turn
    # connection teardown into a KeyError.
    TornadoWSServer.clients.discard(self)
    logger.info("Client disconnected")
def open(self) ‑> None
-
Invoked when a new WebSocket is opened.
The arguments to open are extracted from the tornado.web.URLSpec regular expression, just like the arguments to tornado.web.RequestHandler.get.
open may be a coroutine. on_message will not be called until open has returned.
Changed in version 5.1: open may be a coroutine.
Expand source code
def open(self) -> None:
    """Register the new client and send it the most recent broadcast, if there is one."""
    TornadoWSServer.clients.add(self)

    latest = self.last_msg_send
    if latest:
        # Only the newcomer is missing this message; every other client already received
        # it when it was broadcast. Nothing is sent before the first broadcast, while the
        # cache is still the empty string.
        _ = self.write_message(latest)

    logger.info("Client connected")
class WebSocketHandler (telemetry_json_output: Queue[Any], ws_commands: Queue[Any])
-
Handles starting the websocket server process.
Expand source code
class WebSocketHandler(Process):
    """Handles starting the websocket server process.

    Constructing an instance stores the two queues and then immediately calls
    ``start_websocket_server``, which blocks inside the Tornado IO loop. The constructor
    therefore only returns once that loop stops, or straight away when the listening
    port cannot be bound.
    """

    def __init__(self, telemetry_json_output: Queue[Any], ws_commands: Queue[Any]):
        """Store the queues and run the websocket server.

        Args:
            telemetry_json_output: Queue polled for data to forward to websocket clients.
            ws_commands: Queue that receives messages sent by clients. It is published
                through the module-level ``ws_commands_queue`` so that
                ``TornadoWSServer`` can reach it.
        """
        super().__init__()

        global ws_commands_queue

        self.telemetry_json_output: Queue[Any] = telemetry_json_output
        ws_commands_queue = ws_commands

        # Default to test mode
        # ws_commands_queue.put("serial rn2483_radio connect test")

        self.start_websocket_server()

    def start_websocket_server(self) -> None:
        """Starts up the websocket server.

        Serves the websocket endpoint at ``/websocket`` and static files from the
        ``static`` directory under the current working directory on port 33845. Every
        50 ms the telemetry queue is polled and the result handed to
        ``TornadoWSServer.send_message``. Blocks until the IO loop stops.

        If the port cannot be bound, the error is logged, ``"shutdown"`` is placed on the
        command queue and the method returns without starting the IO loop.
        """
        wss = tornado.web.Application(
            [
                (r"/websocket", TornadoWSServer),
                (
                    r"/(.*)",
                    tornado.web.StaticFileHandler,
                    {"path": os.path.join(os.getcwd(), "static"), "default_filename": "test.html"},
                ),
            ],
            websocket_ping_interval=5,
            websocket_ping_timeout=10,
        )

        try:
            _ = wss.listen(33845)
        except OSError:
            logger.error("Failed to bind to port 33845, ensure there is no other running ground station process!")
            ws_commands_queue.put("shutdown")
            # Without a listening socket no client can ever connect, so running the IO
            # loop would only leave an idle process polling the telemetry queue.
            return

        logger.info("HTTP listening on port 33845, accessible at http://localhost:33845")

        io_loop = tornado.ioloop.IOLoop.current()
        periodic_callback = tornado.ioloop.PeriodicCallback(
            lambda: TornadoWSServer.send_message(self.check_for_messages()), 50
        )

        periodic_callback.start()
        io_loop.start()

    def check_for_messages(self) -> Optional[str]:
        """Returns any JSON data that may be on the telemetry JSON output queue.

        The queue is drained completely and only the most recently queued item is
        serialised; earlier items are dropped. When the queue is empty the JSON literal
        ``"null"`` is returned, which ``TornadoWSServer.send_message`` ignores.
        """
        json_data = None
        while not self.telemetry_json_output.empty():
            json_data = self.telemetry_json_output.get()
        return json.dumps(json_data)
Ancestors
- multiprocessing.context.Process
- multiprocessing.process.BaseProcess
Methods
def check_for_messages(self) ‑> Optional[str]
-
Returns any JSON data that may be on the telemetry JSON output queue.
Expand source code
def check_for_messages(self) -> Optional[str]:
    """Drain the telemetry JSON output queue and serialise its newest entry.

    Every queued item is removed; only the last one taken off the queue is passed to
    ``json.dumps``. With nothing queued, ``None`` is serialised, giving ``"null"``.
    """
    pending = self.telemetry_json_output
    newest = None
    while True:
        if pending.empty():
            break
        newest = pending.get()
    return json.dumps(newest)
def start_websocket_server(self) ‑> None
-
Starts up the websocket server.
Expand source code
def start_websocket_server(self) -> None:
    """Starts up the websocket server.

    Serves the websocket endpoint at ``/websocket`` and static files from the ``static``
    directory under the current working directory on port 33845. Every 50 ms the
    telemetry queue is polled and the result handed to ``TornadoWSServer.send_message``.
    Blocks until the IO loop stops.

    If the port cannot be bound, the error is logged, ``"shutdown"`` is placed on the
    command queue and the method returns without starting the IO loop.
    """
    wss = tornado.web.Application(
        [
            (r"/websocket", TornadoWSServer),
            (
                r"/(.*)",
                tornado.web.StaticFileHandler,
                {"path": os.path.join(os.getcwd(), "static"), "default_filename": "test.html"},
            ),
        ],
        websocket_ping_interval=5,
        websocket_ping_timeout=10,
    )

    try:
        _ = wss.listen(33845)
    except OSError:
        logger.error("Failed to bind to port 33845, ensure there is no other running ground station process!")
        ws_commands_queue.put("shutdown")
        # Without a listening socket no client can ever connect, so running the IO loop
        # would only leave an idle process polling the telemetry queue.
        return

    logger.info("HTTP listening on port 33845, accessible at http://localhost:33845")

    io_loop = tornado.ioloop.IOLoop.current()
    periodic_callback = tornado.ioloop.PeriodicCallback(
        lambda: TornadoWSServer.send_message(self.check_for_messages()), 50
    )

    periodic_callback.start()
    io_loop.start()