Pico W WiFi examples

Adding WiFi examples for the Pico W: basic connection test, requests, JSON feed, Twitter feed, Adafruit IO and Azure IoT Central
This commit is contained in:
BlitzCityDIY 2022-10-14 11:09:39 -04:00
parent fe196e9519
commit 4b3c0110d4
6 changed files with 346 additions and 0 deletions

View file

@ -0,0 +1,75 @@
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
# SPDX-License-Identifier: MIT
import os
import time
import ssl
import wifi
import socketpool
import microcontroller
import board
import busio
import adafruit_requests
import adafruit_ahtx0
from adafruit_io.adafruit_io import IO_HTTP, AdafruitIO_RequestError
wifi.radio.connect(os.getenv('WIFI_SSID'), os.getenv('WIFI_PASSWORD'))
aio_username = os.getenv('aio_username')
aio_key = os.getenv('aio_key')
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
# Initialize an Adafruit IO HTTP API object
io = IO_HTTP(aio_username, aio_key, requests)
print("connected to io")
# use Pico W's GP0 for SDA and GP1 for SCL
i2c = busio.I2C(board.GP1, board.GP0)
aht20 = adafruit_ahtx0.AHTx0(i2c)
try:
# get feed
picowTemp_feed = io.get_feed("pitemp")
picowHumid_feed = io.get_feed("pihumid")
except AdafruitIO_RequestError:
# if no feed exists, create one
picowTemp_feed = io.create_new_feed("pitemp")
picowHumid_feed = io.create_new_feed("pihumid")
# pack feed names into an array for the loop
feed_names = [picowTemp_feed, picowHumid_feed]
print("feeds created")
clock = 300
while True:
try:
# when the clock runs out..
if clock > 300:
# read sensor
data = [aht20.temperature, aht20.relative_humidity]
# send sensor data to respective feeds
for z in range(2):
io.send_data(feed_names[z]["key"], data[z])
print("sent %0.1f" % data[z])
time.sleep(1)
# print sensor data to the REPL
print("\nTemperature: %0.1f C" % aht20.temperature)
print("Humidity: %0.1f %%" % aht20.relative_humidity)
print()
time.sleep(1)
# reset clock
clock = 0
else:
clock += 1
# pylint: disable=broad-except
# any errors, reset Pico W
except Exception as e:
print("Error:\n", str(e))
print("Resetting microcontroller in 10 seconds")
time.sleep(10)
microcontroller.reset()
# delay
time.sleep(1)
print(clock)

View file

@ -0,0 +1,80 @@
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
# SPDX-License-Identifier: MIT
import time
import os
import json
import busio
import microcontroller
import board
import rtc
import socketpool
import wifi
import adafruit_ntp
import adafruit_ahtx0
from adafruit_azureiot import IoTCentralDevice
# use Pico W's GP0 for SDA and GP1 for SCL
i2c = busio.I2C(board.GP1, board.GP0)
aht20 = adafruit_ahtx0.AHTx0(i2c)
print("Connecting to WiFi...")
wifi.radio.connect(os.getenv('WIFI_SSID'), os.getenv('WIFI_PASSWORD'))
print("Connected to WiFi!")
# ntp clock
pool = socketpool.SocketPool(wifi.radio)
ntp = adafruit_ntp.NTP(pool)
rtc.RTC().datetime = ntp.datetime
if time.localtime().tm_year < 2022:
print("Setting System Time in UTC")
rtc.RTC().datetime = ntp.datetime
else:
print("Year seems good, skipping set time.")
# Create an IoT Hub device client and connect
esp = None
pool = socketpool.SocketPool(wifi.radio)
device = IoTCentralDevice(
pool, esp, os.getenv('id_scope'), os.getenv('device_id'), os.getenv('device_primary_key')
)
print("Connecting to Azure IoT Central...")
device.connect()
print("Connected to Azure IoT Central!")
# clock to count down to sending data to Azure
azure_clock = 500
while True:
try:
# when the azure clock runs out
if azure_clock > 500:
# pack message
message = {"Temperature": aht20.temperature,
"Humidity": aht20.relative_humidity}
print("sending json")
device.send_telemetry(json.dumps(message))
print("data sent")
# reset azure clock
azure_clock = 0
else:
azure_clock += 1
# ping azure
device.loop()
# if something disrupts the loop, reconnect
# pylint: disable=broad-except
# any errors, reset Pico W
except Exception as e:
print("Error:\n", str(e))
print("Resetting microcontroller in 10 seconds")
time.sleep(10)
microcontroller.reset()
# delay
time.sleep(1)
print(azure_clock)

View file

@ -0,0 +1,28 @@
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import os
import ipaddress
import wifi
import socketpool
print()
print("Connecting to WiFi")
# connect to your SSID
wifi.radio.connect(os.getenv('WIFI_SSID'), os.getenv('WIFI_PASSWORD'))
print("Connected to WiFi")
pool = socketpool.SocketPool(wifi.radio)
# prints MAC address to REPL
print("My MAC addr:", [hex(i) for i in wifi.radio.mac_address])
# prints IP address to REPL
print("My IP address is", wifi.radio.ipv4_address)
# pings Google
ipv4 = ipaddress.ip_address("8.8.4.4")
print("Ping google.com: %f ms" % (wifi.radio.ping(ipv4)*1000))

View file

@ -0,0 +1,64 @@
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import time
import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
wifi.radio.connect(os.getenv('WIFI_SSID'), os.getenv('WIFI_PASSWORD'))
# Use cityname, country code where countrycode is ISO3166 format.
# E.g. "New York, US" or "London, GB"
location = "Manhattan, US"
# openweathermap URL, brings in your location & your token
url = "http://api.openweathermap.org/data/2.5/weather?q="+location
url += "&appid="+os.getenv('openweather_token')
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
while True:
try:
# pings openweather
response = requests.get(url)
# packs the response into a JSON
response_as_json = response.json()
print()
# prints the entire JSON
print(response_as_json)
# gets location name
place = response_as_json['name']
# gets weather type (clouds, sun, etc)
weather = response_as_json['weather'][0]['main']
# gets humidity %
humidity = response_as_json['main']['humidity']
# gets air pressure in hPa
pressure = response_as_json['main']['pressure']
# gets temp in kelvin
temperature = response_as_json['main']['temp']
# converts temp from kelvin to F
converted_temp = (temperature - 273.15) * 9/5 + 32
# converts temp from kelvin to C
# converted_temp = temperature - 273.15
# prints out weather data formatted nicely as pulled from JSON
print()
print("The current weather in %s is:" % place)
print(weather)
print("%s°F" % converted_temp)
print("%s%% Humidity" % humidity)
print("%s hPa" % pressure)
# delay for 5 minutes
time.sleep(300)
# pylint: disable=broad-except
except Exception as e:
print("Error:\n", str(e))
print("Resetting microcontroller in 10 seconds")
time.sleep(10)
microcontroller.reset()

View file

@ -0,0 +1,40 @@
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import os
import time
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
# adafruit quotes URL
quotes_url = "https://www.adafruit.com/api/quotes.php"
# connect to SSID
wifi.radio.connect(os.getenv('WIFI_SSID'), os.getenv('WIFI_PASSWORD'))
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
while True:
try:
# pings adafruit quotes
print("Fetching text from %s" % quotes_url)
# gets the quote from adafruit quotes
response = requests.get(quotes_url)
print("-" * 40)
# prints the response to the REPL
print("Text Response: ", response.text)
print("-" * 40)
response.close()
# delays for 1 minute
time.sleep(60)
# pylint: disable=broad-except
except Exception as e:
print("Error:\n", str(e))
print("Resetting microcontroller in 10 seconds")
time.sleep(10)
microcontroller.reset()

View file

@ -0,0 +1,59 @@
# SPDX-FileCopyrightText: 2022 Liz Clark for Adafruit Industries
#
# SPDX-License-Identifier: MIT
import time
import os
import ssl
import wifi
import socketpool
import microcontroller
import adafruit_requests
# query URL for tweets. looking for hashtag partyparrot sent to a specific username
# disabling line-too-long because queries for tweet_query & TIME_URL cannot have line breaks
# pylint: disable=line-too-long
tweet_query = 'https://api.twitter.com/2/tweets/search/recent?query=NEW GUIDE from:adafruit&tweet.fields=created_at'
headers = {'Authorization': 'Bearer ' + os.getenv('bearer_token')}
wifi.radio.connect(os.getenv('WIFI_SSID'), os.getenv('WIFI_PASSWORD'))
pool = socketpool.SocketPool(wifi.radio)
requests = adafruit_requests.Session(pool, ssl.create_default_context())
last_id = 0 # checks last tweet's ID
check = 0 # time.monotonic() holder
print(time.monotonic())
while True:
try:
if (check + 30) < time.monotonic():
# updates current time
# get tweets from rpilocator containing in stock at adafruit
twitter_response = requests.get(url=tweet_query, headers=headers)
# gets data portion of json
twitter_json = twitter_response.json()
twitter_json = twitter_json['data']
# to see the entire json feed, uncomment 'print(twitter_json)'
# print(twitter_json)
# tweet ID number
tweet_id = twitter_json[0]['id']
# tweet text
tweet = twitter_json[0]['text']
# timestamp
timestamp = twitter_json[0]['created_at']
if last_id != tweet_id:
print("New Learn Guide!")
print(tweet)
print(timestamp)
last_id = tweet_id
check = time.monotonic()
# pylint: disable=broad-except
# any errors, reset Pico W
except Exception as e:
print("Error:\n", str(e))
print("Resetting microcontroller in 10 seconds")
time.sleep(10)
microcontroller.reset()