Merge pull request #23 from jepler/no-tracemalloc
Improve coverage & create InvalidContentError
This commit is contained in:
commit
2fad09fac6
4 changed files with 77 additions and 39 deletions
18
.coveragerc
18
.coveragerc
|
|
@ -1,7 +1,15 @@
|
|||
# SPDX-FileCopyrightText: 2021 Jeff Epler
|
||||
# SPDX-FileCopyrightText: 2021-2024 Jeff Epler
|
||||
#
|
||||
# SPDX-License-Identifier: GPL-3.0-only
|
||||
[run]
|
||||
omit =
|
||||
*/site-packages/*
|
||||
test*.py
|
||||
[report]
|
||||
exclude_also =
|
||||
def __repr__
|
||||
if self.debug:
|
||||
if settings.DEBUG
|
||||
raise AssertionError
|
||||
raise NotImplementedError
|
||||
if 0:
|
||||
if __name__ == .__main__.:
|
||||
if TYPE_CHECKING:
|
||||
class .*\bProtocol\):
|
||||
@(abc\.)?abstractmethod
|
||||
|
|
|
|||
2
.github/workflows/test.yml
vendored
2
.github/workflows/test.yml
vendored
|
|
@ -67,7 +67,7 @@ jobs:
|
|||
run: make mypy
|
||||
|
||||
- name: Test
|
||||
run: python -X tracemalloc=3 -mcoverage run --branch -m unittest testleapseconddata.py && python -mcoverage report --fail-under=100 && python -mcoverage xml
|
||||
run: python -mcoverage run --branch -m unittest testleapseconddata.py && python -mcoverage report --fail-under=100 && python -mcoverage xml
|
||||
|
||||
pre-commit:
|
||||
runs-on: ubuntu-latest
|
||||
|
|
|
|||
|
|
@ -25,12 +25,16 @@ from __future__ import annotations
|
|||
import datetime
|
||||
import hashlib
|
||||
import io
|
||||
import itertools
|
||||
import logging
|
||||
import pathlib
|
||||
import re
|
||||
import urllib.request
|
||||
from dataclasses import dataclass, field
|
||||
from typing import BinaryIO, ClassVar
|
||||
from typing import TYPE_CHECKING, BinaryIO, ClassVar
|
||||
|
||||
if TYPE_CHECKING: # pragma no cover
|
||||
from collections.abc import Sequence
|
||||
|
||||
tai = datetime.timezone(datetime.timedelta(0), "TAI")
|
||||
|
||||
|
|
@ -58,6 +62,10 @@ class InvalidHashError(ValueError):
|
|||
"""The file hash could not be verified"""
|
||||
|
||||
|
||||
class InvalidContentError(ValueError):
|
||||
"""A line in the file was not valid"""
|
||||
|
||||
|
||||
def _from_ntp_epoch(value: int) -> datetime.datetime:
|
||||
return NTP_EPOCH + datetime.timedelta(seconds=value)
|
||||
|
||||
|
|
@ -237,6 +245,7 @@ class LeapSecondData:
|
|||
when: datetime.datetime | None = None,
|
||||
*,
|
||||
check_hash: bool = True,
|
||||
custom_sources: Sequence[str] = (),
|
||||
) -> LeapSecondData:
|
||||
"""Get the list of leap seconds from a standard source.
|
||||
|
||||
|
|
@ -246,22 +255,28 @@ class LeapSecondData:
|
|||
Using a list of standard sources, including network sources, find a
|
||||
leap-second.list data valid for the given timestamp, or the current
|
||||
time (if unspecified)
|
||||
|
||||
If ``custom_sources`` is specified, this list of URLs is checked before
|
||||
the hard-coded sources.
|
||||
"""
|
||||
for location in cls.standard_file_sources + cls.standard_network_sources:
|
||||
for location in itertools.chain(custom_sources, cls.standard_file_sources, cls.standard_network_sources):
|
||||
logging.debug("Trying leap second data from %s", location)
|
||||
try:
|
||||
candidate = cls.from_url(location, check_hash=check_hash)
|
||||
except InvalidHashError: # pragma no cover
|
||||
except InvalidHashError:
|
||||
logging.warning("Invalid hash while reading %s", location)
|
||||
continue
|
||||
if candidate is None: # pragma no cover
|
||||
except InvalidContentError as e:
|
||||
logging.warning("Invalid content while reading %s: %s", location, e)
|
||||
continue
|
||||
if candidate.valid(when): # pragma no branch
|
||||
if candidate is None:
|
||||
continue
|
||||
if candidate.valid(when):
|
||||
logging.info("Using leap second data from %s", location)
|
||||
return candidate
|
||||
logging.warning("Validity expired for %s", location) # pragma no cover
|
||||
logging.warning("Validity expired for %s", location)
|
||||
|
||||
raise ValidityError("No valid leap-second.list file could be found") # pragma no cover
|
||||
raise ValidityError("No valid leap-second.list file could be found")
|
||||
|
||||
@classmethod
|
||||
def from_file(
|
||||
|
|
@ -338,36 +353,39 @@ class LeapSecondData:
|
|||
|
||||
hasher = hashlib.sha1()
|
||||
|
||||
for row in open_file:
|
||||
row = row.strip() # noqa: PLW2901
|
||||
if row.startswith(b"#h"):
|
||||
content_hash = cls._parse_content_hash(row)
|
||||
continue
|
||||
for row_ws in open_file:
|
||||
row = row_ws.strip()
|
||||
try:
|
||||
if row.startswith(b"#h"):
|
||||
content_hash = cls._parse_content_hash(row)
|
||||
continue
|
||||
|
||||
if row.startswith(b"#@"):
|
||||
parts = row.split()
|
||||
hasher.update(parts[1])
|
||||
valid_until = _from_ntp_epoch(int(parts[1]))
|
||||
continue
|
||||
|
||||
if row.startswith(b"#$"):
|
||||
parts = row.split()
|
||||
hasher.update(parts[1])
|
||||
last_updated = _from_ntp_epoch(int(parts[1]))
|
||||
continue
|
||||
|
||||
row = row.split(b"#")[0].strip()
|
||||
content_to_hash.extend(re.findall(rb"\d+", row))
|
||||
|
||||
if row.startswith(b"#@"):
|
||||
parts = row.split()
|
||||
if len(parts) != 2: # noqa: PLR2004
|
||||
continue
|
||||
hasher.update(parts[0])
|
||||
hasher.update(parts[1])
|
||||
valid_until = _from_ntp_epoch(int(parts[1]))
|
||||
continue
|
||||
|
||||
if row.startswith(b"#$"):
|
||||
parts = row.split()
|
||||
hasher.update(parts[1])
|
||||
last_updated = _from_ntp_epoch(int(parts[1]))
|
||||
continue
|
||||
|
||||
row = row.split(b"#")[0].strip() # noqa: PLW2901
|
||||
content_to_hash.extend(re.findall(rb"\d+", row))
|
||||
|
||||
parts = row.split()
|
||||
if len(parts) != 2: # noqa: PLR2004
|
||||
continue
|
||||
hasher.update(parts[0])
|
||||
hasher.update(parts[1])
|
||||
|
||||
when = _from_ntp_epoch(int(parts[0]))
|
||||
tai_offset = datetime.timedelta(seconds=int(parts[1]))
|
||||
leap_seconds.append(LeapSecondInfo(when, tai_offset))
|
||||
when = _from_ntp_epoch(int(parts[0]))
|
||||
tai_offset = datetime.timedelta(seconds=int(parts[1]))
|
||||
leap_seconds.append(LeapSecondInfo(when, tai_offset))
|
||||
except Exception as e:
|
||||
raise InvalidContentError(f"Failed to parse: {row!r}: {e}") from e
|
||||
|
||||
if check_hash:
|
||||
if content_hash is None:
|
||||
|
|
|
|||
|
|
@ -91,6 +91,18 @@ class LeapSecondDataTest(unittest.TestCase):
|
|||
datetime.timedelta(seconds=0),
|
||||
)
|
||||
|
||||
def test_invalid2(self) -> None:
|
||||
when = datetime.datetime(datetime.MAXYEAR, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
|
||||
with self.assertRaises(leapseconddata.ValidityError):
|
||||
leapseconddata.LeapSecondData.from_standard_source(
|
||||
when,
|
||||
custom_sources=[
|
||||
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ==",
|
||||
"data:text/plain,%23h%099dac5845%208acd32c0%202947d462%20daf4a943%20f58d9391%0A",
|
||||
"file:///doesnotexist",
|
||||
],
|
||||
)
|
||||
|
||||
def test_tz(self) -> None:
|
||||
when = datetime.datetime(1999, 1, 1, tzinfo=datetime.timezone.utc) - datetime.timedelta(seconds=1)
|
||||
when = when.replace(fold=True)
|
||||
|
|
|
|||
Loading…
Reference in a new issue