Module modules.websocket.websocket
Classes
class TornadoWSServer (application: tornado.web.Application,
request: tornado.httputil.HTTPServerRequest,
**kwargs: Any)-
Expand source code
class TornadoWSServer(tornado.websocket.WebSocketHandler, ABC):
    """The server which handles websocket connections.

    All state lives on the class, so it is shared by every open connection:
    the set of connected clients, the last payload that was broadcast, and the
    single "sudo" connection that is currently allowed to issue commands.
    """

    # Every currently open connection; the broadcast targets of send_message().
    clients: set[TornadoWSServer] = set()
    # Most recent payload handed to send_message(); replayed to new clients.
    last_msg_send: str = ""
    # SHA-256 hex digest that an "auth <password>" attempt is checked against.
    # NOTE(review): an unsalted digest committed to source is weak protection;
    # consider loading it from configuration instead.
    pw = "d916d328c73327336b8ccb25a1309a9766df1131f3a5064473933d6aae617442"
    # The one connection currently allowed to issue commands, or None.
    sudo_user: TornadoWSServer | None = None

    def open(self, *args: Any, **kwargs: Any) -> None:
        """Register the new connection and replay the latest payload to it."""
        TornadoWSServer.clients.add(self)
        # Only the newcomer needs catching up, and only if something has been
        # broadcast already: an empty frame is not a valid payload and the
        # other clients have already received the latest message.
        if TornadoWSServer.last_msg_send:
            _ = self.write_message(TornadoWSServer.last_msg_send)
        logger.info("Client connected")

    def on_close(self) -> None:
        """Unregister the connection and release sudo rights if it held them."""
        # discard() rather than remove(): closing must never raise, even if
        # this handler was not registered.
        TornadoWSServer.clients.discard(self)
        # Without this, a sudo user that disconnects without sending "deauth"
        # would block every later authentication attempt.
        if TornadoWSServer.sudo_user is self:
            TornadoWSServer.sudo_user = None
        logger.info("Client disconnected")

    def on_message(self, message: str | bytes) -> None:
        """Process a frame: forward sudo commands or handle an auth attempt.

        Args:
            message: The received frame. Binary frames are decoded as UTF-8.
        """
        # Local import: only needed for the constant-time digest comparison.
        import hmac

        # str(b"...") would produce the repr "b'...'" and never match a command.
        if isinstance(message, bytes):
            message = message.decode("utf-8", errors="replace")

        # Never write a submitted password to the log.
        loggable = "auth <redacted>" if message.startswith("auth ") else message
        logger.info(f"Received message: {loggable}")

        # When we allow many users to access the front end, only a single user should have access to commands
        # To facilitate this, very simple authentication is implemented.
        # Once a sudo_user has been authenticated, only messages sent by them should be processed
        if self is TornadoWSServer.sudo_user:
            if message == "deauth":
                TornadoWSServer.sudo_user = None
            else:
                ws_commands_queue.put(message)
        # If no one has been authenticated as the sudo_user, the only messages that should be processed are those
        # that try to authenticate
        elif TornadoWSServer.sudo_user is None:
            parsed_message = message.split(" ")
            if len(parsed_message) == 2 and parsed_message[0] == "auth":
                digest = hashlib.sha256(parsed_message[1].encode()).hexdigest()
                # Constant-time comparison avoids leaking digest prefixes via timing.
                if hmac.compare_digest(digest, TornadoWSServer.pw):
                    TornadoWSServer.sudo_user = self
                    logger.info("Successfully authenticated")
                else:
                    logger.info("Incorrect password")
        else:
            logger.info("Insufficient permissions")

    def check_origin(self, origin: str) -> bool:
        """Accept the websocket handshake regardless of the requesting origin.

        The ``origin`` argument is never inspected; every value yields ``True``.
        """
        # NOTE(review): accepting every origin lets any web page open a socket
        # to this server -- confirm that this is intended.
        return True

    @classmethod
    def send_message(cls, message: str | None) -> None:
        """Broadcast ``message`` to every connected client and remember it.

        Args:
            message: Payload to send. ``None`` and the JSON literal ``"null"``
                mean "nothing to send" and are ignored.
        """
        if message is None or message == "null":
            return

        cls.last_msg_send = message
        # Iterate over a snapshot so the set may change while we are sending.
        for client in tuple(cls.clients):
            try:
                _ = client.write_message(message)
            except tornado.websocket.WebSocketClosedError:
                # The socket closed before on_close() unregistered it; skip it
                # so the remaining clients still receive the payload.
                continue
The server which handles websocket connections.
Ancestors
- tornado.websocket.WebSocketHandler
- tornado.web.RequestHandler
- abc.ABC
Class variables
var clients : set[TornadoWSServer]
var last_msg_send : str
var pw
var sudo_user
Static methods
def send_message(message: str | None) ‑> None
Methods
def check_origin(self, origin: str) ‑> bool
-
Expand source code
def check_origin(self, origin: str) -> bool:
    """Accept the websocket handshake regardless of the requesting origin.

    The ``origin`` argument is never inspected; every value yields ``True``.
    """
    # NOTE(review): accepting every origin lets any web page open a socket to
    # this server -- confirm that this is intended.
    return True
Accepts websocket connections from any host origin (the origin parameter is ignored).
def on_close(self) ‑> None
-
Expand source code
def on_close(self) -> None:
    """Unregister the connection and release sudo rights if it held them."""
    # discard() rather than remove(): closing must never raise, even if this
    # handler was not registered.
    TornadoWSServer.clients.discard(self)
    # Without this, a sudo user that disconnects without sending "deauth"
    # would block every later authentication attempt.
    if TornadoWSServer.sudo_user is self:
        TornadoWSServer.sudo_user = None
    logger.info("Client disconnected")
Invoked when the WebSocket is closed.
If the connection was closed cleanly and a status code or reason phrase was supplied, these values will be available as the attributes self.close_code and self.close_reason.
Changed in version 4.0: Added close_code and close_reason attributes.
def on_message(self, message: str | bytes) ‑> None
-
Expand source code
def on_message(self, message: str | bytes) -> None:
    """Process a frame: forward sudo commands or handle an auth attempt.

    Args:
        message: The received frame. Binary frames are decoded as UTF-8.
    """
    # Local import: only needed for the constant-time digest comparison.
    import hmac

    # str(b"...") would produce the repr "b'...'" and never match a command.
    if isinstance(message, bytes):
        message = message.decode("utf-8", errors="replace")

    # Never write a submitted password to the log.
    loggable = "auth <redacted>" if message.startswith("auth ") else message
    logger.info(f"Received message: {loggable}")

    # When we allow many users to access the front end, only a single user should have access to commands
    # To facilitate this, very simple authentication is implemented.
    # Once a sudo_user has been authenticated, only messages sent by them should be processed
    if self is TornadoWSServer.sudo_user:
        if message == "deauth":
            TornadoWSServer.sudo_user = None
        else:
            ws_commands_queue.put(message)
    # If no one has been authenticated as the sudo_user, the only messages that should be processed are those
    # that try to authenticate
    elif TornadoWSServer.sudo_user is None:
        parsed_message = message.split(" ")
        if len(parsed_message) == 2 and parsed_message[0] == "auth":
            digest = hashlib.sha256(parsed_message[1].encode()).hexdigest()
            # Constant-time comparison avoids leaking digest prefixes via timing.
            if hmac.compare_digest(digest, TornadoWSServer.pw):
                TornadoWSServer.sudo_user = self
                logger.info("Successfully authenticated")
            else:
                logger.info("Incorrect password")
    else:
        logger.info("Insufficient permissions")
Handle incoming messages on the WebSocket
This method must be overridden.
Changed in version 4.5: on_message can be a coroutine.
def open(self, *args: Any, **kwargs: Any) ‑> None
-
Expand source code
def open(self, *args: Any, **kwargs: Any) -> None:
    """Register the new connection and replay the latest payload to it."""
    TornadoWSServer.clients.add(self)
    # Only the newcomer needs catching up, and only if something has been
    # broadcast already: an empty frame is not a valid payload and the other
    # clients have already received the latest message.
    if TornadoWSServer.last_msg_send:
        _ = self.write_message(TornadoWSServer.last_msg_send)
    logger.info("Client connected")
Invoked when a new WebSocket is opened.
The arguments to open are extracted from the tornado.web.URLSpec regular expression, just like the arguments to tornado.web.RequestHandler.get.
open may be a coroutine. on_message will not be called until open has returned.
Changed in version 5.1: open may be a coroutine.
class WebSocketHandler (telemetry_json_output: Queue[Any], ws_commands: Queue[Any])
-
Expand source code
class WebSocketHandler(Process):
    """Process subclass that hosts the Tornado websocket and static-file server.

    Constructing an instance stores the two queues and immediately runs the
    server's IO loop, so the constructor does not return while the loop runs.
    """

    def __init__(self, telemetry_json_output: Queue[Any], ws_commands: Queue[Any]):
        """Store the queues and start serving.

        Args:
            telemetry_json_output: Queue whose items are JSON-encoded and
                broadcast to connected clients.
            ws_commands: Queue that receives command strings; published as
                the module-level ``ws_commands_queue``.
        """
        global ws_commands_queue

        super().__init__()

        self.telemetry_json_output: Queue[Any] = telemetry_json_output
        ws_commands_queue = ws_commands

        # Default to test mode
        # ws_commands_queue.put("serial rn2483_radio connect test")

        self.start_websocket_server()

    def start_websocket_server(self) -> None:
        """Build the Tornado application, bind port 33845 and run the IO loop."""
        routes = [
            (r"/websocket", TornadoWSServer),
            (
                r"/(.*)",
                tornado.web.StaticFileHandler,
                {"path": os.path.join(os.getcwd(), "static"), "default_filename": "test.html"},
            ),
        ]
        wss = tornado.web.Application(routes, websocket_ping_interval=5, websocket_ping_timeout=10)

        try:
            _ = wss.listen(33845)
            logger.info("HTTP listening on port 33845, accessible at http://localhost:33845")
        except OSError:
            # NOTE(review): the IO loop below is still started after a failed
            # bind -- confirm that waiting for the shutdown command is intended.
            logger.error("Failed to bind to port 33845, ensure there is no other running ground station process!")
            ws_commands_queue.put("shutdown")

        io_loop = tornado.ioloop.IOLoop.current()

        def broadcast_latest() -> None:
            # Push the newest queued telemetry (if any) to every client.
            TornadoWSServer.send_message(self.check_for_messages())

        # Poll the telemetry queue every 50 ms.
        poller = tornado.ioloop.PeriodicCallback(broadcast_latest, 50)
        poller.start()
        io_loop.start()

    def check_for_messages(self) -> Optional[str]:
        """Drain the telemetry queue and JSON-encode the newest item.

        Returns:
            The JSON encoding of the last item taken from the queue, or the
            string ``"null"`` when the queue was empty.
        """
        pending = self.telemetry_json_output
        newest = None
        while True:
            if pending.empty():
                break
            # Older items are dropped; only the latest one is kept.
            newest = pending.get()
        return json.dumps(newest)
Handles starting the websocket server process.
Ancestors
- multiprocessing.context.Process
- multiprocessing.process.BaseProcess
Methods
def check_for_messages(self) ‑> str | None
-
Expand source code
def check_for_messages(self) -> Optional[str]:
    """Drain the telemetry queue and JSON-encode the newest item.

    Returns:
        The JSON encoding of the last item taken from the queue, or the
        string ``"null"`` when the queue was empty.
    """
    pending = self.telemetry_json_output
    newest = None
    while True:
        if pending.empty():
            break
        # Older items are dropped; only the latest one is kept.
        newest = pending.get()
    return json.dumps(newest)
Returns any JSON data that may be on the telemetry JSON output queue.
def start_websocket_server(self) ‑> None
-
Expand source code
def start_websocket_server(self) -> None:
    """Build the Tornado application, bind port 33845 and run the IO loop."""
    routes = [
        (r"/websocket", TornadoWSServer),
        (
            r"/(.*)",
            tornado.web.StaticFileHandler,
            {"path": os.path.join(os.getcwd(), "static"), "default_filename": "test.html"},
        ),
    ]
    wss = tornado.web.Application(routes, websocket_ping_interval=5, websocket_ping_timeout=10)

    try:
        _ = wss.listen(33845)
        logger.info("HTTP listening on port 33845, accessible at http://localhost:33845")
    except OSError:
        # NOTE(review): the IO loop below is still started after a failed
        # bind -- confirm that waiting for the shutdown command is intended.
        logger.error("Failed to bind to port 33845, ensure there is no other running ground station process!")
        ws_commands_queue.put("shutdown")

    io_loop = tornado.ioloop.IOLoop.current()

    def broadcast_latest() -> None:
        # Push the newest queued telemetry (if any) to every client.
        TornadoWSServer.send_message(self.check_for_messages())

    # Poll the telemetry queue every 50 ms.
    poller = tornado.ioloop.PeriodicCallback(broadcast_latest, 50)
    poller.start()
    io_loop.start()
Starts up the websocket server.