import struct import time import board import busio from digitalio import DigitalInOut, Direction, Pull from micropython import const class ESP_SPIcontrol: SET_NET_CMD = const(0x10) SET_PASSPHRASE_CMD = const(0x11) GET_CONN_STATUS_CMD = const(0x20) GET_IPADDR_CMD = const(0x21) GET_MACADDR_CMD = const(0x22) GET_CURR_SSID_CMD = const(0x23) GET_CURR_RSSI_CMD = const(0x25) GET_CURR_ENCT_CMD = const(0x26) SCAN_NETWORKS = const(0x27) GET_SOCKET_CMD = const(0x3F) GET_STATE_TCP_CMD = const(0x29) DATA_SENT_TCP_CMD = const(0x2A) AVAIL_DATA_TCP_CMD = const(0x2B) GET_DATA_TCP_CMD = const(0x2C) START_CLIENT_TCP_CMD = const(0x2D) STOP_CLIENT_TCP_CMD = const(0x2E) GET_CLIENT_STATE_TCP_CMD = const(0x2F) DISCONNECT_CMD = const(0x30) GET_IDX_RSSI_CMD = const(0x32) GET_IDX_ENCT_CMD = const(0x33) REQ_HOST_BY_NAME_CMD = const(0x34) GET_HOST_BY_NAME_CMD = const(0x35) START_SCAN_NETWORKS = const(0x36) GET_FW_VERSION_CMD = const(0x37) PING_CMD = const(0x3E) SEND_DATA_TCP_CMD = const(0x44) GET_DATABUF_TCP_CMD = const(0x45) START_CMD = const(0xE0) END_CMD = const(0xEE) ERR_CMD = const(0xEF) REPLY_FLAG = const(1<<7) CMD_FLAG = const(0) WL_NO_SHIELD = const(0xFF) WL_NO_MODULE = const(0xFF) WL_IDLE_STATUS = const(0) WL_NO_SSID_AVAIL = const(1) WL_SCAN_COMPLETED = const(2) WL_CONNECTED = const(3) SOCKET_CLOSED = const(0) SOCKET_LISTEN = const(1) SOCKET_SYN_SENT = const(2) SOCKET_SYN_RCVD = const(3) SOCKET_ESTABLISHED = const(4) SOCKET_FIN_WAIT_1 = const(5) SOCKET_FIN_WAIT_2 = const(6) SOCKET_CLOSE_WAIT = const(7) SOCKET_CLOSING = const(8) SOCKET_LAST_ACK = const(9) SOCKET_TIME_WAIT = const(10) TCP_MODE = const(0) UDP_MODE = const(1) TLS_MODE = const(2) def __init__(self, spi, cs_pin, ready_pin, reset_pin, gpio0_pin, *, debug=False): self._debug = debug self._buffer = bytearray(10) self._pbuf = bytearray(1) # buffer for param read self._spi = spi self._cs = cs_pin self._ready = ready_pin self._reset = reset_pin self._gpio0 = gpio0_pin self._cs.direction = Direction.OUTPUT self._ready.direction = Direction.INPUT self._reset.direction = Direction.OUTPUT self._gpio0.direction = Direction.INPUT self.reset() def reset(self): self._gpio0.direction = Direction.OUTPUT if self._debug: print("Reset ESP32") self._gpio0.value = True # not bootload mode self._cs.value = True self._reset.value = False time.sleep(0.01) # reset self._reset.value = True time.sleep(0.75) # wait for it to boot up self._gpio0.direction = Direction.INPUT def spi_slave_select(self): while not self._spi.try_lock(): pass self._spi.configure(baudrate=100000) # start slow self._cs.value = False # the actual select times = time.monotonic() while (time.monotonic() - times) < 1: # wait up to 1000ms if self._ready.value: return # some failure self._cs.value = True self._spi.unlock() raise RuntimeError("ESP32 timed out on SPI select") def slave_deselect(self): self._cs.value = True self._spi.unlock() def slave_ready(self): return self._ready.value == False def wait_for_slave_ready(self): if self._debug: print("Wait for slave ready", end='') while not self.slave_ready(): if self._debug: print('.', end='') time.sleep(0.01) if self._debug: print() def wait_for_slave_select(self): self.wait_for_slave_ready() self.spi_slave_select() def send_command(self, cmd, params=None, *, param_len_16=False): if not params: params = [] packet = [] packet.append(START_CMD) packet.append(cmd & ~REPLY_FLAG) packet.append(len(params)) # handle parameters here for i, param in enumerate(params): if self._debug >= 2: print("\tSending param #%d is %d bytes long" % (i, len(param))) if param_len_16: packet.append((len(param) >> 8) & 0xFF) packet.append(len(param) & 0xFF) packet += (param) packet.append(END_CMD) while len(packet) % 4 != 0: packet.append(0xFF) self.wait_for_slave_select() self._spi.write(bytearray(packet)) if self._debug: print("Wrote: ", [hex(b) for b in packet]) self.slave_deselect() def read_byte(self): self._spi.readinto(self._pbuf) if self._debug >= 2: print("\t\tRead:", hex(self._pbuf[0])) return self._pbuf[0] def wait_spi_char(self, desired): times = time.monotonic() while (time.monotonic() - times) < 0.1: r = self.read_byte() if r == ERR_CMD: raise RuntimeError("Error response to command") if r == desired: return True else: raise RuntimeError("Timed out waiting for SPI char") def check_data(self, desired): r = self.read_byte() if r != desired: raise RuntimeError("Expected %02X but got %02X" % (desired, r)) def wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False): self.wait_for_slave_ready() self.spi_slave_select() self.wait_spi_char(START_CMD) self.check_data(cmd | REPLY_FLAG) if num_responses is not None: self.check_data(num_responses) else: num_responses = self.read_byte() responses = [] for num in range(num_responses): response = [] param_len = self.read_byte() if param_len_16: param_len <<= 8 param_len |= self.read_byte() if self._debug >= 2: print("\tParameter #%d length is %d" % (num, param_len)) for j in range(param_len): response.append(self.read_byte()) responses.append(bytes(response)) self.check_data(END_CMD) self.slave_deselect() if self._debug: print("Read: ", responses) return responses def send_command_get_response(self, cmd, params=None, *, reply_params=1, sent_param_len_16=False, recv_param_len_16=False): self.send_command(cmd, params, param_len_16=sent_param_len_16) return self.wait_response_cmd(cmd, reply_params, param_len_16=recv_param_len_16) @property def status(self): if self._debug: print("Connection status") resp = self.send_command_get_response(GET_CONN_STATUS_CMD) if self._debug: print("Status:", resp[0][0]) return resp[0][0] # one byte response @property def firmware_version(self): if self._debug: print("Firmware version") resp = self.send_command_get_response(GET_FW_VERSION_CMD) return resp[0] @property def MAC_address(self): if self._debug: print("MAC address") resp = self.send_command_get_response(GET_MACADDR_CMD, [b'\xFF']) return resp[0] def start_scan_networks(self): if self._debug: print("Start scan") resp = self.send_command_get_response(START_SCAN_NETWORKS) if resp[0][0] != 1: raise RuntimeError("Failed to start AP scan") def get_scan_networks(self): self.send_command(SCAN_NETWORKS) names = self.wait_response_cmd(SCAN_NETWORKS) print("SSID names:", names) APs = [] for i, name in enumerate(names): AP = {'ssid': name} rssi = self.send_command_get_response(GET_IDX_RSSI_CMD, [[i]])[0] AP['rssi'] = struct.unpack('H', port) if isinstance(dest, str): # use the 5 arg version dest = bytes(dest, 'utf-8') resp = self.send_command_get_response(START_CLIENT_TCP_CMD, [dest, b'\x00\x00\x00\x00', port_param, [socket_num], [conn_mode]]) else: # ip address, use 4 arg vesion resp = self.send_command_get_response(START_CLIENT_TCP_CMD, [dest, port_param, [socket_num], [conn_mode]]) if resp[0][0] != 1: raise RuntimeError("Could not connect to remote server") def socket_status(self, socket_num): resp = self.send_command_get_response(GET_CLIENT_STATE_TCP_CMD, [[socket_num]]) return resp[0][0] def socket_connected(self, socket_num): return self.socket_status(socket_num) == SOCKET_ESTABLISHED def socket_write(self, socket_num, buffer): if self._debug: print("Writing:", buffer) resp = self.send_command_get_response(SEND_DATA_TCP_CMD, [[socket_num], buffer], sent_param_len_16=True) sent = resp[0][0] if sent != len(buffer): raise RuntimeError("Failed to send %d bytes (sent %d)" % (len(buffer), sent)) resp = self.send_command_get_response(DATA_SENT_TCP_CMD, [[socket_num]]) if resp[0][0] != 1: raise RuntimeError("Failed to verify data sent") def socket_available(self, socket_num): resp = self.send_command_get_response(AVAIL_DATA_TCP_CMD, [[socket_num]]) return struct.unpack('