From 2c4bb7fb20003e3ca81d6043c0c20c85cdfb3ed3 Mon Sep 17 00:00:00 2001 From: Tony DiCola Date: Tue, 13 Jan 2015 23:37:00 -0800 Subject: [PATCH] Add MQTT client and update tests. --- Adafruit_IO/__init__.py | 3 +- Adafruit_IO/client.py | 14 ++-- Adafruit_IO/mqtt_client.py | 146 ++++++++++++++++++++++++++++++++++++ examples/mqtt_client.py | 87 +++++++++++++++++++++ test.py => examples/test.py | 0 setup.py | 2 +- tests/README.txt | 13 +++- tests/base.py | 20 +++++ tests/test_errors.py | 40 ++++------ tests/test_mqtt_client.py | 83 ++++++++++++++++++++ tests/test_setup.py | 17 ++--- 11 files changed, 379 insertions(+), 46 deletions(-) create mode 100644 Adafruit_IO/mqtt_client.py create mode 100644 examples/mqtt_client.py rename test.py => examples/test.py (100%) create mode 100644 tests/base.py create mode 100644 tests/test_mqtt_client.py diff --git a/Adafruit_IO/__init__.py b/Adafruit_IO/__init__.py index 5612024..a6c515e 100644 --- a/Adafruit_IO/__init__.py +++ b/Adafruit_IO/__init__.py @@ -1 +1,2 @@ -from .client import Client, AdafruitIOError, RequestError, ThrottlingError \ No newline at end of file +from .client import Client, AdafruitIOError, RequestError, ThrottlingError +from .mqtt_client import MQTTClient \ No newline at end of file diff --git a/Adafruit_IO/client.py b/Adafruit_IO/client.py index 7c3262a..71ee5ef 100644 --- a/Adafruit_IO/client.py +++ b/Adafruit_IO/client.py @@ -78,33 +78,33 @@ class Client(object): #stream functionality def send(self, feed_name, data): feed_name = quote(feed_name) - path = "api/feeds/{}/streams/send".format(feed_name) + path = "api/feeds/{}/data/send".format(feed_name) return self._post(path, {'value': data}) def receive(self, feed_name): feed_name = quote(feed_name) - path = "api/feeds/{}/streams/last".format(feed_name) + path = "api/feeds/{}/data/last".format(feed_name) return self._get(path) def receive_next(self, feed_name): feed_name = quote(feed_name) - path = "api/feeds/{}/streams/next".format(feed_name) + path = "api/feeds/{}/data/next".format(feed_name) return self._get(path) def receive_previous(self, feed_name): feed_name = quote(feed_name) - path = "api/feeds/{}/streams/last".format(feed_name) + path = "api/feeds/{}/data/last".format(feed_name) return self._get(path) def streams(self, feed_id_or_key, stream_id=None): if stream_id is None: - path = "api/feeds/{}/streams".format(feed_id_or_key) + path = "api/feeds/{}/data".format(feed_id_or_key) else: - path = "api/feeds/{}/streams/{}".format(feed_id_or_key, stream_id) + path = "api/feeds/{}/data/{}".format(feed_id_or_key, stream_id) return self._get(path) def create_stream(self, feed_id_or_key, data): - path = "api/feeds/{}/streams".format(feed_id_or_key) + path = "api/feeds/{}/data".format(feed_id_or_key) return self._post(path, data) #group functionality diff --git a/Adafruit_IO/mqtt_client.py b/Adafruit_IO/mqtt_client.py new file mode 100644 index 0000000..f44bb5c --- /dev/null +++ b/Adafruit_IO/mqtt_client.py @@ -0,0 +1,146 @@ +# MQTT-based client for Adafruit.IO +# Author: Tony DiCola (tdicola@adafruit.com) +# +# Supports publishing and subscribing to feed changes from Adafruit.IO using +# the MQTT protcol. +# +# Depends on the following Python libraries: +# - paho-mqtt: Paho MQTT client for python. +import logging + +import paho.mqtt.client as mqtt + + +SERVICE_HOST = 'io.adafruit.com' +SERVICE_PORT = 1883 +KEEP_ALIVE_SEC = 3600 # One minute + +logger = logging.getLogger(__name__) + + +class MQTTClient(object): + """Interface for publishing and subscribing to feed changes on Adafruit.IO + using the MQTT protocol. + """ + + def __init__(self, key): + """Create instance of MQTT client for provided feed. + + Required parameters: + - key: The Adafruit.IO access key for your account. + - feed_id: The id of the feed to access. + """ + # Initialize event callbacks to be None so they don't fire. + self.on_connect = None + self.on_disconnect = None + self.on_message = None + # Initialize MQTT client. + self._client = mqtt.Client() + self._client.username_pw_set(key) + self._client.on_connect = self._mqtt_connect + self._client.on_disconnect = self._mqtt_disconnect + self._client.on_message = self._mqtt_message + self._connected = False + + def _mqtt_connect(self, client, userdata, flags, rc): + logger.debug('Client on_connect called.') + # Check if the result code is success (0) or some error (non-zero) and + # raise an exception if failed. + if rc == 0: + self._connected = True + else: + # TODO: Make explicit exception classes for these failures: + # 0: Connection successful 1: Connection refused - incorrect protocol version 2: Connection refused - invalid client identifier 3: Connection refused - server unavailable 4: Connection refused - bad username or password 5: Connection refused - not authorised 6-255: Currently unused. + raise RuntimeError('Error connecting to Adafruit IO with rc: {0}'.format(rc)) + # Call the on_connect callback if available. + if self.on_connect is not None: + self.on_connect(self) + + def _mqtt_disconnect(self, client, userdata, rc): + logger.debug('Client on_disconnect called.') + self._connected = False + # If this was an unexpected disconnect (non-zero result code) then raise + # an exception. + if rc != 0: + raise RuntimeError('Unexpected disconnect with rc: {0}'.format(rc)) + # Call the on_disconnect callback if available. + if self.on_disconnect is not None: + self.on_disconnect(self) + + def _mqtt_message(self, client, userdata, msg): + logger.debug('Client on_message called.') + # Parse out the feed id and call on_message callback. + # Assumes topic looks like "api/feeds/{feed_id}/streams/receive.json" + if self.on_message is not None and msg.topic.startswith('api/feeds/') \ + and len(msg.topic) >= 31: + feed_id = msg.topic[10:-21] + self.on_message(self, feed_id, msg.payload) + + def connect(self, **kwargs): + """Connect to the Adafruit.IO service. Must be called before any loop + or publish operations are called. Will raise an exception if a + connection cannot be made. Optional keyword arguments will be passed + to paho-mqtt client connect function. + """ + # Skip calling connect if already connected. + if self._connected: + return + # Connect to the Adafruit IO MQTT service. + self._client.connect(SERVICE_HOST, port=SERVICE_PORT, + keepalive=KEEP_ALIVE_SEC, **kwargs) + + def is_connected(self): + """Returns True if connected to Adafruit.IO and False if not connected. + """ + return self._connected + + def disconnect(self): + # Disconnect MQTT client if connected. + if self._connected: + self._client.disconnect() + + def loop_background(self): + """Starts a background thread to listen for messages from Adafruit.IO + and call the appropriate callbacks when feed events occur. Will return + immediately and will not block execution. Should only be called once. + """ + self._client.loop_start() + + def loop_blocking(self): + """Listen for messages from Adafruit.IO and call the appropriate + callbacks when feed events occur. This call will block execution of + your program and will not return until disconnect is explicitly called. + + This is useful if your program doesn't need to do anything else except + listen and respond to Adafruit.IO feed events. If you need to do other + processing, consider using the loop_background function to run a loop + in the background. + """ + self._client.loop_forever() + + def loop(self, timeout_sec=1.0): + """Manually process messages from Adafruit.IO. This is meant to be used + inside your own main loop, where you periodically call this function to + make sure messages are being processed to and from Adafruit_IO. + + The optional timeout_sec parameter specifies at most how long to block + execution waiting for messages when this function is called. The default + is one second. + """ + self._client.loop(timeout=timeout_sec) + + def subscribe(self, feed_id): + """Subscribe to changes on the specified feed. When the feed is updated + the on_message function will be called with the feed_id and new value. + """ + self._client.subscribe('api/feeds/{0}/streams/receive.json'.format(feed_id)) + + def publish(self, feed_id, value): + """Publish a value to a specified feed. + + Required parameters: + - feed_id: The id of the feed to update. + - value: The new value to publish to the feed. + """ + self._client.publish('api/feeds/{0}/streams/send.json'.format(feed_id), + payload=value) diff --git a/examples/mqtt_client.py b/examples/mqtt_client.py new file mode 100644 index 0000000..7fb4cf9 --- /dev/null +++ b/examples/mqtt_client.py @@ -0,0 +1,87 @@ +# Example of using the MQTT client class to subscribe to and publish feed values. +# Author: Tony DiCola (tdicola@adafruit.com) + +# Import standard python modules. +import random +import sys +import time + +# Import Adafruit IO client. +import Adafruit_IO + + +# Set to your Adafruit IO key. +ADAFRUIT_IO_KEY = 'YOUR ADAFRUIT IO KEY' + + +# Define callback functions which will be called when certain events happen. +def connected(client): + # Connected function will be called when the client is connected to Adafruit IO. + # This is a good place to subscribe to feed changes. The client parameter + # passed to this function is the Adafruit IO MQTT client so you can make + # calls against it easily. + print 'Connected to Adafruit IO! Listening for DemoFeed changes...' + # Subscribe to changes on a feed named DemoFeed. + client.subscribe('DemoFeed') + +def disconnected(client): + # Disconnected function will be called when the client disconnects. + print 'Disconnected from Adafruit IO!' + sys.exit(1) + +def message(client, feed_id, payload): + # Message function will be called when a subscribed feed has a new value. + # The feed_id parameter identifies the feed, and the payload parameter has + # the new value. + print 'Feed {0} received new value: {1}'.format(feed_id, payload) + + +# Create an MQTT client instance. +client = Adafruit_IO.MQTTClient(ADAFRUIT_IO_KEY) + +# Setup the callback functions defined above. +client.on_connect = connected +client.on_disconnect = disconnected +client.on_message = message + +# Connect to the Adafruit IO server. +client.connect() + +# Now the program needs to use a client loop function to ensure messages are +# sent and received. There are a few options for driving the message loop, +# depending on what your program needs to do. + +# The first option is to run a thread in the background so you can continue +# doing things in your program. +# client.loop_background() +# Now send new values every 10 seconds. +print 'Publishing a new message every 10 seconds (press Ctrl-C to quit)...' +while True: + value = random.randint(0, 100) + print 'Publishing {0} to DemoFeed.'.format(value) + client.publish('DemoFeed', value) + time.sleep(10) + +# Another option is to pump the message loop yourself by periodically calling +# the client loop function. Notice how the loop below changes to call loop +# continuously while still sending a new message every 10 seconds. This is a +# good option if you don't want to or can't have a thread pumping the message +# loop in the background. +#last = 0 +#print 'Publishing a new message every 10 seconds (press Ctrl-C to quit)...' +#while True: +# # Explicitly pump the message loop. +# client.loop() +# # Send a new message every 10 seconds. +# if (time.time() - last) >= 10.0: +# value = random.randint(0, 100) +# print 'Publishing {0} to DemoFeed.'.format(value) +# client.publish('DemoFeed', value) +# last = time.time() + +# The last option is to just call loop_blocking. This will run a message loop +# forever, so your program will not get past the loop_blocking call. This is +# good for simple programs which only listen to events. For more complex programs +# you probably need to have a background thread loop or explicit message loop like +# the two previous examples above. +#client.loop_blocking() diff --git a/test.py b/examples/test.py similarity index 100% rename from test.py rename to examples/test.py diff --git a/setup.py b/setup.py index ac83123..9739c1f 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,4 @@ -from distutils.core import setup +from setuptools import setup setup( name='Adafruit_IO', diff --git a/tests/README.txt b/tests/README.txt index 85c2fc6..c7b1211 100644 --- a/tests/README.txt +++ b/tests/README.txt @@ -1,11 +1,16 @@ Adafruit IO Python Client Test README -To run these tests you must have the pytest module installed. You can install -this (assuming you have pip installed) by executing: - sudo pip install pytest +To run the tests you can use python's built in unittest module's auto discovery. +Do this by running inside this tests directory: + python -m unittest discover Some tests require a valid Adafruit IO account to run, and they key for this account is provided in the ADAFRUIT_IO_KEY environment variable. Make sure to set this envirionment variable before running the tests, for example to run all the tests with a key execute in this directory: - ADAFRUIT_IO_KEY=my_io_key_value py.test + ADAFRUIT_IO_KEY=my_io_key_value python -m unittest discover + +To add your own tests you are strongly encouraged to build off the test base +class provided in base.py. This class provides a place for common functions +that don't need to be duplicated across all the tests. See the existing test +code for an example of how tests are written and use the base test case. diff --git a/tests/base.py b/tests/base.py new file mode 100644 index 0000000..ee1d7f0 --- /dev/null +++ b/tests/base.py @@ -0,0 +1,20 @@ +# Base testcase class with functions and state available to all tests. +# Author: Tony DiCola (tdicola@adafruit.com) +import os +import time +import unittest + +import Adafruit_IO + + +class IOTestCase(unittest.TestCase): + + def get_test_key(self): + """Return the AIO key specified in the ADAFRUIT_IO_KEY environment + variable, or raise an exception if it doesn't exist. + """ + key = os.environ.get('ADAFRUIT_IO_KEY', None) + if key is None: + raise RuntimeError("ADAFRUIT_IO_KEY environment variable must be " \ + "set with valid Adafruit IO key to run this test!") + return key diff --git a/tests/test_errors.py b/tests/test_errors.py index 6158845..0431fe7 100644 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -1,31 +1,23 @@ -import os +# Test error responses with REST client. +# Author: Tony DiCola (tdicola@adafruit.com) import time - -import pytest +import unittest import Adafruit_IO +import base -def _get_client(): - """Return an Adafruit IO client instance configured with the key specified in - the ADAFRUIT_IO_KEY environment variable. - """ - key = os.environ.get('ADAFRUIT_IO_KEY', None) - if key is None: - raise RuntimeError("ADAFRUIT_IO_KEY environment variable must be set with " \ - "valid Adafruit IO key to run this test!") - return Adafruit_IO.Client(key) +class TestErrors(base.IOTestCase): + def test_request_error_from_bad_key(self): + io = Adafruit_IO.Client("this is a bad key from a test") + with self.assertRaises(Adafruit_IO.RequestError): + io.send("TestStream", 42) -class TestErrors: - def test_request_error_from_bad_key(self): - io = Adafruit_IO.Client("this is a bad key from a test") - with pytest.raises(Adafruit_IO.RequestError): - io.send("TestStream", 42) - - def test_throttling_error_after_6_requests_in_short_period(self): - io = _get_client() - with pytest.raises(Adafruit_IO.ThrottlingError): - for i in range(6): - io.send("TestStream", 42) - time.sleep(0.1) # Small delay to keep from hammering network. + @unittest.skip("Throttling test must be run in isolation to prevent other failures.") + def test_throttling_error_after_6_requests_in_short_period(self): + io = Adafruit_IO.Client(self.get_test_key()) + with self.assertRaises(Adafruit_IO.ThrottlingError): + for i in range(6): + io.send("TestStream", 42) + time.sleep(0.1) # Small delay to keep from hammering network. diff --git a/tests/test_mqtt_client.py b/tests/test_mqtt_client.py new file mode 100644 index 0000000..62e8832 --- /dev/null +++ b/tests/test_mqtt_client.py @@ -0,0 +1,83 @@ +# Test MQTT client class. +# Author: Tony DiCola (tdicola@adafruit.com) +import logging +import time + +import Adafruit_IO +import base + + +TIMEOUT_SEC = 5 # Max amount of time (in seconds) to wait for asyncronous events + # during test runs. + + +class TestMQTTClient(base.IOTestCase): + + def wait_until_connected(self, client, connect_value=True, timeout_sec=TIMEOUT_SEC): + # Pump the specified client message loop and wait until it's connected, + # or the specified timeout has ellapsed. Can specify an explicit + # connection state to wait for by setting connect_value (defaults to + # waiting until connected, i.e. True). + start = time.time() + while client.is_connected() != connect_value and (time.time() - start) < timeout_sec: + client.loop() + time.sleep(0) + + def test_create_client(self): + # Create MQTT test client. + client = Adafruit_IO.MQTTClient(self.get_test_key()) + # Verify not connected by default. + self.assertFalse(client.is_connected()) + + def test_connect(self): + # Create MQTT test client. + client = Adafruit_IO.MQTTClient(self.get_test_key()) + # Verify on_connect handler is called and expected client is provided. + def on_connect(mqtt_client): + self.assertEqual(mqtt_client, client) + client.on_connect = on_connect + # Connect and wait until on_connect event is fired. + client.connect() + self.wait_until_connected(client) + # Verify connected. + self.assertTrue(client.is_connected()) + + def test_disconnect(self): + # Create MQTT test client. + client = Adafruit_IO.MQTTClient(self.get_test_key()) + # Verify on_connect handler is called and expected client is provided. + def on_disconnect(mqtt_client): + self.assertEqual(mqtt_client, client) + client.on_disconnect = on_disconnect + # Connect and wait until on_connect event is fired. + client.connect() + self.wait_until_connected(client) + # Now disconnect and wait until disconnection event occurs. + client.disconnect() + self.wait_until_connected(client, connect_value=False) + # Verify diconnected. + self.assertFalse(client.is_connected()) + + def test_subscribe_and_publish(self): + # Create MQTT test client. + client = Adafruit_IO.MQTTClient(self.get_test_key()) + # Save all on_message handler responses. + messages = [] + def on_message(mqtt_client, feed_id, payload): + self.assertEqual(mqtt_client, client) + messages.append((feed_id, payload)) + client.on_message = on_message + # Connect and wait until on_connect event is fired. + client.connect() + self.wait_until_connected(client) + # Subscribe to changes on a feed. + client.subscribe('TestFeed') + # Publish a message on the feed. + client.publish('TestFeed', 42) + # Wait for message to be received or timeout. + start = time.time() + while len(messages) == 0 and (time.time() - start) < TIMEOUT_SEC: + client.loop() + time.sleep(0) + # Verify one update message with payload is received. + self.assertListEqual(messages, [('TestFeed', '42')]) diff --git a/tests/test_setup.py b/tests/test_setup.py index ca7b1ce..87e12ab 100644 --- a/tests/test_setup.py +++ b/tests/test_setup.py @@ -1,12 +1,11 @@ -import pytest - +# Test setup of REST client. +# Author: jwcooper from Adafruit_IO import Client +import base -def teardown_module(module): - pass -class TestSetup: - def test_set_key(self): - key = "unique_key_id" - io = Client(key) - assert key == io.key +class TestSetup(base.IOTestCase): + def test_set_key(self): + key = "unique_key_id" + io = Client(key) + self.assertEqual(key, io.key)