Compare commits

...

44 commits

Author SHA1 Message Date
Tyeth Gundry
a3fd30ac80
Merge pull request #160 from adafruit/digitalout-no-feed-value
Addresses issue raised in #159
2024-12-06 17:47:42 +00:00
tyeth
302fcca696 Addresses issue raised in #159 2024-11-29 23:39:56 +00:00
Tyeth Gundry
3b008d356d
Update _version.py - bump version to 2.8.0 2024-09-26 15:52:51 +01:00
Tyeth Gundry
68ff68dc4a
Merge pull request #158 from adafruit/update-tests-adjusted-time-failure
Add Timezone support to `receive_time` and update tests (adjusted-time-failure)
2024-09-26 15:36:35 +01:00
tyeth
84577df587 Add timezone parameter to receive_time + UTC for test 2024-09-18 17:56:48 +01:00
Tyeth Gundry
4e16f2b070
Update README.rst - python version 3.6 minimum
A user pointed out they cannot run the library on python 3.5, and it appears that 3.6 will be the new minimum python version.
Fixes #156
2024-09-18 12:50:41 +01:00
Brent Rubell
4ce48a8d0b
Merge pull request #153 from adafruit/paho-v1-compatibility
Paho v1 compatibility
2024-03-11 09:25:21 -04:00
tyeth
2737781f4c PR Feedback: label client v1, clarify paho client in example/viewall 2024-02-16 12:42:19 +00:00
tyeth
a0cb9f613c Update error codes + shared_feeds example 2024-02-14 21:50:18 +00:00
tyeth
29bed98b60 Bump version to 2.7.2 2024-02-14 16:26:10 +00:00
tyeth
51117a4ce0 Specify CallbackAPIVersion.VERSION1 2024-02-14 16:20:06 +00:00
Tyeth Gundry
1d996325c3
Update _version.py - fix pip 2023-12-05 17:21:26 +00:00
Brent Rubell
bae611bb49
Update random_data.py 2023-11-27 10:23:12 -05:00
Brent Rubell
d732c49c9a
Merge pull request #150 from brentru/add-neopixel-example
Add new basics example for use with NeoPixel
2023-11-08 16:44:59 -05:00
brentru
31372ce877 add neopixel example 2023-11-08 16:43:41 -05:00
Brent Rubell
bb565141af
Merge pull request #149 from brentru/update-temp-humid-example
Update example, `temp_humidity.py`
2023-11-08 16:07:18 -05:00
brentru
15e1f44e7a only build on master push 2023-11-08 16:07:02 -05:00
brentru
0d1146d2ee target? 2023-11-08 16:02:03 -05:00
brentru
f95539a412 echo for debug 2023-11-08 15:57:58 -05:00
brentru
777a961029 use english configuration 2023-11-06 18:49:29 -05:00
brentru
787150567f use latest python ver 2023-11-06 18:42:40 -05:00
brentru
05ebb1ce24 drop 3.6 req, EOL 2023-11-06 18:41:22 -05:00
brentru
13a875809a use new action 2023-11-06 18:40:17 -05:00
brentru
3fa2cf48c4 update example 2023-11-06 18:34:15 -05:00
Brent Rubell
e418f20c06
Merge pull request #139 from ALfuhrmann/patch-1
copy-and-paste error in batch sending
2023-11-06 18:09:09 -05:00
Brent Rubell
ecefb29ad7
Merge pull request #147 from Pjurek3/patch-1
#146 - Fix Client Interface in Feeds Documentation
2022-11-21 11:27:39 -05:00
Pjurek3
3e8c8b3de1
Update feeds.rst
#146 - updated Feeds documentation for correct `Client` interface
2022-11-20 16:01:43 -08:00
Brent Rubell
36e2dccf61
Merge pull request #144 from tekktrik/dev/fix-readme
Fix CI badge image
2022-09-12 13:16:22 -04:00
Alec Delaney
d54398372a
Fix CI badge image 2022-08-14 22:54:04 -04:00
Brent Rubell
06df42d534
Merge pull request #131 from lcmcninch/get-pages
Implement a means of getting more than 1000 data points
2022-03-10 14:53:02 -05:00
Luke McNinch
b53348282c Lower default page limit from 1000 to 100. 2022-01-17 10:36:08 -05:00
ALfuhrmann
c636eda6ea
copy-and-paste error in batch sending
The "Send Batch Data" did not actually contain an "aio.send_batch_data" call.
Fixed it.
2022-01-17 15:00:44 +01:00
Luke McNinch
c152480ad9 Merge branch 'master' into get-pages
# Conflicts:
#	tests/test_client.py
2021-11-30 16:31:50 -05:00
Brent Rubell
92093d45e4
Merge pull request #136 from cj8scrambler/master
Add support for dashboards, blocks and layouts
2021-11-29 10:40:04 -05:00
Brent Rubell
ce3d868048
Merge branch 'master' into master 2021-11-29 10:38:13 -05:00
Brent Rubell
0f8328915b
Merge pull request #138 from lcmcninch/fix_weekday
receive_time: fix week day
2021-11-29 10:35:01 -05:00
Brent Rubell
6587c92bb6
Update build.yml
https://dev.to/petrsvihlik/using-environment-protection-rules-to-secure-secrets-when-building-external-forks-with-pullrequesttarget-hci
2021-11-29 10:28:49 -05:00
Luke McNinch
c5d1975fcc Address issue 137: receive_time parses week day (wday) as one day off of what is expected by Python.
Implement a method to parse the dict returned by the server and correct the week day.
2021-11-22 20:19:47 -05:00
Doug Zobel
09c021b0f5 Fix Dashboard parsing of Block objects 2021-11-18 09:38:43 -06:00
Doug Zobel
11bddacc67 Add support for dashboards, blocks and layouts 2021-11-16 15:35:05 -06:00
Brent Rubell
d047655176
Delete environmental_monitor.py 2021-11-08 10:16:23 -05:00
Luke McNinch
3d267e97df Eliminate while True: by querying the feed count if max_results is None (caller is requesting all data). 2021-04-15 22:09:10 -04:00
Luke McNinch
e9afd5b317 Implement a "max_results" parameter for the data method, to allow for retrieval of more than 1000 data points.
This implementation is subject to an existing bug in the pagination link header in the API that will break when and if that bug is fixed.
2021-02-26 09:51:14 -05:00
Luke McNinch
e94e2d4bb3 Add params keyword argument to _get method to enable use of URL parameters. 2021-02-25 20:34:46 -05:00
21 changed files with 659 additions and 220 deletions

View file

@ -1,18 +1,23 @@
name: Build-CI
on: [pull_request, push]
on:
push:
branches:
- master
jobs:
build:
runs-on: ubuntu-latest
environment:
name: IO
steps:
- uses: actions/checkout@v2
- name: Set up Python 3.6
uses: actions/setup-python@v1
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: 3.6
python-version: '3.10'
- name: Install dependencies
run: |
@ -30,6 +35,8 @@ jobs:
SECRET_IO_KEY: ${{ secrets.CI_IO_KEY }}
SECRET_IO_USER: ${{ secrets.CI_IO_USERNAME }}
run: |
echo "Secret key length: ${#SECRET_IO_KEY}"
echo "Secret username length: ${#SECRET_IO_USER}"
cd tests/
ADAFRUIT_IO_KEY=$SECRET_IO_KEY ADAFRUIT_IO_USERNAME=$SECRET_IO_USER python -m unittest discover
cd ..

View file

@ -21,5 +21,5 @@
from .client import Client
from .mqtt_client import MQTTClient
from .errors import AdafruitIOError, RequestError, ThrottlingError, MQTTError
from .model import Data, Feed, Group
from ._version import __version__
from .model import Data, Feed, Group, Dashboard, Block, Layout
from ._version import __version__

View file

@ -1 +1 @@
__version__ = "2.5.0"
__version__ = "2.8.0"

View file

@ -18,16 +18,22 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import time
from time import struct_time
import json
import platform
import pkg_resources
import re
from urllib.parse import urlparse
from urllib.parse import parse_qs
# import logging
import requests
from .errors import RequestError, ThrottlingError
from .model import Data, Feed, Group
from .model import Data, Feed, Group, Dashboard, Block, Layout
DEFAULT_PAGE_LIMIT = 100
# set outgoing version, pulled from setup.py
version = pkg_resources.require("Adafruit_IO")[0].version
@ -60,9 +66,13 @@ class Client(object):
# constructing the path.
self.base_url = base_url.rstrip('/')
# Store the last response of a get or post
self._last_response = None
@staticmethod
def to_red(data):
"""Hex color feed to red channel.
:param int data: Color value, in hexadecimal.
"""
return ((int(data[1], 16))*16) + int(data[2], 16)
@ -70,6 +80,7 @@ class Client(object):
@staticmethod
def to_green(data):
"""Hex color feed to green channel.
:param int data: Color value, in hexadecimal.
"""
return (int(data[3], 16) * 16) + int(data[4], 16)
@ -77,6 +88,7 @@ class Client(object):
@staticmethod
def to_blue(data):
"""Hex color feed to blue channel.
:param int data: Color value, in hexadecimal.
"""
return (int(data[5], 16) * 16) + int(data[6], 16)
@ -111,10 +123,12 @@ class Client(object):
def _compose_url(self, path):
return '{0}/api/{1}/{2}/{3}'.format(self.base_url, 'v2', self.username, path)
def _get(self, path):
def _get(self, path, params=None):
response = requests.get(self._compose_url(path),
headers=self._headers({'X-AIO-Key': self.key}),
proxies=self.proxies)
proxies=self.proxies,
params=params)
self._last_response = response
self._handle_error(response)
return response.json()
@ -124,6 +138,7 @@ class Client(object):
'Content-Type': 'application/json'}),
proxies=self.proxies,
data=json.dumps(data))
self._last_response = response
self._handle_error(response)
return response.json()
@ -132,6 +147,7 @@ class Client(object):
headers=self._headers({'X-AIO-Key': self.key,
'Content-Type': 'application/json'}),
proxies=self.proxies)
self._last_response = response
self._handle_error(response)
# Data functionality.
@ -140,6 +156,7 @@ class Client(object):
specified value to the feed identified by either name, key, or ID.
Returns a Data instance with details about the newly appended row of data.
Note that send_data now operates the same as append.
:param string feed: Name/Key/ID of Adafruit IO feed.
:param string value: Value to send.
:param dict metadata: Optional metadata associated with the value.
@ -160,6 +177,7 @@ class Client(object):
ID, feed key, or feed name. Data must be an instance of the Data class
with at least a value property set on it. Returns a Data instance with
details about the newly appended row of data.
:param string feed: Name/Key/ID of Adafruit IO feed.
:param Data data_list: Multiple data values.
"""
@ -172,22 +190,40 @@ class Client(object):
specified value to the feed identified by either name, key, or ID.
Returns a Data instance with details about the newly appended row of data.
Note that unlike send the feed should exist before calling append.
:param string feed: Name/Key/ID of Adafruit IO feed.
:param string value: Value to append to feed.
"""
return self.create_data(feed, Data(value=value))
def receive_time(self):
"""Returns a struct_time from the Adafruit IO Server based on the device's IP address.
def receive_time(self, timezone=None):
"""Returns a struct_time from the Adafruit IO Server based on requested
timezone, or automatically based on the device's IP address.
https://docs.python.org/3.7/library/time.html#time.struct_time
:param string timezone: Optional timezone to return the time in.
See https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List
"""
path = 'integrations/time/struct.json'
time = self._get(path)
return struct_time((time['year'], time['mon'], time['mday'], time['hour'],
time['min'], time['sec'], time['wday'], time['yday'], time['isdst']))
if timezone:
path += f'?tz={timezone}'
return self._parse_time_struct(self._get(path))
@staticmethod
def _parse_time_struct(time_dict: dict) -> time.struct_time:
"""Parse the time data returned by the server and return a time_struct
Corrects for the weekday returned by the server in Sunday=0 format
(Python expects Monday=0)
"""
wday = (time_dict['wday'] - 1) % 7
return struct_time((time_dict['year'], time_dict['mon'], time_dict['mday'],
time_dict['hour'], time_dict['min'], time_dict['sec'],
wday, time_dict['yday'], time_dict['isdst']))
def receive_weather(self, weather_id=None):
"""Adafruit IO Weather Service, Powered by Dark Sky
:param int id: optional ID for retrieving a specified weather record.
"""
if weather_id:
@ -199,6 +235,7 @@ class Client(object):
def receive_random(self, randomizer_id=None):
"""Access to Adafruit IO's Random Data
service.
:param int randomizer_id: optional ID for retrieving a specified randomizer.
"""
if randomizer_id:
@ -210,6 +247,7 @@ class Client(object):
def receive(self, feed):
"""Retrieve the most recent value for the specified feed. Returns a Data
instance whose value property holds the retrieved value.
:param string feed: Name/Key/ID of Adafruit IO feed.
"""
path = "feeds/{0}/data/last".format(feed)
@ -218,6 +256,7 @@ class Client(object):
def receive_next(self, feed):
"""Retrieve the next unread value from the specified feed. Returns a Data
instance whose value property holds the retrieved value.
:param string feed: Name/Key/ID of Adafruit IO feed.
"""
path = "feeds/{0}/data/next".format(feed)
@ -226,27 +265,66 @@ class Client(object):
def receive_previous(self, feed):
"""Retrieve the previous unread value from the specified feed. Returns a
Data instance whose value property holds the retrieved value.
:param string feed: Name/Key/ID of Adafruit IO feed.
"""
path = "feeds/{0}/data/previous".format(feed)
return Data.from_dict(self._get(path))
def data(self, feed, data_id=None):
def data(self, feed, data_id=None, max_results=DEFAULT_PAGE_LIMIT):
"""Retrieve data from a feed. If data_id is not specified then all the data
for the feed will be returned in an array.
:param string feed: Name/Key/ID of Adafruit IO feed.
:param string data_id: ID of the piece of data to delete.
:param int max_results: The maximum number of results to return. To
return all data, set to None.
"""
if data_id is None:
path = "feeds/{0}/data".format(feed)
return list(map(Data.from_dict, self._get(path)))
path = "feeds/{0}/data/{1}".format(feed, data_id)
return Data.from_dict(self._get(path))
if max_results is None:
res = self._get(f'feeds/{feed}/details')
max_results = res['details']['data']['count']
if data_id:
path = "feeds/{0}/data/{1}".format(feed, data_id)
return Data.from_dict(self._get(path))
params = {'limit': max_results} if max_results else None
data = []
path = "feeds/{0}/data".format(feed)
while len(data) < max_results:
data.extend(list(map(Data.from_dict, self._get(path,
params=params))))
nlink = self.get_next_link()
if not nlink:
break
# Parse the link for the query parameters
params = parse_qs(urlparse(nlink).query)
if max_results:
params['limit'] = max_results - len(data)
return data
def get_next_link(self):
"""Parse the `next` page URL in the pagination Link header.
This is necessary because of a bug in the API's implementation of the
link header. If that bug is fixed, the link would be accesible by
response.links['next']['url'] and this method would be broken.
:return: The url for the next page of data
:rtype: str
"""
if not self._last_response:
return
link_header = self._last_response.headers['link']
res = re.search('rel="next", <(.+?)>', link_header)
if not res:
return
return res.groups()[0]
def create_data(self, feed, data):
"""Create a new row of data in the specified feed.
Returns a Data instance with details about the newly
appended row of data.
:param string feed: Name/Key/ID of Adafruit IO feed.
:param Data data: Instance of the Data class. Must have a value property set.
"""
@ -255,6 +333,7 @@ class Client(object):
def delete(self, feed, data_id):
"""Delete data from a feed.
:param string feed: Name/Key/ID of Adafruit IO feed.
:param string data_id: ID of the piece of data to delete.
"""
@ -265,6 +344,7 @@ class Client(object):
def feeds(self, feed=None):
"""Retrieve a list of all feeds, or the specified feed. If feed is not
specified a list of all feeds will be returned.
:param string feed: Name/Key/ID of Adafruit IO feed, defaults to None.
"""
if feed is None:
@ -275,17 +355,21 @@ class Client(object):
def create_feed(self, feed, group_key=None):
"""Create the specified feed.
:param string feed: Key of Adafruit IO feed.
:param group_key group: Group to place new feed in.
"""
f = feed._asdict()
del f['id'] # Don't pass id on create call
path = "feeds/"
if group_key is not None: # create feed in a group
path="/groups/%s/feeds"%group_key
return Feed.from_dict(self._post(path, {"feed": feed._asdict()}))
return Feed.from_dict(self._post(path, {"feed": feed._asdict()}))
return Feed.from_dict(self._post(path, {"feed": f}))
return Feed.from_dict(self._post(path, {"feed": f}))
def delete_feed(self, feed):
"""Delete the specified feed.
:param string feed: Name/Key/ID of Adafruit IO feed.
"""
path = "feeds/{0}".format(feed)
@ -294,6 +378,7 @@ class Client(object):
# Group functionality.
def groups(self, group=None):
"""Retrieve a list of all groups, or the specified group.
:param string group: Name/Key/ID of Adafruit IO Group. Defaults to None.
"""
if group is None:
@ -304,6 +389,7 @@ class Client(object):
def create_group(self, group):
"""Create the specified group.
:param string group: Name/Key/ID of Adafruit IO Group.
"""
path = "groups/"
@ -311,7 +397,86 @@ class Client(object):
def delete_group(self, group):
"""Delete the specified group.
:param string group: Name/Key/ID of Adafruit IO Group.
"""
path = "groups/{0}".format(group)
self._delete(path)
# Dashboard functionality.
def dashboards(self, dashboard=None):
"""Retrieve a list of all dashboards, or the specified dashboard.
:param string dashboard: Key of Adafruit IO Dashboard. Defaults to None.
"""
if dashboard is None:
path = "dashboards/"
return list(map(Dashboard.from_dict, self._get(path)))
path = "dashboards/{0}".format(dashboard)
return Dashboard.from_dict(self._get(path))
def create_dashboard(self, dashboard):
"""Create the specified dashboard.
:param Dashboard dashboard: Dashboard object to create
"""
path = "dashboards/"
return Dashboard.from_dict(self._post(path, dashboard._asdict()))
def delete_dashboard(self, dashboard):
"""Delete the specified dashboard.
:param string dashboard: Key of Adafruit IO Dashboard.
"""
path = "dashboards/{0}".format(dashboard)
self._delete(path)
# Block functionality.
def blocks(self, dashboard, block=None):
"""Retrieve a list of all blocks from a dashboard, or the specified block.
:param string dashboard: Key of Adafruit IO Dashboard.
:param string block: id of Adafruit IO Block. Defaults to None.
"""
if block is None:
path = "dashboards/{0}/blocks".format(dashboard)
return list(map(Block.from_dict, self._get(path)))
path = "dashboards/{0}/blocks/{1}".format(dashboard, block)
return Block.from_dict(self._get(path))
def create_block(self, dashboard, block):
"""Create the specified block under the specified dashboard.
:param string dashboard: Key of Adafruit IO Dashboard.
:param Block block: Block object to create under dashboard
"""
path = "dashboards/{0}/blocks".format(dashboard)
return Block.from_dict(self._post(path, block._asdict()))
def delete_block(self, dashboard, block):
"""Delete the specified block.
:param string dashboard: Key of Adafruit IO Dashboard.
:param string block: id of Adafruit IO Block.
"""
path = "dashboards/{0}/blocks/{1}".format(dashboard, block)
self._delete(path)
# Layout functionality.
def layouts(self, dashboard):
"""Retrieve the layouts array from a dashboard
:param string dashboard: key of Adafruit IO Dashboard.
"""
path = "dashboards/{0}".format(dashboard)
dashboard = self._get(path)
return Layout.from_dict(dashboard['layouts'])
def update_layout(self, dashboard, layout):
"""Update the layout of the specified dashboard.
:param string dashboard: Key of Adafruit IO Dashboard.
:param Layout layout: Layout object to update under dashboard
"""
path = "dashboards/{0}/update_layouts".format(dashboard)
return Layout.from_dict(self._post(path, {'layouts': layout._asdict()}))

View file

@ -20,14 +20,7 @@
# SOFTWARE.
import json, requests
# MQTT RC Error Types
MQTT_ERRORS = [ 'Connection successful',
'Incorrect protocol version',
'Invalid Client ID',
'Server unavailable ',
'Bad username or password',
'Not authorized' ]
from paho.mqtt.client import error_string
class AdafruitIOError(Exception):
"""Base class for all Adafruit IO request failures."""
@ -63,6 +56,6 @@ class MQTTError(Exception):
"""Handles connection attempt failed errors.
"""
def __init__(self, response):
error = MQTT_ERRORS[response]
error = error_string(response)
super(MQTTError, self).__init__(error)
pass

View file

@ -43,6 +43,7 @@ DATA_FIELDS = [ 'created_epoch',
FEED_FIELDS = [ 'name',
'key',
'id',
'description',
'unit_type',
'unit_symbol',
@ -61,6 +62,26 @@ GROUP_FIELDS = [ 'description',
'properties',
'name' ]
DASHBOARD_FIELDS = [ 'name',
'key',
'description',
'show_header',
'color_mode',
'block_borders',
'header_image_url',
'blocks' ]
BLOCK_FIELDS = [ 'name',
'id',
'visual_type',
'properties',
'block_feeds' ]
LAYOUT_FIELDS = ['xl',
'lg',
'md',
'sm',
'xs' ]
# These are very simple data model classes that are based on namedtuple. This is
# to keep the classes simple and prevent any confusion around updating data
@ -71,15 +92,24 @@ GROUP_FIELDS = [ 'description',
Data = namedtuple('Data', DATA_FIELDS)
Feed = namedtuple('Feed', FEED_FIELDS)
Group = namedtuple('Group', GROUP_FIELDS)
Dashboard = namedtuple('Dashboard', DASHBOARD_FIELDS)
Block = namedtuple('Block', BLOCK_FIELDS)
Layout = namedtuple('Layout', LAYOUT_FIELDS)
# Magic incantation to make all parameters to the initializers optional with a
# default value of None.
Group.__new__.__defaults__ = tuple(None for x in GROUP_FIELDS)
Data.__new__.__defaults__ = tuple(None for x in DATA_FIELDS)
Layout.__new__.__defaults__ = tuple(None for x in LAYOUT_FIELDS)
# explicitly set dashboard values so that 'color_mode' is 'dark'
Dashboard.__new__.__defaults__ = (None, None, None, False, "dark", True, None, None)
# explicitly set block values so 'properties' is a dictionary
Block.__new__.__defaults__ = (None, None, None, {}, None)
# explicitly set feed values
Feed.__new__.__defaults__ = (None, None, None, None, None, 'ON', 'Private', None, None, None)
Feed.__new__.__defaults__ = (None, None, None, None, None, None, 'ON', 'Private', None, None, None)
# Define methods to convert from dicts to the data types.
def _from_dict(cls, data):
@ -103,7 +133,17 @@ def _group_from_dict(cls, data):
return cls(**params)
def _dashboard_from_dict(cls, data):
params = {x: data.get(x, None) for x in cls._fields}
# Parse the blocks if they're provided and generate block instances.
params['blocks'] = tuple(map(Block.from_dict, data.get('blocks', [])))
return cls(**params)
# Now add the from_dict class methods defined above to the data types.
Data.from_dict = classmethod(_from_dict)
Feed.from_dict = classmethod(_feed_from_dict)
Group.from_dict = classmethod(_group_from_dict)
Dashboard.from_dict = classmethod(_dashboard_from_dict)
Block.from_dict = classmethod(_from_dict)
Layout.from_dict = classmethod(_from_dict)

View file

@ -59,8 +59,8 @@ class MQTTClient(object):
self.on_disconnect = None
self.on_message = None
self.on_subscribe = None
# Initialize MQTT client.
self._client = mqtt.Client()
# Initialize v1 MQTT client.
self._client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
if secure:
self._client.tls_set_context()
self._secure = True

View file

@ -9,7 +9,7 @@ Adafruit IO Python
:target: https://adafru.it/discord
:alt: Chat
.. image:: https://github.com/adafruit/Adafruit_IO_Python/workflows/Build%20CI/badge.svg
.. image:: https://github.com/adafruit/Adafruit_IO_Python/workflows/Build-CI/badge.svg
:target: https://github.com/adafruit/Adafruit_IO_Python/actions
:alt: Build Status
@ -20,7 +20,7 @@ Adafruit IO Python
A Python library and examples for use with `io.adafruit.com <https://io.adafruit.com>`_.
Compatible with Python Versions 3.4+
Compatible with Python Versions 3.6+
Installation
================

View file

@ -29,7 +29,7 @@ master_doc = 'index'
# General information about the project.
project = u'adafruit-io-python'
copyright = u'2019 Adafruit Industries'
copyright = u'2023 Adafruit Industries'
author = u'Adafruit Industries'
# The version info for the project you're documenting, acts as replacement for
@ -46,7 +46,7 @@ release = u'2.1.0'
#
# This is also used if you do content translation via gettext catalogs.
# Usually you set "language" from the command line for these cases.
language = None
language = "en"
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.

View file

@ -33,6 +33,19 @@ You can get all of the data for a feed by using the ``data(feed)`` method. The r
for d in data:
print('Data value: {0}'.format(d.value))
By default, the maximum number of data points returned is 1000. This limit can be changed by using the max_results parameter.
.. code-block:: python
# Get less than the default number of data points
data = aio.data('Test', max_results=100)
# Get more than the default number of data points
data = aio.data('Test', max_results=2000)
# Get all of the points
data = aio.data('Test', max_results=None)
You can also get a specific value by ID by using the ``feeds(feed, data_id)`` method. This will return a single piece of feed data with the provided data ID if it exists in the feed. The returned object will be an instance of the Data class.
@ -79,7 +92,9 @@ Data can be created after you create a feed, by using the ``send_batch_data(feed
# Create a data items in the 'Test' feed.
data_list = [Data(value=10), Data(value=11)]
aio.create_data('Test', data)
# send batch data
aio.send_batch_data(temperature.key, data_list)
Receive Data

View file

@ -11,7 +11,7 @@ Create a feed by constructing a Feed instance with at least a name specified, an
# Import library and create instance of REST client.
from Adafruit_IO import Client, Feed
aio = Client('YOUR ADAFRUIT IO KEY')
aio = Client('YOUR ADAFRUIT IO USERNAME', 'YOUR ADAFRUIT IO KEY')
# Create Feed object with name 'Foo'.
feed = Feed(name='Foo')
@ -30,7 +30,7 @@ You can get a list of your feeds by using the ``feeds()`` method which will retu
# Import library and create instance of REST client.
from Adafruit_IO import Client
aio = Client('YOUR ADAFRUIT IO KEY')
aio = Client('YOUR ADAFRUIT IO USERNAME', 'YOUR ADAFRUIT IO KEY')
# Get list of feeds.
feeds = aio.feeds()
@ -45,7 +45,7 @@ Alternatively you can retrieve the metadata for a single feed by calling ``feeds
# Import library and create instance of REST client.
from Adafruit_IO import Client
aio = Client('YOUR ADAFRUIT IO KEY')
aio = Client('YOUR ADAFRUIT IO USERNAME', 'YOUR ADAFRUIT IO KEY')
# Get feed 'Foo'
feed = aio.feeds('Foo')

View file

@ -12,8 +12,8 @@ import json
from Adafruit_IO import Client, Feed, RequestError
# Set to your Adafruit IO key.
ADAFRUIT_IO_USERNAME = 'brubell'
ADAFRUIT_IO_KEY = '6ec4b31bd2c54a09be911e0c1909b7ab'
ADAFRUIT_IO_USERNAME = 'YOUR_IO_USERNAME'
ADAFRUIT_IO_KEY = 'YOUR_IO_KEY'
# Create an instance of the REST client.
aio = Client(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY)
@ -25,4 +25,4 @@ random_data = aio.receive_random(generator_id)
# Parse the API response
data = json.dumps(random_data)
data = json.loads(data)
print('Random Data: {0}'.format(data['value']))
print('Random Data: {0}'.format(data['value']))

View file

@ -0,0 +1,88 @@
"""
'dashboard.py'
=========================================
Creates a dashboard with 3 blocks and feed it data
Author(s): Doug Zobel
"""
from time import sleep
from random import randrange
from Adafruit_IO import Client, Feed, Block, Dashboard, Layout
# Set to your Adafruit IO key.
# Remember, your key is a secret,
# so make sure not to publish it when you publish this code!
ADAFRUIT_IO_USERNAME = ''
# Set to your Adafruit IO username.
# (go to https://accounts.adafruit.com to find your username)
ADAFRUIT_IO_KEY = ''
# Create an instance of the REST client.
aio = Client(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY)
# Create a new feed named 'Dashboard Data' under the default group
feed = aio.create_feed(Feed(name="Dashboard Data"), "default")
# Fetch group info (group.id needed when adding feeds to blocks)
group = aio.groups("default")
# Create a new dasbhoard named 'Example Dashboard'
dashboard = aio.create_dashboard(Dashboard(name="Example Dashboard"))
# Create a line_chart
linechart = Block(name="Linechart Data",
visual_type = 'line_chart',
properties = {
"gridLines": True,
"historyHours": "2"},
block_feeds = [{
"group_id": group.id,
"feed_id": feed.id
}])
linechart = aio.create_block(dashboard.key, linechart)
# Create a gauge
gauge = Block(name="Gauge Data",
visual_type = 'gauge',
block_feeds = [{
"group_id": group.id,
"feed_id": feed.id
}])
gauge = aio.create_block(dashboard.key, gauge)
# Create a text stream
stream = Block(name="Stream Data",
visual_type = 'stream',
properties = {
"fontSize": "12",
"fontColor": "#63de00",
"showGroupName": "no"},
block_feeds = [{
"group_id": group.id,
"feed_id": feed.id
}])
stream = aio.create_block(dashboard.key, stream)
# Update the large layout to:
# |----------------|
# | Line Chart |
# |----------------|
# | Gauge | Stream |
# |----------------|
layout = Layout(lg = [
{'x': 0, 'y': 0, 'w': 16, 'h': 4, 'i': str(linechart.id)},
{'x': 0, 'y': 4, 'w': 8, 'h': 4, 'i': str(gauge.id)},
{'x': 8, 'y': 4, 'w': 8, 'h': 4, 'i': str(stream.id)}])
aio.update_layout(dashboard.key, layout)
print("Dashboard created at: " +
"https://io.adafruit.com/{0}/dashboards/{1}".format(ADAFRUIT_IO_USERNAME,
dashboard.key))
# Now send some data
value = 0
while True:
value = (value + randrange(0, 10)) % 100
print('sending data: ', value)
aio.send_data(feed.key, value)
sleep(3)

View file

@ -40,7 +40,10 @@ led.direction = digitalio.Direction.OUTPUT
while True:
data = aio.receive(digital.key)
try:
data = aio.receive(digital.key)
except RequestError as re:
pass # feed with no data will return 404
if int(data.value) == 1:
print('received <- ON\n')
elif int(data.value) == 0:

View file

@ -1,133 +0,0 @@
"""
'environmental_monitor.py'
===============================================================================
Example of sending I2C sensor data
from multiple sensors to Adafruit IO.
Tutorial Link: https://learn.adafruit.com/adafruit-io-air-quality-monitor
Adafruit invests time and resources providing this open source code.
Please support Adafruit and open source hardware by purchasing
products from Adafruit!
Author(s): Brent Rubell for Adafruit Industries
Copyright (c) 2018 Adafruit Industries
Licensed under the MIT license.
All text above must be included in any redistribution.
Dependencies:
- Adafruit_Blinka (CircuitPython, on Pi.)
(https://github.com/adafruit/Adafruit_Blinka)
- Adafruit_CircuitPython_SGP30.
(https://github.com/adafruit/Adafruit_CircuitPython_SGP30)
- Adafruit_CircuitPython_VEML6070.
(https://github.com/adafruit/Adafruit_CircuitPython_VEML6070)
- Adafruit_CircuitPython_BME280.
(https://github.com/adafruit/Adafruit_CircuitPython_BME280)
"""
# Import standard python modules
import time
# import Adafruit Blinka
import board
import busio
# import CircuitPython sensor libraries
import adafruit_sgp30
import adafruit_veml6070
import adafruit_bme280
# import Adafruit IO REST client
from Adafruit_IO import Client, Feed, RequestError
# loop timeout, in seconds.
LOOP_DELAY = 10
# Set to your Adafruit IO key.
# Remember, your key is a secret,
# so make sure not to publish it when you publish this code!
ADAFRUIT_IO_KEY = 'YOUR_AIO_KEY'
# Set to your Adafruit IO username.
# (go to https://accounts.adafruit.com to find your username)
ADAFRUIT_IO_USERNAME = 'YOUR_AIO_USERNAME'
# Create an instance of the REST client
aio = Client(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY)
try: # if we already have the feeds, assign them.
tvoc_feed = aio.feeds('tvoc')
eCO2_feed = aio.feeds('eco2')
uv_feed = aio.feeds('uv')
temperature_feed = aio.feeds('temperature')
humidity_feed = aio.feeds('humidity')
pressure_feed = aio.feeds('pressure')
altitude_feed = aio.feeds('altitude')
except RequestError: # if we don't, create and assign them.
tvoc_feed = aio.create_feed(Feed(name='tvoc'))
eCO2_feed = aio.create_feed(Feed(name='eco2'))
uv_feed = aio.create_feed(Feed(name='uv'))
temperature_feed = aio.create_feed(Feed(name='temperature'))
humidity_feed = aio.create_feed(Feed(name='humidity'))
pressure_feed = aio.create_feed(Feed(name='pressure'))
altitude_feed = aio.create_feed(Feed(name='altitude'))
# Create busio I2C
i2c = busio.I2C(board.SCL, board.SDA, frequency=100000)
# Create VEML6070 object.
uv = adafruit_veml6070.VEML6070(i2c)
# Create BME280 object.
bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c)
bme280.sea_level_pressure = 1013.25
# Create SGP30 object using I2C.
sgp30 = adafruit_sgp30.Adafruit_SGP30(i2c)
sgp30.iaq_init()
sgp30.set_iaq_baseline(0x8973, 0x8aae)
# Sample VEML6070
def sample_VEML():
for _ in range(10):
uv_raw = uv.uv_raw
return uv_raw
while True:
print('Reading sensors...')
# Read SGP30.
eCO2_data = sgp30.eCO2
tvoc_data = sgp30.TVOC
# Read VEML6070.
uv_data = sample_VEML()
# Read BME280.
temp_data = bme280.temperature
# convert temperature (C->F)
temp_data = int(temp_data) * 1.8 + 32
humid_data = bme280.humidity
pressure_data = bme280.pressure
alt_data = bme280.altitude
print('sending data to adafruit io...')
# Send SGP30 Data to Adafruit IO.
print('eCO2:', eCO2_data)
aio.send(eCO2_feed.key, eCO2_data)
print('tvoc:', tvoc_data)
aio.send(tvoc_feed.key, tvoc_data)
time.sleep(2)
# Send VEML6070 Data to Adafruit IO.
print('UV Level: ', uv_data)
aio.send(uv_feed.key, uv_data)
time.sleep(2)
# Send BME280 Data to Adafruit IO.
print('Temperature: %0.1f C' % temp_data)
aio.send(temperature_feed.key, temp_data)
print("Humidity: %0.1f %%" % humid_data)
aio.send(humidity_feed.key, int(humid_data))
time.sleep(2)
print("Pressure: %0.1f hPa" % pressure_data)
aio.send(pressure_feed.key, int(pressure_data))
print("Altitude = %0.2f meters" % alt_data)
aio.send(altitude_feed.key, int(alt_data))
# avoid timeout from adafruit io
time.sleep(LOOP_DELAY * 60)

View file

@ -0,0 +1,69 @@
"""
`rgb_led.py`
=======================================================================
Control a NeoPixel RGB LED using Adafruit IO and Python
Tutorial Link: https://learn.adafruit.com/adafruit-io-basics-color
Adafruit invests time and resources providing this open source code.
Please support Adafruit and open source hardware by purchasing
products from Adafruit!
Author(s): Brent Rubell for Adafruit Industries
Copyright (c) 2023 Adafruit Industries
Licensed under the MIT license.
All text above must be included in any redistribution.
Dependencies:
- Adafruit_Blinka
(https://github.com/adafruit/Adafruit_Blinka)
- Adafruit_CircuitPython_NeoPixel
(https://github.com/adafruit/Adafruit_CircuitPython_NeoPixel)
"""
import time
import board
import neopixel
from Adafruit_IO import Client, Feed, RequestError
# Choose an open pin connected to the Data In of the NeoPixel strip, i.e. board.D18
# NeoPixels must be connected to D10, D12, D18 or D21 to work.
pixel_pin = board.D18
# The number of NeoPixels
num_pixels = 1
# The order of the pixel colors - RGB or GRB. Some NeoPixels have red and green reversed!
ORDER = neopixel.GRB
pixels = neopixel.NeoPixel(
pixel_pin, num_pixels, brightness=0.2, auto_write=False, pixel_order=ORDER
)
# Set to your Adafruit IO key.
# Remember, your key is a secret,
# so make sure not to publish it when you publish this code!
ADAFRUIT_IO_KEY = 'YOUR_AIO_KEY'
# Set to your Adafruit IO username.
# (go to https://accounts.adafruit.com to find your username)
ADAFRUIT_IO_USERNAME = 'YOUR_AIO_USERNAME'
# Create an instance of the REST client.
aio = Client(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY)
try: # if we have a 'color' feed
color = aio.feeds('color')
except RequestError: # create an `color` feed
feed = Feed(name='color')
color = aio.create_feed(feed)
while True:
# get the value of the Adafruit IO `color` feed
color_val = aio.receive(color.key)
# Print hex value
print('Received Color HEX: ', color_val)
pixels.fill(color_val.value)
pixels.show()
# let's sleep/wait so we don't flood adafruit io's servers with requests
time.sleep(3)

View file

@ -1,8 +1,7 @@
"""
'temp_humidity.py'
==================================
Example of sending analog sensor
values to an Adafruit IO feed.
Example of sending temperature and humidity data to Adafruit IO
Author(s): Brent Rubell
@ -11,24 +10,27 @@ Tutorial Link: Tutorial Link: https://learn.adafruit.com/adafruit-io-basics-temp
Dependencies:
- Adafruit IO Python Client
(https://github.com/adafruit/io-client-python)
- Adafruit_Python_DHT
(https://github.com/adafruit/Adafruit_Python_DHT)
- Adafruit_CircuitPython_AHTx0
(https://github.com/adafruit/Adafruit_CircuitPython_AHTx0)
"""
# import standard python modules.
import time
# import adafruit dht library.
import Adafruit_DHT
# import adafruit-blinka modules
import board
# import Adafruit IO REST client.
from Adafruit_IO import Client, Feed
from Adafruit_IO import Client, Feed, RequestError
# Delay in-between sensor readings, in seconds.
DHT_READ_TIMEOUT = 5
# Import AHTx0 library
import adafruit_ahtx0
# Pin connected to DHT22 data pin
DHT_DATA_PIN = 26
# Set true to send tempertaure data in degrees fahrenheit ('f')?
USE_DEGREES_F = False
# Time between sensor reads, in seconds
READ_TIMEOUT = 60
# Set to your Adafruit IO key.
# Remember, your key is a secret,
@ -42,23 +44,40 @@ ADAFRUIT_IO_USERNAME = 'YOUR_AIO_USERNAME'
# Create an instance of the REST client.
aio = Client(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY)
# Set up Adafruit IO Feeds.
temperature_feed = aio.feeds('temperature')
humidity_feed = aio.feeds('humidity')
# Assign a temperature feed, if one exists already
try:
temperature_feed = aio.feeds('temperature')
except RequestError: # Doesn't exist, create a new feed
feed_temp = Feed(name="temperature")
temperature_feed = aio.create_feed(feed_temp)
# Set up DHT22 Sensor.
dht22_sensor = Adafruit_DHT.DHT22
# Assign a humidity feed, if one exists already
try:
humidity_feed = aio.feeds('humidity')
except RequestError: # Doesn't exist, create a new feed
feed_humid = Feed(name="humidity")
humidity_feed = aio.create_feed(feed_humid)
# Initialize the board's default I2C bus
i2c = board.I2C() # uses board.SCL and board.SDA
# Initialize AHT20 using the default address (0x38) and the board's default i2c bus
sensor = adafruit_ahtx0.AHTx0(i2c)
while True:
humidity, temperature = Adafruit_DHT.read_retry(dht22_sensor, DHT_DATA_PIN)
if humidity is not None and temperature is not None:
print('Temp={0:0.1f}*C Humidity={1:0.1f}%'.format(temperature, humidity))
# Send humidity and temperature feeds to Adafruit IO
temperature = '%.2f'%(temperature)
humidity = '%.2f'%(humidity)
aio.send(temperature_feed.key, str(temperature))
aio.send(humidity_feed.key, str(humidity))
temperature = sensor.temperature
humidity = sensor.relative_humidity
if USE_DEGREES_F:
temperature = temperature * 9.0 / 5.0 + 32.0
print('Temp={0:0.1f}*F'.format(temperature))
else:
print('Failed to get DHT22 Reading, trying again in ', DHT_READ_TIMEOUT, 'seconds')
print('Temp={0:0.1f}*C'.format(temperature))
print('Humidity={1:0.1f}%'.format(humidity))
# Format sensor data as string for sending to Adafruit IO
temperature = '%.2f'%(temperature)
humidity = '%.2f'%(humidity)
# Send humidity and temperature data to Adafruit IO
aio.send(temperature_feed.key, str(temperature))
aio.send(humidity_feed.key, str(humidity))
# Timeout to avoid flooding Adafruit IO
time.sleep(DHT_READ_TIMEOUT)
time.sleep(READ_TIMEOUT)

View file

@ -70,5 +70,5 @@ print('Publishing a new message every 10 seconds (press Ctrl-C to quit)...')
while True:
value = random.randint(0, 100)
print('Publishing {0} to {1}.'.format(value, IO_FEED))
client.publish(IO_FEED, value, IO_FEED_USERNAME)
client.publish(IO_FEED, value, feed_user=IO_FEED_USERNAME)
time.sleep(10)

View file

@ -50,12 +50,12 @@ def on_connect(client, userdata, flags, rc):
def on_disconnect(client, userdata, rc):
print('Disconnected!')
def on_message(client, userdata, msg, retain):
def on_message(client, userdata, msg):
print('Received on {0}: {1}'.format(msg.topic, msg.payload.decode('utf-8')))
# Create MQTT client and connect to Adafruit IO.
client = mqtt.Client()
# Create Paho v1 MQTT client and connect to Adafruit IO.
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
client.username_pw_set(USERNAME, KEY)
client.on_connect = on_connect
client.on_disconnect = on_disconnect

View file

@ -3,7 +3,7 @@
import time
import unittest
from Adafruit_IO import Client, Data, Feed, Group, RequestError
from Adafruit_IO import Client, Data, Feed, Group, Dashboard, Block, Layout, RequestError
import base
@ -23,7 +23,7 @@ class TestClient(base.IOTestCase):
# If your IP isn't put on the list of non-throttled IPs, uncomment the
# function below to waste time between tests to prevent throttling.
#def tearDown(self):
time.sleep(30.0)
# time.sleep(30.0)
# Helper Methods
def get_client(self):
@ -46,9 +46,25 @@ class TestClient(base.IOTestCase):
# Swallow the error if the group doesn't exist.
pass
def ensure_dashboard_deleted(self, client, dashboard):
# Delete the specified dashboard if it exists.
try:
client.delete_dashboard(dashboard)
except RequestError:
# Swallow the error if the dashboard doesn't exist.
pass
def ensure_block_deleted(self, client, dashboard, block):
# Delete the specified block if it exists.
try:
client.delete_block(dashboard, block)
except RequestError:
# Swallow the error if the block doesn't exist.
pass
def empty_feed(self, client, feed):
# Remove all the data from a specified feed (but don't delete the feed).
data = client.data(feed)
data = client.data(feed, max_results=None)
for d in data:
client.delete(feed, d.id)
@ -138,7 +154,7 @@ class TestClient(base.IOTestCase):
data = Data(value=42)
result = aio.create_data('testfeed', data)
self.assertEqual(int(result.value), 42)
def test_location_data(self):
"""receive_location
"""
@ -160,11 +176,41 @@ class TestClient(base.IOTestCase):
"""receive_time
"""
aio = self.get_client()
time = aio.receive_time()
server_time = aio.receive_time(timezone='UTC')
# Check that each value is rx'd properly
# (should never be None type)
for time_data in time:
for time_data in server_time:
self.assertIsNotNone(time_data)
# Check that the week day was interpreted properly
adjusted_time = time.localtime(time.mktime(server_time))
self.assertEqual(server_time.tm_wday, adjusted_time.tm_wday)
def test_parse_time_struct(self):
"""Ensure the _parse_time_struct method properly handles all 7
week days. Particularly important to make sure Sunday is 6,
not -1"""
# Zero time is a dictionary as would be provided by server
# (wday is one higher than it should be)
zero_time = {'year': 1970,
'mon': 1,
'mday': 1,
'hour': 0,
'min': 0,
'sec': 0,
'wday': 4,
'yday': 1,
'isdst': 0}
# Create a good struct for each day of the week and make sure
# the server-style dictionary is parsed correctly
for k in range(7):
real_struct = time.gmtime(k * 86400)
d = zero_time.copy()
d['mday'] += k
d['wday'] += k
d['yday'] += k
newd = Client._parse_time_struct(d)
self.assertEqual(newd.tm_wday, real_struct.tm_wday)
# Test Feed Functionality
def test_append_by_feed_name(self):
@ -269,3 +315,95 @@ class TestClient(base.IOTestCase):
group = io.create_group(Group(name='grouprx'))
response = io.groups(group.key)
self.assertEqual(response.key, 'grouprx')
# Test Dashboard Functionality
def test_dashboard_create_dashboard(self):
io = self.get_client()
self.ensure_dashboard_deleted(io, 'dashtest')
response = io.create_dashboard(Dashboard(name='dashtest'))
self.assertEqual(response.name, 'dashtest')
def test_dashboard_returns_all_dashboards(self):
io = self.get_client()
self.ensure_dashboard_deleted(io, 'dashtest')
dashboard = io.create_dashboard(Dashboard(name='dashtest'))
response = io.dashboards()
self.assertGreaterEqual(len(response), 1)
def test_dashboard_returns_requested_feed(self):
io = self.get_client()
self.ensure_dashboard_deleted(io, 'dashtest')
dashboard = io.create_dashboard(Dashboard(name='dashtest'))
response = io.dashboards('dashtest')
self.assertEqual(response.name, 'dashtest')
# Test Block Functionality
def test_block_create_block(self):
io = self.get_client()
self.ensure_block_deleted(io, 'dashtest', 'blocktest')
self.ensure_dashboard_deleted(io, 'dashtest')
dash = io.create_dashboard(Dashboard(name='dashtest'))
block = io.create_block(dash.key, Block(name='blocktest',
visual_type = 'line_chart'))
self.assertEqual(block.name, 'blocktest')
io.delete_block(dash.key, block.id)
io.delete_dashboard(dash.key)
def test_dashboard_returns_all_blocks(self):
io = self.get_client()
self.ensure_block_deleted(io, 'dashtest', 'blocktest')
self.ensure_dashboard_deleted(io, 'dashtest')
dash = io.create_dashboard(Dashboard(name='dashtest'))
block = io.create_block(dash.key, Block(name='blocktest',
visual_type = 'line_chart'))
response = io.blocks(dash.key)
self.assertEqual(len(response), 1)
io.delete_block(dash.key, block.id)
io.delete_dashboard(dash.key)
def test_dashboard_returns_requested_block(self):
io = self.get_client()
self.ensure_block_deleted(io, 'dashtest', 'blocktest')
self.ensure_dashboard_deleted(io, 'dashtest')
dash = io.create_dashboard(Dashboard(name='dashtest'))
block = io.create_block(dash.key, Block(name='blocktest',
visual_type = 'line_chart'))
response = io.blocks(dash.key, block.id)
self.assertEqual(response.name, 'blocktest')
io.delete_block(dash.key, block.id)
io.delete_dashboard(dash.key)
# Test Layout Functionality
def test_layout_returns_all_layouts(self):
io = self.get_client()
self.ensure_block_deleted(io, 'dashtest', 'blocktest')
self.ensure_dashboard_deleted(io, 'dashtest')
dash = io.create_dashboard(Dashboard(name='dashtest'))
block = io.create_block(dash.key, Block(name='blocktest',
visual_type = 'line_chart'))
response = io.layouts(dash.key)
self.assertEqual(len(response), 5) # 5 layouts: xs, sm, md, lg, xl
self.assertEqual(len(response.lg), 1)
io.delete_block(dash.key, block.id)
io.delete_dashboard(dash.key)
def test_layout_update_layout(self):
io = self.get_client()
self.ensure_block_deleted(io, 'dashtest', 'blocktest')
self.ensure_dashboard_deleted(io, 'dashtest')
dash = io.create_dashboard(Dashboard(name='dashtest'))
block = io.create_block(dash.key, Block(name='blocktest',
visual_type = 'line_chart'))
layout = Layout(lg = [
{'x': 0, 'y': 0, 'w': 16, 'h': 4, 'i': str(block.id)}])
io.update_layout(dash.key, layout)
response = io.layouts(dash.key)
self.assertEqual(len(response.lg), 1)
self.assertEqual(response.lg[0]['w'], 16)
io.delete_block(dash.key, block.id)
io.delete_dashboard(dash.key)
if __name__ == "__main__":
unittest.main()

View file

@ -18,7 +18,7 @@
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from Adafruit_IO import Data, Feed, Group
from Adafruit_IO import Data, Feed, Group, Dashboard, Block, Layout
import base
@ -45,11 +45,12 @@ class TestData(base.IOTestCase):
def test_feeds_have_explicitly_set_values(self):
""" Let's make sure feeds are explicitly set from within the model:
Feed.__new__.__defaults__ = (None, None, None, None, None, 'ON', 'Private', None, None, None)
Feed.__new__.__defaults__ = (None, None, None, None, None, None, 'ON', 'Private', None, None, None)
"""
feed = Feed(name='foo')
self.assertEqual(feed.name, 'foo')
self.assertIsNone(feed.key)
self.assertIsNone(feed.id)
self.assertIsNone(feed.description)
self.assertIsNone(feed.unit_type)
self.assertIsNone(feed.unit_symbol)
@ -69,6 +70,40 @@ class TestData(base.IOTestCase):
self.assertIsNone(group.feeds)
self.assertIsNone(group.properties)
""" Let's make sure feeds are explicitly set from within the model:
Dashboard.__new__.__defaults__ = (None, None, None, False, "dark", True, None, None)
"""
def test_dashboard_have_explicitly_set_values(self):
dashboard = Dashboard(name="foo")
self.assertEqual(dashboard.name, 'foo')
self.assertIsNone(dashboard.key)
self.assertIsNone(dashboard.description)
self.assertFalse(dashboard.show_header)
self.assertEqual(dashboard.color_mode, 'dark')
self.assertTrue(dashboard.block_borders)
self.assertIsNone(dashboard.header_image_url)
self.assertIsNone(dashboard.blocks)
""" Let's make sure feeds are explicitly set from within the model:
Block.__new__.__defaults__ = (None, None, None {}, None)
"""
def test_block_have_explicitly_set_values(self):
block = Block(name="foo")
self.assertEqual(block.name, 'foo')
self.assertIsNone(block.id)
self.assertIsNone(block.visual_type)
self.assertEqual(type(block.properties), dict)
self.assertEqual(len(block.properties), 0)
self.assertIsNone(block.block_feeds)
def test_layout_properties_are_optional(self):
layout = Layout()
self.assertIsNone(layout.xl)
self.assertIsNone(layout.lg)
self.assertIsNone(layout.md)
self.assertIsNone(layout.sm)
self.assertIsNone(layout.xs)
def test_from_dict_ignores_unknown_items(self):
data = Data.from_dict({'value': 'foo', 'feed_id': 10, 'unknown_param': 42})