Compare commits
6 commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b7ae9ccca6 | |||
| d8480e67d5 | |||
| f0dfb35101 | |||
| c659bd5b17 | |||
| 940945970f | |||
| 49b8e31c5f |
3 changed files with 65 additions and 32 deletions
|
|
@ -29,6 +29,7 @@ import logging
|
|||
import pathlib
|
||||
import re
|
||||
import urllib.request
|
||||
import warnings
|
||||
from dataclasses import dataclass, field
|
||||
from typing import TYPE_CHECKING, BinaryIO, ClassVar
|
||||
|
||||
|
|
@ -71,7 +72,7 @@ def _from_ntp_epoch(value: int) -> datetime.datetime:
|
|||
|
||||
def datetime_is_tai(when: datetime.datetime) -> bool:
|
||||
"""Return true if the datetime is in the TAI timescale"""
|
||||
return when.tzname() == "TAI"
|
||||
return when.tzinfo is tai
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
|
|
@ -155,7 +156,10 @@ class LeapSecondData:
|
|||
|
||||
@staticmethod
|
||||
def _utc_datetime(when: datetime.datetime) -> datetime.datetime:
|
||||
if when.tzinfo is not None and when.tzinfo is not datetime.timezone.utc:
|
||||
if when.tzinfo is None:
|
||||
warnings.warn("Use of naive datetime objects is deprecated", DeprecationWarning, stacklevel=2)
|
||||
when = when.replace(tzinfo=datetime.timezone.utc)
|
||||
elif when.tzinfo is not datetime.timezone.utc:
|
||||
when = when.astimezone(datetime.timezone.utc)
|
||||
return when
|
||||
|
||||
|
|
@ -194,14 +198,18 @@ class LeapSecondData:
|
|||
def to_tai(self, when: datetime.datetime, *, check_validity: bool = True) -> datetime.datetime:
|
||||
"""Convert the given datetime object to TAI.
|
||||
|
||||
:param when: Moment in time to convert. If naive, it is assumed to be in UTC.
|
||||
:param check_validity: Check whether the database is valid for the given moment
|
||||
A TAI timestamp is returned unchanged.
|
||||
|
||||
Naive timestamps are assumed to be UTC. A TAI timestamp is returned unchanged.
|
||||
A naive timestamp is assumed to be UTC. This behavior is deprecated, and a future
|
||||
release will raise an exception when ``when`` is naive.
|
||||
|
||||
:param when: Moment in time to convert.
|
||||
:param check_validity: Check whether the database is valid for the given moment
|
||||
"""
|
||||
if datetime_is_tai(when):
|
||||
return when
|
||||
when = self._utc_datetime(when)
|
||||
assert when.tzinfo is not None
|
||||
return (when + self.tai_offset(when, check_validity=check_validity)).replace(tzinfo=tai)
|
||||
|
||||
def tai_to_utc(self, when: datetime.datetime, *, check_validity: bool = True) -> datetime.datetime:
|
||||
|
|
@ -209,12 +217,16 @@ class LeapSecondData:
|
|||
|
||||
For a leap second, the ``fold`` property of the returned time is True.
|
||||
|
||||
:param when: Moment in time to convert. If not naive, its ``tzinfo`` must be `tai`.
|
||||
A naive timestamp is assumed to be TAI. This behavior is deprecated, and a future
|
||||
release will raise an exception when ``when`` is naive.
|
||||
|
||||
:param when: Moment in time to convert. Its ``tzinfo`` must be `tai`.
|
||||
:param check_validity: Check whether the database is valid for the given moment
|
||||
"""
|
||||
if when.tzinfo is not None and when.tzinfo is not tai:
|
||||
raise ValueError("Input timestamp is not TAI or naive")
|
||||
if when.tzinfo is None:
|
||||
warnings.warn("Use of naive datetime objects is deprecated", DeprecationWarning, stacklevel=1)
|
||||
when = when.replace(tzinfo=tai)
|
||||
result = (when - self.tai_offset(when, check_validity=check_validity)).replace(tzinfo=datetime.timezone.utc)
|
||||
if self.is_leap_second(when, check_validity=check_validity):
|
||||
|
|
@ -224,7 +236,10 @@ class LeapSecondData:
|
|||
def is_leap_second(self, when: datetime.datetime, *, check_validity: bool = True) -> bool:
|
||||
"""Return True if the given timestamp is the leap second.
|
||||
|
||||
:param when: Moment in time to check. If naive, it is assumed to be in UTC.
|
||||
A naive timestamp is assumed to be UTC. This behavior is deprecated, and a future
|
||||
release will raise an exception when ``when`` is naive.
|
||||
|
||||
:param when: Moment in time to check.
|
||||
:param check_validity: Check whether the database is valid for the given moment
|
||||
|
||||
For a TAI timestamp, it returns True for the leap second (the one that
|
||||
|
|
@ -245,6 +260,7 @@ class LeapSecondData:
|
|||
*,
|
||||
check_hash: bool = True,
|
||||
custom_sources: Sequence[str] = (),
|
||||
timeout: float | None = 60,
|
||||
) -> LeapSecondData:
|
||||
"""Get the list of leap seconds from a standard source.
|
||||
|
||||
|
|
@ -261,7 +277,7 @@ class LeapSecondData:
|
|||
for location in itertools.chain(custom_sources, cls.standard_file_sources, cls.standard_network_sources):
|
||||
logging.debug("Trying leap second data from %s", location)
|
||||
try:
|
||||
candidate = cls.from_url(location, check_hash=check_hash)
|
||||
candidate = cls.from_url(location, check_hash=check_hash, timeout=timeout)
|
||||
except InvalidHashError:
|
||||
logging.warning("Invalid hash while reading %s", location)
|
||||
continue
|
||||
|
|
@ -273,7 +289,7 @@ class LeapSecondData:
|
|||
if candidate.valid(when):
|
||||
logging.info("Using leap second data from %s", location)
|
||||
return candidate
|
||||
logging.warning("Validity expired for %s", location)
|
||||
logging.warning(f"Validity expired for {location} at {candidate.valid_until} (checking validity at {when})")
|
||||
|
||||
raise ValidityError("No valid leap-second.list file could be found")
|
||||
|
||||
|
|
@ -299,6 +315,7 @@ class LeapSecondData:
|
|||
url: str,
|
||||
*,
|
||||
check_hash: bool = True,
|
||||
timeout: float | None = 60,
|
||||
) -> LeapSecondData | None:
|
||||
"""Retrieve the leap second list from a local file
|
||||
|
||||
|
|
@ -306,7 +323,7 @@ class LeapSecondData:
|
|||
:param check_hash: Whether to check the embedded hash
|
||||
"""
|
||||
try:
|
||||
with urllib.request.urlopen(url) as open_file:
|
||||
with urllib.request.urlopen(url, timeout=timeout) as open_file:
|
||||
return cls.from_open_file(open_file, check_hash=check_hash)
|
||||
except urllib.error.URLError: # pragma no cover
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -166,27 +166,36 @@ def table(ctx: click.Context, *, start: datetime.datetime, end: datetime.datetim
|
|||
|
||||
|
||||
@cli.command
|
||||
def sources() -> None:
|
||||
"""Print information about leap-second.list data sources"""
|
||||
@click.option("--timeout", type=float, default=12, metavar="[SECS]")
|
||||
@click.argument("urls", type=str, nargs=-1)
|
||||
def sources(*, timeout: float, urls: list[str]) -> None:
|
||||
"""Print information about leap-second.list data sources
|
||||
|
||||
If no URLs are specified, print information about all standard sources.
|
||||
If one or more URLs are specified, check them instead.
|
||||
"""
|
||||
first = True
|
||||
for location in LeapSecondData.standard_file_sources + LeapSecondData.standard_network_sources:
|
||||
locations = urls if urls else LeapSecondData.standard_file_sources + LeapSecondData.standard_network_sources
|
||||
for location in locations:
|
||||
if not first:
|
||||
print()
|
||||
first = False
|
||||
print(f"{location}:")
|
||||
try:
|
||||
leap_second_data = LeapSecondData.from_url(location, check_hash=True)
|
||||
except InvalidHashError: # pragma no coverage
|
||||
print(f"{location}: Invalid hash")
|
||||
leap_second_data = LeapSecondData.from_url(location, check_hash=False)
|
||||
except Exception as e: # pragma no coverage # noqa: BLE001
|
||||
print(f"{location}: {e}")
|
||||
leap_second_data = LeapSecondData.from_url(location, check_hash=True, timeout=timeout)
|
||||
except InvalidHashError as e:
|
||||
print(f" {e}")
|
||||
leap_second_data = LeapSecondData.from_url(location, check_hash=False, timeout=timeout)
|
||||
except Exception as e: # noqa: BLE001
|
||||
print(f" {e}")
|
||||
leap_second_data = None
|
||||
|
||||
if leap_second_data is not None:
|
||||
print(f"{location}: Last updated {leap_second_data.last_updated}")
|
||||
print(f"{location}: Valid until {leap_second_data.valid_until}")
|
||||
print(f" Last updated {leap_second_data.last_updated}")
|
||||
print(f" Valid until {leap_second_data.valid_until}")
|
||||
print(f" {len(leap_second_data.leap_seconds)} leap seconds")
|
||||
else:
|
||||
print(f"{location}: Could not be read")
|
||||
print(" Could not be read")
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma no cover
|
||||
|
|
|
|||
|
|
@ -18,10 +18,16 @@ import unittest
|
|||
import leapseconddata
|
||||
import leapseconddata.__main__
|
||||
|
||||
db = leapseconddata.LeapSecondData.from_standard_source()
|
||||
db = leapseconddata.LeapSecondData.from_standard_source(timeout=8)
|
||||
|
||||
GMT1 = datetime.timezone(datetime.timedelta(seconds=3600), "GMT1")
|
||||
|
||||
bad_sources = [
|
||||
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
|
||||
"data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
|
||||
"file:///doesnotexist",
|
||||
]
|
||||
|
||||
|
||||
class LeapSecondDataTest(unittest.TestCase):
|
||||
def run_main(self, *args: str) -> None:
|
||||
|
|
@ -45,7 +51,8 @@ class LeapSecondDataTest(unittest.TestCase):
|
|||
self.run_main("next-leapsecond", "2100-2-2")
|
||||
self.run_main("previous-leapsecond", "2009-2-2")
|
||||
self.run_main("previous-leapsecond", "1960-2-2")
|
||||
self.run_main("sources")
|
||||
self.run_main("sources", "--timeout", "8")
|
||||
self.run_main("sources", *bad_sources)
|
||||
|
||||
def test_corrupt(self) -> None:
|
||||
self.assertRaises(
|
||||
|
|
@ -98,14 +105,7 @@ class LeapSecondDataTest(unittest.TestCase):
|
|||
def test_invalid2(self) -> None:
|
||||
when = datetime.datetime(datetime.MAXYEAR, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
|
||||
with self.assertRaises(leapseconddata.ValidityError):
|
||||
leapseconddata.LeapSecondData.from_standard_source(
|
||||
when,
|
||||
custom_sources=[
|
||||
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
|
||||
"data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
|
||||
"file:///doesnotexist",
|
||||
],
|
||||
)
|
||||
leapseconddata.LeapSecondData.from_standard_source(when, custom_sources=bad_sources, timeout=8)
|
||||
|
||||
def test_tz(self) -> None:
|
||||
when = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
|
||||
|
|
@ -136,6 +136,13 @@ class LeapSecondDataTest(unittest.TestCase):
|
|||
assert when_tai.tzinfo is leapseconddata.tai
|
||||
assert when_tai2.tzinfo is leapseconddata.tai
|
||||
|
||||
def test_to_tai_naive(self) -> None:
|
||||
when = datetime.datetime(1999, 1, 1, tzinfo=None) - datetime.timedelta(seconds=1) # noqa: DTZ001
|
||||
when_tai = db.to_tai(when)
|
||||
when2 = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
|
||||
when_tai2 = db.to_tai(when2)
|
||||
self.assertEqual(when_tai, when_tai2)
|
||||
|
||||
def assertPrints(self, code: str, expected: str) -> None: # noqa: N802
|
||||
buf = io.StringIO()
|
||||
with contextlib.redirect_stdout(buf):
|
||||
|
|
|
|||
Loading…
Reference in a new issue