const CHAR_CTRL_A = '\x01'; const CHAR_CTRL_B = '\x02'; const CHAR_CTRL_C = '\x03'; const CHAR_CTRL_D = '\x04'; const CHAR_TITLE_START = "\x1b]0;"; const CHAR_TITLE_END = "\x1b\\"; const CHAR_SNAKE = "🐍"; const MODE_NORMAL = 1; const MODE_RAW = 2; const TYPE_DIR = 16384; const TYPE_FILE = 32768; const DEBUG = false; export const LINE_ENDING_CRLF = "\r\n"; export const LINE_ENDING_LF = "\n"; // Default timeouts in milliseconds (can be overridden with properties) const PROMPT_TIMEOUT = 20000; const CODE_EXECUTION_TIMEOUT = 15000; const CODE_INTERRUPT_TIMEOUT = 5000; const PROMPT_CHECK_INTERVAL = 50; const REGEX_PROMPT_RAW_MODE = /raw REPL; CTRL-B to exit/; const REGEX_PROMPT_NORMAL_MODE = />>> /; const modes = [ "Unknown", "Normal", "Raw", ]; // Class to use python code to get file information // We want to do stuff like writing files, reading files, and listing files export class FileOps { constructor(repl, checkReadOnly=true) { this._repl = repl; this._isReadOnly = null; this._doCheckReadOnly = checkReadOnly; } async _checkReadOnly() { if (!this._doCheckReadOnly) { return; } if (this._isReadOnly == null) { this._isReadOnly = await this.isReadOnly(); } if (this._isReadOnly()) { throw new Error("File System is Read Only. Try disabling or ejecting the drive."); } } async _checkReplErrors() { let error = this._repl.getErrorOutput(); if (error) { console.error("Python Error - " + error.type + ": " + error.message); if (error.type == "OSError" && error.errno == 30) { this._isReadOnly = true; // Throw an error if needed await this._checkReadOnly(); } } return error; } // Write a file to the device path with contents beginning at offset. Modification time can be set and if raw is true, contents is written as binary async _writeRawFile(path, contents, offset=0, modificationTime=null) { let byteString = ""; // Contents needs to be converted from a ArrayBuffer to a byte string let view = new Uint8Array(contents); for (let byte of view) { byteString += String.fromCharCode(byte); } contents = btoa(byteString); // Convert binary string to base64 let code = ` import binascii with open("${path}", "wb") as f: f.seek(${offset}) byte_string = binascii.a2b_base64("${contents}") f.write(byte_string) `; if (modificationTime) { code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`; } await this._repl.execRawMode(code); } async _writeTextFile(path, contents, offset=0, modificationTime=null) { // The contents needs to be converted from a UInt8Array to a string contents = String.fromCharCode.apply(null, contents); contents = contents.replace(/"/g, '\\"'); let code = ` with open("${path}", "w") as f: f.seek(${offset}) f.write("""${contents}""") `; if (modificationTime) { code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`; } await this._repl.execRawMode(code); } // Write a file to the device path with contents beginning at offset. Modification time can be set and if raw is true, contents is written as binary async writeFile(path, contents, offset=0, modificationTime=null, raw=false) { this._repl.terminalOutput = DEBUG; if (raw) { await this._writeRawFile(path, contents, offset, modificationTime); } else { await this._writeTextFile(path, contents, offset, modificationTime); } this._repl.terminalOutput = true; } async _readRawFile(path) { try { let code = ` import binascii with open("${path}", "rb") as f: byte_string = f.read() print(binascii.b2a_base64(byte_string, False)) `; let result = await this._repl.execRawMode(code); if (this._checkReplErrors()) { return null; } // strip the b, ending newline, and quotes from the beginning and end result = result.slice(2, -3); // convert the base64 string to an ArrayBuffer. Each byte of the Array buffer is a byte of the file with a value between 0-255 result = atob(result); // Convert base64 to binary string let length = result.length; let buffer = new ArrayBuffer(length); let view = new Uint8Array(buffer); for (let i = 0; i < length; i++) { view[i] = result.charCodeAt(i); } result = new Blob([buffer]); return result; } catch(error) { return null; } } async _readTextFile(path) { try { let code = ` with open("${path}", "r") as f: print(f.read()) `; let result = await this._repl.execRawMode(code); if (await this._checkReplErrors()) { return null; } // Remove last 2 bytes from the result because \r\n is added to the end return result.slice(0, -2); } catch(error) { return null; } } // Read a file from the device async readFile(path, raw=false) { let result; this._repl.terminalOutput = DEBUG; if (raw) { result = await this._readRawFile(path); } else { result = await this._readTextFile(path); } this._repl.terminalOutput = true; return result; } // List files using paste mode on the device returning the result as a javascript array // We need the file name, whether or not it is a directory, file size and file date async listDir(path) { this._repl.terminalOutput = DEBUG; // Mask sure path has a trailing slash if (path[path.length - 1] != "/") { path += "/"; } let code = ` import os import time contents = os.listdir("${path}") for item in contents: result = os.stat("${path}" + item) print(item, result[0], result[6], result[9]) `; const result = await this._repl.execRawMode(code); let contents = []; if (!result) { return contents; } for (let line of result.split("\n")) { let [name, isDir, fileSize, fileDate] = line.split(" "); contents.push({ path: name, isDir: isDir == TYPE_DIR, fileSize: parseInt(fileSize), fileDate: parseInt(fileDate) * 1000, }); } this._repl.terminalOutput = true; return contents; } async isReadOnly() { this._repl.terminalOutput = DEBUG; // MicroPython doesn't have storage, but also doesn't have a CIRCUITPY drive let code = ` try: import storage print(storage.getmount("/").readonly) except: print(False) `; let result = await this._repl.execRawMode(code); let isReadOnly = result.match("True") != null; this._repl.terminalOutput = true; return isReadOnly; } async makeDir(path, modificationTime=null) { await this._checkReadOnly(); this._repl.terminalOutput = DEBUG; let code = `os.mkdir("${path}")\n`; if (modificationTime) { code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`; } await this._repl.execRawMode(code); this._checkReplErrors(); this._repl.terminalOutput = true; } async delete(path) { await this._checkReadOnly(); this._repl.terminalOutput = DEBUG; let code = ` import os stat = os.stat("${path}") if stat[0] == ${TYPE_FILE}: os.remove("${path}") else: os.rmdir("${path}") `; await this._repl.execRawMode(code); this._checkReplErrors(); this._repl.terminalOutput = true; } async move(oldPath, newPath) { await this._checkReadOnly(); // we need to check if the new path already exists // Return true on success and false on failure this._repl.terminalOutput = DEBUG; let code = ` import os os.rename("${oldPath}", "${newPath}") `; await this._repl.execRawMode(code); let error = this._checkReplErrors(); this._repl.terminalOutput = true; return !error; } } class InputBuffer { constructor() { this._buffer = ""; this._pointer = 0; this.lineEnding = LINE_ENDING_CRLF; } append(data) { this._buffer += data; } get() { return this._buffer; } clear() { this._buffer = ""; this._pointer = 0; } readLine(advancePointer = true) { let lines = this.getLines(); if (this._buffer.slice(this._pointer).length == 0) { return null; } if (advancePointer) { this._pointer += lines[0].length + this.lineEnding.length; } return lines[0]; } readLastLine() { let lines = this.getLines(); if (this._buffer.slice(this._pointer).length == 0) { return null; } return lines[lines.length - 1]; } getRemainingBuffer() { // Let the result contain a slice of the buffer from the pointer to the end let result = this._buffer.slice(this._pointer); this._pointer += result.length; return result; } readExactly(byteCount) { let bytes = this._buffer.slice(this._pointer, this._pointer + byteCount); this._pointer += byteCount; return bytes; } readUntil(value) { // Read bytes using until the value is found let currentOffset = this._pointer; let bytes = this.readExactly(1); let newByte = ' '; // Continue to read 1 byte at a time until the last x bytes match the value while (!bytes.match(value) && newByte.length > 0) { newByte = this.readExactly(1); bytes += newByte; } if (newByte.length == 0) { // Buffer end reached, reset the prompt check pointer to the end this._pointer = currentOffset; return false; } return true; } movePointer(offset) { if (offset < this._pointer) { return; } else if (offset > this._buffer.length) { offset = this._buffer.length; } this._pointer = offset; } getLines(allLines = false) { let buffer = this._buffer; if (!allLines) { buffer = buffer.slice(this._pointer); } return buffer.split(this.lineEnding); } } export class REPL { constructor() { this._pythonCodeRunning = false; this._codeOutput = ''; this._errorOutput = ''; this._serialInputBuffer = new InputBuffer(); this._checkingPrompt = false; this._titleMode = false; this.promptTimeout = PROMPT_TIMEOUT; this.promptCheckInterval = PROMPT_CHECK_INTERVAL; this.title = ''; this.serialTransmit = null; this._inputLineEnding = LINE_ENDING_CRLF; // The line ending the REPL returns this._outputLineEnding = LINE_ENDING_LF; // The line ending for the code result this._tokenQueue = []; this._mode = null; this._codeCheckPointer = 0; // Used for looking at code output this._promptCheckPointer = 0; // Used for looking at prompt output/control characters this._checkpointCount = 0; this._rawByteCount = 0; this.terminalOutput = true; } //// Placeholder Functions //// setTitle(title, append=false) { return; } writeToTerminal(data) { return; } //// Utility Functions //// _writeToTerminal(data) { if (this.terminalOutput) { this.writeToTerminal(data); } } _sleep(ms) { return new Promise(resolve => setTimeout(resolve, ms)); } _timeout(callback, ms) { return Promise.race([callback(), this._sleep(ms).then(() => {throw Error("Timed Out");})]); } _regexEscape(regexString) { return regexString.replace(/\\/, "\\\\"); } // Split a string up by full title start and end character sequences _tokenize(string) { const tokenRegex = new RegExp("(" + this._regexEscape(CHAR_TITLE_START) + "|" + this._regexEscape(CHAR_TITLE_END) + ")", "gi"); return string.split(tokenRegex); } // Check if a chunk of data has a partial title start/end character sequence at the end _hasPartialToken(chunk) { const partialToken = /\\x1b(?:\](?:0"?)?)?$/gi; return partialToken.test(chunk); } _parseTitleInfo(regex) { if (this.title) { let matches = this.title.match(regex); if (matches && matches.length >= 2) { return matches[1]; } } return null; } async _detectCurrentMode() { // Go through the buffer and detect the last mode change let buffer = this._serialInputBuffer.get(); const rawModRegex = new RegExp(REGEX_PROMPT_RAW_MODE, 'g'); const normalModRegex = new RegExp(REGEX_PROMPT_NORMAL_MODE, 'g'); let lastRawPosition = this._findLastRegexPosition(rawModRegex, buffer); let lastNormalPosition = this._findLastRegexPosition(normalModRegex, buffer); if (lastRawPosition > lastNormalPosition) { this._mode = MODE_RAW; this._serialInputBuffer.movePointer(lastRawPosition); } else if (lastNormalPosition > lastRawPosition) { this._mode = MODE_NORMAL; this._serialInputBuffer.movePointer(lastNormalPosition); } // If no mode changes detected, we will assume normal mode with code running if (!this._mode) { await this.softRestart(); await this.serialTransmit(CHAR_CTRL_C); await this._sleep(1000); } } _findLastRegexPosition(regex, str) { let match; let lastPosition = -1; // Reset regex.lastIndex to start from the end of the string regex.lastIndex = 0; // Using a loop to find all matches while ((match = regex.exec(str)) !== null) { lastPosition = match.index; // Move to the next position after the match regex.lastIndex = match.index + 1; } return lastPosition; } _lineIsPrompt(prompt_regex) { let currentLine = this._serialInputBuffer.readLastLine(); if (!currentLine) { return false; } return currentLine.match(prompt_regex); } _currentLineIsNormalPrompt() { return this._lineIsPrompt(/>>> $/); } async _checkCodeRunning() { await this._detectCurrentMode(); if (DEBUG) { console.log("Checking if code is running in " + modes[this._mode]); } if (this._mode == MODE_RAW) { // In raw mode, we simply need to look for OK // Then we should store the results in the code output // The next bytes should be 1 of the following: // We receive OK, followed by code output, followed by Ctrl-D, followed by error output, followed by Ctrl-D // or we receive an error message let bytes = this._serialInputBuffer.getRemainingBuffer(); this._rawByteCount += bytes.length; if (this._rawByteCount >= 2) { while (bytes.length > 0) { if (this._checkpointCount == 0) { if (bytes.slice(0, 2).match("OK")) { this._checkpointCount++; bytes = bytes.slice(2); } else if (bytes.slice(0, 2).match("ra")) { if (DEBUG) { console.log("Unexpected bytes encountered. " + bytes); } return; } else { console.error("Unexpected output in raw mode: " + bytes); return; } } else { if (bytes.slice(0, 1).match(CHAR_CTRL_D)) { this._checkpointCount++; //console.log("Checkpoint Count: " + this._checkpointCount); } else { if (this._checkpointCount == 1) { // Code Output this._codeOutput += bytes.slice(0, 1); //console.log("Code Output: " + bytes.slice(0,1)); } else if (this._checkpointCount == 2) { // Error Output this._errorOutput += bytes.slice(0, 1); //console.log("Error: " + bytes.slice(0,1)); } else if (this._checkpointCount >= 2) { // We're done this._pythonCodeRunning = false; } } bytes = bytes.slice(1); // Remove the first byte } } } return; } // In normal mode, we need to look for the prompt if (!!this._currentLineIsNormalPrompt()) { if (DEBUG) { console.log("REPL at Normal Mode prompt"); } this._pythonCodeRunning = false; } } _decodeError(rawError) { // Errors are typically 3 lines long let errorLines = rawError.split(this._inputLineEnding); let error = { file: null, line: null, type: null, message: null, errno: null, }; if (errorLines.length > 0) { error.file = errorLines[1].match(/File "(.*)"/)[1]; error.line = parseInt(errorLines[1].match(/line (\d+)/)[1]); error.type = errorLines[2].match(/(.+?):/)[1]; error.message = errorLines[2].match(/: (.+)$/)[1]; if (error.type == "OSError") { error.errno = parseInt(error.message.match(/\[Errno (\d+)\]/)[1]); error.message = error.message.replace(/\[Errno \d+\] /, ''); } } error.raw = rawError; return error; } async _readUntil(value, timeout=5000) { // Call readUntil in the SerialInputBuffer, but with a timeout wrapper try { await this._timeout( async () => { while (!this._serialInputBuffer.readUntil(value)) { await this._sleep(100); } }, timeout ); } catch (error) { return false; } return true; } async _waitForCodeExecution(codeTimeoutMs=CODE_EXECUTION_TIMEOUT) { // Wait for the code to finish running, so we can capture the output if (DEBUG) { console.log("Waiting for code execution"); } if (codeTimeoutMs) { try { await this._timeout( async () => { while (this._pythonCodeRunning) { await this._checkCodeRunning(); await this._sleep(100); } }, codeTimeoutMs ); } catch (error) { console.error("Code timed out."); } } else { // Run without timeout while (this._pythonCodeRunning) { await this._sleep(100); } } } async _waitForModeChange(mode, keySequence=null) { if (DEBUG) { console.log("Waiting for mode change from " + modes[this._mode] + " to " + modes[mode]); } try { await this._timeout( async () => { while (this._mode != mode) { if (keySequence) { await this.serialTransmit(keySequence); } await this._detectCurrentMode(); await this._sleep(100); } }, 1000 ); } catch (error) { console.log("Awaiting mode change timed out."); } } // Raw mode allows code execution without echoing back to the terminal async _enterRawMode() { if (this._mode == MODE_RAW) { await this._exitRawMode(); } await this._waitForModeChange(MODE_RAW, CHAR_CTRL_A); } async _exitRawMode() { if (this._mode != MODE_RAW) { return; } // Wait for >>> to be displayed await this._waitForModeChange(MODE_NORMAL, CHAR_CTRL_B); } async _processQueuedTokens() { if (this._processing) { return; } this._processing = true; while (this._tokenQueue.length) { await this._processToken(this._tokenQueue.shift()); } this._processing = false; } // Handle Title setting and add to the serial input buffer async _processToken(token) { if (token == CHAR_TITLE_START) { this._titleMode = true; this._setTitle(""); } else if (token == CHAR_TITLE_END) { this._titleMode = false; } else if (this._titleMode) { this._setTitle(token, true); // Fix duplicate Title charactes let snakeIndex = this.title.indexOf(CHAR_SNAKE); if (snakeIndex > -1) { this._setTitle(this.title.slice(snakeIndex)); } } this._serialInputBuffer.append(token); this._writeToTerminal(token); } //// External Functions //// _setTitle(title, append=false) { if (append) { title = this.title + title; } this.title = title; this.setTitle(title, append); } async _serialTransmit(msg) { if (!this.serialTransmit) { console.error("Default serial transmit function called. Message: " + msg); throw new Error("REPL serialTransmit must be connected to an external transmit function"); } else { return await this.serialTransmit(msg); } } //// Public Functions //// async onSerialReceive(e) { // We tokenize the serial data to handle special character sequences (currently titles only) // We don't want to modify e.data, so we make a copy of it let data = e.data; // Prepend a partial token if it exists if (this._partialToken) { data = this._partialToken + data; this._partialToken = null; } // Tokenize the larger string and send to the parent let tokens = this._tokenize(data); // Remove any partial tokens and store for the next serial data receive if (tokens.length && this._hasPartialToken(tokens.slice(-1))) { this._partialToken = tokens.pop(); } // Send only full tokens to the token queue for (let token of tokens) { this._tokenQueue.push(token); } await this._processQueuedTokens(); } // Allows for supplied python code to be run on the device via the REPL in normal mode async runCode(code, codeTimeoutMs=CODE_EXECUTION_TIMEOUT) { this.terminalOutput = DEBUG; await this.getToPrompt(); let result = this.execRawMode(code + LINE_ENDING_LF, codeTimeoutMs); this.terminalOutput = true; return result; } async softRestart() { await this.serialTransmit(CHAR_CTRL_D); } async interruptCode() { if (DEBUG) { console.log("Interrupting code"); } this._pythonCodeRunning = true; // Wait for code to be interrupted try { await this._timeout( async () => { while (this._pythonCodeRunning) { await this.serialTransmit(CHAR_CTRL_C); await this._checkCodeRunning(); await this._sleep(200); } }, CODE_INTERRUPT_TIMEOUT ); } catch (error) { console.log("Awaiting code interruption timed out. Restarting device."); // Can't determine the state, so restart device await this.softRestart(); await this.serialTransmit(CHAR_CTRL_C); return false; } } async waitForPrompt() { this._pythonCodeRunning = true; // Wait for a prompt try { await this._timeout( async () => { while (this._pythonCodeRunning) { await this.getToPrompt(); await this._sleep(100); } }, this.promptTimeout ); } catch (error) { console.error("Awaiting prompt timed out."); return false; } return true; } async getToPrompt() { // Attempt to figure out the current mode and change it if needed while (!this._mode) { await this._detectCurrentMode(); } // these will exit Raw Paste Mode or Raw mode if needed, otherwise they do nothing await this._exitRawMode(); // We use GetToPrompt to ensure we are at a known place before running code // This will get from Paste Mode or Running App to Normal Prompt await this.interruptCode(); } async execRawMode(code) { await this._enterRawMode(); if (this._readUntil(REGEX_PROMPT_RAW_MODE)) { this._readUntil(">"); // Read until we get to the prompt } await this.serialTransmit(code); // Execute the code await this.serialTransmit(CHAR_CTRL_D); this._checkpointCount = 0; this._rawByteCount = 0; this._pythonCodeRunning = true; this._codeOutput = ''; this._errorOutput = ''; await this._waitForCodeExecution(); await this._exitRawMode(); return this._codeOutput; } getCodeOutput() { return this._codeOutput; } getErrorOutput(raw = false) { if (raw) { return this._errorOutput; } if (!this._errorOutput) { return null; } return this._decodeError(this._errorOutput); } getVersion() { return this._parseTitleInfo(/\| REPL \| (.*)$/); } getIpAddress() { return this._parseTitleInfo(/((?:\d{1,3}\.){3}\d{1,3})/); } setLineEnding(lineEnding) { if (lineEnding != LINE_ENDING_CRLF && lineEnding != LINE_ENDING_LF) { throw new Error("Line ending expected to be either be LINE_ENDING_CRLF or LINE_ENDING_LF") } this._outputLineEnding = lineEnding; } }