Improve coverage & create InvalidContentError

This commit is contained in:
Jeff Epler 2024-07-18 07:43:21 -05:00
parent 5d1c0799be
commit ca6a076137
3 changed files with 73 additions and 38 deletions

View file

@ -1,7 +1,15 @@
# SPDX-FileCopyrightText: 2021 Jeff Epler
# SPDX-FileCopyrightText: 2021-2024 Jeff Epler
#
# SPDX-License-Identifier: GPL-3.0-only
[run]
omit =
*/site-packages/*
test*.py
[report]
exclude_also =
def __repr__
if self.debug:
if settings.DEBUG
raise AssertionError
raise NotImplementedError
if 0:
if __name__ == .__main__.:
if TYPE_CHECKING:
class .*\bProtocol\):
@(abc\.)?abstractmethod

View file

@ -25,12 +25,16 @@ from __future__ import annotations
import datetime
import hashlib
import io
import itertools
import logging
import pathlib
import re
import urllib.request
from dataclasses import dataclass, field
from typing import BinaryIO, ClassVar
from typing import TYPE_CHECKING, BinaryIO, ClassVar
if TYPE_CHECKING: # pragma no cover
from collections.abc import Sequence
tai = datetime.timezone(datetime.timedelta(0), "TAI")
@ -58,6 +62,10 @@ class InvalidHashError(ValueError):
"""The file hash could not be verified"""
class InvalidContentError(ValueError):
"""A line in the file was not valid"""
def _from_ntp_epoch(value: int) -> datetime.datetime:
return NTP_EPOCH + datetime.timedelta(seconds=value)
@ -237,6 +245,7 @@ class LeapSecondData:
when: datetime.datetime | None = None,
*,
check_hash: bool = True,
custom_sources: Sequence[str] = (),
) -> LeapSecondData:
"""Get the list of leap seconds from a standard source.
@ -247,21 +256,24 @@ class LeapSecondData:
leap-second.list data valid for the given timestamp, or the current
time (if unspecified)
"""
for location in cls.standard_file_sources + cls.standard_network_sources:
for location in itertools.chain(custom_sources, cls.standard_file_sources, cls.standard_network_sources):
logging.debug("Trying leap second data from %s", location)
try:
candidate = cls.from_url(location, check_hash=check_hash)
except InvalidHashError: # pragma no cover
except InvalidHashError:
logging.warning("Invalid hash while reading %s", location)
continue
if candidate is None: # pragma no cover
except InvalidContentError as e:
logging.warning("Invalid content while reading %s: %s", location, e)
continue
if candidate.valid(when): # pragma no branch
if candidate is None:
continue
if candidate.valid(when):
logging.info("Using leap second data from %s", location)
return candidate
logging.warning("Validity expired for %s", location) # pragma no cover
logging.warning("Validity expired for %s", location)
raise ValidityError("No valid leap-second.list file could be found") # pragma no cover
raise ValidityError("No valid leap-second.list file could be found")
@classmethod
def from_file(
@ -338,36 +350,39 @@ class LeapSecondData:
hasher = hashlib.sha1()
for row in open_file:
row = row.strip() # noqa: PLW2901
if row.startswith(b"#h"):
content_hash = cls._parse_content_hash(row)
continue
for row_ws in open_file:
row = row_ws.strip()
try:
if row.startswith(b"#h"):
content_hash = cls._parse_content_hash(row)
continue
if row.startswith(b"#@"):
parts = row.split()
hasher.update(parts[1])
valid_until = _from_ntp_epoch(int(parts[1]))
continue
if row.startswith(b"#$"):
parts = row.split()
hasher.update(parts[1])
last_updated = _from_ntp_epoch(int(parts[1]))
continue
row = row.split(b"#")[0].strip()
content_to_hash.extend(re.findall(rb"\d+", row))
if row.startswith(b"#@"):
parts = row.split()
if len(parts) != 2: # noqa: PLR2004
continue
hasher.update(parts[0])
hasher.update(parts[1])
valid_until = _from_ntp_epoch(int(parts[1]))
continue
if row.startswith(b"#$"):
parts = row.split()
hasher.update(parts[1])
last_updated = _from_ntp_epoch(int(parts[1]))
continue
row = row.split(b"#")[0].strip() # noqa: PLW2901
content_to_hash.extend(re.findall(rb"\d+", row))
parts = row.split()
if len(parts) != 2: # noqa: PLR2004
continue
hasher.update(parts[0])
hasher.update(parts[1])
when = _from_ntp_epoch(int(parts[0]))
tai_offset = datetime.timedelta(seconds=int(parts[1]))
leap_seconds.append(LeapSecondInfo(when, tai_offset))
when = _from_ntp_epoch(int(parts[0]))
tai_offset = datetime.timedelta(seconds=int(parts[1]))
leap_seconds.append(LeapSecondInfo(when, tai_offset))
except Exception as e:
raise InvalidContentError(f"Failed to parse: {row!r}: {e}") from e
if check_hash:
if content_hash is None:

View file

@ -91,6 +91,18 @@ class LeapSecondDataTest(unittest.TestCase):
datetime.timedelta(seconds=0),
)
def test_invalid2(self) -> None:
when = datetime.datetime(9999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
with self.assertRaises(leapseconddata.ValidityError):
leapseconddata.LeapSecondData.from_standard_source(
when,
custom_sources=[
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
"data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
"file:///doesnotexist",
],
)
def test_tz(self) -> None:
when = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
when = when.replace(fold=True)