#!/usr/bin/env python
#
# Original espota.py by Ivan Grokhotkov:
# https://gist.github.com/igrr/d35ab8446922179dc58c
#
# Modified since 2015-09-18 from Pascal Gollor (https://github.com/pgollor)
# Modified since 2015-11-09 from Hristo Gochkov (https://github.com/me-no-dev)
# Modified since 2016-01-03 from Matthew O'Gorman (https://github.com/mogorman)
#
# This script will push an OTA update to the ESP.
# Use it like:
#   python espota.py -i <ESP_IP_addr> -I <Host_IP_addr> -p <ESP_port> -P <Host_port> [-a password] -f <sketch.bin>
# Or to upload a SPIFFS image:
#   python espota.py -i <ESP_IP_addr> -I <Host_IP_addr> -p <ESP_port> -P <Host_port> [-a password] -s -f <spiffs.bin>
#
# Changes
# 2015-09-18:
# - Add option parser.
# - Add logging.
# - Send command to controller to differ between flashing and transmitting SPIFFS image.
#
# Changes
# 2015-11-09:
# - Added digest authentication
# - Enhanced error tracking and reporting
#
# Changes
# 2016-01-03:
# - Added more options to parser.
#
# Changes
# 2023-05-22:
# - Replaced the deprecated optparse module with argparse.
# - Adjusted the code style to conform to PEP 8 guidelines.
# - Used with statement for file handling to ensure proper resource cleanup.
# - Incorporated exception handling to catch and handle potential errors.
# - Made variable names more descriptive for better readability.
# - Introduced constants for better code maintainability.
from __future__ import print_function

import argparse
import hashlib
import logging
import os
import random
import socket
import sys

# Commands
FLASH = 0
SPIFFS = 100
AUTH = 200

# Constants
PROGRESS_BAR_LENGTH = 60
# Number of UDP invitations sent before giving up on the device.
INVITATION_ATTEMPTS = 10

# Runtime settings. main() overrides both from the command line; the defaults
# (matching the argparse defaults) keep update_progress() and serve() usable
# when this file is imported as a module instead of being run as a script.
PROGRESS = False
TIMEOUT = 10


def update_progress(progress):
    """Display or update the upload progress indicator on stderr.

    When the global PROGRESS flag is false a single "." is written per call.
    Otherwise a PROGRESS_BAR_LENGTH-wide bar is redrawn in place.

    progress: fraction completed. An int is converted to float, any other
    non-float value is reported as an error, values below 0 are shown as
    "Halt..." and values of 1 or more as "Done...".
    """
    if not PROGRESS:
        sys.stderr.write(".")
        sys.stderr.flush()
        return

    status = ""
    if isinstance(progress, int):
        progress = float(progress)
    if not isinstance(progress, float):
        progress = 0
        status = "Error: progress var must be float\r\n"
    if progress < 0:
        progress = 0
        status = "Halt...\r\n"
    if progress >= 1:
        progress = 1
        status = "Done...\r\n"
    block = int(round(PROGRESS_BAR_LENGTH * progress))
    bar = "=" * block + " " * (PROGRESS_BAR_LENGTH - block)
    text = "\rUploading: [{0}] {1}% {2}".format(bar, int(progress * 100), status)
    sys.stderr.write(text)
    sys.stderr.flush()


def _md5_hex(text):
    """Return the hex MD5 digest of a text string."""
    return hashlib.md5(text.encode()).hexdigest()


def _file_md5(path):
    """Return the hex MD5 digest of a file, reading it in chunks and closing it."""
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()


def _invite_device(message, remote_address):
    """Send the UDP invitation until the device answers.

    Returns (udp_socket, reply) with the socket still open so that it can be
    reused for authentication, or (None, '') when sending failed or no reply
    arrived within INVITATION_ATTEMPTS tries of TIMEOUT seconds each.
    """
    sys.stderr.write('Sending invitation to %s ' % remote_address[0])
    sys.stderr.flush()
    payload = message.encode()
    for _ in range(INVITATION_ATTEMPTS):
        udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            udp_sock.sendto(payload, remote_address)
        except Exception:
            sys.stderr.write('failed\n')
            sys.stderr.flush()
            udp_sock.close()
            logging.error('Host %s Not Found', remote_address[0])
            return None, ''
        udp_sock.settimeout(TIMEOUT)
        try:
            reply = udp_sock.recv(37).decode()
        except Exception:
            # No (or undecodable) answer: note the attempt and retry with a
            # fresh socket. "except Exception" lets Ctrl-C abort the wait.
            sys.stderr.write('.')
            sys.stderr.flush()
            udp_sock.close()
            continue
        # A reply counts as success regardless of which attempt produced it.
        sys.stderr.write('\n')
        sys.stderr.flush()
        return udp_sock, reply

    sys.stderr.write('\n')
    sys.stderr.flush()
    logging.error('No response from the ESP')
    return None, ''


def _authenticate(udp_sock, remote_address, challenge, password, cnonce_text):
    """Answer an "AUTH <nonce>" challenge; return True when the device says OK.

    The caller owns udp_sock and is responsible for closing it.
    """
    fields = challenge.split()
    if not challenge.startswith('AUTH') or len(fields) < 2:
        # Either not an authentication request at all, or one without a nonce.
        logging.error('Bad Answer: %s', challenge)
        return False

    nonce = fields[1]
    cnonce = _md5_hex(cnonce_text)
    result = _md5_hex('%s:%s:%s' % (_md5_hex(password), nonce, cnonce))
    sys.stderr.write('Authenticating...')
    sys.stderr.flush()
    message = '%d %s %s\n' % (AUTH, cnonce, result)
    udp_sock.sendto(message.encode(), remote_address)
    udp_sock.settimeout(10)
    try:
        answer = udp_sock.recv(32).decode()
    except Exception:
        sys.stderr.write('FAIL\n')
        logging.error('No Answer to our Authentication')
        return False
    if answer != "OK":
        sys.stderr.write('FAIL\n')
        logging.error('%s', answer)
        return False
    sys.stderr.write('OK\n')
    return True


def _upload(connection, filename, content_size):
    """Stream the image over the accepted connection; return 0 on success, 1 otherwise.

    The caller owns the connection and is responsible for closing it.
    """
    # Stays False for an empty image, where the send loop never runs.
    last_response_contained_ok = False
    with open(filename, "rb") as f:
        if PROGRESS:
            update_progress(0)
        else:
            sys.stderr.write('Uploading')
            sys.stderr.flush()
        offset = 0
        while True:
            chunk = f.read(1024)
            if not chunk:
                break
            offset += len(chunk)
            update_progress(offset / float(content_size))
            connection.settimeout(10)
            try:
                connection.sendall(chunk)
                res = connection.recv(10)
                last_response_contained_ok = 'OK' in res.decode()
            except Exception as e:
                sys.stderr.write('\n')
                logging.error('Error Uploading: %s', str(e))
                return 1

    if last_response_contained_ok:
        logging.info('Success')
        return 0

    sys.stderr.write('\n')
    logging.info('Waiting for result...')
    for _ in range(5):
        connection.settimeout(60)
        try:
            data = connection.recv(32).decode()
            logging.info('Result: %s', data)
            if "OK" in data:
                logging.info('Success')
                return 0
        except Exception as e:
            logging.error('Error receiving result: %s', str(e))
            return 1

    logging.error('Error response from device')
    return 1


def serve(remote_addr, local_addr, remote_port, local_port, password, filename, command=FLASH):
    """Push an image file to the device and return 0 on success, 1 on failure.

    Steps, as implemented below:
      1. Read the size and MD5 of `filename` (failure returns 1 before any
         socket is opened).
      2. Listen on TCP (local_addr, local_port).
      3. Send the UDP invitation "<command> <local_port> <size> <md5>" to
         (remote_addr, remote_port) and wait for "OK" or "AUTH <nonce>";
         answer the latter using `password`.
      4. Accept the device's TCP connection and stream the file in 1024-byte
         chunks, then wait for a final "OK".

    command: FLASH (default) or SPIFFS.
    All sockets opened here are closed on every exit path.
    """
    try:
        content_size = os.path.getsize(filename)
        file_md5 = _file_md5(filename)
    except (IOError, OSError) as e:
        logging.error('Cannot read %s: %s', filename, str(e))
        return 1

    # Create a TCP/IP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server_address = (local_addr, local_port)
        logging.info('Starting on %s:%s', str(server_address[0]), str(server_address[1]))
        try:
            sock.bind(server_address)
            sock.listen(1)
        except Exception as e:
            logging.error("Listen Failed: %s", str(e))
            return 1

        logging.info('Upload size: %d', content_size)
        message = '%d %d %d %s\n' % (command, local_port, content_size, file_md5)
        remote_address = (remote_addr, int(remote_port))

        udp_sock, data = _invite_device(message, remote_address)
        if udp_sock is None:
            return 1
        try:
            if data != "OK":
                cnonce_text = '%s%u%s%s' % (filename, content_size, file_md5, remote_addr)
                if not _authenticate(udp_sock, remote_address, data, password, cnonce_text):
                    return 1
        finally:
            udp_sock.close()

        logging.info('Waiting for device...')
        try:
            sock.settimeout(10)
            connection, _ = sock.accept()
            sock.settimeout(None)
            connection.settimeout(None)
        except Exception:
            logging.error('No response from device')
            return 1

        try:
            return _upload(connection, filename, content_size)
        finally:
            connection.close()
    finally:
        sock.close()


def parse_args(unparsed_args):
    """Parse the command-line argument list and return the argparse namespace."""
    parser = argparse.ArgumentParser(
        description="Transmit image over the air to the ESP32 module with OTA support."
    )

    # destination ip and port
    parser.add_argument("-i", "--ip", dest="esp_ip", action="store",
                        help="ESP32 IP Address.", default=False)
    parser.add_argument("-I", "--host_ip", dest="host_ip", action="store",
                        help="Host IP Address.", default="0.0.0.0")
    parser.add_argument("-p", "--port", dest="esp_port", type=int,
                        help="ESP32 OTA Port. Default: 3232", default=3232)
    parser.add_argument(
        "-P",
        "--host_port",
        dest="host_port",
        type=int,
        help="Host server OTA Port. Default: random 10000-60000",
        default=random.randint(10000, 60000),
    )

    # authentication
    parser.add_argument("-a", "--auth", dest="auth",
                        help="Set authentication password.", action="store", default="")

    # image
    parser.add_argument("-f", "--file", dest="image",
                        help="Image file.", metavar="FILE", default=None)
    parser.add_argument("-s", "--spiffs", dest="spiffs", action="store_true",
                        help="Transmit a SPIFFS image and do not flash the module.",
                        default=False)

    # output
    parser.add_argument("-d", "--debug", dest="debug", action="store_true",
                        help="Show debug output. Overrides loglevel with debug.",
                        default=False)
    parser.add_argument("-r", "--progress", dest="progress", action="store_true",
                        help="Show progress output. Does not work for Arduino IDE.",
                        default=False)
    parser.add_argument("-t", "--timeout", dest="timeout", type=int,
                        help="Timeout to wait for the ESP32 to accept invitation.",
                        default=10)

    return parser.parse_args(unparsed_args)


def main(args):
    """Configure logging and globals from `args`, then run serve(); return its exit code."""
    options = parse_args(args)
    log_level = logging.WARNING
    if options.debug:
        log_level = logging.DEBUG

    logging.basicConfig(level=log_level,
                        format="%(asctime)-8s [%(levelname)s]: %(message)s",
                        datefmt="%H:%M:%S")
    logging.debug("Options: %s", str(options))

    # check options
    global PROGRESS
    PROGRESS = options.progress

    global TIMEOUT
    TIMEOUT = options.timeout

    if not options.esp_ip or not options.image:
        logging.critical("Not enough arguments.")
        return 1

    command = FLASH
    if options.spiffs:
        command = SPIFFS

    return serve(
        options.esp_ip, options.host_ip, options.esp_port, options.host_port,
        options.auth, options.image, command
    )


if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))