Add more TAI-wrangling
* leapseconddata.tai is a timezone object. BEWARE, because of reasons, it compares equal to datetime.datetime.utc! * tai_offset can take either kind of timestamp * to_tai, tai_to_utc: convert to/from the TAI timescale. * is_leap_second: return True if the given second is a leap second. In the UTC timescale, the ":59" second returns True even though "only" the second repetition of :59 is a leap second
This commit is contained in:
parent
2dddcdf282
commit
8007b02dcc
4 changed files with 128 additions and 9 deletions
4
.github/workflows/test.yml
vendored
4
.github/workflows/test.yml
vendored
|
|
@ -45,10 +45,10 @@ jobs:
|
|||
|
||||
- name: Check stubs
|
||||
if: (! startsWith(matrix.python-version, 'pypy-'))
|
||||
run: mypy --strict leapseconddata.py
|
||||
run: mypy --strict *.py
|
||||
|
||||
- name: Test
|
||||
run: python -mcoverage run --branch -m unittest leapseconddata_test.py && python -mcoverage report --fail-under=100 && python -mcoverage xml
|
||||
run: python -mcoverage run --branch -m unittest testleapseconddata.py && python -mcoverage report --fail-under=100 && python -mcoverage xml
|
||||
|
||||
- name: Upload Coverage to Codecov
|
||||
uses: codecov/codecov-action@v2
|
||||
|
|
|
|||
2
Makefile
2
Makefile
|
|
@ -6,7 +6,7 @@ all: coverage mypy
|
|||
COVERAGE_INCLUDE=--omit '/usr/**/*.py'
|
||||
.PHONY: coverage
|
||||
coverage:
|
||||
$(PYTHON) -mcoverage run --branch -m unittest leapseconddata_test.py
|
||||
$(PYTHON) -mcoverage run --branch -m unittest testleapseconddata.py
|
||||
$(PYTHON) -mcoverage html $(COVERAGE_INCLUDE)
|
||||
$(PYTHON) -mcoverage report $(COVERAGE_INCLUDE) --fail-under=100
|
||||
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@ import re
|
|||
import urllib.request
|
||||
from typing import Union, List, Optional, NamedTuple, BinaryIO
|
||||
|
||||
tai = datetime.timezone(datetime.timedelta(0), "TAI")
|
||||
|
||||
NTP_EPOCH = datetime.datetime(1900, 1, 1, tzinfo=datetime.timezone.utc)
|
||||
|
||||
LeapSecondInfo = NamedTuple(
|
||||
|
|
@ -83,27 +85,80 @@ class LeapSecondData(_LeapSecondData):
|
|||
"""Return True if the data is valid at given datetime (or the current moment, if None is passed)"""
|
||||
return self._check_validity(when) is None
|
||||
|
||||
@staticmethod
|
||||
def _utc_datetime(when: datetime.datetime) -> datetime.datetime:
|
||||
if when.tzinfo is not None and when.tzinfo is not datetime.timezone.utc:
|
||||
when = when.astimezone(datetime.timezone.utc)
|
||||
return when
|
||||
|
||||
def tai_offset(
|
||||
self, when: datetime.datetime, check_validity: bool = True
|
||||
) -> datetime.timedelta:
|
||||
"""For a given datetime in UTC, return the TAI-UTC offset
|
||||
"""For a given datetime, return the TAI-UTC offset
|
||||
|
||||
For times before the first leap second, a zero offset is returned.
|
||||
For times after the end of the file's validity, an exception is raised
|
||||
unless `check_validity=False` is passed. In this case, it will return
|
||||
the offset of last list entry."""
|
||||
|
||||
is_tai = when.tzinfo is tai
|
||||
if not is_tai:
|
||||
when = self._utc_datetime(when)
|
||||
if check_validity:
|
||||
message = self._check_validity(when)
|
||||
if message is not None:
|
||||
raise ValidityError(message)
|
||||
|
||||
if not self.leap_seconds:
|
||||
return datetime.timedelta(0)
|
||||
|
||||
old_tai = datetime.timedelta()
|
||||
for start, tai in self.leap_seconds:
|
||||
for start, tai_offset in self.leap_seconds:
|
||||
if is_tai:
|
||||
start += tai_offset - datetime.timedelta(seconds=1)
|
||||
if when < start:
|
||||
return old_tai
|
||||
old_tai = tai
|
||||
old_tai = tai_offset
|
||||
return self.leap_seconds[-1].tai
|
||||
|
||||
def to_tai(
|
||||
self, when: datetime.datetime, check_validity: bool = True
|
||||
) -> datetime.datetime:
|
||||
"""Convert the given datetime object to TAI"""
|
||||
if when.tzinfo is tai:
|
||||
return when
|
||||
when = self._utc_datetime(when)
|
||||
return (when + self.tai_offset(when, check_validity)).replace(tzinfo=tai)
|
||||
|
||||
def tai_to_utc(
|
||||
self, when: datetime.datetime, check_validity: bool = True
|
||||
) -> datetime.datetime:
|
||||
"""Convert the given datetime object (which is assumed to be in TAI) to UTC"""
|
||||
if when.tzinfo is not None and when.tzinfo is not tai:
|
||||
raise ValueError("Input timestamp is not TAI or naive")
|
||||
if when.tzinfo is None:
|
||||
when = when.replace(tzinfo=tai)
|
||||
result = (when - self.tai_offset(when, check_validity)).replace(
|
||||
tzinfo=datetime.timezone.utc
|
||||
)
|
||||
return result
|
||||
|
||||
def is_leap_second(
|
||||
self, when: datetime.datetime, check_validity: bool = True
|
||||
) -> bool:
|
||||
"""Return True if the given timestamp is the leap second.
|
||||
|
||||
For a TAI timestamp, it returns True for the leap second (the one that
|
||||
would be shown as :60 in UTC). For a UTC timestamp, it returns True
|
||||
for the :59 second, since the :60 second cannot be represented."""
|
||||
if when.tzinfo is not tai:
|
||||
when = self.to_tai(when, check_validity) + datetime.timedelta(seconds=1)
|
||||
tai_offset1 = self.tai_offset(when, check_validity)
|
||||
tai_offset2 = self.tai_offset(
|
||||
when - datetime.timedelta(seconds=1), check_validity
|
||||
)
|
||||
return tai_offset1 != tai_offset2
|
||||
|
||||
@classmethod
|
||||
def from_standard_source(
|
||||
cls, when: Optional[datetime.datetime] = None
|
||||
|
|
@ -218,8 +273,8 @@ class LeapSecondData(_LeapSecondData):
|
|||
hasher.update(parts[1])
|
||||
|
||||
when = _from_ntp_epoch(int(parts[0]))
|
||||
tai = datetime.timedelta(seconds=int(parts[1]))
|
||||
leap_seconds.append(LeapSecondInfo(when, tai))
|
||||
tai_offset = datetime.timedelta(seconds=int(parts[1]))
|
||||
leap_seconds.append(LeapSecondInfo(when, tai_offset))
|
||||
|
||||
if check_hash:
|
||||
if content_hash is None:
|
||||
|
|
@ -244,6 +299,25 @@ def main() -> None:
|
|||
when = datetime.datetime(2011, 1, 1, tzinfo=datetime.timezone.utc)
|
||||
print(f"TAI-UTC on {when:%Y-%m-%d} was {lsd.tai_offset(when).total_seconds()}")
|
||||
|
||||
when_tai = lsd.to_tai(when)
|
||||
when_rt = lsd.tai_to_utc(when_tai)
|
||||
print(f"{when:%Y-%m-%d %H:%M:%S} UTC -> {when_tai:%Y-%m-%d %H:%M:%S} TAI")
|
||||
print(f"{when_tai:%Y-%m-%d %H:%M:%S} TAI -> {when_rt:%Y-%m-%d %H:%M:%S} UTC")
|
||||
print(f"is leap second? {lsd.is_leap_second(when)}")
|
||||
|
||||
u = datetime.datetime(
|
||||
1999, 1, 1, tzinfo=datetime.timezone.utc
|
||||
) - datetime.timedelta(seconds=2)
|
||||
t = lsd.to_tai(u)
|
||||
|
||||
print("replaying leapsecond at end of 1998")
|
||||
for _ in range(5):
|
||||
print(
|
||||
f"{u:%Y-%m-%d %H:%M:%S} UTC {'LS' if lsd.is_leap_second(u) else ' '} = {t:%Y-%m-%d %H:%M:%S} TAI {'LS' if lsd.is_leap_second(t) else ' '}"
|
||||
)
|
||||
t += datetime.timedelta(seconds=1)
|
||||
u = lsd.tai_to_utc(t)
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma no cover
|
||||
main()
|
||||
|
|
|
|||
|
|
@ -5,13 +5,15 @@
|
|||
# SPDX-License-Identifier: GPL-3.0-only
|
||||
|
||||
"""Test most leapseconddata functionality"""
|
||||
# pylint: disable=missing-class-docstring,missing-function-docstring
|
||||
# pylint: disable=missing-class-docstring,missing-function-docstring,no-self-use
|
||||
import datetime
|
||||
import unittest
|
||||
import leapseconddata
|
||||
|
||||
db = leapseconddata.LeapSecondData.from_standard_source()
|
||||
|
||||
GMT1 = datetime.timezone(datetime.timedelta(seconds=3600), "GMT1")
|
||||
|
||||
|
||||
class LeapSecondDataTest(unittest.TestCase):
|
||||
def test_main(self) -> None: # pylint: disable=no-self-use
|
||||
|
|
@ -58,6 +60,49 @@ class LeapSecondDataTest(unittest.TestCase):
|
|||
db1.tai_offset(valid_until, False), datetime.timedelta(seconds=1)
|
||||
)
|
||||
|
||||
when = datetime.datetime(
|
||||
1999, 1, 1, tzinfo=datetime.timezone.utc
|
||||
) - datetime.timedelta(seconds=1)
|
||||
assert when.tzinfo is not None
|
||||
print(f"assertRaises {when=}")
|
||||
self.assertRaises(ValueError, db.tai_to_utc, when)
|
||||
|
||||
def test_empty(self) -> None:
|
||||
db1 = leapseconddata.LeapSecondData([])
|
||||
self.assertEqual(
|
||||
db1.tai_offset(datetime.datetime(2020, 1, 1), False),
|
||||
datetime.timedelta(seconds=0),
|
||||
)
|
||||
|
||||
def test_tz(self) -> None:
|
||||
when = datetime.datetime(
|
||||
1999, 1, 1, tzinfo=datetime.timezone.utc
|
||||
) - datetime.timedelta(seconds=1)
|
||||
self.assertTrue(db.is_leap_second(when))
|
||||
self.assertFalse(db.is_leap_second(when - datetime.timedelta(seconds=1)))
|
||||
self.assertFalse(db.is_leap_second(when + datetime.timedelta(seconds=1)))
|
||||
|
||||
when = when.astimezone(GMT1)
|
||||
self.assertTrue(db.is_leap_second(when))
|
||||
self.assertFalse(db.is_leap_second(when - datetime.timedelta(seconds=1)))
|
||||
self.assertFalse(db.is_leap_second(when + datetime.timedelta(seconds=1)))
|
||||
|
||||
when_tai = datetime.datetime(1999, 1, 1, 0, 0, 32)
|
||||
when_utc = db.tai_to_utc(when_tai)
|
||||
self.assertIs(when_utc.tzinfo, datetime.timezone.utc)
|
||||
print(when_utc)
|
||||
|
||||
def test_to_tai(self) -> None:
|
||||
when = datetime.datetime(
|
||||
1999, 1, 1, tzinfo=datetime.timezone.utc
|
||||
) - datetime.timedelta(seconds=1)
|
||||
when_tai = db.to_tai(when)
|
||||
when_tai2 = db.to_tai(when_tai)
|
||||
assert when != when_tai
|
||||
assert when_tai == when_tai2
|
||||
assert when_tai.tzinfo is leapseconddata.tai
|
||||
assert when_tai2.tzinfo is leapseconddata.tai
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
unittest.main()
|
||||
Loading…
Reference in a new issue