diff --git a/Adafruit_IO/mqtt_client.py b/Adafruit_IO/mqtt_client.py index 3ebdbda..6c34fd6 100644 --- a/Adafruit_IO/mqtt_client.py +++ b/Adafruit_IO/mqtt_client.py @@ -52,6 +52,7 @@ class MQTTClient(object): self.on_connect = None self.on_disconnect = None self.on_message = None + self.on_subscribe = None # Initialize MQTT client. self._client = mqtt.Client() if secure: @@ -66,6 +67,7 @@ class MQTTClient(object): self._client.on_message = self._mqtt_message self._connected = False + def _mqtt_connect(self, client, userdata, flags, rc): logger.debug('Client on_connect called.') # Check if the result code is success (0) or some error (non-zero) and @@ -75,7 +77,7 @@ class MQTTClient(object): self._connected = True print('Connected to Adafruit IO!') else: - # handle RC errors within `errors.py`'s MQTTError class + # handle RC errors within MQTTError class raise MQTTError(rc) # Call the on_connect callback if available. if self.on_connect is not None: @@ -88,6 +90,7 @@ class MQTTClient(object): # log the RC as an error. Continue on to call any disconnect handler # so clients can potentially recover gracefully. if rc != 0: + print("Unexpected disconnection.") raise MQTTError(rc) print('Disconnected from Adafruit IO!') # Call the on_disconnect callback if available. @@ -99,13 +102,17 @@ class MQTTClient(object): # Parse out the feed id and call on_message callback. # Assumes topic looks like "username/feeds/id" parsed_topic = msg.topic.split('/') - if self.on_message is not None and self._username == parsed_topic[0]: + if self.on_message is not None: feed = parsed_topic[2] payload = '' if msg.payload is None else msg.payload.decode('utf-8') elif self.on_message is not None and parsed_topic[0] == 'time': feed = parsed_topic[0] payload = msg.payload.decode('utf-8') self.on_message(self, feed, payload) + + def _mqtt_subscribe(client, userdata, mid, granted_qos): + """Called when broker responds to a subscribe request.""" + def connect(self, **kwargs): """Connect to the Adafruit.IO service. Must be called before any loop @@ -162,16 +169,24 @@ class MQTTClient(object): """ self._client.loop(timeout=timeout_sec) - def subscribe(self, feed_id): + def subscribe(self, feed_id, feed_user=None): """Subscribe to changes on the specified feed. When the feed is updated the on_message function will be called with the feed_id and new value. + + Params: + - feed_id: The id of the feed to update. + - feed_user (optional): The user id of the feed. Used for feed sharing. """ - self._client.subscribe('{0}/feeds/{1}'.format(self._username, feed_id)) + if feed_user is not None: + (res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(feed_user, feed_id)) + else: + (res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(self._username, feed_id)) + return res, mid def subscribe_time(self, time): """Subscribe to changes on the Adafruit IO time feeds. When the feed is updated, the on_message function will be called and publish a new value: - time = + time feeds: millis: milliseconds seconds: seconds iso: ISO-8601 (https://en.wikipedia.org/wiki/ISO_8601) @@ -181,15 +196,27 @@ class MQTTClient(object): elif time == 'iso': self._client.subscribe('time/ISO-8601') else: - print('ERROR: Invalid time type specified') + raise TypeError('Invalid Time Feed Specified.') return + + def unsubscribe(self, feed_id): + """Unsubscribes from a specified MQTT feed. + Note: this does not prevent publishing to a feed, it will unsubscribe + from receiving messages via on_message. + """ + (res, mid) = self._client.unsubscribe('{0}/feeds/{1}'.format(self._username, feed_id)) - def publish(self, feed_id, value): + def publish(self, feed_id, value=None, feed_user=None): """Publish a value to a specified feed. - Required parameters: + Params: - feed_id: The id of the feed to update. + - feed_user (optional): The user id of the feed. Used for feed sharing. - value: The new value to publish to the feed. """ - self._client.publish('{0}/feeds/{1}'.format(self._username, feed_id), - payload=value) + if feed_user is not None: + (res, self._pub_mid) = self._client.publish('{0}/feeds/{1}'.format(feed_user, feed_id), + payload=value) + else: + (res, self._pub_mid) = self._client.publish('{0}/feeds/{1}'.format(self._username, feed_id), + payload=value) \ No newline at end of file diff --git a/docs/feed-sharing.rst b/docs/feed-sharing.rst new file mode 100644 index 0000000..f677f72 --- /dev/null +++ b/docs/feed-sharing.rst @@ -0,0 +1,14 @@ +Feed Sharing +------------ +Feed sharing is a feature of Adafruit IO which allows you to share your feeds with people you specify. + +If you want to share a feed on your Adafruit IO Account with another user, visit the `Sharing a feed page `_ +on the Adafruit Learning System. + +The Adafruit IO Python client supports Feed Sharing in the mqtt_client.py class. + +Usage Example +~~~~~~~~~~~~~ + + +.. literalinclude:: ../examples/mqtt/mqtt_shared_feeds.py \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 6c9a0a8..d081ccd 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -22,6 +22,7 @@ Table of Contents :maxdepth: 6 feeds + feed-sharing data groups diff --git a/examples/mqtt/mqtt_shared_feeds.py b/examples/mqtt/mqtt_shared_feeds.py new file mode 100644 index 0000000..38b6667 --- /dev/null +++ b/examples/mqtt/mqtt_shared_feeds.py @@ -0,0 +1,74 @@ +""" +`mqtt_shared_feeds.py` +--------------------------------------------------------- +Example of reading and writing to a shared Adafruit IO Feed. + +learn.adafruit.com/adafruit-io-basics-feeds/sharing-a-feed + +Author: Brent Rubell for Adafruit Industries 2018 +""" + +# Import standard python modules. +import sys +import time +import random + +# Import Adafruit IO MQTT client. +from Adafruit_IO import MQTTClient + +# Set to your Adafruit IO key. +# Remember, your key is a secret, +# so make sure not to publish it when you publish this code! +ADAFRUIT_IO_KEY = 'YOUR_AIO_KEY' + +# Set to your Adafruit IO username. +# (go to https://accounts.adafruit.com to find your username) +ADAFRUIT_IO_USERNAME = 'YOUR_AIO_USERNAME' + +# Shared IO Feed +# Make sure you have read AND write access to this feed to publish. +IO_FEED = 'SHARED_AIO_FEED_NAME' + +# IO Feed Owner's username +IO_FEED_USERNAME = 'SHARED_AIO_FEED_USERNAME' + + +# Define callback functions which will be called when certain events happen. +def connected(client): + """Connected function will be called when the client connects. + """ + client.subscribe(IO_FEED, IO_FEED_USERNAME) + +def disconnected(client): + """Disconnected function will be called when the client disconnects. + """ + print('Disconnected from Adafruit IO!') + sys.exit(1) + +def message(client, feed_id, payload): + """Message function will be called when a subscribed feed has a new value. + The feed_id parameter identifies the feed, and the payload parameter has + the new value. + """ + print('Feed {0} received new value: {1}'.format(feed_id, payload)) + + +# Create an MQTT client instance. +client = MQTTClient(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY) + +# Setup the callback functions defined above. +client.on_connect = connected +client.on_disconnect = disconnected +client.on_message = message + +# Connect to the Adafruit IO server. +client.connect() + +client.loop_background() +print('Publishing a new message every 10 seconds (press Ctrl-C to quit)...') + +while True: + value = random.randint(0, 100) + print('Publishing {0} to {1}.'.format(value, IO_FEED)) + client.publish(IO_FEED, value, IO_FEED_USERNAME) + time.sleep(10)