disentangle typing problems from using dataclasses
.. and drop pre-3.9 compatibility (depend on str.removeprefix)
This commit is contained in:
parent
bf9a98e8e8
commit
14361a925a
5 changed files with 87 additions and 89 deletions
|
|
@ -12,7 +12,7 @@ import datetime
|
|||
import enum
|
||||
import json
|
||||
import warnings
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, TextIO, TypeVar
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, Self, TextIO
|
||||
|
||||
from . import iersdata
|
||||
from .tz import Mountain
|
||||
|
|
@ -22,19 +22,6 @@ if TYPE_CHECKING:
|
|||
|
||||
HOUR = datetime.timedelta(seconds=3600)
|
||||
SECOND = datetime.timedelta(seconds=1)
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
def _require(x: T | None) -> T:
|
||||
"""Check an Optional item is not None."""
|
||||
assert x is not None
|
||||
return x
|
||||
|
||||
|
||||
def _removeprefix(s: str, p: str) -> str:
|
||||
if s.startswith(p):
|
||||
return s[len(p) :]
|
||||
return s
|
||||
|
||||
|
||||
def _date(dt: datetime.date) -> datetime.date:
|
||||
|
|
@ -324,7 +311,7 @@ _dst_ls_lut = [
|
|||
]
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
@dataclasses.dataclass(frozen=True)
|
||||
class WWVBMinute:
|
||||
"""Uniquely identifies a minute of time in the WWVB system.
|
||||
|
||||
|
|
@ -343,33 +330,52 @@ class WWVBMinute:
|
|||
min: int
|
||||
"""Minute of hour"""
|
||||
|
||||
dst: int | None = None
|
||||
dst: int
|
||||
"""2-bit DST code """
|
||||
|
||||
ut1: int | None = None
|
||||
ut1: int
|
||||
"""UT1 offset in units of 100ms, range -900 to +900ms"""
|
||||
|
||||
ls: bool | None = None
|
||||
ls: bool
|
||||
"""Leap second warning flag"""
|
||||
|
||||
ly: bool | None = None
|
||||
ly: bool
|
||||
"""Leap year flag"""
|
||||
|
||||
epoch: ClassVar[int] = 1970
|
||||
|
||||
def __post_init__(self):
|
||||
"""Fill the optional members if not otherwise set"""
|
||||
if self.dst is None:
|
||||
self.dst = self.get_dst(self.year, self.days)
|
||||
if self.dst not in (0, 1, 2, 3):
|
||||
@classmethod
|
||||
def from_parts(
|
||||
cls,
|
||||
year: int,
|
||||
days: int,
|
||||
hour: int,
|
||||
minute: int,
|
||||
dst: int | None = None,
|
||||
ut1: int | None = None,
|
||||
ls: bool | None = None,
|
||||
ly: bool | None = None,
|
||||
) -> Self:
|
||||
"""Create a WWVBMinute from parts
|
||||
|
||||
The constructor requires all parts are supplied. This classmethod can
|
||||
determine the any or all of the `dst`, `ut1`, `ls`, and `ly` properties
|
||||
based on class heuristics (except that either `ut1` and `ls` must both
|
||||
be specified, or neither one may be specified)
|
||||
"""
|
||||
if dst is None:
|
||||
dst = cls.get_dst(year, days)
|
||||
if dst not in (0, 1, 2, 3):
|
||||
raise ValueError("dst value should be 0..3")
|
||||
if self.ut1 is None and self.ls is None:
|
||||
self.ut1, self.ls = self._get_dut1_info(self.year, self.days)
|
||||
elif self.ut1 is None or self.ls is None:
|
||||
if ut1 is None and ls is None:
|
||||
ut1, ls = cls._get_dut1_info(year, days)
|
||||
elif ut1 is None or ls is None:
|
||||
raise ValueError("sepecify both ut1 and ls or neither one")
|
||||
self.year = self.full_year(self.year)
|
||||
if self.ly is None:
|
||||
self.ly = isly(self.year)
|
||||
year = cls.full_year(year)
|
||||
if ly is None:
|
||||
ly = isly(year)
|
||||
|
||||
return cls(year, days, hour, minute, dst, ut1, ls, ly)
|
||||
|
||||
@classmethod
|
||||
def full_year(cls, year: int) -> int:
|
||||
|
|
@ -403,8 +409,8 @@ class WWVBMinute:
|
|||
"""Implement str()"""
|
||||
return (
|
||||
f"year={self.year:4d} days={self.days:03d} hour={self.hour:02d} "
|
||||
f"min={self.min:02d} dst={self.dst} ut1={self.ut1} ly={int(self.ly)} "
|
||||
f"ls={int(self.ls)}"
|
||||
f"min={self.min:02d} dst={+self.dst} ut1={self.ut1} ly={+self.ly} "
|
||||
f"ls={+self.ls}"
|
||||
)
|
||||
|
||||
def as_datetime_utc(self) -> datetime.datetime:
|
||||
|
|
@ -461,7 +467,7 @@ class WWVBMinute:
|
|||
|
||||
def as_timecode(self) -> WWVBTimecode:
|
||||
"""Fill a WWVBTimecode structure representing this minute. Fills both the amplitude and phase codes."""
|
||||
t = WWVBTimecode(self.minute_length())
|
||||
t = WWVBTimecode.make_empty(self.minute_length())
|
||||
|
||||
self._fill_am_timecode(t)
|
||||
self._fill_pm_timecode(t)
|
||||
|
|
@ -632,7 +638,7 @@ class WWVBMinute:
|
|||
@classmethod
|
||||
def fromstring(cls, s: str) -> WWVBMinute:
|
||||
"""Construct a WWVBMinute from a string representation created by print_timecodes"""
|
||||
s = _removeprefix(s, "WWVB timecode: ")
|
||||
s = s.removeprefix("WWVB timecode: ")
|
||||
d: dict[str, int] = {}
|
||||
for part in s.split():
|
||||
k, v = part.split("=")
|
||||
|
|
@ -645,11 +651,11 @@ class WWVBMinute:
|
|||
minute = d.pop("minute")
|
||||
dst: int | None = d.pop("dst", None)
|
||||
ut1: int | None = d.pop("ut1", None)
|
||||
ls = d.pop("ls", None)
|
||||
ls: int | None = d.pop("ls", None)
|
||||
d.pop("ly", None)
|
||||
if d:
|
||||
raise ValueError(f"Invalid options: {d}")
|
||||
return cls(year, days, hour, minute, dst, ut1, None if ls is None else bool(ls))
|
||||
return cls.from_parts(year, days, hour, minute, dst, ut1, None if ls is None else bool(ls), isly(year))
|
||||
|
||||
@classmethod
|
||||
def from_datetime(
|
||||
|
|
@ -663,7 +669,7 @@ class WWVBMinute:
|
|||
u = d.utctimetuple()
|
||||
if newls is None and newut1 is None:
|
||||
newut1, newls = cls._get_dut1_info(u.tm_year, u.tm_yday, old_time)
|
||||
return cls(u.tm_year, u.tm_yday, u.tm_hour, u.tm_min, ut1=newut1, ls=newls)
|
||||
return cls.from_parts(u.tm_year, u.tm_yday, u.tm_hour, u.tm_min, ut1=newut1, ls=newls)
|
||||
|
||||
@classmethod
|
||||
def from_timecode_am(cls, t: WWVBTimecode) -> WWVBMinute | None:
|
||||
|
|
@ -700,8 +706,8 @@ class WWVBMinute:
|
|||
if days > 366 or (not ly and days > 365):
|
||||
return None
|
||||
ls = bool(t.am[56])
|
||||
dst = _require(t._get_am_bcd(57, 58))
|
||||
return cls(year, days, hour, minute, dst, ut1, ls, ly)
|
||||
dst = t._get_am_bcd(57, 58) or 0
|
||||
return cls.from_parts(year, days, hour, minute, dst, ut1, ls, ly)
|
||||
|
||||
|
||||
class WWVBMinuteIERS(WWVBMinute):
|
||||
|
|
@ -745,21 +751,16 @@ class PhaseModulation(enum.IntEnum):
|
|||
class WWVBTimecode:
|
||||
"""Represent the amplitude and/or phase signal, usually over 1 minute"""
|
||||
|
||||
sz: dataclasses.InitVar[int] = 60
|
||||
am: list[AmplitudeModulation] | None = None
|
||||
phase: list[PhaseModulation] | None = None
|
||||
|
||||
am: list[AmplitudeModulation]
|
||||
"""The amplitude modulation data"""
|
||||
|
||||
phase: list[PhaseModulation]
|
||||
"""The phase modulation data"""
|
||||
|
||||
def __post_init__(self, sz):
|
||||
"""Fill the am & phase members if not otherwise set"""
|
||||
if self.am is None:
|
||||
self.am = [AmplitudeModulation.UNSET] * sz
|
||||
if self.phase is None:
|
||||
self.phase = [PhaseModulation.UNSET] * sz
|
||||
@classmethod
|
||||
def make_empty(cls, sz: int) -> Self:
|
||||
"""Provide an empty timecode of the given length"""
|
||||
return cls([AmplitudeModulation.UNSET] * sz, [PhaseModulation.UNSET] * sz)
|
||||
|
||||
def _get_am_bcd(self, *poslist: int) -> int | None:
|
||||
"""Convert AM data to BCD
|
||||
|
|
|
|||
|
|
@ -65,7 +65,7 @@ def wwvbreceive() -> Generator[wwvb.WWVBTimecode | None, wwvb.AmplitudeModulatio
|
|||
state = 1
|
||||
elif len(minute) == 60:
|
||||
# print("FULL MINUTE")
|
||||
tc = wwvb.WWVBTimecode(60)
|
||||
tc = wwvb.WWVBTimecode.make_empty(60)
|
||||
tc.am[:] = minute
|
||||
minute = []
|
||||
state = 2
|
||||
|
|
|
|||
|
|
@ -66,12 +66,12 @@ def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: P
|
|||
if min_size is None:
|
||||
min_size = size
|
||||
|
||||
def deadline_ms(deadline: float) -> int:
|
||||
def deadline_ms(deadline: datetime.datetime) -> int:
|
||||
"""Compute the number of ms until a deadline"""
|
||||
now = datetime.datetime.now(datetime.timezone.utc)
|
||||
return int(max(0, (deadline - now).total_seconds()) * 1000)
|
||||
|
||||
def wwvbtick() -> Generator[tuple[float, wwvb.AmplitudeModulation], None, None]:
|
||||
def wwvbtick() -> Generator[tuple[datetime.datetime, wwvb.AmplitudeModulation], None, None]:
|
||||
"""Yield consecutive values of the WWVB amplitude signal, going from minute to minute"""
|
||||
timestamp = datetime.datetime.now(datetime.timezone.utc).replace(second=0, microsecond=0)
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ def main(colors: list[str], size: int, min_size: int | None) -> None: # noqa: P
|
|||
yield timestamp + datetime.timedelta(seconds=i), code
|
||||
timestamp = timestamp + datetime.timedelta(seconds=60)
|
||||
|
||||
def wwvbsmarttick() -> Generator[tuple[float, wwvb.AmplitudeModulation], None, None]:
|
||||
def wwvbsmarttick() -> Generator[tuple[datetime.datetime, wwvb.AmplitudeModulation], None, None]:
|
||||
"""Yield consecutive values of the WWVB amplitude signal
|
||||
|
||||
.. but deal with time progressing unexpectedly, such as when the
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class TestPhaseModulation(unittest.TestCase):
|
|||
|
||||
ref_pm = "001110110100010010000011001000011000110100110100010110110110"
|
||||
|
||||
ref_minute = wwvb.WWVBMinuteIERS(2012, 186, 17, 30, dst=3)
|
||||
ref_minute = wwvb.WWVBMinuteIERS.from_parts(2012, 186, 17, 30, dst=3)
|
||||
ref_time = ref_minute.as_timecode()
|
||||
|
||||
test_am = ref_time.to_am_string(["0", "1", "2"])
|
||||
|
|
|
|||
|
|
@ -127,11 +127,13 @@ class WWVBRoundtrip(unittest.TestCase):
|
|||
while dt.year < 1993:
|
||||
minute = wwvb.WWVBMinuteIERS.from_datetime(dt)
|
||||
assert minute is not None
|
||||
assert minute.year == dt.year
|
||||
timecode = minute.as_timecode().am
|
||||
assert timecode
|
||||
decoded_minute: wwvb.WWVBMinute | None = wwvb.WWVBMinuteIERS.from_timecode_am(minute.as_timecode())
|
||||
assert decoded_minute
|
||||
decoded = decoded_minute.as_timecode().am
|
||||
self.assertEqual(minute, decoded_minute)
|
||||
self.assertEqual(
|
||||
timecode,
|
||||
decoded,
|
||||
|
|
@ -261,12 +263,12 @@ class WWVBRoundtrip(unittest.TestCase):
|
|||
|
||||
def test_epoch(self) -> None:
|
||||
"""Test the 1970-to-2069 epoch"""
|
||||
m = wwvb.WWVBMinute(69, 1, 1, 0, 0)
|
||||
n = wwvb.WWVBMinute(2069, 1, 1, 0, 0)
|
||||
m = wwvb.WWVBMinute.from_parts(69, 1, 1, 0, 0)
|
||||
n = wwvb.WWVBMinute.from_parts(2069, 1, 1, 0, 0)
|
||||
self.assertEqual(m, n)
|
||||
|
||||
m = wwvb.WWVBMinute(70, 1, 1, 0, 0)
|
||||
n = wwvb.WWVBMinute(1970, 1, 1, 0, 0)
|
||||
m = wwvb.WWVBMinute.from_parts(70, 1, 1, 0, 0)
|
||||
n = wwvb.WWVBMinute.from_parts(1970, 1, 1, 0, 0)
|
||||
self.assertEqual(m, n)
|
||||
|
||||
def test_fromstring(self) -> None:
|
||||
|
|
@ -290,13 +292,13 @@ class WWVBRoundtrip(unittest.TestCase):
|
|||
def test_exceptions(self) -> None:
|
||||
"""Test some error detection"""
|
||||
with self.assertRaises(ValueError):
|
||||
wwvb.WWVBMinute(2021, 1, 1, 1, dst=4)
|
||||
wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, dst=4)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
wwvb.WWVBMinute(2021, 1, 1, 1, ut1=1)
|
||||
wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, ut1=1)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
wwvb.WWVBMinute(2021, 1, 1, 1, ls=False)
|
||||
wwvb.WWVBMinute.from_parts(2021, 1, 1, 1, ls=False)
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
wwvb.WWVBMinute.fromstring("year=1998 days=365 hour=23 min=56 dst=0 ut1=-300 ly=0 ls=1 boo=1")
|
||||
|
|
@ -310,7 +312,7 @@ class WWVBRoundtrip(unittest.TestCase):
|
|||
def test_undefined(self) -> None:
|
||||
"""Ensure that the check for unset elements in am works"""
|
||||
with self.assertWarnsRegex(Warning, "is unset"):
|
||||
str(wwvb.WWVBTimecode(60))
|
||||
str(wwvb.WWVBTimecode.make_empty(60))
|
||||
|
||||
def test_tz(self) -> None:
|
||||
"""Get a little more coverage in the dst change functions"""
|
||||
|
|
@ -365,40 +367,35 @@ class WWVBRoundtrip(unittest.TestCase):
|
|||
|
||||
def test_epoch2(self) -> None:
|
||||
"""Test that the settable epoch feature works"""
|
||||
self.assertEqual(wwvb.WWVBMinute(0, 1, 1, 0, 0).year, 2000)
|
||||
self.assertEqual(wwvb.WWVBMinute(69, 1, 1, 0, 0).year, 2069)
|
||||
self.assertEqual(wwvb.WWVBMinute(70, 1, 1, 0, 0).year, 1970)
|
||||
self.assertEqual(wwvb.WWVBMinute(99, 1, 1, 0, 0).year, 1999)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(0, 1, 1, 0, 0).year, 2000)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(69, 1, 1, 0, 0).year, 2069)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(70, 1, 1, 0, 0).year, 1970)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(99, 1, 1, 0, 0).year, 1999)
|
||||
|
||||
# 4-digit years can always be used
|
||||
self.assertEqual(wwvb.WWVBMinute(2000, 1, 1, 0, 0).year, 2000)
|
||||
self.assertEqual(wwvb.WWVBMinute(2069, 1, 1, 0, 0).year, 2069)
|
||||
self.assertEqual(wwvb.WWVBMinute(1970, 1, 1, 0, 0).year, 1970)
|
||||
self.assertEqual(wwvb.WWVBMinute(1999, 1, 1, 0, 0).year, 1999)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(2000, 1, 1, 0, 0).year, 2000)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(2069, 1, 1, 0, 0).year, 2069)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(1970, 1, 1, 0, 0).year, 1970)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(1999, 1, 1, 0, 0).year, 1999)
|
||||
|
||||
self.assertEqual(wwvb.WWVBMinute(1900, 1, 1, 0, 0).year, 1900)
|
||||
self.assertEqual(wwvb.WWVBMinute(1969, 1, 1, 0, 0).year, 1969)
|
||||
self.assertEqual(wwvb.WWVBMinute(2070, 1, 1, 0, 0).year, 2070)
|
||||
self.assertEqual(wwvb.WWVBMinute(2099, 1, 1, 0, 0).year, 2099)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(1900, 1, 1, 0, 0).year, 1900)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(1969, 1, 1, 0, 0).year, 1969)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(2070, 1, 1, 0, 0).year, 2070)
|
||||
self.assertEqual(wwvb.WWVBMinute.from_parts(2099, 1, 1, 0, 0).year, 2099)
|
||||
|
||||
self.assertEqual(WWVBMinute2k(0, 1, 1, 0, 0).year, 2000)
|
||||
self.assertEqual(WWVBMinute2k(99, 1, 1, 0, 0).year, 2099)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(0, 1, 1, 0, 0).year, 2000)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(99, 1, 1, 0, 0).year, 2099)
|
||||
|
||||
# 4-digit years can always be used
|
||||
self.assertEqual(WWVBMinute2k(2000, 1, 1, 0, 0).year, 2000)
|
||||
self.assertEqual(WWVBMinute2k(2069, 1, 1, 0, 0).year, 2069)
|
||||
self.assertEqual(WWVBMinute2k(1970, 1, 1, 0, 0).year, 1970)
|
||||
self.assertEqual(WWVBMinute2k(1999, 1, 1, 0, 0).year, 1999)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(2000, 1, 1, 0, 0).year, 2000)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(2069, 1, 1, 0, 0).year, 2069)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(1970, 1, 1, 0, 0).year, 1970)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(1999, 1, 1, 0, 0).year, 1999)
|
||||
|
||||
self.assertEqual(WWVBMinute2k(1900, 1, 1, 0, 0).year, 1900)
|
||||
self.assertEqual(WWVBMinute2k(1969, 1, 1, 0, 0).year, 1969)
|
||||
self.assertEqual(WWVBMinute2k(2070, 1, 1, 0, 0).year, 2070)
|
||||
self.assertEqual(WWVBMinute2k(2099, 1, 1, 0, 0).year, 2099)
|
||||
|
||||
def test_cover_construct(self) -> None:
|
||||
"""Ensure coverage of some unusual code paths in WWVBTimecode"""
|
||||
assert wwvb.WWVBTimecode(am=[]).am == []
|
||||
assert wwvb.WWVBTimecode(phase=[]).phase == []
|
||||
self.assertEqual(WWVBMinute2k.from_parts(1900, 1, 1, 0, 0).year, 1900)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(1969, 1, 1, 0, 0).year, 1969)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(2070, 1, 1, 0, 0).year, 2070)
|
||||
self.assertEqual(WWVBMinute2k.from_parts(2099, 1, 1, 0, 0).year, 2099)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
Loading…
Reference in a new issue