arduino-pico/tools/espota.py
Earle F. Philhower, III da86a8942b
Add OTA update support (#711)
Adds a 12K OTA stub 3rd stage bootloader, which reads new firmware
from the LittleFS filesystem and flashes on reboot.

By storing the OTA commands in a file in flash, it is possible to
recover from a power failure during OTA programming.  On power
resume, the OTA block will simply re-program from the beginning.

Support cryptographic signed OTA updates, if desired.  Includes
host-side signing logic via openssl.

Add PicoOTA library which encapsulates the file format for
the updater, including CRC32 checking.

Add LEAmDNS support to allow Arduino IDE discovery

Add ArduinoOTA class for IDE uploads

Add MD5Builder class

Add Updater class which supports writing and validating
cryptographically signed binaries from any source (http,
Ethernet, WiFi, Serial, etc.)

Add documentation and readmes.
2022-08-12 00:26:51 -07:00

345 lines
9.5 KiB
Python
Executable file

#!/usr/bin/env python3
#
# Original espota.py by Ivan Grokhotkov:
# https://gist.github.com/igrr/d35ab8446922179dc58c
#
# Modified since 2015-09-18 from Pascal Gollor (https://github.com/pgollor)
# Modified since 2015-11-09 from Hristo Gochkov (https://github.com/me-no-dev)
# Modified since 2016-01-03 from Matthew O'Gorman (https://githumb.com/mogorman)
#
# This script will push an OTA update to the ESP
# use it like: python3 espota.py -i <ESP_IP_address> -I <Host_IP_address> -p <ESP_port> -P <Host_port> [-a password] -f <sketch.bin>
# Or to upload SPIFFS image:
# python3 espota.py -i <ESP_IP_address> -I <Host_IP_address> -p <ESP_port> -P <HOST_port> [-a password] -s -f <spiffs.bin>
#
# Changes
# 2015-09-18:
# - Add option parser.
# - Add logging.
# - Send command to controller to differ between flashing and transmitting SPIFFS image.
#
# Changes
# 2015-11-09:
# - Added digest authentication
# - Enhanced error tracking and reporting
#
# Changes
# 2016-01-03:
# - Added more options to parser.
#
from __future__ import print_function
import socket
import sys
import os
import optparse
import logging
import hashlib
import random
# Commands
FLASH = 0
SPIFFS = 100
AUTH = 200
PROGRESS = False
# update_progress() : Displays or updates a console progress bar
## Accepts a float between 0 and 1. Any int will be converted to a float.
## A value under 0 represents a 'halt'.
## A value at 1 or bigger represents 100%
def update_progress(progress):
if (PROGRESS):
barLength = 60 # Modify this to change the length of the progress bar
status = ""
if isinstance(progress, int):
progress = float(progress)
if not isinstance(progress, float):
progress = 0
status = "error: progress var must be float\r\n"
if progress < 0:
progress = 0
status = "Halt...\r\n"
if progress >= 1:
progress = 1
status = "Done...\r\n"
block = int(round(barLength*progress))
text = "\rUploading: [{0}] {1}% {2}".format( "="*block + " "*(barLength-block), int(progress*100), status)
sys.stderr.write(text)
sys.stderr.flush()
else:
sys.stderr.write('.')
sys.stderr.flush()
def serve(remoteAddr, localAddr, remotePort, localPort, password, filename, command = FLASH):
# Create a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_address = (localAddr, localPort)
logging.info('Starting on %s:%s', str(server_address[0]), str(server_address[1]))
try:
sock.bind(server_address)
sock.listen(1)
except Exception:
logging.error("Listen Failed")
return 1
# Check whether Signed Update is used.
if ( os.path.isfile(filename + '.signed') ):
filename = filename + '.signed'
file_check_msg = 'Detected Signed Update. %s will be uploaded instead.' % (filename)
sys.stderr.write(file_check_msg + '\n')
sys.stderr.flush()
logging.info(file_check_msg)
content_size = os.path.getsize(filename)
f = open(filename,'rb')
file_md5 = hashlib.md5(f.read()).hexdigest()
f.close()
logging.info('Upload size: %d', content_size)
message = '%d %d %d %s\n' % (command, localPort, content_size, file_md5)
# Wait for a connection
logging.info('Sending invitation to: %s', remoteAddr)
sock2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
remote_address = (remoteAddr, int(remotePort))
sock2.sendto(message.encode(), remote_address)
sock2.settimeout(10)
try:
data = sock2.recv(128).decode()
except Exception:
logging.error('No Answer')
sock2.close()
return 1
if (data != "OK"):
if(data.startswith('AUTH')):
nonce = data.split()[1]
cnonce_text = '%s%u%s%s' % (filename, content_size, file_md5, remoteAddr)
cnonce = hashlib.md5(cnonce_text.encode()).hexdigest()
passmd5 = hashlib.md5(password.encode()).hexdigest()
result_text = '%s:%s:%s' % (passmd5 ,nonce, cnonce)
result = hashlib.md5(result_text.encode()).hexdigest()
sys.stderr.write('Authenticating...')
sys.stderr.flush()
message = '%d %s %s\n' % (AUTH, cnonce, result)
sock2.sendto(message.encode(), remote_address)
sock2.settimeout(10)
try:
data = sock2.recv(32).decode()
except Exception:
sys.stderr.write('FAIL\n')
logging.error('No Answer to our Authentication')
sock2.close()
return 1
if (data != "OK"):
sys.stderr.write('FAIL\n')
logging.error('%s', data)
sock2.close()
sys.exit(1)
return 1
sys.stderr.write('OK\n')
else:
logging.error('Bad Answer: %s', data)
sock2.close()
return 1
sock2.close()
logging.info('Waiting for device...')
try:
sock.settimeout(10)
connection, client_address = sock.accept()
sock.settimeout(None)
connection.settimeout(None)
except Exception:
logging.error('No response from device')
sock.close()
return 1
received_ok = False
try:
f = open(filename, "rb")
if (PROGRESS):
update_progress(0)
else:
sys.stderr.write('Uploading')
sys.stderr.flush()
offset = 0
while True:
chunk = f.read(1460)
if not chunk: break
offset += len(chunk)
update_progress(offset/float(content_size))
connection.settimeout(10)
try:
connection.sendall(chunk)
if connection.recv(32).decode().find('O') >= 0:
# connection will receive only digits or 'OK'
received_ok = True
except Exception:
sys.stderr.write('\n')
logging.error('Error Uploading')
connection.close()
f.close()
sock.close()
return 1
sys.stderr.write('\n')
logging.info('Waiting for result...')
# libraries/ArduinoOTA/ArduinoOTA.cpp L311 L320
# only sends digits or 'OK'. We must not not close
# the connection before receiving the 'O' of 'OK'
try:
connection.settimeout(60)
received_ok = False
received_error = False
while not (received_ok or received_error):
reply = connection.recv(64).decode()
# Look for either the "E" in ERROR or the "O" in OK response
# Check for "E" first, since both strings contain "O"
if reply.find('E') >= 0:
sys.stderr.write('\n')
logging.error('%s', reply)
received_error = True
elif reply.find('O') >= 0:
logging.info('Result: OK')
received_ok = True
connection.close()
f.close()
sock.close()
if received_ok:
sys.stderr.write("Complete\n")
sys.stderr.flush()
return 0
return 1
except Exception:
logging.error('No Result!')
connection.close()
f.close()
sock.close()
return 1
finally:
connection.close()
f.close()
sock.close()
return 1
# end serve
def parser(unparsed_args):
parser = optparse.OptionParser(
usage = "%prog [options]",
description = "Transmit image over the air to the esp8266 module with OTA support."
)
# destination ip and port
group = optparse.OptionGroup(parser, "Destination")
group.add_option("-i", "--ip",
dest = "esp_ip",
action = "store",
help = "ESP8266 IP Address.",
default = False
)
group.add_option("-I", "--host_ip",
dest = "host_ip",
action = "store",
help = "Host IP Address.",
default = "0.0.0.0"
)
group.add_option("-p", "--port",
dest = "esp_port",
type = "int",
help = "ESP8266 ota Port. Default 8266",
default = 8266
)
group.add_option("-P", "--host_port",
dest = "host_port",
type = "int",
help = "Host server ota Port. Default random 10000-60000",
default = random.randint(10000,60000)
)
parser.add_option_group(group)
# auth
group = optparse.OptionGroup(parser, "Authentication")
group.add_option("-a", "--auth",
dest = "auth",
help = "Set authentication password.",
action = "store",
default = ""
)
parser.add_option_group(group)
# image
group = optparse.OptionGroup(parser, "Image")
group.add_option("-f", "--file",
dest = "image",
help = "Image file.",
metavar="FILE",
default = None
)
group.add_option("-s", "--spiffs",
dest = "spiffs",
action = "store_true",
help = "Use this option to transmit a SPIFFS image and do not flash the module.",
default = False
)
parser.add_option_group(group)
# output group
group = optparse.OptionGroup(parser, "Output")
group.add_option("-d", "--debug",
dest = "debug",
help = "Show debug output. And override loglevel with debug.",
action = "store_true",
default = False
)
group.add_option("-r", "--progress",
dest = "progress",
help = "Show progress output. Does not work for ArduinoIDE",
action = "store_true",
default = False
)
parser.add_option_group(group)
(options, args) = parser.parse_args(unparsed_args)
return options
# end parser
def main(args):
# get options
options = parser(args)
# adapt log level
loglevel = logging.WARNING
if (options.debug):
loglevel = logging.DEBUG
# end if
# logging
logging.basicConfig(level = loglevel, format = '%(asctime)-8s [%(levelname)s]: %(message)s', datefmt = '%H:%M:%S')
logging.debug("Options: %s", str(options))
# check options
global PROGRESS
PROGRESS = options.progress
if (not options.esp_ip or not options.image):
logging.critical("Not enough arguments.")
return 1
# end if
command = FLASH
if (options.spiffs):
command = SPIFFS
# end if
return serve(options.esp_ip, options.host_ip, options.esp_port, options.host_port, options.auth, options.image, command)
# end main
if __name__ == '__main__':
sys.exit(main(sys.argv))
# end if