circuitpython-repl-js/repl.js
2024-07-17 12:03:13 -07:00

884 lines
No EOL
27 KiB
JavaScript

const CHAR_CTRL_A = '\x01';
const CHAR_CTRL_B = '\x02';
const CHAR_CTRL_C = '\x03';
const CHAR_CTRL_D = '\x04';
const CHAR_TITLE_START = "\x1b]0;";
const CHAR_TITLE_END = "\x1b\\";
const CHAR_SNAKE = "🐍";
const MODE_NORMAL = 1;
const MODE_RAW = 2;
const TYPE_DIR = 16384;
const TYPE_FILE = 32768;
const DEBUG = false;
export const LINE_ENDING_CRLF = "\r\n";
export const LINE_ENDING_LF = "\n";
// Default timeouts in milliseconds (can be overridden with properties)
const PROMPT_TIMEOUT = 20000;
const CODE_EXECUTION_TIMEOUT = 15000;
const CODE_INTERRUPT_TIMEOUT = 5000;
const PROMPT_CHECK_INTERVAL = 50;
const REGEX_PROMPT_RAW_MODE = /raw REPL; CTRL-B to exit/;
const REGEX_PROMPT_NORMAL_MODE = />>> /;
const modes = [
"Unknown",
"Normal",
"Raw",
];
// Class to use python code to get file information
// We want to do stuff like writing files, reading files, and listing files
export class FileOps {
constructor(repl, checkReadOnly=true) {
this._repl = repl;
this._isReadOnly = null;
this._doCheckReadOnly = checkReadOnly;
}
async _checkReadOnly() {
if (!this._doCheckReadOnly) {
return;
}
if (this._isReadOnly == null) {
this._isReadOnly = await this.isReadOnly();
}
if (this._isReadOnly()) {
throw new Error("File System is Read Only. Try disabling or ejecting the drive.");
}
}
async _checkReplErrors() {
let error = this._repl.getErrorOutput();
if (error) {
console.error("Python Error - " + error.type + ": " + error.message);
if (error.type == "OSError" && error.errno == 30) {
this._isReadOnly = true;
// Throw an error if needed
await this._checkReadOnly();
}
}
return error;
}
// Write a file to the device path with contents beginning at offset. Modification time can be set and if raw is true, contents is written as binary
async _writeRawFile(path, contents, offset=0, modificationTime=null) {
let byteString = "";
// Contents needs to be converted from a ArrayBuffer to a byte string
let view = new Uint8Array(contents);
for (let byte of view) {
byteString += String.fromCharCode(byte);
}
contents = btoa(byteString); // Convert binary string to base64
let code = `
import binascii
with open("${path}", "wb") as f:
f.seek(${offset})
byte_string = binascii.a2b_base64("${contents}")
f.write(byte_string)
`;
if (modificationTime) {
code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`;
}
await this._repl.execRawMode(code);
}
async _writeTextFile(path, contents, offset=0, modificationTime=null) {
// The contents needs to be converted from a UInt8Array to a string
contents = String.fromCharCode.apply(null, contents);
contents = contents.replace(/"/g, '\\"');
let code = `
with open("${path}", "w") as f:
f.seek(${offset})
f.write("""${contents}""")
`;
if (modificationTime) {
code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`;
}
await this._repl.execRawMode(code);
}
// Write a file to the device path with contents beginning at offset. Modification time can be set and if raw is true, contents is written as binary
async writeFile(path, contents, offset=0, modificationTime=null, raw=false) {
this._repl.terminalOutput = DEBUG;
if (raw) {
await this._writeRawFile(path, contents, offset, modificationTime);
} else {
await this._writeTextFile(path, contents, offset, modificationTime);
}
this._repl.terminalOutput = true;
}
async _readRawFile(path) {
try {
let code = `
import binascii
with open("${path}", "rb") as f:
byte_string = f.read()
print(binascii.b2a_base64(byte_string, False))
`;
let result = await this._repl.execRawMode(code);
if (this._checkReplErrors()) {
return null;
}
// strip the b, ending newline, and quotes from the beginning and end
result = result.slice(2, -3);
// convert the base64 string to an ArrayBuffer. Each byte of the Array buffer is a byte of the file with a value between 0-255
result = atob(result); // Convert base64 to binary string
let length = result.length;
let buffer = new ArrayBuffer(length);
let view = new Uint8Array(buffer);
for (let i = 0; i < length; i++) {
view[i] = result.charCodeAt(i);
}
result = new Blob([buffer]);
return result;
} catch(error) {
return null;
}
}
async _readTextFile(path) {
try {
let code = `
with open("${path}", "r") as f:
print(f.read())
`;
let result = await this._repl.execRawMode(code);
if (await this._checkReplErrors()) {
return null;
}
// Remove last 2 bytes from the result because \r\n is added to the end
return result.slice(0, -2);
} catch(error) {
return null;
}
}
// Read a file from the device
async readFile(path, raw=false) {
let result;
this._repl.terminalOutput = DEBUG;
if (raw) {
result = await this._readRawFile(path);
} else {
result = await this._readTextFile(path);
}
this._repl.terminalOutput = true;
return result;
}
// List files using paste mode on the device returning the result as a javascript array
// We need the file name, whether or not it is a directory, file size and file date
async listDir(path) {
this._repl.terminalOutput = DEBUG;
// Mask sure path has a trailing slash
if (path[path.length - 1] != "/") {
path += "/";
}
let code = `
import os
import time
contents = os.listdir("${path}")
for item in contents:
result = os.stat("${path}" + item)
print(item, result[0], result[6], result[9])
`;
const result = await this._repl.execRawMode(code);
let contents = [];
if (!result) {
return contents;
}
for (let line of result.split("\n")) {
let [name, isDir, fileSize, fileDate] = line.split(" ");
contents.push({
path: name,
isDir: isDir == TYPE_DIR,
fileSize: parseInt(fileSize),
fileDate: parseInt(fileDate) * 1000,
});
}
this._repl.terminalOutput = true;
return contents;
}
async isReadOnly() {
this._repl.terminalOutput = DEBUG;
// MicroPython doesn't have storage, but also doesn't have a CIRCUITPY drive
let code = `
try:
import storage
print(storage.getmount("/").readonly)
except:
print(False)
`;
let result = await this._repl.execRawMode(code);
let isReadOnly = result.match("True") != null;
this._repl.terminalOutput = true;
return isReadOnly;
}
async makeDir(path, modificationTime=null) {
await this._checkReadOnly();
this._repl.terminalOutput = DEBUG;
let code = `os.mkdir("${path}")\n`;
if (modificationTime) {
code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`;
}
await this._repl.execRawMode(code);
this._checkReplErrors();
this._repl.terminalOutput = true;
}
async delete(path) {
await this._checkReadOnly();
this._repl.terminalOutput = DEBUG;
let code = `
import os
stat = os.stat("${path}")
if stat[0] == ${TYPE_FILE}:
os.remove("${path}")
else:
os.rmdir("${path}")
`;
await this._repl.execRawMode(code);
this._checkReplErrors();
this._repl.terminalOutput = true;
}
async move(oldPath, newPath) {
await this._checkReadOnly();
// we need to check if the new path already exists
// Return true on success and false on failure
this._repl.terminalOutput = DEBUG;
let code = `
import os
os.rename("${oldPath}", "${newPath}")
`;
await this._repl.execRawMode(code);
let error = this._checkReplErrors();
this._repl.terminalOutput = true;
return !error;
}
}
class InputBuffer {
constructor() {
this._buffer = "";
this._pointer = 0;
this.lineEnding = LINE_ENDING_CRLF;
}
append(data) {
this._buffer += data;
}
get() {
return this._buffer;
}
clear() {
this._buffer = "";
this._pointer = 0;
}
readLine(advancePointer = true) {
let lines = this.getLines();
if (this._buffer.slice(this._pointer).length == 0) {
return null;
}
if (advancePointer) {
this._pointer += lines[0].length + this.lineEnding.length;
}
return lines[0];
}
readLastLine() {
let lines = this.getLines();
if (this._buffer.slice(this._pointer).length == 0) {
return null;
}
return lines[lines.length - 1];
}
getRemainingBuffer() {
// Let the result contain a slice of the buffer from the pointer to the end
let result = this._buffer.slice(this._pointer);
this._pointer += result.length;
return result;
}
readExactly(byteCount) {
let bytes = this._buffer.slice(this._pointer, this._pointer + byteCount);
this._pointer += byteCount;
return bytes;
}
readUntil(value) {
// Read bytes using until the value is found
let currentOffset = this._pointer;
let bytes = this.readExactly(1);
let newByte = ' ';
// Continue to read 1 byte at a time until the last x bytes match the value
while (!bytes.match(value) && newByte.length > 0) {
newByte = this.readExactly(1);
bytes += newByte;
}
if (newByte.length == 0) {
// Buffer end reached, reset the prompt check pointer to the end
this._pointer = currentOffset;
return false;
}
return true;
}
movePointer(offset) {
if (offset < this._pointer) {
return;
} else if (offset > this._buffer.length) {
offset = this._buffer.length;
}
this._pointer = offset;
}
getLines(allLines = false) {
let buffer = this._buffer;
if (!allLines) {
buffer = buffer.slice(this._pointer);
}
return buffer.split(this.lineEnding);
}
}
export class REPL {
constructor() {
this._pythonCodeRunning = false;
this._codeOutput = '';
this._errorOutput = '';
this._serialInputBuffer = new InputBuffer();
this._checkingPrompt = false;
this._titleMode = false;
this.promptTimeout = PROMPT_TIMEOUT;
this.promptCheckInterval = PROMPT_CHECK_INTERVAL;
this.title = '';
this.serialTransmit = null;
this._inputLineEnding = LINE_ENDING_CRLF; // The line ending the REPL returns
this._outputLineEnding = LINE_ENDING_LF; // The line ending for the code result
this._tokenQueue = [];
this._mode = null;
this._codeCheckPointer = 0; // Used for looking at code output
this._promptCheckPointer = 0; // Used for looking at prompt output/control characters
this._checkpointCount = 0;
this._rawByteCount = 0;
this.terminalOutput = true;
}
//// Placeholder Functions ////
setTitle(title, append=false) {
return;
}
writeToTerminal(data) {
return;
}
//// Utility Functions ////
_writeToTerminal(data) {
if (this.terminalOutput) {
this.writeToTerminal(data);
}
}
_sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
_timeout(callback, ms) {
return Promise.race([callback(), this._sleep(ms).then(() => {throw Error("Timed Out");})]);
}
_regexEscape(regexString) {
return regexString.replace(/\\/, "\\\\");
}
// Split a string up by full title start and end character sequences
_tokenize(string) {
const tokenRegex = new RegExp("(" + this._regexEscape(CHAR_TITLE_START) + "|" + this._regexEscape(CHAR_TITLE_END) + ")", "gi");
return string.split(tokenRegex);
}
// Check if a chunk of data has a partial title start/end character sequence at the end
_hasPartialToken(chunk) {
const partialToken = /\\x1b(?:\](?:0"?)?)?$/gi;
return partialToken.test(chunk);
}
_parseTitleInfo(regex) {
if (this.title) {
let matches = this.title.match(regex);
if (matches && matches.length >= 2) {
return matches[1];
}
}
return null;
}
async _detectCurrentMode() {
// Go through the buffer and detect the last mode change
let buffer = this._serialInputBuffer.get();
const rawModRegex = new RegExp(REGEX_PROMPT_RAW_MODE, 'g');
const normalModRegex = new RegExp(REGEX_PROMPT_NORMAL_MODE, 'g');
let lastRawPosition = this._findLastRegexPosition(rawModRegex, buffer);
let lastNormalPosition = this._findLastRegexPosition(normalModRegex, buffer);
if (lastRawPosition > lastNormalPosition) {
this._mode = MODE_RAW;
this._serialInputBuffer.movePointer(lastRawPosition);
} else if (lastNormalPosition > lastRawPosition) {
this._mode = MODE_NORMAL;
this._serialInputBuffer.movePointer(lastNormalPosition);
}
// If no mode changes detected, we will assume normal mode with code running
if (!this._mode) {
await this.softRestart();
await this.serialTransmit(CHAR_CTRL_C);
await this._sleep(1000);
}
}
_findLastRegexPosition(regex, str) {
let match;
let lastPosition = -1;
// Reset regex.lastIndex to start from the end of the string
regex.lastIndex = 0;
// Using a loop to find all matches
while ((match = regex.exec(str)) !== null) {
lastPosition = match.index;
// Move to the next position after the match
regex.lastIndex = match.index + 1;
}
return lastPosition;
}
_lineIsPrompt(prompt_regex) {
let currentLine = this._serialInputBuffer.readLastLine();
if (!currentLine) {
return false;
}
return currentLine.match(prompt_regex);
}
_currentLineIsNormalPrompt() {
return this._lineIsPrompt(/>>> $/);
}
async _checkCodeRunning() {
await this._detectCurrentMode();
if (DEBUG) {
console.log("Checking if code is running in " + modes[this._mode]);
}
if (this._mode == MODE_RAW) {
// In raw mode, we simply need to look for OK
// Then we should store the results in the code output
// The next bytes should be 1 of the following:
// We receive OK, followed by code output, followed by Ctrl-D, followed by error output, followed by Ctrl-D
// or we receive an error message
let bytes = this._serialInputBuffer.getRemainingBuffer();
this._rawByteCount += bytes.length;
if (this._rawByteCount >= 2) {
while (bytes.length > 0) {
if (this._checkpointCount == 0) {
if (bytes.slice(0, 2).match("OK")) {
this._checkpointCount++;
bytes = bytes.slice(2);
} else if (bytes.slice(0, 2).match("ra")) {
if (DEBUG) {
console.log("Unexpected bytes encountered. " + bytes);
}
return;
} else {
console.error("Unexpected output in raw mode: " + bytes);
return;
}
} else {
if (bytes.slice(0, 1).match(CHAR_CTRL_D)) {
this._checkpointCount++;
//console.log("Checkpoint Count: " + this._checkpointCount);
} else {
if (this._checkpointCount == 1) {
// Code Output
this._codeOutput += bytes.slice(0, 1);
//console.log("Code Output: " + bytes.slice(0,1));
} else if (this._checkpointCount == 2) {
// Error Output
this._errorOutput += bytes.slice(0, 1);
//console.log("Error: " + bytes.slice(0,1));
} else if (this._checkpointCount >= 2) {
// We're done
this._pythonCodeRunning = false;
}
}
bytes = bytes.slice(1); // Remove the first byte
}
}
}
return;
}
// In normal mode, we need to look for the prompt
if (!!this._currentLineIsNormalPrompt()) {
if (DEBUG) {
console.log("REPL at Normal Mode prompt");
}
this._pythonCodeRunning = false;
}
}
_decodeError(rawError) {
// Errors are typically 3 lines long
let errorLines = rawError.split(this._inputLineEnding);
let error = {
file: null,
line: null,
type: null,
message: null,
errno: null,
};
if (errorLines.length > 0) {
error.file = errorLines[1].match(/File "(.*)"/)[1];
error.line = parseInt(errorLines[1].match(/line (\d+)/)[1]);
error.type = errorLines[2].match(/(.+?):/)[1];
error.message = errorLines[2].match(/: (.+)$/)[1];
if (error.type == "OSError") {
error.errno = parseInt(error.message.match(/\[Errno (\d+)\]/)[1]);
error.message = error.message.replace(/\[Errno \d+\] /, '');
}
}
error.raw = rawError;
return error;
}
async _readUntil(value, timeout=5000) {
// Call readUntil in the SerialInputBuffer, but with a timeout wrapper
try {
await this._timeout(
async () => {
while (!this._serialInputBuffer.readUntil(value)) {
await this._sleep(100);
}
}, timeout
);
} catch (error) {
return false;
}
return true;
}
async _waitForCodeExecution(codeTimeoutMs=CODE_EXECUTION_TIMEOUT) {
// Wait for the code to finish running, so we can capture the output
if (DEBUG) {
console.log("Waiting for code execution");
}
if (codeTimeoutMs) {
try {
await this._timeout(
async () => {
while (this._pythonCodeRunning) {
await this._checkCodeRunning();
await this._sleep(100);
}
}, codeTimeoutMs
);
} catch (error) {
console.error("Code timed out.");
}
} else {
// Run without timeout
while (this._pythonCodeRunning) {
await this._sleep(100);
}
}
}
async _waitForModeChange(mode, keySequence=null) {
if (DEBUG) {
console.log("Waiting for mode change from " + modes[this._mode] + " to " + modes[mode]);
}
try {
await this._timeout(
async () => {
while (this._mode != mode) {
if (keySequence) {
await this.serialTransmit(keySequence);
}
await this._detectCurrentMode();
await this._sleep(100);
}
}, 1000
);
} catch (error) {
console.log("Awaiting mode change timed out.");
}
}
// Raw mode allows code execution without echoing back to the terminal
async _enterRawMode() {
if (this._mode == MODE_RAW) {
await this._exitRawMode();
}
await this._waitForModeChange(MODE_RAW, CHAR_CTRL_A);
}
async _exitRawMode() {
if (this._mode != MODE_RAW) {
return;
}
// Wait for >>> to be displayed
await this._waitForModeChange(MODE_NORMAL, CHAR_CTRL_B);
}
async _processQueuedTokens() {
if (this._processing) {
return;
}
this._processing = true;
while (this._tokenQueue.length) {
await this._processToken(this._tokenQueue.shift());
}
this._processing = false;
}
// Handle Title setting and add to the serial input buffer
async _processToken(token) {
if (token == CHAR_TITLE_START) {
this._titleMode = true;
this._setTitle("");
} else if (token == CHAR_TITLE_END) {
this._titleMode = false;
} else if (this._titleMode) {
this._setTitle(token, true);
// Fix duplicate Title charactes
let snakeIndex = this.title.indexOf(CHAR_SNAKE);
if (snakeIndex > -1) {
this._setTitle(this.title.slice(snakeIndex));
}
}
this._serialInputBuffer.append(token);
this._writeToTerminal(token);
}
//// External Functions ////
_setTitle(title, append=false) {
if (append) {
title = this.title + title;
}
this.title = title;
this.setTitle(title, append);
}
async _serialTransmit(msg) {
if (!this.serialTransmit) {
console.error("Default serial transmit function called. Message: " + msg);
throw new Error("REPL serialTransmit must be connected to an external transmit function");
} else {
return await this.serialTransmit(msg);
}
}
//// Public Functions ////
async onSerialReceive(e) {
// We tokenize the serial data to handle special character sequences (currently titles only)
// We don't want to modify e.data, so we make a copy of it
let data = e.data;
// Prepend a partial token if it exists
if (this._partialToken) {
data = this._partialToken + data;
this._partialToken = null;
}
// Tokenize the larger string and send to the parent
let tokens = this._tokenize(data);
// Remove any partial tokens and store for the next serial data receive
if (tokens.length && this._hasPartialToken(tokens.slice(-1))) {
this._partialToken = tokens.pop();
}
// Send only full tokens to the token queue
for (let token of tokens) {
this._tokenQueue.push(token);
}
await this._processQueuedTokens();
}
// Allows for supplied python code to be run on the device via the REPL in normal mode
async runCode(code, codeTimeoutMs=CODE_EXECUTION_TIMEOUT) {
this.terminalOutput = DEBUG;
await this.getToPrompt();
let result = this.execRawMode(code + LINE_ENDING_LF, codeTimeoutMs);
this.terminalOutput = true;
return result;
}
async softRestart() {
await this.serialTransmit(CHAR_CTRL_D);
}
async interruptCode() {
if (DEBUG) {
console.log("Interrupting code");
}
this._pythonCodeRunning = true;
// Wait for code to be interrupted
try {
await this._timeout(
async () => {
while (this._pythonCodeRunning) {
await this.serialTransmit(CHAR_CTRL_C);
await this._checkCodeRunning();
await this._sleep(200);
}
}, CODE_INTERRUPT_TIMEOUT
);
} catch (error) {
console.log("Awaiting code interruption timed out. Restarting device.");
// Can't determine the state, so restart device
await this.softRestart();
await this.serialTransmit(CHAR_CTRL_C);
return false;
}
}
async waitForPrompt() {
this._pythonCodeRunning = true;
// Wait for a prompt
try {
await this._timeout(
async () => {
while (this._pythonCodeRunning) {
await this.getToPrompt();
await this._sleep(100);
}
}, this.promptTimeout
);
} catch (error) {
console.error("Awaiting prompt timed out.");
return false;
}
return true;
}
async getToPrompt() {
// Attempt to figure out the current mode and change it if needed
while (!this._mode) {
await this._detectCurrentMode();
}
// these will exit Raw Paste Mode or Raw mode if needed, otherwise they do nothing
await this._exitRawMode();
// We use GetToPrompt to ensure we are at a known place before running code
// This will get from Paste Mode or Running App to Normal Prompt
await this.interruptCode();
}
async execRawMode(code) {
await this._enterRawMode();
if (this._readUntil(REGEX_PROMPT_RAW_MODE)) {
this._readUntil(">"); // Read until we get to the prompt
}
await this.serialTransmit(code);
// Execute the code
await this.serialTransmit(CHAR_CTRL_D);
this._checkpointCount = 0;
this._rawByteCount = 0;
this._pythonCodeRunning = true;
this._codeOutput = '';
this._errorOutput = '';
await this._waitForCodeExecution();
await this._exitRawMode();
return this._codeOutput;
}
getCodeOutput() {
return this._codeOutput;
}
getErrorOutput(raw = false) {
if (raw) {
return this._errorOutput;
}
if (!this._errorOutput) {
return null;
}
return this._decodeError(this._errorOutput);
}
getVersion() {
return this._parseTitleInfo(/\| REPL \| (.*)$/);
}
getIpAddress() {
return this._parseTitleInfo(/((?:\d{1,3}\.){3}\d{1,3})/);
}
setLineEnding(lineEnding) {
if (lineEnding != LINE_ENDING_CRLF && lineEnding != LINE_ENDING_LF) {
throw new Error("Line ending expected to be either be LINE_ENDING_CRLF or LINE_ENDING_LF")
}
this._outputLineEnding = lineEnding;
}
}