circuitpython-repl-js/repl.js
2024-06-14 18:59:57 -07:00

844 lines
No EOL
28 KiB
JavaScript

const CHAR_CTRL_A = '\x01';
const CHAR_CTRL_B = '\x02';
const CHAR_CTRL_C = '\x03';
const CHAR_CTRL_D = '\x04';
const CHAR_CTRL_E = '\x05';
const CHAR_CRLF = '\x0a\x0d';
const CHAR_BKSP = '\x08';
const CHAR_TITLE_START = "\x1b]0;";
const CHAR_TITLE_END = "\x1b\\";
const CHAR_RAW_PASTE_UNSUPPORTED = "R\x00";
const CHAR_RAW_PASTE_SUPPORTED = "R\x01";
const REGEX_RAW_PASTE_RESPONSE = /R[\x00\x01]..\x01/;
const MODE_NORMAL = 1;
const MODE_RAW = 2;
const MODE_RAWPASTE = 3;
const TYPE_DIR = 16384;
const TYPE_FILE = 32768;
export const LINE_ENDING_CRLF = "\r\n";
export const LINE_ENDING_LF = "\n";
const CONTROL_SEQUENCES = [
REGEX_RAW_PASTE_RESPONSE
];
// TODO: The title occasionally has duplicate characters (such as 2 snakes). Likely the buffer pointers need adjusting.
// TODO: Get Raw Mode working
// TODO: Add parsing for non-english languages (alternatively, only look at symbols that are common across the languages)
// Mostly needed when the terminal echos back the input
const IGNORE_OUTPUT_LINE_PREFIXES = [/^\... /, /^>>> /];
// Default timeouts in milliseconds (can be overridden with properties)
const PROMPT_TIMEOUT = 10000;
const CODE_EXECUTION_TIMEOUT = 15000;
const PROMPT_CHECK_INTERVAL = 50;
const REGEX_PROMPT_RAW_MODE = /raw REPL; CTRL-B to exit/;
// Class to use python code to get file information
// We want to do stuff like writing files, reading files, and listing files
export class FileOps {
constructor(repl, checkReadOnly=true) {
this._repl = repl;
this._isReadOnly = null;
this._doCheckReadOnly = checkReadOnly;
}
async _checkReadOnly() {
if (!this._doCheckReadOnly) {
return;
}
if (this._isReadOnly == null) {
this._isReadOnly = await this.isReadOnly();
}
if (this._isReadOnly()) {
throw new Error("File System is Read Only. Try disabling or ejecting the drive.");
}
}
async _checkReplErrors() {
let error = this._repl.getErrorOutput();
if (error && error.type == "OSError" && error.errno == 30) {
this._isReadOnly = true;
// Throw an error if needed
await this._checkReadOnly();
}
return error;
}
// Write a file to the device path with contents beginning at offset. Modification time can be set and if raw is true, contents is written as binary
async _writeRawFile(path, contents, offset=0, modificationTime=null) {
let byteString = "";
// Contents needs to be converted from a ArrayBuffer to a byte string
let view = new Uint8Array(contents);
for (let byte of view) {
byteString += String.fromCharCode(byte);
}
contents = btoa(byteString); // Convert binary string to base64
let code = `
import binascii
with open("${path}", "wb") as f:
f.seek(${offset})
byte_string = binascii.a2b_base64("${contents}")
f.write(byte_string)
`;
if (modificationTime) {
code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`;
}
await this._repl.execRawPasteMode(code);
}
async _writeTextFile(path, contents, offset=0, modificationTime=null) {
// The contents needs to be converted from a UInt8Array to a string
contents = String.fromCharCode.apply(null, contents);
contents = contents.replace(/"/g, '\\"');
let code = `
with open("${path}", "w") as f:
f.seek(${offset})
f.write("""${contents}""")
`;
if (modificationTime) {
code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`;
}
await this._repl.execRawPasteMode(code);
}
// Write a file to the device path with contents beginning at offset. Modification time can be set and if raw is true, contents is written as binary
async writeFile(path, contents, offset=0, modificationTime=null, raw=false) {
this._repl.terminalOutput = false;
if (raw) {
await this._writeRawFile(path, contents, offset, modificationTime);
} else {
await this._writeTextFile(path, contents, offset, modificationTime);
}
this._repl.terminalOutput = true;
}
async _readRawFile(path) {
try {
let code = `
import binascii
with open("${path}", "rb") as f:
byte_string = f.read()
print(binascii.b2a_base64(byte_string, False))
`;
let result = await this._repl.execRawPasteMode(code);
if (this._checkReplErrors()) {
return null;
}
// strip the b, ending newline, and quotes from the beginning and end
result = result.slice(2, -3);
// convert the base64 string to an ArrayBuffer. Each byte of the Array buffer is a byte of the file with a value between 0-255
result = atob(result); // Convert base64 to binary string
let length = result.length;
let buffer = new ArrayBuffer(length);
let view = new Uint8Array(buffer);
for (let i = 0; i < length; i++) {
view[i] = result.charCodeAt(i);
}
result = new Blob([buffer]);
return result;
} catch(error) {
return null;
}
}
async _readTextFile(path) {
try {
let code = `
with open("${path}", "r") as f:
print(f.read())
`;
let result = await this._repl.execRawPasteMode(code);
if (this._checkReplErrors()) {
return null;
}
// Remove last 2 bytes from the result because \r\n is added to the end
return result.slice(0, -2);
} catch(error) {
return null;
}
}
// Read a file from the device
async readFile(path, raw=false) {
let result;
this._repl.terminalOutput = false;
if (raw) {
result = await this._readRawFile(path);
} else {
result = await this._readTextFile(path);
}
this._repl.terminalOutput = true;
return result;
}
// List files using paste mode on the device returning the result as a javascript array
// We need the file name, whether or not it is a directory, file size and file date
async listDir(path) {
this._repl.terminalOutput = false;
// Mask sure path has a trailing slash
if (path[path.length - 1] != "/") {
path += "/";
}
let code = `
import os
import time
contents = os.listdir("${path}")
for item in contents:
result = os.stat("${path}" + item)
print(item, result[0], result[6], result[9])
`;
const result = await this._repl.execRawPasteMode(code);
let contents = [];
if (!result) {
return contents;
}
for (let line of result.split("\n")) {
let [name, isDir, fileSize, fileDate] = line.split(" ");
contents.push({
path: name,
isDir: isDir == TYPE_DIR,
fileSize: parseInt(fileSize),
fileDate: parseInt(fileDate) * 1000,
});
}
this._repl.terminalOutput = true;
return contents;
}
async isReadOnly() {
this._repl.terminalOutput = false;
let code = `
import storage
print(storage.getmount("/").readonly)
`;
let result = await this._repl.execRawPasteMode(code);
let isReadOnly = result.match("True") != null;
this._repl.terminalOutput = true;
return isReadOnly;
}
async makeDir(path, modificationTime=null) {
await this._checkReadOnly();
this._repl.terminalOutput = false;
let code = `os.mkdir("${path}")\n`;
if (modificationTime) {
code += `os.utime("${path}", (os.path.getatime("${path}"), ${modificationTime}))\n`;
}
await this._repl.execRawPasteMode(code);
this._checkReplErrors();
this._repl.terminalOutput = true;
}
async delete(path) {
await this._checkReadOnly();
this._repl.terminalOutput = false;
let code = `
import os
stat = os.stat("${path}")
if stat[0] == ${TYPE_FILE}:
os.remove("${path}")
else:
os.rmdir("${path}")
`;
await this._repl.execRawPasteMode(code);
this._checkReplErrors();
this._repl.terminalOutput = true;
}
async move(oldPath, newPath) {
await this._checkReadOnly();
// we need to check if the new path already exists
// Return true on success and false on failure
this._repl.terminalOutput = false;
let code = `
import os
os.rename("${oldPath}", "${newPath}")
`;
await this._repl.execRawPasteMode(code);
let error = this._checkReplErrors();
this._repl.terminalOutput = true;
return !error;
}
}
export class REPL {
constructor() {
this._pythonCodeRunning = false;
this._codeOutput = '';
this._errorOutput = '';
this._serialInputBuffer = '';
this._checkingPrompt = false;
this._titleMode = false;
this.promptTimeout = PROMPT_TIMEOUT;
this.promptCheckInterval = PROMPT_CHECK_INTERVAL;
this.title = '';
this.serialTransmit = null;
this._inputLineEnding = LINE_ENDING_CRLF; // The line ending the REPL returns
this._outputLineEnding = LINE_ENDING_LF; // The line ending for the code result
this._tokenQueue = [];
this._mode = MODE_NORMAL;
this._codeCheckPointer = 0; // Used for looking at code output
this._promptCheckPointer = 0; // Used for looking at prompt output/control characters
this._ctrlDCount = 0;
this.terminalOutput = true;
}
_sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
_timeout(callback, ms) {
return Promise.race([callback(), this._sleep(ms).then(() => {throw Error("Timed Out");})]);
}
_getControlCharBuffer() {
// Return the Serial Buffer from _controlCharPointer to the next line ending and update the position of the pointer
let bufferLines, controlChar = '';
let remainingBuffer = this._serialInputBuffer.slice(this._controlCharPointer);
if (remainingBuffer.includes(this._inputLineEnding)) {
[controlChar, ...bufferLines] = remainingBuffer.split(this._inputLineEnding);
this._controlCharPointer += controlChar.length + this._inputLineEnding.length;
}
return controlChar;
}
_getInputBufferLines() {
return this._serialInputBuffer.split(this._inputLineEnding);
}
_lineIsPrompt(prompt_regex) {
let lines = this._getInputBufferLines();
if (lines.length == 0) {
return false;
}
let lastLine = lines[lines.length - 1];
return lastLine.match(prompt_regex);
}
_checkForModeChange() {
let lines = this._getInputBufferLines();
for (let line of lines) {
if (line.match(REGEX_PROMPT_RAW_MODE)) {
this._mode = MODE_RAW;
}
}
}
_currentLineIsNormalPrompt() {
return this._lineIsPrompt(/>>> $/);
}
_currentLineIsPastePrompt() {
return this._lineIsPrompt(/=== $/);
}
_currentLineIsPrompt() {
if (this._mode == MODE_NORMAL) {
return this._currentLineIsNormalPrompt();
}
return false;
}
_regexEscape(regexString) {
return regexString.replace(/\\/, "\\\\");
}
setLineEnding(lineEnding) {
if (lineEnding != LINE_ENDING_CRLF && lineEnding != LINE_ENDING_LF) {
throw new Error("Line ending expected to be either be LINE_ENDING_CRLF or LINE_ENDING_LF")
}
this._outputLineEnding = lineEnding;
}
// This should help detect lines like ">>> ", but not ">>> 1+1"
async checkPrompt() {
if (this._mode == MODE_RAWPASTE) {
let bytes = this._serialInputBuffer.slice(this._promptCheckPointer);
this._promptCheckPointer += bytes.length;
while (bytes.length > 0) {
if (bytes.slice(0, 1).match(CHAR_CTRL_D)) {
this._ctrlDCount++;
//console.log("CTRL-D Count: " + this._ctrlDCount);
} else {
if (this._ctrlDCount == 1) {
// Code Output
this._codeOutput += bytes.slice(0, 1);
} else if (this._ctrlDCount == 2) {
// Error Output
this._errorOutput += bytes.slice(0, 1);
} else if (this._ctrlDCount > 2) {
// Code is done running
this._pythonCodeRunning = false;
} else if (!this._pythonCodeRunning && bytes.slice(0, 1).match(">")) {
// We're at a prompt
this._mode = MODE_RAW;
return;
}
}
bytes = bytes.slice(1); // Remove the first byte
}
return;
}
// Only allow one instance of this function to run at a time (unless this could cause it to miss a prompt)
if (!this._currentLineIsPrompt()) {
return;
}
// Check again after a short delay to see if it's still a prompt
await this._sleep(this.promptCheckInterval);
if (!this._currentLineIsPrompt()) {
return;
}
this._pythonCodeRunning = false;
}
async waitForPrompt() {
this._pythonCodeRunning = true;
// Wait for a prompt
try {
await this._timeout(
async () => {
while (this._pythonCodeRunning) {
await this.getToPrompt();
await this._sleep(100);
}
}, this.promptTimeout
);
} catch (error) {
console.error("Awaiting prompt timed out.");
return false;
}
return true;
}
async softRestart() {
await this.serialTransmit(CHAR_CTRL_D);
}
async interruptCode() {
await this.serialTransmit(CHAR_CTRL_C);
}
getErrorOutput(raw = false) {
if (raw) {
return this._errorOutput;
}
if (!this._errorOutput) {
return null;
}
let errorOutput = this._errorOutput;
let errorLines = errorOutput.split(this._inputLineEnding);
let error = {
file: null,
line: null,
type: null,
message: null,
errno: null,
};
if (errorLines.length > 0) {
error.file = errorLines[1].match(/File "(.*)"/)[1];
error.line = parseInt(errorLines[1].match(/line (\d+)/)[1]);
error.type = errorLines[2].match(/(.+?):/)[1];
error.message = errorLines[2].match(/: (.+)$/)[1];
if (error.type == "OSError") {
error.errno = parseInt(error.message.match(/\[Errno (\d+)\]/)[1]);
error.message = error.message.replace(/\[Errno \d+\] /, '');
}
}
error.raw = errorOutput;
return error;
}
getCodeOutput() {
return this._codeOutput;
}
async getToPrompt() {
// We use GetToPrompt to ensure we are at a known place before running code
// This will get from Paste Mode or Running App to Normal Prompt
await this.serialTransmit(CHAR_CTRL_C + CHAR_CTRL_C);
// This will get from Raw Paste or Raw Mode to Normal Prompt
await this.serialTransmit(CHAR_CTRL_B + CHAR_CTRL_D + CHAR_CTRL_B);
this.mode = MODE_NORMAL;
}
async execRawMode(code) {
await this.enterRawMode();
await this.serialTransmit(code);
return await this.finishRawMode();
}
async execRawPasteMode(code, codeTimeoutMs=CODE_EXECUTION_TIMEOUT) {
let success = await this.enterRawPasteMode();
//console.log("Success: " + success);
if (success) {
// We're in raw mode only
await this.serialTransmit(code);
// We need to use flow control
let flowControlWindowSize = this._readSerialBytes(2);
// Convert 2 bytes from unsigned little endian to an integer
flowControlWindowSize = flowControlWindowSize.charCodeAt(0) + (flowControlWindowSize.charCodeAt(1) << 8);
let remainingWindowSize = flowControlWindowSize;
this._readSerialBytes(1); // Skip the last byte
this._clearSerialBytes(); // Clear the serial buffer to remove the previous Raw Prompt
//console.log("Flow Control Window Size: " + remainingWindowSize);
// Send the code in chunks up to the window size
let codeLength = code.length;
let codePointer = 0;
while (codePointer < codeLength) {
let chunk = code.slice(codePointer, codePointer + remainingWindowSize);
await this.serialTransmit(chunk);
codePointer += remainingWindowSize;
// Reduce the remain window size
remainingWindowSize -= chunk.length;
if (remainingWindowSize <= 0) {
// Read the next byte at the flow control pointer
let instruction = this._readSerialBytes(1);
if (instruction.match(CHAR_CTRL_A)) {
remainingWindowSize = flowControlWindowSize;
} else if (instruction.match(CHAR_CTRL_D)) {
// We're done
break;
}
}
}
// Inform the device we're done
await this.serialTransmit(CHAR_CTRL_D);
this._ctrlDCount = 0;
this._pythonCodeRunning = true;
this._codeOutput = '';
this._errorOutput = '';
await this._waitForCodeExecution(codeTimeoutMs);
this._clearSerialBytes();
await this.exitRawMode();
return this._codeOutput.slice(Math.ceil(this._codeOutput.length / 2));
} else {
await this.execRawMode(code);
}
}
async _waitForCodeExecution(codeTimeoutMs=CODE_EXECUTION_TIMEOUT) {
// Wait for the code to finish running, so we can capture the output
if (codeTimeoutMs) {
try {
await this._timeout(
async () => {
while (this._pythonCodeRunning) {
await this._sleep(100);
}
}, codeTimeoutMs
);
} catch (error) {
console.log("Code timed out.");
}
} else {
// Run without timeout
while (this._pythonCodeRunning) {
await this._sleep(100);
}
}
}
_readSerialBytes(byteCount, offset=null) {
if (offset == null) {
offset = this._promptCheckPointer;
}
let bytes = this._serialInputBuffer.slice(offset, offset + byteCount);
this._promptCheckPointer += byteCount;
return bytes;
}
_clearSerialBytes(offset=null) {
// Should this only clear up to the lower of the 2 pointers? (codeCheckPointer and promptCheckPointer)
if (offset == null) {
offset = this._promptCheckPointer;
}
if (offset > this._serialInputBuffer.length) {
offset = this._serialInputBuffer.length;
}
this._serialInputBuffer = this._serialInputBuffer.slice(offset);
this._promptCheckPointer -= offset;
this._codeCheckPointer -= offset;
}
async enterRawPasteMode() {
await this.enterRawMode();
let bufferLength = this._serialInputBuffer.length;
await this.serialTransmit(CHAR_CTRL_E + "A" + CHAR_CTRL_A);
await this._timeout(
async () => {
while (this._serialInputBuffer.length < bufferLength + 2) {
await this._sleep(100);
}
}, this.promptTimeout
);
if (this._serialInputBuffer.length < bufferLength + 2) {
console.error("Failed to enter raw paste mode.");
return;
}
// Grab the two characters after bufferLength
let response = this._readSerialBytes(2, bufferLength);
if (response.match(CHAR_RAW_PASTE_UNSUPPORTED)) {
console.error("Device does not support raw paste mode.");
} else if (response.match(CHAR_RAW_PASTE_SUPPORTED)) {
this._mode = MODE_RAWPASTE;
return true;
} else if (response == "ra") {
console.error("Device does not understand or support raw paste mode.");
}
return false;
}
// Execute pasted code
async finishRawMode() {
this._pythonCodeRunning = true;
this._codeOutput = '';
this._errorOutput = '';
await this.serialTransmit(CHAR_CTRL_D);
// Wait for the code to finish running, so we can capture the output
await this._waitForCodeExecution();
this._mode = MODE_NORMAL;
return this._codeOutput;
}
async _waitForModeChange(mode) {
await this._timeout(
async () => {
while (this._mode != mode) {
this._checkForModeChange();
await this._sleep(100);
}
}, this.promptTimeout
);
}
// Raw mode allows code execution without echoing back to the terminal
async enterRawMode() {
if (this._mode == MODE_RAW) {
return;
}
await this.getToPrompt();
await this.serialTransmit(CHAR_CTRL_A);
await this._waitForModeChange(MODE_RAW);
}
async exitRawMode() {
await this.serialTransmit(CHAR_CTRL_B);
// Wait for >>> to be displayed
this._mode = MODE_NORMAL;
}
_getSerialCodeOutput() {
let bufferLines, codeline = '';
// Get the remaining buffer from _codeCheckPointer to the next line ending and update the position of the pointer
let remainingBuffer = this._serialInputBuffer.slice(this._codeCheckPointer);
if (remainingBuffer.includes(this._inputLineEnding)) {
[codeline, ...bufferLines] = remainingBuffer.split(this._inputLineEnding);
this._codeCheckPointer += codeline.length + this._inputLineEnding.length;
}
return this._formatCodeOutput(codeline);
}
_formatCodeOutput(codeline) {
// Remove lines that are prompts or control characters and strip control sequences
// Return the result
for (let prefix of IGNORE_OUTPUT_LINE_PREFIXES) {
if (codeline.match(prefix)) {
return '';
}
}
for (let sequence of CONTROL_SEQUENCES) {
codeline = codeline.replace(sequence, '');
}
return codeline;
}
async onSerialReceive(e) {
// We tokenize the serial data to handle special character sequences (currently titles only)
// We don't want to modify e.data, so we make a copy of it
let data = e.data;
// Prepend a partial token if it exists
if (this._partialToken) {
data = this._partialToken + data;
this._partialToken = null;
}
// Tokenize the larger string and send to the parent
let tokens = this._tokenize(data);
// Remove any partial tokens and store for the next serial data receive
if (tokens.length && this._hasPartialToken(tokens.slice(-1))) {
this._partialToken = tokens.pop();
}
// Send only full tokens to the token queue
for (let token of tokens) {
this._tokenQueue.push(token);
}
await this._processQueuedTokens();
if (this.terminalOutput) {
return data;
}
return "";
}
async _processQueuedTokens() {
if (this._processing) {
return;
}
this._processing = true;
while (this._tokenQueue.length) {
await this._processToken(this._tokenQueue.shift());
}
this._processing = false;
}
async _processToken(token) {
if (token == CHAR_TITLE_START) {
this._titleMode = true;
//console.log("Title Start");
this._setTitle("");
} else if (token == CHAR_TITLE_END) {
//console.log("Title End");
this._titleMode = false;
} else if (this._titleMode) {
//console.log("New Title: " + token);
this._setTitle(token, true);
}
let codelines = [];
let codeline;
this._serialInputBuffer += token;
if (this._pythonCodeRunning) {
// Check if we are at a prompt
this.checkPrompt(); // Run asynchronously to avoid blocking the serial receive
if (this._mode != MODE_RAWPASTE) {
do {
codeline = this._getSerialCodeOutput();
if (codeline) {
codelines.push(codeline);
}
} while (codeline.length > 0);
}
}
// Is it still running? Then we add to code output if there is any
if (this._pythonCodeRunning && codelines.length > 0) {
for (codeline of codelines) {
//console.log(codeline);
if (!codeline.match(/^\... /) && !codeline.match(/^>>> /) && !codeline.match(/^=== /)) {
if (this._mode != MODE_RAWPASTE) {
if (codeline.match(REGEX_PROMPT_RAW_MODE)) {
this._mode = MODE_RAW;
continue;
}
}
this._codeOutput += codeline + this._outputLineEnding;
}
}
}
}
// Placeholder Function
setTitle(title, append=false) {
return;
}
_setTitle(title, append=false) {
if (append) {
title = this.title + title;
}
this.title = title;
this.setTitle(title, append);
}
getVersion() {
return this._parseTitleInfo(/\| REPL \| (.*)$/);
}
getIpAddress() {
return this._parseTitleInfo(/((?:\d{1,3}\.){3}\d{1,3})/);
}
_parseTitleInfo(regex) {
if (this.title) {
let matches = this.title.match(regex);
if (matches && matches.length >= 2) {
return matches[1];
}
}
return null;
}
async _serialTransmit(msg) {
if (!this.serialTransmit) {
console.log("Default serial transmit function called. Message: " + msg);
throw new Error("REPL serialTransmit must be connected to an external transmit function");
} else {
return await this.serialTransmit(msg);
}
}
// Allows for supplied python code to be run on the device via the REPL in normal mode
async runCode(code, codeTimeoutMs=CODE_EXECUTION_TIMEOUT) {
await this.getToPrompt();
return this.execRawPasteMode(code + LINE_ENDING_LF, codeTimeoutMs);
}
// Split a string up by full title start and end character sequences
_tokenize(string) {
const tokenRegex = new RegExp("(" + this._regexEscape(CHAR_TITLE_START) + "|" + this._regexEscape(CHAR_TITLE_END) + "|" + this._regexEscape(CHAR_CTRL_D) + ")", "gi");
return string.split(tokenRegex);
}
// Check if a chunk of data has a partial title start/end character sequence at the end
_hasPartialToken(chunk) {
const partialToken = /\\x1b(?:\](?:0"?)?)?$/gi;
return partialToken.test(chunk);
}
}