circuitpython/tests/extmod/websocket_basic.py
Jeff Epler e2428a8178 websocket: Implement 64-bit frames, enable split frames.
This fixes several assertion errors that were found in fuzz
testing, for unimplemented portions of the spec. The assertions
were either turned into Python exceptions, or the missing
functionality was implemented.

Frames larger than 4GB are unsupported. Initial reception of
such a frame will result in OSError(EIO) and subsequent operations
on the same websocket will fail because framing has been lost.

Transmitting frames larger than 64kB is unsupported. Attempting
to transmit such a frame will result in OSError(ENOBUFS). Subsequent
operations on the websocket are possible.

Signed-off-by: Jeff Epler <jepler@gmail.com>
2025-08-04 10:22:48 -05:00

85 lines
2.3 KiB
Python

try:
import io
import errno
import websocket
except ImportError:
print("SKIP")
raise SystemExit
# put raw data in the stream and do a websocket read
def ws_read(msg, sz):
ws = websocket.websocket(io.BytesIO(msg))
return ws.read(sz)
# put raw data in the stream and do a series of websocket read
def ws_readn(msg, *args):
ws = websocket.websocket(io.BytesIO(msg))
for sz in args:
yield ws.read(sz)
# do a websocket write and then return the raw data from the stream
def ws_write(msg, sz):
s = io.BytesIO()
ws = websocket.websocket(s)
ws.write(msg)
s.seek(0)
return s.read(sz)
# basic frame
print(ws_read(b"\x81\x04ping", 4))
print(ws_write(b"pong", 6))
# split frames and irregular size reads
for s in ws_readn(b"\x01\x04ping\x00\x04Ping\x80\x04PING", 6, 4, 2, 2):
print(s)
# extended payloads
print(ws_read(b"\x81~\x00\x80" + b"ping" * 32, 128))
print(ws_write(b"pong" * 32, 132))
# 64-bit payload size (but small payload -- appears permitted by spec)
print(ws_read(b"\x81\x7f\x00\x00\x00\x00\x00\x00\x00\x80" + b"ping" * 32, 128))
# >4GB payload size, unsupported by micropython implementation. Framing is lost.
msg = b"\x81\x7f\x01\x00\x00\x00\x00\x00\x00\x80" + b"ping" * 32
ws = websocket.websocket(io.BytesIO(msg))
try:
print(ws.read(1))
except OSError as e:
print("ioctl: EIO:", e.errno == errno.EIO)
# mask (returned data will be 'maskmask' ^ 'maskMASK')
print(ws_read(b"\x81\x88maskmaskMASK", 8))
# mask w/2-byte payload len (returned data will be 'maskmask' ^ 'maskMASK')
print(ws_read(b"\x81\xfe\x00\x08maskmaskMASK", 8))
# mask w/8-byte payload len (returned data will be 'maskmask' ^ 'maskMASK')
print(ws_read(b"\x81\xff\x00\x00\x00\x00\x00\x00\x00\x08maskmaskMASK", 8))
# close control frame
s = io.BytesIO(b"\x88\x00") # FRAME_CLOSE
ws = websocket.websocket(s)
print(ws.read(1))
s.seek(2)
print(s.read(4))
# misc control frames
print(ws_read(b"\x89\x00\x81\x04ping", 4)) # FRAME_PING
print(ws_read(b"\x8a\x00\x81\x04pong", 4)) # FRAME_PONG
# close method
ws = websocket.websocket(io.BytesIO())
ws.close()
# ioctl
ws = websocket.websocket(io.BytesIO())
print(ws.ioctl(8)) # GET_DATA_OPTS
print(ws.ioctl(9, 2)) # SET_DATA_OPTS
print(ws.ioctl(9))
try:
ws.ioctl(-1)
except OSError as e:
print("ioctl: EINVAL:", e.errno == errno.EINVAL)