Compare commits

...

6 commits
4.1.0 ... main

Author SHA1 Message Date
b7ae9ccca6
Merge pull request #30 from jepler/sources-timeout 2025-04-25 08:52:34 +02:00
d8480e67d5 Add timeout & improve coverage
Looks like the NIST FTP server is not happy with urllib and it fails
(ugh!) by hanging for a long time. Maybe I'll have to delete it.
(it works with curl, which uses EPSV and not PASV. python uses PASV
for all ipv4 ftp and EPSV for ipv6, instead of trying EPSV first)

Set a reasonable timeout.

The format of `leapsecond sources` has been modified for better readability.
2025-04-25 08:27:31 +02:00
f0dfb35101
Merge pull request #29 from jepler/issue24
Fix handling of naive datetime objects, but deprecate them
2024-07-20 12:49:37 -05:00
c659bd5b17 Deprecate use of naive timestamps 2024-07-20 12:13:34 -05:00
940945970f Fix passing naive timestamps & test 2024-07-20 12:13:05 -05:00
49b8e31c5f Check for tai timestamp by identity, not string equality 2024-07-20 12:01:56 -05:00
3 changed files with 65 additions and 32 deletions

View file

@ -29,6 +29,7 @@ import logging
import pathlib
import re
import urllib.request
import warnings
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, BinaryIO, ClassVar
@ -71,7 +72,7 @@ def _from_ntp_epoch(value: int) -> datetime.datetime:
def datetime_is_tai(when: datetime.datetime) -> bool:
"""Return true if the datetime is in the TAI timescale"""
return when.tzname() == "TAI"
return when.tzinfo is tai
@dataclass(frozen=True)
@ -155,7 +156,10 @@ class LeapSecondData:
@staticmethod
def _utc_datetime(when: datetime.datetime) -> datetime.datetime:
if when.tzinfo is not None and when.tzinfo is not datetime.timezone.utc:
if when.tzinfo is None:
warnings.warn("Use of naive datetime objects is deprecated", DeprecationWarning, stacklevel=2)
when = when.replace(tzinfo=datetime.timezone.utc)
elif when.tzinfo is not datetime.timezone.utc:
when = when.astimezone(datetime.timezone.utc)
return when
@ -194,14 +198,18 @@ class LeapSecondData:
def to_tai(self, when: datetime.datetime, *, check_validity: bool = True) -> datetime.datetime:
"""Convert the given datetime object to TAI.
:param when: Moment in time to convert. If naive, it is assumed to be in UTC.
:param check_validity: Check whether the database is valid for the given moment
A TAI timestamp is returned unchanged.
Naive timestamps are assumed to be UTC. A TAI timestamp is returned unchanged.
A naive timestamp is assumed to be UTC. This behavior is deprecated, and a future
release will raise an exception when ``when`` is naive.
:param when: Moment in time to convert.
:param check_validity: Check whether the database is valid for the given moment
"""
if datetime_is_tai(when):
return when
when = self._utc_datetime(when)
assert when.tzinfo is not None
return (when + self.tai_offset(when, check_validity=check_validity)).replace(tzinfo=tai)
def tai_to_utc(self, when: datetime.datetime, *, check_validity: bool = True) -> datetime.datetime:
@ -209,12 +217,16 @@ class LeapSecondData:
For a leap second, the ``fold`` property of the returned time is True.
:param when: Moment in time to convert. If not naive, its ``tzinfo`` must be `tai`.
A naive timestamp is assumed to be TAI. This behavior is deprecated, and a future
release will raise an exception when ``when`` is naive.
:param when: Moment in time to convert. Its ``tzinfo`` must be `tai`.
:param check_validity: Check whether the database is valid for the given moment
"""
if when.tzinfo is not None and when.tzinfo is not tai:
raise ValueError("Input timestamp is not TAI or naive")
if when.tzinfo is None:
warnings.warn("Use of naive datetime objects is deprecated", DeprecationWarning, stacklevel=1)
when = when.replace(tzinfo=tai)
result = (when - self.tai_offset(when, check_validity=check_validity)).replace(tzinfo=datetime.timezone.utc)
if self.is_leap_second(when, check_validity=check_validity):
@ -224,7 +236,10 @@ class LeapSecondData:
def is_leap_second(self, when: datetime.datetime, *, check_validity: bool = True) -> bool:
"""Return True if the given timestamp is the leap second.
:param when: Moment in time to check. If naive, it is assumed to be in UTC.
A naive timestamp is assumed to be UTC. This behavior is deprecated, and a future
release will raise an exception when ``when`` is naive.
:param when: Moment in time to check.
:param check_validity: Check whether the database is valid for the given moment
For a TAI timestamp, it returns True for the leap second (the one that
@ -245,6 +260,7 @@ class LeapSecondData:
*,
check_hash: bool = True,
custom_sources: Sequence[str] = (),
timeout: float | None = 60,
) -> LeapSecondData:
"""Get the list of leap seconds from a standard source.
@ -261,7 +277,7 @@ class LeapSecondData:
for location in itertools.chain(custom_sources, cls.standard_file_sources, cls.standard_network_sources):
logging.debug("Trying leap second data from %s", location)
try:
candidate = cls.from_url(location, check_hash=check_hash)
candidate = cls.from_url(location, check_hash=check_hash, timeout=timeout)
except InvalidHashError:
logging.warning("Invalid hash while reading %s", location)
continue
@ -273,7 +289,7 @@ class LeapSecondData:
if candidate.valid(when):
logging.info("Using leap second data from %s", location)
return candidate
logging.warning("Validity expired for %s", location)
logging.warning(f"Validity expired for {location} at {candidate.valid_until} (checking validity at {when})")
raise ValidityError("No valid leap-second.list file could be found")
@ -299,6 +315,7 @@ class LeapSecondData:
url: str,
*,
check_hash: bool = True,
timeout: float | None = 60,
) -> LeapSecondData | None:
"""Retrieve the leap second list from a local file
@ -306,7 +323,7 @@ class LeapSecondData:
:param check_hash: Whether to check the embedded hash
"""
try:
with urllib.request.urlopen(url) as open_file:
with urllib.request.urlopen(url, timeout=timeout) as open_file:
return cls.from_open_file(open_file, check_hash=check_hash)
except urllib.error.URLError: # pragma no cover
return None

View file

@ -166,27 +166,36 @@ def table(ctx: click.Context, *, start: datetime.datetime, end: datetime.datetim
@cli.command
def sources() -> None:
"""Print information about leap-second.list data sources"""
@click.option("--timeout", type=float, default=12, metavar="[SECS]")
@click.argument("urls", type=str, nargs=-1)
def sources(*, timeout: float, urls: list[str]) -> None:
"""Print information about leap-second.list data sources
If no URLs are specified, print information about all standard sources.
If one or more URLs are specified, check them instead.
"""
first = True
for location in LeapSecondData.standard_file_sources + LeapSecondData.standard_network_sources:
locations = urls if urls else LeapSecondData.standard_file_sources + LeapSecondData.standard_network_sources
for location in locations:
if not first:
print()
first = False
print(f"{location}:")
try:
leap_second_data = LeapSecondData.from_url(location, check_hash=True)
except InvalidHashError: # pragma no coverage
print(f"{location}: Invalid hash")
leap_second_data = LeapSecondData.from_url(location, check_hash=False)
except Exception as e: # pragma no coverage # noqa: BLE001
print(f"{location}: {e}")
leap_second_data = LeapSecondData.from_url(location, check_hash=True, timeout=timeout)
except InvalidHashError as e:
print(f" {e}")
leap_second_data = LeapSecondData.from_url(location, check_hash=False, timeout=timeout)
except Exception as e: # noqa: BLE001
print(f" {e}")
leap_second_data = None
if leap_second_data is not None:
print(f"{location}: Last updated {leap_second_data.last_updated}")
print(f"{location}: Valid until {leap_second_data.valid_until}")
print(f" Last updated {leap_second_data.last_updated}")
print(f" Valid until {leap_second_data.valid_until}")
print(f" {len(leap_second_data.leap_seconds)} leap seconds")
else:
print(f"{location}: Could not be read")
print(" Could not be read")
if __name__ == "__main__": # pragma no cover

View file

@ -18,10 +18,16 @@ import unittest
import leapseconddata
import leapseconddata.__main__
db = leapseconddata.LeapSecondData.from_standard_source()
db = leapseconddata.LeapSecondData.from_standard_source(timeout=8)
GMT1 = datetime.timezone(datetime.timedelta(seconds=3600), "GMT1")
bad_sources = [
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
"data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
"file:///doesnotexist",
]
class LeapSecondDataTest(unittest.TestCase):
def run_main(self, *args: str) -> None:
@ -45,7 +51,8 @@ class LeapSecondDataTest(unittest.TestCase):
self.run_main("next-leapsecond", "2100-2-2")
self.run_main("previous-leapsecond", "2009-2-2")
self.run_main("previous-leapsecond", "1960-2-2")
self.run_main("sources")
self.run_main("sources", "--timeout", "8")
self.run_main("sources", *bad_sources)
def test_corrupt(self) -> None:
self.assertRaises(
@ -98,14 +105,7 @@ class LeapSecondDataTest(unittest.TestCase):
def test_invalid2(self) -> None:
when = datetime.datetime(datetime.MAXYEAR, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
with self.assertRaises(leapseconddata.ValidityError):
leapseconddata.LeapSecondData.from_standard_source(
when,
custom_sources=[
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
"data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
"file:///doesnotexist",
],
)
leapseconddata.LeapSecondData.from_standard_source(when, custom_sources=bad_sources, timeout=8)
def test_tz(self) -> None:
when = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
@ -136,6 +136,13 @@ class LeapSecondDataTest(unittest.TestCase):
assert when_tai.tzinfo is leapseconddata.tai
assert when_tai2.tzinfo is leapseconddata.tai
def test_to_tai_naive(self) -> None:
when = datetime.datetime(1999, 1, 1, tzinfo=None) - datetime.timedelta(seconds=1) # noqa: DTZ001
when_tai = db.to_tai(when)
when2 = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
when_tai2 = db.to_tai(when2)
self.assertEqual(when_tai, when_tai2)
def assertPrints(self, code: str, expected: str) -> None: # noqa: N802
buf = io.StringIO()
with contextlib.redirect_stdout(buf):