Compare commits

...

5 commits

Author SHA1 Message Date
5f4dfa5ef3 Add an enumerated type for DST status 2025-04-20 19:54:03 +02:00
14361a925a disentangle typing problems from using dataclasses
.. and drop pre-3.9 compatibility (depend on str.removeprefix)
2025-04-20 19:47:59 +02:00
bf9a98e8e8 Convert WWVBMinute to dataclass
This has the effect of making WWVBMinute instances mutable now.
2025-04-20 17:24:13 +02:00
6f03ba274e Convert WWVBTimecode to dataclass 2025-04-20 17:15:12 +02:00
93ccb9e028 wwvbtk: Use datetime for timekeeping
I don't know if it's *better* but `WWVBMinuteIERS(*key)` rubbed me
the wrong way.
2025-04-20 13:26:40 +02:00
5 changed files with 95 additions and 97 deletions

View file

@ -7,11 +7,12 @@
from __future__ import annotations from __future__ import annotations
import dataclasses
import datetime import datetime
import enum import enum
import json import json
import warnings import warnings
from typing import TYPE_CHECKING, Any, NamedTuple, TextIO, TypeVar from typing import TYPE_CHECKING, Any, ClassVar, Self, TextIO
from . import iersdata from . import iersdata
from .tz import Mountain from .tz import Mountain
@ -21,19 +22,6 @@ if TYPE_CHECKING:
HOUR = datetime.timedelta(seconds=3600) HOUR = datetime.timedelta(seconds=3600)
SECOND = datetime.timedelta(seconds=1) SECOND = datetime.timedelta(seconds=1)
T = TypeVar("T")
def _require(x: T | None) -> T:
"""Check an Optional item is not None."""
assert x is not None
return x
def _removeprefix(s: str, p: str) -> str:
if s.startswith(p):
return s[len(p) :]
return s
def _date(dt: datetime.date) -> datetime.date: def _date(dt: datetime.date) -> datetime.date:
@ -323,7 +311,8 @@ _dst_ls_lut = [
] ]
class _WWVBMinute(NamedTuple): @dataclasses.dataclass(frozen=True)
class WWVBMinute:
"""Uniquely identifies a minute of time in the WWVB system. """Uniquely identifies a minute of time in the WWVB system.
To use ut1 and ls information from IERS, create a WWVBMinuteIERS value instead. To use ut1 and ls information from IERS, create a WWVBMinuteIERS value instead.
@ -341,8 +330,8 @@ class _WWVBMinute(NamedTuple):
min: int min: int
"""Minute of hour""" """Minute of hour"""
dst: int dst: DstStatus
"""2-bit DST code """ """DST status code"""
ut1: int ut1: int
"""UT1 offset in units of 100ms, range -900 to +900ms""" """UT1 offset in units of 100ms, range -900 to +900ms"""
@ -353,31 +342,28 @@ class _WWVBMinute(NamedTuple):
ly: bool ly: bool
"""Leap year flag""" """Leap year flag"""
epoch: ClassVar[int] = 1970
class WWVBMinute(_WWVBMinute): @classmethod
"""Uniquely identifies a minute of time in the WWVB system. def from_parts(
To use ut1 and ls information from IERS, create a WWVBMinuteIERS value instead.
"""
epoch: int = 1970
def __new__( # noqa: PYI034
cls, cls,
year: int, year: int,
days: int, days: int,
hour: int, hour: int,
minute: int, minute: int,
dst: int | None = None, dst: DstStatus | int | None = None,
ut1: int | None = None, ut1: int | None = None,
ls: bool | None = None, ls: bool | None = None,
ly: bool | None = None, ly: bool | None = None,
) -> WWVBMinute: ) -> Self:
"""Construct a WWVBMinute""" """Create a WWVBMinute from parts
if dst is None:
dst = cls.get_dst(year, days) The constructor requires all parts are supplied. This classmethod can
if dst not in (0, 1, 2, 3): determine the any or all of the `dst`, `ut1`, `ls`, and `ly` properties
raise ValueError("dst value should be 0..3") based on class heuristics (except that either `ut1` and `ls` must both
be specified, or neither one may be specified)
"""
dst = cls.get_dst(year, days) if dst is None else DstStatus(dst)
if ut1 is None and ls is None: if ut1 is None and ls is None:
ut1, ls = cls._get_dut1_info(year, days) ut1, ls = cls._get_dut1_info(year, days)
elif ut1 is None or ls is None: elif ut1 is None or ls is None:
@ -385,7 +371,8 @@ class WWVBMinute(_WWVBMinute):
year = cls.full_year(year) year = cls.full_year(year)
if ly is None: if ly is None:
ly = isly(year) ly = isly(year)
return _WWVBMinute.__new__(cls, year, days, hour, minute, dst, ut1, ls, ly)
return cls(year, days, hour, minute, dst, ut1, ls, ly)
@classmethod @classmethod
def full_year(cls, year: int) -> int: def full_year(cls, year: int) -> int:
@ -407,20 +394,20 @@ class WWVBMinute(_WWVBMinute):
return year return year
@staticmethod @staticmethod
def get_dst(year: int, days: int) -> int: def get_dst(year: int, days: int) -> DstStatus:
"""Get the 2-bit WWVB DST value for the given day""" """Get the 2-bit WWVB DST value for the given day"""
d0 = datetime.datetime(year, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(days - 1) d0 = datetime.datetime(year, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(days - 1)
d1 = d0 + datetime.timedelta(1) d1 = d0 + datetime.timedelta(1)
dst0 = isdst(d0) dst0 = isdst(d0)
dst1 = isdst(d1) dst1 = isdst(d1)
return dst1 * 2 + dst0 return DstStatus(dst1 * 2 + dst0)
def __str__(self) -> str: def __str__(self) -> str:
"""Implement str()""" """Implement str()"""
return ( return (
f"year={self.year:4d} days={self.days:03d} hour={self.hour:02d} " f"year={self.year:4d} days={self.days:03d} hour={self.hour:02d} "
f"min={self.min:02d} dst={self.dst} ut1={self.ut1} ly={int(self.ly)} " f"min={self.min:02d} dst={+self.dst} ut1={self.ut1} ly={+self.ly} "
f"ls={int(self.ls)}" f"ls={+self.ls}"
) )
def as_datetime_utc(self) -> datetime.datetime: def as_datetime_utc(self) -> datetime.datetime:
@ -477,7 +464,7 @@ class WWVBMinute(_WWVBMinute):
def as_timecode(self) -> WWVBTimecode: def as_timecode(self) -> WWVBTimecode:
"""Fill a WWVBTimecode structure representing this minute. Fills both the amplitude and phase codes.""" """Fill a WWVBTimecode structure representing this minute. Fills both the amplitude and phase codes."""
t = WWVBTimecode(self.minute_length()) t = WWVBTimecode.make_empty(self.minute_length())
self._fill_am_timecode(t) self._fill_am_timecode(t)
self._fill_pm_timecode(t) self._fill_pm_timecode(t)
@ -648,7 +635,7 @@ class WWVBMinute(_WWVBMinute):
@classmethod @classmethod
def fromstring(cls, s: str) -> WWVBMinute: def fromstring(cls, s: str) -> WWVBMinute:
"""Construct a WWVBMinute from a string representation created by print_timecodes""" """Construct a WWVBMinute from a string representation created by print_timecodes"""
s = _removeprefix(s, "WWVB timecode: ") s = s.removeprefix("WWVB timecode: ")
d: dict[str, int] = {} d: dict[str, int] = {}
for part in s.split(): for part in s.split():
k, v = part.split("=") k, v = part.split("=")
@ -661,11 +648,11 @@ class WWVBMinute(_WWVBMinute):
minute = d.pop("minute") minute = d.pop("minute")
dst: int | None = d.pop("dst", None) dst: int | None = d.pop("dst", None)
ut1: int | None = d.pop("ut1", None) ut1: int | None = d.pop("ut1", None)
ls = d.pop("ls", None) ls: int | None = d.pop("ls", None)
d.pop("ly", None) d.pop("ly", None)
if d: if d:
raise ValueError(f"Invalid options: {d}") raise ValueError(f"Invalid options: {d}")
return cls(year, days, hour, minute, dst, ut1, None if ls is None else bool(ls)) return cls.from_parts(year, days, hour, minute, dst, ut1, None if ls is None else bool(ls), isly(year))
@classmethod @classmethod
def from_datetime( def from_datetime(
@ -679,7 +666,7 @@ class WWVBMinute(_WWVBMinute):
u = d.utctimetuple() u = d.utctimetuple()
if newls is None and newut1 is None: if newls is None and newut1 is None:
newut1, newls = cls._get_dut1_info(u.tm_year, u.tm_yday, old_time) newut1, newls = cls._get_dut1_info(u.tm_year, u.tm_yday, old_time)
return cls(u.tm_year, u.tm_yday, u.tm_hour, u.tm_min, ut1=newut1, ls=newls) return cls.from_parts(u.tm_year, u.tm_yday, u.tm_hour, u.tm_min, ut1=newut1, ls=newls)
@classmethod @classmethod
def from_timecode_am(cls, t: WWVBTimecode) -> WWVBMinute | None: def from_timecode_am(cls, t: WWVBTimecode) -> WWVBMinute | None:
@ -716,8 +703,8 @@ class WWVBMinute(_WWVBMinute):
if days > 366 or (not ly and days > 365): if days > 366 or (not ly and days > 365):
return None return None
ls = bool(t.am[56]) ls = bool(t.am[56])
dst = _require(t._get_am_bcd(57, 58)) dst = t._get_am_bcd(57, 58) or 0
return cls(year, days, hour, minute, dst, ut1, ls, ly) return cls.from_parts(year, days, hour, minute, dst, ut1, ls, ly)
class WWVBMinuteIERS(WWVBMinute): class WWVBMinuteIERS(WWVBMinute):
@ -757,6 +744,17 @@ class PhaseModulation(enum.IntEnum):
UNSET = -1 UNSET = -1
@enum.unique
class DstStatus(enum.IntEnum):
"""Constants that describe the DST status of a minute"""
DST_NOT_IN_EFFECT = 0b00
DST_STARTS_TODAY = 0b01
DST_ENDS_TODAY = 0b10
DST_IN_EFFECT = 0b11
@dataclasses.dataclass
class WWVBTimecode: class WWVBTimecode:
"""Represent the amplitude and/or phase signal, usually over 1 minute""" """Represent the amplitude and/or phase signal, usually over 1 minute"""
@ -766,10 +764,10 @@ class WWVBTimecode:
phase: list[PhaseModulation] phase: list[PhaseModulation]
"""The phase modulation data""" """The phase modulation data"""
def __init__(self, sz: int) -> None: @classmethod
"""Construct a WWVB timecode ``sz`` seconds long""" def make_empty(cls, sz: int) -> Self:
self.am = [AmplitudeModulation.UNSET] * sz """Provide an empty timecode of the given length"""
self.phase = [PhaseModulation.UNSET] * sz return cls([AmplitudeModulation.UNSET] * sz, [PhaseModulation.UNSET] * sz)
def _get_am_bcd(self, *poslist: int) -> int | None: def _get_am_bcd(self, *poslist: int) -> int | None:
"""Convert AM data to BCD """Convert AM data to BCD

View file

@ -65,7 +65,7 @@ def wwvbreceive() -> Generator[wwvb.WWVBTimecode | None, wwvb.AmplitudeModulatio
state = 1 state = 1
elif len(minute) == 60: elif len(minute) == 60:
# print("FULL MINUTE") # print("FULL MINUTE")
tc = wwvb.WWVBTimecode(60) tc = wwvb.WWVBTimecode.make_empty(60)
tc.am[:] = minute tc.am[:] = minute
minute = [] minute = []
state = 2 state = 2

View file

@ -6,8 +6,8 @@
# SPDX-License-Identifier: GPL-3.0-only # SPDX-License-Identifier: GPL-3.0-only
from __future__ import annotations from __future__ import annotations
import datetime
import functools import functools
import time
from tkinter import Canvas, TclError, Tk from tkinter import Canvas, TclError, Tk
from typing import TYPE_CHECKING, Any from typing import TYPE_CHECKING, Any
@ -66,24 +66,22 @@ def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: P
if min_size is None: if min_size is None:
min_size = size min_size = size
def deadline_ms(deadline: float) -> int: def deadline_ms(deadline: datetime.datetime) -> int:
"""Compute the number of ms until a deadline""" """Compute the number of ms until a deadline"""
now = time.time() now = datetime.datetime.now(datetime.timezone.utc)
return int(max(0, deadline - now) * 1000) return int(max(0, (deadline - now).total_seconds()) * 1000)
def wwvbtick() -> Generator[tuple[float, wwvb.AmplitudeModulation], None, None]: def wwvbtick() -> Generator[tuple[datetime.datetime, wwvb.AmplitudeModulation], None, None]:
"""Yield consecutive values of the WWVB amplitude signal, going from minute to minute""" """Yield consecutive values of the WWVB amplitude signal, going from minute to minute"""
timestamp = time.time() // 60 * 60 timestamp = datetime.datetime.now(datetime.timezone.utc).replace(second=0, microsecond=0)
while True: while True:
tt = time.gmtime(timestamp) timecode = wwvb.WWVBMinuteIERS.from_datetime(timestamp).as_timecode()
key = tt.tm_year, tt.tm_yday, tt.tm_hour, tt.tm_min
timecode = wwvb.WWVBMinuteIERS(*key).as_timecode()
for i, code in enumerate(timecode.am): for i, code in enumerate(timecode.am):
yield timestamp + i, code yield timestamp + datetime.timedelta(seconds=i), code
timestamp = timestamp + 60 timestamp = timestamp + datetime.timedelta(seconds=60)
def wwvbsmarttick() -> Generator[tuple[float, wwvb.AmplitudeModulation], None, None]: def wwvbsmarttick() -> Generator[tuple[datetime.datetime, wwvb.AmplitudeModulation], None, None]:
"""Yield consecutive values of the WWVB amplitude signal """Yield consecutive values of the WWVB amplitude signal
.. but deal with time progressing unexpectedly, such as when the .. but deal with time progressing unexpectedly, such as when the
@ -94,10 +92,10 @@ def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: P
""" """
while True: while True:
for stamp, code in wwvbtick(): for stamp, code in wwvbtick():
now = time.time() now = datetime.datetime.now(datetime.timezone.utc)
if stamp < now - 60: if stamp < now - datetime.timedelta(seconds=60):
break break
if stamp < now - 1: if stamp < now - datetime.timedelta(seconds=1):
continue continue
yield stamp, code yield stamp, code
@ -137,7 +135,7 @@ def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: P
yield deadline_ms(stamp) yield deadline_ms(stamp)
led_on(code) led_on(code)
app.update() app.update()
yield deadline_ms(stamp + 0.2 + 0.3 * int(code)) yield deadline_ms(stamp + datetime.timedelta(seconds=0.2 + 0.3 * int(code)))
led_off(code) led_off(code)
app.update() app.update()

View file

@ -19,7 +19,7 @@ class TestPhaseModulation(unittest.TestCase):
ref_pm = "001110110100010010000011001000011000110100110100010110110110" ref_pm = "001110110100010010000011001000011000110100110100010110110110"
ref_minute = wwvb.WWVBMinuteIERS(2012, 186, 17, 30, dst=3) ref_minute = wwvb.WWVBMinuteIERS.from_parts(2012, 186, 17, 30, dst=3)
ref_time = ref_minute.as_timecode() ref_time = ref_minute.as_timecode()
test_am = ref_time.to_am_string(["0", "1", "2"]) test_am = ref_time.to_am_string(["0", "1", "2"])

View file

@ -127,11 +127,13 @@ class WWVBRoundtrip(unittest.TestCase):
while dt.year < 1993: while dt.year < 1993:
minute = wwvb.WWVBMinuteIERS.from_datetime(dt) minute = wwvb.WWVBMinuteIERS.from_datetime(dt)
assert minute is not None assert minute is not None
assert minute.year == dt.year
timecode = minute.as_timecode().am timecode = minute.as_timecode().am
assert timecode assert timecode
decoded_minute: wwvb.WWVBMinute | None = wwvb.WWVBMinuteIERS.from_timecode_am(minute.as_timecode()) decoded_minute: wwvb.WWVBMinute | None = wwvb.WWVBMinuteIERS.from_timecode_am(minute.as_timecode())
assert decoded_minute assert decoded_minute
decoded = decoded_minute.as_timecode().am decoded = decoded_minute.as_timecode().am
self.assertEqual(minute, decoded_minute)
self.assertEqual( self.assertEqual(
timecode, timecode,
decoded, decoded,
@ -261,12 +263,12 @@ class WWVBRoundtrip(unittest.TestCase):
def test_epoch(self) -> None: def test_epoch(self) -> None:
"""Test the 1970-to-2069 epoch""" """Test the 1970-to-2069 epoch"""
m = wwvb.WWVBMinute(69, 1, 1, 0, 0) m = wwvb.WWVBMinute.from_parts(69, 1, 1, 0, 0)
n = wwvb.WWVBMinute(2069, 1, 1, 0, 0) n = wwvb.WWVBMinute.from_parts(2069, 1, 1, 0, 0)
self.assertEqual(m, n) self.assertEqual(m, n)
m = wwvb.WWVBMinute(70, 1, 1, 0, 0) m = wwvb.WWVBMinute.from_parts(70, 1, 1, 0, 0)
n = wwvb.WWVBMinute(1970, 1, 1, 0, 0) n = wwvb.WWVBMinute.from_parts(1970, 1, 1, 0, 0)
self.assertEqual(m, n) self.assertEqual(m, n)
def test_fromstring(self) -> None: def test_fromstring(self) -> None:
@ -290,13 +292,13 @@ class WWVBRoundtrip(unittest.TestCase):
def test_exceptions(self) -> None: def test_exceptions(self) -> None:
"""Test some error detection""" """Test some error detection"""
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
wwvb.WWVBMinute(2021, 1, 1, 1, dst=4) wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, dst=4)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
wwvb.WWVBMinute(2021, 1, 1, 1, ut1=1) wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, ut1=1)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
wwvb.WWVBMinute(2021, 1, 1, 1, ls=False) wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, ls=False)
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
wwvb.WWVBMinute.fromstring("year=1998 days=365 hour=23 min=56 dst=0 ut1=-300 ly=0 ls=1 boo=1") wwvb.WWVBMinute.fromstring("year=1998 days=365 hour=23 min=56 dst=0 ut1=-300 ly=0 ls=1 boo=1")
@ -310,7 +312,7 @@ class WWVBRoundtrip(unittest.TestCase):
def test_undefined(self) -> None: def test_undefined(self) -> None:
"""Ensure that the check for unset elements in am works""" """Ensure that the check for unset elements in am works"""
with self.assertWarnsRegex(Warning, "is unset"): with self.assertWarnsRegex(Warning, "is unset"):
str(wwvb.WWVBTimecode(60)) str(wwvb.WWVBTimecode.make_empty(60))
def test_tz(self) -> None: def test_tz(self) -> None:
"""Get a little more coverage in the dst change functions""" """Get a little more coverage in the dst change functions"""
@ -365,35 +367,35 @@ class WWVBRoundtrip(unittest.TestCase):
def test_epoch2(self) -> None: def test_epoch2(self) -> None:
"""Test that the settable epoch feature works""" """Test that the settable epoch feature works"""
self.assertEqual(wwvb.WWVBMinute(0, 1, 1, 0, 0).year, 2000) self.assertEqual(wwvb.WWVBMinute.from_parts(0, 1, 1, 0, 0).year, 2000)
self.assertEqual(wwvb.WWVBMinute(69, 1, 1, 0, 0).year, 2069) self.assertEqual(wwvb.WWVBMinute.from_parts(69, 1, 1, 0, 0).year, 2069)
self.assertEqual(wwvb.WWVBMinute(70, 1, 1, 0, 0).year, 1970) self.assertEqual(wwvb.WWVBMinute.from_parts(70, 1, 1, 0, 0).year, 1970)
self.assertEqual(wwvb.WWVBMinute(99, 1, 1, 0, 0).year, 1999) self.assertEqual(wwvb.WWVBMinute.from_parts(99, 1, 1, 0, 0).year, 1999)
# 4-digit years can always be used # 4-digit years can always be used
self.assertEqual(wwvb.WWVBMinute(2000, 1, 1, 0, 0).year, 2000) self.assertEqual(wwvb.WWVBMinute.from_parts(2000, 1, 1, 0, 0).year, 2000)
self.assertEqual(wwvb.WWVBMinute(2069, 1, 1, 0, 0).year, 2069) self.assertEqual(wwvb.WWVBMinute.from_parts(2069, 1, 1, 0, 0).year, 2069)
self.assertEqual(wwvb.WWVBMinute(1970, 1, 1, 0, 0).year, 1970) self.assertEqual(wwvb.WWVBMinute.from_parts(1970, 1, 1, 0, 0).year, 1970)
self.assertEqual(wwvb.WWVBMinute(1999, 1, 1, 0, 0).year, 1999) self.assertEqual(wwvb.WWVBMinute.from_parts(1999, 1, 1, 0, 0).year, 1999)
self.assertEqual(wwvb.WWVBMinute(1900, 1, 1, 0, 0).year, 1900) self.assertEqual(wwvb.WWVBMinute.from_parts(1900, 1, 1, 0, 0).year, 1900)
self.assertEqual(wwvb.WWVBMinute(1969, 1, 1, 0, 0).year, 1969) self.assertEqual(wwvb.WWVBMinute.from_parts(1969, 1, 1, 0, 0).year, 1969)
self.assertEqual(wwvb.WWVBMinute(2070, 1, 1, 0, 0).year, 2070) self.assertEqual(wwvb.WWVBMinute.from_parts(2070, 1, 1, 0, 0).year, 2070)
self.assertEqual(wwvb.WWVBMinute(2099, 1, 1, 0, 0).year, 2099) self.assertEqual(wwvb.WWVBMinute.from_parts(2099, 1, 1, 0, 0).year, 2099)
self.assertEqual(WWVBMinute2k(0, 1, 1, 0, 0).year, 2000) self.assertEqual(WWVBMinute2k.from_parts(0, 1, 1, 0, 0).year, 2000)
self.assertEqual(WWVBMinute2k(99, 1, 1, 0, 0).year, 2099) self.assertEqual(WWVBMinute2k.from_parts(99, 1, 1, 0, 0).year, 2099)
# 4-digit years can always be used # 4-digit years can always be used
self.assertEqual(WWVBMinute2k(2000, 1, 1, 0, 0).year, 2000) self.assertEqual(WWVBMinute2k.from_parts(2000, 1, 1, 0, 0).year, 2000)
self.assertEqual(WWVBMinute2k(2069, 1, 1, 0, 0).year, 2069) self.assertEqual(WWVBMinute2k.from_parts(2069, 1, 1, 0, 0).year, 2069)
self.assertEqual(WWVBMinute2k(1970, 1, 1, 0, 0).year, 1970) self.assertEqual(WWVBMinute2k.from_parts(1970, 1, 1, 0, 0).year, 1970)
self.assertEqual(WWVBMinute2k(1999, 1, 1, 0, 0).year, 1999) self.assertEqual(WWVBMinute2k.from_parts(1999, 1, 1, 0, 0).year, 1999)
self.assertEqual(WWVBMinute2k(1900, 1, 1, 0, 0).year, 1900) self.assertEqual(WWVBMinute2k.from_parts(1900, 1, 1, 0, 0).year, 1900)
self.assertEqual(WWVBMinute2k(1969, 1, 1, 0, 0).year, 1969) self.assertEqual(WWVBMinute2k.from_parts(1969, 1, 1, 0, 0).year, 1969)
self.assertEqual(WWVBMinute2k(2070, 1, 1, 0, 0).year, 2070) self.assertEqual(WWVBMinute2k.from_parts(2070, 1, 1, 0, 0).year, 2070)
self.assertEqual(WWVBMinute2k(2099, 1, 1, 0, 0).year, 2099) self.assertEqual(WWVBMinute2k.from_parts(2099, 1, 1, 0, 0).year, 2099)
if __name__ == "__main__": if __name__ == "__main__":