Compare commits

...

5 commits

Author SHA1 Message Date
5f4dfa5ef3 Add an enumerated type for DST status 2025-04-20 19:54:03 +02:00
14361a925a disentangle typing problems from using dataclasses
.. and drop pre-3.9 compatibility (depend on str.removeprefix)
2025-04-20 19:47:59 +02:00
bf9a98e8e8 Convert WWVBMinute to dataclass
This has the effect of making WWVBMinute instances mutable now.
2025-04-20 17:24:13 +02:00
6f03ba274e Convert WWVBTimecode to dataclass 2025-04-20 17:15:12 +02:00
93ccb9e028 wwvbtk: Use datetime for timekeeping
I don't know if it's *better* but `WWVBMinuteIERS(*key)` rubbed me
the wrong way.
2025-04-20 13:26:40 +02:00
5 changed files with 95 additions and 97 deletions

View file

@ -7,11 +7,12 @@
from __future__ import annotations
import dataclasses
import datetime
import enum
import json
import warnings
from typing import TYPE_CHECKING, Any, NamedTuple, TextIO, TypeVar
from typing import TYPE_CHECKING, Any, ClassVar, Self, TextIO
from . import iersdata
from .tz import Mountain
@ -21,19 +22,6 @@ if TYPE_CHECKING:
HOUR = datetime.timedelta(seconds=3600)
SECOND = datetime.timedelta(seconds=1)
T = TypeVar("T")
def _require(x: T | None) -> T:
"""Check an Optional item is not None."""
assert x is not None
return x
def _removeprefix(s: str, p: str) -> str:
if s.startswith(p):
return s[len(p) :]
return s
def _date(dt: datetime.date) -> datetime.date:
@ -323,7 +311,8 @@ _dst_ls_lut = [
]
class _WWVBMinute(NamedTuple):
@dataclasses.dataclass(frozen=True)
class WWVBMinute:
"""Uniquely identifies a minute of time in the WWVB system.
To use ut1 and ls information from IERS, create a WWVBMinuteIERS value instead.
@ -341,8 +330,8 @@ class _WWVBMinute(NamedTuple):
min: int
"""Minute of hour"""
dst: int
"""2-bit DST code """
dst: DstStatus
"""DST status code"""
ut1: int
"""UT1 offset in units of 100ms, range -900 to +900ms"""
@ -353,31 +342,28 @@ class _WWVBMinute(NamedTuple):
ly: bool
"""Leap year flag"""
epoch: ClassVar[int] = 1970
class WWVBMinute(_WWVBMinute):
"""Uniquely identifies a minute of time in the WWVB system.
To use ut1 and ls information from IERS, create a WWVBMinuteIERS value instead.
"""
epoch: int = 1970
def __new__( # noqa: PYI034
@classmethod
def from_parts(
cls,
year: int,
days: int,
hour: int,
minute: int,
dst: int | None = None,
dst: DstStatus | int | None = None,
ut1: int | None = None,
ls: bool | None = None,
ly: bool | None = None,
) -> WWVBMinute:
"""Construct a WWVBMinute"""
if dst is None:
dst = cls.get_dst(year, days)
if dst not in (0, 1, 2, 3):
raise ValueError("dst value should be 0..3")
) -> Self:
"""Create a WWVBMinute from parts
The constructor requires all parts are supplied. This classmethod can
determine the any or all of the `dst`, `ut1`, `ls`, and `ly` properties
based on class heuristics (except that either `ut1` and `ls` must both
be specified, or neither one may be specified)
"""
dst = cls.get_dst(year, days) if dst is None else DstStatus(dst)
if ut1 is None and ls is None:
ut1, ls = cls._get_dut1_info(year, days)
elif ut1 is None or ls is None:
@ -385,7 +371,8 @@ class WWVBMinute(_WWVBMinute):
year = cls.full_year(year)
if ly is None:
ly = isly(year)
return _WWVBMinute.__new__(cls, year, days, hour, minute, dst, ut1, ls, ly)
return cls(year, days, hour, minute, dst, ut1, ls, ly)
@classmethod
def full_year(cls, year: int) -> int:
@ -407,20 +394,20 @@ class WWVBMinute(_WWVBMinute):
return year
@staticmethod
def get_dst(year: int, days: int) -> int:
def get_dst(year: int, days: int) -> DstStatus:
"""Get the 2-bit WWVB DST value for the given day"""
d0 = datetime.datetime(year, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(days - 1)
d1 = d0 + datetime.timedelta(1)
dst0 = isdst(d0)
dst1 = isdst(d1)
return dst1 * 2 + dst0
return DstStatus(dst1 * 2 + dst0)
def __str__(self) -> str:
"""Implement str()"""
return (
f"year={self.year:4d} days={self.days:03d} hour={self.hour:02d} "
f"min={self.min:02d} dst={self.dst} ut1={self.ut1} ly={int(self.ly)} "
f"ls={int(self.ls)}"
f"min={self.min:02d} dst={+self.dst} ut1={self.ut1} ly={+self.ly} "
f"ls={+self.ls}"
)
def as_datetime_utc(self) -> datetime.datetime:
@ -477,7 +464,7 @@ class WWVBMinute(_WWVBMinute):
def as_timecode(self) -> WWVBTimecode:
"""Fill a WWVBTimecode structure representing this minute. Fills both the amplitude and phase codes."""
t = WWVBTimecode(self.minute_length())
t = WWVBTimecode.make_empty(self.minute_length())
self._fill_am_timecode(t)
self._fill_pm_timecode(t)
@ -648,7 +635,7 @@ class WWVBMinute(_WWVBMinute):
@classmethod
def fromstring(cls, s: str) -> WWVBMinute:
"""Construct a WWVBMinute from a string representation created by print_timecodes"""
s = _removeprefix(s, "WWVB timecode: ")
s = s.removeprefix("WWVB timecode: ")
d: dict[str, int] = {}
for part in s.split():
k, v = part.split("=")
@ -661,11 +648,11 @@ class WWVBMinute(_WWVBMinute):
minute = d.pop("minute")
dst: int | None = d.pop("dst", None)
ut1: int | None = d.pop("ut1", None)
ls = d.pop("ls", None)
ls: int | None = d.pop("ls", None)
d.pop("ly", None)
if d:
raise ValueError(f"Invalid options: {d}")
return cls(year, days, hour, minute, dst, ut1, None if ls is None else bool(ls))
return cls.from_parts(year, days, hour, minute, dst, ut1, None if ls is None else bool(ls), isly(year))
@classmethod
def from_datetime(
@ -679,7 +666,7 @@ class WWVBMinute(_WWVBMinute):
u = d.utctimetuple()
if newls is None and newut1 is None:
newut1, newls = cls._get_dut1_info(u.tm_year, u.tm_yday, old_time)
return cls(u.tm_year, u.tm_yday, u.tm_hour, u.tm_min, ut1=newut1, ls=newls)
return cls.from_parts(u.tm_year, u.tm_yday, u.tm_hour, u.tm_min, ut1=newut1, ls=newls)
@classmethod
def from_timecode_am(cls, t: WWVBTimecode) -> WWVBMinute | None:
@ -716,8 +703,8 @@ class WWVBMinute(_WWVBMinute):
if days > 366 or (not ly and days > 365):
return None
ls = bool(t.am[56])
dst = _require(t._get_am_bcd(57, 58))
return cls(year, days, hour, minute, dst, ut1, ls, ly)
dst = t._get_am_bcd(57, 58) or 0
return cls.from_parts(year, days, hour, minute, dst, ut1, ls, ly)
class WWVBMinuteIERS(WWVBMinute):
@ -757,6 +744,17 @@ class PhaseModulation(enum.IntEnum):
UNSET = -1
@enum.unique
class DstStatus(enum.IntEnum):
"""Constants that describe the DST status of a minute"""
DST_NOT_IN_EFFECT = 0b00
DST_STARTS_TODAY = 0b01
DST_ENDS_TODAY = 0b10
DST_IN_EFFECT = 0b11
@dataclasses.dataclass
class WWVBTimecode:
"""Represent the amplitude and/or phase signal, usually over 1 minute"""
@ -766,10 +764,10 @@ class WWVBTimecode:
phase: list[PhaseModulation]
"""The phase modulation data"""
def __init__(self, sz: int) -> None:
"""Construct a WWVB timecode ``sz`` seconds long"""
self.am = [AmplitudeModulation.UNSET] * sz
self.phase = [PhaseModulation.UNSET] * sz
@classmethod
def make_empty(cls, sz: int) -> Self:
"""Provide an empty timecode of the given length"""
return cls([AmplitudeModulation.UNSET] * sz, [PhaseModulation.UNSET] * sz)
def _get_am_bcd(self, *poslist: int) -> int | None:
"""Convert AM data to BCD

View file

@ -65,7 +65,7 @@ def wwvbreceive() -> Generator[wwvb.WWVBTimecode | None, wwvb.AmplitudeModulatio
state = 1
elif len(minute) == 60:
# print("FULL MINUTE")
tc = wwvb.WWVBTimecode(60)
tc = wwvb.WWVBTimecode.make_empty(60)
tc.am[:] = minute
minute = []
state = 2

View file

@ -6,8 +6,8 @@
# SPDX-License-Identifier: GPL-3.0-only
from __future__ import annotations
import datetime
import functools
import time
from tkinter import Canvas, TclError, Tk
from typing import TYPE_CHECKING, Any
@ -66,24 +66,22 @@ def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: P
if min_size is None:
min_size = size
def deadline_ms(deadline: float) -> int:
def deadline_ms(deadline: datetime.datetime) -> int:
"""Compute the number of ms until a deadline"""
now = time.time()
return int(max(0, deadline - now) * 1000)
now = datetime.datetime.now(datetime.timezone.utc)
return int(max(0, (deadline - now).total_seconds()) * 1000)
def wwvbtick() -> Generator[tuple[float, wwvb.AmplitudeModulation], None, None]:
def wwvbtick() -> Generator[tuple[datetime.datetime, wwvb.AmplitudeModulation], None, None]:
"""Yield consecutive values of the WWVB amplitude signal, going from minute to minute"""
timestamp = time.time() // 60 * 60
timestamp = datetime.datetime.now(datetime.timezone.utc).replace(second=0, microsecond=0)
while True:
tt = time.gmtime(timestamp)
key = tt.tm_year, tt.tm_yday, tt.tm_hour, tt.tm_min
timecode = wwvb.WWVBMinuteIERS(*key).as_timecode()
timecode = wwvb.WWVBMinuteIERS.from_datetime(timestamp).as_timecode()
for i, code in enumerate(timecode.am):
yield timestamp + i, code
timestamp = timestamp + 60
yield timestamp + datetime.timedelta(seconds=i), code
timestamp = timestamp + datetime.timedelta(seconds=60)
def wwvbsmarttick() -> Generator[tuple[float, wwvb.AmplitudeModulation], None, None]:
def wwvbsmarttick() -> Generator[tuple[datetime.datetime, wwvb.AmplitudeModulation], None, None]:
"""Yield consecutive values of the WWVB amplitude signal
.. but deal with time progressing unexpectedly, such as when the
@ -94,10 +92,10 @@ def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: P
"""
while True:
for stamp, code in wwvbtick():
now = time.time()
if stamp < now - 60:
now = datetime.datetime.now(datetime.timezone.utc)
if stamp < now - datetime.timedelta(seconds=60):
break
if stamp < now - 1:
if stamp < now - datetime.timedelta(seconds=1):
continue
yield stamp, code
@ -137,7 +135,7 @@ def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: P
yield deadline_ms(stamp)
led_on(code)
app.update()
yield deadline_ms(stamp + 0.2 + 0.3 * int(code))
yield deadline_ms(stamp + datetime.timedelta(seconds=0.2 + 0.3 * int(code)))
led_off(code)
app.update()

View file

@ -19,7 +19,7 @@ class TestPhaseModulation(unittest.TestCase):
ref_pm = "001110110100010010000011001000011000110100110100010110110110"
ref_minute = wwvb.WWVBMinuteIERS(2012, 186, 17, 30, dst=3)
ref_minute = wwvb.WWVBMinuteIERS.from_parts(2012, 186, 17, 30, dst=3)
ref_time = ref_minute.as_timecode()
test_am = ref_time.to_am_string(["0", "1", "2"])

View file

@ -127,11 +127,13 @@ class WWVBRoundtrip(unittest.TestCase):
while dt.year < 1993:
minute = wwvb.WWVBMinuteIERS.from_datetime(dt)
assert minute is not None
assert minute.year == dt.year
timecode = minute.as_timecode().am
assert timecode
decoded_minute: wwvb.WWVBMinute | None = wwvb.WWVBMinuteIERS.from_timecode_am(minute.as_timecode())
assert decoded_minute
decoded = decoded_minute.as_timecode().am
self.assertEqual(minute, decoded_minute)
self.assertEqual(
timecode,
decoded,
@ -261,12 +263,12 @@ class WWVBRoundtrip(unittest.TestCase):
def test_epoch(self) -> None:
"""Test the 1970-to-2069 epoch"""
m = wwvb.WWVBMinute(69, 1, 1, 0, 0)
n = wwvb.WWVBMinute(2069, 1, 1, 0, 0)
m = wwvb.WWVBMinute.from_parts(69, 1, 1, 0, 0)
n = wwvb.WWVBMinute.from_parts(2069, 1, 1, 0, 0)
self.assertEqual(m, n)
m = wwvb.WWVBMinute(70, 1, 1, 0, 0)
n = wwvb.WWVBMinute(1970, 1, 1, 0, 0)
m = wwvb.WWVBMinute.from_parts(70, 1, 1, 0, 0)
n = wwvb.WWVBMinute.from_parts(1970, 1, 1, 0, 0)
self.assertEqual(m, n)
def test_fromstring(self) -> None:
@ -290,13 +292,13 @@ class WWVBRoundtrip(unittest.TestCase):
def test_exceptions(self) -> None:
"""Test some error detection"""
with self.assertRaises(ValueError):
wwvb.WWVBMinute(2021, 1, 1, 1, dst=4)
wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, dst=4)
with self.assertRaises(ValueError):
wwvb.WWVBMinute(2021, 1, 1, 1, ut1=1)
wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, ut1=1)
with self.assertRaises(ValueError):
wwvb.WWVBMinute(2021, 1, 1, 1, ls=False)
wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, ls=False)
with self.assertRaises(ValueError):
wwvb.WWVBMinute.fromstring("year=1998 days=365 hour=23 min=56 dst=0 ut1=-300 ly=0 ls=1 boo=1")
@ -310,7 +312,7 @@ class WWVBRoundtrip(unittest.TestCase):
def test_undefined(self) -> None:
"""Ensure that the check for unset elements in am works"""
with self.assertWarnsRegex(Warning, "is unset"):
str(wwvb.WWVBTimecode(60))
str(wwvb.WWVBTimecode.make_empty(60))
def test_tz(self) -> None:
"""Get a little more coverage in the dst change functions"""
@ -365,35 +367,35 @@ class WWVBRoundtrip(unittest.TestCase):
def test_epoch2(self) -> None:
"""Test that the settable epoch feature works"""
self.assertEqual(wwvb.WWVBMinute(0, 1, 1, 0, 0).year, 2000)
self.assertEqual(wwvb.WWVBMinute(69, 1, 1, 0, 0).year, 2069)
self.assertEqual(wwvb.WWVBMinute(70, 1, 1, 0, 0).year, 1970)
self.assertEqual(wwvb.WWVBMinute(99, 1, 1, 0, 0).year, 1999)
self.assertEqual(wwvb.WWVBMinute.from_parts(0, 1, 1, 0, 0).year, 2000)
self.assertEqual(wwvb.WWVBMinute.from_parts(69, 1, 1, 0, 0).year, 2069)
self.assertEqual(wwvb.WWVBMinute.from_parts(70, 1, 1, 0, 0).year, 1970)
self.assertEqual(wwvb.WWVBMinute.from_parts(99, 1, 1, 0, 0).year, 1999)
# 4-digit years can always be used
self.assertEqual(wwvb.WWVBMinute(2000, 1, 1, 0, 0).year, 2000)
self.assertEqual(wwvb.WWVBMinute(2069, 1, 1, 0, 0).year, 2069)
self.assertEqual(wwvb.WWVBMinute(1970, 1, 1, 0, 0).year, 1970)
self.assertEqual(wwvb.WWVBMinute(1999, 1, 1, 0, 0).year, 1999)
self.assertEqual(wwvb.WWVBMinute.from_parts(2000, 1, 1, 0, 0).year, 2000)
self.assertEqual(wwvb.WWVBMinute.from_parts(2069, 1, 1, 0, 0).year, 2069)
self.assertEqual(wwvb.WWVBMinute.from_parts(1970, 1, 1, 0, 0).year, 1970)
self.assertEqual(wwvb.WWVBMinute.from_parts(1999, 1, 1, 0, 0).year, 1999)
self.assertEqual(wwvb.WWVBMinute(1900, 1, 1, 0, 0).year, 1900)
self.assertEqual(wwvb.WWVBMinute(1969, 1, 1, 0, 0).year, 1969)
self.assertEqual(wwvb.WWVBMinute(2070, 1, 1, 0, 0).year, 2070)
self.assertEqual(wwvb.WWVBMinute(2099, 1, 1, 0, 0).year, 2099)
self.assertEqual(wwvb.WWVBMinute.from_parts(1900, 1, 1, 0, 0).year, 1900)
self.assertEqual(wwvb.WWVBMinute.from_parts(1969, 1, 1, 0, 0).year, 1969)
self.assertEqual(wwvb.WWVBMinute.from_parts(2070, 1, 1, 0, 0).year, 2070)
self.assertEqual(wwvb.WWVBMinute.from_parts(2099, 1, 1, 0, 0).year, 2099)
self.assertEqual(WWVBMinute2k(0, 1, 1, 0, 0).year, 2000)
self.assertEqual(WWVBMinute2k(99, 1, 1, 0, 0).year, 2099)
self.assertEqual(WWVBMinute2k.from_parts(0, 1, 1, 0, 0).year, 2000)
self.assertEqual(WWVBMinute2k.from_parts(99, 1, 1, 0, 0).year, 2099)
# 4-digit years can always be used
self.assertEqual(WWVBMinute2k(2000, 1, 1, 0, 0).year, 2000)
self.assertEqual(WWVBMinute2k(2069, 1, 1, 0, 0).year, 2069)
self.assertEqual(WWVBMinute2k(1970, 1, 1, 0, 0).year, 1970)
self.assertEqual(WWVBMinute2k(1999, 1, 1, 0, 0).year, 1999)
self.assertEqual(WWVBMinute2k.from_parts(2000, 1, 1, 0, 0).year, 2000)
self.assertEqual(WWVBMinute2k.from_parts(2069, 1, 1, 0, 0).year, 2069)
self.assertEqual(WWVBMinute2k.from_parts(1970, 1, 1, 0, 0).year, 1970)
self.assertEqual(WWVBMinute2k.from_parts(1999, 1, 1, 0, 0).year, 1999)
self.assertEqual(WWVBMinute2k(1900, 1, 1, 0, 0).year, 1900)
self.assertEqual(WWVBMinute2k(1969, 1, 1, 0, 0).year, 1969)
self.assertEqual(WWVBMinute2k(2070, 1, 1, 0, 0).year, 2070)
self.assertEqual(WWVBMinute2k(2099, 1, 1, 0, 0).year, 2099)
self.assertEqual(WWVBMinute2k.from_parts(1900, 1, 1, 0, 0).year, 1900)
self.assertEqual(WWVBMinute2k.from_parts(1969, 1, 1, 0, 0).year, 1969)
self.assertEqual(WWVBMinute2k.from_parts(2070, 1, 1, 0, 0).year, 2070)
self.assertEqual(WWVBMinute2k.from_parts(2099, 1, 1, 0, 0).year, 2099)
if __name__ == "__main__":