Merge pull request #2 from adafruit/mqtt_client

Add MQTT client, refactor tests to use unittest module.
This commit is contained in:
Tony DiCola 2015-01-14 10:39:40 -08:00
commit f1d11bf17b
11 changed files with 378 additions and 46 deletions

View file

@ -1 +1,2 @@
from .client import Client, AdafruitIOError, RequestError, ThrottlingError from .client import Client, AdafruitIOError, RequestError, ThrottlingError
from .mqtt_client import MQTTClient

View file

@ -78,33 +78,33 @@ class Client(object):
#stream functionality #stream functionality
def send(self, feed_name, data): def send(self, feed_name, data):
feed_name = quote(feed_name) feed_name = quote(feed_name)
path = "api/feeds/{}/streams/send".format(feed_name) path = "api/feeds/{}/data/send".format(feed_name)
return self._post(path, {'value': data}) return self._post(path, {'value': data})
def receive(self, feed_name): def receive(self, feed_name):
feed_name = quote(feed_name) feed_name = quote(feed_name)
path = "api/feeds/{}/streams/last".format(feed_name) path = "api/feeds/{}/data/last".format(feed_name)
return self._get(path) return self._get(path)
def receive_next(self, feed_name): def receive_next(self, feed_name):
feed_name = quote(feed_name) feed_name = quote(feed_name)
path = "api/feeds/{}/streams/next".format(feed_name) path = "api/feeds/{}/data/next".format(feed_name)
return self._get(path) return self._get(path)
def receive_previous(self, feed_name): def receive_previous(self, feed_name):
feed_name = quote(feed_name) feed_name = quote(feed_name)
path = "api/feeds/{}/streams/last".format(feed_name) path = "api/feeds/{}/data/last".format(feed_name)
return self._get(path) return self._get(path)
def streams(self, feed_id_or_key, stream_id=None): def streams(self, feed_id_or_key, stream_id=None):
if stream_id is None: if stream_id is None:
path = "api/feeds/{}/streams".format(feed_id_or_key) path = "api/feeds/{}/data".format(feed_id_or_key)
else: else:
path = "api/feeds/{}/streams/{}".format(feed_id_or_key, stream_id) path = "api/feeds/{}/data/{}".format(feed_id_or_key, stream_id)
return self._get(path) return self._get(path)
def create_stream(self, feed_id_or_key, data): def create_stream(self, feed_id_or_key, data):
path = "api/feeds/{}/streams".format(feed_id_or_key) path = "api/feeds/{}/data".format(feed_id_or_key)
return self._post(path, data) return self._post(path, data)
#group functionality #group functionality

145
Adafruit_IO/mqtt_client.py Normal file
View file

@ -0,0 +1,145 @@
# MQTT-based client for Adafruit.IO
# Author: Tony DiCola (tdicola@adafruit.com)
#
# Supports publishing and subscribing to feed changes from Adafruit IO using
# the MQTT protcol.
#
# Depends on the following Python libraries:
# - paho-mqtt: Paho MQTT client for python.
import logging
import paho.mqtt.client as mqtt
SERVICE_HOST = 'io.adafruit.com'
SERVICE_PORT = 1883
KEEP_ALIVE_SEC = 3600 # One minute
logger = logging.getLogger(__name__)
class MQTTClient(object):
"""Interface for publishing and subscribing to feed changes on Adafruit IO
using the MQTT protocol.
"""
def __init__(self, key):
"""Create instance of MQTT client.
Required parameters:
- key: The Adafruit.IO access key for your account.
"""
# Initialize event callbacks to be None so they don't fire.
self.on_connect = None
self.on_disconnect = None
self.on_message = None
# Initialize MQTT client.
self._client = mqtt.Client()
self._client.username_pw_set(key)
self._client.on_connect = self._mqtt_connect
self._client.on_disconnect = self._mqtt_disconnect
self._client.on_message = self._mqtt_message
self._connected = False
def _mqtt_connect(self, client, userdata, flags, rc):
logger.debug('Client on_connect called.')
# Check if the result code is success (0) or some error (non-zero) and
# raise an exception if failed.
if rc == 0:
self._connected = True
else:
# TODO: Make explicit exception classes for these failures:
# 0: Connection successful 1: Connection refused - incorrect protocol version 2: Connection refused - invalid client identifier 3: Connection refused - server unavailable 4: Connection refused - bad username or password 5: Connection refused - not authorised 6-255: Currently unused.
raise RuntimeError('Error connecting to Adafruit IO with rc: {0}'.format(rc))
# Call the on_connect callback if available.
if self.on_connect is not None:
self.on_connect(self)
def _mqtt_disconnect(self, client, userdata, rc):
logger.debug('Client on_disconnect called.')
self._connected = False
# If this was an unexpected disconnect (non-zero result code) then raise
# an exception.
if rc != 0:
raise RuntimeError('Unexpected disconnect with rc: {0}'.format(rc))
# Call the on_disconnect callback if available.
if self.on_disconnect is not None:
self.on_disconnect(self)
def _mqtt_message(self, client, userdata, msg):
logger.debug('Client on_message called.')
# Parse out the feed id and call on_message callback.
# Assumes topic looks like "api/feeds/{feed_id}/data/receive.json"
if self.on_message is not None and msg.topic.startswith('api/feeds/') \
and len(msg.topic) >= 28:
feed_id = msg.topic[10:-18]
self.on_message(self, feed_id, msg.payload)
def connect(self, **kwargs):
"""Connect to the Adafruit.IO service. Must be called before any loop
or publish operations are called. Will raise an exception if a
connection cannot be made. Optional keyword arguments will be passed
to paho-mqtt client connect function.
"""
# Skip calling connect if already connected.
if self._connected:
return
# Connect to the Adafruit IO MQTT service.
self._client.connect(SERVICE_HOST, port=SERVICE_PORT,
keepalive=KEEP_ALIVE_SEC, **kwargs)
def is_connected(self):
"""Returns True if connected to Adafruit.IO and False if not connected.
"""
return self._connected
def disconnect(self):
# Disconnect MQTT client if connected.
if self._connected:
self._client.disconnect()
def loop_background(self):
"""Starts a background thread to listen for messages from Adafruit.IO
and call the appropriate callbacks when feed events occur. Will return
immediately and will not block execution. Should only be called once.
"""
self._client.loop_start()
def loop_blocking(self):
"""Listen for messages from Adafruit.IO and call the appropriate
callbacks when feed events occur. This call will block execution of
your program and will not return until disconnect is explicitly called.
This is useful if your program doesn't need to do anything else except
listen and respond to Adafruit.IO feed events. If you need to do other
processing, consider using the loop_background function to run a loop
in the background.
"""
self._client.loop_forever()
def loop(self, timeout_sec=1.0):
"""Manually process messages from Adafruit.IO. This is meant to be used
inside your own main loop, where you periodically call this function to
make sure messages are being processed to and from Adafruit_IO.
The optional timeout_sec parameter specifies at most how long to block
execution waiting for messages when this function is called. The default
is one second.
"""
self._client.loop(timeout=timeout_sec)
def subscribe(self, feed_id):
"""Subscribe to changes on the specified feed. When the feed is updated
the on_message function will be called with the feed_id and new value.
"""
self._client.subscribe('api/feeds/{0}/data/receive.json'.format(feed_id))
def publish(self, feed_id, value):
"""Publish a value to a specified feed.
Required parameters:
- feed_id: The id of the feed to update.
- value: The new value to publish to the feed.
"""
self._client.publish('api/feeds/{0}/data/send.json'.format(feed_id),
payload=value)

87
examples/mqtt_client.py Normal file
View file

@ -0,0 +1,87 @@
# Example of using the MQTT client class to subscribe to and publish feed values.
# Author: Tony DiCola (tdicola@adafruit.com)
# Import standard python modules.
import random
import sys
import time
# Import Adafruit IO client.
import Adafruit_IO
# Set to your Adafruit IO key.
ADAFRUIT_IO_KEY = 'YOUR ADAFRUIT IO KEY'
# Define callback functions which will be called when certain events happen.
def connected(client):
# Connected function will be called when the client is connected to Adafruit IO.
# This is a good place to subscribe to feed changes. The client parameter
# passed to this function is the Adafruit IO MQTT client so you can make
# calls against it easily.
print 'Connected to Adafruit IO! Listening for DemoFeed changes...'
# Subscribe to changes on a feed named DemoFeed.
client.subscribe('DemoFeed')
def disconnected(client):
# Disconnected function will be called when the client disconnects.
print 'Disconnected from Adafruit IO!'
sys.exit(1)
def message(client, feed_id, payload):
# Message function will be called when a subscribed feed has a new value.
# The feed_id parameter identifies the feed, and the payload parameter has
# the new value.
print 'Feed {0} received new value: {1}'.format(feed_id, payload)
# Create an MQTT client instance.
client = Adafruit_IO.MQTTClient(ADAFRUIT_IO_KEY)
# Setup the callback functions defined above.
client.on_connect = connected
client.on_disconnect = disconnected
client.on_message = message
# Connect to the Adafruit IO server.
client.connect()
# Now the program needs to use a client loop function to ensure messages are
# sent and received. There are a few options for driving the message loop,
# depending on what your program needs to do.
# The first option is to run a thread in the background so you can continue
# doing things in your program.
# client.loop_background()
# Now send new values every 10 seconds.
print 'Publishing a new message every 10 seconds (press Ctrl-C to quit)...'
while True:
value = random.randint(0, 100)
print 'Publishing {0} to DemoFeed.'.format(value)
client.publish('DemoFeed', value)
time.sleep(10)
# Another option is to pump the message loop yourself by periodically calling
# the client loop function. Notice how the loop below changes to call loop
# continuously while still sending a new message every 10 seconds. This is a
# good option if you don't want to or can't have a thread pumping the message
# loop in the background.
#last = 0
#print 'Publishing a new message every 10 seconds (press Ctrl-C to quit)...'
#while True:
# # Explicitly pump the message loop.
# client.loop()
# # Send a new message every 10 seconds.
# if (time.time() - last) >= 10.0:
# value = random.randint(0, 100)
# print 'Publishing {0} to DemoFeed.'.format(value)
# client.publish('DemoFeed', value)
# last = time.time()
# The last option is to just call loop_blocking. This will run a message loop
# forever, so your program will not get past the loop_blocking call. This is
# good for simple programs which only listen to events. For more complex programs
# you probably need to have a background thread loop or explicit message loop like
# the two previous examples above.
#client.loop_blocking()

View file

@ -1,4 +1,4 @@
from distutils.core import setup from setuptools import setup
setup( setup(
name='Adafruit_IO', name='Adafruit_IO',

View file

@ -1,11 +1,16 @@
Adafruit IO Python Client Test README Adafruit IO Python Client Test README
To run these tests you must have the pytest module installed. You can install To run the tests you can use python's built in unittest module's auto discovery.
this (assuming you have pip installed) by executing: Do this by running inside this tests directory:
sudo pip install pytest python -m unittest discover
Some tests require a valid Adafruit IO account to run, and they key for this Some tests require a valid Adafruit IO account to run, and they key for this
account is provided in the ADAFRUIT_IO_KEY environment variable. Make sure to account is provided in the ADAFRUIT_IO_KEY environment variable. Make sure to
set this envirionment variable before running the tests, for example to run all set this envirionment variable before running the tests, for example to run all
the tests with a key execute in this directory: the tests with a key execute in this directory:
ADAFRUIT_IO_KEY=my_io_key_value py.test ADAFRUIT_IO_KEY=my_io_key_value python -m unittest discover
To add your own tests you are strongly encouraged to build off the test base
class provided in base.py. This class provides a place for common functions
that don't need to be duplicated across all the tests. See the existing test
code for an example of how tests are written and use the base test case.

20
tests/base.py Normal file
View file

@ -0,0 +1,20 @@
# Base testcase class with functions and state available to all tests.
# Author: Tony DiCola (tdicola@adafruit.com)
import os
import time
import unittest
import Adafruit_IO
class IOTestCase(unittest.TestCase):
def get_test_key(self):
"""Return the AIO key specified in the ADAFRUIT_IO_KEY environment
variable, or raise an exception if it doesn't exist.
"""
key = os.environ.get('ADAFRUIT_IO_KEY', None)
if key is None:
raise RuntimeError("ADAFRUIT_IO_KEY environment variable must be " \
"set with valid Adafruit IO key to run this test!")
return key

View file

@ -1,31 +1,23 @@
import os # Test error responses with REST client.
# Author: Tony DiCola (tdicola@adafruit.com)
import time import time
import unittest
import pytest
import Adafruit_IO import Adafruit_IO
import base
def _get_client(): class TestErrors(base.IOTestCase):
"""Return an Adafruit IO client instance configured with the key specified in
the ADAFRUIT_IO_KEY environment variable.
"""
key = os.environ.get('ADAFRUIT_IO_KEY', None)
if key is None:
raise RuntimeError("ADAFRUIT_IO_KEY environment variable must be set with " \
"valid Adafruit IO key to run this test!")
return Adafruit_IO.Client(key)
class TestErrors:
def test_request_error_from_bad_key(self): def test_request_error_from_bad_key(self):
io = Adafruit_IO.Client("this is a bad key from a test") io = Adafruit_IO.Client("this is a bad key from a test")
with pytest.raises(Adafruit_IO.RequestError): with self.assertRaises(Adafruit_IO.RequestError):
io.send("TestStream", 42) io.send("TestStream", 42)
@unittest.skip("Throttling test must be run in isolation to prevent other failures.")
def test_throttling_error_after_6_requests_in_short_period(self): def test_throttling_error_after_6_requests_in_short_period(self):
io = _get_client() io = Adafruit_IO.Client(self.get_test_key())
with pytest.raises(Adafruit_IO.ThrottlingError): with self.assertRaises(Adafruit_IO.ThrottlingError):
for i in range(6): for i in range(6):
io.send("TestStream", 42) io.send("TestStream", 42)
time.sleep(0.1) # Small delay to keep from hammering network. time.sleep(0.1) # Small delay to keep from hammering network.

83
tests/test_mqtt_client.py Normal file
View file

@ -0,0 +1,83 @@
# Test MQTT client class.
# Author: Tony DiCola (tdicola@adafruit.com)
import logging
import time
import Adafruit_IO
import base
TIMEOUT_SEC = 5 # Max amount of time (in seconds) to wait for asyncronous events
# during test runs.
class TestMQTTClient(base.IOTestCase):
def wait_until_connected(self, client, connect_value=True, timeout_sec=TIMEOUT_SEC):
# Pump the specified client message loop and wait until it's connected,
# or the specified timeout has ellapsed. Can specify an explicit
# connection state to wait for by setting connect_value (defaults to
# waiting until connected, i.e. True).
start = time.time()
while client.is_connected() != connect_value and (time.time() - start) < timeout_sec:
client.loop()
time.sleep(0)
def test_create_client(self):
# Create MQTT test client.
client = Adafruit_IO.MQTTClient(self.get_test_key())
# Verify not connected by default.
self.assertFalse(client.is_connected())
def test_connect(self):
# Create MQTT test client.
client = Adafruit_IO.MQTTClient(self.get_test_key())
# Verify on_connect handler is called and expected client is provided.
def on_connect(mqtt_client):
self.assertEqual(mqtt_client, client)
client.on_connect = on_connect
# Connect and wait until on_connect event is fired.
client.connect()
self.wait_until_connected(client)
# Verify connected.
self.assertTrue(client.is_connected())
def test_disconnect(self):
# Create MQTT test client.
client = Adafruit_IO.MQTTClient(self.get_test_key())
# Verify on_connect handler is called and expected client is provided.
def on_disconnect(mqtt_client):
self.assertEqual(mqtt_client, client)
client.on_disconnect = on_disconnect
# Connect and wait until on_connect event is fired.
client.connect()
self.wait_until_connected(client)
# Now disconnect and wait until disconnection event occurs.
client.disconnect()
self.wait_until_connected(client, connect_value=False)
# Verify diconnected.
self.assertFalse(client.is_connected())
def test_subscribe_and_publish(self):
# Create MQTT test client.
client = Adafruit_IO.MQTTClient(self.get_test_key())
# Save all on_message handler responses.
messages = []
def on_message(mqtt_client, feed_id, payload):
self.assertEqual(mqtt_client, client)
messages.append((feed_id, payload))
client.on_message = on_message
# Connect and wait until on_connect event is fired.
client.connect()
self.wait_until_connected(client)
# Subscribe to changes on a feed.
client.subscribe('TestFeed')
# Publish a message on the feed.
client.publish('TestFeed', 42)
# Wait for message to be received or timeout.
start = time.time()
while len(messages) == 0 and (time.time() - start) < TIMEOUT_SEC:
client.loop()
time.sleep(0)
# Verify one update message with payload is received.
self.assertListEqual(messages, [('TestFeed', '42')])

View file

@ -1,12 +1,11 @@
import pytest # Test setup of REST client.
# Author: jwcooper
from Adafruit_IO import Client from Adafruit_IO import Client
import base
def teardown_module(module):
pass
class TestSetup: class TestSetup(base.IOTestCase):
def test_set_key(self): def test_set_key(self):
key = "unique_key_id" key = "unique_key_id"
io = Client(key) io = Client(key)
assert key == io.key self.assertEqual(key, io.key)