# The MIT License (MIT)
#
# Copyright (c) 2019 Brent Rubell for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_io`
================================================================================

A CircuitPython library for communicating with Adafruit IO.

* Author(s): Brent Rubell for Adafruit Industries

Implementation Notes
--------------------

**Software and Dependencies:**

* Adafruit CircuitPython firmware for the supported boards:
    https://github.com/adafruit/circuitpython/releases
"""
import time
import json
from adafruit_io.adafruit_io_errors import (
    AdafruitIO_RequestError,
    AdafruitIO_ThrottleError,
    AdafruitIO_MQTTError,
)

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_Adafruit_IO.git"

CLIENT_HEADERS = {"User-Agent": "AIO-CircuitPython/{0}".format(__version__)}


class IO_MQTT:
    """
    Client for interacting with Adafruit IO MQTT API.
    https://io.adafruit.com/api/docs/mqtt.html#adafruit-io-mqtt-api

    :param MiniMQTT mqtt_client: MiniMQTT Client object.

    :raises TypeError: If ``mqtt_client`` does not look like an MQTT client or
        has no username configured.

    """

    def __init__(self, mqtt_client):
        # Check for MiniMQTT client. The check is name-based (duck typing) so
        # this module does not need to import the MQTT library itself.
        mqtt_client_type = str(type(mqtt_client))
        if "MQTT" in mqtt_client_type:
            self._client = mqtt_client
        else:
            raise TypeError(
                "This class requires a MiniMQTT client object, please create one."
            )
        # MiniMQTT's username kwarg is optional, IO requires a username
        try:
            self._user = self._client.user
        except AttributeError as err:
            raise TypeError(
                "Adafruit IO requires a username, please set one in MiniMQTT"
            ) from err
        # An unset username would otherwise produce topics such as
        # "None/feeds/...", so reject it up front.
        if self._user is None:
            raise TypeError(
                "Adafruit IO requires a username, please set one in MiniMQTT"
            )
        # User-defined MQTT callback methods must be init'd to None
        self.on_connect = None
        self.on_disconnect = None
        self.on_message = None
        self.on_subscribe = None
        self.on_unsubscribe = None
        # MQTT event callbacks
        self._client.on_connect = self._on_connect_mqtt
        self._client.on_disconnect = self._on_disconnect_mqtt
        self._client.on_message = self._on_message_mqtt
        self._client.on_subscribe = self._on_subscribe_mqtt
        self._client.on_unsubscribe = self._on_unsubscribe_mqtt
        self._logger = False
        # Write to the MiniMQTT logger, if available.
        if self._client.logger is not None:
            self._logger = True
            self._client.set_logger_level("DEBUG")
        self._connected = False

    def __enter__(self):
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        self.disconnect()

    def _debug(self, message):
        """Writes a debug message to the MQTT client's logger, if one was
        attached when this object was created.

        :param str message: Text to log.

        """
        if self._logger:
            # Use the same attribute that __init__ checked for a logger.
            self._client.logger.debug(message)

    def reconnect(self):
        """Attempts to reconnect to the Adafruit IO MQTT Broker.

        :raises AdafruitIO_MQTTError: If the underlying client fails to reconnect.

        """
        try:
            self._client.reconnect()
        except Exception as err:  # pylint: disable=broad-except
            raise AdafruitIO_MQTTError("Unable to reconnect to Adafruit IO.") from err

    def connect(self):
        """Connects to the Adafruit IO MQTT Broker.
        Must be called before any other API methods are called.

        :raises AdafruitIO_MQTTError: If the underlying client fails to connect.

        """
        try:
            self._client.connect()
        except Exception as err:  # pylint: disable=broad-except
            raise AdafruitIO_MQTTError("Unable to connect to Adafruit IO.") from err

    def disconnect(self):
        """Disconnects from Adafruit IO MQTT Broker.
        Does nothing if the on_connect callback has not reported a
        successful connection.

        """
        if self._connected:
            self._client.disconnect()

    @property
    def is_connected(self):
        """Returns if connected to Adafruit IO MQTT Broker.
        The value is forwarded unchanged from the MQTT client's
        ``is_connected`` attribute.

        """
        return self._client.is_connected

    # pylint: disable=not-callable, unused-argument
    def _on_connect_mqtt(self, client, userdata, flags, return_code):
        """Runs when the client calls on_connect.

        :raises AdafruitIO_MQTTError: If ``return_code`` is non-zero.

        """
        self._debug("Client called on_connect.")
        if return_code == 0:
            self._connected = True
        else:
            raise AdafruitIO_MQTTError(return_code)
        # Call the user-defined on_connect callback if defined
        if self.on_connect is not None:
            self.on_connect(self)

    # pylint: disable=not-callable, unused-argument
    def _on_disconnect_mqtt(self, client, userdata, return_code):
        """Runs when the client calls on_disconnect.
        """
        self._debug("Client called on_disconnect")
        self._connected = False
        # Call the user-defined on_disconnect callback if defined
        if self.on_disconnect is not None:
            self.on_disconnect(self)

    # pylint: disable=not-callable
    def _on_message_mqtt(self, client, topic, payload):
        """Runs when the client calls on_message. Parses and returns
        incoming data from Adafruit IO feeds.

        :param MQTT client: A MQTT Client Instance.
        :param str topic: MQTT topic response from Adafruit IO.
        :param str payload: MQTT payload data response from Adafruit IO.

        :raises ValueError: If no ``on_message`` callback has been set.

        """
        self._debug("Client called on_message.")
        if self.on_message is None:
            raise ValueError(
                "You must define an on_message method before calling this callback."
            )
        topic_name, message = self._parse_message(topic, payload)
        self.on_message(self, topic_name, message)

    @staticmethod
    def _parse_message(topic, payload):
        """Splits an incoming topic/payload pair into the values handed to the
        user's on_message callback.

        :param str topic: MQTT topic the message arrived on.
        :param str payload: Raw MQTT payload.
        :return: ``(topic_name, message)``. For group topics both items are
            lists (feed keys and their values, in matching order); otherwise
            ``topic_name`` is a single topic segment and ``message`` is the
            unmodified payload.

        """
        segments = topic.split("/")
        if len(segments) > 1 and segments[1] == "groups":
            # Adafruit IO Group Feed(s): the payload is JSON with a "feeds"
            # mapping of feed key -> value.
            group_feeds = json.loads(payload)["feeds"]
            feed_keys = list(group_feeds)
            return feed_keys, [group_feeds[key] for key in feed_keys]
        if segments[0] == "time" and len(segments) > 1:
            # Adafruit IO Time Topic
            return segments[1], payload
        if len(segments) > 2:
            # Standard Adafruit IO Feed
            return segments[2], payload
        # Two-segment topics such as "<user>/throttle" or "<user>/errors"
        # have no third segment; report the last one instead of failing.
        return segments[-1], payload

    # pylint: disable=not-callable
    def _on_subscribe_mqtt(self, client, user_data, topic, qos):
        """Runs when the client calls on_subscribe.
        """
        self._debug("Client called on_subscribe")
        if self.on_subscribe is not None:
            self.on_subscribe(self, user_data, topic, qos)

    # pylint: disable=not-callable
    def _on_unsubscribe_mqtt(self, client, user_data, topic, pid):
        """Runs when the client calls on_unsubscribe.
        """
        self._debug("Client called on_unsubscribe")
        if self.on_unsubscribe is not None:
            self.on_unsubscribe(self, user_data, topic, pid)

    def loop(self):
        """Manually process messages from Adafruit IO.
        Call this method to check incoming subscription messages.

        Example usage of polling the message queue using loop.

        .. code-block:: python

            while True:
                io.loop()

        """
        self._client.loop()

    # Subscriptions
    def subscribe(self, feed_key=None, group_key=None, shared_user=None):
        """Subscribes to your Adafruit IO feed or group.
        Can also subscribe to someone else's feed.

        :param str feed_key: Adafruit IO Feed key.
        :param str group_key: Adafruit IO Group key.
        :param str shared_user: Owner of the Adafruit IO feed, required for shared feeds.

        :raises AdafruitIO_MQTTError: If neither a feed_key nor a group_key is given.

        Example of subscribing to an Adafruit IO Feed named 'temperature'.

        .. code-block:: python

            client.subscribe('temperature')

        Example of subscribing to an Adafruit IO Group named 'weatherstation'.

        .. code-block:: python

            client.subscribe(group_key='weatherstation')

        Example of subscribing to a shared feed.

        .. code-block:: python

            client.subscribe('temperature', shared_user='adabot')

        """
        if shared_user is not None and feed_key is not None:
            self._client.subscribe("{0}/feeds/{1}".format(shared_user, feed_key))
        elif group_key is not None:
            self._client.subscribe("{0}/groups/{1}".format(self._user, group_key))
        elif feed_key is not None:
            self._client.subscribe("{0}/feeds/{1}".format(self._user, feed_key))
        else:
            raise AdafruitIO_MQTTError("Must provide a feed_key or group_key.")

    def subscribe_to_throttling(self):
        """Subscribes to your personal Adafruit IO /throttle topic.
        https://io.adafruit.com/api/docs/mqtt.html#mqtt-api-rate-limiting

        """
        self._client.subscribe("%s/throttle" % self._user)

    def subscribe_to_errors(self):
        """Subscribes to your personal Adafruit IO /errors topic.
        Notifies you of errors relating to publish/subscribe calls.

        """
        self._client.subscribe("%s/errors" % self._user)

    def subscribe_to_randomizer(self, randomizer_id):
        """Subscribes to a random data stream created by the Adafruit IO Words service.

        :param int randomizer_id: Random word record you want data for.

        """
        self._client.subscribe(
            "{0}/integration/words/{1}".format(self._user, randomizer_id)
        )

    def subscribe_to_weather(self, weather_record, forecast):
        """Subscribes to a weather forecast using the Adafruit IO PLUS weather
        service. This feature is only available to Adafruit IO PLUS subscribers.

        :param int weather_record: Weather record you want data for.
        :param str forecast: Forecast data you'd like to receive.

        """
        self._client.subscribe(
            "{0}/integration/weather/{1}/{2}".format(
                self._user, weather_record, forecast
            )
        )

    def subscribe_to_time(self, time_type):
        """Adafruit IO provides some built-in MQTT topics for getting the current server time.

        :param str time_type: Current Adafruit IO server time. Can be 'seconds', 'millis',
            'hours', or 'iso'.

        :raises TypeError: If ``time_type`` is not one of the values above.

        Information about these topics can be found on the Adafruit IO MQTT API Docs.:
        https://io.adafruit.com/api/docs/mqtt.html#time-topics

        """
        # Exact membership test: a chained `"a" or "b" in x` is always truthy.
        if time_type in ("seconds", "millis", "hours"):
            self._client.subscribe("time/" + time_type)
        elif time_type == "iso":
            self._client.subscribe("time/ISO-8601")
        else:
            raise TypeError("Invalid time feed type specified")

    def unsubscribe(self, feed_key=None, group_key=None, shared_user=None):
        """Unsubscribes from an Adafruit IO feed or group.
        Can also unsubscribe from someone else's feed.

        :param str feed_key: Adafruit IO Feed key.
        :param str group_key: Adafruit IO Group key.
        :param str shared_user: Owner of the Adafruit IO feed, required for shared feeds.

        :raises AdafruitIO_MQTTError: If neither a feed_key nor a group_key is given.

        Example of unsubscribing from a feed.

        .. code-block:: python

            client.unsubscribe('temperature')

        Example of unsubscribing from a group.

        .. code-block:: python

            client.unsubscribe(group_key='weatherstation')

        Example of unsubscribing from a shared feed.

        .. code-block:: python

            client.unsubscribe('temperature', shared_user='adabot')

        """
        if shared_user is not None and feed_key is not None:
            self._client.unsubscribe("{0}/feeds/{1}".format(shared_user, feed_key))
        elif group_key is not None:
            self._client.unsubscribe("{0}/groups/{1}".format(self._user, group_key))
        elif feed_key is not None:
            self._client.unsubscribe("{0}/feeds/{1}".format(self._user, feed_key))
        else:
            raise AdafruitIO_MQTTError("Must provide a feed_key or group_key.")

    # Publishing
    def publish_multiple(self, feeds_and_data, timeout=3, is_group=False):
        """Publishes multiple data points to multiple feeds or groups with a variable
        timeout.

        :param list feeds_and_data: List of tuples containing topic strings and data values.
        :param int timeout: Delay between publishing data points to Adafruit IO, in seconds.
        :param bool is_group: Set to True if you're publishing to a group.

        :raises AdafruitIO_MQTTError: If ``feeds_and_data`` is not a list.

        Example of publishing multiple data points on different feeds to Adafruit IO:

        .. code-block:: python

            client.publish_multiple([('humidity', 24.5), ('temperature', 54)])

        """
        if not isinstance(feeds_and_data, list):
            raise AdafruitIO_MQTTError("This method accepts a list of tuples.")
        # Unpack every entry before publishing anything so that a malformed
        # entry fails before the first message is sent.
        feed_data = [(topic, data) for topic, data in feeds_and_data]
        for topic, data in feed_data:
            self.publish(topic, data, is_group=is_group)
            time.sleep(timeout)

    # pylint: disable=too-many-arguments
    def publish(self, feed_key, data, metadata=None, shared_user=None, is_group=False):
        """Publishes to an Adafruit IO Feed.

        Exactly one message is published per call. When ``is_group`` is set the
        data goes to the group topic and ``metadata``/``shared_user`` are ignored.

        :param str feed_key: Adafruit IO Feed key.
        :param str data: Data to publish to the feed or group.
        :param int data: Data to publish to the feed or group.
        :param float data: Data to publish to the feed or group.
        :param str metadata: Optional metadata associated with the data.
        :param str shared_user: Owner of the Adafruit IO feed, required for
                                feed sharing.
        :param bool is_group: Set True if publishing to an Adafruit IO Group.

        Example of publishing an integer to Adafruit IO on feed 'temperature'.

        .. code-block:: python

            client.publish('temperature', 30)

        Example of publishing a floating point value to feed 'temperature'.

        .. code-block:: python

            client.publish('temperature', 3.14)

        Example of publishing a string to feed 'temperature'.

        .. code-block:: python

            client.publish('temperature', 'thirty degrees')

        Example of publishing an integer to group 'weatherstation'.

        .. code-block:: python

            client.publish('weatherstation', 12, is_group=True)

        Example of publishing to a shared feed.

        .. code-block:: python

            client.publish('temperature', 30, shared_user='myfriend')

        Example of publishing a value along with locational metadata to a feed.

        .. code-block:: python

            data = 42
            # format: "lat, lon, ele"
            metadata = "40.726190, -74.005334, -6"
            io.publish("location-feed", data, metadata)

        """
        if is_group:
            self._client.publish("{0}/groups/{1}".format(self._user, feed_key), data)
            return
        feed_owner = self._user if shared_user is None else shared_user
        if metadata is not None:
            # format() stringifies ints and floats alike before joining.
            csv_string = "{0},{1}".format(data, metadata)
            self._client.publish(
                "{0}/feeds/{1}/csv".format(feed_owner, feed_key), csv_string
            )
        else:
            self._client.publish("{0}/feeds/{1}".format(feed_owner, feed_key), data)

    def get(self, feed_key):
        """Calling this method will make Adafruit IO publish the most recent
        value on feed_key.
        https://io.adafruit.com/api/docs/mqtt.html#retained-values

        :param str feed_key: Adafruit IO Feed key.

        Example of obtaining a recently published value on a feed:

        .. code-block:: python

            io.get('temperature')

        """
        self._client.publish("{0}/feeds/{1}/get".format(self._user, feed_key), "\0")


class IO_HTTP:
    """
    Client for interacting with the Adafruit IO HTTP API.
    https://io.adafruit.com/api/docs/#adafruit-io-http-api

    :param str adafruit_io_username: Adafruit IO Username
    :param str adafruit_io_key: Adafruit IO Key
    :param wifi_manager: WiFiManager object from ESPSPI_WiFiManager or ESPAT_WiFiManager

    :raises TypeError: If ``wifi_manager`` is not one of the WiFiManager types above.

    """

    def __init__(self, adafruit_io_username, adafruit_io_key, wifi_manager):
        self.username = adafruit_io_username
        self.key = adafruit_io_key
        # Name-based type check; the WiFiManager classes are not imported here.
        wifi_type = str(type(wifi_manager))
        if "ESPSPI_WiFiManager" in wifi_type or "ESPAT_WiFiManager" in wifi_type:
            self.wifi = wifi_manager
        else:
            raise TypeError("This library requires a WiFiManager object.")
        # Index 0: headers for requests carrying JSON; index 1: key only.
        self._aio_headers = [
            {"X-AIO-KEY": self.key, "Content-Type": "application/json"},
            {"X-AIO-KEY": self.key},
        ]

    @staticmethod
    def _create_headers(io_headers):
        """Creates http request headers.

        :param dict io_headers: Adafruit IO headers to merge over the client headers.
        :return: A new dict; neither input is modified.

        """
        headers = CLIENT_HEADERS.copy()
        headers.update(io_headers)
        return headers

    @staticmethod
    def _create_data(data, metadata):
        """Creates JSON data payload

        :param data: Value to send.
        :param dict metadata: Optional dict which must contain the keys
            'lat', 'lon', 'ele' and 'created_at'.

        """
        if metadata is not None:
            return {
                "value": data,
                "lat": metadata["lat"],
                "lon": metadata["lon"],
                "ele": metadata["ele"],
                "created_at": metadata["created_at"],
            }
        return {"value": data}

    @staticmethod
    def _handle_error(response):
        """Checks HTTP status codes and raises errors.

        :raises AdafruitIO_ThrottleError: On HTTP status 429.
        :raises AdafruitIO_RequestError: On any other status of 400 or above.

        """
        if response.status_code == 429:
            raise AdafruitIO_ThrottleError
        if response.status_code >= 400:
            raise AdafruitIO_RequestError(response)

    def _read_json(self, response):
        """Checks a response for errors, decodes its JSON body and always
        closes the response, including when an error is raised.

        :param response: Response object returned by the WiFiManager.

        """
        try:
            self._handle_error(response)
            return response.json()
        finally:
            response.close()

    def _compose_path(self, path):
        """Composes a valid API request path.

        :param str path: Adafruit IO API URL path.

        """
        return "https://io.adafruit.com/api/v2/{0}/{1}".format(self.username, path)

    # HTTP Requests
    def _post(self, path, payload):
        """
        POST data to Adafruit IO

        :param str path: Formatted Adafruit IO URL from _compose_path
        :param json payload: JSON data to send to Adafruit IO

        """
        response = self.wifi.post(
            path, json=payload, headers=self._create_headers(self._aio_headers[0])
        )
        return self._read_json(response)

    def _get(self, path):
        """
        GET data from Adafruit IO

        :param str path: Formatted Adafruit IO URL from _compose_path

        """
        response = self.wifi.get(
            path, headers=self._create_headers(self._aio_headers[1])
        )
        return self._read_json(response)

    def _delete(self, path):
        """
        DELETE data from Adafruit IO.

        :param str path: Formatted Adafruit IO URL from _compose_path

        """
        response = self.wifi.delete(
            path, headers=self._create_headers(self._aio_headers[0])
        )
        return self._read_json(response)

    # Data
    def send_data(self, feed_key, data, metadata=None, precision=None):
        """
        Sends value data to a specified Adafruit IO feed.

        :param str feed_key: Adafruit IO feed key
        :param str data: Data to send to the Adafruit IO feed
        :param dict metadata: Optional metadata associated with the data
        :param int precision: Optional amount of precision points to send with floating point data

        :raises NotImplementedError: If ``precision`` is given and ``data``
            cannot be rounded.

        """
        path = self._compose_path("feeds/{0}/data".format(feed_key))
        if precision:
            try:
                data = round(data, precision)
            except (NotImplementedError, TypeError) as err:
                # received a non-float value
                raise NotImplementedError(
                    "Precision requires a floating point value"
                ) from err
        payload = self._create_data(data, metadata)
        self._post(path, payload)

    def receive_data(self, feed_key):
        """
        Return the most recent value for the specified feed.

        :param string feed_key: Adafruit IO feed key

        """
        path = self._compose_path("feeds/{0}/data/last".format(feed_key))
        return self._get(path)

    def delete_data(self, feed_key, data_id):
        """
        Deletes an existing Data point from a feed.

        :param string feed_key: Adafruit IO feed key
        :param string data_id: Data point to delete from the feed

        """
        path = self._compose_path("feeds/{0}/data/{1}".format(feed_key, data_id))
        return self._delete(path)

    # Groups
    def add_feed_to_group(self, group_key, feed_key):
        """
        Adds an existing feed to a group

        :param str group_key: Group
        :param str feed_key: Feed to add to the group

        """
        path = self._compose_path("groups/{0}/add".format(group_key))
        payload = {"feed_key": feed_key}
        return self._post(path, payload)

    def create_new_group(self, group_key, group_description):
        """
        Creates a new Adafruit IO Group.

        :param str group_key: Adafruit IO Group Key
        :param str group_description: Brief summary about the group

        """
        path = self._compose_path("groups")
        payload = {"name": group_key, "description": group_description}
        return self._post(path, payload)

    def delete_group(self, group_key):
        """
        Deletes an existing group.

        :param str group_key: Adafruit IO Group Key

        """
        path = self._compose_path("groups/{0}".format(group_key))
        return self._delete(path)

    def get_group(self, group_key):
        """
        Returns Group based on Group Key

        :param str group_key: Adafruit IO Group Key

        """
        path = self._compose_path("groups/{0}".format(group_key))
        return self._get(path)

    # Feeds
    def get_feed(self, feed_key, detailed=False):
        """
        Returns an Adafruit IO feed based on the feed key

        :param str feed_key: Adafruit IO Feed Key
        :param bool detailed: Returns a more verbose feed record

        """
        if detailed:
            path = self._compose_path("feeds/{0}/details".format(feed_key))
        else:
            path = self._compose_path("feeds/{0}".format(feed_key))
        return self._get(path)

    def create_new_feed(self, feed_key, feed_desc=None, feed_license=None):
        """
        Creates a new Adafruit IO feed.

        :param str feed_key: Adafruit IO Feed Key
        :param str feed_desc: Optional description of feed
        :param str feed_license: Optional feed license

        """
        path = self._compose_path("feeds")
        payload = {"name": feed_key, "description": feed_desc, "license": feed_license}
        return self._post(path, payload)

    def delete_feed(self, feed_key):
        """
        Deletes an existing feed.

        :param str feed_key: Valid feed key

        """
        path = self._compose_path("feeds/{0}".format(feed_key))
        return self._delete(path)

    # Adafruit IO Connected Services
    def receive_weather(self, weather_id):
        """
        Get data from the Adafruit IO Weather Forecast Service
        NOTE: This service is available to Adafruit IO Plus subscribers only.

        :param int weather_id: ID for retrieving a specified weather record.

        """
        path = self._compose_path("integrations/weather/{0}".format(weather_id))
        return self._get(path)

    def receive_random_data(self, generator_id):
        """
        Get data from the Adafruit IO Random Data Stream Service

        :param int generator_id: Specified randomizer record

        """
        path = self._compose_path("integrations/words/{0}".format(generator_id))
        return self._get(path)

    def receive_time(self):
        """
        Returns a struct_time from the Adafruit IO Server based on the device's IP address.
        https://circuitpython.readthedocs.io/en/latest/shared-bindings/time/__init__.html#time.struct_time

        """
        path = self._compose_path("integrations/time/struct.json")
        time_struct = self._get(path)
        return time.struct_time(
            (
                time_struct["year"],
                time_struct["mon"],
                time_struct["mday"],
                time_struct["hour"],
                time_struct["min"],
                time_struct["sec"],
                time_struct["wday"],
                time_struct["yday"],
                time_struct["isdst"],
            )
        )