143 lines
3.8 KiB
Python
143 lines
3.8 KiB
Python
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from typing import Optional
|
|
|
|
# Upper bound on the payload length (presumably in bytes -- confirm with callers).
# 127 equals 0x7F, the mask PhyHeader.from_bytes applies to the length byte.
MAX_PAYLOAD_LENGTH = 127
|
|
|
|
|
|
@dataclass
class PhyHeader:
    """PHY header carrying the frame length decoded from one length byte."""

    frame_length: int

    @property
    def size(self) -> int:
        """Return the number of bytes the header occupies (always 1)."""
        header_bytes = 1
        return header_bytes

    @staticmethod
    def from_bytes(data: bytes) -> "PhyHeader":
        """Build a header from the first byte of *data*.

        Only the low seven bits of that byte are kept as the frame length;
        the most significant bit is discarded.

        Raises:
            IndexError: if *data* is empty.
        """
        length_mask = 0b0111_1111
        return PhyHeader(frame_length=data[0] & length_mask)
|
|
|
|
|
|
class FrameType(Enum):
    """Frame type codes.

    FrameControlField.from_bytes decodes the three most significant bits of
    the first frame-control byte into one of these members; any other 3-bit
    value is not a member and makes the lookup raise ValueError.
    """

    BEACON = 0  # bits 000
    DATA = 1  # bits 001
    ACKNOWLEDGE = 2  # bits 010
    MAC_COMMAND = 3  # bits 011
|
|
|
|
|
|
class AddressingMode(Enum):
    """Two-bit addressing mode codes.

    FrameControlField.from_bytes decodes the destination and source
    addressing fields of the second frame-control byte into these members.
    All four 2-bit values are covered.
    """

    NOT_SPECIFIED = 0  # bits 00
    RESERVED = 1  # bits 01
    SHORT = 2  # bits 10
    LONG = 3  # bits 11
|
|
|
|
|
|
@dataclass
class FrameControlField:
    """Decoded two-byte frame control field (FCF).

    Bit layout read by ``from_bytes`` (bit 7 is the most significant bit):

    * byte 0: bits 7-5 frame type, bit 4 security enabled, bit 3 frame
      pending, bit 2 acknowledgement requested, bit 1 PAN id compression,
      bit 0 ignored.
    * byte 1: bits 7-6 ignored, bits 5-4 destination addressing mode,
      bits 3-2 frame version, bits 1-0 source addressing mode.
    """

    type: FrameType
    security_enabled: bool
    frame_pending: bool
    ack_requested: bool
    pan_id_compression: bool
    destination_addressing_mode: AddressingMode
    frame_version: int
    source_addressing_mode: AddressingMode

    @property
    def size(self) -> int:
        """Return the encoded size of the field in bytes (always 2)."""
        return 2

    @staticmethod
    def from_bytes(data: bytes) -> "FrameControlField":
        """Decode the frame control field from the first two bytes of *data*.

        Args:
            data: Buffer whose first two bytes hold the frame control field.

        Returns:
            The decoded ``FrameControlField``.

        Raises:
            ValueError: if the three frame-type bits are not a ``FrameType``
                member.
            IndexError: if *data* holds fewer than two bytes.
        """
        control = data[0]
        frame_type = FrameType((control & 0b1110_0000) >> 5)
        # ``>>`` binds tighter than ``&``: an unparenthesised
        # ``control & 0b0001_0000 >> 4`` would test bit 0 instead of bit 4.
        # Testing the masked value directly avoids the shift altogether.
        security_enabled = bool(control & 0b0001_0000)
        frame_pending = bool(control & 0b0000_1000)
        ack_requested = bool(control & 0b0000_0100)
        pan_id_compression = bool(control & 0b0000_0010)

        addressing = data[1]
        dst_addressing = AddressingMode((addressing & 0b0011_0000) >> 4)
        frame_version = (addressing & 0b0000_1100) >> 2
        src_addressing = AddressingMode(addressing & 0b0000_0011)

        return FrameControlField(
            frame_type,
            security_enabled,
            frame_pending,
            ack_requested,
            pan_id_compression,
            dst_addressing,
            frame_version,
            src_addressing,
        )
|
|
|
|
|
|
@dataclass
class Address:
    """A PAN id paired with either a short (2-byte) or long (8-byte) address."""

    mac: int
    pan_id: int
    is_short_mode: bool = False

    @property
    def size(self) -> int:
        """Return the encoded size in bytes: 2 for the PAN id plus 2 or 8."""
        return 2 + (2 if self.is_short_mode else 8)

    @staticmethod
    def from_bytes(data: bytes, addressing_mode: AddressingMode) -> Optional["Address"]:
        """Decode an address from the start of *data*.

        The first two bytes are read as the little-endian PAN id. For
        ``AddressingMode.SHORT`` the next two bytes form the address; for
        every other mode except ``NOT_SPECIFIED`` (``RESERVED`` included) the
        next eight bytes do. Both are read little-endian.

        Returns:
            The decoded ``Address``, or ``None`` when *addressing_mode* is
            ``AddressingMode.NOT_SPECIFIED``.
        """
        if addressing_mode == AddressingMode.NOT_SPECIFIED:
            return None

        short = addressing_mode == AddressingMode.SHORT
        # The address starts right after the 2-byte PAN id.
        address_end = 4 if short else 10

        pan_id = int.from_bytes(data[0:2], byteorder="little")
        mac = int.from_bytes(data[2:address_end], byteorder="little")
        return Address(mac, pan_id, short)
|
|
|
|
|
|
@dataclass
class MacProtocolDataUnit:
    """Parsed MAC frame: control field, sequence number, addresses, payload, checksum."""

    frame_control_field: FrameControlField
    sequence_number: int
    destination_address: Optional[Address]
    source_address: Optional[Address]
    payload: bytes
    frame_checksum: int

    def payload_length(self, header: PhyHeader) -> int:
        """Return the payload length implied by *header*.

        Computed as ``header.frame_length`` minus the frame control field,
        the sequence number (2 bytes), both addresses and the checksum
        (2 bytes). An address that is ``None`` contributes 0 bytes.

        Args:
            header: PHY header supplying the total frame length.
        """
        source_size = self.source_address.size if self.source_address is not None else 0
        destination_size = (
            self.destination_address.size if self.destination_address is not None else 0
        )
        return (
            header.frame_length
            - self.frame_control_field.size
            - 2  # sequence number
            - source_size
            - destination_size
            - 2  # checksum
        )

    @staticmethod
    def from_bytes(data: bytes, header: PhyHeader) -> "MacProtocolDataUnit":
        """Parse the MAC header fields from *data*.

        Decodes the frame control field, the sequence number and the
        destination and source addresses, in that order. Payload and checksum
        are not parsed yet: the result carries an empty payload and a
        checksum of 0.

        Args:
            data: Raw frame bytes starting at the frame control field.
            header: PHY header for the frame (currently unused).

        Raises:
            ValueError: if the frame type bits are not a ``FrameType`` member.
            IndexError: if *data* is too short to hold the frame control field.
        """
        fcf = FrameControlField.from_bytes(data)
        offset = fcf.size

        # Read exactly the two bytes the offset advances over; an open-ended
        # slice would fold the whole remainder of the frame into the number.
        # NOTE(review): IEEE 802.15.4 specifies a 1-octet sequence number;
        # this parser consistently uses 2 bytes -- confirm the input format.
        sequence_nr = int.from_bytes(data[offset:offset + 2], byteorder="little")
        offset += 2

        dst_address = Address.from_bytes(data[offset:], fcf.destination_addressing_mode)
        offset += dst_address.size if dst_address else 0
        src_address = Address.from_bytes(data[offset:], fcf.source_addressing_mode)
        offset += src_address.size if src_address else 0

        # TODO: payload and CRC parsing.
        return MacProtocolDataUnit(fcf, sequence_nr, dst_address, src_address, bytes(), 0)
|