Module modules.telemetry.v1.block

Expand source code
# Contains universal block utilities for version 1 of the radio packet format
from dataclasses import dataclass
from enum import IntEnum
from typing import Self, Optional
import struct
import logging

from modules.telemetry.v1.data_block import DataBlockSubtype

MIN_SUPPORTED_VERSION: int = 1
MAX_SUPPORTED_VERSION: int = 1

# Set up logging
logger = logging.getLogger(__name__)


class BlockType(IntEnum):
    """The different radio block types for version 1 of the radio packet format."""

    DATA = 0x0


class DeviceAddress(IntEnum):
    """Lists the different device addresses for version 1 of the radio packet format."""

    GROUND_STATION = 0x0
    ROCKET = 0x1
    RESERVED = 0xFE
    MULTICAST = 0xFF

    def __str__(self):
        match self:
            case DeviceAddress.GROUND_STATION:
                return "GROUND STATION"
            case DeviceAddress.ROCKET:
                return "ROCKET"
            case DeviceAddress.RESERVED:
                return "RESERVED"
            case DeviceAddress.MULTICAST:
                return "MULTICAST"


class UnsupportedEncodingVersionError(Exception):
    """Exception raised when the encoding version is not supported."""

    def __init__(self, version: int):
        self.version = version
        super().__init__(f"Unsupported encoding version: {version}")


class InvalidHeaderFieldValueError(Exception):
    """Exception raised when an invalid header field is encountered."""

    def __init__(self, cls_name: str, val: str, field: str):
        self.val = val
        self.field = field
        super().__init__(f"Invalid {cls_name} field: {val} is not a valid value for {field}")


@dataclass
class PacketHeader:
    """Represents a V1 packet header."""

    callsign: str
    callzone: Optional[str]
    length: int
    version: int
    src_addr: DeviceAddress
    packet_num: int

    @classmethod
    def from_hex(cls, payload: str) -> Self:
        """
        Constructs a new packet header from a hex payload.
        Returns:
            A newly constructed packet header object.
        """
        header = bin(int(payload, 16))[2:]

        # Decodes the call sign/call zone from packet header
        # Rearranges if call zone (W5/VE3LWN) is first
        amateur_radio = bytes.fromhex(payload[:18]).decode("utf-8").strip("\x00").upper()
        ham_call_sign = amateur_radio[:6]
        ham_call_zone = amateur_radio[6:]
        if ham_call_sign.find("/") != -1:
            ham_call_sign = amateur_radio.split("/")[1]
            ham_call_zone = amateur_radio.split("/")[0]

        callsign = ham_call_sign.strip("/")
        callzone = ham_call_zone.strip("/")
        length = (int(header[71:79], 2) + 1) * 4
        version = int(header[79:87], 2)
        try:
            src_addr = DeviceAddress(int(header[87:95], 2))
        except ValueError as e:
            raise InvalidHeaderFieldValueError(cls.__name__, e.args[0].split()[0], e.args[0].split()[-1])

        packet_num = struct.unpack(">I", struct.pack("<I", int(header[95:127], 2)))[0]

        if version < MIN_SUPPORTED_VERSION or version > MAX_SUPPORTED_VERSION:
            raise UnsupportedEncodingVersionError(version)

        return cls(callsign, callzone, length, version, src_addr, packet_num)

    def __len__(self) -> int:
        """
        Returns:
            The length of the packet associated with this packet header in bytes.
        """
        return self.length


@dataclass
class BlockHeader:
    """Represents a V1 header for a telemetry block."""

    length: int
    message_type: int
    message_subtype: int
    destination: DeviceAddress

    @classmethod
    def from_hex(cls, payload: str) -> Self:
        """
        Constructs a block header object from a hex payload.
        Returns:
            A newly constructed block header.
        """

        unpacked_header = struct.unpack("<BBBB", bytes.fromhex(payload))

        length = int(((unpacked_header[0]) + 1) * 4)

        try:
            message_type = BlockType(unpacked_header[1])
            message_subtype = DataBlockSubtype(unpacked_header[2])
            destination = DeviceAddress(unpacked_header[3])
        except ValueError as e:
            raise InvalidHeaderFieldValueError(cls.__name__, e.args[0].split()[0], e.args[0].split()[-1])

        return cls(length, message_type, message_subtype, destination)

    def __len__(self) -> int:
        """
        Returns:
            The length of the block this header is associated with in bytes.
        """
        return self.length

    def __str__(self) -> str:
        """
        Returns a string representation of the block header
        """
        return (
            f"length {self.length}, type {self.message_type}, "
            f"subtype {self.message_subtype}, destination {self.destination}"
        )

Classes

class BlockHeader (length: int, message_type: int, message_subtype: int, destination: DeviceAddress)

Represents a V1 header for a telemetry block.

Expand source code
@dataclass
class BlockHeader:
    """Represents a V1 header for a telemetry block."""

    length: int
    message_type: int
    message_subtype: int
    destination: DeviceAddress

    @classmethod
    def from_hex(cls, payload: str) -> Self:
        """
        Constructs a block header object from a hex payload.
        Returns:
            A newly constructed block header.
        """

        unpacked_header = struct.unpack("<BBBB", bytes.fromhex(payload))

        length = int(((unpacked_header[0]) + 1) * 4)

        try:
            message_type = BlockType(unpacked_header[1])
            message_subtype = DataBlockSubtype(unpacked_header[2])
            destination = DeviceAddress(unpacked_header[3])
        except ValueError as e:
            raise InvalidHeaderFieldValueError(cls.__name__, e.args[0].split()[0], e.args[0].split()[-1])

        return cls(length, message_type, message_subtype, destination)

    def __len__(self) -> int:
        """
        Returns:
            The length of the block this header is associated with in bytes.
        """
        return self.length

    def __str__(self) -> str:
        """
        Returns a string representation of the block header
        """
        return (
            f"length {self.length}, type {self.message_type}, "
            f"subtype {self.message_subtype}, destination {self.destination}"
        )

Class variables

var destinationDeviceAddress
var length : int
var message_subtype : int
var message_type : int

Static methods

def from_hex(payload: str) ‑> Self

Constructs a block header object from a hex payload.

Returns

A newly constructed block header.

Expand source code
@classmethod
def from_hex(cls, payload: str) -> Self:
    """
    Constructs a block header object from a hex payload.
    Returns:
        A newly constructed block header.
    """

    unpacked_header = struct.unpack("<BBBB", bytes.fromhex(payload))

    length = int(((unpacked_header[0]) + 1) * 4)

    try:
        message_type = BlockType(unpacked_header[1])
        message_subtype = DataBlockSubtype(unpacked_header[2])
        destination = DeviceAddress(unpacked_header[3])
    except ValueError as e:
        raise InvalidHeaderFieldValueError(cls.__name__, e.args[0].split()[0], e.args[0].split()[-1])

    return cls(length, message_type, message_subtype, destination)
class BlockType (*args, **kwds)

The different radio block types for version 1 of the radio packet format.

Expand source code
class BlockType(IntEnum):
    """The different radio block types for version 1 of the radio packet format."""

    DATA = 0x0

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var DATA
class DeviceAddress (*args, **kwds)

Lists the different device addresses for version 1 of the radio packet format.

Expand source code
class DeviceAddress(IntEnum):
    """Lists the different device addresses for version 1 of the radio packet format."""

    GROUND_STATION = 0x0
    ROCKET = 0x1
    RESERVED = 0xFE
    MULTICAST = 0xFF

    def __str__(self):
        match self:
            case DeviceAddress.GROUND_STATION:
                return "GROUND STATION"
            case DeviceAddress.ROCKET:
                return "ROCKET"
            case DeviceAddress.RESERVED:
                return "RESERVED"
            case DeviceAddress.MULTICAST:
                return "MULTICAST"

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var GROUND_STATION
var MULTICAST
var RESERVED
var ROCKET
class InvalidHeaderFieldValueError (cls_name: str, val: str, field: str)

Exception raised when an invalid header field is encountered.

Expand source code
class InvalidHeaderFieldValueError(Exception):
    """Exception raised when an invalid header field is encountered."""

    def __init__(self, cls_name: str, val: str, field: str):
        self.val = val
        self.field = field
        super().__init__(f"Invalid {cls_name} field: {val} is not a valid value for {field}")

Ancestors

  • builtins.Exception
  • builtins.BaseException
class PacketHeader (callsign: str, callzone: Optional[str], length: int, version: int, src_addr: DeviceAddress, packet_num: int)

Represents a V1 packet header.

Expand source code
@dataclass
class PacketHeader:
    """Represents a V1 packet header."""

    callsign: str
    callzone: Optional[str]
    length: int
    version: int
    src_addr: DeviceAddress
    packet_num: int

    @classmethod
    def from_hex(cls, payload: str) -> Self:
        """
        Constructs a new packet header from a hex payload.
        Returns:
            A newly constructed packet header object.
        """
        header = bin(int(payload, 16))[2:]

        # Decodes the call sign/call zone from packet header
        # Rearranges if call zone (W5/VE3LWN) is first
        amateur_radio = bytes.fromhex(payload[:18]).decode("utf-8").strip("\x00").upper()
        ham_call_sign = amateur_radio[:6]
        ham_call_zone = amateur_radio[6:]
        if ham_call_sign.find("/") != -1:
            ham_call_sign = amateur_radio.split("/")[1]
            ham_call_zone = amateur_radio.split("/")[0]

        callsign = ham_call_sign.strip("/")
        callzone = ham_call_zone.strip("/")
        length = (int(header[71:79], 2) + 1) * 4
        version = int(header[79:87], 2)
        try:
            src_addr = DeviceAddress(int(header[87:95], 2))
        except ValueError as e:
            raise InvalidHeaderFieldValueError(cls.__name__, e.args[0].split()[0], e.args[0].split()[-1])

        packet_num = struct.unpack(">I", struct.pack("<I", int(header[95:127], 2)))[0]

        if version < MIN_SUPPORTED_VERSION or version > MAX_SUPPORTED_VERSION:
            raise UnsupportedEncodingVersionError(version)

        return cls(callsign, callzone, length, version, src_addr, packet_num)

    def __len__(self) -> int:
        """
        Returns:
            The length of the packet associated with this packet header in bytes.
        """
        return self.length

Class variables

var callsign : str
var callzone : Optional[str]
var length : int
var packet_num : int
var src_addrDeviceAddress
var version : int

Static methods

def from_hex(payload: str) ‑> Self

Constructs a new packet header from a hex payload.

Returns

A newly constructed packet header object.

Expand source code
@classmethod
def from_hex(cls, payload: str) -> Self:
    """
    Constructs a new packet header from a hex payload.
    Returns:
        A newly constructed packet header object.
    """
    header = bin(int(payload, 16))[2:]

    # Decodes the call sign/call zone from packet header
    # Rearranges if call zone (W5/VE3LWN) is first
    amateur_radio = bytes.fromhex(payload[:18]).decode("utf-8").strip("\x00").upper()
    ham_call_sign = amateur_radio[:6]
    ham_call_zone = amateur_radio[6:]
    if ham_call_sign.find("/") != -1:
        ham_call_sign = amateur_radio.split("/")[1]
        ham_call_zone = amateur_radio.split("/")[0]

    callsign = ham_call_sign.strip("/")
    callzone = ham_call_zone.strip("/")
    length = (int(header[71:79], 2) + 1) * 4
    version = int(header[79:87], 2)
    try:
        src_addr = DeviceAddress(int(header[87:95], 2))
    except ValueError as e:
        raise InvalidHeaderFieldValueError(cls.__name__, e.args[0].split()[0], e.args[0].split()[-1])

    packet_num = struct.unpack(">I", struct.pack("<I", int(header[95:127], 2)))[0]

    if version < MIN_SUPPORTED_VERSION or version > MAX_SUPPORTED_VERSION:
        raise UnsupportedEncodingVersionError(version)

    return cls(callsign, callzone, length, version, src_addr, packet_num)
class UnsupportedEncodingVersionError (version: int)

Exception raised when the encoding version is not supported.

Expand source code
class UnsupportedEncodingVersionError(Exception):
    """Exception raised when the encoding version is not supported."""

    def __init__(self, version: int):
        self.version = version
        super().__init__(f"Unsupported encoding version: {version}")

Ancestors

  • builtins.Exception
  • builtins.BaseException