Add timeout & improve coverage

It looks like the NIST FTP server is not happy with urllib: requests fail
(ugh!) by hanging for a long time, so that source may have to be removed.
(It works with curl, which uses EPSV rather than PASV; Python uses PASV
for all IPv4 FTP and EPSV only for IPv6, instead of trying EPSV first.)

Set a reasonable timeout.

The output format of the `sources` subcommand has been reworked for better readability.
This commit is contained in:
Jeff Epler 2025-04-25 08:23:23 +02:00
parent c659bd5b17
commit d8480e67d5
3 changed files with 36 additions and 25 deletions

View file

@ -260,6 +260,7 @@ class LeapSecondData:
*, *,
check_hash: bool = True, check_hash: bool = True,
custom_sources: Sequence[str] = (), custom_sources: Sequence[str] = (),
timeout: float | None = 60,
) -> LeapSecondData: ) -> LeapSecondData:
"""Get the list of leap seconds from a standard source. """Get the list of leap seconds from a standard source.
@ -276,7 +277,7 @@ class LeapSecondData:
for location in itertools.chain(custom_sources, cls.standard_file_sources, cls.standard_network_sources): for location in itertools.chain(custom_sources, cls.standard_file_sources, cls.standard_network_sources):
logging.debug("Trying leap second data from %s", location) logging.debug("Trying leap second data from %s", location)
try: try:
candidate = cls.from_url(location, check_hash=check_hash) candidate = cls.from_url(location, check_hash=check_hash, timeout=timeout)
except InvalidHashError: except InvalidHashError:
logging.warning("Invalid hash while reading %s", location) logging.warning("Invalid hash while reading %s", location)
continue continue
@ -288,7 +289,7 @@ class LeapSecondData:
if candidate.valid(when): if candidate.valid(when):
logging.info("Using leap second data from %s", location) logging.info("Using leap second data from %s", location)
return candidate return candidate
logging.warning("Validity expired for %s", location) logging.warning(f"Validity expired for {location} at {candidate.valid_until} (checking validity at {when})")
raise ValidityError("No valid leap-second.list file could be found") raise ValidityError("No valid leap-second.list file could be found")
@ -314,6 +315,7 @@ class LeapSecondData:
url: str, url: str,
*, *,
check_hash: bool = True, check_hash: bool = True,
timeout: float | None = 60,
) -> LeapSecondData | None: ) -> LeapSecondData | None:
"""Retrieve the leap second list from a local file """Retrieve the leap second list from a local file
@ -321,7 +323,7 @@ class LeapSecondData:
:param check_hash: Whether to check the embedded hash :param check_hash: Whether to check the embedded hash
""" """
try: try:
with urllib.request.urlopen(url) as open_file: with urllib.request.urlopen(url, timeout=timeout) as open_file:
return cls.from_open_file(open_file, check_hash=check_hash) return cls.from_open_file(open_file, check_hash=check_hash)
except urllib.error.URLError: # pragma no cover except urllib.error.URLError: # pragma no cover
return None return None

View file

@ -166,27 +166,36 @@ def table(ctx: click.Context, *, start: datetime.datetime, end: datetime.datetim
@cli.command @cli.command
def sources() -> None: @click.option("--timeout", type=float, default=12, metavar="[SECS]")
"""Print information about leap-second.list data sources""" @click.argument("urls", type=str, nargs=-1)
def sources(*, timeout: float, urls: list[str]) -> None:
"""Print information about leap-second.list data sources
If no URLs are specified, print information about all standard sources.
If one or more URLs are specified, check them instead.
"""
first = True first = True
for location in LeapSecondData.standard_file_sources + LeapSecondData.standard_network_sources: locations = urls if urls else LeapSecondData.standard_file_sources + LeapSecondData.standard_network_sources
for location in locations:
if not first: if not first:
print() print()
first = False first = False
print(f"{location}:")
try: try:
leap_second_data = LeapSecondData.from_url(location, check_hash=True) leap_second_data = LeapSecondData.from_url(location, check_hash=True, timeout=timeout)
except InvalidHashError: # pragma no coverage except InvalidHashError as e:
print(f"{location}: Invalid hash") print(f" {e}")
leap_second_data = LeapSecondData.from_url(location, check_hash=False) leap_second_data = LeapSecondData.from_url(location, check_hash=False, timeout=timeout)
except Exception as e: # pragma no coverage # noqa: BLE001 except Exception as e: # noqa: BLE001
print(f"{location}: {e}") print(f" {e}")
leap_second_data = None leap_second_data = None
if leap_second_data is not None: if leap_second_data is not None:
print(f"{location}: Last updated {leap_second_data.last_updated}") print(f" Last updated {leap_second_data.last_updated}")
print(f"{location}: Valid until {leap_second_data.valid_until}") print(f" Valid until {leap_second_data.valid_until}")
print(f" {len(leap_second_data.leap_seconds)} leap seconds")
else: else:
print(f"{location}: Could not be read") print(" Could not be read")
if __name__ == "__main__": # pragma no cover if __name__ == "__main__": # pragma no cover

View file

@ -18,10 +18,16 @@ import unittest
import leapseconddata import leapseconddata
import leapseconddata.__main__ import leapseconddata.__main__
db = leapseconddata.LeapSecondData.from_standard_source() db = leapseconddata.LeapSecondData.from_standard_source(timeout=8)
GMT1 = datetime.timezone(datetime.timedelta(seconds=3600), "GMT1") GMT1 = datetime.timezone(datetime.timedelta(seconds=3600), "GMT1")
bad_sources = [
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
"data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
"file:///doesnotexist",
]
class LeapSecondDataTest(unittest.TestCase): class LeapSecondDataTest(unittest.TestCase):
def run_main(self, *args: str) -> None: def run_main(self, *args: str) -> None:
@ -45,7 +51,8 @@ class LeapSecondDataTest(unittest.TestCase):
self.run_main("next-leapsecond", "2100-2-2") self.run_main("next-leapsecond", "2100-2-2")
self.run_main("previous-leapsecond", "2009-2-2") self.run_main("previous-leapsecond", "2009-2-2")
self.run_main("previous-leapsecond", "1960-2-2") self.run_main("previous-leapsecond", "1960-2-2")
self.run_main("sources") self.run_main("sources", "--timeout", "8")
self.run_main("sources", *bad_sources)
def test_corrupt(self) -> None: def test_corrupt(self) -> None:
self.assertRaises( self.assertRaises(
@ -98,14 +105,7 @@ class LeapSecondDataTest(unittest.TestCase):
def test_invalid2(self) -> None: def test_invalid2(self) -> None:
when = datetime.datetime(datetime.MAXYEAR, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1) when = datetime.datetime(datetime.MAXYEAR, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
with self.assertRaises(leapseconddata.ValidityError): with self.assertRaises(leapseconddata.ValidityError):
leapseconddata.LeapSecondData.from_standard_source( leapseconddata.LeapSecondData.from_standard_source(when, custom_sources=bad_sources, timeout=8)
when,
custom_sources=[
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
"data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
"file:///doesnotexist",
],
)
def test_tz(self) -> None: def test_tz(self) -> None:
when = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1) when = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)