# The MIT License (MIT) # # Copyright (c) 2019 ladyada for Adafruit Industries # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. """ `adafruit_esp32spi` ================================================================================ CircuitPython driver library for using ESP32 as WiFi co-processor using SPI * Author(s): ladyada Implementation Notes -------------------- **Hardware:** **Software and Dependencies:** * Adafruit CircuitPython firmware for the supported boards: https://github.com/adafruit/circuitpython/releases * Adafruit's Bus Device library: https://github.com/adafruit/Adafruit_CircuitPython_BusDevice """ import struct import time from micropython import const from digitalio import Direction from adafruit_bus_device.spi_device import SPIDevice __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/adafruit/Adafruit_CircuitPython_ESP32SPI.git" # pylint: disable=bad-whitespace _SET_NET_CMD = const(0x10) _SET_PASSPHRASE_CMD = const(0x11) _GET_CONN_STATUS_CMD = const(0x20) _GET_IPADDR_CMD = const(0x21) _GET_MACADDR_CMD = const(0x22) _GET_CURR_SSID_CMD = const(0x23) _GET_CURR_RSSI_CMD = const(0x25) _GET_CURR_ENCT_CMD = const(0x26) _SCAN_NETWORKS = const(0x27) _GET_SOCKET_CMD = const(0x3F) _GET_STATE_TCP_CMD = const(0x29) _DATA_SENT_TCP_CMD = const(0x2A) _AVAIL_DATA_TCP_CMD = const(0x2B) _GET_DATA_TCP_CMD = const(0x2C) _START_CLIENT_TCP_CMD = const(0x2D) _STOP_CLIENT_TCP_CMD = const(0x2E) _GET_CLIENT_STATE_TCP_CMD = const(0x2F) _DISCONNECT_CMD = const(0x30) _GET_IDX_RSSI_CMD = const(0x32) _GET_IDX_ENCT_CMD = const(0x33) _REQ_HOST_BY_NAME_CMD = const(0x34) _GET_HOST_BY_NAME_CMD = const(0x35) _START_SCAN_NETWORKS = const(0x36) _GET_FW_VERSION_CMD = const(0x37) _PING_CMD = const(0x3E) _SEND_DATA_TCP_CMD = const(0x44) _GET_DATABUF_TCP_CMD = const(0x45) _START_CMD = const(0xE0) _END_CMD = const(0xEE) _ERR_CMD = const(0xEF) _REPLY_FLAG = const(1<<7) _CMD_FLAG = const(0) SOCKET_CLOSED = const(0) SOCKET_LISTEN = const(1) SOCKET_SYN_SENT = const(2) SOCKET_SYN_RCVD = const(3) SOCKET_ESTABLISHED = const(4) SOCKET_FIN_WAIT_1 = const(5) SOCKET_FIN_WAIT_2 = const(6) SOCKET_CLOSE_WAIT = const(7) SOCKET_CLOSING = const(8) SOCKET_LAST_ACK = const(9) SOCKET_TIME_WAIT = const(10) WL_NO_SHIELD = const(0xFF) WL_NO_MODULE = const(0xFF) WL_IDLE_STATUS = const(0) WL_NO_SSID_AVAIL = const(1) WL_SCAN_COMPLETED = const(2) WL_CONNECTED = const(3) WL_CONNECT_FAILED = const(4) WL_CONNECTION_LOST = const(5) WL_DISCONNECTED = const(6) WL_AP_LISTENING = const(7) WL_AP_CONNECTED = const(8) WL_AP_FAILED = const(9) # pylint: enable=bad-whitespace class ESP_SPIcontrol: # pylint: disable=too-many-public-methods """A class that will talk to an ESP32 module programmed with special firmware that lets it act as a fast an efficient WiFi co-processor""" TCP_MODE = const(0) UDP_MODE = const(1) TLS_MODE = const(2) # pylint: disable=too-many-arguments def __init__(self, spi, cs_pin, ready_pin, reset_pin, gpio0_pin, *, debug=False): self._debug = debug self._buffer = bytearray(10) self._pbuf = bytearray(1) # buffer for param read self._spi_device = SPIDevice(spi, cs_pin, baudrate=8000000) self._cs = cs_pin self._ready = ready_pin self._reset = reset_pin self._gpio0 = gpio0_pin self._cs.direction = Direction.OUTPUT self._ready.direction = Direction.INPUT self._reset.direction = Direction.OUTPUT self._gpio0.direction = Direction.INPUT self.reset() # pylint: enable=too-many-arguments def reset(self): """Hard reset the ESP32 using the reset pin""" self._gpio0.direction = Direction.OUTPUT if self._debug: print("Reset ESP32") self._gpio0.value = True # not bootload mode self._cs.value = True self._reset.value = False time.sleep(0.01) # reset self._reset.value = True time.sleep(0.75) # wait for it to boot up self._gpio0.direction = Direction.INPUT def _wait_for_ready(self): """Wait until the ready pin goes low""" if self._debug: print("Wait for ESP32 ready", end='') times = time.monotonic() while (time.monotonic() - times) < 10: # wait up to 10 seconds if not self._ready.value: # we're ready! break if self._debug: print('.', end='') else: raise RuntimeError("ESP32 not responding") if self._debug: print() def _send_command(self, cmd, params=None, *, param_len_16=False): """Send over a command with a list of parameters""" if not params: params = [] packet = [] packet.append(_START_CMD) packet.append(cmd & ~_REPLY_FLAG) packet.append(len(params)) # handle parameters here for i, param in enumerate(params): if self._debug >= 2: print("\tSending param #%d is %d bytes long" % (i, len(param))) if param_len_16: packet.append((len(param) >> 8) & 0xFF) packet.append(len(param) & 0xFF) packet += (param) packet.append(_END_CMD) while len(packet) % 4 != 0: packet.append(0xFF) self._wait_for_ready() with self._spi_device as spi: times = time.monotonic() while (time.monotonic() - times) < 1: # wait up to 1000ms if self._ready.value: # ok ready to send! break else: raise RuntimeError("ESP32 timed out on SPI select") spi.write(bytearray(packet)) # pylint: disable=no-member if self._debug: print("Wrote: ", [hex(b) for b in packet]) def _read_byte(self, spi): """Read one byte from SPI""" spi.readinto(self._pbuf) if self._debug >= 2: print("\t\tRead:", hex(self._pbuf[0])) return self._pbuf[0] def _wait_spi_char(self, spi, desired): """Read a byte with a time-out, and if we get it, check that its what we expect""" times = time.monotonic() while (time.monotonic() - times) < 0.1: r = self._read_byte(spi) if r == _ERR_CMD: raise RuntimeError("Error response to command") if r == desired: return True raise RuntimeError("Timed out waiting for SPI char") def _check_data(self, spi, desired): """Read a byte and verify its the value we want""" r = self._read_byte(spi) if r != desired: raise RuntimeError("Expected %02X but got %02X" % (desired, r)) def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False): """Wait for ready, then parse the response""" self._wait_for_ready() responses = [] with self._spi_device as spi: times = time.monotonic() while (time.monotonic() - times) < 1: # wait up to 1000ms if self._ready.value: # ok ready to send! break else: raise RuntimeError("ESP32 timed out on SPI select") self._wait_spi_char(spi, _START_CMD) self._check_data(spi, cmd | _REPLY_FLAG) if num_responses is not None: self._check_data(spi, num_responses) else: num_responses = self._read_byte(spi) for num in range(num_responses): response = [] param_len = self._read_byte(spi) if param_len_16: param_len <<= 8 param_len |= self._read_byte(spi) if self._debug >= 2: print("\tParameter #%d length is %d" % (num, param_len)) for _ in range(param_len): response.append(self._read_byte(spi)) responses.append(bytes(response)) self._check_data(spi, _END_CMD) if self._debug: print("Read %d: " % len(responses[0]), responses) return responses def _send_command_get_response(self, cmd, params=None, *, reply_params=1, sent_param_len_16=False, recv_param_len_16=False): """Send a high level SPI command, wait and return the response""" self._send_command(cmd, params, param_len_16=sent_param_len_16) return self._wait_response_cmd(cmd, reply_params, param_len_16=recv_param_len_16) @property def status(self): """The status of the ESP32 WiFi core. Can be WL_NO_SHIELD or WL_NO_MODULE (not found), WL_IDLE_STATUS, WL_NO_SSID_AVAIL, WL_SCAN_COMPLETED, WL_CONNECTED, WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED, WL_AP_LISTENING, WL_AP_CONNECTED, WL_AP_FAILED""" if self._debug: print("Connection status") resp = self._send_command_get_response(_GET_CONN_STATUS_CMD) if self._debug: print("Status:", resp[0][0]) return resp[0][0] # one byte response @property def firmware_version(self): """A string of the firmware version on the ESP32""" if self._debug: print("Firmware version") resp = self._send_command_get_response(_GET_FW_VERSION_CMD) return resp[0] @property def MAC_address(self): # pylint: disable=invalid-name """A bytearray containing the MAC address of the ESP32""" if self._debug: print("MAC address") resp = self._send_command_get_response(_GET_MACADDR_CMD, [b'\xFF']) return resp[0] def start_scan_networks(self): """Begin a scan of visible access points. Follow up with a call to 'get_scan_networks' for response""" if self._debug: print("Start scan") resp = self._send_command_get_response(_START_SCAN_NETWORKS) if resp[0][0] != 1: raise RuntimeError("Failed to start AP scan") def get_scan_networks(self): """The results of the latest SSID scan. Returns a list of dictionaries with 'ssid', 'rssi' and 'encryption' entries, one for each AP found""" self._send_command(_SCAN_NETWORKS) names = self._wait_response_cmd(_SCAN_NETWORKS) print("SSID names:", names) APs = [] # pylint: disable=invalid-name for i, name in enumerate(names): a_p = {'ssid': name} rssi = self._send_command_get_response(_GET_IDX_RSSI_CMD, [[i]])[0] a_p['rssi'] = struct.unpack('H', port) if isinstance(dest, str): # use the 5 arg version dest = bytes(dest, 'utf-8') resp = self._send_command_get_response(_START_CLIENT_TCP_CMD, [dest, b'\x00\x00\x00\x00', port_param, [socket_num], [conn_mode]]) else: # ip address, use 4 arg vesion resp = self._send_command_get_response(_START_CLIENT_TCP_CMD, [dest, port_param, [socket_num], [conn_mode]]) if resp[0][0] != 1: raise RuntimeError("Could not connect to remote server") def socket_status(self, socket_num): """Get the socket connection status, can be SOCKET_CLOSED, SOCKET_LISTEN, SOCKET_SYN_SENT, SOCKET_SYN_RCVD, SOCKET_ESTABLISHED, SOCKET_FIN_WAIT_1, SOCKET_FIN_WAIT_2, SOCKET_CLOSE_WAIT, SOCKET_CLOSING, SOCKET_LAST_ACK, or SOCKET_TIME_WAIT""" resp = self._send_command_get_response(_GET_CLIENT_STATE_TCP_CMD, [[socket_num]]) return resp[0][0] def socket_connected(self, socket_num): """Test if a socket is connected to the destination, returns boolean true/false""" return self.socket_status(socket_num) == SOCKET_ESTABLISHED def socket_write(self, socket_num, buffer): """Write the bytearray buffer to a socket""" if self._debug: print("Writing:", buffer) resp = self._send_command_get_response(_SEND_DATA_TCP_CMD, [[socket_num], buffer], sent_param_len_16=True) sent = resp[0][0] if sent != len(buffer): raise RuntimeError("Failed to send %d bytes (sent %d)" % (len(buffer), sent)) resp = self._send_command_get_response(_DATA_SENT_TCP_CMD, [[socket_num]]) if resp[0][0] != 1: raise RuntimeError("Failed to verify data sent") def socket_available(self, socket_num): """Determine how many bytes are waiting to be read on the socket""" resp = self._send_command_get_response(_AVAIL_DATA_TCP_CMD, [[socket_num]]) reply = struct.unpack('> 8) & 0xFF]], sent_param_len_16=True, recv_param_len_16=True) return resp[0] def socket_connect(self, socket_num, dest, port, conn_mode=TCP_MODE): """Open and verify we connected a socket to a destination IP address or hostname using the ESP32's internal reference number. By default we use 'conn_mode' TCP_MODE but can also use UDP_MODE or TLS_MODE (dest must be hostname for TLS_MODE!)""" if self._debug: print("*** Socket connect mode", conn_mode) self.socket_open(socket_num, dest, port, conn_mode=conn_mode) times = time.monotonic() while (time.monotonic() - times) < 3: # wait 3 seconds if self.socket_connected(socket_num): return True time.sleep(0.01) raise RuntimeError("Failed to establish connection") def socket_close(self, socket_num): """Close a socket using the ESP32's internal reference number""" resp = self._send_command_get_response(_STOP_CLIENT_TCP_CMD, [[socket_num]]) if resp[0][0] != 1: raise RuntimeError("Failed to close socket")