Module modules.telemetry.packet_spec.blocks

Functions

def add_to_dict(into: dict[str, Any], path: list[str], val: Any, strict: bool = False) ‑> None
Expand source code
def add_to_dict(into: dict[str, Any], path: list[str], val: Any, strict: bool = False) -> None:
    """Append a value to the list stored at a nested key path of a dictionary.

    Every key in ``path`` except the last selects a nested dictionary; the last key selects the
    list that ``val`` is appended to. A path of ``["pos1", "pos2"]`` performs
    ``into["pos1"]["pos2"].append(val)``.

    Args:
        into (dict[str, Any]): A nested dictionary with or without the necessary keys
        path (list[str]): The keys leading to the list to append to. Must contain at least one
            key. The list itself is not modified.
        val (Any): The value to append to the list at the final key
        strict (bool): If True, nothing is created and a missing key raises an error. If False,
            missing dictionaries and the final list are created as needed.

    Raises:
        IndexError: If path is empty
        KeyError: If strict is True and a key along the path does not exist yet
    """
    # Slice rather than pop so the caller's path list is left untouched and can be reused
    parents, last = path[:-1], path[-1]

    node = into
    for key in parents:
        node = node[key] if strict else node.setdefault(key, {})

    if strict:
        node[last].append(val)
    else:
        node.setdefault(last, []).append(val)

Puts a value in a dictionary, creating new dictionaries as needed and adding the value to a list at the end

Args

into : dict[str, Any]
A nested dictionary with or without the necessary keys
path : list[str]
The keys leading to the list to insert into, where into[pos1][pos2].append(val) corresponds to ["pos1", "pos2"]
val : Any
The value to insert into the final position's list
strict : bool
Create an error if the requested path or list don't exist yet
def convert_timestamp(abs_time: int,
block: Block)
Expand source code
def convert_timestamp(abs_time: int, block: Block):
    """Offset a timed block's measurement time by an absolute timestamp

    The block is updated in place. Blocks that are not TimedBlock instances are left untouched.

    Args:
        abs_time (int): The absolute timestamp that is added to the block's measurement time
        block (Block): The block to adjust; only instances of TimedBlock are modified
    """
    if not isinstance(block, TimedBlock):
        return
    block.measurement_time = abs_time + block.measurement_time

Adds the absolute timestamp to the block's measurement time (only blocks that are TimedBlock instances are changed)

Args

abs_time : int
The absolute timestamp
block : Block
The block to convert the timestamp for, a subclass of TimedBlock
def get_block_class(type: BlockType) ‑> type[Block]
Expand source code
def get_block_class(type: BlockType) -> type[Block]:
    """Look up the block class that represents a block type

    Args:
        type (BlockType): The block type to resolve

    Raises:
        ValueError: If there is no block class for the given type

    Returns:
        type[Block]: The class used for blocks of this type
    """
    known_blocks = (
        (BlockType.ALTITUDE_ABOVE_LAUNCH_LEVEL, AltitudeAboveLaunchLevel),
        (BlockType.ALTITUDE_ABOVE_SEA_LEVEL, AltitudeAboveSeaLevel),
        (BlockType.LINEAR_ACCELERATION, LinearAcceleration),
        (BlockType.ANGULAR_VELOCITY, AngularVelocity),
        (BlockType.COORDINATES, Coordinates),
        (BlockType.HUMIDITY, Humidity),
        (BlockType.PRESSURE, Pressure),
        (BlockType.TEMPERATURE, Temperature),
        (BlockType.VOLTAGE, Voltage),
        (BlockType.MAGNETIC_FIELD, MagneticField),
    )
    # Candidates are checked in the listed order using equality
    for block_type, block_class in known_blocks:
        if type == block_type:
            return block_class
    raise ValueError(f"Unsupported block type: {type}")

Get the block class associated with this type

Args

type : BlockType
The type to get the block class for

Raises

ValueError
If the block type is not supported

Returns

type[Block]
The class of the block associated with this type
def parse_block_contents(packet_header: PacketHeader, block_header: BlockHeader, encoded: bytes) ‑> Block
Expand source code
def parse_block_contents(packet_header: PacketHeader, block_header: BlockHeader, encoded: bytes) -> Block:
    """Parses the block contents and returns an appropriate block

    Args:
        packet_header (PacketHeader): The header of the packet this block was received in
        block_header (BlockHeader): The header of the block to parse from the contents
        encoded (bytes): The encoded block body to parse

    Raises:
        InvalidBlockContents: If the block body could not be unpacked; the original struct.error
            is attached as the cause
        ValueError: If the block type in the header is not supported (raised by get_block_class)

    Returns:
        Block: The parsed block, an instance of the class associated with the header's block type
    """
    block_class = get_block_class(block_header.type)

    # Only the unpacking step is guarded, since that is where struct.error originates
    try:
        attributes = block_class.decode(packet_header.timestamp, encoded)
    except struct.error as e:
        raise InvalidBlockContents(block_header.type.name, f"bad block contents: {e}") from e

    # Leave the constructor up to dataclass
    return block_class(*attributes)

Parses the block contents and returns an appropriate block

Args

packet_header : PacketHeader
The header of the packet this block was received in
block_header : BlockHeader
The header of the block to parse from the contents
encoded : bytes
The encoded block body to parse

Raises

InvalidBlockContents
If the block body could not be parsed correctly

Returns

Block
The parsed block, either a Block or one of its subtypes

Classes

class AltitudeAboveLaunchLevel (measurement_time: int, altitude: int)
Expand source code
@dataclass
class AltitudeAboveLaunchLevel(TimedBlock):
    """Timed block holding an altitude reading, reported under the "altitude_launch_level" key"""

    # Little-endian: 16-bit signed measurement time followed by a 32-bit signed altitude
    _struct_format: str = field(default="<hi", init=False, repr=False)
    measurement_time: int
    altitude: int

    def output_formatted(self, into: dict[str, Any]):
        """Append the measurement time and the altitude (passed through millimeters_to_meters)

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "altitude_launch_level"
        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        add_to_dict(into, [section, "metres"], millimeters_to_meters(self.altitude))

AltitudeAboveLaunchLevel(measurement_time: 'int', altitude: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var altitude : int
var measurement_time : int

Inherited members

class AltitudeAboveSeaLevel (measurement_time: int, altitude: int)
Expand source code
@dataclass
class AltitudeAboveSeaLevel(TimedBlock):
    """Timed block holding an altitude reading, reported under the "altitude_sea_level" key"""

    # Little-endian: 16-bit signed measurement time followed by a 32-bit signed altitude
    _struct_format: str = field(default="<hi", init=False, repr=False)
    measurement_time: int
    altitude: int

    def output_formatted(self, into: dict[str, Any]):
        """Append the measurement time and the altitude (passed through millimeters_to_meters)

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "altitude_sea_level"
        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        add_to_dict(into, [section, "metres"], millimeters_to_meters(self.altitude))

AltitudeAboveSeaLevel(measurement_time: 'int', altitude: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var altitude : int
var measurement_time : int

Inherited members

class AngularVelocity (measurement_time: int, x_axis: int, y_axis: int, z_axis: int)
Expand source code
@dataclass
class AngularVelocity(TimedBlock):
    """Timed block holding a three-axis reading, reported under the "angular_velocity" key"""

    # Little-endian: four 16-bit signed values (measurement time, then the x, y and z axes)
    _struct_format: str = field(default="<hhhh", init=False, repr=False)
    measurement_time: int
    x_axis: int
    y_axis: int
    z_axis: int

    def output_formatted(self, into: dict[str, Any]) -> None:
        """Append the measurement time, the converted axes and their magnitude

        Each axis is passed through tenthdegrees_to_degrees before being added.

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "angular_velocity"

        # Convert every axis once and reuse the results for the magnitude
        x = tenthdegrees_to_degrees(self.x_axis)
        y = tenthdegrees_to_degrees(self.y_axis)
        z = tenthdegrees_to_degrees(self.z_axis)

        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        add_to_dict(into, [section, "x"], x)
        add_to_dict(into, [section, "y"], y)
        add_to_dict(into, [section, "z"], z)
        add_to_dict(into, [section, "magnitude"], magnitude(x, y, z))

AngularVelocity(measurement_time: 'int', x_axis: 'int', y_axis: 'int', z_axis: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var measurement_time : int
var x_axis : int
var y_axis : int
var z_axis : int

Inherited members

class Block (*args)
Expand source code
@dataclass
class Block:
    """Base class for blocks that are decoded from bytes with a struct format string"""

    # A format for the struct class to unpack the block from bytes
    _struct_format: str = field(default="", init=False, repr=False)

    @classmethod
    def size(cls) -> int:
        """Use the struct format string to get the length of this block

        Returns:
            int: The length of this block in bytes
        """
        return struct.calcsize(cls._struct_format)

    @classmethod
    def decode(cls, packet_timestamp: int, encoded: bytes) -> tuple[Any, ...]:
        """Decode the block from bytes using the struct format string

        Args:
            packet_timestamp (int): Number of half minutes since power on
            encoded (bytes): Bytes containing block contents

        Raises:
            struct.error: If the bytes cannot be unpacked with the struct format string
            IndexError: If the struct format string unpacks no values at all

        Returns:
            tuple[Any, ...]: The unpacked values in struct format order, where the first value has
                been passed through milliseconds_to_seconds and offset by the packet timestamp
        """
        attributes = list(struct.unpack(cls._struct_format, encoded))

        # Measurement time in milliseconds is always the first data block attribute. Convert it and
        # add the packet header timestamp, which counts half minutes (hence the factor of 30).
        attributes[0] = milliseconds_to_seconds(attributes[0]) + (30 * packet_timestamp)
        return tuple(attributes)

    def output_formatted(self, into: dict[str, Any]) -> None:
        """Adds a block to a dictionary, formatted to how it should be sent to the websocket

        The base implementation adds nothing.

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        pass

    def __init__(self, *args):  # type: ignore
        """Stand-in for dataclass constructors with any number of arguments"""
        pass

Block(*args)

Stand-in for dataclass constructors with any number of arguments

Subclasses

Static methods

def decode(packet_timestamp: int, encoded: bytes) ‑> tuple[int]

Decode the block from bytes using the struct format string

Args

packet_timestamp : int
Number of half minutes since power on
encoded : bytes
Bytes containing block contents

Returns

tuple[int]
The results of the unpacking
def size() ‑> int

Use the struct format string to get the length of this block

Returns

int
The length of this block in bytes

Methods

def output_formatted(self, into: dict[str, Any]) ‑> None
Expand source code
def output_formatted(self, into: dict[str, Any]) -> None:
    """Write this block into a dictionary in the layout that is sent to the websocket

    This implementation adds nothing to the dictionary.

    Args:
        into (dict[str, Any]): A dictionary to add the block to
    """
    return None

Adds a block to a dictionary, formatted to how it should be sent to the websocket

Args

into : dict[str, Any]
A dictionary to add the block to
class Coordinates (measurement_time: int, latitude: int, longitude: int)
Expand source code
@dataclass
class Coordinates(TimedBlock):
    """Timed block holding a latitude/longitude pair, reported under the "gnss" key"""

    # Little-endian: 16-bit signed measurement time followed by two 32-bit signed values
    _struct_format: str = field(default="<hii", init=False, repr=False)
    measurement_time: int
    latitude: int
    longitude: int

    def output_formatted(self, into: dict[str, Any]):
        """Append the measurement time and both coordinates (passed through microdegrees_to_degrees)

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "gnss"
        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        for key, raw in (("latitude", self.latitude), ("longitude", self.longitude)):
            add_to_dict(into, [section, key], microdegrees_to_degrees(raw))

Coordinates(measurement_time: 'int', latitude: 'int', longitude: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var latitude : int
var longitude : int
var measurement_time : int

Inherited members

class Humidity (measurement_time: int, humidity: int)
Expand source code
@dataclass
class Humidity(TimedBlock):
    """Timed block holding a humidity reading, reported under the "humidity" key"""

    # Little-endian: 16-bit signed measurement time followed by a 32-bit unsigned humidity
    _struct_format: str = field(default="<hI", init=False, repr=False)
    measurement_time: int
    humidity: int

    def output_formatted(self, into: dict[str, Any]):
        """Append the measurement time and the unconverted humidity value

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "humidity"
        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        add_to_dict(into, [section, "percentage"], self.humidity)

Humidity(measurement_time: 'int', humidity: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var humidity : int
var measurement_time : int

Inherited members

class InvalidBlockContents (block_type: str, message: str = '')
Expand source code
class InvalidBlockContents(Exception):
    """Raised when the contents of a block turn out to be invalid"""

    def __init__(self, block_type: str, message: str = ""):
        """Build the exception message from the block type and an optional detail message

        Args:
            block_type (str): The name of the block type that was invalid, kept as an attribute
            message (str): Additional detail appended to the exception message
        """
        description = f"Invalid block for {block_type}: {message}"
        super().__init__(description)
        self.block_type = block_type

Exception raised when invalid block contents are encountered

Ancestors

  • builtins.Exception
  • builtins.BaseException
class LinearAcceleration (measurement_time: int, x_axis: int, y_axis: int, z_axis: int)
Expand source code
@dataclass
class LinearAcceleration(TimedBlock):
    """Timed block holding a three-axis reading, reported under the "linear_acceleration" key"""

    # Little-endian: four 16-bit signed values (measurement time, then the x, y and z axes)
    _struct_format: str = field(default="<hhhh", init=False, repr=False)
    measurement_time: int
    x_axis: int
    y_axis: int
    z_axis: int

    def output_formatted(self, into: dict[str, Any]) -> None:
        """Append the measurement time, the converted axes and their magnitude

        Each axis is passed through centimeters_to_meters before being added.

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "linear_acceleration"

        # Convert every axis once and reuse the results for the magnitude
        x = centimeters_to_meters(self.x_axis)
        y = centimeters_to_meters(self.y_axis)
        z = centimeters_to_meters(self.z_axis)

        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        add_to_dict(into, [section, "x"], x)
        add_to_dict(into, [section, "y"], y)
        add_to_dict(into, [section, "z"], z)
        add_to_dict(into, [section, "magnitude"], magnitude(x, y, z))

LinearAcceleration(measurement_time: 'int', x_axis: 'int', y_axis: 'int', z_axis: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var measurement_time : int
var x_axis : int
var y_axis : int
var z_axis : int

Inherited members

class MagneticField (measurement_time: int, x_axis: int, y_axis: int, z_axis: int)
Expand source code
@dataclass
class MagneticField(TimedBlock):
    """Timed block holding a three-axis reading, reported under the "magnetic_field" key"""

    # Little-endian: four 16-bit signed values (measurement time, then the x, y and z axes)
    _struct_format: str = field(default="<hhhh", init=False, repr=False)
    measurement_time: int
    x_axis: int
    y_axis: int
    z_axis: int

    def output_formatted(self, into: dict[str, Any]):
        """Append the measurement time, the unconverted axes and their magnitude

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "magnetic_field"
        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        for axis, reading in (("x", self.x_axis), ("y", self.y_axis), ("z", self.z_axis)):
            add_to_dict(into, [section, axis], reading)
        field_strength = magnitude(self.x_axis, self.y_axis, self.z_axis)
        add_to_dict(into, [section, "magnitude"], field_strength)

MagneticField(measurement_time: 'int', x_axis: 'int', y_axis: 'int', z_axis: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var measurement_time : int
var x_axis : int
var y_axis : int
var z_axis : int

Inherited members

class Pressure (measurement_time: int, pressure: int)
Expand source code
@dataclass
class Pressure(TimedBlock):
    """Timed block holding a pressure reading, reported under the "pressure" key"""

    # Little-endian: 16-bit signed measurement time followed by a 32-bit unsigned pressure
    _struct_format: str = field(default="<hI", init=False, repr=False)
    measurement_time: int
    pressure: int

    def output_formatted(self, into: dict[str, Any]):
        """Append the measurement time and the unconverted pressure value

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "pressure"
        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        add_to_dict(into, [section, "pascals"], self.pressure)

Pressure(measurement_time: 'int', pressure: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var measurement_time : int
var pressure : int

Inherited members

class Temperature (measurement_time: int, temperature: int)
Expand source code
@dataclass
class Temperature(TimedBlock):
    """Timed block holding a temperature reading, reported under the "temperature" key"""

    # Little-endian: 16-bit signed measurement time followed by a 32-bit signed temperature
    _struct_format: str = field(default="<hi", init=False, repr=False)
    measurement_time: int
    temperature: int

    def output_formatted(self, into: dict[str, Any]):
        """Append the measurement time and the temperature (passed through milli_degrees_to_celsius)

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "temperature"
        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        add_to_dict(into, [section, "celsius"], milli_degrees_to_celsius(self.temperature))

Temperature(measurement_time: 'int', temperature: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var measurement_time : int
var temperature : int

Inherited members

class TimedBlock (measurement_time: int)
Expand source code
@dataclass
class TimedBlock(Block):
    """Block that carries a measurement time"""

    measurement_time: int

    def output_formatted(self, into: dict[str, Any]):
        """Add nothing to the dictionary; the concrete block classes provide their own output

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        return None

TimedBlock(measurement_time: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Subclasses

Instance variables

var measurement_time : int

Inherited members

class Voltage (measurement_time: int, voltage: int, identifier: int)
Expand source code
@dataclass
class Voltage(TimedBlock):
    """Timed block holding a voltage reading and an identifier, reported under the "voltage" key"""

    # Little-endian: 16-bit signed measurement time, 16-bit signed voltage, 8-bit unsigned identifier
    _struct_format: str = field(default="<hhB", init=False, repr=False)
    measurement_time: int
    voltage: int
    identifier: int

    def output_formatted(self, into: dict[str, Any]):
        """Append the measurement time and the unconverted voltage, keyed by the identifier as a string

        Args:
            into (dict[str, Any]): A dictionary to add the block to
        """
        section = "voltage"
        source_key = str(self.identifier)
        add_to_dict(into, [section, "mission_time"], self.measurement_time)
        add_to_dict(into, [section, source_key], self.voltage)

Voltage(measurement_time: 'int', voltage: 'int', identifier: 'int')

Stand-in for dataclass constructors with any number of arguments

Ancestors

Instance variables

var identifier : int
var measurement_time : int
var voltage : int

Inherited members