wwvbpy/src/wwvb/wwvbtk.py

155 lines
5.2 KiB
Python
Executable file

#!/usr/bin/python3
"""Visualize the WWVB signal in realtime"""
# SPDX-FileCopyrightText: 2021-2024 Jeff Epler
#
# SPDX-License-Identifier: GPL-3.0-only
from __future__ import annotations
import datetime
import functools
from tkinter import Canvas, Event, TclError, Tk
import click
import wwvb
TYPE_CHECKING = False
if TYPE_CHECKING:
from collections.abc import Generator
@functools.cache
def _app() -> Tk:
"""Create the Tk application object lazily"""
return Tk()
def validate_colors(ctx: click.Context, param: click.Parameter, value: str) -> list[str]: # noqa: ARG001
"""Check that all colors in a string are valid, splitting it to a list"""
app = _app()
colors = value.split()
if len(colors) not in (2, 3, 4, 6):
raise click.BadParameter(f"Give 2, 3, 4 or 6 colors (not {len(colors)})")
for c in colors:
try:
app.winfo_rgb(c)
except TclError as e:
raise click.BadParameter(f"Invalid color {c}") from e
if len(colors) == 2:
off, on = colors
return [off, off, off, on, on, on]
if len(colors) == 3:
return colors + colors
if len(colors) == 4:
off, c1, c2, c3 = colors
return [off, off, off, c1, c2, c3]
return colors
DEFAULT_COLORS = "#3c3c3c #3c3c3c #3c3c3c #cc3c3c #88883c #3ccc3c"
@click.command
@click.option(
"--colors",
callback=validate_colors,
default=DEFAULT_COLORS,
metavar="COLORS",
help="2, 3, 4, or 6 Tk color values",
)
@click.option("--size", default=48, help="initial size in pixels")
@click.option("--min-size", default=None, type=int, help="minimum size in pixels (default: same as initial size)")
def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: PLR0915
"""Visualize the WWVB signal in realtime"""
if min_size is None:
min_size = size
def deadline_ms(deadline: datetime.datetime) -> int:
"""Compute the number of ms until a deadline"""
now = datetime.datetime.now(datetime.timezone.utc)
return int(max(0.0, (deadline - now).total_seconds()) * 1000)
def wwvbtick() -> Generator[tuple[datetime.datetime, wwvb.AmplitudeModulation]]:
"""Yield consecutive values of the WWVB amplitude signal, going from minute to minute"""
timestamp = datetime.datetime.now(datetime.timezone.utc).replace(second=0, microsecond=0)
while True:
timecode = wwvb.WWVBMinuteIERS.from_datetime(timestamp).as_timecode()
for i, code in enumerate(timecode.am):
yield timestamp + datetime.timedelta(seconds=i), code
timestamp = timestamp + datetime.timedelta(seconds=60)
def wwvbsmarttick() -> Generator[tuple[datetime.datetime, wwvb.AmplitudeModulation]]:
"""Yield consecutive values of the WWVB amplitude signal
.. but deal with time progressing unexpectedly, such as when the
computer is suspended or NTP steps the clock backwards
When time goes backwards or advances by more than a minute, get a fresh
wwvbtick object; otherwise, discard time signals more than 1s in the past.
"""
while True:
for stamp, code in wwvbtick():
now = datetime.datetime.now(datetime.timezone.utc)
if stamp < now - datetime.timedelta(seconds=60):
break
if stamp < now - datetime.timedelta(seconds=1):
continue
yield stamp, code
app = _app()
app.wm_minsize(min_size, min_size)
canvas = Canvas(app, width=size, height=size, highlightthickness=0)
circle = canvas.create_oval(4, 4, 44, 44, outline="black", fill=colors[0])
canvas.pack(fill="both", expand=True)
app.wm_deiconify()
def resize_canvas(event: Event) -> None:
"""Keep the circle filling the window when it is resized"""
sz = min(event.width, event.height) - 8
if sz < 0:
return
canvas.coords(
circle,
event.width // 2 - sz // 2,
event.height // 2 - sz // 2,
event.width // 2 + sz // 2,
event.height // 2 + sz // 2,
)
canvas.bind("<Configure>", resize_canvas)
def led_on(i: int) -> None:
"""Turn the canvas's virtual LED on"""
canvas.itemconfigure(circle, fill=colors[i + 3])
def led_off(i: int) -> None:
"""Turn the canvas's virtual LED off"""
canvas.itemconfigure(circle, fill=colors[i])
def controller_func() -> Generator[int]:
"""Update the canvas virtual LED, yielding the number of ms until the next change"""
for stamp, code in wwvbsmarttick():
yield deadline_ms(stamp)
led_on(code)
app.update()
yield deadline_ms(stamp + datetime.timedelta(seconds=0.2 + 0.3 * int(code)))
led_off(code)
app.update()
controller = controller_func().__next__
# pyrefly: ignore # bad-assignment
def after_func() -> None:
"""Repeatedly run the controller after the desired interval"""
app.after(controller(), after_func)
# pyrefly: ignore # bad-argument-type
app.after_idle(after_func)
app.mainloop()
if __name__ == "__main__":
main()