# SPDX-FileCopyrightText: 2023 MichaƂ Pokusa # # SPDX-License-Identifier: Unlicense from asyncio import create_task, gather, run from asyncio import sleep as async_sleep import board import microcontroller import neopixel import socketpool import wifi from adafruit_httpserver import GET, Request, Response, Server, Websocket pool = socketpool.SocketPool(wifi.radio) server = Server(pool, debug=True) pixel = neopixel.NeoPixel(board.NEOPIXEL, 1) websocket: Websocket = None HTML_TEMPLATE = """ Websocket Client

CPU temperature: -°C

NeoPixel Color:

""" @server.route("/client", GET) def client(request: Request): return Response(request, HTML_TEMPLATE, content_type="text/html") @server.route("/connect-websocket", GET) def connect_client(request: Request): global websocket if websocket is not None: websocket.close() # Close any existing connection websocket = Websocket(request) return websocket server.start(str(wifi.radio.ipv4_address)) async def handle_http_requests(): while True: server.poll() await async_sleep(0) async def handle_websocket_requests(): while True: if websocket is not None: if (data := websocket.receive(fail_silently=True)) is not None: r, g, b = int(data[1:3], 16), int(data[3:5], 16), int(data[5:7], 16) pixel.fill((r, g, b)) await async_sleep(0) async def send_websocket_messages(): while True: if websocket is not None: cpu_temp = round(microcontroller.cpu.temperature, 2) websocket.send_message(str(cpu_temp), fail_silently=True) await async_sleep(1) async def main(): await gather( create_task(handle_http_requests()), create_task(handle_websocket_requests()), create_task(send_websocket_messages()), ) run(main())