Add command support and split TLV parsing

TLV parsing can now happen without a concrete TLV structure to
follow. Structures are parsed into dictionaries and list/arrays are
into Python lists. They can't be reencoded this way! It is assumed
they will be used to create a more concrete object via `.from_value`

This handles the variable argument structures by allowing calling
code to make them concrete once they determine the type.

tlv.List is now a container superclass like tlv.Structure. It
allows for Structure like attribute access but preserves order
and duplicate keys (which may be unused).

Commands are now supported by subclassing the data model cluster
and overriding the class level Command() object. It stores input
and output type info.
This commit is contained in:
Scott Shawcroft 2024-09-19 16:14:05 -07:00
parent 39c27d0b55
commit 2fe5fac82b
No known key found for this signature in database
8 changed files with 639 additions and 381 deletions

View file

@ -499,19 +499,6 @@ class Message:
else:
struct.pack_into("<Q", buffer, offset, self.destination_node_id)
offset += 8
struct.pack_into(
"BBHH",
buffer,
offset,
self.exchange_flags,
self.protocol_opcode,
self.exchange_id,
self.protocol_id,
)
offset += 6
if self.acknowledged_message_counter is not None:
struct.pack_into("I", buffer, offset, self.acknowledged_message_counter)
offset += 4
if cipher is not None:
unencrypted_buffer = memoryview(bytearray(1280))
@ -520,8 +507,27 @@ class Message:
unencrypted_buffer = buffer
unencrypted_offset = offset
struct.pack_into(
"BBHH",
unencrypted_buffer,
unencrypted_offset,
self.exchange_flags,
self.protocol_opcode,
self.exchange_id,
self.protocol_id,
)
unencrypted_offset += 6
if self.acknowledged_message_counter is not None:
struct.pack_into(
"I",
unencrypted_buffer,
unencrypted_offset,
self.acknowledged_message_counter,
)
unencrypted_offset += 4
if self.application_payload is not None:
if isinstance(self.application_payload, tlv.TLVStructure):
if isinstance(self.application_payload, tlv.Structure):
# Wrap the structure in an anonymous tag.
unencrypted_buffer[unencrypted_offset] = 0x15
unencrypted_offset += 1
@ -875,6 +881,25 @@ class SessionManager:
return exchange
class GeneralCommissioningCluster(data_model.GeneralCommissioningCluster):
def __init__(self):
super().__init__()
basic_commissioning_info = (
data_model.GeneralCommissioningCluster.BasicCommissioningInfo()
)
basic_commissioning_info.FailSafeExpiryLengthSeconds = 10
basic_commissioning_info.MaxCumulativeFailsafeSeconds = 900
self.basic_commissioning_info = basic_commissioning_info
def arm_fail_safe(
self, args: data_model.GeneralCommissioningCluster.ArmFailSafe
) -> data_model.GeneralCommissioningCluster.ArmFailSafeResponse:
response = data_model.GeneralCommissioningCluster.ArmFailSafeResponse()
response.ErrorCode = data_model.CommissioningErrorEnum.OK
print("respond", response)
return response
class CircuitMatter:
def __init__(
self,
@ -928,13 +953,7 @@ class CircuitMatter:
network_info = data_model.NetworkCommissioningCluster()
network_info.connect_max_time_seconds = 10
self.add_cluster(0, network_info)
general_commissioning = data_model.GeneralCommissioningCluster()
basic_commissioning_info = (
data_model.GeneralCommissioningCluster.BasicCommissioningInfo()
)
basic_commissioning_info.FailSafeExpiryLengthSeconds = 10
basic_commissioning_info.MaxCumulativeFailsafeSeconds = 900
general_commissioning.basic_commissioning_info = basic_commissioning_info
general_commissioning = GeneralCommissioningCluster()
self.add_cluster(0, general_commissioning)
def start_commissioning(self):
@ -992,6 +1011,25 @@ class CircuitMatter:
report.AttributeData = cluster.get_attribute_data(path)
return report
def invoke(self, cluster, path, fields, command_ref):
response = interaction_model.InvokeResponseIB()
cstatus = interaction_model.CommandStatusIB()
cstatus.CommandPath = path
status = interaction_model.StatusIB()
status.Status = 0
status.ClusterStatus = 0
cstatus.Status = status
if command_ref is not None:
cstatus.CommandRef = command_ref
response.Status = cstatus
cdata = interaction_model.CommandDataIB()
cdata.Path = path
cdata.CommandFields = cluster.invoke(path, fields)
if command_ref is not None:
cdata.CommandRef = command_ref
response.Command = cdata
return response
def process_packet(self, address, data):
# Print the received data and the address of the sender
# This is section 4.7.2
@ -1027,7 +1065,10 @@ class CircuitMatter:
from . import pase
# This is Section 4.14.1.2
request = pase.PBKDFParamRequest(message.application_payload[1:-1])
request, _ = pase.PBKDFParamRequest.decode(
message.application_payload[0], message.application_payload[1:-1]
)
print("PBKDF", request)
exchange.commissioning_hash = hashlib.sha256(
b"CHIP PAKE V1 Commissioning"
)
@ -1067,7 +1108,9 @@ class CircuitMatter:
from . import pase
print("Received PASE PAKE1")
pake1 = pase.PAKE1(message.application_payload[1:-1])
pake1, _ = pase.PAKE1.decode(
message.application_payload[0], message.application_payload[1:]
)
pake2 = pase.PAKE2()
verifier = binascii.a2b_base64(self.nonvolatile["verifier"])
context = exchange.commissioning_hash.digest()
@ -1088,7 +1131,9 @@ class CircuitMatter:
from . import pase
print("Received PASE PAKE3")
pake3 = pase.PAKE3(message.application_payload[1:-1])
pake3, _ = pase.PAKE3.decode(
message.application_payload[0], message.application_payload[1:]
)
if pake3.cA != exchange.cA:
del exchange.cA
del exchange.Ke
@ -1145,36 +1190,32 @@ class CircuitMatter:
print("application payload", message.application_payload.hex(" "))
if protocol_opcode == InteractionModelOpcode.READ_REQUEST:
print("Received Read Request")
read_request = interaction_model.ReadRequestMessage(
message.application_payload[1:-1]
read_request, _ = interaction_model.ReadRequestMessage.decode(
message.application_payload[0], message.application_payload[1:]
)
print(read_request)
attribute_reports = []
for attribute in read_request.AttributeRequests:
for path in attribute:
attribute = (
"*" if path.Attribute is None else f"0x{path.Attribute:04x}"
)
print(
f"Endpoint: {path.Endpoint}, Cluster: 0x{path.Cluster:02x}, Attribute: {attribute}"
)
if path.Endpoint is None:
# Wildcard so we get it from every endpoint.
for endpoint in self._endpoints:
if path.Cluster in self._endpoints[endpoint]:
cluster = self._endpoints[endpoint][path.Cluster]
path.Endpoint = endpoint
attribute_reports.append(
self.get_report(cluster, path)
)
else:
print(f"Cluster 0x{path.Cluster:02x} not found")
else:
if path.Cluster in self._endpoints[path.Endpoint]:
cluster = self._endpoints[path.Endpoint][path.Cluster]
for path in read_request.AttributeRequests:
attribute = (
"*" if path.Attribute is None else f"0x{path.Attribute:04x}"
)
print(
f"Endpoint: {path.Endpoint}, Cluster: 0x{path.Cluster:02x}, Attribute: {attribute}"
)
if path.Endpoint is None:
# Wildcard so we get it from every endpoint.
for endpoint in self._endpoints:
if path.Cluster in self._endpoints[endpoint]:
cluster = self._endpoints[endpoint][path.Cluster]
path.Endpoint = endpoint
attribute_reports.append(self.get_report(cluster, path))
else:
print(f"Cluster 0x{path.Cluster:02x} not found")
else:
if path.Cluster in self._endpoints[path.Endpoint]:
cluster = self._endpoints[path.Endpoint][path.Cluster]
attribute_reports.append(self.get_report(cluster, path))
else:
print(f"Cluster 0x{path.Cluster:02x} not found")
response = interaction_model.ReportDataMessage()
response.AttributeReports = attribute_reports
exchange.send(
@ -1182,7 +1223,43 @@ class CircuitMatter:
InteractionModelOpcode.REPORT_DATA,
response,
)
if protocol_opcode == InteractionModelOpcode.INVOKE_REQUEST:
elif protocol_opcode == InteractionModelOpcode.INVOKE_REQUEST:
print("Received Invoke Request")
invoke_request, _ = interaction_model.InvokeRequestMessage.decode(
message.application_payload[0], message.application_payload[1:]
)
for invoke in invoke_request.InvokeRequests:
print(invoke)
path = invoke.CommandPath
print(path)
command = "*" if path.Command is None else f"0x{path.Command:04x}"
print(
f"Invoke Endpoint: {path.Endpoint}, Cluster: 0x{path.Cluster:04x}, Command: {command}"
)
invoke_responses = []
if path.Endpoint is None:
# Wildcard so we get it from every endpoint.
for endpoint in self._endpoints:
if path.Cluster in self._endpoints[endpoint]:
cluster = self._endpoints[endpoint][path.Cluster]
path.Endpoint = endpoint
invoke_responses.append(
self.invoke(cluster, path, invoke.CommandFields)
)
else:
print(f"Cluster 0x{path.Cluster:02x} not found")
else:
if path.Cluster in self._endpoints[path.Endpoint]:
cluster = self._endpoints[path.Endpoint][path.Cluster]
invoke_responses.append(
self.invoke(
cluster,
path,
invoke.CommandFields,
invoke.CommandRef,
)
)
else:
print(f"Cluster 0x{path.Cluster:02x} not found")
elif protocol_opcode == InteractionModelOpcode.INVOKE_RESPONSE:
print("Received Invoke Response")

View file

@ -46,7 +46,7 @@ class NumberAttribute(Attribute):
super().__init__(_id, default=default)
@staticmethod
def encode_number(value, *, signed=True):
def encode_number(value, *, signed=True) -> bytes:
bit_length = value.bit_length()
format_string = None
if signed:
@ -69,7 +69,7 @@ class NumberAttribute(Attribute):
return struct.pack(format_string, type | length, value)
def encode(self, value):
def encode(self, value) -> bytes:
return NumberAttribute.encode_number(value, signed=self.signed)
@ -90,7 +90,8 @@ class ListAttribute(Attribute):
class BoolAttribute(Attribute):
pass
def encode(self, value) -> bytes:
return struct.pack("B", tlv.ElementType.BOOL | (1 if value else 0))
class StructAttribute(Attribute):
@ -124,6 +125,18 @@ class BitmapAttribute(Attribute):
pass
class Command:
def __init__(self, command_id, request_type, response_id, response_type):
self.command_id = command_id
self.request_type = request_type
self.response_id = response_id
self.response_type = response_type
def __call__(self, arg):
print("call command")
pass
class Cluster:
feature_map = FeatureMap()
@ -134,12 +147,10 @@ class Cluster:
@classmethod
def _attributes(cls) -> Iterable[tuple[str, Attribute]]:
for field_name, descriptor in vars(cls).items():
if not field_name.startswith("_") and isinstance(descriptor, Attribute):
yield field_name, descriptor
for field_name, descriptor in vars(Cluster).items():
if not field_name.startswith("_") and isinstance(descriptor, Attribute):
yield field_name, descriptor
for superclass in cls.__mro__:
for field_name, descriptor in vars(superclass).items():
if not field_name.startswith("_") and isinstance(descriptor, Attribute):
yield field_name, descriptor
def get_attribute_data(self, path) -> interaction_model.AttributeDataIB:
print("get_attribute_data", path.Attribute)
@ -148,6 +159,7 @@ class Cluster:
data.Path = path
found = False
for field_name, descriptor in self._attributes():
print("maybe", field_name, descriptor)
if descriptor.id != path.Attribute:
continue
print("read", field_name, descriptor)
@ -158,6 +170,26 @@ class Cluster:
print("not found", path.Attribute)
return data
@classmethod
def _commands(cls) -> Iterable[tuple[str, Command]]:
for superclass in cls.__mro__:
for field_name, descriptor in vars(superclass).items():
if not field_name.startswith("_") and isinstance(descriptor, Command):
yield field_name, descriptor
def invoke(self, path, fields):
print("invoke", path.Command)
found = False
for field_name, descriptor in self._commands():
if descriptor.command_id != path.Command:
continue
arg = descriptor.request_type.from_value(fields)
print("invoke", field_name, descriptor, arg)
return getattr(self, field_name)(arg)
if not found:
print("not found", path.Attribute)
return None
class ProductFinish(enum.IntEnum):
OTHER = 0
@ -195,7 +227,7 @@ class Color(enum.IntEnum):
class BasicInformationCluster(Cluster):
CLUSTER_ID = 0x0028
class CapabilityMinima(tlv.TLVStructure):
class CapabilityMinima(tlv.Structure):
CaseSessionsPerFabric = tlv.IntMember(
0, signed=False, octets=2, minimum=3, default=3
)
@ -203,7 +235,7 @@ class BasicInformationCluster(Cluster):
1, signed=False, octets=2, minimum=3, default=3
)
class ProductAppearance(tlv.TLVStructure):
class ProductAppearance(tlv.Structure):
Finish = tlv.EnumMember(0, ProductFinish)
PrimaryColor = tlv.EnumMember(1, Color)
@ -232,20 +264,26 @@ class BasicInformationCluster(Cluster):
max_paths_per_invoke = NumberAttribute(0x16, signed=False, bits=16, default=1)
class CommissioningErrorEnum(Enum8):
OK = 0
VALUE_OUTSIDE_RANGE = 1
INVALID_AUTHENTICATION = 2
NO_FAIL_SAFE = 3
BUSY_WITH_OTHER_ADMIN = 4
class GeneralCommissioningCluster(Cluster):
CLUSTER_ID = 0x0030
class BasicCommissioningInfo(tlv.TLVStructure):
class BasicCommissioningInfo(tlv.Structure):
FailSafeExpiryLengthSeconds = tlv.IntMember(0, signed=False, octets=2)
MaxCumulativeFailsafeSeconds = tlv.IntMember(1, signed=False, octets=2)
class RegulatoryLocationType(enum.IntEnum):
class RegulatoryLocationType(Enum8):
INDOOR = 0
OUTDOOR = 1
INDOOR_OUTDOOR = 2
bits = 8
breadcrumb = NumberAttribute(0, signed=False, bits=64, default=0)
basic_commissioning_info = StructAttribute(1, BasicCommissioningInfo)
regulatory_config = EnumAttribute(
@ -256,6 +294,16 @@ class GeneralCommissioningCluster(Cluster):
)
support_concurrent_connection = BoolAttribute(4, default=True)
class ArmFailSafe(tlv.Structure):
ExpiryLengthSeconds = tlv.IntMember(0, signed=False, octets=2, default=900)
Breadcrumb = tlv.IntMember(1, signed=False, octets=8)
class ArmFailSafeResponse(tlv.Structure):
ErrorCode = tlv.EnumMember(0, CommissioningErrorEnum)
DebugText = tlv.UTF8StringMember(1, max_length=128)
arm_fail_safe = Command(0x00, ArmFailSafe, 0x01, ArmFailSafeResponse)
class NetworkCommissioningCluster(Cluster):
CLUSTER_ID = 0x0031

View file

@ -1,7 +1,7 @@
from . import tlv
class AttributePathIB(tlv.TLVStructure):
class AttributePathIB(tlv.List):
"""Section 10.6.2"""
EnableTagCompression = tlv.BoolMember(0, optional=True)
@ -13,7 +13,7 @@ class AttributePathIB(tlv.TLVStructure):
WildcardPathFlags = tlv.IntMember(6, signed=False, octets=4, optional=True)
class EventPathIB(tlv.TLVStructure):
class EventPathIB(tlv.Structure):
"""Section 10.6.8"""
Node = tlv.IntMember(0, signed=False, octets=8)
@ -23,59 +23,59 @@ class EventPathIB(tlv.TLVStructure):
IsUrgent = tlv.BoolMember(4)
class EventFilterIB(tlv.TLVStructure):
class EventFilterIB(tlv.Structure):
"""Section 10.6.6"""
Node = tlv.IntMember(0, signed=False, octets=8)
EventMinimumInterval = tlv.IntMember(1, signed=False, octets=8)
class ClusterPathIB(tlv.TLVStructure):
class ClusterPathIB(tlv.List):
Node = tlv.IntMember(0, signed=False, octets=8)
Endpoint = tlv.IntMember(1, signed=False, octets=2)
Cluster = tlv.IntMember(2, signed=False, octets=4)
class DataVersionFilterIB(tlv.TLVStructure):
class DataVersionFilterIB(tlv.Structure):
Path = tlv.StructMember(0, ClusterPathIB)
DataVersion = tlv.IntMember(1, signed=False, octets=4)
class StatusIB(tlv.TLVStructure):
class StatusIB(tlv.Structure):
Status = tlv.IntMember(0, signed=False, octets=1)
ClusterStatus = tlv.IntMember(1, signed=False, octets=1)
class AttributeDataIB(tlv.TLVStructure):
class AttributeDataIB(tlv.Structure):
DataVersion = tlv.IntMember(0, signed=False, octets=4)
Path = tlv.StructMember(1, AttributePathIB)
Data = tlv.AnythingMember(2)
class AttributeStatusIB(tlv.TLVStructure):
class AttributeStatusIB(tlv.Structure):
Path = tlv.StructMember(0, AttributePathIB)
Status = tlv.StructMember(1, StatusIB)
class AttributeReportIB(tlv.TLVStructure):
class AttributeReportIB(tlv.Structure):
AttributeStatus = tlv.StructMember(0, AttributeStatusIB)
AttributeData = tlv.StructMember(1, AttributeDataIB)
class ReadRequestMessage(tlv.TLVStructure):
AttributeRequests = tlv.ArrayMember(0, tlv.List(AttributePathIB))
class ReadRequestMessage(tlv.Structure):
AttributeRequests = tlv.ArrayMember(0, AttributePathIB)
EventRequests = tlv.ArrayMember(1, EventPathIB)
EventFilters = tlv.ArrayMember(2, EventFilterIB)
FabricFiltered = tlv.BoolMember(3)
DataVersionFilters = tlv.ArrayMember(4, DataVersionFilterIB)
class EventStatusIB(tlv.TLVStructure):
class EventStatusIB(tlv.Structure):
Path = tlv.StructMember(0, EventPathIB)
Status = tlv.StructMember(1, StatusIB)
class EventDataIB(tlv.TLVStructure):
class EventDataIB(tlv.Structure):
Path = tlv.StructMember(0, EventPathIB)
EventNumber = tlv.IntMember(1, signed=False, octets=8)
PriorityLevel = tlv.IntMember(2, signed=False, octets=1)
@ -89,14 +89,49 @@ class EventDataIB(tlv.TLVStructure):
Data = tlv.AnythingMember(7)
class EventReportIB(tlv.TLVStructure):
class EventReportIB(tlv.Structure):
EventStatus = tlv.StructMember(0, EventStatusIB)
EventData = tlv.StructMember(1, EventDataIB)
class ReportDataMessage(tlv.TLVStructure):
class ReportDataMessage(tlv.Structure):
SubscriptionId = tlv.IntMember(0, signed=False, octets=4, optional=True)
AttributeReports = tlv.ArrayMember(1, AttributeReportIB, optional=True)
EventReports = tlv.ArrayMember(2, EventReportIB, optional=True)
MoreChunkedMessages = tlv.BoolMember(3, optional=True)
SuppressResponse = tlv.BoolMember(4, optional=True)
class CommandPathIB(tlv.List):
Endpoint = tlv.IntMember(0, signed=False, octets=2)
Cluster = tlv.IntMember(1, signed=False, octets=4)
Command = tlv.IntMember(2, signed=False, octets=4)
class CommandDataIB(tlv.Structure):
CommandPath = tlv.ListMember(0, CommandPathIB)
CommandFields = tlv.AnythingMember(1, optional=True)
CommandRef = tlv.NumberMember(2, "H", optional=True)
class CommandStatusIB(tlv.Structure):
CommandPath = tlv.ListMember(0, CommandPathIB)
Status = tlv.StructMember(1, StatusIB)
CommandRef = tlv.NumberMember(2, "H", optional=True)
class InvokeResponseIB(tlv.Structure):
Command = tlv.StructMember(0, CommandDataIB)
Status = tlv.StructMember(1, CommandStatusIB)
class InvokeRequestMessage(tlv.Structure):
SuppressResponse = tlv.BoolMember(0)
TimedRequest = tlv.BoolMember(1)
InvokeRequests = tlv.ArrayMember(2, CommandDataIB)
class InvokeResponseMessage(tlv.Structure):
SuppressResponse = tlv.BoolMember(0)
InvokeResponses = tlv.ArrayMember(1, InvokeResponseIB)
MoreChunkedMessages = tlv.BoolMember(2, optional=True)

View file

@ -23,7 +23,7 @@ from ecdsa.curves import NIST256p
# [4] : BOOLEAN,
# initiatorSessionParams [5, optional] : session-parameter-struct
# }
class PBKDFParamRequest(tlv.TLVStructure):
class PBKDFParamRequest(tlv.Structure):
initiatorRandom = tlv.OctetStringMember(1, 32)
initiatorSessionId = tlv.IntMember(2, signed=False, octets=2)
passcodeId = tlv.IntMember(3, signed=False, octets=2)
@ -38,7 +38,7 @@ class PBKDFParamRequest(tlv.TLVStructure):
# iterations [1] : UNSIGNED INTEGER [ range 32-bits ],
# salt [2] : OCTET STRING [ length 16..32 ],
# }
class Crypto_PBKDFParameterSet(tlv.TLVStructure):
class Crypto_PBKDFParameterSet(tlv.Structure):
iterations = tlv.IntMember(1, signed=False, octets=4)
salt = tlv.OctetStringMember(2, 32)
@ -55,7 +55,7 @@ class Crypto_PBKDFParameterSet(tlv.TLVStructure):
# [4] : Crypto_PBKDFParameterSet,
# responderSessionParams [5, optional] : session-parameter-struct
# }
class PBKDFParamResponse(tlv.TLVStructure):
class PBKDFParamResponse(tlv.Structure):
initiatorRandom = tlv.OctetStringMember(1, 32)
responderRandom = tlv.OctetStringMember(2, 32)
responderSessionId = tlv.IntMember(3, signed=False, octets=2)
@ -74,16 +74,16 @@ CRYPTO_HASH_LEN_BYTES = 32
CRYPTO_HASH_BLOCK_LEN_BYTES = 64
class PAKE1(tlv.TLVStructure):
class PAKE1(tlv.Structure):
pA = tlv.OctetStringMember(1, CRYPTO_PUBLIC_KEY_SIZE_BYTES)
class PAKE2(tlv.TLVStructure):
class PAKE2(tlv.Structure):
pB = tlv.OctetStringMember(1, CRYPTO_PUBLIC_KEY_SIZE_BYTES)
cB = tlv.OctetStringMember(2, CRYPTO_HASH_LEN_BYTES)
class PAKE3(tlv.TLVStructure):
class PAKE3(tlv.Structure):
cA = tlv.OctetStringMember(1, CRYPTO_HASH_LEN_BYTES)

View file

@ -9,7 +9,7 @@ CRYPTO_AEAD_MIC_LENGTH_BYTES = 16
CRYPTO_AEAD_NONCE_LENGTH_BYTES = 13
class SessionParameterStruct(tlv.TLVStructure):
class SessionParameterStruct(tlv.Structure):
session_idle_interval = tlv.IntMember(1, signed=False, octets=4, optional=True)
session_active_interval = tlv.IntMember(2, signed=False, octets=4, optional=True)
session_active_threshold = tlv.IntMember(3, signed=False, octets=2, optional=True)

View file

@ -16,7 +16,6 @@ from typing import (
overload,
)
from typing_extensions import Buffer
# As a byte string to save space.
TAG_LENGTH = b"\x00\x01\x02\x04\x02\x04\x06\x08"
@ -37,34 +36,71 @@ class ElementType(enum.IntEnum):
END_OF_CONTAINER = 0b11000
def find_container_end(buffer, start):
nesting = 0
end = start
while buffer[end] != ElementType.END_OF_CONTAINER or nesting > 0:
octet = buffer[end]
if octet == ElementType.END_OF_CONTAINER:
nesting -= 1
elif (octet & 0x1F) in (
ElementType.STRUCTURE,
ElementType.ARRAY,
ElementType.LIST,
):
nesting += 1
end += 1
return end + 1
def decode_tag(control_octet, buffer, offset=0):
tag_control = control_octet >> 5
this_tag = None
if tag_control == 0: # Anonymous
this_tag = None
elif tag_control == 1: # Context specific
this_tag = buffer[offset]
else:
vendor_id = None
profile_number = None
if tag_control >= 6: # Fully qualified
vendor_id, profile_number = struct.unpack_from("<HH", buffer, offset)
if tag_control in (0b010, 0b011):
raise NotImplementedError("Common profile tag")
if tag_control == 7: # 4 octet tag number
tag_number = struct.unpack_from("<I", buffer, offset + 4)[0]
else:
tag_number = struct.unpack_from("<H", buffer, offset + 4)[0]
if vendor_id:
this_tag = (vendor_id, profile_number, tag_number)
else:
this_tag = tag_number
return this_tag, offset + TAG_LENGTH[tag_control]
class TLVStructure:
def decode_element(control_octet, buffer, offset, depth):
element_type = control_octet & 0x1F
element_category = element_type >> 2
if element_category == 0 or element_category == 1: # ints
member_class = NumberMember
elif element_category == 2: # Bool or float
if element_type & 0x3 <= 1:
member_class = BoolMember
else: # Float
member_class = NumberMember
elif element_type == 0b10100: # Null
member_class = None
elif element_type == ElementType.UTF8_STRING:
member_class = UTF8StringMember
elif element_type == ElementType.OCTET_STRING:
member_class = OctetStringMember
elif element_type == ElementType.STRUCTURE:
member_class = StructMember
elif element_type == ElementType.ARRAY:
member_class = ArrayMember
elif element_type == ElementType.LIST:
member_class = ListMember
if member_class is None:
value = None
offset = offset
else:
result = member_class.decode(control_octet, buffer, offset, depth)
value, offset = result
return value, offset
class Container:
_max_length = None
def __init__(self, buffer: Optional[Buffer] = None):
self.buffer = memoryview(buffer) if buffer is not None else None
# These three dicts are keyed by tag.
self.tag_value_offset = {}
self.null_tags = set()
self.tag_value_length = {}
self.cached_values = {}
self._offset = 0 # Stopped at the next control octet
def __init__(self):
self.values = {}
@classmethod
def max_length(cls):
@ -72,12 +108,35 @@ class TLVStructure:
cls._max_length = sum(member.max_length for _, member in cls._members())
return cls._max_length
@classmethod
def _members(cls) -> Iterable[tuple[str, Member]]:
for field_name, descriptor in vars(cls).items():
if not field_name.startswith("_") and isinstance(descriptor, Member):
yield field_name, descriptor
@classmethod
def _members_by_tag(cls) -> dict[int, tuple[str, Member]]:
if hasattr(cls, "_members_by_tag_cache"):
return cls._members_by_tag_cache
members = {}
for field_name, descriptor in vars(cls).items():
if not field_name.startswith("_") and isinstance(descriptor, Member):
members[descriptor.tag] = (field_name, descriptor)
cls._members_by_tag_cache = members
return members
class Structure(Container):
def __str__(self):
members = []
for field, descriptor_class in self._members():
value = descriptor_class.print(self)
if not value:
continue
value = getattr(self, field) # type: ignore # self inference issues
if value is None:
if descriptor_class.optional:
continue
value = "null"
else:
value = descriptor_class.print(value)
if isinstance(descriptor_class, StructMember):
value = value.replace("\n", "\n ")
members.append(f"{field} = {value}")
@ -94,98 +153,37 @@ class TLVStructure:
return offset
@classmethod
def _members(cls) -> Iterable[tuple[str, Member]]:
for field_name, descriptor in vars(cls).items():
if not field_name.startswith("_") and isinstance(descriptor, Member):
yield field_name, descriptor
def decode(cls, control_octet, buffer, offset=0, depth=0) -> tuple[dict, int]:
values = {}
buffer = memoryview(buffer)
while offset < len(buffer) and buffer[offset] != ElementType.END_OF_CONTAINER:
control_octet = buffer[offset]
this_tag, offset = decode_tag(control_octet, buffer, offset + 1)
value, offset = decode_element(control_octet, buffer, offset, depth + 1)
values[this_tag] = value
def scan_until(self, tag):
if self.buffer is None:
return
while self._offset < len(self.buffer):
control_octet = self.buffer[self._offset]
tag_control = control_octet >> 5
element_type = control_octet & 0x1F
if cls == Structure:
return values, offset
this_tag = None
if tag_control == 0: # Anonymous
this_tag = None
elif tag_control == 1: # Context specific
this_tag = self.buffer[self._offset + 1]
else:
vendor_id = None
profile_number = None
if tag_control >= 6: # Fully qualified
vendor_id, profile_number = struct.unpack_from(
"<HH", self.buffer, self._offset + 1
)
return cls.from_value(values), offset
if tag_control in (0b010, 0b011):
raise NotImplementedError("Common profile tag")
def construct_containers(self):
print("construct_containers")
for name, member_class in self._members():
print(name, member_class)
tag = member_class.tag
if tag not in self.values:
continue
self.values[tag] = member_class.from_value(self.values[tag])
print("replaced", name, self.values[tag])
print("construct_containers done")
if tag_control == 7: # 4 octet tag number
tag_number = struct.unpack_from(
"<I", self.buffer, self._offset + 5
)[0]
else:
tag_number = struct.unpack_from(
"<H", self.buffer, self._offset + 5
)[0]
if vendor_id:
this_tag = (vendor_id, profile_number, tag_number)
else:
this_tag = tag_number
length_offset = self._offset + 1 + TAG_LENGTH[tag_control]
element_category = element_type >> 2
if element_category == 0 or element_category == 1: # ints
value_offset = length_offset
value_length = 1 << (element_type & 0x3)
elif element_category == 2: # Bool or float
if element_type & 0x3 <= 1:
value_offset = self._offset
value_length = 1
else: # Float
value_offset = length_offset
value_length = 4 << (element_type & 0x1)
elif (
element_category == 3 or element_category == 4
): # UTF-8 String or Octet String
power_of_two = element_type & 0x3
length_length = 1 << power_of_two
value_offset = length_offset + length_length
value_length = struct.unpack_from(
INT_SIZE[power_of_two], self.buffer, length_offset
)[0]
elif element_type == 0b10100: # Null
value_offset = self._offset
value_length = 1
self.null_tags.add(this_tag)
else: # Container
value_offset = length_offset
end = find_container_end(self.buffer, value_offset)
value_length = end - value_offset - 1
self.tag_value_offset[this_tag] = value_offset
self.tag_value_length[this_tag] = value_length
# A few values are encoded in the control byte. Move our offset past
# the tag where the length would be in that case.
if self._offset == value_offset:
self._offset = length_offset
else:
self._offset = value_offset + value_length
if element_type in (
ElementType.STRUCTURE,
ElementType.ARRAY,
ElementType.LIST,
):
# One more for the trailing 0x18
self._offset += 1
if tag == this_tag:
break
@classmethod
def from_value(cls, value):
instance = cls()
instance.values = value
instance.construct_containers()
return instance
_T = TypeVar("_T")
@ -233,51 +231,40 @@ class Member(ABC, Generic[_T, _OPT, _NULLABLE]):
self: Union[
Member[_T, Literal[True], _NULLABLE], Member[_T, _OPT, Literal[True]]
],
obj: TLVStructure,
objtype: Optional[Type[TLVStructure]] = None,
obj: Structure,
objtype: Optional[Type[Structure]] = None,
) -> Optional[_T]: ...
@overload
def __get__(
self: Member[_T, Literal[False], Literal[False]],
obj: TLVStructure,
objtype: Optional[Type[TLVStructure]] = None,
obj: Structure,
objtype: Optional[Type[Structure]] = None,
) -> _T: ...
def __get__(self, obj, objtype=None):
if self.tag in obj.cached_values:
return obj.cached_values[self.tag]
if self.tag not in obj.tag_value_offset:
obj.scan_until(self.tag)
if self.tag not in obj.tag_value_offset or self.tag in obj.null_tags:
return self._default
value = self.decode(
obj.buffer,
obj.tag_value_length[self.tag],
offset=obj.tag_value_offset[self.tag],
)
obj.cached_values[self.tag] = value
return value
if self.tag in obj.values:
return obj.values[self.tag]
return self._default
@overload
def __set__(
self: Union[
Member[_T, Literal[True], _NULLABLE], Member[_T, _OPT, Literal[True]]
],
obj: TLVStructure,
obj: Structure,
value: Optional[_T],
) -> None: ...
@overload
def __set__(
self: Member[_T, Literal[False], Literal[False]], obj: TLVStructure, value: _T
self: Member[_T, Literal[False], Literal[False]], obj: Structure, value: _T
) -> None: ...
def __set__(self, obj, value):
if value is None and not self.nullable:
raise ValueError("Not nullable")
obj.cached_values[self.tag] = value
obj.values[self.tag] = value
def encode_into(self, obj: TLVStructure, buffer: bytearray, offset: int) -> int:
def encode_into(self, obj: Container, buffer: bytearray, offset: int) -> int:
value = self.__get__(obj) # type: ignore # self inference issues
element_type = ElementType.NULL
if value is not None:
@ -321,17 +308,11 @@ class Member(ABC, Generic[_T, _OPT, _NULLABLE]):
return new_offset
return offset
def print(self, obj: TLVStructure) -> Optional[str]:
value = self.__get__(obj) # type: ignore # self inference issues
if value is None:
if self.optional:
return None
return "null"
return self._print(value)
@abstractmethod
def decode(self, buffer: memoryview, length: int, offset: int = 0) -> _T:
"Return the decoded value at `offset` in `buffer`"
def decode(
self, control_octet: int, buffer: memoryview, offset: int = 0
) -> (_T, int):
"Return the decoded value at `offset` in `buffer`. `offset` is after the tag (but before any length)"
...
@abstractmethod
@ -365,10 +346,13 @@ class Member(ABC, Generic[_T, _OPT, _NULLABLE]):
...
@abstractmethod
def _print(self, value: _T) -> str:
def print(self, value: _T) -> str:
"Return string representation of `value`"
...
def from_value(cls, value):
return value
# number type
_NT = TypeVar("_NT", float, int)
@ -420,19 +404,27 @@ class NumberMember(Member[_NT, _OPT, _NULLABLE], Generic[_NT, _OPT, _NULLABLE]):
super().__set__(obj, value) # type: ignore # self inference issues
def decode(self, buffer, length, offset=0) -> _NT:
if self.integer:
@staticmethod
def decode(control_octet, buffer, offset=0, depth=0) -> tuple[_NT, int]:
element_type = control_octet & 0x1F
element_category = element_type >> 2
if element_category == 0 or element_category == 1:
length = 1 << (control_octet & 0x3)
encoded_format = INT_SIZE[int(math.log(length, 2))]
if self.format.islower():
if element_category == 0:
encoded_format = encoded_format.lower()
else:
length = 4 << (control_octet & 0x1)
if length == 4:
encoded_format = "<f"
else:
encoded_format = "<d"
return struct.unpack_from(encoded_format, buffer, offset=offset)[0]
return (
struct.unpack_from(encoded_format, buffer, offset=offset)[0],
offset + struct.calcsize(encoded_format),
)
def _print(self, value):
def print(self, value):
unsigned = "" if self.signed else "U"
return f"{value}{unsigned}"
@ -489,7 +481,7 @@ class EnumMember(IntMember):
return self.enum_class(value)
return
def _print(self, value):
def print(self, value):
return self.enum_class(value).name
@ -520,11 +512,11 @@ class FloatMember(NumberMember[float, _OPT, _NULLABLE]):
class BoolMember(Member[bool, _OPT, _NULLABLE]):
max_value_length = 0
def decode(self, buffer, length, offset=0):
octet = buffer[offset]
return octet & 1 == 1
@staticmethod
def decode(control_octet, buffer, offset=0, depth=0):
return (control_octet & 1 == 1, offset)
def _print(self, value):
def print(self, value):
if value:
return "true"
return "false"
@ -555,7 +547,7 @@ class StringMember(Member[AnyStr, _OPT, _NULLABLE], Generic[AnyStr, _OPT, _NULLA
self.length_length = struct.calcsize(self.length_format)
super().__init__(tag, optional=optional, nullable=nullable, **kwargs)
def _print(self, value):
def print(self, value):
return " ".join((f"{byte:02x}" for byte in value))
def encode_element_type(self, value):
@ -567,28 +559,43 @@ class StringMember(Member[AnyStr, _OPT, _NULLABLE], Generic[AnyStr, _OPT, _NULLA
buffer[offset : offset + len(value)] = value
return offset + len(value)
@staticmethod
def parse_length(control_octet, buffer, offset=0):
element_type = control_octet & 0x1F
power_of_two = element_type & 0x3
length_length = 1 << power_of_two
value_length = struct.unpack_from(INT_SIZE[power_of_two], buffer, offset)[0]
return value_length, offset + length_length
class OctetStringMember(StringMember[bytes, _OPT, _NULLABLE]):
_base_element_type: ElementType = ElementType.OCTET_STRING
def decode(self, buffer, length, offset=0):
return buffer[offset : offset + length].tobytes()
@staticmethod
def decode(control_octet, buffer, offset=0, depth=0):
length, offset = StringMember.parse_length(control_octet, buffer, offset)
return (buffer[offset : offset + length].tobytes(), offset + length)
class UTF8StringMember(StringMember[str, _OPT, _NULLABLE]):
_base_element_type = ElementType.UTF8_STRING
def decode(self, buffer, length, offset=0):
return buffer[offset : offset + length].tobytes().decode("utf-8")
@staticmethod
def decode(control_octet, buffer, offset=0, depth=0):
length, offset = StringMember.parse_length(control_octet, buffer, offset)
return (
buffer[offset : offset + length].tobytes().decode("utf-8"),
offset + length,
)
def encode_value_into(self, value: str, buffer, offset) -> int:
return super().encode_value_into(value.encode("utf-8"), buffer, offset)
def _print(self, value):
def print(self, value):
return f'"{value}"'
_TLVStruct = TypeVar("_TLVStruct", bound=TLVStructure)
_TLVStruct = TypeVar("_TLVStruct", bound=Structure)
class StructMember(Member[_TLVStruct, _OPT, _NULLABLE]):
@ -605,10 +612,12 @@ class StructMember(Member[_TLVStruct, _OPT, _NULLABLE]):
self.max_value_length = substruct_class.max_length() + 1
super().__init__(tag, optional=optional, nullable=nullable, **kwargs)
def decode(self, buffer, length, offset=0):
return self.substruct_class(buffer[offset : offset + length])
@staticmethod
def decode(control_octet, buffer, offset=0, depth=0):
value, offset = Structure.decode(control_octet, buffer, offset, depth)
return value, offset + 1
def _print(self, value):
def print(self, value):
return str(value)
def encode_element_type(self, value):
@ -619,6 +628,9 @@ class StructMember(Member[_TLVStruct, _OPT, _NULLABLE]):
buffer[offset] = ElementType.END_OF_CONTAINER
return offset + 1
def from_value(self, value):
return self.substruct_class.from_value(value)
class ArrayMember(Member[_TLVStruct, _OPT, _NULLABLE]):
def __init__(
@ -634,29 +646,22 @@ class ArrayMember(Member[_TLVStruct, _OPT, _NULLABLE]):
self.max_value_length = 1280
super().__init__(tag, optional=optional, nullable=nullable, **kwargs)
def decode(self, buffer, length, offset=0):
@staticmethod
def decode(control_octet, buffer, offset=0, depth=0):
entries = []
if isinstance(self.substruct_class, List):
i = 0
while i < length:
if buffer[offset + i] != ElementType.LIST:
raise RuntimeError("Expected list start")
start = offset + i
end = start + 1
while buffer[end] != ElementType.END_OF_CONTAINER:
end += 1
entries.append(self.substruct_class(buffer[start + 1 : end]))
while buffer[offset] != ElementType.END_OF_CONTAINER:
control_octet = buffer[offset]
value, offset = decode_element(control_octet, buffer, offset + 1, depth + 1)
entries.append(value)
return (entries, offset + 1)
i = (end + 1) - offset
return entries
def _print(self, value):
s = ["[["]
def print(self, value):
s = ["["]
items = []
for v in value:
items.append(str(v))
s.append(", ".join(items))
s.append("]]")
s.append("]")
return "".join(s)
def encode_element_type(self, value):
@ -668,61 +673,140 @@ class ArrayMember(Member[_TLVStruct, _OPT, _NULLABLE]):
buffer[offset] = ElementType.END_OF_CONTAINER
return offset + 1
def from_value(self, value):
for i in range(len(value)):
value[i] = self.substruct_class.from_value(value[i])
return value
class ListIterator:
def __init__(self, tlv_list: List):
self.list = tlv_list
self._offset = 0
class List(Container):
def __init__(self):
self.items = []
# items by tag. First occurence wins.
self.values = {}
def __iter__(self):
return self
def __next__(self):
if self._offset >= len(self.list.buffer):
raise StopIteration
next_item = self.list.substruct_class(self.list.buffer)
self._offset = len(self.list.buffer)
return next_item
class List:
def __init__(self, substruct_class: Type[_TLVStruct], buffer=None):
self.buffer = buffer
self.substruct_class = substruct_class
def __call__(self, buffer):
return List(self.substruct_class, buffer)
def _print_struct_members(self, struct):
members = []
for field, descriptor_class in struct._members():
value = descriptor_class.print(struct)
if not value:
continue
if isinstance(descriptor_class, StructMember):
value = value.replace("\n ", " ")
members.append(f"{field} = {value}")
return ", ".join(members)
return iter(self.items)
def __str__(self):
items = []
for v in self:
items.append(self._print_struct_members(v))
return "[[" + ", ".join(items) + "]]"
members = []
member_by_tag = self._members_by_tag()
for item in self.items:
if isinstance(item, tuple):
tag, value = item
if tag in member_by_tag:
name, member = member_by_tag[tag]
else:
name = tag
else:
name = None
value = item
def __iter__(self):
return ListIterator(self)
if member:
value = member.print(value)
if not value:
continue
if isinstance(member, StructMember):
value = value.replace("\n", "\n ")
if name:
members.append(f"{name} = {value}")
else:
members.append(value)
return "[[ " + ", ".join(members) + "]]"
def encode(self) -> memoryview:
buffer = bytearray(self.max_length())
end = self.encode_into(buffer)
return memoryview(buffer)[:end]
def encode_into(self, buffer: bytearray, offset: int = 0) -> int:
member_by_tag = self._members_by_tag()
for item in self.items:
if isinstance(item, tuple):
tag, value = item
if tag in member_by_tag:
name, member = member_by_tag[tag]
else:
raise NotImplementedError("Unknown tag")
offset = member.encode_into(self, buffer, offset)
else:
raise NotImplementedError("Anonymous list member")
return offset
@classmethod
def from_value(cls, value):
instance = cls()
instance.items = value
instance.values = {}
members_by_tag = cls._members_by_tag()
for i, item in enumerate(value):
if isinstance(item, tuple):
tag, value = item
if tag in members_by_tag:
value = members_by_tag[tag][1].from_value(value)
instance.items[i] = (tag, value)
if tag in instance.values:
continue
instance.values[tag] = value
return instance
_TLVList = TypeVar("_TLVList", bound=List)
class ListMember(Member):
def __init__(
self,
tag,
substruct_class: Type[_TLVList],
*,
optional: _OPT = False,
nullable: _NULLABLE = False,
**kwargs,
):
self.substruct_class = substruct_class
self.max_value_length = substruct_class.max_length() + 1
super().__init__(tag, optional=optional, nullable=nullable, **kwargs)
@staticmethod
def decode(control_octet, buffer, offset=0, depth=0):
raw_list = []
while buffer[offset] != ElementType.END_OF_CONTAINER:
control_octet = buffer[offset]
this_tag, offset = decode_tag(control_octet, buffer, offset + 1)
value, offset = decode_element(control_octet, buffer, offset, depth + 1)
if this_tag is None:
raw_list.append(value)
else:
raw_list.append((this_tag, value))
return raw_list, offset + 1
def print(self, value):
return str(value)
def encode_element_type(self, value):
return ElementType.LIST
def encode_value_into(self, value, buffer: bytearray, offset: int) -> int:
offset = value.encode_into(buffer, offset)
buffer[offset] = ElementType.END_OF_CONTAINER
return offset + 1
def from_value(self, value):
return self.substruct_class.from_value(value)
class AnythingMember(Member):
"""Stores a TLV encoded value."""
def decode(self, buffer, length, offset=0):
def decode(self, control_octet, buffer, offset=0):
print(f"anything 0x{control_octet:02x} buffer", buffer[offset:].hex(" "))
return None
def _print(self, value):
return value.hex()
def print(self, value):
return str(value)
def encode_element_type(self, value):
return value[0]

View file

@ -1,15 +1,28 @@
["urandom", 93605477486462, 8, "gF88cM8Fdws="]
["receive", 93609109439051, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "BAAAAG1aTQypZZhQ61R6pwUg7l8AABUwASCAi8eYa+yxOHt1wczNj2sib00cVne73qdB4ACn2EL1tSUCfq0kAwAoBDUFJQH0ASUCLAElA6APJAQRJAULJgYAAAMBJAcBGBg="]
["urandom", 93609138892087, 32, "aI0a5kAMERbOZkdcsYK1fyjy+SUis5vCMr/z5mVbFUI="]
["send", 93609139058692, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AQAAAAwHlwCpZZhQ61R6pwIh7l8AAG1aTQwVMAEggIvHmGvssTh7dcHMzY9rIm9NHFZ3u96nQeAAp9hC9bUwAiBojRrmQAwRFs5mR1yxgrV/KPL5JSKzm8Iyv/PmZVsVQiUDAQA1BCYBECcAADACIObgj9CEx2MyPagRHuoX1OB32N8u1aKUpNKjb4b854YkGBg="]
["receive", 93609145909196, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "BAAAAG5aTQypZZhQ61R6pwUi7l8AABUwAUEECQ1E55Ge/QdwIt9ApvlujnzwNkxWOAQo33o1VjxvjRlgF/gAgGR/qT/nHVjT/Y08DUw2BPybYQ8yiALpkh7cRRg="]
["randbelow", 93609146092151, 115792089210356248762697446949407573529996955224135760342422259061068512044369, 113375977605381650694547482532764789231695354705278667510956348485473112713874]
["send", 93609162243032, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AQAAAA0HlwCpZZhQ61R6pwIj7l8AAG5aTQwVMAFBBJ89SvxsKCYUC4N3JxEmLyV7/2O3E+qYJkuajUaO48PKO42ty4VNcuo9hUtFF7RAAQdGyhHAucKUZNNRHZeRVVIwAiDU1KDBDAD9SI3uEZ6/nqB4EFZhIi7SeVFRvRvruGT7IRg="]
["receive", 93609162813558, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "BAAAAG9aTQypZZhQ61R6pwUk7l8AABUwASBxL8z46GInTI1DJ2HFO9zzGw9FKdZ7aYzPSaNcvwn2nRg="]
["send", 93609162944084, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AQAAAA4HlwCpZZhQ61R6pwJA7l8AAG9aTQwAAAAAAAAAAA=="]
["receive", 93609163134143, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AAEAABIkJQLjhJ21cI7QD9KovB/pAZsmra0uYJpejzRJeboWAbrUor9cDs0AQAkbvSzuEJOhzo47NVtkKqM1fIjL1P0FYWhFEryNZLrgxFuaz9b0V/JrTXAmcdnDslb+r36XCPPIPEJ17Rk0IgA1Hyfi8+z+hZVKjaf9wnJ1GbkNZTs="]
["send", 93609164255729, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AH6tALRcoQ0CBe9fAQASJCUCPC2ZWwg2ID2SHwL84Kb4/ZgcGR/3fJzb83+CE9H38SDdzsAvub8z61zZS4NpJvhMCUczsN1NolqQbAUV7PbQ/CrbIxgU21vOpsh0K6Q+4N/aQKrZG4rOOg2Iu4WH4rPSVdI58ry/f4OiC5eq8+yjeQWWBrgu0N3hwxBcwHpeBVmPXfkX9lwMsZaYoU+UiriKFMWyubmjaDqctl5z3i0yNRuJVFElqUSECY3VvxHaUKFdd8T460u/QTbHZOEPQpA/Mqnnpwe/bT5p0y3P2bCMQrzrl6w+ByABL2RNC52G7r5GTGN0x8rpSvDrZOYa4wfcA1FNFxWqqFvJFXCJ5kjpTKwh2QOAW3zVQugzl9+8RBIB4pS/XLpgaDWoMIEvKw8nlksWbfwGNVFMWnxd5GPOAv3FwQDQt4sRasgZAuC/wRqYKkWeCSsdS/NRuag+ruYX3jF/bnJp2oVisd3asu1VHjyqYCxtjfBs0WtX0cBls2r79XmrtO9qPfltYChvwYKLF6XM21wYBf33rWV/332qoWc35O1MjKz2CtsBBtEv0L0LH2XdgfE18tAm8DhWyhWvxeJ1NkhZM1O7aPBkHZzcnlDHooQMl32wBu7QIAdsuvOg62bGDN/kBLI/vqMu"]
["receive", 93609540233976, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AAEAABIkJQLjhJ21cI7QD9KovB/pAZsmra0uYJpejzRJeboWAbrUor9cDs0AQAkbvSzuEJOhzo47NVtkKqM1fIjL1P0FYWhFEryNZLrgxFuaz9b0V/JrTXAmcdnDslb+r36XCPPIPEJ17Rk0IgA1Hyfi8+z+hZVKjaf9wnJ1GbkNZTs="]
["receive", 93609920720629, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AAEAABIkJQLjhJ21cI7QD9KovB/pAZsmra0uYJpejzRJeboWAbrUor9cDs0AQAkbvSzuEJOhzo47NVtkKqM1fIjL1P0FYWhFEryNZLrgxFuaz9b0V/JrTXAmcdnDslb+r36XCPPIPEJ17Rk0IgA1Hyfi8+z+hZVKjaf9wnJ1GbkNZTs="]
["receive", 93610459358202, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AAEAABIkJQLjhJ21cI7QD9KovB/pAZsmra0uYJpejzRJeboWAbrUor9cDs0AQAkbvSzuEJOhzo47NVtkKqM1fIjL1P0FYWhFEryNZLrgxFuaz9b0V/JrTXAmcdnDslb+r36XCPPIPEJ17Rk0IgA1Hyfi8+z+hZVKjaf9wnJ1GbkNZTs="]
["receive", 93611345336845, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 45610, 0, 0], "AAEAABIkJQLjhJ21cI7QD9KovB/pAZsmra0uYJpejzRJeboWAbrUor9cDs0AQAkbvSzuEJOhzo47NVtkKqM1fIjL1P0FYWhFEryNZLrgxFuaz9b0V/JrTXAmcdnDslb+r36XCPPIPEJ17Rk0IgA1Hyfi8+z+hZVKjaf9wnJ1GbkNZTs="]
["urandom", 160614526879484, 8, "8YsmPKFtlz4="]
["receive", 160618295172071, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "BAAAAEiWhgLtobv+k7S8oAUgwOUAABUwASDACCVgi6wHtD02m6EwComQdn2IFjOtafHq7o3J35TokSUCWBUkAwAoBDUFJQH0ASUCLAElA6APJAQRJAULJgYAAAMBJAcBGBg="]
["urandom", 160618321366957, 32, "es6TR37V0Vs3gU1qjRIv7FpG/HE/v7vaePV733fuYEk="]
["send", 160618321501802, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AQAAAMvRYA7tobv+k7S8oAIhwOUAAEiWhgIVMAEgwAglYIusB7Q9NpuhMAqJkHZ9iBYzrWnx6u6Nyd+U6JEwAiB6zpNHftXRWzeBTWqNEi/sWkb8cT+/u9p49Xvfd+5gSSUDAQA1BCYBECcAADACIObgj9CEx2MyPagRHuoX1OB32N8u1aKUpNKjb4b854YkGBg="]
["receive", 160618328400875, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "BAAAAEmWhgLtobv+k7S8oAUiwOUAABUwAUEEyh96+016RRlMo2cltQnM5jj/5oWQNd3GH7996DLCOLcN+pXEuJZVwIbk5g6FNhAp5/59hkvEvXmR1/+HSkISKBg="]
["randbelow", 160618328500723, 115792089210356248762697446949407573529996955224135760342422259061068512044369, 36493665673707781682760077969958150174565691242371595029269024942496324546290]
["send", 160618343816534, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AQAAAMzRYA7tobv+k7S8oAIjwOUAAEmWhgIVMAFBBFTaW9UkewH4kdP99RTkZ9xiXyVlkkJ8bs3tnCkVx/rU2Itl7wM8XmTIIgOXn+kKf1Z0Fr/LrCocxhCOPfNvlQkwAiDpIXysLp1OXkFoP6drRa6SYqEhyXtF0xAPrDdz8F1ZOxg="]
["receive", 160618344275711, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "BAAAAEqWhgLtobv+k7S8oAUkwOUAABUwASATSI2pTtC+AOxP4yCirp6kEj9Q9begb0tq3w3u4yZyPBg="]
["send", 160618344358517, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AQAAAM3RYA7tobv+k7S8oAJAwOUAAEqWhgIAAAAAAAAAAA=="]
["receive", 160618344531223, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEgYiwIK5lkiVUIKtC0izVHThMjdXagA6dzzfTXmQ+v4qFv7FUIB6GgxuPeemnaeoxmRYWjh623cnnbdHdQCV3xU1+5t7Fs9AGaqklrhuyteB3iw/EKHo/6LewBW937y7G4TM8MHeAw1cEJRamYJ6BEwAhwlxCiufDTdNMXSx3o="]
["send", 160618345504430, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AFgVAPFi8wgJ0GpSMNPuib58Gf5jbw0EsxOqAZ0BO6Xe+2LZhhCHQoauhXDvuqI6/wviLJs6rYy6pYH8//V/vmCeQrm5aabgg3kp2/wDSqHKlFpQ1d5v8ZwVQOKGTrVcjUpzjTiBegcCe2ImAqB2rOXEL79w8TWI3ksg5LUQHFK4sil9OTuurhj2gQmv6rp/c2dZMzL7wbVRfFfNB7REq8S+QEfuAWiEfnJG3nMxvOFKS1/55fhkxXmjwteTrR3VgN/Dl4Ov2i40QRvSO7eSA3KoQsg7ZOozXomaEbtZgmuaDU1rUG0lt/jqgkK+T3CwMp651gAhEsXUrAiU45LmjPaX2FYjc643mBwkIArLfAjIQMpB6nJ31fEtv3tnM4W3SpBKksqLIzq/4HLasW2gtvKEAOkiPMXoITaghkdDcfzaaY22yoVPqg6Z/pD51PoLqznH6UDLWTdFsixJxt9rutVHvJYXMSqmOPcOUk4OwiU4Y0WWYFrGaY5+a5R7AKSwSIvVidtD7+RNAt57hzbMRuuPxzhCTSPNRBWn6UVgVzUgds5eTLzget1dO7XBCGvFHF93hqqkDZkofHeUrxXMhEwDjnRZqs0TTvGbZoXWtfe/OFSw7FyY0BpGwMR9cXsLAmFEeQAIYjhXQ7ifpwqZ"]
["receive", 160618345613566, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEkYiwJ4b6vh+FvbVDqyiHBT68yOoi8m24rlNj6/pc61Ji4="]
["receive", 160618345708054, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEoYiwLICJ1qK0KIOVY4fZhkDFM1br8s2AIMnuB3LlNNOtpx06Px1NU1LITijZmuwfuKd+s55lfeHUN4Eq7VaZ3TaMm/YMZcOugVztZfPD8Z6g=="]
["send", 160618346064587, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AFgVAPJi8whT/1rJ8+/PmPLtfJU+gfqvKxRqoxXHVrVNwSQRtT8KNcHG7AUB4GXmqQNkIespcsaB5YdJ0vx5pZRlPdat/hgnL7IMHs5oQpILupXH0Mk0wemG9cpxYoHU"]
["receive", 160618346162823, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEsYiwIEbVnLlyzaGClk4HiarUKr2gOzdREtt061v4dxrWU="]
["receive", 160618346248334, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEwYiwJOXgbJhUfASZelDvyYFl81ZlpiirdsKSFhwvOLGVe621V88HpP5DMT9fDJDu+9uNahj3yrLZqBw44="]
["receive", 160618706956191, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEwYiwJOXgbJhUfASZelDvyYFl81ZlpiirdsKSFhwvOLGVe621V88HpP5DMT9fDJDu+9uNahj3yrLZqBw44="]
["receive", 160618736081048, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEsYiwIEbVnLlyzaGClk4HiarUKr2gOzdREtt061v4dxrWU="]
["receive", 160618745189903, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEkYiwJ4b6vh+FvbVDqyiHBT68yOoi8m24rlNj6/pc61Ji4="]
["receive", 160619063600537, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEwYiwJOXgbJhUfASZelDvyYFl81ZlpiirdsKSFhwvOLGVe621V88HpP5DMT9fDJDu+9uNahj3yrLZqBw44="]
["receive", 160619108728162, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEsYiwIEbVnLlyzaGClk4HiarUKr2gOzdREtt061v4dxrWU="]
["receive", 160619143844829, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEkYiwJ4b6vh+FvbVDqyiHBT68yOoi8m24rlNj6/pc61Ji4="]
["receive", 160619645443106, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEwYiwJOXgbJhUfASZelDvyYFl81ZlpiirdsKSFhwvOLGVe621V88HpP5DMT9fDJDu+9uNahj3yrLZqBw44="]
["receive", 160619729615633, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEsYiwIEbVnLlyzaGClk4HiarUKr2gOzdREtt061v4dxrWU="]
["receive", 160619772725940, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEkYiwJ4b6vh+FvbVDqyiHBT68yOoi8m24rlNj6/pc61Ji4="]
["receive", 160620573635719, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEwYiwJOXgbJhUfASZelDvyYFl81ZlpiirdsKSFhwvOLGVe621V88HpP5DMT9fDJDu+9uNahj3yrLZqBw44="]
["receive", 160620635781868, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEsYiwIEbVnLlyzaGClk4HiarUKr2gOzdREtt061v4dxrWU="]
["receive", 160620722965760, ["fd98:bbab:bd61:8040:642:1aff:fe0c:9f2a", 43382, 0, 0], "AAEAAEkYiwJ4b6vh+FvbVDqyiHBT68yOoi8m24rlNj6/pc61Ji4="]

View file

@ -18,18 +18,18 @@ from circuitmatter import tlv
# 09
class Bool(tlv.TLVStructure):
class Bool(tlv.Structure):
b = tlv.BoolMember(None)
class TestBool:
def test_bool_false_decode(self):
s = Bool(b"\x08")
s, _ = Bool.decode(0x15, b"\x08\x18")
assert str(s) == "{\n b = false\n}"
assert s.b is False
def test_bool_true_decode(self):
s = Bool(b"\x09")
s, _ = Bool.decode(0x15, b"\x09\x18")
assert str(s) == "{\n b = true\n}"
assert s.b is True
@ -44,19 +44,19 @@ class TestBool:
assert s.encode().tobytes() == b"\x09"
class SignedIntOneOctet(tlv.TLVStructure):
class SignedIntOneOctet(tlv.Structure):
i = tlv.NumberMember(None, "b")
class SignedIntTwoOctet(tlv.TLVStructure):
class SignedIntTwoOctet(tlv.Structure):
i = tlv.NumberMember(None, "h")
class SignedIntFourOctet(tlv.TLVStructure):
class SignedIntFourOctet(tlv.Structure):
i = tlv.NumberMember(None, "i")
class SignedIntEightOctet(tlv.TLVStructure):
class SignedIntEightOctet(tlv.Structure):
i = tlv.NumberMember(None, "q")
@ -72,27 +72,27 @@ class SignedIntEightOctet(tlv.TLVStructure):
# 03 00 90 2f 50 09 00 00 00
class TestSignedInt:
def test_signed_int_42_decode(self):
s = SignedIntOneOctet(b"\x00\x2a")
s, _ = SignedIntOneOctet.decode(0x15, b"\x00\x2a")
assert str(s) == "{\n i = 42\n}"
assert s.i == 42
def test_signed_int_negative_17_decode(self):
s = SignedIntOneOctet(b"\x00\xef")
s, _ = SignedIntOneOctet.decode(0x15, b"\x00\xef")
assert str(s) == "{\n i = -17\n}"
assert s.i == -17
def test_signed_int_42_two_octet_decode(self):
s = SignedIntTwoOctet(b"\x01\x2a\x00")
s, _ = SignedIntTwoOctet.decode(0x15, b"\x01\x2a\x00")
assert str(s) == "{\n i = 42\n}"
assert s.i == 42
def test_signed_int_negative_170000_decode(self):
s = SignedIntFourOctet(b"\x02\xf0\x67\xfd\xff")
s, _ = SignedIntFourOctet.decode(0x15, b"\x02\xf0\x67\xfd\xff")
assert str(s) == "{\n i = -170000\n}"
assert s.i == -170000
def test_signed_int_40000000000_decode(self):
s = SignedIntEightOctet(b"\x03\x00\x90\x2f\x50\x09\x00\x00\x00")
s, _ = SignedIntEightOctet.decode(0x15, b"\x03\x00\x90\x2f\x50\x09\x00\x00\x00")
assert str(s) == "{\n i = 40000000000\n}"
assert s.i == 40000000000
@ -131,7 +131,7 @@ class TestSignedInt:
],
)
def test_bounds_checks(self, octets, lower, upper):
class SignedIntStruct(tlv.TLVStructure):
class SignedIntStruct(tlv.Structure):
i = tlv.IntMember(None, signed=True, octets=octets)
s = SignedIntStruct()
@ -146,7 +146,7 @@ class TestSignedInt:
s.i = upper
class UnsignedIntOneOctet(tlv.TLVStructure):
class UnsignedIntOneOctet(tlv.Structure):
i = tlv.NumberMember(None, "B")
@ -154,7 +154,7 @@ class UnsignedIntOneOctet(tlv.TLVStructure):
# 04 2a
class TestUnsignedInt:
def test_unsigned_int_42_decode(self):
s = UnsignedIntOneOctet(b"\x04\x2a")
s, _ = UnsignedIntOneOctet.decode(0x15, b"\x04\x2a")
assert str(s) == "{\n i = 42U\n}"
assert s.i == 42
@ -173,7 +173,7 @@ class TestUnsignedInt:
],
)
def test_bounds_checks(self, octets, lower, upper):
class UnsignedIntStruct(tlv.TLVStructure):
class UnsignedIntStruct(tlv.Structure):
i = tlv.IntMember(None, signed=False, octets=octets)
s = UnsignedIntStruct()
@ -193,13 +193,13 @@ class TestUnsignedInt:
s.i = v
buffer = s.encode().tobytes()
s2 = UnsignedIntOneOctet(buffer)
s2, _ = UnsignedIntOneOctet.decode(0x15, buffer)
assert s2.i == s.i
assert str(s2) == str(s)
def test_nullability(self):
class Struct(tlv.TLVStructure):
class Struct(tlv.Structure):
i = tlv.IntMember(None)
ni = tlv.IntMember(None, nullable=True)
@ -218,18 +218,18 @@ class TestUnsignedInt:
# 0c 06 48 65 6c 6c 6f 21
# UTF-8 String, 1-octet length, "Tschüs"
# 0c 07 54 73 63 68 c3 bc 73
class UTF8StringOneOctet(tlv.TLVStructure):
class UTF8StringOneOctet(tlv.Structure):
s = tlv.UTF8StringMember(None, 16)
class TestUTF8String:
def test_utf8_string_hello_decode(self):
s = UTF8StringOneOctet(b"\x0c\x06Hello!")
s, _ = UTF8StringOneOctet.decode(0x15, b"\x0c\x06Hello!")
assert str(s) == '{\n s = "Hello!"\n}'
assert s.s == "Hello!"
def test_utf8_string_tschs_decode(self):
s = UTF8StringOneOctet(b"\x0c\x07Tsch\xc3\xbcs")
s, _ = UTF8StringOneOctet.decode(0x15, b"\x0c\x07Tsch\xc3\xbcs")
assert str(s) == '{\n s = "Tschüs"\n}'
assert s.s == "Tschüs"
@ -249,7 +249,7 @@ class TestUTF8String:
s.s = v
buffer = s.encode().tobytes()
s2 = UTF8StringOneOctet(buffer)
s2, _ = UTF8StringOneOctet.decode(0x15, buffer)
assert s2.s == s.s
assert str(s2) == str(s)
@ -257,13 +257,13 @@ class TestUTF8String:
# Octet String, 1-octet length, octets 00 01 02 03 04
# encoded: 10 05 00 01 02 03 04
class OctetStringOneOctet(tlv.TLVStructure):
class OctetStringOneOctet(tlv.Structure):
s = tlv.OctetStringMember(None, 16)
class TestOctetString:
def test_octet_string_decode(self):
s = OctetStringOneOctet(b"\x10\x05\x00\x01\x02\x03\x04")
s, _ = OctetStringOneOctet.decode(0x15, b"\x10\x05\x00\x01\x02\x03\x04")
assert str(s) == "{\n s = 00 01 02 03 04\n}"
assert s.s == b"\x00\x01\x02\x03\x04"
@ -278,7 +278,7 @@ class TestOctetString:
s.s = v
buffer = s.encode().tobytes()
s2 = OctetStringOneOctet(buffer)
s2, _ = OctetStringOneOctet.decode(0x15, buffer)
assert s2.s == s.s
assert str(s2) == str(s)
@ -288,18 +288,18 @@ class TestOctetString:
# 14
class Null(tlv.TLVStructure):
class Null(tlv.Structure):
n = tlv.BoolMember(None, nullable=True)
class NotNull(tlv.TLVStructure):
class NotNull(tlv.Structure):
n = tlv.BoolMember(None, nullable=True)
b = tlv.BoolMember(None)
class TestNull:
def test_null_decode(self):
s = Null(b"\x14")
s, _ = Null.decode(0x15, b"\x14")
assert str(s) == "{\n n = null\n}"
assert s.n is None
@ -337,38 +337,38 @@ class TestNull:
# 0b 00 00 00 00 00 00 f0 7f
# Double precision floating point negative infinity 0b 00 00 00 00 00 00 f0 ff
# (-∞)
class FloatSingle(tlv.TLVStructure):
class FloatSingle(tlv.Structure):
f = tlv.FloatMember(None)
class FloatDouble(tlv.TLVStructure):
class FloatDouble(tlv.Structure):
f = tlv.FloatMember(None, octets=8)
class TestFloatSingle:
def test_precision_float_0_0_decode(self):
s = FloatSingle(b"\x0a\x00\x00\x00\x00")
s, _ = FloatSingle.decode(0x15, b"\x0a\x00\x00\x00\x00")
assert str(s) == "{\n f = 0.0\n}"
assert s.f == 0.0
def test_precision_float_1_3_decode(self):
s = FloatSingle(b"\x0a\xab\xaa\xaa\x3e")
s, _ = FloatSingle.decode(0x15, b"\x0a\xab\xaa\xaa\x3e")
# assert str(s) == "{\n f = 0.3333333432674408\n}"
f = s.f
assert math.isclose(f, 1.0 / 3.0, rel_tol=1e-06)
def test_precision_float_17_9_decode(self):
s = FloatSingle(b"\x0a\x33\x33\x8f\x41")
s, _ = FloatSingle.decode(0x15, b"\x0a\x33\x33\x8f\x41")
assert str(s) == "{\n f = 17.899999618530273\n}"
assert math.isclose(s.f, 17.9, rel_tol=1e-06)
def test_precision_float_infinity_decode(self):
s = FloatSingle(b"\x0a\x00\x00\x80\x7f")
s, _ = FloatSingle.decode(0x15, b"\x0a\x00\x00\x80\x7f")
assert str(s) == "{\n f = inf\n}"
assert math.isinf(s.f)
def test_precision_float_negative_infinity_decode(self):
s = FloatSingle(b"\x0a\x00\x00\x80\xff")
s, _ = FloatSingle.decode(0x15, b"\x0a\x00\x00\x80\xff")
assert str(s) == "{\n f = -inf\n}"
assert math.isinf(s.f)
@ -403,7 +403,7 @@ class TestFloatSingle:
s.f = v
buffer = s.encode().tobytes()
s2 = FloatDouble(buffer)
s2, _ = FloatDouble.decode(0x15, buffer)
assert (
(math.isnan(s.f) and math.isnan(s2.f))
@ -425,7 +425,7 @@ class TestFloatSingle:
s.f = v
buffer = s.encode().tobytes()
s2 = FloatSingle(buffer)
s2, _ = FloatSingle.decode(0x15, buffer)
assert (math.isnan(s.f) and math.isnan(s2.f)) or math.isclose(
s2.f, s.f, rel_tol=1e-7, abs_tol=1e-9
@ -434,28 +434,28 @@ class TestFloatSingle:
class TestFloatDouble:
def test_precision_float_0_0_decode(self):
s = FloatDouble(b"\x0b\x00\x00\x00\x00\x00\x00\x00\x00")
s, _ = FloatDouble.decode(0x15, b"\x0b\x00\x00\x00\x00\x00\x00\x00\x00")
assert str(s) == "{\n f = 0.0\n}"
assert s.f == 0.0
def test_precision_float_1_3_decode(self):
s = FloatDouble(b"\x0b\x55\x55\x55\x55\x55\x55\xd5\x3f")
s, _ = FloatDouble.decode(0x15, b"\x0b\x55\x55\x55\x55\x55\x55\xd5\x3f")
# assert str(s) == "{\n f = 0.3333333333333333\n}"
f = s.f
assert math.isclose(f, 1.0 / 3.0, rel_tol=1e-06)
def test_precision_float_17_9_decode(self):
s = FloatDouble(b"\x0b\x66\x66\x66\x66\x66\xe6\x31\x40")
s, _ = FloatDouble.decode(0x15, b"\x0b\x66\x66\x66\x66\x66\xe6\x31\x40")
assert str(s) == "{\n f = 17.9\n}"
assert math.isclose(s.f, 17.9, rel_tol=1e-06)
def test_precision_float_infinity_decode(self):
s = FloatDouble(b"\x0b\x00\x00\x00\x00\x00\x00\xf0\x7f")
s, _ = FloatDouble.decode(0x15, b"\x0b\x00\x00\x00\x00\x00\x00\xf0\x7f")
assert str(s) == "{\n f = inf\n}"
assert math.isinf(s.f)
def test_precision_float_negative_infinity_decode(self):
s = FloatDouble(b"\x0b\x00\x00\x00\x00\x00\x00\xf0\xff")
s, _ = FloatDouble.decode(0x15, b"\x0b\x00\x00\x00\x00\x00\x00\xf0\xff")
assert str(s) == "{\n f = -inf\n}"
assert math.isinf(s.f)
@ -490,7 +490,7 @@ class TestFloatDouble:
s.f = v
buffer = s.encode().tobytes()
s2 = FloatDouble(buffer)
s2, _ = FloatDouble.decode(0x15, buffer)
assert (
(math.isnan(s.f) and math.isnan(s2.f))
@ -500,18 +500,18 @@ class TestFloatDouble:
)
class InnerStruct(tlv.TLVStructure):
class InnerStruct(tlv.Structure):
a = tlv.IntMember(0, signed=True, optional=True, octets=4)
b = tlv.IntMember(1, signed=True, optional=True, octets=4)
class OuterStruct(tlv.TLVStructure):
class OuterStruct(tlv.Structure):
s = tlv.StructMember(None, InnerStruct)
class TestStruct:
def test_inner_struct_decode(self):
s = OuterStruct(b"\x15\x20\x00\x2a\x20\x01\xef\x18")
s, _ = OuterStruct.decode(0x15, b"\x15\x20\x00\x2a\x20\x01\xef\x18")
assert_type(s, OuterStruct)
assert_type(s.s, InnerStruct)
assert_type(s.s.a, Optional[int])
@ -520,8 +520,8 @@ class TestStruct:
assert s.s.b == -17
def test_inner_struct_decode_empty(self):
s = OuterStruct(b"\x15\x18")
assert str(s) == "{\n s = {\n a = null,\n b = null\n }\n}"
s, _ = OuterStruct.decode(0x15, b"\x15\x18")
assert str(s) == "{\n s = {\n \n }\n}"
assert s.s.a is None
assert s.s.b is None
@ -542,15 +542,16 @@ class TestStruct:
assert s.encode().tobytes() == b"\x15\x18"
class FullyQualified(tlv.TLVStructure):
class FullyQualified(tlv.Structure):
a = tlv.IntMember((0xADA, 0xF00, 0x123), signed=True, optional=True, octets=4)
b = tlv.IntMember((0xADA, 0xF00, 0x12345), signed=True, optional=True, octets=4)
class TestFullyQualifiedTags:
def test_decode(self):
s = FullyQualified(
b"\xc2\xda\x0a\x00\x0f\x23\x01\x2a\x00\x00\x00\xe2\xda\x0a\x00\x0f\x45\x23\x01\x00\xef\xff\xff\xff"
s, _ = FullyQualified.decode(
0x15,
b"\xc2\xda\x0a\x00\x0f\x23\x01\x2a\x00\x00\x00\xe2\xda\x0a\x00\x0f\x45\x23\x01\x00\xef\xff\xff\xff",
)
assert_type(s, FullyQualified)
assert_type(s.a, Optional[int])