| # Copyright 2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # File generated from uci_packets.json, with the command: |
| # ./scripts/generate_python_backend.py --input uci_packets.json |
| # /!\ Do not edit by hand. |
| from dataclasses import dataclass, field, fields |
| from typing import Optional, List, Tuple |
| import enum |
| import inspect |
| import math |
| |
| |
| @dataclass |
| class Packet: |
| payload: Optional[bytes] = field(repr=False) |
| |
| @classmethod |
| def parse_all(cls, span: bytes) -> 'Packet': |
| packet, remain = getattr(cls, 'parse')(span) |
| if len(remain) > 0: |
| raise Exception('Unexpected parsing remainder') |
| return packet |
| |
| def show(self, prefix: str = ''): |
| print(f'{self.__class__.__name__}') |
| |
| def print_val(p: str, pp: str, name: str, align: int, typ, val): |
| if name == 'payload': |
| pass |
| |
| # Scalar fields. |
| elif typ is int: |
| print(f'{p}{name:{align}} = {val} (0x{val:x})') |
| |
| # Byte fields. |
| elif typ is bytes: |
| print(f'{p}{name:{align}} = [', end='') |
| line = '' |
| n_pp = '' |
| for (idx, b) in enumerate(val): |
| if idx > 0 and idx % 8 == 0: |
| print(f'{n_pp}{line}') |
| line = '' |
| n_pp = pp + (' ' * (align + 4)) |
| line += f' {b:02x}' |
| print(f'{n_pp}{line} ]') |
| |
| # Enum fields. |
| elif inspect.isclass(typ) and issubclass(typ, enum.IntEnum): |
| print(f'{p}{name:{align}} = {typ.__name__}::{val.name} (0x{val:x})') |
| |
| # Struct fields. |
| elif inspect.isclass(typ) and issubclass(typ, globals().get('Packet')): |
| print(f'{p}{name:{align}} = ', end='') |
| val.show(prefix=pp) |
| |
| # Array fields. |
| elif typ.__origin__ == list: |
| print(f'{p}{name:{align}}') |
| last = len(val) - 1 |
| align = 5 |
| for (idx, elt) in enumerate(val): |
| n_p = pp + ('├── ' if idx != last else '└── ') |
| n_pp = pp + ('│ ' if idx != last else ' ') |
| print_val(n_p, n_pp, f'[{idx}]', |
| align, typ.__args__[0], val[idx]) |
| |
| else: |
| print(f'{p}{name:{align}} = ##{typ}##') |
| |
| last = len(fields(self)) - 1 |
| align = max(len(f.name) for f in fields(self) if f.name != 'payload') |
| |
| for (idx, f) in enumerate(fields(self)): |
| p = prefix + ('├── ' if idx != last else '└── ') |
| pp = prefix + ('│ ' if idx != last else ' ') |
| val = getattr(self, f.name) |
| |
| print_val(p, pp, f.name, align, f.type, val) |
| |
| |
| class PacketBoundaryFlag(enum.IntEnum): |
| COMPLETE = 0x0 |
| NOT_COMPLETE = 0x1 |
| |
| |
| class GroupId(enum.IntEnum): |
| CORE = 0x0 |
| SESSION_CONFIG = 0x1 |
| RANGING_SESSION_CONTROL = 0x2 |
| DATA_CONTROL = 0x3 |
| TEST = 0xd |
| VENDOR_PICA = 0x9 |
| VENDOR_RESERVED_A = 0xa |
| VENDOR_RESERVED_B = 0xb |
| VENDOR_ANDROID = 0xe |
| VENDOR_RESERVED_E = 0xc |
| VENDOR_RESERVED_F = 0xf |
| |
| |
| class CoreOpCode(enum.IntEnum): |
| CORE_DEVICE_RESET = 0x0 |
| CORE_DEVICE_STATUS_NTF = 0x1 |
| CORE_GET_DEVICE_INFO = 0x2 |
| CORE_GET_CAPS_INFO = 0x3 |
| CORE_SET_CONFIG = 0x4 |
| CORE_GET_CONFIG = 0x5 |
| CORE_DEVICE_SUSPEND = 0x6 |
| CORE_GENERIC_ERROR_NTF = 0x7 |
| |
| |
| class SessionOpCode(enum.IntEnum): |
| SESSION_INIT = 0x0 |
| SESSION_DEINIT = 0x1 |
| SESSION_STATUS_NTF = 0x2 |
| SESSION_SET_APP_CONFIG = 0x3 |
| SESSION_GET_APP_CONFIG = 0x4 |
| SESSION_GET_COUNT = 0x5 |
| SESSION_GET_STATE = 0x6 |
| SESSION_UPDATE_CONTROLLER_MULTICAST_LIST = 0x7 |
| |
| |
| class RangeOpCode(enum.IntEnum): |
| RANGE_START = 0x0 |
| RANGE_STOP = 0x1 |
| RANGE_INTERVAL_UPDATE_REQ = 0x2 |
| RANGE_GET_RANGING_COUNT = 0x3 |
| |
| |
| class AppDataOpCode(enum.IntEnum): |
| APP_DATA_TX = 0x0 |
| APP_DATA_RX = 0x1 |
| |
| |
| class PicaOpCode(enum.IntEnum): |
| PICA_INIT_DEVICE = 0x0 |
| PICA_SET_DEVICE_POSITION = 0x1 |
| PICA_CREATE_BEACON = 0x2 |
| PICA_SET_BEACON_POSITION = 0x3 |
| PICA_DESTROY_BEACON = 0x4 |
| |
| |
| class AndroidOpCode(enum.IntEnum): |
| ANDROID_GET_POWER_STATS = 0x0 |
| ANDROID_SET_COUNTRY_CODE = 0x1 |
| |
| |
| class StatusCode(enum.IntEnum): |
| UCI_STATUS_OK = 0x0 |
| UCI_STATUS_REJECTED = 0x1 |
| UCI_STATUS_FAILED = 0x2 |
| UCI_STATUS_SYNTAX_ERROR = 0x3 |
| UCI_STATUS_INVALID_PARAM = 0x4 |
| UCI_STATUS_INVALID_RANGE = 0x5 |
| UCI_STATUS_INVALID_MSG_SIZE = 0x6 |
| UCI_STATUS_UNKNOWN_GID = 0x7 |
| UCI_STATUS_UNKNOWN_OID = 0x8 |
| UCI_STATUS_READ_ONLY = 0x9 |
| UCI_STATUS_COMMAND_RETRY = 0xa |
| UCI_STATUS_SESSION_NOT_EXIST = 0x11 |
| UCI_STATUS_SESSION_DUPLICATE = 0x12 |
| UCI_STATUS_SESSION_ACTIVE = 0x13 |
| UCI_STATUS_MAX_SESSIONS_EXCEEDED = 0x14 |
| UCI_STATUS_SESSION_NOT_CONFIGURED = 0x15 |
| UCI_STATUS_ACTIVE_SESSION_ONGOING = 0x16 |
| UCI_STATUS_MULTICAST_LIST_FULL = 0x17 |
| UCI_STATUS_ADDRESS_NOT_FOUND = 0x18 |
| UCI_STATUS_ADDRESS_ALREADY_PRESENT = 0x19 |
| UCI_STATUS_RANGING_TX_FAILED = 0x20 |
| UCI_STATUS_RANGING_RX_TIMEOUT = 0x21 |
| UCI_STATUS_RANGING_RX_PHY_DEC_FAILED = 0x22 |
| UCI_STATUS_RANGING_RX_PHY_TOA_FAILED = 0x23 |
| UCI_STATUS_RANGING_RX_PHY_STS_FAILED = 0x24 |
| UCI_STATUS_RANGING_RX_MAC_DEC_FAILED = 0x25 |
| UCI_STATUS_RANGING_RX_MAC_IE_DEC_FAILED = 0x26 |
| UCI_STATUS_RANGING_RX_MAC_IE_MISSING = 0x27 |
| UCI_STATUS_DATA_MAX_TX_PSDU_SIZE_EXCEEDED = 0x30 |
| UCI_STATUS_DATA_RX_CRC_ERROR = 0x31 |
| UCI_STATUS_ERROR_CCC_SE_BUSY = 0x50 |
| UCI_STATUS_ERROR_CCC_LIFECYCLE = 0x51 |
| |
| |
| class ResetConfig(enum.IntEnum): |
| UWBS_RESET = 0x0 |
| |
| |
| class DeviceConfigId(enum.IntEnum): |
| DEVICE_STATE = 0x0 |
| LOW_POWER_MODE = 0x1 |
| |
| |
| class AppConfigTlvType(enum.IntEnum): |
| DEVICE_TYPE = 0x0 |
| RANGING_ROUND_USAGE = 0x1 |
| STS_CONFIG = 0x2 |
| MULTI_NODE_MODE = 0x3 |
| CHANNEL_NUMBER = 0x4 |
| NO_OF_CONTROLEE = 0x5 |
| DEVICE_MAC_ADDRESS = 0x6 |
| DST_MAC_ADDRESS = 0x7 |
| SLOT_DURATION = 0x8 |
| RANGING_INTERVAL = 0x9 |
| STS_INDEX = 0xa |
| MAC_FCS_TYPE = 0xb |
| RANGING_ROUND_CONTROL = 0xc |
| AOA_RESULT_REQ = 0xd |
| RNG_DATA_NTF = 0xe |
| RNG_DATA_NTF_PROXIMITY_NEAR = 0xf |
| RNG_DATA_NTF_PROXIMITY_FAR = 0x10 |
| DEVICE_ROLE = 0x11 |
| RFRAME_CONFIG = 0x12 |
| PREAMBLE_CODE_INDEX = 0x14 |
| SFD_ID = 0x15 |
| PSDU_DATA_RATE = 0x16 |
| PREAMBLE_DURATION = 0x17 |
| RANGING_TIME_STRUCT = 0x1a |
| SLOTS_PER_RR = 0x1b |
| TX_ADAPTIVE_PAYLOAD_POWER = 0x1c |
| RESPONDER_SLOT_INDEX = 0x1e |
| PRF_MODE = 0x1f |
| SCHEDULED_MODE = 0x22 |
| KEY_ROTATION = 0x23 |
| KEY_ROTATION_RATE = 0x24 |
| SESSION_PRIORITY = 0x25 |
| MAC_ADDRESS_MODE = 0x26 |
| VENDOR_ID = 0x27 |
| STATIC_STS_IV = 0x28 |
| NUMBER_OF_STS_SEGMENTS = 0x29 |
| MAX_RR_RETRY = 0x2a |
| UWB_INITIATION_TIME = 0x2b |
| HOPPING_MODE = 0x2c |
| BLOCK_STRIDE_LENGTH = 0x2d |
| RESULT_REPORT_CONFIG = 0x2e |
| IN_BAND_TERMINATION_ATTEMPT_COUNT = 0x2f |
| SUB_SESSION_ID = 0x30 |
| BPRF_PHR_DATA_RATE = 0x31 |
| MAX_NUMBER_OF_MEASUREMENTS = 0x32 |
| STS_LENGTH = 0x35 |
| CCC_HOP_MODE_KEY = 0xa0 |
| CCC_UWB_TIME0 = 0xa1 |
| CCC_RANGING_PROTOCOL_VER = 0xa3 |
| CCC_UWB_CONFIG_ID = 0xa4 |
| CCC_PULSESHAPE_COMBO = 0xa5 |
| CCC_URSK_TTL = 0xa6 |
| NB_OF_RANGE_MEASUREMENTS = 0xe3 |
| NB_OF_AZIMUTH_MEASUREMENTS = 0xe4 |
| NB_OF_ELEVATION_MEASUREMENTS = 0xe5 |
| |
| |
| class CapTlvType(enum.IntEnum): |
| SUPPORTED_FIRA_PHY_VERSION_RANGE = 0x0 |
| SUPPORTED_FIRA_MAC_VERSION_RANGE = 0x1 |
| SUPPORTED_DEVICE_ROLES = 0x2 |
| SUPPORTED_RANGING_METHOD = 0x3 |
| SUPPORTED_STS_CONFIG = 0x4 |
| SUPPORTED_MULTI_NODE_MODES = 0x5 |
| SUPPORTED_RANGING_TIME_STRUCT = 0x6 |
| SUPPORTED_SCHEDULED_MODE = 0x7 |
| SUPPORTED_HOPPING_MODE = 0x8 |
| SUPPORTED_BLOCK_STRIDING = 0x9 |
| SUPPORTED_UWB_INITIATION_TIME = 0xa |
| SUPPORTED_CHANNELS = 0xb |
| SUPPORTED_RFRAME_CONFIG = 0xc |
| SUPPORTED_CC_CONSTRAINT_LENGTH = 0xd |
| SUPPORTED_BPRF_PARAMETER_SETS = 0xe |
| SUPPORTED_HPRF_PARAMETER_SETS = 0xf |
| SUPPORTED_AOA = 0x10 |
| SUPPORTED_EXTENDED_MAC_ADDRESS = 0x11 |
| SUPPORTED_AOA_RESULT_REQ_ANTENNA_INTERLEAVING = 0xe3 |
| SUPPORTED_POWER_STATS = 0xc0 |
| CCC_SUPPORTED_CHAPS_PER_SLOT = 0xa0 |
| CCC_SUPPORTED_SYNC_CODES = 0xa1 |
| CCC_SUPPORTED_HOPPING_CONFIG_MODES_AND_SEQUENCES = 0xa2 |
| CCC_SUPPORTED_CHANNELS = 0xa3 |
| CCC_SUPPORTED_VERSIONS = 0xa4 |
| CCC_SUPPORTED_UWB_CONFIGS = 0xa5 |
| CCC_SUPPORTED_PULSE_SHAPE_COMBOS = 0xa6 |
| CCC_SUPPORTED_RAN_MULTIPLIER = 0xa7 |
| |
| |
| class AoaResultReqType(enum.IntEnum): |
| AOA_DISABLE = 0x0 |
| AOA_ENABLE = 0x1 |
| AOA_ENABLE_AZIMUTH = 0x2 |
| AOA_ENABLE_ELEVATION = 0x3 |
| AOA_ENABLE_INTERLEAVED = 0xf0 |
| |
| |
| class DeviceState(enum.IntEnum): |
| DEVICE_STATE_READY = 0x1 |
| DEVICE_STATE_ACTIVE = 0x2 |
| DEVICE_STATE_ERROR = 0xff |
| |
| |
| class SessionState(enum.IntEnum): |
| SESSION_STATE_INIT = 0x0 |
| SESSION_STATE_DEINIT = 0x1 |
| SESSION_STATE_ACTIVE = 0x2 |
| SESSION_STATE_IDLE = 0x3 |
| |
| |
| class ReasonCode(enum.IntEnum): |
| STATE_CHANGE_WITH_SESSION_MANAGEMENT_COMMANDS = 0x0 |
| MAX_RANGING_ROUND_RETRY_COUNT_REACHED = 0x1 |
| MAX_NUMBER_OF_MEASUREMENTS_REACHED = 0x2 |
| ERROR_SLOT_LENGTH_NOT_SUPPORTED = 0x20 |
| ERROR_INSUFFICIENT_SLOTS_PER_RR = 0x21 |
| ERROR_MAC_ADDRESS_MODE_NOT_SUPPORTED = 0x22 |
| ERROR_INVALID_RANGING_INTERVAL = 0x23 |
| ERROR_INVALID_STS_CONFIG = 0x24 |
| ERROR_INVALID_RFRAME_CONFIG = 0x25 |
| |
| |
| class MulticastUpdateStatusCode(enum.IntEnum): |
| STATUS_OK_MULTICAST_LIST_UPDATE = 0x0 |
| STATUS_ERROR_MULTICAST_LIST_FULL = 0x1 |
| STATUS_ERROR_KEY_FETCH_FAIL = 0x2 |
| STATUS_ERROR_SUB_SESSION_ID_NOT_FOUND = 0x3 |
| |
| |
| class MacAddressIndicator(enum.IntEnum): |
| SHORT_ADDRESS = 0x0 |
| EXTENDED_ADDRESS = 0x1 |
| |
| |
| class SessionType(enum.IntEnum): |
| FIRA_RANGING_SESSION = 0x0 |
| FIRA_DATA_TRANSFER = 0x1 |
| CCC = 0xa0 |
| |
| |
| class MessageType(enum.IntEnum): |
| COMMAND = 0x1 |
| RESPONSE = 0x2 |
| NOTIFICATION = 0x3 |
| |
| |
| @dataclass |
| class UciPacket(Packet): |
| group_id: GroupId |
| packet_boundary_flag: PacketBoundaryFlag |
| message_type: MessageType |
| opcode: int |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['UciPacket', bytes]: |
| fields = {'payload': None} |
| if len(span) < 4: |
| raise Exception('Invalid packet size') |
| fields['group_id'] = GroupId((span[0] >> 0) & 0xf) |
| fields['packet_boundary_flag'] = PacketBoundaryFlag( |
| (span[0] >> 4) & 0x1) |
| fields['message_type'] = MessageType((span[0] >> 5) & 0x7) |
| fields['opcode'] = (span[1] >> 0) & 0x3f |
| _payload__size = span[3] |
| span = span[4:] |
| if len(span) < _payload__size: |
| raise Exception('Invalid packet size') |
| payload = span[:_payload__size] |
| fields['payload'] = payload |
| if fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x0: |
| return DeviceResetCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x2: |
| return GetDeviceInfoCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x3: |
| return GetCapsInfoCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x4: |
| return SetConfigCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x5: |
| return GetConfigCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x0: |
| return SessionInitCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x1: |
| return SessionDeinitCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x3: |
| return SessionSetAppConfigCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x4: |
| return SessionGetAppConfigCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x5: |
| return SessionGetCountCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x6: |
| return SessionGetStateCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL: |
| return RangingCommand.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x0: |
| return PicaInitDeviceCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x1: |
| return PicaSetDevicePositionCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x2: |
| return PicaCreateBeaconCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x3: |
| return PicaSetBeaconPositionCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x4: |
| return PicaDestroyBeaconCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x0: |
| return AndroidGetPowerStatsCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.COMMAND and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x1: |
| return AndroidSetCountryCodeCmd.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x0: |
| return DeviceResetRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x2: |
| return GetDeviceInfoRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x3: |
| return GetCapsInfoRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x4: |
| return SetConfigRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x5: |
| return GetConfigRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x0: |
| return SessionInitRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x1: |
| return SessionDeinitRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x3: |
| return SessionSetAppConfigRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x4: |
| return SessionGetAppConfigRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x5: |
| return SessionGetCountRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x6: |
| return SessionGetStateRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x0: |
| return RangeStartRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x1: |
| return RangeStopRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x3: |
| return RangeGetRangingCountRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x0: |
| return PicaInitDeviceRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x1: |
| return PicaSetDevicePositionRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x2: |
| return PicaCreateBeaconRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x3: |
| return PicaSetBeaconPositionRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x4: |
| return PicaDestroyBeaconRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x0: |
| return AndroidGetPowerStatsRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.RESPONSE and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x1: |
| return AndroidSetCountryCodeRsp.parse(fields, payload) |
| elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x1: |
| return DeviceStatusNtf.parse(fields, payload) |
| elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x7: |
| return GenericError.parse(fields, payload) |
| elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x2: |
| return SessionStatusNtf.parse(fields, payload) |
| elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListNtf.parse(fields, payload) |
| elif fields['message_type'] == MessageType.NOTIFICATION and fields['packet_boundary_flag'] == PacketBoundaryFlag.COMPLETE and fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x0: |
| return RangeDataNtf.parse(fields, payload) |
| else: |
| return UciPacket(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciCommand(UciPacket): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciCommand', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x0: |
| return DeviceResetCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x2: |
| return GetDeviceInfoCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x3: |
| return GetCapsInfoCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x4: |
| return SetConfigCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x5: |
| return GetConfigCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x0: |
| return SessionInitCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x1: |
| return SessionDeinitCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x3: |
| return SessionSetAppConfigCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x4: |
| return SessionGetAppConfigCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x5: |
| return SessionGetCountCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x6: |
| return SessionGetStateCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL: |
| return RangingCommand.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x0: |
| return PicaInitDeviceCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x1: |
| return PicaSetDevicePositionCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x2: |
| return PicaCreateBeaconCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x3: |
| return PicaSetBeaconPositionCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x4: |
| return PicaDestroyBeaconCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x0: |
| return AndroidGetPowerStatsCmd.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x1: |
| return AndroidSetCountryCodeCmd.parse(fields, payload) |
| else: |
| return UciCommand(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciResponse(UciPacket): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciResponse', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x0: |
| return DeviceResetRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x2: |
| return GetDeviceInfoRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x3: |
| return GetCapsInfoRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x4: |
| return SetConfigRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x5: |
| return GetConfigRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x0: |
| return SessionInitRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x1: |
| return SessionDeinitRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x3: |
| return SessionSetAppConfigRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x4: |
| return SessionGetAppConfigRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x5: |
| return SessionGetCountRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x6: |
| return SessionGetStateRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x0: |
| return RangeStartRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x1: |
| return RangeStopRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x3: |
| return RangeGetRangingCountRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x0: |
| return PicaInitDeviceRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x1: |
| return PicaSetDevicePositionRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x2: |
| return PicaCreateBeaconRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x3: |
| return PicaSetBeaconPositionRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_PICA and fields['opcode'] == 0x4: |
| return PicaDestroyBeaconRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x0: |
| return AndroidGetPowerStatsRsp.parse(fields, payload) |
| elif fields['group_id'] == GroupId.VENDOR_ANDROID and fields['opcode'] == 0x1: |
| return AndroidSetCountryCodeRsp.parse(fields, payload) |
| else: |
| return UciResponse(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciNotification(UciPacket): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciNotification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x1: |
| return DeviceStatusNtf.parse(fields, payload) |
| elif fields['group_id'] == GroupId.CORE and fields['opcode'] == 0x7: |
| return GenericError.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x2: |
| return SessionStatusNtf.parse(fields, payload) |
| elif fields['group_id'] == GroupId.SESSION_CONFIG and fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListNtf.parse(fields, payload) |
| elif fields['group_id'] == GroupId.RANGING_SESSION_CONTROL and fields['opcode'] == 0x0: |
| return RangeDataNtf.parse(fields, payload) |
| else: |
| return UciNotification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class CoreCommand(UciCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['CoreCommand', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return DeviceResetCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x2: |
| return GetDeviceInfoCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x3: |
| return GetCapsInfoCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x4: |
| return SetConfigCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x5: |
| return GetConfigCmd.parse(fields, payload) |
| else: |
| return CoreCommand(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class CoreResponse(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['CoreResponse', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return DeviceResetRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x2: |
| return GetDeviceInfoRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x3: |
| return GetCapsInfoRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x4: |
| return SetConfigRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x5: |
| return GetConfigRsp.parse(fields, payload) |
| else: |
| return CoreResponse(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class CoreNotification(UciNotification): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['CoreNotification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x1: |
| return DeviceStatusNtf.parse(fields, payload) |
| elif fields['opcode'] == 0x7: |
| return GenericError.parse(fields, payload) |
| else: |
| return CoreNotification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionCommand(UciCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionCommand', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return SessionInitCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x1: |
| return SessionDeinitCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x3: |
| return SessionSetAppConfigCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x4: |
| return SessionGetAppConfigCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x5: |
| return SessionGetCountCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x6: |
| return SessionGetStateCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListCmd.parse(fields, payload) |
| else: |
| return SessionCommand(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionResponse(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionResponse', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return SessionInitRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x1: |
| return SessionDeinitRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x3: |
| return SessionSetAppConfigRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x4: |
| return SessionGetAppConfigRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x5: |
| return SessionGetCountRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x6: |
| return SessionGetStateRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListRsp.parse(fields, payload) |
| else: |
| return SessionResponse(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionNotification(UciNotification): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionNotification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x2: |
| return SessionStatusNtf.parse(fields, payload) |
| elif fields['opcode'] == 0x7: |
| return SessionUpdateControllerMulticastListNtf.parse(fields, payload) |
| else: |
| return SessionNotification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangingCommand(UciCommand): |
| session_id: int |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangingCommand', bytes]: |
| if len(span) < 4: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| span = span[4:] |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return RangeStartCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x1: |
| return RangeStopCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x3: |
| return RangeGetRangingCountCmd.parse(fields, payload) |
| else: |
| return RangingCommand(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangingResponse(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangingResponse', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return RangeStartRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x1: |
| return RangeStopRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x3: |
| return RangeGetRangingCountRsp.parse(fields, payload) |
| else: |
| return RangingResponse(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangingNotification(UciNotification): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangingNotification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return RangeDataNtf.parse(fields, payload) |
| else: |
| return RangingNotification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaCommand(UciCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaCommand', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return PicaInitDeviceCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x1: |
| return PicaSetDevicePositionCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x2: |
| return PicaCreateBeaconCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x3: |
| return PicaSetBeaconPositionCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x4: |
| return PicaDestroyBeaconCmd.parse(fields, payload) |
| else: |
| return PicaCommand(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaResponse(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaResponse', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return PicaInitDeviceRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x1: |
| return PicaSetDevicePositionRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x2: |
| return PicaCreateBeaconRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x3: |
| return PicaSetBeaconPositionRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x4: |
| return PicaDestroyBeaconRsp.parse(fields, payload) |
| else: |
| return PicaResponse(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AndroidCommand(UciCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['AndroidCommand', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return AndroidGetPowerStatsCmd.parse(fields, payload) |
| elif fields['opcode'] == 0x1: |
| return AndroidSetCountryCodeCmd.parse(fields, payload) |
| else: |
| return AndroidCommand(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AndroidResponse(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['AndroidResponse', bytes]: |
| payload = span |
| fields['payload'] = payload |
| if fields['opcode'] == 0x0: |
| return AndroidGetPowerStatsRsp.parse(fields, payload) |
| elif fields['opcode'] == 0x1: |
| return AndroidSetCountryCodeRsp.parse(fields, payload) |
| else: |
| return AndroidResponse(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AndroidNotification(UciNotification): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['AndroidNotification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return AndroidNotification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class DeviceResetCmd(CoreCommand): |
| reset_config: ResetConfig |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['DeviceResetCmd', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['reset_config'] = ResetConfig(span[0]) |
| span = span[1:] |
| return DeviceResetCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class DeviceResetRsp(CoreResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['DeviceResetRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return DeviceResetRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class DeviceStatusNtf(CoreNotification): |
| device_state: DeviceState |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['DeviceStatusNtf', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['device_state'] = DeviceState(span[0]) |
| span = span[1:] |
| return DeviceStatusNtf(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class GetDeviceInfoCmd(CoreCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['GetDeviceInfoCmd', bytes]: |
| return GetDeviceInfoCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class GetDeviceInfoRsp(CoreResponse): |
| status: StatusCode |
| uci_version: int |
| mac_version: int |
| phy_version: int |
| uci_test_version: int |
| vendor_spec_info: bytes |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['GetDeviceInfoRsp', bytes]: |
| if len(span) < 10: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| value_ = int.from_bytes(span[1:3], byteorder='little') |
| fields['uci_version'] = value_ |
| value_ = int.from_bytes(span[3:5], byteorder='little') |
| fields['mac_version'] = value_ |
| value_ = int.from_bytes(span[5:7], byteorder='little') |
| fields['phy_version'] = value_ |
| value_ = int.from_bytes(span[7:9], byteorder='little') |
| fields['uci_test_version'] = value_ |
| vendor_spec_info_count = span[9] |
| span = span[10:] |
| if len(span) < vendor_spec_info_count: |
| raise Exception('Invalid packet size') |
| vendor_spec_info = [] |
| for n in range(vendor_spec_info_count): |
| vendor_spec_info.append(int.from_bytes( |
| span[n:n + 1], byteorder='little')) |
| fields['vendor_spec_info'] = vendor_spec_info |
| span = span[vendor_spec_info_count:] |
| return GetDeviceInfoRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class GetCapsInfoCmd(CoreCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['GetCapsInfoCmd', bytes]: |
| return GetCapsInfoCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class CapTlv(Packet): |
| t: CapTlvType |
| v: bytes |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['CapTlv', bytes]: |
| fields = {'payload': None} |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['t'] = CapTlvType(span[0]) |
| v_count = span[1] |
| span = span[2:] |
| if len(span) < v_count: |
| raise Exception('Invalid packet size') |
| v = [] |
| for n in range(v_count): |
| v.append(int.from_bytes(span[n:n + 1], byteorder='little')) |
| fields['v'] = v |
| span = span[v_count:] |
| return CapTlv(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class GetCapsInfoRsp(CoreResponse): |
| status: StatusCode |
| tlvs: List[CapTlv] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['GetCapsInfoRsp', bytes]: |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| tlvs_count = span[1] |
| span = span[2:] |
| tlvs = [] |
| for n in range(tlvs_count): |
| element, span = CapTlv.parse(span) |
| tlvs.append(element) |
| fields['tlvs'] = tlvs |
| return GetCapsInfoRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class DeviceParameter(Packet): |
| id: int |
| value: bytes |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['DeviceParameter', bytes]: |
| fields = {'payload': None} |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['id'] = span[0] |
| value_count = span[1] |
| span = span[2:] |
| if len(span) < value_count: |
| raise Exception('Invalid packet size') |
| value = [] |
| for n in range(value_count): |
| value.append(int.from_bytes(span[n:n + 1], byteorder='little')) |
| fields['value'] = value |
| span = span[value_count:] |
| return DeviceParameter(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SetConfigCmd(CoreCommand): |
| parameters: List[DeviceParameter] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SetConfigCmd', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| parameters_count = span[0] |
| span = span[1:] |
| parameters = [] |
| for n in range(parameters_count): |
| element, span = DeviceParameter.parse(span) |
| parameters.append(element) |
| fields['parameters'] = parameters |
| return SetConfigCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class DeviceConfigStatus(Packet): |
| parameter_id: int |
| status: StatusCode |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['DeviceConfigStatus', bytes]: |
| fields = {'payload': None} |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['parameter_id'] = span[0] |
| fields['status'] = StatusCode(span[1]) |
| span = span[2:] |
| return DeviceConfigStatus(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SetConfigRsp(CoreResponse): |
| status: StatusCode |
| parameters: List[DeviceConfigStatus] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SetConfigRsp', bytes]: |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| parameters_count = span[1] |
| span = span[2:] |
| if len(span) < parameters_count * 2: |
| raise Exception('Invalid packet size') |
| parameters = [] |
| for n in range(parameters_count): |
| parameters.append(DeviceConfigStatus.parse_all( |
| span[n * 2:(n + 1) * 2])) |
| fields['parameters'] = parameters |
| span = span[parameters_count * 2:] |
| return SetConfigRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class GetConfigCmd(CoreCommand): |
| parameter_ids: bytes |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['GetConfigCmd', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| parameter_ids_count = span[0] |
| span = span[1:] |
| if len(span) < parameter_ids_count: |
| raise Exception('Invalid packet size') |
| parameter_ids = [] |
| for n in range(parameter_ids_count): |
| parameter_ids.append(int.from_bytes( |
| span[n:n + 1], byteorder='little')) |
| fields['parameter_ids'] = parameter_ids |
| span = span[parameter_ids_count:] |
| return GetConfigCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class GetConfigRsp(CoreResponse): |
| status: StatusCode |
| parameters: List[DeviceParameter] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['GetConfigRsp', bytes]: |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| parameters_count = span[1] |
| span = span[2:] |
| parameters = [] |
| for n in range(parameters_count): |
| element, span = DeviceParameter.parse(span) |
| parameters.append(element) |
| fields['parameters'] = parameters |
| return GetConfigRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class GenericError(CoreNotification): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['GenericError', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return GenericError(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionInitCmd(SessionCommand): |
| session_id: int |
| session_type: SessionType |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionInitCmd', bytes]: |
| if len(span) < 5: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| fields['session_type'] = SessionType(span[4]) |
| span = span[5:] |
| return SessionInitCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionInitRsp(SessionResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionInitRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return SessionInitRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionDeinitCmd(SessionCommand): |
| session_id: int |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitCmd', bytes]: |
| if len(span) < 4: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| span = span[4:] |
| return SessionDeinitCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionDeinitRsp(SessionResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionDeinitRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return SessionDeinitRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionStatusNtf(SessionNotification): |
| session_id: int |
| session_state: SessionState |
| reason_code: ReasonCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionStatusNtf', bytes]: |
| if len(span) < 6: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| fields['session_state'] = SessionState(span[4]) |
| fields['reason_code'] = ReasonCode(span[5]) |
| span = span[6:] |
| return SessionStatusNtf(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AppConfigParameter(Packet): |
| id: int |
| value: bytes |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['AppConfigParameter', bytes]: |
| fields = {'payload': None} |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['id'] = span[0] |
| value_count = span[1] |
| span = span[2:] |
| if len(span) < value_count: |
| raise Exception('Invalid packet size') |
| value = [] |
| for n in range(value_count): |
| value.append(int.from_bytes(span[n:n + 1], byteorder='little')) |
| fields['value'] = value |
| span = span[value_count:] |
| return AppConfigParameter(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionSetAppConfigCmd(SessionCommand): |
| session_id: int |
| parameters: List[AppConfigParameter] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigCmd', bytes]: |
| if len(span) < 5: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| parameters_count = span[4] |
| span = span[5:] |
| parameters = [] |
| for n in range(parameters_count): |
| element, span = AppConfigParameter.parse(span) |
| parameters.append(element) |
| fields['parameters'] = parameters |
| return SessionSetAppConfigCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AppConfigStatus(Packet): |
| config_id: int |
| status: StatusCode |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['AppConfigStatus', bytes]: |
| fields = {'payload': None} |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['config_id'] = span[0] |
| fields['status'] = StatusCode(span[1]) |
| span = span[2:] |
| return AppConfigStatus(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionSetAppConfigRsp(SessionResponse): |
| status: StatusCode |
| parameters: List[AppConfigStatus] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionSetAppConfigRsp', bytes]: |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| parameters_count = span[1] |
| span = span[2:] |
| if len(span) < parameters_count * 2: |
| raise Exception('Invalid packet size') |
| parameters = [] |
| for n in range(parameters_count): |
| parameters.append(AppConfigStatus.parse_all( |
| span[n * 2:(n + 1) * 2])) |
| fields['parameters'] = parameters |
| span = span[parameters_count * 2:] |
| return SessionSetAppConfigRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionGetAppConfigCmd(SessionCommand): |
| session_id: int |
| parameters: bytes |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigCmd', bytes]: |
| if len(span) < 5: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| parameters_count = span[4] |
| span = span[5:] |
| if len(span) < parameters_count: |
| raise Exception('Invalid packet size') |
| parameters = [] |
| for n in range(parameters_count): |
| parameters.append(int.from_bytes( |
| span[n:n + 1], byteorder='little')) |
| fields['parameters'] = parameters |
| span = span[parameters_count:] |
| return SessionGetAppConfigCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionGetAppConfigRsp(SessionResponse): |
| status: StatusCode |
| parameters: List[AppConfigParameter] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionGetAppConfigRsp', bytes]: |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| parameters_count = span[1] |
| span = span[2:] |
| parameters = [] |
| for n in range(parameters_count): |
| element, span = AppConfigParameter.parse(span) |
| parameters.append(element) |
| fields['parameters'] = parameters |
| return SessionGetAppConfigRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionGetCountCmd(SessionCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountCmd', bytes]: |
| return SessionGetCountCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionGetCountRsp(SessionResponse): |
| status: StatusCode |
| session_count: int |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionGetCountRsp', bytes]: |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| fields['session_count'] = span[1] |
| span = span[2:] |
| return SessionGetCountRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionGetStateCmd(SessionCommand): |
| session_id: int |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateCmd', bytes]: |
| if len(span) < 4: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| span = span[4:] |
| return SessionGetStateCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionGetStateRsp(SessionResponse): |
| status: StatusCode |
| session_state: SessionState |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionGetStateRsp', bytes]: |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| fields['session_state'] = SessionState(span[1]) |
| span = span[2:] |
| return SessionGetStateRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class Controlee(Packet): |
| short_address: int |
| subsession_id: int |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['Controlee', bytes]: |
| fields = {'payload': None} |
| if len(span) < 6: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:2], byteorder='little') |
| fields['short_address'] = value_ |
| value_ = int.from_bytes(span[2:6], byteorder='little') |
| fields['subsession_id'] = value_ |
| span = span[6:] |
| return Controlee(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionUpdateControllerMulticastListCmd(SessionCommand): |
| session_id: int |
| action: int |
| controlees: List[Controlee] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListCmd', bytes]: |
| if len(span) < 6: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| fields['action'] = span[4] |
| controlees_count = span[5] |
| span = span[6:] |
| if len(span) < controlees_count * 6: |
| raise Exception('Invalid packet size') |
| controlees = [] |
| for n in range(controlees_count): |
| controlees.append(Controlee.parse_all(span[n * 6:(n + 1) * 6])) |
| fields['controlees'] = controlees |
| span = span[controlees_count * 6:] |
| return SessionUpdateControllerMulticastListCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionUpdateControllerMulticastListRsp(SessionResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return SessionUpdateControllerMulticastListRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class ControleeStatus(Packet): |
| mac_address: int |
| subsession_id: int |
| status: int |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['ControleeStatus', bytes]: |
| fields = {'payload': None} |
| if len(span) < 7: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:2], byteorder='little') |
| fields['mac_address'] = value_ |
| value_ = int.from_bytes(span[2:6], byteorder='little') |
| fields['subsession_id'] = value_ |
| fields['status'] = span[6] |
| span = span[7:] |
| return ControleeStatus(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class SessionUpdateControllerMulticastListNtf(SessionNotification): |
| session_id: int |
| remaining_multicast_list_size: int |
| controlee_status: List[ControleeStatus] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['SessionUpdateControllerMulticastListNtf', bytes]: |
| if len(span) < 6: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['session_id'] = value_ |
| fields['remaining_multicast_list_size'] = span[4] |
| controlee_status_count = span[5] |
| span = span[6:] |
| if len(span) < controlee_status_count * 7: |
| raise Exception('Invalid packet size') |
| controlee_status = [] |
| for n in range(controlee_status_count): |
| controlee_status.append( |
| ControleeStatus.parse_all(span[n * 7:(n + 1) * 7])) |
| fields['controlee_status'] = controlee_status |
| span = span[controlee_status_count * 7:] |
| return SessionUpdateControllerMulticastListNtf(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangeStartCmd(RangingCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangeStartCmd', bytes]: |
| return RangeStartCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangeStartRsp(RangingResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangeStartRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return RangeStartRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class ShortAddressTwoWayRangingMeasurement(Packet): |
| mac_address: int |
| status: StatusCode |
| nlos: int |
| distance: int |
| aoa_azimuth: int |
| aoa_azimuth_fom: int |
| aoa_elevation: int |
| aoa_elevation_fom: int |
| aoa_destination_azimuth: int |
| aoa_destination_azimuth_fom: int |
| aoa_destination_elevation: int |
| aoa_destination_elevation_fom: int |
| slot_index: int |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['ShortAddressTwoWayRangingMeasurement', bytes]: |
| fields = {'payload': None} |
| if len(span) < 31: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:2], byteorder='little') |
| fields['mac_address'] = value_ |
| fields['status'] = StatusCode(span[2]) |
| fields['nlos'] = span[3] |
| value_ = int.from_bytes(span[4:6], byteorder='little') |
| fields['distance'] = value_ |
| value_ = int.from_bytes(span[6:8], byteorder='little') |
| fields['aoa_azimuth'] = value_ |
| fields['aoa_azimuth_fom'] = span[8] |
| value_ = int.from_bytes(span[9:11], byteorder='little') |
| fields['aoa_elevation'] = value_ |
| fields['aoa_elevation_fom'] = span[11] |
| value_ = int.from_bytes(span[12:14], byteorder='little') |
| fields['aoa_destination_azimuth'] = value_ |
| fields['aoa_destination_azimuth_fom'] = span[14] |
| value_ = int.from_bytes(span[15:17], byteorder='little') |
| fields['aoa_destination_elevation'] = value_ |
| fields['aoa_destination_elevation_fom'] = span[17] |
| fields['slot_index'] = span[18] |
| value_ = int.from_bytes(span[19:31], byteorder='little') |
| span = span[31:] |
| return ShortAddressTwoWayRangingMeasurement(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class ExtendedAddressTwoWayRangingMeasurement(Packet): |
| mac_address: int |
| status: StatusCode |
| nlos: int |
| distance: int |
| aoa_azimuth: int |
| aoa_azimuth_fom: int |
| aoa_elevation: int |
| aoa_elevation_fom: int |
| aoa_destination_azimuth: int |
| aoa_destination_azimuth_fom: int |
| aoa_destination_elevation: int |
| aoa_destination_elevation_fom: int |
| slot_index: int |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['ExtendedAddressTwoWayRangingMeasurement', bytes]: |
| fields = {'payload': None} |
| if len(span) < 31: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:8], byteorder='little') |
| fields['mac_address'] = value_ |
| fields['status'] = StatusCode(span[8]) |
| fields['nlos'] = span[9] |
| value_ = int.from_bytes(span[10:12], byteorder='little') |
| fields['distance'] = value_ |
| value_ = int.from_bytes(span[12:14], byteorder='little') |
| fields['aoa_azimuth'] = value_ |
| fields['aoa_azimuth_fom'] = span[14] |
| value_ = int.from_bytes(span[15:17], byteorder='little') |
| fields['aoa_elevation'] = value_ |
| fields['aoa_elevation_fom'] = span[17] |
| value_ = int.from_bytes(span[18:20], byteorder='little') |
| fields['aoa_destination_azimuth'] = value_ |
| fields['aoa_destination_azimuth_fom'] = span[20] |
| value_ = int.from_bytes(span[21:23], byteorder='little') |
| fields['aoa_destination_elevation'] = value_ |
| fields['aoa_destination_elevation_fom'] = span[23] |
| fields['slot_index'] = span[24] |
| value_ = int.from_bytes(span[25:31], byteorder='little') |
| span = span[31:] |
| return ExtendedAddressTwoWayRangingMeasurement(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| class RangingMeasurementType(enum.IntEnum): |
| ONE_WAY = 0x0 |
| TWO_WAY = 0x1 |
| |
| |
| @dataclass |
| class RangeDataNtf(RangingNotification): |
| sequence_number: int |
| session_id: int |
| rcr_indicator: int |
| current_ranging_interval: int |
| ranging_measurement_type: RangingMeasurementType |
| mac_address_indicator: MacAddressIndicator |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangeDataNtf', bytes]: |
| if len(span) < 24: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:4], byteorder='little') |
| fields['sequence_number'] = value_ |
| value_ = int.from_bytes(span[4:8], byteorder='little') |
| fields['session_id'] = value_ |
| fields['rcr_indicator'] = span[8] |
| value_ = int.from_bytes(span[9:13], byteorder='little') |
| fields['current_ranging_interval'] = value_ |
| fields['ranging_measurement_type'] = RangingMeasurementType(span[13]) |
| fields['mac_address_indicator'] = MacAddressIndicator(span[15]) |
| value_ = int.from_bytes(span[16:24], byteorder='little') |
| span = span[24:] |
| payload = span |
| fields['payload'] = payload |
| if fields['ranging_measurement_type'] == RangingMeasurementType.TWO_WAY and fields['mac_address_indicator'] == MacAddressIndicator.SHORT_ADDRESS: |
| return ShortMacTwoWayRangeDataNtf.parse(fields, payload) |
| elif fields['ranging_measurement_type'] == RangingMeasurementType.TWO_WAY and fields['mac_address_indicator'] == MacAddressIndicator.EXTENDED_ADDRESS: |
| return ExtendedMacTwoWayRangeDataNtf.parse(fields, payload) |
| else: |
| return RangeDataNtf(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class ShortMacTwoWayRangeDataNtf(RangeDataNtf): |
| two_way_ranging_measurements: List[ShortAddressTwoWayRangingMeasurement] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['ShortMacTwoWayRangeDataNtf', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| two_way_ranging_measurements_count = span[0] |
| span = span[1:] |
| if len(span) < two_way_ranging_measurements_count * 31: |
| raise Exception('Invalid packet size') |
| two_way_ranging_measurements = [] |
| for n in range(two_way_ranging_measurements_count): |
| two_way_ranging_measurements.append( |
| ShortAddressTwoWayRangingMeasurement.parse_all(span[n * 31:(n + 1) * 31])) |
| fields['two_way_ranging_measurements'] = two_way_ranging_measurements |
| span = span[two_way_ranging_measurements_count * 31:] |
| return ShortMacTwoWayRangeDataNtf(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class ExtendedMacTwoWayRangeDataNtf(RangeDataNtf): |
| two_way_ranging_measurements: List[ExtendedAddressTwoWayRangingMeasurement] |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['ExtendedMacTwoWayRangeDataNtf', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| two_way_ranging_measurements_count = span[0] |
| span = span[1:] |
| if len(span) < two_way_ranging_measurements_count * 31: |
| raise Exception('Invalid packet size') |
| two_way_ranging_measurements = [] |
| for n in range(two_way_ranging_measurements_count): |
| two_way_ranging_measurements.append( |
| ExtendedAddressTwoWayRangingMeasurement.parse_all(span[n * 31:(n + 1) * 31])) |
| fields['two_way_ranging_measurements'] = two_way_ranging_measurements |
| span = span[two_way_ranging_measurements_count * 31:] |
| return ExtendedMacTwoWayRangeDataNtf(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangeStopCmd(RangingCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangeStopCmd', bytes]: |
| return RangeStopCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangeStopRsp(RangingResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangeStopRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return RangeStopRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangeGetRangingCountCmd(RangingCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangeGetRangingCountCmd', bytes]: |
| return RangeGetRangingCountCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class RangeGetRangingCountRsp(RangingResponse): |
| status: StatusCode |
| count: int |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['RangeGetRangingCountRsp', bytes]: |
| if len(span) < 5: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| value_ = int.from_bytes(span[1:5], byteorder='little') |
| fields['count'] = value_ |
| span = span[5:] |
| return RangeGetRangingCountRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaPosition(Packet): |
| x: int |
| y: int |
| z: int |
| yaw: int |
| pitch: int |
| roll: int |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['PicaPosition', bytes]: |
| fields = {'payload': None} |
| if len(span) < 11: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:2], byteorder='little') |
| fields['x'] = value_ |
| value_ = int.from_bytes(span[2:4], byteorder='little') |
| fields['y'] = value_ |
| value_ = int.from_bytes(span[4:6], byteorder='little') |
| fields['z'] = value_ |
| value_ = int.from_bytes(span[6:8], byteorder='little') |
| fields['yaw'] = value_ |
| fields['pitch'] = span[8] |
| value_ = int.from_bytes(span[9:11], byteorder='little') |
| fields['roll'] = value_ |
| span = span[11:] |
| return PicaPosition(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaInitDeviceCmd(PicaCommand): |
| mac_address: int |
| position: PicaPosition |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaInitDeviceCmd', bytes]: |
| if len(span) < 19: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:8], byteorder='little') |
| fields['mac_address'] = value_ |
| fields['position'] = PicaPosition.parse_all(span[8:19]) |
| span = span[19:] |
| return PicaInitDeviceCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaInitDeviceRsp(PicaResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaInitDeviceRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return PicaInitDeviceRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaSetDevicePositionCmd(PicaCommand): |
| position: PicaPosition |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaSetDevicePositionCmd', bytes]: |
| if len(span) < 11: |
| raise Exception('Invalid packet size') |
| fields['position'] = PicaPosition.parse_all(span[0:11]) |
| span = span[11:] |
| return PicaSetDevicePositionCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaSetDevicePositionRsp(PicaResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaSetDevicePositionRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return PicaSetDevicePositionRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaCreateBeaconCmd(PicaCommand): |
| mac_address: int |
| position: PicaPosition |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaCreateBeaconCmd', bytes]: |
| if len(span) < 19: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:8], byteorder='little') |
| fields['mac_address'] = value_ |
| fields['position'] = PicaPosition.parse_all(span[8:19]) |
| span = span[19:] |
| return PicaCreateBeaconCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaCreateBeaconRsp(PicaResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaCreateBeaconRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return PicaCreateBeaconRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaSetBeaconPositionCmd(PicaCommand): |
| mac_address: int |
| position: PicaPosition |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaSetBeaconPositionCmd', bytes]: |
| if len(span) < 19: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:8], byteorder='little') |
| fields['mac_address'] = value_ |
| fields['position'] = PicaPosition.parse_all(span[8:19]) |
| span = span[19:] |
| return PicaSetBeaconPositionCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaSetBeaconPositionRsp(PicaResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaSetBeaconPositionRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return PicaSetBeaconPositionRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaDestroyBeaconCmd(PicaCommand): |
| mac_address: int |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaDestroyBeaconCmd', bytes]: |
| if len(span) < 8: |
| raise Exception('Invalid packet size') |
| value_ = int.from_bytes(span[0:8], byteorder='little') |
| fields['mac_address'] = value_ |
| span = span[8:] |
| return PicaDestroyBeaconCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PicaDestroyBeaconRsp(PicaResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['PicaDestroyBeaconRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return PicaDestroyBeaconRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AndroidGetPowerStatsCmd(AndroidCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsCmd', bytes]: |
| return AndroidGetPowerStatsCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class PowerStats(Packet): |
| status: StatusCode |
| idle_time_ms: int |
| tx_time_ms: int |
| rx_time_ms: int |
| total_wake_count: int |
| |
| @staticmethod |
| def parse(span: bytes) -> Tuple['PowerStats', bytes]: |
| fields = {'payload': None} |
| if len(span) < 17: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| value_ = int.from_bytes(span[1:5], byteorder='little') |
| fields['idle_time_ms'] = value_ |
| value_ = int.from_bytes(span[5:9], byteorder='little') |
| fields['tx_time_ms'] = value_ |
| value_ = int.from_bytes(span[9:13], byteorder='little') |
| fields['rx_time_ms'] = value_ |
| value_ = int.from_bytes(span[13:17], byteorder='little') |
| fields['total_wake_count'] = value_ |
| span = span[17:] |
| return PowerStats(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AndroidGetPowerStatsRsp(AndroidResponse): |
| stats: PowerStats |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['AndroidGetPowerStatsRsp', bytes]: |
| if len(span) < 17: |
| raise Exception('Invalid packet size') |
| fields['stats'] = PowerStats.parse_all(span[0:17]) |
| span = span[17:] |
| return AndroidGetPowerStatsRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AndroidSetCountryCodeCmd(AndroidCommand): |
| country_code: bytes |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeCmd', bytes]: |
| if len(span) < 2: |
| raise Exception('Invalid packet size') |
| country_code = [] |
| for n in range(2): |
| country_code.append(int.from_bytes( |
| span[n:n + 1], byteorder='little')) |
| fields['country_code'] = country_code |
| span = span[2:] |
| return AndroidSetCountryCodeCmd(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class AndroidSetCountryCodeRsp(AndroidResponse): |
| status: StatusCode |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['AndroidSetCountryCodeRsp', bytes]: |
| if len(span) < 1: |
| raise Exception('Invalid packet size') |
| fields['status'] = StatusCode(span[0]) |
| span = span[1:] |
| return AndroidSetCountryCodeRsp(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_A_Command(UciCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Command', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_A_Command(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_B_Command(UciCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Command', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_B_Command(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_E_Command(UciCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Command', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_E_Command(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_F_Command(UciCommand): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Command', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_F_Command(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_A_Response(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Response', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_A_Response(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_B_Response(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Response', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_B_Response(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_E_Response(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Response', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_E_Response(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_F_Response(UciResponse): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Response', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_F_Response(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_A_Notification(UciNotification): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_A_Notification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_A_Notification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_B_Notification(UciNotification): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_B_Notification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_B_Notification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_E_Notification(UciNotification): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_E_Notification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_E_Notification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |
| |
| |
| @dataclass |
| class UciVendor_F_Notification(UciNotification): |
| |
| @staticmethod |
| def parse(fields: dict, span: bytes) -> Tuple['UciVendor_F_Notification', bytes]: |
| payload = span |
| fields['payload'] = payload |
| return UciVendor_F_Notification(**fields), span |
| |
| def serialize(self) -> bytes: |
| pass |