More tests and UTF8 string

This commit is contained in:
Scott Shawcroft 2024-07-12 15:45:49 -07:00
parent 6c60bef9c6
commit 83090d411c
No known key found for this signature in database
GPG key ID: 0DFD512649C052DA
6 changed files with 448 additions and 64 deletions

162
.gitignore vendored
View file

@ -1,2 +1,162 @@
dist
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

157
circuitmatter/__main__.py Normal file
View file

@ -0,0 +1,157 @@
"""Pure Python implementation of the Matter IOT protocol."""
import enum
import math
import subprocess
import socket
import struct
from . import tlv
import circuitmatter as cm
from typing import Optional, Type, Any
# descriminator = 3840
# avahi = subprocess.Popen(["avahi-publish-service", "-v", f"--subtype=_L{descriminator}._sub._matterc._udp", "--subtype=_CM._sub._matterc._udp", "FA93546B21F5FB54", "_matterc._udp", "5540", "PI=", "PH=33", "CM=1", f"D={descriminator}", "CRI=3000", "CRA=4000", "T=1", "VP=65521+32769"])
# # Define the UDP IP address and port
# UDP_IP = "::" # Listen on all available network interfaces
# UDP_PORT = 5540
# # Create the UDP socket
# sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
# # Bind the socket to the IP and port
# sock.bind((UDP_IP, UDP_PORT))
# print(f"Listening on UDP port {UDP_PORT}")
unsecured_session_context = {
}
# while True:
# # Receive data from the socket (1280 is the minimum ipv6 MTU and the max UDP matter packet size.)
# data, addr = sock.recvfrom(1280)
data = b'\x04\x00\x00\x00\x0b\x06\xb7\t)\xad\x07\xd9\xae\xa1\xee\xa0\x05 j\x15\x00\x00\x150\x01 \x97\x064#\x1c\xd1E7H\x0b|\xc2G\xa7\xc38\xe9\xce3\x11\xb2@M\x86\xd7\xb5{)\xaa`\xddb%\x02\xc2\x86$\x03\x00(\x045\x05%\x01\xf4\x01%\x02,\x01%\x03\xa0\x0f$\x04\x11$\x05\x0b&\x06\x00\x00\x03\x01$\x07\x01\x18\x18'
addr = None
import pathlib
import json
# pathlib.Path("data.bin").write_bytes(data)
bookmarks = []
def add_bookmark(start, length, name, color=0x0000ff):
bookmarks.append({
"color": 0x4f000000 | color,
"comment": "\n",
"id": len(bookmarks),
"locked": True,
"name": name,
"region": {
"address": start,
"size": length
}
})
# Write every time in case we crash
# pathlib.Path("parsed.hexbm").write_text(json.dumps({"bookmarks": bookmarks}))
def run():
# Print the received data and the address of the sender
print(f"Received packet from {addr}: {data}")
print(f"Data length: {len(data)} bytes")
flags, session_id, security_flags, message_counter = struct.unpack_from("<BHBI", data)
add_bookmark(0, 8, "Header")
print(f"Flags: {flags:x} Session ID: {session_id:x} Security Flags: {cm.SecurityFlags(security_flags)} Message Counter: {message_counter}")
offset = 8
if flags & (1 << 2):
source_node_id = struct.unpack_from("<Q", data, 8)[0]
add_bookmark(8, 8, "Source Node ID")
print(source_node_id)
offset += 8
print(f"DSIZ {flags & (0x3)}")
if (flags >> 4) != 0:
print("Incorrect version")
# continue
secure_session = security_flags & 0x3 != 0 or session_id != 0
if not secure_session:
print("Unsecured session")
print(data[offset:offset+8])
decrypted_message = memoryview(data)[offset:]
context = {"role": "responder", "node_id": source_node_id}
unsecured_session_context[source_node_id] = context
exchange_flags, protocol_opcode, exchange_id = struct.unpack_from("<BBH", decrypted_message)
add_bookmark(offset, 4, "Protocol header")
exchange_flags = cm.ExchangeFlags(exchange_flags)
print(f"Exchange Flags: {exchange_flags} Exchange ID: {exchange_id}")
decrypted_offset = 4
protocol_vendor_id = 0
if exchange_flags & cm.ExchangeFlags.V:
protocol_vendor_id = struct.unpack_from("<H", decrypted_message, decrypted_offset)[0]
add_bookmark(offset + decrypted_offset, 2, "Protocol Vendor ID")
decrypted_offset += 2
protocol_id = struct.unpack_from("<H", decrypted_message, decrypted_offset)[0]
add_bookmark(offset + decrypted_offset, 2, "Protocol ID")
decrypted_offset += 2
protocol_id = cm.ProtocolId(protocol_id)
protocol_opcode = cm.PROTOCOL_OPCODES[protocol_id](protocol_opcode)
print(f"Protocol Vendor ID: {protocol_vendor_id} Protocol ID: {protocol_id} Protocol Opcode: {protocol_opcode}")
acknowledged_message_counter = None
if exchange_flags & cm.ExchangeFlags.A:
acknowledged_message_counter = struct.unpack_from("<I", decrypted_message, decrypted_offset)[0]
decrypted_offset += 4
print(f"Acknowledged Message Counter: {acknowledged_message_counter}")
if protocol_id == cm.ProtocolId.SECURE_CHANNEL:
if protocol_opcode == cm.SecureProtocolOpcode.MSG_COUNTER_SYNC_REQ:
print("Received Message Counter Synchronization Request")
response = struct.pack("<BHBI", 0, 0, 0, 0)
sock.sendto(response, addr)
print(f"Sent Message Counter Synchronization Response to {addr}")
elif protocol_opcode == cm.SecureProtocolOpcode.MSG_COUNTER_SYNC_RSP:
print("Received Message Counter Synchronization Response")
elif protocol_opcode == cm.SecureProtocolOpcode.PBKDF_PARAM_REQUEST:
print("Received PBKDF Parameter Request")
request = cm.PBKDFParamRequest(decrypted_message[decrypted_offset+1:])
print(request)
response = cm.PBKDFParamResponse()
response.initiatorRandom = request.initiatorRandom
response.responderRandom = b"\x00" * 32
response.responderSessionId = 0
params = cm.Crypto_PBKDFParameterSet()
params.iterations = 1000
params.salt = b"\x00" * 32
response.pbkdf_parameters = params
print(response)
elif protocol_opcode == cm.SecureProtocolOpcode.PBKDF_PARAM_RESPONSE:
print("Received PBKDF Parameter Response")
elif protocol_opcode == cm.SecureProtocolOpcode.PASE_PAKE1:
print("Received PASE PAKE1")
elif protocol_opcode == cm.SecureProtocolOpcode.PASE_PAKE2:
print("Received PASE PAKE2")
elif protocol_opcode == cm.SecureProtocolOpcode.PASE_PAKE3:
print("Received PASE PAKE3")
elif protocol_opcode == cm.SecureProtocolOpcode.CASE_SIGMA1:
print("Received CASE Sigma1")
elif protocol_opcode == cm.SecureProtocolOpcode.CASE_SIGMA2:
print("Received CASE Sigma2")
elif protocol_opcode == cm.SecureProtocolOpcode.CASE_SIGMA3:
print("Received CASE Sigma3")
elif protocol_opcode == cm.SecureProtocolOpcode.CASE_SIGMA2_RESUME:
print("Received CASE Sigma2 Resume")
elif protocol_opcode == cm.SecureProtocolOpcode.STATUS_REPORT:
print("Received Status Report")
elif protocol_opcode == cm.SecureProtocolOpcode.ICD_CHECK_IN:
print("Received ICD Check-in")
# avahi.kill()
if __name__ == "__main__":
run()

View file

@ -1,4 +1,5 @@
import enum
import math
from typing import Optional, Type, Any
import struct
@ -27,7 +28,6 @@ class TLVStructure:
descriptor_class = vars(type(self))[field]
if field.startswith("_") or not isinstance(descriptor_class, Member):
continue
print(field)
value = descriptor_class.print(self)
if isinstance(descriptor_class, StructMember):
value = value.replace("\n", "\n ")
@ -35,6 +35,8 @@ class TLVStructure:
return "{\n " + ",\n ".join(members) + "\n}"
def scan_until(self, tag):
if self.buffer is None:
return
print(bytes(self.buffer[self._offset:]))
print(f"Looking for {tag}")
while self._offset < len(self.buffer):
@ -123,14 +125,6 @@ class Member:
self.tag = tag
self.optional = optional
def __set__(self, obj: TLVStructure, value: Any) -> None:
obj.cached_values[self.tag] = value
class NumberMember(Member):
def __init__(self, tag, _format, optional=False):
self.format = _format
super().__init__(tag, optional)
def __get__(
self,
obj: Optional[TLVStructure],
@ -140,16 +134,27 @@ class NumberMember(Member):
return obj.cached_values[self.tag]
if self.tag not in obj.tag_value_offset:
obj.scan_until(self.tag)
if self.tag not in obj.tag_value_offset:
return None
print(self.tag, obj.tag_value_length)
encoded_format = INT_SIZE[int(math.log(obj.tag_value_length[self.tag], 2))]
if self.format.islower():
encoded_format = encoded_format.lower()
value = struct.unpack_from(encoded_format, obj.buffer, offset=obj.tag_value_offset[self.tag])[0]
value = self.decode(obj.buffer, obj.tag_value_length[self.tag], offset=obj.tag_value_offset[self.tag])
obj.cached_values[self.tag] = value
return value
def __set__(self, obj: TLVStructure, value: Any) -> None:
obj.cached_values[self.tag] = value
class NumberMember(Member):
def __init__(self, tag, _format, optional=False):
self.format = _format
super().__init__(tag, optional)
def decode(self, buffer, length, offset=0):
encoded_format = INT_SIZE[int(math.log(length, 2))]
if self.format.islower():
encoded_format = encoded_format.lower()
return struct.unpack_from(encoded_format, buffer, offset=offset)[0]
def print(self, obj):
value = self.__get__(obj)
@ -157,21 +162,9 @@ class NumberMember(Member):
return f"{value}{unsigned}"
class BoolMember(Member):
def __get__(
self,
obj: Optional[TLVStructure],
objtype: Optional[Type[TLVStructure]] = None,
) -> bool:
if self.tag in obj.cached_values:
return obj.cached_values[self.tag]
if self.tag not in obj.tag_value_offset:
obj.scan_until(self.tag)
octet = obj.buffer[obj.tag_value_offset[self.tag]]
value = octet & 1 == 1
obj.cached_values[self.tag] = value
return value
def decode(self, buffer, length, offset=0) -> bool:
octet = buffer[offset]
return octet & 1 == 1
def print(self, obj):
if self.__get__(obj):
@ -183,40 +176,33 @@ class OctetStringMember(Member):
self.max_length = max_length
super().__init__(tag, optional)
def __get__(
self,
obj: Optional[TLVStructure],
objtype: Optional[Type[TLVStructure]] = None,
) -> memoryview:
if self.tag not in obj.tag_value_offset:
obj.scan_until(self.tag)
offset = obj.tag_value_offset[self.tag]
length = obj.tag_value_length[self.tag]
return obj.buffer[offset:offset + length]
def decode(self, buffer, length, offset=0):
return buffer[offset:offset + length]
def print(self, obj):
value = self.__get__(obj)
return " ".join((f"{byte:02x}" for byte in value))
class UTF8StringMember(Member):
def __init__(self, tag, max_length, optional=False):
self.max_length = max_length
super().__init__(tag, optional)
def decode(self, buffer, length, offset=0):
return buffer[offset:offset + length].decode("utf-8")
def print(self, obj):
value = self.__get__(obj)
return f"\"{value}\""
class StructMember(Member):
def __init__(self, tag, substruct_class, optional=False):
self.substruct_class = substruct_class
super().__init__(tag, optional)
def __get__(
self,
obj: Optional[TLVStructure],
objtype: Optional[Type[TLVStructure]] = None,
) -> Optional[TLVStructure]:
if self.tag not in obj.tag_value_offset:
obj.scan_until(self.tag)
if self.optional and (self.tag not in obj.tag_value_offset or obj.tag_value_length == 0):
return None
value_offset = obj.tag_value_offset[self.tag]
value_length = obj.tag_value_length[self.tag]
# TODO: Cache this so we can reuse the object.
return self.substruct_class(obj.buffer[value_offset:value_offset + value_length])
def decode(self, buffer, length, offset=0) -> TLVStructure:
return self.substruct_class(buffer[offset:offset + length])
def print(self, obj):
value = self.__get__(obj)

View file

@ -6,39 +6,120 @@ from circuitmatter import tlv
# Encoding (hex)
# Boolean false
# 08
# Boolean true
# 09
class Bool(tlv.TLVStructure):
b = tlv.BoolMember(None)
class TestBoolFalse:
class TestBool:
def test_bool_false_decode(self):
s = Bool(b"\x08")
assert str(s) == "{\n b = false\n}"
assert not s.b
def test_bool_false_encode(self):
s = Bool()
s.b = False
assert bytes(s) == b"\x08"
def test_bool_true_decode(self):
s = Bool(b"\x09")
assert str(s) == "{\n b = true\n}"
assert s.b
# def test_bool_false_encode(self):
# s = Bool()
# s.b = False
# assert bytes(s) == b"\x08"
class SignedIntOneOctet(tlv.TLVStructure):
i = tlv.NumberMember(None, "b")
class SignedIntTwoOctet(tlv.TLVStructure):
i = tlv.NumberMember(None, "h")
class SignedIntFourOctet(tlv.TLVStructure):
i = tlv.NumberMember(None, "i")
class SignedIntEightOctet(tlv.TLVStructure):
i = tlv.NumberMember(None, "q")
# Boolean true
# 09
# Signed Integer, 1-octet, value 42
# 00 2a
# Signed Integer, 1-octet, value -17
# 00 ef
# Unsigned Integer, 1-octet, value 42U
# 04 2a
# Signed Integer, 2-octet, value 42
# 01 2a 00
# Signed Integer, 4-octet, value -170000
# 02 f0 67 fd ff
# Signed Integer, 8-octet, value 40000000000
# 03 00 90 2f 50 09 00 00 00
class TestSignedInt:
def test_signed_int_42_decode(self):
s = SignedIntOneOctet(b"\x00\x2a")
assert str(s) == "{\n i = 42\n}"
assert s.i == 42
def test_signed_int_negative_17_decode(self):
s = SignedIntOneOctet(b"\x00\xef")
assert str(s) == "{\n i = -17\n}"
assert s.i == -17
# def test_signed_int_42_encode(self):
# s = SignedInt()
# s.i = 42
# assert bytes(s) == b"\x00\x2a"
def test_signed_int_42_two_octet_decode(self):
s = SignedIntTwoOctet(b"\x01\x2a\x00")
assert str(s) == "{\n i = 42\n}"
assert s.i == 42
def test_signed_int_negative_170000_decode(self):
s = SignedIntFourOctet(b"\x02\xf0\x67\xfd\xff")
assert str(s) == "{\n i = -170000\n}"
assert s.i == -170000
def test_signed_int_40000000000_decode(self):
s = SignedIntEightOctet(b"\x03\x00\x90\x2f\x50\x09\x00\x00\x00")
assert str(s) == "{\n i = 40000000000\n}"
assert s.i == 40000000000
class UnsignedIntOneOctet(tlv.TLVStructure):
i = tlv.NumberMember(None, "B")
# Unsigned Integer, 1-octet, value 42U
# 04 2a
class TestUnsignedInt:
def test_unsigned_int_42_decode(self):
s = UnsignedIntOneOctet(b"\x00\x2a")
assert str(s) == "{\n i = 42U\n}"
assert s.i == 42
# def test_unsigned_int_42_encode(self):
# s = UnsignedInt()
# s.i = 42
# assert bytes(s) == b"\x00\x2a"
# UTF-8 String, 1-octet length, "Hello!"
# 0c 06 48 65 6c 6c 6f 21
# UTF-8 String, 1-octet length, "Tschüs"
# 0c 07 54 73 63 68 c3 bc 73
class UTF8StringOneOctet(tlv.TLVStructure):
s = tlv.UTF8StringMember(None, 16)
class TestUTF8String:
def test_utf8_string_hello_decode(self):
s = UTF8StringOneOctet(b"\x0c\x06Hello!")
assert str(s) == "{\n s = \"Hello!\"\n}"
assert s.s == "Hello!"
def test_utf8_string_tschs_decode(self):
s = UTF8StringOneOctet(b"\x0c\x07Tsch\xc3\xbcs")
assert str(s) == "{\n s = \"Tschüs\"\n}"
assert s.s == "Tschüs"
# def test_utf8_string_hello_encode(self):
# s = UTF8String()
# s.s = b"Hello!"
# assert bytes(s) == b"\x0c\x06Hello!"
# Octet String, 1-octet length, octets 00 01 02 03 04 10 05 00 01 02 03 04
# Null
# 14