# The MIT License (MIT)
#
# Copyright (c) 2018 ladyada for Adafruit Industries
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""
`adafruit_espatcontrol.adafruit_espatcontrol`
====================================================

Use the ESP AT command set to communicate with the Interwebs.
It's slow, but works to get data into CircuitPython

Command set:
https://www.espressif.com/sites/default/files/documentation/4a-esp8266_at_instruction_set_en.pdf

Examples:
https://www.espressif.com/sites/default/files/documentation/4b-esp8266_at_command_examples_en.pdf

* Author(s): ladyada

Implementation Notes
--------------------

**Hardware:**

* Adafruit `ESP8266 Huzzah Breakout
  <https://www.adafruit.com/product/2471>`_ (Product ID: 2471)

**Software and Dependencies:**

* Adafruit CircuitPython firmware for the supported boards:
  https://github.com/adafruit/circuitpython/releases

"""

import gc
import time
from digitalio import Direction

__version__ = "0.0.0-auto.0"
__repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_espATcontrol.git"


class OKError(Exception):
    """The exception thrown when we didn't get acknowledgement to an AT command"""


class ESP_ATcontrol:
    """A wrapper for AT commands to a connected ESP8266 or ESP32 module to do
    some very basic internetting. The ESP module must be pre-programmed with
    AT command firmware, you can use esptool or our CircuitPython miniesptool
    to upload firmware"""

    # pylint: disable=too-many-public-methods, too-many-instance-attributes

    # Values accepted by the ``mode`` property (AT+CWMODE)
    MODE_STATION = 1
    MODE_SOFTAP = 2
    MODE_SOFTAPSTATION = 3

    # Connection types accepted by ``socket_connect`` (AT+CIPSTART)
    TYPE_TCP = "TCP"
    TCP_MODE = "TCP"
    TYPE_UDP = "UDP"
    TYPE_SSL = "SSL"
    TLS_MODE = "SSL"

    # Values reported by the ``status`` property (AT+CIPSTATUS)
    STATUS_APCONNECTED = 2
    STATUS_SOCKETOPEN = 3
    STATUS_SOCKETCLOSED = 4
    STATUS_NOTCONNECTED = 5

    USER_AGENT = "esp-idf/1.0 esp32"

    def __init__(
        self,
        uart,
        default_baudrate,
        *,
        run_baudrate=None,
        rts_pin=None,
        reset_pin=None,
        debug=False
    ):
        """This function doesn't try to do any sync'ing, just sets up
        the hardware, that way nothing can unexpectedly fail!

        :param uart: UART-like object used to talk to the module; must offer
            ``baudrate``, ``in_waiting``, ``read``, ``write`` and
            ``reset_input_buffer``
        :param default_baudrate: baudrate the module uses right after reset
        :param run_baudrate: baudrate to switch to in ``begin``; falls back to
            ``default_baudrate`` when not given
        :param rts_pin: optional digital pin used for hardware flow control
        :param reset_pin: optional digital pin wired to the module's reset line
        :param debug: when True, print every command sent and reply received
        """
        self._uart = uart
        if not run_baudrate:
            run_baudrate = default_baudrate
        self._default_baudrate = default_baudrate
        self._run_baudrate = run_baudrate
        self._uart.baudrate = default_baudrate

        self._reset_pin = reset_pin
        self._rts_pin = rts_pin
        if self._reset_pin:
            self._reset_pin.direction = Direction.OUTPUT
            self._reset_pin.value = True
        if self._rts_pin:
            self._rts_pin.direction = Direction.OUTPUT
        self.hw_flow(True)

        self._debug = debug
        self._versionstrings = []
        self._version = None
        # Reusable scratch buffer for one incoming +IPD packet
        self._ipdpacket = bytearray(1500)
        self._ifconfig = []
        self._initialized = False

    def begin(self):
        """Initialize the module by syncing, resetting if necessary, setting up
        the desired baudrate, turning on single-socket mode, and configuring
        SSL support. Required before using the module but we dont do in __init__
        because this can throw an exception.

        Makes up to three attempts; an attempt that ends in `OKError` is
        retried. If every attempt fails the method returns without marking the
        object initialized."""
        # Connect and sync
        for _ in range(3):
            try:
                if not self.sync() and not self.soft_reset():
                    self.hard_reset()
                    self.soft_reset()
                self.echo(False)
                # set flow control if required
                self.baudrate = self._run_baudrate
                # get and cache versionstring
                self.get_version()
                if self.cipmux != 0:
                    self.cipmux = 0
                try:
                    self.at_response("AT+CIPSSLSIZE=4096", retries=1, timeout=3)
                except OKError:
                    # ESP32 doesnt use CIPSSLSIZE, its ok!
                    self.at_response("AT+CIPSSLCCONF?")
                self._initialized = True
                return
            except OKError:
                pass  # retry

    def connect(self, secrets):
        """Repeatedly try to connect to an access point with the details in
        the passed in 'secrets' dictionary. Be sure 'ssid' and 'password' are
        defined in the secrets dict! If 'timezone' is set, we'll also configure
        SNTP (using the optional 'ntp_server' entry as the server).

        Loops until a connection succeeds; after three consecutive failures
        the module is re-initialized with ``begin``."""
        # Connect to WiFi if not already
        retries = 3
        while True:
            try:
                if not self._initialized or retries == 0:
                    self.begin()
                    retries = 3
                AP = self.remote_AP  # pylint: disable=invalid-name
                print("Connected to", AP[0])
                if AP[0] != secrets["ssid"]:
                    self.join_AP(secrets["ssid"], secrets["password"])
                if "timezone" in secrets:
                    tzone = secrets["timezone"]
                    ntp = None
                    if "ntp_server" in secrets:
                        ntp = secrets["ntp_server"]
                    self.sntp_config(True, tzone, ntp)
                print("My IP Address:", self.local_ip)
                return  # yay!
            except (RuntimeError, OKError) as exp:
                print("Failed to connect, retrying\n", exp)
                retries -= 1
                continue

    # *************************** SOCKET SETUP ****************************

    @property
    def cipmux(self):
        """The IP socket multiplexing setting. 0 for one socket, 1 for multi-socket"""
        replies = self.at_response("AT+CIPMUX?", timeout=3).split(b"\r\n")
        for reply in replies:
            if reply.startswith(b"+CIPMUX:"):
                return int(reply[8:])
        raise RuntimeError("Bad response to CIPMUX?")

    @cipmux.setter
    def cipmux(self, mux):
        """Select single-socket (0) or multi-socket (1) mode with AT+CIPMUX.
        Needed by ``begin``, which assigns ``self.cipmux = 0``."""
        if mux not in (0, 1):
            raise RuntimeError("Invalid CIPMUX mode")
        self.at_response("AT+CIPMUX=%d" % mux, timeout=3)

    def socket_connect(self, conntype, remote, remote_port, *, keepalive=10, retries=1):
        """Open a socket. conntype can be TYPE_TCP, TYPE_UDP, or TYPE_SSL. Remote
        can be an IP address or DNS (we'll do the lookup for you). Remote port
        is integer port on other side. We can't set the local port.

        Returns True when the module reports CONNECT and the status shows an
        open socket, False otherwise. Raises RuntimeError for an unknown
        connection type."""
        # Reject bad arguments before touching the module: the wait loop below
        # can block for as long as the module is not joined to an access point.
        if conntype not in (self.TYPE_TCP, self.TYPE_UDP, self.TYPE_SSL):
            raise RuntimeError("Connection type must be TCP, UDP or SSL")
        # lets just do one connection at a time for now
        while True:
            stat = self.status
            if stat in (self.STATUS_APCONNECTED, self.STATUS_SOCKETCLOSED):
                break
            if stat == self.STATUS_SOCKETOPEN:
                self.socket_disconnect()
            else:
                time.sleep(1)
        cmd = (
            'AT+CIPSTART="'
            + conntype
            + '","'
            + remote
            + '",'
            + str(remote_port)
            + ","
            + str(keepalive)
        )
        replies = self.at_response(cmd, timeout=3, retries=retries).split(b"\r\n")
        for reply in replies:
            if reply == b"CONNECT" and self.status == self.STATUS_SOCKETOPEN:
                return True
        return False

    def socket_send(self, buffer, timeout=1):
        """Send data over the already-opened socket, buffer must be bytes.

        Raises RuntimeError if the module never shows its '>' data prompt.
        Returns True once the payload has been written; the reply that follows
        is only collected for debug output and is not checked."""
        cmd = "AT+CIPSEND=%d" % len(buffer)
        self.at_response(cmd, timeout=5, retries=1)
        # Wait for the '>' prompt that tells us the module wants the payload
        prompt = b""
        stamp = time.monotonic()
        while (time.monotonic() - stamp) < timeout:
            if self._uart.in_waiting:
                prompt += self._uart.read(1)
                self.hw_flow(False)
                if prompt[-1:] == b">":
                    break
            else:
                self.hw_flow(True)
        if prompt[-1:] != b">":
            raise RuntimeError("Didn't get data prompt for sending")
        self._uart.reset_input_buffer()
        self._uart.write(buffer)
        # Collect the reply until SEND OK, ERROR or the timeout
        stamp = time.monotonic()
        response = b""
        while (time.monotonic() - stamp) < timeout:
            if self._uart.in_waiting:
                response += self._uart.read(self._uart.in_waiting)
                if response[-9:] == b"SEND OK\r\n":
                    break
                if response[-7:] == b"ERROR\r\n":
                    break
        if self._debug:
            print("<---", response)
        return True

    def socket_receive(self, timeout=5):
        # pylint: disable=too-many-nested-blocks, too-many-branches
        """Check for incoming data over the open socket, returns a bytearray
        holding the payload of every complete +IPD packet received. Reading
        stops once no data has arrived for ``timeout`` seconds."""
        incoming_bytes = None
        bundle = []
        toread = 0
        gc.collect()
        i = 0  # index into our internal packet
        stamp = time.monotonic()
        ipd_start = b"+IPD,"
        while (time.monotonic() - stamp) < timeout:
            if self._uart.in_waiting:
                stamp = time.monotonic()  # reset timestamp when there's data!
                if not incoming_bytes:
                    self.hw_flow(False)  # stop the flow
                    # read one byte at a time
                    self._ipdpacket[i] = self._uart.read(1)[0]
                    if chr(self._ipdpacket[0]) != "+":
                        i = 0  # keep goin' till we start with +
                        continue
                    i += 1
                    # look for the IPD message
                    if (ipd_start in self._ipdpacket) and chr(
                        self._ipdpacket[i - 1]
                    ) == ":":
                        # bound up front so the error below can always report it
                        ipd = None
                        try:
                            ipd = str(self._ipdpacket[5 : i - 1], "utf-8")
                            incoming_bytes = int(ipd)
                            if self._debug:
                                print("Receiving:", incoming_bytes)
                        except ValueError as err:
                            raise RuntimeError(
                                "Parsing error during receive", ipd
                            ) from err
                        i = 0  # reset the input buffer now that we know the size
                    elif i > 20:
                        i = 0  # Hmm we somehow didnt get a proper +IPD packet? start over
                else:
                    self.hw_flow(False)  # stop the flow
                    # read as much as we can!
                    toread = min(incoming_bytes - i, self._uart.in_waiting)
                    self._ipdpacket[i : i + toread] = self._uart.read(toread)
                    i += toread
                    if i == incoming_bytes:
                        gc.collect()
                        bundle.append(self._ipdpacket[0:i])
                        gc.collect()
                        i = incoming_bytes = 0
            else:  # no data waiting
                self.hw_flow(True)  # start the floooow
        # Stitch the collected packets into one pre-sized buffer
        totalsize = sum(len(chunk) for chunk in bundle)
        ret = bytearray(totalsize)
        i = 0
        for chunk in bundle:
            ret[i : i + len(chunk)] = chunk
            i += len(chunk)
        del bundle
        gc.collect()
        return ret

    def socket_disconnect(self):
        """Close any open socket, if there is one"""
        try:
            self.at_response("AT+CIPCLOSE", retries=1)
        except OKError:
            pass  # this is ok, means we didn't have an open socket

    # *************************** SNTP SETUP ****************************

    def sntp_config(self, enable, timezone=None, server=None):
        """Configure the built in ESP SNTP client with a UTC-offset number (timezone)
        and server as IP or hostname."""
        cmd = "AT+CIPSNTPCFG="
        if enable:
            cmd += "1"
        else:
            cmd += "0"
        if timezone is not None:
            cmd += ",%d" % timezone
        if server is not None:
            cmd += ',"%s"' % server
        self.at_response(cmd, timeout=3)

    @property
    def sntp_time(self):
        """Return the time/date information reported by SNTP as bytes, or None
        if the reply had no +CIPSNTPTIME line. May return 1970 'bad data' on
        the first few minutes, without warning!"""
        replies = self.at_response("AT+CIPSNTPTIME?", timeout=5).split(b"\r\n")
        for reply in replies:
            if reply.startswith(b"+CIPSNTPTIME:"):
                return reply[13:]
        return None

    # *************************** WIFI SETUP ****************************

    @property
    def is_connected(self):
        """Initialize module if not done yet, and check if we're connected to
        an access point, returns True or False"""
        if not self._initialized:
            self.begin()
        try:
            self.echo(False)
            # re-issue the current baudrate; the setter re-syncs with the module
            self.baudrate = self.baudrate
            stat = self.status
            if stat in (
                self.STATUS_APCONNECTED,
                self.STATUS_SOCKETOPEN,
                self.STATUS_SOCKETCLOSED,
            ):
                return True
        except (OKError, RuntimeError):
            pass
        return False

    @property
    def status(self):
        """The IP connection status number (see AT+CIPSTATUS datasheet for
        meaning), or None if the reply had no STATUS line"""
        replies = self.at_response("AT+CIPSTATUS", timeout=5).split(b"\r\n")
        for reply in replies:
            if reply.startswith(b"STATUS:"):
                return int(reply[7:8])
        return None

    @property
    def mode(self):
        """What mode we're in, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
        if not self._initialized:
            self.begin()
        replies = self.at_response("AT+CWMODE?", timeout=5).split(b"\r\n")
        for reply in replies:
            if reply.startswith(b"+CWMODE:"):
                return int(reply[8:])
        raise RuntimeError("Bad response to CWMODE?")

    @mode.setter
    def mode(self, mode):
        """Station or AP mode selection, can be MODE_STATION, MODE_SOFTAP or MODE_SOFTAPSTATION"""
        if not self._initialized:
            self.begin()
        if mode not in (1, 2, 3):
            raise RuntimeError("Invalid Mode")
        self.at_response("AT+CWMODE=%d" % mode, timeout=3)

    @property
    def local_ip(self):
        """Our local IP address as a dotted-quad string"""
        reply = self.at_response("AT+CIFSR").strip(b"\r\n")
        for line in reply.split(b"\r\n"):
            if line and line.startswith(b'+CIFSR:STAIP,"'):
                return str(line[14:-1], "utf-8")
        raise RuntimeError("Couldn't find IP address")

    def ping(self, host):
        """Ping the IP or hostname given, returns ms time or None on failure"""
        reply = self.at_response('AT+PING="%s"' % host.strip('"'), timeout=5)
        for line in reply.split(b"\r\n"):
            if line and line.startswith(b"+"):
                try:
                    # reply is either "+PING:<ms>" or "+<ms>"
                    if line[1:5] == b"PING":
                        return int(line[6:])
                    return int(line[1:])
                except ValueError:
                    return None
        raise RuntimeError("Couldn't ping")

    def nslookup(self, host):
        """Return a dotted-quad IP address strings that matches the hostname"""
        reply = self.at_response('AT+CIPDOMAIN="%s"' % host.strip('"'), timeout=3)
        for line in reply.split(b"\r\n"):
            if line and line.startswith(b"+CIPDOMAIN:"):
                return str(line[11:], "utf-8")
        raise RuntimeError("Couldn't find IP address")

    # *************************** AP SETUP ****************************

    @property
    def remote_AP(self):  # pylint: disable=invalid-name
        """Details of the access point we're connected to, as a list holding
        the fields of the +CWJAP reply (name first); numeric fields are ints
        and the rest are unquoted strings. Returns ``[None] * 4`` when we're
        not connected to an access point."""
        stat = self.status
        if stat != self.STATUS_APCONNECTED:
            return [None] * 4
        replies = self.at_response("AT+CWJAP?", timeout=10).split(b"\r\n")
        for reply in replies:
            # replies are bytes, so the prefix has to be bytes as well
            if not reply.startswith(b"+CWJAP:"):
                continue
            reply = reply[7:].split(b",")
            for i, val in enumerate(reply):
                reply[i] = str(val, "utf-8")
                try:
                    reply[i] = int(reply[i])
                except ValueError:
                    reply[i] = reply[i].strip('"')  # its a string!
            return reply
        return [None] * 4

    def join_AP(self, ssid, password):  # pylint: disable=invalid-name
        """Try to join an access point by name and password, will return
        immediately if we're already connected and won't try to reconnect.
        Raises RuntimeError if the module does not report both a WiFi
        connection and an IP address."""
        # First make sure we're in 'station' mode so we can connect to AP's
        if self.mode != self.MODE_STATION:
            self.mode = self.MODE_STATION

        router = self.remote_AP
        if router and router[0] == ssid:
            return  # we're already connected!
        reply = self.at_response(
            'AT+CWJAP="' + ssid + '","' + password + '"', timeout=15, retries=3
        )
        if b"WIFI CONNECTED" not in reply:
            print("no CONNECTED")
            raise RuntimeError("Couldn't connect to WiFi")
        if b"WIFI GOT IP" not in reply:
            print("no IP")
            raise RuntimeError("Didn't get IP address")

    def scan_APs(self, retries=3):  # pylint: disable=invalid-name
        """Ask the module to scan for access points and return a list of lists
        with name, RSSI, MAC addresses, etc. Returns None if every attempt
        raised RuntimeError."""
        for _ in range(retries):
            try:
                if self.mode != self.MODE_STATION:
                    self.mode = self.MODE_STATION
                scan = self.at_response("AT+CWLAP", timeout=5).split(b"\r\n")
            except RuntimeError:
                continue
            routers = []
            for line in scan:
                if line.startswith(b"+CWLAP:("):
                    router = line[8:-1].split(b",")
                    for i, val in enumerate(router):
                        router[i] = str(val, "utf-8")
                        try:
                            router[i] = int(router[i])
                        except ValueError:
                            router[i] = router[i].strip('"')  # its a string!
                    routers.append(router)
            return routers
        return None

    # ************************** AT LOW LEVEL ****************************

    @property
    def version(self):
        """The cached version string retrieved via the AT+GMR command"""
        return self._version

    def get_version(self):
        """Request the AT firmware version string and parse out the
        version number"""
        reply = self.at_response("AT+GMR", timeout=3).strip(b"\r\n")
        self._version = None
        # start fresh so repeated queries don't pile up duplicate lines
        self._versionstrings = []
        for line in reply.split(b"\r\n"):
            if line:
                self._versionstrings.append(str(line, "utf-8"))
            # get the actual version out
            if b"AT version:" in line:
                self._version = str(line, "utf-8")
        return self._version

    def hw_flow(self, flag):
        """Turn on HW flow control (if available) on to allow data, or off to stop"""
        if self._rts_pin:
            self._rts_pin.value = not flag

    def at_response(self, at_cmd, timeout=5, retries=3):
        """Send an AT command, check that we got an OK response,
        and then cut out the reply lines to return. We can set
        a variable timeout (how long we'll wait for response) and
        how many times to retry before giving up.

        Returns the reply bytes with the trailing OK removed (or the whole
        reply for the AT+CWJAP= and AT+PING special cases). Raises OKError
        when no attempt produced an acceptable reply."""
        # pylint: disable=too-many-branches
        for _ in range(retries):
            self.hw_flow(True)  # allow any remaning data to stream in
            time.sleep(0.1)  # wait for uart data
            self._uart.reset_input_buffer()  # flush it
            self.hw_flow(False)  # and shut off flow control again
            if self._debug:
                print("--->", at_cmd)
            self._uart.write(bytes(at_cmd, "utf-8"))
            self._uart.write(b"\x0d\x0a")
            stamp = time.monotonic()
            response = b""
            while (time.monotonic() - stamp) < timeout:
                if self._uart.in_waiting:
                    response += self._uart.read(1)
                    self.hw_flow(False)
                    if response[-4:] == b"OK\r\n":
                        break
                    if response[-7:] == b"ERROR\r\n":
                        break
                    if "AT+CWJAP=" in at_cmd:
                        if b"WIFI GOT IP\r\n" in response:
                            break
                    else:
                        if b"WIFI CONNECTED\r\n" in response:
                            break
                    if b"ERR CODE:" in response:
                        break
                else:
                    self.hw_flow(True)
            if self._debug:
                print("<---", response)
            # special case, AT+CWJAP= does not return an ok :P
            if "AT+CWJAP=" in at_cmd and b"WIFI GOT IP\r\n" in response:
                return response
            # special case, ping also does not return an OK
            if "AT+PING" in at_cmd and b"ERROR\r\n" in response:
                return response
            if response[-4:] != b"OK\r\n":
                time.sleep(1)
                continue
            return response[:-4]
        raise OKError("No OK response to " + at_cmd)

    def sync(self):
        """Check if we have AT commmand sync by sending plain ATs"""
        try:
            self.at_response("AT", timeout=1)
            return True
        except OKError:
            return False

    @property
    def baudrate(self):
        """The baudrate of our UART connection"""
        return self._uart.baudrate

    @baudrate.setter
    def baudrate(self, baudrate):
        """Change the modules baudrate via AT commands and then check
        that we're still sync'd. Raises RuntimeError if the module does not
        answer at the new baudrate."""
        at_cmd = "AT+UART_CUR=" + str(baudrate) + ",8,1,0,"
        # last field is the flow control setting: "2" when we drive an RTS pin
        if self._rts_pin is not None:
            at_cmd += "2"
        else:
            at_cmd += "0"
        at_cmd += "\r\n"
        if self._debug:
            print("Changing baudrate to:", baudrate)
            print("--->", at_cmd)
        self._uart.write(bytes(at_cmd, "utf-8"))
        time.sleep(0.25)
        self._uart.baudrate = baudrate
        time.sleep(0.25)
        self._uart.reset_input_buffer()
        if not self.sync():
            raise RuntimeError("Failed to resync after Baudrate change")

    def echo(self, echo):
        """Set AT command echo on or off"""
        if echo:
            self.at_response("ATE1", timeout=1)
        else:
            self.at_response("ATE0", timeout=1)

    def soft_reset(self):
        """Perform a software reset by AT command. Returns True
        if we successfully performed, false if failed to reset"""
        try:
            self._uart.reset_input_buffer()
            reply = self.at_response("AT+RST", timeout=1)
            if reply.strip(b"\r\n") == b"AT+RST":
                time.sleep(2)
                self._uart.reset_input_buffer()
                return True
        except OKError:
            pass  # fail, see below
        return False

    def factory_reset(self):
        """Perform a hard reset, then send factory restore settings request"""
        self.hard_reset()
        self.at_response("AT+RESTORE", timeout=1)
        self._initialized = False

    def hard_reset(self):
        """Perform a hardware reset by toggling the reset pin, if it was
        defined in the initialization of this object"""
        if self._reset_pin:
            self._reset_pin.direction = Direction.OUTPUT
            self._reset_pin.value = False
            time.sleep(0.1)
            self._reset_pin.value = True
            self._uart.baudrate = self._default_baudrate
            time.sleep(3)  # give it a few seconds to wake up
            self._uart.reset_input_buffer()
            self._initialized = False