from dataclasses import dataclass
from enum import Enum
from typing import Optional

# NOTE(review): presumably the largest payload size in bytes; confirm the
# unit and where this limit is enforced.
MAX_PAYLOAD_LENGTH = 127

# Widths, in bytes, that MacProtocolDataUnit assumes for the sequence number
# and the trailing checksum.
# NOTE(review): the field names resemble IEEE 802.15.4, which defines a
# 1-octet sequence number -- confirm that the 2-byte width is intended.
_SEQUENCE_NUMBER_SIZE = 2
_CHECKSUM_SIZE = 2


@dataclass
class PhyHeader:
    """Physical-layer header carrying the frame length."""

    frame_length: int

    @property
    def size(self) -> int:
        """Return the encoded size of the header in bytes (always 1)."""
        return 1

    @staticmethod
    def from_bytes(data: bytes) -> "PhyHeader":
        """Parse a header from the first byte of *data*.

        Only the low 7 bits of the byte are used as the frame length.

        Raises:
            IndexError: if *data* is empty.
        """
        return PhyHeader(data[0] & 0x7F)


class FrameType(Enum):
    """Frame type codes stored in the frame control field."""

    BEACON = 0b000
    DATA = 0b001
    ACKNOWLEDGE = 0b010
    MAC_COMMAND = 0b011


class AddressingMode(Enum):
    """Addressing mode codes stored in the frame control field."""

    NOT_SPECIFIED = 0b00
    RESERVED = 0b01
    SHORT = 0b10
    LONG = 0b11


@dataclass
class FrameControlField:
    """Decoded two-byte frame control field.

    Bit layout as read by :meth:`from_bytes` (bit 7 is the most significant):

    * byte 0: bits 7-5 frame type, bit 4 security enabled, bit 3 frame
      pending, bit 2 ack requested, bit 1 PAN ID compression.
    * byte 1: bits 5-4 destination addressing mode, bits 3-2 frame version,
      bits 1-0 source addressing mode.
    """

    type: FrameType
    security_enabled: bool
    frame_pending: bool
    ack_requested: bool
    pan_id_compression: bool
    destination_addressing_mode: AddressingMode
    frame_version: int
    source_addressing_mode: AddressingMode

    @property
    def size(self) -> int:
        """Return the encoded size of the field in bytes (always 2)."""
        return 2

    @staticmethod
    def from_bytes(data: bytes) -> "FrameControlField":
        """Parse a frame control field from the first two bytes of *data*.

        Raises:
            IndexError: if *data* holds fewer than two bytes.
            ValueError: if the frame type bits do not match a FrameType member.
        """
        first, second = data[0], data[1]

        frame_type = FrameType((first & 0b1110_0000) >> 5)
        # The mask must be parenthesised: ``>>`` binds tighter than ``&``, so
        # ``first & 0b0001_0000 >> 4`` would test bit 0 instead of bit 4.
        security_enabled = bool((first & 0b0001_0000) >> 4)
        frame_pending = bool((first & 0b0000_1000) >> 3)
        ack_requested = bool((first & 0b0000_0100) >> 2)
        pan_id_compression = bool((first & 0b0000_0010) >> 1)

        dst_addressing = AddressingMode((second & 0b0011_0000) >> 4)
        frame_version = (second & 0b0000_1100) >> 2
        src_addressing = AddressingMode(second & 0b0000_0011)

        return FrameControlField(
            frame_type,
            security_enabled,
            frame_pending,
            ack_requested,
            pan_id_compression,
            dst_addressing,
            frame_version,
            src_addressing,
        )


@dataclass
class Address:
    """A PAN identifier together with a short or long device address."""

    mac: int
    pan_id: int
    is_short_mode: bool = False

    @property
    def size(self) -> int:
        """Return the encoded size in bytes: 2 for the PAN ID plus 2 or 8."""
        if self.is_short_mode:
            return 2 + 2
        return 2 + 8

    @staticmethod
    def from_bytes(data: bytes, addressing_mode: AddressingMode) -> Optional["Address"]:
        """Parse an address that starts at the beginning of *data*.

        The PAN ID is read from bytes 0-1 and the device address from the
        bytes that follow, both little-endian.

        Returns:
            ``None`` when *addressing_mode* is ``NOT_SPECIFIED``; otherwise
            the parsed Address. Every mode other than ``NOT_SPECIFIED`` and
            ``SHORT`` (including ``RESERVED``) is read as an 8-byte address.
        """
        if addressing_mode == AddressingMode.NOT_SPECIFIED:
            return None

        pan = int.from_bytes(data[0:2], byteorder="little")
        is_short = addressing_mode == AddressingMode.SHORT
        address_end = 4 if is_short else 10
        address = int.from_bytes(data[2:address_end], byteorder="little")
        return Address(address, pan, is_short)


@dataclass
class MacProtocolDataUnit:
    """Parsed MAC frame: control field, sequence number, addresses, payload."""

    frame_control_field: FrameControlField
    sequence_number: int
    destination_address: Optional[Address]
    source_address: Optional[Address]
    payload: bytes
    frame_checksum: int

    def payload_length(self, header: PhyHeader) -> int:
        """Return the payload length implied by *header*.

        Computed as the frame length minus the frame control field, the
        sequence number (2 bytes), both addresses and the checksum (2 bytes).
        An address that is absent (``None``) contributes 0 bytes.
        """
        address_sizes = sum(
            address.size
            for address in (self.source_address, self.destination_address)
            if address is not None
        )
        return (
            header.frame_length
            - self.frame_control_field.size
            - _SEQUENCE_NUMBER_SIZE
            - address_sizes
            - _CHECKSUM_SIZE
        )

    @staticmethod
    def from_bytes(data: bytes, header: PhyHeader) -> "MacProtocolDataUnit":
        """Parse the frame control field, sequence number and addresses.

        *data* must start at the frame control field. *header* is accepted
        but not used yet. The payload and checksum are not parsed: the
        result always carries an empty payload and a checksum of 0.

        Raises:
            IndexError: if *data* holds fewer than two bytes.
            ValueError: if the frame type bits do not match a FrameType member.
        """
        offset = 0

        fcf = FrameControlField.from_bytes(data)
        offset += fcf.size

        # Read only the sequence number bytes; an open-ended slice would fold
        # the addresses and payload into the integer as well.
        sequence_nr = int.from_bytes(
            data[offset:offset + _SEQUENCE_NUMBER_SIZE], byteorder="little"
        )
        offset += _SEQUENCE_NUMBER_SIZE

        dst_address = Address.from_bytes(data[offset:], fcf.destination_addressing_mode)
        offset += dst_address.size if dst_address else 0

        src_address = Address.from_bytes(data[offset:], fcf.source_addressing_mode)
        offset += src_address.size if src_address else 0

        # TODO: payload and CRC parsing.
        return MacProtocolDataUnit(fcf, sequence_nr, dst_address, src_address, bytes(), 0)