# SPDX-FileCopyrightText: 2023 Michał Pokusa
#
# SPDX-License-Identifier: Unlicense
|
|
|
|
from asyncio import create_task, gather, run
|
|
from asyncio import sleep as async_sleep
|
|
|
|
import board
|
|
import microcontroller
|
|
import neopixel
|
|
import socketpool
|
|
import wifi
|
|
|
|
from adafruit_httpserver import GET, Request, Response, Server, Websocket
|
|
|
|
# Networking: a socket pool on top of the Wi-Fi radio, shared with the server.
pool = socketpool.SocketPool(wifi.radio)
server = Server(pool, debug=True)

# A single on-board NeoPixel; its color is set from websocket messages.
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1)

# The currently stored websocket connection; None until a client connects.
websocket: Websocket = None
|
|
|
|
# Page returned by the "/client" route. Its script opens a websocket to
# "/connect-websocket", writes every received message into the <strong>
# element, and sends the color picker's value at most once per 200 ms pause
# (debounced). The degree sign is written as the "&deg;" entity so it renders
# correctly even though the page declares no character encoding.
HTML_TEMPLATE = """
<html lang="en">
    <head>
        <title>Websocket Client</title>
    </head>
    <body>
        <p>CPU temperature: <strong>-</strong>&deg;C</p>
        <p>NeoPixel Color: <input type="color"></p>
        <script>
            const cpuTemp = document.querySelector('strong');
            const colorPicker = document.querySelector('input[type="color"]');

            let ws = new WebSocket('ws://' + location.host + '/connect-websocket');

            ws.onopen = () => console.log('WebSocket connection opened');
            ws.onclose = () => console.log('WebSocket connection closed');
            ws.onmessage = event => cpuTemp.textContent = event.data;
            ws.onerror = error => cpuTemp.textContent = error;

            colorPicker.oninput = debounce(() => ws.send(colorPicker.value), 200);

            function debounce(callback, delay = 1000) {
                let timeout
                return (...args) => {
                    clearTimeout(timeout)
                    timeout = setTimeout(() => {
                        callback(...args)
                    }, delay)
                }
            }
        </script>
    </body>
</html>
"""
|
|
|
|
|
|
@server.route("/client", GET)
def client(request: Request):
    """Answer GET /client with the HTML page held in ``HTML_TEMPLATE``."""
    page = Response(request, HTML_TEMPLATE, content_type="text/html")
    return page
|
|
|
|
|
|
@server.route("/connect-websocket", GET)
def connect_client(request: Request):
    """Replace the stored websocket with a new one built from ``request``.

    Any previously stored connection is closed first, so only one websocket
    is kept at a time. The new ``Websocket`` is saved in the module-level
    ``websocket`` variable and returned as the route's result.
    """
    global websocket

    previous = websocket
    if previous is not None:
        # Only the most recent connection is kept.
        previous.close()

    websocket = Websocket(request)
    return websocket
|
|
|
|
|
|
# Start the server, passing the radio's IPv4 address (as a string) as the host.
server.start(str(wifi.radio.ipv4_address))
|
|
|
|
|
|
async def handle_http_requests():
    """Poll the HTTP server forever.

    After every poll the coroutine awaits a zero-second sleep so the other
    tasks scheduled alongside it get a chance to run.
    """
    while True:
        server.poll()
        # Zero-length sleep: give up control without adding a delay.
        await async_sleep(0)
|
|
|
|
|
|
def _parse_hex_color(data):
    """Convert a ``#rrggbb``-style message into an ``(r, g, b)`` tuple.

    Characters 1-2, 3-4 and 5-6 of ``data`` are each read as a base-16 number.

    Returns:
        The three channel values, or ``None`` when a pair is missing, is not
        valid hexadecimal, or parses to a negative number.
    """
    try:
        channels = tuple(int(data[start : start + 2], 16) for start in (1, 3, 5))
    except (TypeError, ValueError):
        return None

    # int() accepts a leading sign, so a pair such as "-f" would yield -15.
    if min(channels) < 0:
        return None
    return channels


async def handle_websocket_requests():
    """Apply colors received over the stored websocket to the NeoPixel.

    Runs forever. While a websocket is stored, each received message is parsed
    as a ``#rrggbb`` color and used to fill ``pixel``. Messages that cannot be
    parsed are ignored, so a malformed message from a client cannot raise out
    of this coroutine and stop it. Control is yielded after every iteration.
    """
    while True:
        if websocket is not None:
            if (data := websocket.receive(fail_silently=True)) is not None:
                color = _parse_hex_color(data)
                if color is not None:
                    pixel.fill(color)

        await async_sleep(0)
|
|
|
|
|
|
async def send_websocket_messages():
    """Send the CPU temperature over the stored websocket once per second.

    Runs forever. While a websocket is stored, the temperature reading is
    rounded to two decimals and sent as text. If no reading is available the
    send is skipped for that cycle.
    """
    while True:
        if websocket is not None:
            temperature = microcontroller.cpu.temperature
            # The reading is documented as None when the temperature is not
            # available; round(None, 2) would raise TypeError and end the task.
            if temperature is not None:
                cpu_temp = round(temperature, 2)
                websocket.send_message(str(cpu_temp), fail_silently=True)

        await async_sleep(1)
|
|
|
|
|
|
async def main():
    """Run the HTTP poller, websocket receiver and websocket sender together.

    Each coroutine is wrapped in its own task, then all three are awaited
    with ``gather``.
    """
    tasks = [
        create_task(handle_http_requests()),
        create_task(handle_websocket_requests()),
        create_task(send_websocket_messages()),
    ]
    await gather(*tasks)
|
|
|
|
|
|
# Entry point: run the main() coroutine with asyncio's run().
run(main())
|