arduino-pico/tools/pluggable_discovery.py
Sebastian Krzyszkowiak 4e16a700c6
pluggable_discovery: Allow the scanner thread to quit (#1029)
When receiving a `QUIT` message, the main thread was trying to tell   
the scanner thread to quit - however, what it was actually doing was
creating a local variable that shadowed the global flag used by the
scanner thread. Fix that by ensuring that the main thread uses the
global variable instead.

Fixes #1028
2022-12-04 11:50:06 -08:00

120 lines
2.7 KiB
Python
Executable file

#!/usr/bin/env python3
import os
import subprocess
import sys
import time
import threading
# Directory holding this script; uf2conv is expected to live right beside it.
toolspath = os.path.dirname(os.path.realpath(__file__))

try:
    # Search the tool's own directory first so the bundled uf2conv is found.
    sys.path.insert(0, os.path.join(toolspath, "."))
    import uf2conv
except ImportError:
    # Nothing below can work without uf2conv: report on stderr and give up.
    print("uf2conv not found next to this tool.", file=sys.stderr)
    raise SystemExit(1)

# Run flag for the scanner thread: scanner() keeps looping while it is True,
# and main() clears it to ask the scanner to stop.
scannerGo = False
def scanner():
    """Watch for UF2 drives and report changes as discovery events.

    Runs until the module-level ``scannerGo`` flag is cleared. Roughly every
    two seconds it asks ``uf2conv.get_drives()`` for the current drives and
    prints an "add" event when drives show up, or a "remove" event once none
    are left. Just before returning it sets ``scannerGo`` back to True, so
    code polling the flag can tell that the loop has finished.
    """
    global scannerGo

    add_event = """{
"eventType": "add",
"port": {
"address": "UF2_Board",
"label": "UF2 Board",
"protocol": "uf2conv",
"protocolLabel": "UF2 Devices",
"properties": {
"mac": "ffffffffffff",
"pid" : "0x2e8a",
"vid" : "0x000a"
}
}
}"""
    remove_event = """{
"eventType": "remove",
"port": {
"address": "UF2_Board",
"protocol": "uf2conv"
}
}"""

    scannerGo = True
    announced = False  # True while an "add" event has not been retracted
    while scannerGo:
        have_drives = len(uf2conv.get_drives()) > 0
        # The flag is re-checked here so nothing is printed once a stop has
        # been requested while the drive lookup was in progress.
        if have_drives and scannerGo and not announced:
            announced = True
            print(add_event, flush=True)
        elif (not have_drives) and scannerGo and announced:
            announced = False
            print(remove_event, flush=True)
        # Pause for about two seconds, in short naps so that a cleared flag
        # is noticed quickly.
        deadline = time.time() + 2
        while scannerGo and (time.time() < deadline):
            time.sleep(.1)
    scannerGo = True
def main():
    """Serve pluggable-discovery commands read from standard input.

    Reads one command per line and answers with a JSON event on standard
    output. The recognised commands are HELLO, START, STOP, QUIT, LIST and
    START_SYNC; anything else is ignored without a reply.

    * START_SYNC runs scanner() on a background thread (at most one at a
      time).
    * STOP clears the scanner flag and waits for that thread, if there is
      one, before replying.
    * QUIT clears the scanner flag, replies and returns.
    * Blank lines are skipped; end of input is handled like QUIT but
      without a reply, so the scanner thread does not keep the process
      alive.
    """
    global scannerGo
    worker = None  # scanner thread started by START_SYNC, if any
    while True:
        try:
            cmdline = input()
        except EOFError:
            # stdin was closed: release the scanner thread and leave quietly.
            scannerGo = False
            return
        words = cmdline.split()
        if not words:
            # Blank or whitespace-only line: there is no command to act on.
            continue
        cmd = words[0]
        if cmd == "HELLO":
            print(""" {
"eventType": "hello",
"message": "OK",
"protocolVersion": 1
}""", flush=True)
        elif cmd == "START":
            print("""{
"eventType": "start",
"message": "OK"
}""", flush=True)
        elif cmd == "STOP":
            scannerGo = False
            if worker is not None:
                # Wait for the scanner to finish. Without a running scanner
                # nothing would ever acknowledge the stop, so do not wait.
                worker.join()
                worker = None
            print("""{
"eventType": "stop",
"message": "OK"
}""", flush=True)
        elif cmd == "QUIT":
            scannerGo = False
            print("""{
"eventType": "quit",
"message": "OK"
}""", flush=True)
            return
        elif cmd == "LIST":
            drives = uf2conv.get_drives()
            if len(drives) > 0:
                print("""{
"eventType": "list",
"ports": [
{
"address": "UF2_Board",
"label": "UF2 Board",
"protocol": "uf2conv",
"protocolLabel": "UF2 Devices",
"properties": {
"mac": "ffffffffffff",
"pid" : "0x2e8a",
"vid" : "0x000a"
}
}
]
}""", flush=True)
            else:
                print("""{
"eventType": "list",
"ports": [ ]
}""", flush=True)
        elif cmd == "START_SYNC":
            print("""{
"eventType": "start_sync",
"message": "OK"
}""", flush=True)
            # Only start a scanner when none is running; a second one would
            # report every change twice.
            if worker is None or not worker.is_alive():
                scannerGo = True
                worker = threading.Thread(target=scanner)
                worker.start()
                # Brief pause after launching the scanner thread.
                time.sleep(.5)
# Run the command loop only when executed as a script, so that importing this
# module does not block waiting on standard input.
if __name__ == "__main__":
    main()