Added example code for video streaming using multipart/x-mixed-replace content type

This commit is contained in:
michalpokusa 2024-11-12 19:54:57 +00:00
parent 0e6180daf6
commit ec89785428

View file

@ -0,0 +1,130 @@
# SPDX-FileCopyrightText: 2024 Michał Pokusa
#
# SPDX-License-Identifier: Unlicense
try:
from typing import Dict, List, Tuple, Union
except ImportError:
pass
from asyncio import create_task, gather, run, sleep
from random import choice
import socketpool
import wifi
from adafruit_pycamera import PyCamera
from adafruit_httpserver import Server, Request, Response, Headers, Status, OK_200
pool = socketpool.SocketPool(wifi.radio)
server = Server(pool, debug=True)
camera = PyCamera()
camera.display.brightness = 0
camera.mode = 0 # JPEG, required for `capture_into_jpeg()`
camera.resolution = "1280x720"
camera.effect = 0 # No effect
class XMixedReplaceResponse(Response):
def __init__(
self,
request: Request,
frame_content_type: str,
*,
status: Union[Status, Tuple[int, str]] = OK_200,
headers: Union[Headers, Dict[str, str]] = None,
cookies: Dict[str, str] = None,
content_type: str = None,
) -> None:
super().__init__(
request=request,
headers=headers,
cookies=cookies,
status=status,
content_type=content_type,
)
self._boundary = self._get_random_boundary()
self._headers.setdefault(
"Content-Type", f"multipart/x-mixed-replace; boundary={self._boundary}"
)
self._frame_content_type = frame_content_type
@staticmethod
def _get_random_boundary() -> str:
symbols = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
return "--" + "".join([choice(symbols) for _ in range(16)])
def send_frame(self, frame: Union[str, bytes] = "") -> None:
encoded_frame = bytes(
frame.encode("utf-8") if isinstance(frame, str) else frame
)
self._send_bytes(
self._request.connection, bytes(f"{self._boundary}\r\n", "utf-8")
)
self._send_bytes(
self._request.connection,
bytes(f"Content-Type: {self._frame_content_type}\r\n\r\n", "utf-8"),
)
self._send_bytes(self._request.connection, encoded_frame)
self._send_bytes(self._request.connection, bytes("\r\n", "utf-8"))
def _send(self) -> None:
self._send_headers()
def close(self) -> None:
self._close_connection()
stream_connections: List[XMixedReplaceResponse] = []
@server.route("/frame")
def frame_handler(request: Request):
frame = camera.capture_into_jpeg()
return Response(request, body=frame, content_type="image/jpeg")
@server.route("/stream")
def stream_handler(request: Request):
response = XMixedReplaceResponse(request, frame_content_type="image/jpeg")
stream_connections.append(response)
return response
async def send_stream_frames():
while True:
await sleep(0.1)
frame = camera.capture_into_jpeg()
for connection in iter(stream_connections):
try:
connection.send_frame(frame)
except BrokenPipeError:
connection.close()
stream_connections.remove(connection)
async def handle_http_requests():
server.start(str(wifi.radio.ipv4_address))
while True:
await sleep(0)
server.poll()
async def main():
await gather(
create_task(send_stream_frames()),
create_task(handle_http_requests()),
)
run(main())