Initial version
This commit is contained in:
commit
b3bdd11acb
8 changed files with 623 additions and 0 deletions
21
LICENSE
Normal file
21
LICENSE
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2024 Scott Shawcroft
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
299
circuitmatter/__init__.py
Normal file
299
circuitmatter/__init__.py
Normal file
|
|
@ -0,0 +1,299 @@
|
|||
"""Pure Python implementation of the Matter IOT protocol."""
|
||||
|
||||
import enum
|
||||
import math
|
||||
import subprocess
|
||||
import socket
|
||||
import struct
|
||||
|
||||
from . import tlv
|
||||
|
||||
from typing import Optional, Type, Any
|
||||
|
||||
__version__ = "0.0.0"
|
||||
|
||||
# descriminator = 3840
|
||||
# avahi = subprocess.Popen(["avahi-publish-service", "-v", f"--subtype=_L{descriminator}._sub._matterc._udp", "--subtype=_CM._sub._matterc._udp", "FA93546B21F5FB54", "_matterc._udp", "5540", "PI=", "PH=33", "CM=1", f"D={descriminator}", "CRI=3000", "CRA=4000", "T=1", "VP=65521+32769"])
|
||||
|
||||
# # Define the UDP IP address and port
|
||||
# UDP_IP = "::" # Listen on all available network interfaces
|
||||
# UDP_PORT = 5540
|
||||
|
||||
# # Create the UDP socket
|
||||
# sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
||||
|
||||
# # Bind the socket to the IP and port
|
||||
# sock.bind((UDP_IP, UDP_PORT))
|
||||
|
||||
# print(f"Listening on UDP port {UDP_PORT}")
|
||||
|
||||
unsecured_session_context = {
|
||||
|
||||
}
|
||||
|
||||
class ProtocolId(enum.Enum):
|
||||
SECURE_CHANNEL = 0
|
||||
INTERACTION_MODEL = 1
|
||||
BDX = 2
|
||||
USER_DIRECTED_COMMISSIONING = 3
|
||||
FOR_TESTING = 4
|
||||
|
||||
class SecurityFlags(enum.Flag):
|
||||
P = 1 << 7
|
||||
C = 1 << 6
|
||||
MX = 1 << 5
|
||||
|
||||
class ExchangeFlags(enum.Flag):
|
||||
V = 1 << 4
|
||||
SX = 1 << 3
|
||||
R = 1 << 2
|
||||
A = 1 << 1
|
||||
I = 1 << 0
|
||||
|
||||
class SecureProtocolOpcode(enum.Enum):
|
||||
MSG_COUNTER_SYNC_REQ = 0x00
|
||||
"""The Message Counter Synchronization Request message queries the current message counter from a peer to bootstrap replay protection."""
|
||||
|
||||
MSG_COUNTER_SYNC_RSP = 0x01
|
||||
"""The Message Counter Synchronization Response message provides the current message counter from a peer to bootstrap replay protection."""
|
||||
|
||||
MRP_STANDALONE_ACK = 0x10
|
||||
"""This message is dedicated for the purpose of sending a stand-alone acknowledgement when there is no other data message available to piggyback an acknowledgement on top of."""
|
||||
|
||||
PBKDF_PARAM_REQUEST = 0x20
|
||||
"""The request for PBKDF parameters necessary to complete the PASE protocol."""
|
||||
|
||||
PBKDF_PARAM_RESPONSE = 0x21
|
||||
"""The PBKDF parameters sent in response to PBKDF-ParamRequest during the PASE protocol."""
|
||||
|
||||
PASE_PAKE1 = 0x22
|
||||
"""The first PAKE message of the PASE protocol."""
|
||||
|
||||
PASE_PAKE2 = 0x23
|
||||
"""The second PAKE message of the PASE protocol."""
|
||||
|
||||
PASE_PAKE3 = 0x24
|
||||
"""The third PAKE message of the PASE protocol."""
|
||||
|
||||
CASE_SIGMA1 = 0x30
|
||||
"""The first message of the CASE protocol."""
|
||||
|
||||
CASE_SIGMA2 = 0x31
|
||||
"""The second message of the CASE protocol."""
|
||||
|
||||
CASE_SIGMA3 = 0x32
|
||||
"""The third message of the CASE protocol."""
|
||||
|
||||
CASE_SIGMA2_RESUME = 0x33
|
||||
"""The second resumption message of the CASE protocol."""
|
||||
|
||||
STATUS_REPORT = 0x40
|
||||
"""The Status Report message encodes the result of an operation in the Secure Channel as well as other protocols."""
|
||||
|
||||
ICD_CHECK_IN = 0x50
|
||||
"""The Check-in message notifies a client that the ICD is available for communication."""
|
||||
|
||||
PROTOCOL_OPCODES = {
|
||||
ProtocolId.SECURE_CHANNEL: SecureProtocolOpcode,
|
||||
}
|
||||
|
||||
|
||||
# session-parameter-struct => STRUCTURE [ tag-order ]
|
||||
# {
|
||||
# SESSION_IDLE_INTERVAL
|
||||
# [1, optional] : UNSIGNED INTEGER [ range 32-bits ],
|
||||
# SESSION_ACTIVE_INTERVAL
|
||||
# [2, optional] : UNSIGNED INTEGER [ range 32-bits ],
|
||||
# SESSION_ACTIVE_THRESHOLD
|
||||
# [3, optional] : UNSIGNED INTEGER [ range 16-bits ],
|
||||
# DATA_MODEL_REVISION
|
||||
# [4]
|
||||
# : UNSIGNED INTEGER [ range 16-bits ],
|
||||
# INTERACTION_MODEL_REVISION [5]
|
||||
# : UNSIGNED INTEGER [ range 16-bits ],
|
||||
# SPECIFICATION_VERSION
|
||||
# [6]
|
||||
# : UNSIGNED INTEGER [ range 32-bits ],
|
||||
# MAX_PATHS_PER_INVOKE
|
||||
# [7]
|
||||
# : UNSIGNED INTEGER [ range 16-bits ],
|
||||
# }
|
||||
class SessionParameterStruct(tlv.TLVStructure):
|
||||
session_idle_interval = tlv.NumberMember(1, "<I", optional=True)
|
||||
session_active_interval = tlv.NumberMember(2, "<I", optional=True)
|
||||
session_active_threshold = tlv.NumberMember(3, "<H", optional=True)
|
||||
data_model_revision = tlv.NumberMember(4, "<H")
|
||||
interaction_model_revision = tlv.NumberMember(5, "<H")
|
||||
specification_version = tlv.NumberMember(6, "<I")
|
||||
max_paths_per_invoke = tlv.NumberMember(7, "<H")
|
||||
|
||||
# pbkdfparamreq-struct => STRUCTURE [ tag-order ]
|
||||
# {
|
||||
# initiatorRandom
|
||||
# [1] : OCTET STRING [ length 32 ],
|
||||
# initiatorSessionId
|
||||
# [2] : UNSIGNED INTEGER [ range 16-bits ],
|
||||
# passcodeId
|
||||
# [3] : UNSIGNED INTEGER [ length 16-bits ],
|
||||
# hasPBKDFParameters
|
||||
# [4] : BOOLEAN,
|
||||
# initiatorSessionParams [5, optional] : session-parameter-struct
|
||||
# }
|
||||
class PBKDFParamRequest(tlv.TLVStructure):
|
||||
initiatorRandom = tlv.OctetStringMember(1, 32)
|
||||
initiatorSessionId = tlv.NumberMember(2, "<H")
|
||||
passcodeId = tlv.NumberMember(3, "<H")
|
||||
hasPBKDFParameters = tlv.BoolMember(4)
|
||||
initiatorSessionParams = tlv.StructMember(5, SessionParameterStruct, optional=True)
|
||||
|
||||
# Crypto_PBKDFParameterSet => STRUCTURE [ tag-order ]
|
||||
# {
|
||||
# iterations [1] : UNSIGNED INTEGER [ range 32-bits ],
|
||||
# salt [2] : OCTET STRING [ length 16..32 ],
|
||||
# }
|
||||
class Crypto_PBKDFParameterSet(tlv.TLVStructure):
|
||||
iterations = tlv.NumberMember(1, "<I")
|
||||
salt = tlv.OctetStringMember(2, 32)
|
||||
|
||||
# pbkdfparamresp-struct => STRUCTURE [ tag-order ]
|
||||
# {
|
||||
# initiatorRandom
|
||||
# [1] : OCTET STRING [ length 32 ],
|
||||
# responderRandom
|
||||
# [2] : OCTET STRING [ length 32 ],
|
||||
# responderSessionId
|
||||
# [3] : UNSIGNED INTEGER [ range 16-bits ],
|
||||
# pbkdf_parameters
|
||||
# [4] : Crypto_PBKDFParameterSet,
|
||||
# responderSessionParams [5, optional] : session-parameter-struct
|
||||
# }
|
||||
class PBKDFParamResponse(tlv.TLVStructure):
|
||||
initiatorRandom = tlv.OctetStringMember(1, 32)
|
||||
responderRandom = tlv.OctetStringMember(2, 32)
|
||||
responderSessionId = tlv.NumberMember(3, "<H")
|
||||
pbkdf_parameters = tlv.StructMember(4, Crypto_PBKDFParameterSet)
|
||||
responderSessionParams = tlv.StructMember(5, SessionParameterStruct, optional=True)
|
||||
|
||||
# while True:
|
||||
# # Receive data from the socket (1280 is the minimum ipv6 MTU and the max UDP matter packet size.)
|
||||
# data, addr = sock.recvfrom(1280)
|
||||
data = b'\x04\x00\x00\x00\x0b\x06\xb7\t)\xad\x07\xd9\xae\xa1\xee\xa0\x05 j\x15\x00\x00\x150\x01 \x97\x064#\x1c\xd1E7H\x0b|\xc2G\xa7\xc38\xe9\xce3\x11\xb2@M\x86\xd7\xb5{)\xaa`\xddb%\x02\xc2\x86$\x03\x00(\x045\x05%\x01\xf4\x01%\x02,\x01%\x03\xa0\x0f$\x04\x11$\x05\x0b&\x06\x00\x00\x03\x01$\x07\x01\x18\x18'
|
||||
addr = None
|
||||
|
||||
import pathlib
|
||||
|
||||
import json
|
||||
# pathlib.Path("data.bin").write_bytes(data)
|
||||
|
||||
bookmarks = []
|
||||
|
||||
def add_bookmark(start, length, name, color=0x0000ff):
|
||||
bookmarks.append({
|
||||
"color": 0x4f000000 | color,
|
||||
"comment": "\n",
|
||||
"id": len(bookmarks),
|
||||
"locked": True,
|
||||
"name": name,
|
||||
"region": {
|
||||
"address": start,
|
||||
"size": length
|
||||
}
|
||||
})
|
||||
# Write every time in case we crash
|
||||
pathlib.Path("parsed.hexbm").write_text(json.dumps({"bookmarks": bookmarks}))
|
||||
|
||||
def run():
|
||||
# Print the received data and the address of the sender
|
||||
print(f"Received packet from {addr}: {data}")
|
||||
print(f"Data length: {len(data)} bytes")
|
||||
flags, session_id, security_flags, message_counter = struct.unpack_from("<BHBI", data)
|
||||
add_bookmark(0, 8, "Header")
|
||||
print(f"Flags: {flags:x} Session ID: {session_id:x} Security Flags: {SecurityFlags(security_flags)} Message Counter: {message_counter}")
|
||||
offset = 8
|
||||
if flags & (1 << 2):
|
||||
source_node_id = struct.unpack_from("<Q", data, 8)[0]
|
||||
add_bookmark(8, 8, "Source Node ID")
|
||||
print(source_node_id)
|
||||
offset += 8
|
||||
print(f"DSIZ {flags & (0x3)}")
|
||||
if (flags >> 4) != 0:
|
||||
print("Incorrect version")
|
||||
# continue
|
||||
secure_session = security_flags & 0x3 != 0 or session_id != 0
|
||||
|
||||
if not secure_session:
|
||||
print("Unsecured session")
|
||||
print(data[offset:offset+8])
|
||||
decrypted_message = memoryview(data)[offset:]
|
||||
|
||||
context = {"role": "responder", "node_id": source_node_id}
|
||||
unsecured_session_context[source_node_id] = context
|
||||
|
||||
exchange_flags, protocol_opcode, exchange_id = struct.unpack_from("<BBH", decrypted_message)
|
||||
add_bookmark(offset, 4, "Protocol header")
|
||||
exchange_flags = ExchangeFlags(exchange_flags)
|
||||
print(f"Exchange Flags: {exchange_flags} Exchange ID: {exchange_id}")
|
||||
decrypted_offset = 4
|
||||
protocol_vendor_id = 0
|
||||
if exchange_flags & ExchangeFlags.V:
|
||||
protocol_vendor_id = struct.unpack_from("<H", decrypted_message, decrypted_offset)[0]
|
||||
add_bookmark(offset + decrypted_offset, 2, "Protocol Vendor ID")
|
||||
decrypted_offset += 2
|
||||
protocol_id = struct.unpack_from("<H", decrypted_message, decrypted_offset)[0]
|
||||
add_bookmark(offset + decrypted_offset, 2, "Protocol ID")
|
||||
decrypted_offset += 2
|
||||
protocol_id = ProtocolId(protocol_id)
|
||||
protocol_opcode = PROTOCOL_OPCODES[protocol_id](protocol_opcode)
|
||||
print(f"Protocol Vendor ID: {protocol_vendor_id} Protocol ID: {protocol_id} Protocol Opcode: {protocol_opcode}")
|
||||
|
||||
acknowledged_message_counter = None
|
||||
if exchange_flags & ExchangeFlags.A:
|
||||
acknowledged_message_counter = struct.unpack_from("<I", decrypted_message, decrypted_offset)[0]
|
||||
decrypted_offset += 4
|
||||
print(f"Acknowledged Message Counter: {acknowledged_message_counter}")
|
||||
|
||||
if protocol_id == ProtocolId.SECURE_CHANNEL:
|
||||
if protocol_opcode == SecureProtocolOpcode.MSG_COUNTER_SYNC_REQ:
|
||||
print("Received Message Counter Synchronization Request")
|
||||
response = struct.pack("<BHBI", 0, 0, 0, 0)
|
||||
sock.sendto(response, addr)
|
||||
print(f"Sent Message Counter Synchronization Response to {addr}")
|
||||
elif protocol_opcode == SecureProtocolOpcode.MSG_COUNTER_SYNC_RSP:
|
||||
print("Received Message Counter Synchronization Response")
|
||||
elif protocol_opcode == SecureProtocolOpcode.PBKDF_PARAM_REQUEST:
|
||||
print("Received PBKDF Parameter Request")
|
||||
request = PBKDFParamRequest(decrypted_message[decrypted_offset+1:])
|
||||
response = PBKDFParamResponse()
|
||||
response.initiatorRandom = request.initiatorRandom
|
||||
response.responderRandom = b"\x00" * 32
|
||||
response.responderSessionId = 0
|
||||
params = response.pbkdf_parameters
|
||||
params.iterations = 1000
|
||||
params.salt = b"\x00" * 32
|
||||
print(response)
|
||||
|
||||
elif protocol_opcode == SecureProtocolOpcode.PBKDF_PARAM_RESPONSE:
|
||||
print("Received PBKDF Parameter Response")
|
||||
elif protocol_opcode == SecureProtocolOpcode.PASE_PAKE1:
|
||||
print("Received PASE PAKE1")
|
||||
elif protocol_opcode == SecureProtocolOpcode.PASE_PAKE2:
|
||||
print("Received PASE PAKE2")
|
||||
elif protocol_opcode == SecureProtocolOpcode.PASE_PAKE3:
|
||||
print("Received PASE PAKE3")
|
||||
elif protocol_opcode == SecureProtocolOpcode.CASE_SIGMA1:
|
||||
print("Received CASE Sigma1")
|
||||
elif protocol_opcode == SecureProtocolOpcode.CASE_SIGMA2:
|
||||
print("Received CASE Sigma2")
|
||||
elif protocol_opcode == SecureProtocolOpcode.CASE_SIGMA3:
|
||||
print("Received CASE Sigma3")
|
||||
elif protocol_opcode == SecureProtocolOpcode.CASE_SIGMA2_RESUME:
|
||||
print("Received CASE Sigma2 Resume")
|
||||
elif protocol_opcode == SecureProtocolOpcode.STATUS_REPORT:
|
||||
print("Received Status Report")
|
||||
elif protocol_opcode == SecureProtocolOpcode.ICD_CHECK_IN:
|
||||
print("Received ICD Check-in")
|
||||
|
||||
# avahi.kill()
|
||||
|
||||
if __name__ == "__main__":
|
||||
run()
|
||||
BIN
circuitmatter/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
circuitmatter/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
BIN
circuitmatter/__pycache__/tlv.cpython-312.pyc
Normal file
BIN
circuitmatter/__pycache__/tlv.cpython-312.pyc
Normal file
Binary file not shown.
225
circuitmatter/tlv.py
Normal file
225
circuitmatter/tlv.py
Normal file
|
|
@ -0,0 +1,225 @@
|
|||
import enum
|
||||
from typing import Optional, Type, Any
|
||||
import struct
|
||||
|
||||
# As a byte string to save space.
|
||||
TAG_LENGTH = b"\x00\x01\x02\x04\x02\x04\x06\x08"
|
||||
INT_SIZE = "BHIQ"
|
||||
|
||||
class ElementType(enum.IntEnum):
|
||||
STRUCTURE = 0b10101
|
||||
ARRAY = 0b10110
|
||||
LIST = 0b10111
|
||||
END_OF_CONTAINER = 0b11000
|
||||
|
||||
class TLVStructure:
|
||||
def __init__(self, buffer = None):
|
||||
self.buffer: memoryview = buffer
|
||||
# These three dicts are keyed by tag.
|
||||
self.tag_value_offset = {}
|
||||
self.tag_value_length = {}
|
||||
self.cached_values = {}
|
||||
self._offset = 0 # Stopped at the next control octet
|
||||
|
||||
def __str__(self):
|
||||
members = []
|
||||
for field in vars(type(self)):
|
||||
descriptor_class = vars(type(self))[field]
|
||||
if field.startswith("_") or not isinstance(descriptor_class, Member):
|
||||
continue
|
||||
print(field)
|
||||
value = descriptor_class.print(self)
|
||||
if isinstance(descriptor_class, StructMember):
|
||||
value = value.replace("\n", "\n ")
|
||||
members.append(f"{field} = {value}")
|
||||
return "{\n " + ",\n ".join(members) + "\n}"
|
||||
|
||||
def scan_until(self, tag):
|
||||
print(bytes(self.buffer[self._offset:]))
|
||||
print(f"Looking for {tag}")
|
||||
while self._offset < len(self.buffer):
|
||||
control_octet = self.buffer[self._offset]
|
||||
tag_control = control_octet >> 5
|
||||
element_type = control_octet & 0x1F
|
||||
print(f"Control 0x{control_octet:x} tag_control {tag_control} element_type {element_type}")
|
||||
|
||||
this_tag = None
|
||||
if tag_control == 0: # Anonymous
|
||||
this_tag = None
|
||||
elif tag_control == 1: # Context specific
|
||||
print("context specific tag")
|
||||
this_tag = self.buffer[self._offset + 1]
|
||||
else:
|
||||
vendor_id = None
|
||||
profile_number = None
|
||||
if tag_control >= 6: # Fully qualified
|
||||
vendor_id, profile_number = struct.unpack_from("<HH", self.buffer, self._offset + 1)
|
||||
|
||||
if tag_control in (0b010, 0b011):
|
||||
raise NotImplementedError("Common profile tag")
|
||||
|
||||
if tag_control == 7: # 4 octet tag number
|
||||
tag_number = struct.unpack_from("<I", self.buffer, self._offset + 5)[0]
|
||||
else:
|
||||
tag_number = struct.unpack_from("<H", self.buffer, self._offset + 5)[0]
|
||||
if vendor_id:
|
||||
this_tag = (vendor_id, profile_number, tag_number)
|
||||
else:
|
||||
this_tag = tag_number
|
||||
print(f"found tag {this_tag}")
|
||||
|
||||
length_offset = self._offset + 1 + TAG_LENGTH[tag_control]
|
||||
element_category = element_type >> 2
|
||||
if element_category == 0 or element_category == 1: # ints
|
||||
value_offset = length_offset
|
||||
value_length = 1 << (element_type & 0x3)
|
||||
elif element_category == 2: # Bool or float
|
||||
if element_type & 0x3 <= 1:
|
||||
value_offset = self._offset
|
||||
value_length = 1
|
||||
else: # Float
|
||||
value_offset = length_offset
|
||||
value_length = 4 << (element_type & 0x1)
|
||||
elif element_category == 3 or element_category == 4: # UTF-8 String or Octet String
|
||||
power_of_two = (element_type & 0x3)
|
||||
length_length = 1 << power_of_two
|
||||
value_offset = length_offset + length_length
|
||||
value_length = struct.unpack_from(INT_SIZE[power_of_two], self.buffer, length_offset)[0]
|
||||
elif element_type == 0b10100: # Null
|
||||
value_offset = self._offset
|
||||
value_length = 1
|
||||
else: # Container
|
||||
value_offset = length_offset
|
||||
value_length = 1
|
||||
nesting = 0
|
||||
print("in container")
|
||||
while self.buffer[value_offset + value_length] != ElementType.END_OF_CONTAINER or nesting > 0:
|
||||
octet = self.buffer[value_offset + value_length]
|
||||
if octet == ElementType.END_OF_CONTAINER:
|
||||
nesting -= 1
|
||||
print(nesting)
|
||||
elif (octet & 0x1f) in (ElementType.STRUCTURE, ElementType.ARRAY, ElementType.LIST):
|
||||
nesting += 1
|
||||
print(nesting)
|
||||
value_length += 1
|
||||
print(f"new length {value_length} {self.buffer[value_offset + value_length]:02x}")
|
||||
print(f"container length {value_length}")
|
||||
|
||||
self.tag_value_offset[this_tag] = value_offset
|
||||
self.tag_value_length[this_tag] = value_length
|
||||
|
||||
# A few values are encoded in the control byte. Move our offset past
|
||||
# the tag where the length would be in that case.
|
||||
if self._offset == value_offset:
|
||||
self._offset = length_offset
|
||||
else:
|
||||
self._offset = value_offset + value_length
|
||||
|
||||
if tag == this_tag:
|
||||
break
|
||||
|
||||
class Member:
|
||||
def __init__(self, tag, optional=False):
|
||||
self.tag = tag
|
||||
self.optional = optional
|
||||
|
||||
def __set__(self, obj: TLVStructure, value: Any) -> None:
|
||||
obj.cached_values[self.tag] = value
|
||||
|
||||
class NumberMember(Member):
|
||||
def __init__(self, tag, _format, optional=False):
|
||||
self.format = _format
|
||||
super().__init__(tag, optional)
|
||||
|
||||
def __get__(
|
||||
self,
|
||||
obj: Optional[TLVStructure],
|
||||
objtype: Optional[Type[TLVStructure]] = None,
|
||||
) -> Any:
|
||||
if self.tag in obj.cached_values:
|
||||
return obj.cached_values[self.tag]
|
||||
if self.tag not in obj.tag_value_offset:
|
||||
obj.scan_until(self.tag)
|
||||
|
||||
print(self.tag, obj.tag_value_length)
|
||||
encoded_format = INT_SIZE[int(math.log(obj.tag_value_length[self.tag], 2))]
|
||||
if self.format.islower():
|
||||
encoded_format = encoded_format.lower()
|
||||
|
||||
value = struct.unpack_from(encoded_format, obj.buffer, offset=obj.tag_value_offset[self.tag])[0]
|
||||
obj.cached_values[self.tag] = value
|
||||
return value
|
||||
|
||||
|
||||
def print(self, obj):
|
||||
value = self.__get__(obj)
|
||||
unsigned = "U" if self.format.isupper() else ""
|
||||
return f"{value}{unsigned}"
|
||||
|
||||
class BoolMember(Member):
|
||||
def __get__(
|
||||
self,
|
||||
obj: Optional[TLVStructure],
|
||||
objtype: Optional[Type[TLVStructure]] = None,
|
||||
) -> bool:
|
||||
if self.tag in obj.cached_values:
|
||||
return obj.cached_values[self.tag]
|
||||
if self.tag not in obj.tag_value_offset:
|
||||
obj.scan_until(self.tag)
|
||||
|
||||
octet = obj.buffer[obj.tag_value_offset[self.tag]]
|
||||
|
||||
value = octet & 1 == 1
|
||||
obj.cached_values[self.tag] = value
|
||||
return value
|
||||
|
||||
def print(self, obj):
|
||||
if self.__get__(obj):
|
||||
return "true"
|
||||
return "false"
|
||||
|
||||
class OctetStringMember(Member):
|
||||
def __init__(self, tag, max_length, optional=False):
|
||||
self.max_length = max_length
|
||||
super().__init__(tag, optional)
|
||||
|
||||
def __get__(
|
||||
self,
|
||||
obj: Optional[TLVStructure],
|
||||
objtype: Optional[Type[TLVStructure]] = None,
|
||||
) -> memoryview:
|
||||
if self.tag not in obj.tag_value_offset:
|
||||
obj.scan_until(self.tag)
|
||||
|
||||
offset = obj.tag_value_offset[self.tag]
|
||||
length = obj.tag_value_length[self.tag]
|
||||
return obj.buffer[offset:offset + length]
|
||||
|
||||
def print(self, obj):
|
||||
value = self.__get__(obj)
|
||||
return " ".join((f"{byte:02x}" for byte in value))
|
||||
|
||||
class StructMember(Member):
|
||||
def __init__(self, tag, substruct_class, optional=False):
|
||||
self.substruct_class = substruct_class
|
||||
super().__init__(tag, optional)
|
||||
|
||||
def __get__(
|
||||
self,
|
||||
obj: Optional[TLVStructure],
|
||||
objtype: Optional[Type[TLVStructure]] = None,
|
||||
) -> Optional[TLVStructure]:
|
||||
if self.tag not in obj.tag_value_offset:
|
||||
obj.scan_until(self.tag)
|
||||
if self.optional and (self.tag not in obj.tag_value_offset or obj.tag_value_length == 0):
|
||||
return None
|
||||
value_offset = obj.tag_value_offset[self.tag]
|
||||
value_length = obj.tag_value_length[self.tag]
|
||||
# TODO: Cache this so we can reuse the object.
|
||||
return self.substruct_class(obj.buffer[value_offset:value_offset + value_length])
|
||||
|
||||
def print(self, obj):
|
||||
value = self.__get__(obj)
|
||||
if value is None:
|
||||
return "null"
|
||||
return str(value)
|
||||
13
pyproject.toml
Normal file
13
pyproject.toml
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
[build-system]
|
||||
requires = ["flit_core >=3.2,<4"]
|
||||
build-backend = "flit_core.buildapi"
|
||||
|
||||
[project]
|
||||
name = "circuitmatter"
|
||||
authors = [{name = "Scott Shawcroft", email = "scott@adafruit.com"}]
|
||||
license = {file = "LICENSE"}
|
||||
classifiers = ["License :: OSI Approved :: MIT License"]
|
||||
dynamic = ["version", "description"]
|
||||
|
||||
[project.urls]
|
||||
Home = "https://github.com/adafruit/circuitmatter"
|
||||
BIN
tests/__pycache__/test_tlv.cpython-312-pytest-8.2.2.pyc
Normal file
BIN
tests/__pycache__/test_tlv.cpython-312-pytest-8.2.2.pyc
Normal file
Binary file not shown.
65
tests/test_tlv.py
Normal file
65
tests/test_tlv.py
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
from circuitmatter import tlv
|
||||
|
||||
# Test TLV encoding using examples from spec
|
||||
|
||||
# Type and Value
|
||||
# Encoding (hex)
|
||||
# Boolean false
|
||||
# 08
|
||||
|
||||
class Bool(tlv.TLVStructure):
|
||||
b = tlv.BoolMember(None)
|
||||
|
||||
class TestBoolFalse:
|
||||
def test_bool_false_decode(self):
|
||||
s = Bool(b"\x08")
|
||||
assert str(s) == "{\n b = false\n}"
|
||||
assert not s.b
|
||||
|
||||
def test_bool_false_encode(self):
|
||||
s = Bool()
|
||||
s.b = False
|
||||
assert bytes(s) == b"\x08"
|
||||
|
||||
# Boolean true
|
||||
# 09
|
||||
# Signed Integer, 1-octet, value 42
|
||||
# 00 2a
|
||||
# Signed Integer, 1-octet, value -17
|
||||
# 00 ef
|
||||
# Unsigned Integer, 1-octet, value 42U
|
||||
# 04 2a
|
||||
# Signed Integer, 2-octet, value 42
|
||||
# 01 2a 00
|
||||
# Signed Integer, 4-octet, value -170000
|
||||
# 02 f0 67 fd ff
|
||||
# Signed Integer, 8-octet, value 40000000000
|
||||
# 03 00 90 2f 50 09 00 00 00
|
||||
# UTF-8 String, 1-octet length, "Hello!"
|
||||
# 0c 06 48 65 6c 6c 6f 21
|
||||
# UTF-8 String, 1-octet length, "Tschüs"
|
||||
# 0c 07 54 73 63 68 c3 bc 73
|
||||
# Octet String, 1-octet length, octets 00 01 02 03 04 10 05 00 01 02 03 04
|
||||
# Null
|
||||
# 14
|
||||
# Single precision floating point 0.0
|
||||
# 0a 00 00 00 00
|
||||
# Single precision floating point (1.0 / 3.0)
|
||||
# 0a ab aa aa 3e
|
||||
# Single precision floating point 17.9
|
||||
# 0a 33 33 8f 41
|
||||
# Single precision floating point infinity (∞)
|
||||
# 0a 00 00 80 7f
|
||||
# Single precision floating point negative infinity
|
||||
# 0a 00 00 80 ff
|
||||
# (-∞)
|
||||
# Double precision floating point 0.0
|
||||
# 0b 00 00 00 00 00 00 00 00
|
||||
# Double precision floating point (1.0 / 3.0)
|
||||
# 0b 55 55 55 55 55 55 d5 3f
|
||||
# Double precision floating point 17.9
|
||||
# 0b 66 66 66 66 66 e6 31 40
|
||||
# Double precision floating point infinity (∞)
|
||||
# 0b 00 00 00 00 00 00 f0 7f
|
||||
# Double precision floating point negative infinity 0b 00 00 00 00 00 00 f0 ff
|
||||
# (-∞)
|
||||
Loading…
Reference in a new issue