Adafruit_CircuitPython_Azur.../adafruit_azureiot/iot_mqtt.py
2020-06-08 11:54:15 -07:00

455 lines
17 KiB
Python

# The MIT License (MIT)
#
# Copyright (c) 2020 Jim Bennett
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`iot_mqtt`
=====================
An MQTT client for Azure IoT
* Author(s): Jim Bennett, Elena Horton
"""
import gc
import json
import time
import adafruit_minimqtt as minimqtt
from adafruit_minimqtt import MQTT
import adafruit_logging as logging
from .iot_error import IoTError
from .keys import compute_derived_symmetric_key
from .quote import quote
from . import constants
# pylint: disable=R0903
class IoTResponse:
"""A response from a direct method call
"""
def __init__(self, code: int, message: str):
"""Creates an IoT Response object
:param int code: The HTTP response code for this method call, for example 200 if the method was handled successfully
:param str message: The HTTP response message for this method call
"""
self.response_code = code
self.response_message = message
class IoTMQTTCallback:
"""An interface for classes that can be called by MQTT events
"""
def message_sent(self, data: str) -> None:
"""Called when a message is sent to the cloud
:param str data: The data send with the message
"""
def connection_status_change(self, connected: bool) -> None:
"""Called when the connection status changes
:param bool connected: True if the device is connected, otherwise false
"""
# pylint: disable=W0613, R0201
def direct_method_invoked(self, method_name: str, payload: str) -> IoTResponse:
"""Called when a direct method is invoked
:param str method_name: The name of the method that was invoked
:param str payload: The payload with the message
:returns: A response with a code and status to show if the method was correctly handled
:rtype: IoTResponse
"""
return IoTResponse(200, "")
# pylint: disable=C0103
def cloud_to_device_message_received(self, body: str, properties: dict) -> None:
"""Called when a cloud to device message is received
:param str body: The body of the message
:param dict properties: The propreties sent with the mesage
"""
def device_twin_desired_updated(self, desired_property_name: str, desired_property_value, desired_version: int) -> None:
"""Called when the device twin desired properties are updated
:param str desired_property_name: The name of the desired property that was updated
:param desired_property_value: The value of the desired property that was updated
:param int desired_version: The version of the desired property that was updated
"""
def device_twin_reported_updated(self, reported_property_name: str, reported_property_value, reported_version: int) -> None:
"""Called when the device twin reported values are updated
:param str reported_property_name: The name of the reported property that was updated
:param reported_property_value: The value of the reported property that was updated
:param int reported_version: The version of the reported property that was updated
"""
# pylint: disable=R0902
class IoTMQTT:
"""MQTT client for Azure IoT
"""
def _gen_sas_token(self) -> str:
token_expiry = int(time.time() + self._token_expires)
uri = self._hostname + "%2Fdevices%2F" + self._device_id
signed_hmac_sha256 = compute_derived_symmetric_key(self._key, uri + "\n" + str(token_expiry))
signature = quote(signed_hmac_sha256, "~()*!.'")
if signature.endswith("\n"): # somewhere along the crypto chain a newline is inserted
signature = signature[:-1]
token = "SharedAccessSignature sr={}&sig={}&se={}".format(uri, signature, token_expiry)
return token
def _create_mqtt_client(self) -> None:
minimqtt.set_socket(self._socket, self._iface)
self._mqtts = MQTT(
broker=self._hostname,
username=self._username,
password=self._passwd,
port=8883,
keep_alive=120,
is_ssl=True,
client_id=self._device_id,
log=True,
)
self._mqtts.logger.setLevel(self._logger.getEffectiveLevel())
# set actions to take throughout connection lifecycle
self._mqtts.on_connect = self._on_connect
self._mqtts.on_log = self._on_log
self._mqtts.on_publish = self._on_publish
self._mqtts.on_disconnect = self._on_disconnect
# initiate the connection using the adafruit_minimqtt library
self._mqtts.connect()
# pylint: disable=C0103, W0613
def _on_connect(self, client, userdata, _, rc) -> None:
self._logger.info("- iot_mqtt :: _on_connect :: rc = " + str(rc) + ", userdata = " + str(userdata))
if rc == 0:
self._mqtt_connected = True
self._auth_response_received = True
self._callback.connection_status_change(True)
# pylint: disable=C0103, W0613
def _on_log(self, client, userdata, level, buf) -> None:
self._logger.info("mqtt-log : " + buf)
if level <= 8:
self._logger.error("mqtt-log : " + buf)
def _on_disconnect(self, client, userdata, rc) -> None:
self._logger.info("- iot_mqtt :: _on_disconnect :: rc = " + str(rc))
self._auth_response_received = True
if rc == 5:
self._logger.error("on(disconnect) : Not authorized")
self.disconnect()
if rc == 1:
self._mqtt_connected = False
if rc != 5:
self._callback.connection_status_change(False)
def _on_publish(self, client, data, topic, msg_id) -> None:
self._logger.info("- iot_mqtt :: _on_publish :: " + str(data) + " on topic " + str(topic))
# pylint: disable=W0703
def _handle_device_twin_update(self, client, topic: str, msg: str) -> None:
self._logger.debug("- iot_mqtt :: _echo_desired :: " + topic)
twin = None
desired = None
try:
twin = json.loads(msg)
except json.JSONDecodeError as e:
self._logger.error("ERROR: JSON parse for Device Twin message object has failed. => " + msg + " => " + str(e))
return
if "reported" in twin:
reported = twin["reported"]
if "$version" in reported:
reported_version = reported["$version"]
reported.pop("$version")
else:
self._logger.error("ERROR: Unexpected payload for reported twin update => " + msg)
return
for property_name, value in reported.items():
self._callback.device_twin_reported_updated(property_name, value, reported_version)
is_patch = "desired" not in twin
if is_patch:
desired = twin
else:
desired = twin["desired"]
if "$version" in desired:
desired_version = desired["$version"]
desired.pop("$version")
else:
self._logger.error("ERROR: Unexpected payload for desired twin update => " + msg)
return
for property_name, value in desired.items():
self._callback.device_twin_desired_updated(property_name, value, desired_version)
def _handle_direct_method(self, client, topic: str, msg: str) -> None:
index = topic.find("$rid=")
method_id = 1
method_name = "None"
if index == -1:
self._logger.error("ERROR: C2D doesn't include topic id")
else:
method_id = topic[index + 5 :]
topic_template = "$iothub/methods/POST/"
len_temp = len(topic_template)
method_name = topic[len_temp : topic.find("/", len_temp + 1)]
ret = self._callback.direct_method_invoked(method_name, msg)
gc.collect()
ret_code = 200
ret_message = "{}"
if ret.response_code is not None:
ret_code = ret.response_code
if ret.response_message is not None:
ret_message = ret.response_message
# ret message must be JSON
if not ret_message.startswith("{") or not ret_message.endswith("}"):
ret_json = {"Value": ret_message}
ret_message = json.dumps(ret_json)
next_topic = "$iothub/methods/res/{}/?$rid={}".format(ret_code, method_id)
self._logger.info("C2D: => " + next_topic + " with data " + ret_message + " and name => " + method_name)
self._send_common(next_topic, ret_message)
def _handle_cloud_to_device_message(self, client, topic: str, msg: str) -> None:
parts = topic.split("&")[1:]
properties = {}
for part in parts:
key_value = part.split("=")
properties[key_value[0]] = key_value[1]
self._callback.cloud_to_device_message_received(msg, properties)
gc.collect()
def _send_common(self, topic: str, data) -> None:
# Convert data to a string
if isinstance(data, dict):
data = json.dumps(data)
if not isinstance(data, str):
raise IoTError("Data must be a string or a dictionary")
self._logger.debug("Sending message on topic: " + topic)
self._logger.debug("Sending message: " + str(data))
retry = 0
while True:
gc.collect()
try:
self._logger.debug("Trying to send...")
self._mqtts.publish(topic, data)
self._logger.debug("Data sent")
break
except RuntimeError as runtime_error:
self._logger.info("Could not send data, retrying after 0.5 seconds: " + str(runtime_error))
retry = retry + 1
if retry >= 10:
self._logger.error("Failed to send data")
raise
time.sleep(0.5)
continue
gc.collect()
def _get_device_settings(self) -> None:
self._logger.info("- iot_mqtt :: _get_device_settings :: ")
self.loop()
self._send_common("$iothub/twin/GET/?$rid=0", " ")
# pylint: disable=R0913
def __init__(
self,
callback: IoTMQTTCallback,
socket,
iface,
hostname: str,
device_id: str,
key: str,
token_expires: int = 21600,
logger: logging = None,
):
"""Create the Azure IoT MQTT client
:param IoTMQTTCallback callback: A callback class
:param socket: The socket to communicate over
:param iface: The network interface to communicate over
:param str hostname: The hostname of the MQTT broker to connect to, get this by registering the device
:param str device_id: The device ID of the device to register
:param str key: The primary or secondary key of the device to register
:param int token_expires: The number of seconds till the token expires, defaults to 6 hours
:param adafruit_logging logger: The logger
"""
self._callback = callback
self._socket = socket
self._iface = iface
self._mqtt_connected = False
self._auth_response_received = False
self._mqtts = None
self._device_id = device_id
self._hostname = hostname
self._key = key
self._token_expires = token_expires
self._username = "{}/{}/api-version={}".format(self._hostname, device_id, constants.IOTC_API_VERSION)
self._passwd = self._gen_sas_token()
self._logger = logger if logger is not None else logging.getLogger("log")
self._is_subscribed_to_twins = False
def _subscribe_to_core_topics(self):
device_bound_topic = "devices/{}/messages/devicebound/#".format(self._device_id)
self._mqtts.add_topic_callback(device_bound_topic, self._handle_cloud_to_device_message)
self._mqtts.subscribe(device_bound_topic)
self._mqtts.add_topic_callback("$iothub/methods/#", self._handle_direct_method)
self._mqtts.subscribe("$iothub/methods/#")
def _subscribe_to_twin_topics(self):
self._mqtts.add_topic_callback("$iothub/twin/PATCH/properties/desired/#", self._handle_device_twin_update)
self._mqtts.subscribe("$iothub/twin/PATCH/properties/desired/#") # twin desired property changes
self._mqtts.add_topic_callback("$iothub/twin/res/200/#", self._handle_device_twin_update)
self._mqtts.subscribe("$iothub/twin/res/200/#") # twin properties response
def connect(self) -> bool:
"""Connects to the MQTT broker
:returns: True if the connection is successful, otherwise False
:rtype: bool
"""
self._logger.info("- iot_mqtt :: connect :: " + self._hostname)
self._create_mqtt_client()
self._logger.info(" - iot_mqtt :: connect :: created mqtt client. connecting..")
while self._auth_response_received is None:
self.loop()
self._logger.info(" - iot_mqtt :: connect :: on_connect must be fired. Connected ? " + str(self.is_connected()))
if not self.is_connected():
return False
self._mqtt_connected = True
self._auth_response_received = True
self._subscribe_to_core_topics()
return True
def subscribe_to_twins(self) -> None:
"""Subscribes to digital twin updates
Only call this if your tier of IoT Hub supports this
"""
if self._is_subscribed_to_twins:
return
# do this separately as this is not supported in B1 hubs
self._subscribe_to_twin_topics()
self._get_device_settings()
self._is_subscribed_to_twins = True
def disconnect(self) -> None:
"""Disconnects from the MQTT broker
"""
if not self.is_connected():
return
self._logger.info("- iot_mqtt :: disconnect :: ")
self._mqtt_connected = False
self._mqtts.disconnect()
def reconnect(self) -> None:
"""Reconnects to the MQTT broker
"""
self._logger.info("- iot_mqtt :: reconnect :: ")
self._mqtts.reconnect()
def is_connected(self) -> bool:
"""Gets if there is an open connection to the MQTT broker
:returns: True if there is an open connection, False if not
:rtype: bool
"""
return self._mqtt_connected
def loop(self) -> None:
"""Listens for MQTT messages
"""
if not self.is_connected():
return
self._mqtts.loop()
gc.collect()
def send_device_to_cloud_message(self, message, system_properties: dict = None) -> None:
"""Send a device to cloud message from this device to Azure IoT Hub
:param message: The message data as a JSON string or a dictionary
:param system_properties: System properties to send with the message
:raises: ValueError if the message is not a string or dictionary
:raises RuntimeError: if the internet connection is not responding or is unable to connect
"""
self._logger.info("- iot_mqtt :: send_device_to_cloud_message :: " + message)
topic = "devices/{}/messages/events/".format(self._device_id)
if system_properties is not None:
firstProp = True
for prop in system_properties:
if not firstProp:
topic += "&"
else:
firstProp = False
topic += prop + "=" + str(system_properties[prop])
# Convert message to a string
if isinstance(message, dict):
message = json.dumps(message)
if not isinstance(message, str):
raise ValueError("message must be a string or a dictionary")
self._send_common(topic, message)
self._callback.message_sent(message)
def send_twin_patch(self, patch) -> None:
"""Send a patch for the reported properties of the device twin
:param patch: The patch as a JSON string or a dictionary
:raises: IoTError if the data is not a string or dictionary
:raises RuntimeError: if the internet connection is not responding or is unable to connect
"""
self._logger.info("- iot_mqtt :: sendProperty :: " + str(patch))
topic = "$iothub/twin/PATCH/properties/reported/?$rid={}".format(int(time.time()))
self._send_common(topic, patch)