455 lines
17 KiB
Python
455 lines
17 KiB
Python
# The MIT License (MIT)
|
|
#
|
|
# Copyright (c) 2020 Jim Bennett
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
# in the Software without restriction, including without limitation the rights
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
# furnished to do so, subject to the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be included in
|
|
# all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
# THE SOFTWARE.
|
|
"""
|
|
`iot_mqtt`
|
|
=====================
|
|
|
|
An MQTT client for Azure IoT
|
|
|
|
* Author(s): Jim Bennett, Elena Horton
|
|
"""
|
|
|
|
import gc
|
|
import json
|
|
import time
|
|
import adafruit_minimqtt as minimqtt
|
|
from adafruit_minimqtt import MQTT
|
|
import adafruit_logging as logging
|
|
from .iot_error import IoTError
|
|
from .keys import compute_derived_symmetric_key
|
|
from .quote import quote
|
|
from . import constants
|
|
|
|
# pylint: disable=R0903
|
|
class IoTResponse:
|
|
"""A response from a direct method call
|
|
"""
|
|
|
|
def __init__(self, code: int, message: str):
|
|
"""Creates an IoT Response object
|
|
:param int code: The HTTP response code for this method call, for example 200 if the method was handled successfully
|
|
:param str message: The HTTP response message for this method call
|
|
"""
|
|
self.response_code = code
|
|
self.response_message = message
|
|
|
|
|
|
class IoTMQTTCallback:
|
|
"""An interface for classes that can be called by MQTT events
|
|
"""
|
|
|
|
def message_sent(self, data: str) -> None:
|
|
"""Called when a message is sent to the cloud
|
|
:param str data: The data send with the message
|
|
"""
|
|
|
|
def connection_status_change(self, connected: bool) -> None:
|
|
"""Called when the connection status changes
|
|
:param bool connected: True if the device is connected, otherwise false
|
|
"""
|
|
|
|
# pylint: disable=W0613, R0201
|
|
def direct_method_invoked(self, method_name: str, payload: str) -> IoTResponse:
|
|
"""Called when a direct method is invoked
|
|
:param str method_name: The name of the method that was invoked
|
|
:param str payload: The payload with the message
|
|
:returns: A response with a code and status to show if the method was correctly handled
|
|
:rtype: IoTResponse
|
|
"""
|
|
return IoTResponse(200, "")
|
|
|
|
# pylint: disable=C0103
|
|
def cloud_to_device_message_received(self, body: str, properties: dict) -> None:
|
|
"""Called when a cloud to device message is received
|
|
:param str body: The body of the message
|
|
:param dict properties: The propreties sent with the mesage
|
|
"""
|
|
|
|
def device_twin_desired_updated(self, desired_property_name: str, desired_property_value, desired_version: int) -> None:
|
|
"""Called when the device twin desired properties are updated
|
|
:param str desired_property_name: The name of the desired property that was updated
|
|
:param desired_property_value: The value of the desired property that was updated
|
|
:param int desired_version: The version of the desired property that was updated
|
|
"""
|
|
|
|
def device_twin_reported_updated(self, reported_property_name: str, reported_property_value, reported_version: int) -> None:
|
|
"""Called when the device twin reported values are updated
|
|
:param str reported_property_name: The name of the reported property that was updated
|
|
:param reported_property_value: The value of the reported property that was updated
|
|
:param int reported_version: The version of the reported property that was updated
|
|
"""
|
|
|
|
|
|
# pylint: disable=R0902
|
|
class IoTMQTT:
|
|
"""MQTT client for Azure IoT
|
|
"""
|
|
|
|
def _gen_sas_token(self) -> str:
|
|
token_expiry = int(time.time() + self._token_expires)
|
|
uri = self._hostname + "%2Fdevices%2F" + self._device_id
|
|
signed_hmac_sha256 = compute_derived_symmetric_key(self._key, uri + "\n" + str(token_expiry))
|
|
signature = quote(signed_hmac_sha256, "~()*!.'")
|
|
if signature.endswith("\n"): # somewhere along the crypto chain a newline is inserted
|
|
signature = signature[:-1]
|
|
token = "SharedAccessSignature sr={}&sig={}&se={}".format(uri, signature, token_expiry)
|
|
return token
|
|
|
|
def _create_mqtt_client(self) -> None:
|
|
minimqtt.set_socket(self._socket, self._iface)
|
|
|
|
self._mqtts = MQTT(
|
|
broker=self._hostname,
|
|
username=self._username,
|
|
password=self._passwd,
|
|
port=8883,
|
|
keep_alive=120,
|
|
is_ssl=True,
|
|
client_id=self._device_id,
|
|
log=True,
|
|
)
|
|
|
|
self._mqtts.logger.setLevel(self._logger.getEffectiveLevel())
|
|
|
|
# set actions to take throughout connection lifecycle
|
|
self._mqtts.on_connect = self._on_connect
|
|
self._mqtts.on_log = self._on_log
|
|
self._mqtts.on_publish = self._on_publish
|
|
self._mqtts.on_disconnect = self._on_disconnect
|
|
|
|
# initiate the connection using the adafruit_minimqtt library
|
|
self._mqtts.connect()
|
|
|
|
# pylint: disable=C0103, W0613
|
|
def _on_connect(self, client, userdata, _, rc) -> None:
|
|
self._logger.info("- iot_mqtt :: _on_connect :: rc = " + str(rc) + ", userdata = " + str(userdata))
|
|
if rc == 0:
|
|
self._mqtt_connected = True
|
|
self._auth_response_received = True
|
|
self._callback.connection_status_change(True)
|
|
|
|
# pylint: disable=C0103, W0613
|
|
def _on_log(self, client, userdata, level, buf) -> None:
|
|
self._logger.info("mqtt-log : " + buf)
|
|
if level <= 8:
|
|
self._logger.error("mqtt-log : " + buf)
|
|
|
|
def _on_disconnect(self, client, userdata, rc) -> None:
|
|
self._logger.info("- iot_mqtt :: _on_disconnect :: rc = " + str(rc))
|
|
self._auth_response_received = True
|
|
|
|
if rc == 5:
|
|
self._logger.error("on(disconnect) : Not authorized")
|
|
self.disconnect()
|
|
|
|
if rc == 1:
|
|
self._mqtt_connected = False
|
|
|
|
if rc != 5:
|
|
self._callback.connection_status_change(False)
|
|
|
|
def _on_publish(self, client, data, topic, msg_id) -> None:
|
|
self._logger.info("- iot_mqtt :: _on_publish :: " + str(data) + " on topic " + str(topic))
|
|
|
|
# pylint: disable=W0703
|
|
def _handle_device_twin_update(self, client, topic: str, msg: str) -> None:
|
|
self._logger.debug("- iot_mqtt :: _echo_desired :: " + topic)
|
|
twin = None
|
|
desired = None
|
|
|
|
try:
|
|
twin = json.loads(msg)
|
|
except json.JSONDecodeError as e:
|
|
self._logger.error("ERROR: JSON parse for Device Twin message object has failed. => " + msg + " => " + str(e))
|
|
return
|
|
|
|
if "reported" in twin:
|
|
reported = twin["reported"]
|
|
|
|
if "$version" in reported:
|
|
reported_version = reported["$version"]
|
|
reported.pop("$version")
|
|
else:
|
|
self._logger.error("ERROR: Unexpected payload for reported twin update => " + msg)
|
|
return
|
|
|
|
for property_name, value in reported.items():
|
|
self._callback.device_twin_reported_updated(property_name, value, reported_version)
|
|
|
|
is_patch = "desired" not in twin
|
|
|
|
if is_patch:
|
|
desired = twin
|
|
else:
|
|
desired = twin["desired"]
|
|
|
|
if "$version" in desired:
|
|
desired_version = desired["$version"]
|
|
desired.pop("$version")
|
|
else:
|
|
self._logger.error("ERROR: Unexpected payload for desired twin update => " + msg)
|
|
return
|
|
|
|
for property_name, value in desired.items():
|
|
self._callback.device_twin_desired_updated(property_name, value, desired_version)
|
|
|
|
def _handle_direct_method(self, client, topic: str, msg: str) -> None:
|
|
index = topic.find("$rid=")
|
|
method_id = 1
|
|
method_name = "None"
|
|
if index == -1:
|
|
self._logger.error("ERROR: C2D doesn't include topic id")
|
|
else:
|
|
method_id = topic[index + 5 :]
|
|
topic_template = "$iothub/methods/POST/"
|
|
len_temp = len(topic_template)
|
|
method_name = topic[len_temp : topic.find("/", len_temp + 1)]
|
|
|
|
ret = self._callback.direct_method_invoked(method_name, msg)
|
|
gc.collect()
|
|
|
|
ret_code = 200
|
|
ret_message = "{}"
|
|
if ret.response_code is not None:
|
|
ret_code = ret.response_code
|
|
if ret.response_message is not None:
|
|
ret_message = ret.response_message
|
|
|
|
# ret message must be JSON
|
|
if not ret_message.startswith("{") or not ret_message.endswith("}"):
|
|
ret_json = {"Value": ret_message}
|
|
ret_message = json.dumps(ret_json)
|
|
|
|
next_topic = "$iothub/methods/res/{}/?$rid={}".format(ret_code, method_id)
|
|
self._logger.info("C2D: => " + next_topic + " with data " + ret_message + " and name => " + method_name)
|
|
self._send_common(next_topic, ret_message)
|
|
|
|
def _handle_cloud_to_device_message(self, client, topic: str, msg: str) -> None:
|
|
parts = topic.split("&")[1:]
|
|
|
|
properties = {}
|
|
for part in parts:
|
|
key_value = part.split("=")
|
|
properties[key_value[0]] = key_value[1]
|
|
|
|
self._callback.cloud_to_device_message_received(msg, properties)
|
|
gc.collect()
|
|
|
|
def _send_common(self, topic: str, data) -> None:
|
|
# Convert data to a string
|
|
if isinstance(data, dict):
|
|
data = json.dumps(data)
|
|
|
|
if not isinstance(data, str):
|
|
raise IoTError("Data must be a string or a dictionary")
|
|
|
|
self._logger.debug("Sending message on topic: " + topic)
|
|
self._logger.debug("Sending message: " + str(data))
|
|
|
|
retry = 0
|
|
|
|
while True:
|
|
gc.collect()
|
|
try:
|
|
self._logger.debug("Trying to send...")
|
|
self._mqtts.publish(topic, data)
|
|
self._logger.debug("Data sent")
|
|
break
|
|
except RuntimeError as runtime_error:
|
|
self._logger.info("Could not send data, retrying after 0.5 seconds: " + str(runtime_error))
|
|
retry = retry + 1
|
|
|
|
if retry >= 10:
|
|
self._logger.error("Failed to send data")
|
|
raise
|
|
|
|
time.sleep(0.5)
|
|
continue
|
|
|
|
gc.collect()
|
|
|
|
def _get_device_settings(self) -> None:
|
|
self._logger.info("- iot_mqtt :: _get_device_settings :: ")
|
|
self.loop()
|
|
self._send_common("$iothub/twin/GET/?$rid=0", " ")
|
|
|
|
# pylint: disable=R0913
|
|
def __init__(
|
|
self,
|
|
callback: IoTMQTTCallback,
|
|
socket,
|
|
iface,
|
|
hostname: str,
|
|
device_id: str,
|
|
key: str,
|
|
token_expires: int = 21600,
|
|
logger: logging = None,
|
|
):
|
|
"""Create the Azure IoT MQTT client
|
|
:param IoTMQTTCallback callback: A callback class
|
|
:param socket: The socket to communicate over
|
|
:param iface: The network interface to communicate over
|
|
:param str hostname: The hostname of the MQTT broker to connect to, get this by registering the device
|
|
:param str device_id: The device ID of the device to register
|
|
:param str key: The primary or secondary key of the device to register
|
|
:param int token_expires: The number of seconds till the token expires, defaults to 6 hours
|
|
:param adafruit_logging logger: The logger
|
|
"""
|
|
self._callback = callback
|
|
self._socket = socket
|
|
self._iface = iface
|
|
self._mqtt_connected = False
|
|
self._auth_response_received = False
|
|
self._mqtts = None
|
|
self._device_id = device_id
|
|
self._hostname = hostname
|
|
self._key = key
|
|
self._token_expires = token_expires
|
|
self._username = "{}/{}/api-version={}".format(self._hostname, device_id, constants.IOTC_API_VERSION)
|
|
self._passwd = self._gen_sas_token()
|
|
self._logger = logger if logger is not None else logging.getLogger("log")
|
|
self._is_subscribed_to_twins = False
|
|
|
|
def _subscribe_to_core_topics(self):
|
|
device_bound_topic = "devices/{}/messages/devicebound/#".format(self._device_id)
|
|
self._mqtts.add_topic_callback(device_bound_topic, self._handle_cloud_to_device_message)
|
|
self._mqtts.subscribe(device_bound_topic)
|
|
|
|
self._mqtts.add_topic_callback("$iothub/methods/#", self._handle_direct_method)
|
|
self._mqtts.subscribe("$iothub/methods/#")
|
|
|
|
def _subscribe_to_twin_topics(self):
|
|
self._mqtts.add_topic_callback("$iothub/twin/PATCH/properties/desired/#", self._handle_device_twin_update)
|
|
self._mqtts.subscribe("$iothub/twin/PATCH/properties/desired/#") # twin desired property changes
|
|
|
|
self._mqtts.add_topic_callback("$iothub/twin/res/200/#", self._handle_device_twin_update)
|
|
self._mqtts.subscribe("$iothub/twin/res/200/#") # twin properties response
|
|
|
|
def connect(self) -> bool:
|
|
"""Connects to the MQTT broker
|
|
:returns: True if the connection is successful, otherwise False
|
|
:rtype: bool
|
|
"""
|
|
self._logger.info("- iot_mqtt :: connect :: " + self._hostname)
|
|
|
|
self._create_mqtt_client()
|
|
|
|
self._logger.info(" - iot_mqtt :: connect :: created mqtt client. connecting..")
|
|
while self._auth_response_received is None:
|
|
self.loop()
|
|
|
|
self._logger.info(" - iot_mqtt :: connect :: on_connect must be fired. Connected ? " + str(self.is_connected()))
|
|
if not self.is_connected():
|
|
return False
|
|
|
|
self._mqtt_connected = True
|
|
self._auth_response_received = True
|
|
|
|
self._subscribe_to_core_topics()
|
|
|
|
return True
|
|
|
|
def subscribe_to_twins(self) -> None:
|
|
"""Subscribes to digital twin updates
|
|
Only call this if your tier of IoT Hub supports this
|
|
"""
|
|
if self._is_subscribed_to_twins:
|
|
return
|
|
|
|
# do this separately as this is not supported in B1 hubs
|
|
self._subscribe_to_twin_topics()
|
|
|
|
self._get_device_settings()
|
|
|
|
self._is_subscribed_to_twins = True
|
|
|
|
def disconnect(self) -> None:
|
|
"""Disconnects from the MQTT broker
|
|
"""
|
|
if not self.is_connected():
|
|
return
|
|
|
|
self._logger.info("- iot_mqtt :: disconnect :: ")
|
|
self._mqtt_connected = False
|
|
self._mqtts.disconnect()
|
|
|
|
def reconnect(self) -> None:
|
|
"""Reconnects to the MQTT broker
|
|
"""
|
|
self._logger.info("- iot_mqtt :: reconnect :: ")
|
|
|
|
self._mqtts.reconnect()
|
|
|
|
def is_connected(self) -> bool:
|
|
"""Gets if there is an open connection to the MQTT broker
|
|
:returns: True if there is an open connection, False if not
|
|
:rtype: bool
|
|
"""
|
|
return self._mqtt_connected
|
|
|
|
def loop(self) -> None:
|
|
"""Listens for MQTT messages
|
|
"""
|
|
if not self.is_connected():
|
|
return
|
|
|
|
self._mqtts.loop()
|
|
gc.collect()
|
|
|
|
def send_device_to_cloud_message(self, message, system_properties: dict = None) -> None:
|
|
"""Send a device to cloud message from this device to Azure IoT Hub
|
|
:param message: The message data as a JSON string or a dictionary
|
|
:param system_properties: System properties to send with the message
|
|
:raises: ValueError if the message is not a string or dictionary
|
|
:raises RuntimeError: if the internet connection is not responding or is unable to connect
|
|
"""
|
|
self._logger.info("- iot_mqtt :: send_device_to_cloud_message :: " + message)
|
|
topic = "devices/{}/messages/events/".format(self._device_id)
|
|
|
|
if system_properties is not None:
|
|
firstProp = True
|
|
for prop in system_properties:
|
|
if not firstProp:
|
|
topic += "&"
|
|
else:
|
|
firstProp = False
|
|
topic += prop + "=" + str(system_properties[prop])
|
|
|
|
# Convert message to a string
|
|
if isinstance(message, dict):
|
|
message = json.dumps(message)
|
|
|
|
if not isinstance(message, str):
|
|
raise ValueError("message must be a string or a dictionary")
|
|
|
|
self._send_common(topic, message)
|
|
self._callback.message_sent(message)
|
|
|
|
def send_twin_patch(self, patch) -> None:
|
|
"""Send a patch for the reported properties of the device twin
|
|
:param patch: The patch as a JSON string or a dictionary
|
|
:raises: IoTError if the data is not a string or dictionary
|
|
:raises RuntimeError: if the internet connection is not responding or is unable to connect
|
|
"""
|
|
self._logger.info("- iot_mqtt :: sendProperty :: " + str(patch))
|
|
topic = "$iothub/twin/PATCH/properties/reported/?$rid={}".format(int(time.time()))
|
|
self._send_common(topic, patch)
|