Adafruit_CircuitPython_ESP_.../adafruit_espatcontrol/adafruit_espatcontrol.py
2020-03-15 17:22:41 -04:00

643 lines
24 KiB
Python

# The MIT License (MIT)
#
# Copyright (c) 2018 ladyada for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_espatcontrol.adafruit_espatcontrol`
====================================================
Use the ESP AT command sent to communicate with the Interwebs.
Its slow, but works to get data into CircuitPython
Command set:
https://www.espressif.com/sites/default/files/documentation/4a-esp8266_at_instruction_set_en.pdf
Examples:
https://www.espressif.com/sites/default/files/documentation/4b-esp8266_at_command_examples_en.pdf
* Author(s): ladyada
Implementation Notes
--------------------
**Hardware:**
* Adafruit `ESP8266 Huzzah Breakout
<https://www.adafruit.com/product/2471>`_ (Product ID: 2471)
**Software and Dependencies:**
* Adafruit CircuitPython firmware for the supported boards:
https://github.com/adafruit/circuitpython/releases
"""
import gc
import time
from digitalio import Direction
__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_espATcontrol.git"
class OKError(Exception):
"""The exception thrown when we didn't get acknowledgement to an AT command"""
class ESP_ATcontrol:
"""A wrapper for AT commands to a connected ESP8266 or ESP32 module to do
some very basic internetting. The ESP module must be pre-programmed with
AT command firmware, you can use esptool or our CircuitPython miniesptool
to upload firmware"""
# pylint: disable=too-many-public-methods, too-many-instance-attributes
MODE_STATION = 1
MODE_SOFTAP = 2
MODE_SOFTAPSTATION = 3
TYPE_TCP = "TCP"
TCP_MODE = "TCP"
TYPE_UDP = "UDP"
TYPE_SSL = "SSL"
TLS_MODE = "SSL"
STATUS_APCONNECTED = 2
STATUS_SOCKETOPEN = 3
STATUS_SOCKETCLOSED = 4
STATUS_NOTCONNECTED = 5
USER_AGENT = "esp-idf/1.0 esp32"
def __init__(
self,
uart,
default_baudrate,
*,
run_baudrate=None,
rts_pin=None,
reset_pin=None,
debug=False
):
"""This function doesn't try to do any sync'ing, just sets up
# the hardware, that way nothing can unexpectedly fail!"""
self._uart = uart
if not run_baudrate:
run_baudrate = default_baudrate
self._default_baudrate = default_baudrate
self._run_baudrate = run_baudrate
self._uart.baudrate = default_baudrate
self._reset_pin = reset_pin
self._rts_pin = rts_pin
if self._reset_pin:
self._reset_pin.direction = Direction.OUTPUT
self._reset_pin.value = True
if self._rts_pin:
self._rts_pin.direction = Direction.OUTPUT
self.hw_flow(True)
self._debug = debug
self._versionstrings = []
self._version = None
self._ipdpacket = bytearray(1500)
self._ifconfig = []
self._initialized = False
def begin(self):
"""Initialize the module by syncing, resetting if necessary, setting up
the desired baudrate, turning on single-socket mode, and configuring
SSL support. Required before using the module but we dont do in __init__
because this can throw an exception."""
# Connect and sync
for _ in range(3):
try:
if not self.sync() and not self.soft_reset():
self.hard_reset()
self.soft_reset()
self.echo(False)
# set flow control if required
self.baudrate = self._run_baudrate
# get and cache versionstring
self.get_version()
if self.cipmux != 0:
self.cipmux = 0
try:
self.at_response("AT+CIPSSLSIZE=4096", retries=1, timeout=3)
except OKError:
# ESP32 doesnt use CIPSSLSIZE, its ok!
self.at_response("AT+CIPSSLCCONF?")
self._initialized = True
return
except OKError:
pass # retry
def connect(self, secrets):
"""Repeatedly try to connect to an access point with the details in
the passed in 'secrets' dictionary. Be sure 'ssid' and 'password' are
defined in the secrets dict! If 'timezone' is set, we'll also configure
SNTP"""
# Connect to WiFi if not already
retries = 3
while True:
try:
if not self._initialized or retries == 0:
self.begin()
retries = 3
AP = self.remote_AP # pylint: disable=invalid-name
print("Connected to", AP[0])
if AP[0] != secrets["ssid"]:
self.join_AP(secrets["ssid"], secrets["password"])
if "timezone" in secrets:
tzone = secrets["timezone"]
ntp = None
if "ntp_server" in secrets:
ntp = secrets["ntp_server"]
self.sntp_config(True, tzone, ntp)
print("My IP Address:", self.local_ip)
return # yay!
except (RuntimeError, OKError) as exp:
print("Failed to connect, retrying\n", exp)
retries -= 1
continue
# *************************** SOCKET SETUP ****************************
@property
def cipmux(self):
"""The IP socket multiplexing setting. 0 for one socket, 1 for multi-socket"""
replies = self.at_response("AT+CIPMUX?", timeout=3).split(b"\r\n")
for reply in replies:
if reply.startswith(b"+CIPMUX:"):
return int(reply[8:])
raise RuntimeError("Bad response to CIPMUX?")
def socket_connect(self, conntype, remote, remote_port, *, keepalive=10, retries=1):
"""Open a socket. conntype can be TYPE_TCP, TYPE_UDP, or TYPE_SSL. Remote
can be an IP address or DNS (we'll do the lookup for you. Remote port
is integer port on other side. We can't set the local port"""
# lets just do one connection at a time for now
while True:
stat = self.status
if stat in (self.STATUS_APCONNECTED, self.STATUS_SOCKETCLOSED):
break
if stat == self.STATUS_SOCKETOPEN:
self.socket_disconnect()
else:
time.sleep(1)
if not conntype in (self.TYPE_TCP, self.TYPE_UDP, self.TYPE_SSL):
raise RuntimeError("Connection type must be TCP, UDL or SSL")
cmd = (
'AT+CIPSTART="'
+ conntype
+ '","'
+ remote
+ '",'
+ str(remote_port)
+ ","
+ str(keepalive)
)
replies = self.at_response(cmd, timeout=3, retries=retries).split(b"\r\n")
for reply in replies:
if reply == b"CONNECT" and self.status == self.STATUS_SOCKETOPEN:
return True
return False
def socket_send(self, buffer, timeout=1):
"""Send data over the already-opened socket, buffer must be bytes"""
cmd = "AT+CIPSEND=%d" % len(buffer)
self.at_response(cmd, timeout=5, retries=1)
prompt = b""
stamp = time.monotonic()
while (time.monotonic() - stamp) < timeout:
if self._uart.in_waiting:
prompt += self._uart.read(1)
self.hw_flow(False)
# print(prompt)
if prompt[-1:] == b">":
break
else:
self.hw_flow(True)
if not prompt or (prompt[-1:] != b">"):
raise RuntimeError("Didn't get data prompt for sending")
self._uart.reset_input_buffer()
self._uart.write(buffer)
stamp = time.monotonic()
response = b""
while (time.monotonic() - stamp) < timeout:
if self._uart.in_waiting:
response += self._uart.read(self._uart.in_waiting)
if response[-9:] == b"SEND OK\r\n":
break
if response[-7:] == b"ERROR\r\n":
break
if self._debug:
print("<---", response)
# Get newlines off front and back, then split into lines
return True
def socket_receive(self, timeout=5):
# pylint: disable=too-many-nested-blocks, too-many-branches
"""Check for incoming data over the open socket, returns bytes"""
incoming_bytes = None
bundle = []
toread = 0
gc.collect()
i = 0 # index into our internal packet
stamp = time.monotonic()
ipd_start = b"+IPD,"
while (time.monotonic() - stamp) < timeout:
if self._uart.in_waiting:
stamp = time.monotonic() # reset timestamp when there's data!
if not incoming_bytes:
self.hw_flow(False) # stop the flow
# read one byte at a time
self._ipdpacket[i] = self._uart.read(1)[0]
if chr(self._ipdpacket[0]) != "+":
i = 0 # keep goin' till we start with +
continue
i += 1
# look for the IPD message
if (ipd_start in self._ipdpacket) and chr(
self._ipdpacket[i - 1]
) == ":":
try:
ipd = str(self._ipdpacket[5 : i - 1], "utf-8")
incoming_bytes = int(ipd)
if self._debug:
print("Receiving:", incoming_bytes)
except ValueError:
raise RuntimeError("Parsing error during receive", ipd)
i = 0 # reset the input buffer now that we know the size
elif i > 20:
i = 0 # Hmm we somehow didnt get a proper +IPD packet? start over
else:
self.hw_flow(False) # stop the flow
# read as much as we can!
toread = min(incoming_bytes - i, self._uart.in_waiting)
# print("i ", i, "to read:", toread)
self._ipdpacket[i : i + toread] = self._uart.read(toread)
i += toread
if i == incoming_bytes:
# print(self._ipdpacket[0:i])
gc.collect()
bundle.append(self._ipdpacket[0:i])
gc.collect()
i = incoming_bytes = 0
else: # no data waiting
self.hw_flow(True) # start the floooow
totalsize = sum([len(x) for x in bundle])
ret = bytearray(totalsize)
i = 0
for x in bundle:
for char in x:
ret[i] = char
i += 1
for x in bundle:
del x
gc.collect()
return ret
def socket_disconnect(self):
"""Close any open socket, if there is one"""
try:
self.at_response("AT+CIPCLOSE", retries=1)
except OKError:
pass # this is ok, means we didn't have an open socket
# *************************** SNTP SETUP ****************************
def sntp_config(self, enable, timezone=None, server=None):
"""Configure the built in ESP SNTP client with a UTC-offset number (timezone)
and server as IP or hostname."""
cmd = "AT+CIPSNTPCFG="
if enable:
cmd += "1"
else:
cmd += "0"
if timezone is not None:
cmd += ",%d" % timezone
if server is not None:
cmd += ',"%s"' % server
self.at_response(cmd, timeout=3)
@property
def sntp_time(self):
"""Return a string with time/date information using SNTP, may return
1970 'bad data' on the first few minutes, without warning!"""
replies = self.at_response("AT+CIPSNTPTIME?", timeout=5).split(b"\r\n")
for reply in replies:
if reply.startswith(b"+CIPSNTPTIME:"):
return reply[13:]
return None
# *************************** WIFI SETUP ****************************
@property
def is_connected(self):
"""Initialize module if not done yet, and check if we're connected to
an access point, returns True or False"""
if not self._initialized:
self.begin()
try:
self.echo(False)
self.baudrate = self.baudrate
stat = self.status
if stat in (
self.STATUS_APCONNECTED,
self.STATUS_SOCKETOPEN,
self.STATUS_SOCKETCLOSED,
):
return True
except (OKError, RuntimeError):
pass
return False
@property
def status(self):
"""The IP connection status number (see AT+CIPSTATUS datasheet for meaning)"""
replies = self.at_response("AT+CIPSTATUS", timeout=5).split(b"\r\n")
for reply in replies:
if reply.startswith(b"STATUS:"):
return int(reply[7:8])
return None
@property
def mode(self):
"""What mode we're in, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
if not self._initialized:
self.begin()
replies = self.at_response("AT+CWMODE?", timeout=5).split(b"\r\n")
for reply in replies:
if reply.startswith(b"+CWMODE:"):
return int(reply[8:])
raise RuntimeError("Bad response to CWMODE?")
@mode.setter
def mode(self, mode):
"""Station or AP mode selection, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
if not self._initialized:
self.begin()
if not mode in (1, 2, 3):
raise RuntimeError("Invalid Mode")
self.at_response("AT+CWMODE=%d" % mode, timeout=3)
@property
def local_ip(self):
"""Our local IP address as a dotted-quad string"""
reply = self.at_response("AT+CIFSR").strip(b"\r\n")
for line in reply.split(b"\r\n"):
if line and line.startswith(b'+CIFSR:STAIP,"'):
return str(line[14:-1], "utf-8")
raise RuntimeError("Couldn't find IP address")
def ping(self, host):
"""Ping the IP or hostname given, returns ms time or None on failure"""
reply = self.at_response('AT+PING="%s"' % host.strip('"'), timeout=5)
for line in reply.split(b"\r\n"):
if line and line.startswith(b"+"):
try:
if line[1:5] == b"PING":
return int(line[6:])
return int(line[1:])
except ValueError:
return None
raise RuntimeError("Couldn't ping")
def nslookup(self, host):
"""Return a dotted-quad IP address strings that matches the hostname"""
reply = self.at_response('AT+CIPDOMAIN="%s"' % host.strip('"'), timeout=3)
for line in reply.split(b"\r\n"):
if line and line.startswith(b"+CIPDOMAIN:"):
return str(line[11:], "utf-8")
raise RuntimeError("Couldn't find IP address")
# *************************** AP SETUP ****************************
@property
def remote_AP(self): # pylint: disable=invalid-name
"""The name of the access point we're connected to, as a string"""
stat = self.status
if stat != self.STATUS_APCONNECTED:
return [None] * 4
replies = self.at_response("AT+CWJAP?", timeout=10).split(b"\r\n")
for reply in replies:
if not reply.startswith("+CWJAP:"):
continue
reply = reply[7:].split(b",")
for i, val in enumerate(reply):
reply[i] = str(val, "utf-8")
try:
reply[i] = int(reply[i])
except ValueError:
reply[i] = reply[i].strip('"') # its a string!
return reply
return [None] * 4
def join_AP(self, ssid, password): # pylint: disable=invalid-name
"""Try to join an access point by name and password, will return
immediately if we're already connected and won't try to reconnect"""
# First make sure we're in 'station' mode so we can connect to AP's
if self.mode != self.MODE_STATION:
self.mode = self.MODE_STATION
router = self.remote_AP
if router and router[0] == ssid:
return # we're already connected!
for _ in range(3):
reply = self.at_response(
'AT+CWJAP="' + ssid + '","' + password + '"', timeout=15, retries=3
)
if b"WIFI CONNECTED" not in reply:
print("no CONNECTED")
raise RuntimeError("Couldn't connect to WiFi")
if b"WIFI GOT IP" not in reply:
print("no IP")
raise RuntimeError("Didn't get IP address")
return
def scan_APs(self, retries=3): # pylint: disable=invalid-name
"""Ask the module to scan for access points and return a list of lists
with name, RSSI, MAC addresses, etc"""
for _ in range(retries):
try:
if self.mode != self.MODE_STATION:
self.mode = self.MODE_STATION
scan = self.at_response("AT+CWLAP", timeout=5).split(b"\r\n")
except RuntimeError:
continue
routers = []
for line in scan:
if line.startswith(b"+CWLAP:("):
router = line[8:-1].split(b",")
for i, val in enumerate(router):
router[i] = str(val, "utf-8")
try:
router[i] = int(router[i])
except ValueError:
router[i] = router[i].strip('"') # its a string!
routers.append(router)
return routers
# ************************** AT LOW LEVEL ****************************
@property
def version(self):
"""The cached version string retrieved via the AT+GMR command"""
return self._version
def get_version(self):
"""Request the AT firmware version string and parse out the
version number"""
reply = self.at_response("AT+GMR", timeout=3).strip(b"\r\n")
self._version = None
for line in reply.split(b"\r\n"):
if line:
self._versionstrings.append(str(line, "utf-8"))
# get the actual version out
if b"AT version:" in line:
self._version = str(line, "utf-8")
return self._version
def hw_flow(self, flag):
"""Turn on HW flow control (if available) on to allow data, or off to stop"""
if self._rts_pin:
self._rts_pin.value = not flag
def at_response(self, at_cmd, timeout=5, retries=3):
"""Send an AT command, check that we got an OK response,
and then cut out the reply lines to return. We can set
a variable timeout (how long we'll wait for response) and
how many times to retry before giving up"""
# pylint: disable=too-many-branches
for _ in range(retries):
self.hw_flow(True) # allow any remaning data to stream in
time.sleep(0.1) # wait for uart data
self._uart.reset_input_buffer() # flush it
self.hw_flow(False) # and shut off flow control again
if self._debug:
print("--->", at_cmd)
self._uart.write(bytes(at_cmd, "utf-8"))
self._uart.write(b"\x0d\x0a")
stamp = time.monotonic()
response = b""
while (time.monotonic() - stamp) < timeout:
if self._uart.in_waiting:
response += self._uart.read(1)
self.hw_flow(False)
if response[-4:] == b"OK\r\n":
break
if response[-7:] == b"ERROR\r\n":
break
if "AT+CWJAP=" in at_cmd:
if b"WIFI GOT IP\r\n" in response:
break
else:
if b"WIFI CONNECTED\r\n" in response:
break
if b"ERR CODE:" in response:
break
else:
self.hw_flow(True)
# eat beginning \n and \r
if self._debug:
print("<---", response)
# special case, AT+CWJAP= does not return an ok :P
if "AT+CWJAP=" in at_cmd and b"WIFI GOT IP\r\n" in response:
return response
# special case, ping also does not return an OK
if "AT+PING" in at_cmd and b"ERROR\r\n" in response:
return response
if response[-4:] != b"OK\r\n":
time.sleep(1)
continue
return response[:-4]
raise OKError("No OK response to " + at_cmd)
def sync(self):
"""Check if we have AT commmand sync by sending plain ATs"""
try:
self.at_response("AT", timeout=1)
return True
except OKError:
return False
@property
def baudrate(self):
"""The baudrate of our UART connection"""
return self._uart.baudrate
@baudrate.setter
def baudrate(self, baudrate):
"""Change the modules baudrate via AT commands and then check
that we're still sync'd."""
at_cmd = "AT+UART_CUR=" + str(baudrate) + ",8,1,0,"
if self._rts_pin is not None:
at_cmd += "2"
else:
at_cmd += "0"
at_cmd += "\r\n"
if self._debug:
print("Changing baudrate to:", baudrate)
print("--->", at_cmd)
self._uart.write(bytes(at_cmd, "utf-8"))
time.sleep(0.25)
self._uart.baudrate = baudrate
time.sleep(0.25)
self._uart.reset_input_buffer()
if not self.sync():
raise RuntimeError("Failed to resync after Baudrate change")
def echo(self, echo):
"""Set AT command echo on or off"""
if echo:
self.at_response("ATE1", timeout=1)
else:
self.at_response("ATE0", timeout=1)
def soft_reset(self):
"""Perform a software reset by AT command. Returns True
if we successfully performed, false if failed to reset"""
try:
self._uart.reset_input_buffer()
reply = self.at_response("AT+RST", timeout=1)
if reply.strip(b"\r\n") == b"AT+RST":
time.sleep(2)
self._uart.reset_input_buffer()
return True
except OKError:
pass # fail, see below
return False
def factory_reset(self):
"""Perform a hard reset, then send factory restore settings request"""
self.hard_reset()
self.at_response("AT+RESTORE", timeout=1)
self._initialized = False
def hard_reset(self):
"""Perform a hardware reset by toggling the reset pin, if it was
defined in the initialization of this object"""
if self._reset_pin:
self._reset_pin.direction = Direction.OUTPUT
self._reset_pin.value = False
time.sleep(0.1)
self._reset_pin.value = True
self._uart.baudrate = self._default_baudrate
time.sleep(3) # give it a few seconds to wake up
self._uart.reset_input_buffer()
self._initialized = False