diff --git a/sram_parser/packet_structs.py b/sram_parser/packet_structs.py new file mode 100644 index 0000000..1836665 --- /dev/null +++ b/sram_parser/packet_structs.py @@ -0,0 +1,142 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +MAX_PAYLOAD_LENGTH = 127 + + +@dataclass +class PhyHeader: + frame_length: int + + @property + def size(self) -> int: + return 1 + + @staticmethod + def from_bytes(data: bytes) -> "PhyHeader": + return PhyHeader(data[0] & 0x7F) + + +class FrameType(Enum): + BEACON = 0b000 + DATA = 0b001 + ACKNOWLEDGE = 0b010 + MAC_COMMAND = 0b011 + + +class AddressingMode(Enum): + NOT_SPECIFIED = 0b00 + RESERVED = 0b01 + SHORT = 0b10 + LONG = 0b11 + + +@dataclass +class FrameControlField: + type: FrameType + security_enabled: bool + frame_pending: bool + ack_requested: bool + pan_id_compression: bool + destination_addressing_mode: AddressingMode + frame_version: int + source_addressing_mode: AddressingMode + + @property + def size(self) -> int: + return 2 + + @staticmethod + def from_bytes(data: bytes) -> "FrameControlField": + frame_type = FrameType((data[0] & 0b1110_0000) >> 5) + security_enabled = bool(data[0] & 0b0001_0000 >> 4) + frame_pending = bool((data[0] & 0b0000_1000) >> 3) + ack_requested = bool((data[0] & 0b0000_0100) >> 2) + pan_id_compression = bool((data[0] & 0b0000_0010) >> 1) + + dst_addressing = AddressingMode((data[1] & 0b0011_0000) >> 4) + frame_version = (data[1] & 0b0000_1100) >> 2 + src_addressing = AddressingMode((data[1] & 0b0000_0011)) + + return FrameControlField( + frame_type, + security_enabled, + frame_pending, + ack_requested, + pan_id_compression, + dst_addressing, + frame_version, + src_addressing + ) + + +@dataclass +class Address: + mac: int + pan_id: int + is_short_mode: bool = False + + @property + def size(self) -> int: + if self.is_short_mode: + return 2 + 2 + else: + return 2 + 8 + + @staticmethod + def from_bytes(data: bytes, addressing_mode: AddressingMode) -> Optional["Address"]: + if addressing_mode == AddressingMode.NOT_SPECIFIED: + return None + + pan = int.from_bytes(data[0:2], byteorder="little") + + if addressing_mode == AddressingMode.SHORT: + is_short = True + address = int.from_bytes(data[2:4], byteorder="little") + else: + is_short = False + address = int.from_bytes(data[2:10], byteorder="little") + + return Address( + address, + pan, + is_short + ) + + +@dataclass +class MacProtocolDataUnit: + frame_control_field: FrameControlField + sequence_number: int + destination_address: Optional[Address] + source_address: Optional[Address] + payload: bytes + frame_checksum: int + + def payload_length(self, header: PhyHeader) -> int: + # Payload length = length - FCF size - sequence nr (2 bytes) - src address size - dst address size + # - checksum size (2 bytes) + return header.frame_length - self.frame_control_field.size \ + - 2 \ + - self.source_address.size \ + - self.destination_address.size \ + - 2 + + @staticmethod + def from_bytes(data: bytes, header: PhyHeader) -> "MacProtocolDataUnit": + offset = 0 + + fcf = FrameControlField.from_bytes(data) + offset += fcf.size + + sequence_nr = int.from_bytes(data[offset:], byteorder="little") + offset += 2 + + dst_address = Address.from_bytes(data[offset:], fcf.destination_addressing_mode) + offset += dst_address.size if dst_address else 0 + src_address = Address.from_bytes(data[offset:], fcf.source_addressing_mode) + offset += src_address.size if src_address else 0 + + # @todo: payload and CRC parsing. + return MacProtocolDataUnit(fcf, sequence_nr, dst_address, src_address, bytes(), 0) diff --git a/sram_parser/sram_parser.py b/sram_parser/sram_parser.py new file mode 100644 index 0000000..d66e0bb --- /dev/null +++ b/sram_parser/sram_parser.py @@ -0,0 +1,19 @@ +from packet_structs import * + + +def decode_sram(data: bytes) -> (PhyHeader, MacProtocolDataUnit): + header = PhyHeader.from_bytes(data) + mpdu = MacProtocolDataUnit.from_bytes(data[1:], header) + + return header, mpdu + + +if __name__ == "__main__": + raw_data_str = "230198002300120023000B004C6F6C20746869732069732061206D65737361676500" + raw_data = bytes.fromhex(raw_data_str) + + print(f"Raw:\n\r{raw_data}\n\r") + header, mpdu = decode_sram(raw_data) + + print(f"Header: {header}") + print(f"MPDU: {mpdu}")