From ca6a076137850bba65e672df14e44d655bc5dd4e Mon Sep 17 00:00:00 2001 From: Jeff Epler Date: Thu, 18 Jul 2024 07:43:21 -0500 Subject: [PATCH] Improve coverage & create InvalidContentError --- .coveragerc | 18 ++++++--- leapseconddata/__init__.py | 81 ++++++++++++++++++++++---------------- testleapseconddata.py | 12 ++++++ 3 files changed, 73 insertions(+), 38 deletions(-) diff --git a/.coveragerc b/.coveragerc index e4547b7..123de6c 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,7 +1,15 @@ -# SPDX-FileCopyrightText: 2021 Jeff Epler +# SPDX-FileCopyrightText: 2021-2024 Jeff Epler # # SPDX-License-Identifier: GPL-3.0-only -[run] -omit = - */site-packages/* - test*.py +[report] +exclude_also = + def __repr__ + if self.debug: + if settings.DEBUG + raise AssertionError + raise NotImplementedError + if 0: + if __name__ == .__main__.: + if TYPE_CHECKING: + class .*\bProtocol\): + @(abc\.)?abstractmethod diff --git a/leapseconddata/__init__.py b/leapseconddata/__init__.py index 4ebb7a5..2e448d4 100755 --- a/leapseconddata/__init__.py +++ b/leapseconddata/__init__.py @@ -25,12 +25,16 @@ from __future__ import annotations import datetime import hashlib import io +import itertools import logging import pathlib import re import urllib.request from dataclasses import dataclass, field -from typing import BinaryIO, ClassVar +from typing import TYPE_CHECKING, BinaryIO, ClassVar + +if TYPE_CHECKING: # pragma no cover + from collections.abc import Sequence tai = datetime.timezone(datetime.timedelta(0), "TAI") @@ -58,6 +62,10 @@ class InvalidHashError(ValueError): """The file hash could not be verified""" +class InvalidContentError(ValueError): + """A line in the file was not valid""" + + def _from_ntp_epoch(value: int) -> datetime.datetime: return NTP_EPOCH + datetime.timedelta(seconds=value) @@ -237,6 +245,7 @@ class LeapSecondData: when: datetime.datetime | None = None, *, check_hash: bool = True, + custom_sources: Sequence[str] = (), ) -> LeapSecondData: """Get the list of leap seconds from a standard source. @@ -247,21 +256,24 @@ class LeapSecondData: leap-second.list data valid for the given timestamp, or the current time (if unspecified) """ - for location in cls.standard_file_sources + cls.standard_network_sources: + for location in itertools.chain(custom_sources, cls.standard_file_sources, cls.standard_network_sources): logging.debug("Trying leap second data from %s", location) try: candidate = cls.from_url(location, check_hash=check_hash) - except InvalidHashError: # pragma no cover + except InvalidHashError: logging.warning("Invalid hash while reading %s", location) continue - if candidate is None: # pragma no cover + except InvalidContentError as e: + logging.warning("Invalid content while reading %s: %s", location, e) continue - if candidate.valid(when): # pragma no branch + if candidate is None: + continue + if candidate.valid(when): logging.info("Using leap second data from %s", location) return candidate - logging.warning("Validity expired for %s", location) # pragma no cover + logging.warning("Validity expired for %s", location) - raise ValidityError("No valid leap-second.list file could be found") # pragma no cover + raise ValidityError("No valid leap-second.list file could be found") @classmethod def from_file( @@ -338,36 +350,39 @@ class LeapSecondData: hasher = hashlib.sha1() - for row in open_file: - row = row.strip() # noqa: PLW2901 - if row.startswith(b"#h"): - content_hash = cls._parse_content_hash(row) - continue + for row_ws in open_file: + row = row_ws.strip() + try: + if row.startswith(b"#h"): + content_hash = cls._parse_content_hash(row) + continue + + if row.startswith(b"#@"): + parts = row.split() + hasher.update(parts[1]) + valid_until = _from_ntp_epoch(int(parts[1])) + continue + + if row.startswith(b"#$"): + parts = row.split() + hasher.update(parts[1]) + last_updated = _from_ntp_epoch(int(parts[1])) + continue + + row = row.split(b"#")[0].strip() + content_to_hash.extend(re.findall(rb"\d+", row)) - if row.startswith(b"#@"): parts = row.split() + if len(parts) != 2: # noqa: PLR2004 + continue + hasher.update(parts[0]) hasher.update(parts[1]) - valid_until = _from_ntp_epoch(int(parts[1])) - continue - if row.startswith(b"#$"): - parts = row.split() - hasher.update(parts[1]) - last_updated = _from_ntp_epoch(int(parts[1])) - continue - - row = row.split(b"#")[0].strip() # noqa: PLW2901 - content_to_hash.extend(re.findall(rb"\d+", row)) - - parts = row.split() - if len(parts) != 2: # noqa: PLR2004 - continue - hasher.update(parts[0]) - hasher.update(parts[1]) - - when = _from_ntp_epoch(int(parts[0])) - tai_offset = datetime.timedelta(seconds=int(parts[1])) - leap_seconds.append(LeapSecondInfo(when, tai_offset)) + when = _from_ntp_epoch(int(parts[0])) + tai_offset = datetime.timedelta(seconds=int(parts[1])) + leap_seconds.append(LeapSecondInfo(when, tai_offset)) + except Exception as e: + raise InvalidContentError(f"Failed to parse: {row!r}: {e}") from e if check_hash: if content_hash is None: diff --git a/testleapseconddata.py b/testleapseconddata.py index 4556ff6..e646404 100644 --- a/testleapseconddata.py +++ b/testleapseconddata.py @@ -91,6 +91,18 @@ class LeapSecondDataTest(unittest.TestCase): datetime.timedelta(seconds=0), ) + def test_invalid2(self) -> None: + when = datetime.datetime(9999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1) + with self.assertRaises(leapseconddata.ValidityError): + leapseconddata.LeapSecondData.from_standard_source( + when, + custom_sources=[ + "data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==", + "data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A", + "file:///doesnotexist", + ], + ) + def test_tz(self) -> None: when = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1) when = when.replace(fold=True)