diff --git a/.pylintrc b/.pylintrc index fe4d471a..4835dbf7 100644 --- a/.pylintrc +++ b/.pylintrc @@ -38,7 +38,8 @@ load-plugins=linter_plugin # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" -disable=fixme,locally-disabled +disable=fixme,locally-disabled,abstract-class-not-used +# abstract-class-not-used cannot be disabled locally (at least in pylint 1.4.1) [REPORTS] @@ -148,10 +149,10 @@ module-rgx=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ module-name-hint=(([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$ # Regular expression matching correct method names -method-rgx=[a-z_][a-z0-9_]{2,40}$ +method-rgx=[a-z_][a-z0-9_]{2,50}$ # Naming hint for method names -method-name-hint=[a-z_][a-z0-9_]{2,40}$ +method-name-hint=[a-z_][a-z0-9_]{2,50}$ # Regular expression which should only match function or class names that do # not require a docstring. @@ -311,7 +312,7 @@ max-branches=12 max-statements=50 # Maximum number of parents for a class (see R0901). -max-parents=7 +max-parents=12 # Maximum number of attributes for a class (see R0902). max-attributes=7 diff --git a/docs/api/acme.rst b/docs/api/acme/index.rst similarity index 79% rename from docs/api/acme.rst rename to docs/api/acme/index.rst index 04c33917..89801611 100644 --- a/docs/api/acme.rst +++ b/docs/api/acme/index.rst @@ -5,12 +5,6 @@ :members: -Interfaces ----------- - -.. automodule:: letsencrypt.acme.interfaces - :members: - Messages -------- @@ -46,6 +40,3 @@ Utilities .. automodule:: letsencrypt.acme.util :members: - -.. automodule:: letsencrypt.acme.jose - :members: diff --git a/docs/api/acme/jose.rst b/docs/api/acme/jose.rst new file mode 100644 index 00000000..50e86ada --- /dev/null +++ b/docs/api/acme/jose.rst @@ -0,0 +1,60 @@ +:mod:`letsencrypt.acme.jose` +============================ + +.. contents:: + +.. automodule:: letsencrypt.acme.jose + :members: + + +JSON Web Algorithms +------------------- + +.. automodule:: letsencrypt.acme.jose.jwa + :members: + + +JSON Web Key +------------ + +.. automodule:: letsencrypt.acme.jose.jwk + :members: + + +Implementation details +---------------------- + + +Interfaces +~~~~~~~~~~ + +.. automodule:: letsencrypt.acme.jose.interfaces + :members: + + +Errors +~~~~~~ + +.. automodule:: letsencrypt.acme.jose.errors + :members: + + +JSON utilities +~~~~~~~~~~~~~~ + +.. automodule:: letsencrypt.acme.jose.json_util + :members: + + +JOSE Base64 +~~~~~~~~~~~ + +.. automodule:: letsencrypt.acme.jose.b64 + :members: + + +Utilities +~~~~~~~~~ + +.. automodule:: letsencrypt.acme.jose.util + :members: diff --git a/docs/conf.py b/docs/conf.py index 2a29b9dd..6e2c484c 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -55,7 +55,7 @@ extensions = [ ] autodoc_member_order = 'bysource' -autodoc_default_flags = ['show-inheritance'] +autodoc_default_flags = ['show-inheritance', 'private-members'] # Add any paths that contain templates here, relative to this directory. templates_path = ['_templates'] diff --git a/letsencrypt/acme/challenges.py b/letsencrypt/acme/challenges.py index 4bbeb4cd..9227fa1a 100644 --- a/letsencrypt/acme/challenges.py +++ b/letsencrypt/acme/challenges.py @@ -7,13 +7,12 @@ import Crypto.Random from letsencrypt.acme import jose from letsencrypt.acme import other -from letsencrypt.acme import util # pylint: disable=too-few-public-methods -class Challenge(util.TypedACMEObject): +class Challenge(jose.TypedJSONObjectWithFields): # _fields_to_json | pylint: disable=abstract-method """ACME challenge.""" TYPES = {} @@ -27,40 +26,33 @@ class DVChallenge(Challenge): # pylint: disable=abstract-method """Domain validation challenges.""" -class ChallengeResponse(util.TypedACMEObject): +class ChallengeResponse(jose.TypedJSONObjectWithFields): # _fields_to_json | pylint: disable=abstract-method """ACME challenge response.""" TYPES = {} @classmethod - def from_valid_json(cls, jobj): + def from_json(cls, jobj): if jobj is None: # if the client chooses not to respond to a given # challenge, then the corresponding entry in the response # array is set to None (null) return None - return super(ChallengeResponse, cls).from_valid_json(jobj) + return super(ChallengeResponse, cls).from_json(jobj) @Challenge.register class SimpleHTTPS(DVChallenge): """ACME "simpleHttps" challenge.""" - acme_type = "simpleHttps" - __slots__ = ("token",) - - def _fields_to_json(self): - return {"token": self.token} - - @classmethod - def from_valid_json(cls, jobj): - return cls(token=jobj["token"]) + typ = "simpleHttps" + token = jose.Field("token") @ChallengeResponse.register class SimpleHTTPSResponse(ChallengeResponse): """ACME "simpleHttps" challenge response.""" - acme_type = "simpleHttps" - __slots__ = ("path",) + typ = "simpleHttps" + path = jose.Field("path") URI_TEMPLATE = "https://{domain}/.well-known/acme-challenge/{path}" """URI template for HTTPS server provisioned resource.""" @@ -76,13 +68,6 @@ class SimpleHTTPSResponse(ChallengeResponse): """ return self.URI_TEMPLATE.format(domain=domain, path=self.path) - def _fields_to_json(self): - return {"path": self.path} - - @classmethod - def from_valid_json(cls, jobj): - return cls(path=jobj["path"]) - @Challenge.register class DVSNI(DVChallenge): @@ -92,8 +77,7 @@ class DVSNI(DVChallenge): :ivar str nonce: Random data, **not** hex-encoded. """ - acme_type = "dvsni" - __slots__ = ("r", "nonce") + typ = "dvsni" DOMAIN_SUFFIX = ".acme.invalid" """Domain name suffix.""" @@ -104,22 +88,17 @@ class DVSNI(DVChallenge): NONCE_SIZE = 16 """Required size of the :attr:`nonce` in bytes.""" + r = jose.Field("r", encoder=jose.b64encode, # pylint: disable=invalid-name + decoder=functools.partial(jose.decode_b64jose, size=R_SIZE)) + nonce = jose.Field("nonce", encoder=binascii.hexlify, + decoder=functools.partial(functools.partial( + jose.decode_hex16, size=NONCE_SIZE))) + @property def nonce_domain(self): """Domain name used in SNI.""" return binascii.hexlify(self.nonce) + self.DOMAIN_SUFFIX - def _fields_to_json(self): - return { - "r": jose.b64encode(self.r), - "nonce": binascii.hexlify(self.nonce), - } - - @classmethod - def from_valid_json(cls, jobj): - return cls(r=util.decode_b64jose(jobj["r"], cls.R_SIZE), - nonce=util.decode_hex16(jobj["nonce"], cls.NONCE_SIZE)) - @ChallengeResponse.register class DVSNIResponse(ChallengeResponse): @@ -128,8 +107,7 @@ class DVSNIResponse(ChallengeResponse): :param str s: Random data, **not** base64-encoded. """ - acme_type = "dvsni" - __slots__ = ("s",) + typ = "dvsni" DOMAIN_SUFFIX = DVSNI.DOMAIN_SUFFIX """Domain name suffix.""" @@ -137,6 +115,9 @@ class DVSNIResponse(ChallengeResponse): S_SIZE = 32 """Required size of the :attr:`s` in bytes.""" + s = jose.Field("s", encoder=jose.b64encode, # pylint: disable=invalid-name + decoder=functools.partial(jose.decode_b64jose, size=S_SIZE)) + def __init__(self, s=None, *args, **kwargs): s = Crypto.Random.get_random_bytes(self.S_SIZE) if s is None else s super(DVSNIResponse, self).__init__(s=s, *args, **kwargs) @@ -157,90 +138,34 @@ class DVSNIResponse(ChallengeResponse): """Domain name for certificate subjectAltName.""" return self.z(chall) + self.DOMAIN_SUFFIX - def _fields_to_json(self): - return {"s": jose.b64encode(self.s)} - - @classmethod - def from_valid_json(cls, jobj): - return cls(s=util.decode_b64jose(jobj["s"], cls.S_SIZE)) - - @Challenge.register class RecoveryContact(ClientChallenge): """ACME "recoveryContact" challenge.""" - acme_type = "recoveryContact" - __slots__ = ("activation_url", "success_url", "contact") + typ = "recoveryContact" - def _fields_to_json(self): - fields = {} - add = functools.partial(_extend_if_not_none, fields) - add(self.activation_url, "activationURL") - add(self.success_url, "successURL") - add(self.contact, "contact") - return fields - - @classmethod - def from_valid_json(cls, jobj): - return cls(activation_url=jobj.get("activationURL"), - success_url=jobj.get("successURL"), - contact=jobj.get("contact")) + activation_url = jose.Field("activationURL", omitempty=True) + success_url = jose.Field("successURL", omitempty=True) + contact = jose.Field("contact", omitempty=True) @ChallengeResponse.register class RecoveryContactResponse(ChallengeResponse): """ACME "recoveryContact" challenge response.""" - acme_type = "recoveryContact" - __slots__ = ("token",) - - def _fields_to_json(self): - fields = {} - if self.token is not None: - fields["token"] = self.token - return fields - - @classmethod - def from_valid_json(cls, jobj): - return cls(token=jobj.get("token")) + typ = "recoveryContact" + token = jose.Field("token", omitempty=True) @Challenge.register class RecoveryToken(ClientChallenge): """ACME "recoveryToken" challenge.""" - acme_type = "recoveryToken" - __slots__ = () - - def _fields_to_json(self): - return {} - - @classmethod - def from_valid_json(cls, jobj): - return cls() + typ = "recoveryToken" @ChallengeResponse.register class RecoveryTokenResponse(ChallengeResponse): """ACME "recoveryToken" challenge response.""" - acme_type = "recoveryToken" - __slots__ = ("token",) - - def _fields_to_json(self): - fields = {} - if self.token is not None: - fields["token"] = self.token - return fields - - @classmethod - def from_valid_json(cls, jobj): - return cls(token=jobj.get("token")) - - -def _extend_if_not_empty(dikt, param, name): - if param: - dikt[name] = param - -def _extend_if_not_none(dikt, param, name): - if param is not None: - dikt[name] = param + typ = "recoveryToken" + token = jose.Field("token", omitempty=True) @Challenge.register @@ -251,57 +176,40 @@ class ProofOfPossession(ClientChallenge): :ivar hints: Various clues for the client (:class:`Hints`). """ - acme_type = "proofOfPossession" - __slots__ = ("alg", "nonce", "hints") + typ = "proofOfPossession" NONCE_SIZE = 16 - class Hints(util.ACMEObject): + class Hints(jose.JSONObjectWithFields): """Hints for "proofOfPossession" challenge. - :ivar jwk: JSON Web Key (:class:`letsencrypt.acme.other.JWK`) + :ivar jwk: JSON Web Key (:class:`letsencrypt.acme.jose.JWK`) :ivar list certs: List of :class:`M2Crypto.X509.X509` cetificates. """ - __slots__ = ( - "jwk", "cert_fingerprints", "certs", "subject_key_identifiers", - "serial_numbers", "issuers", "authorized_for") + jwk = jose.Field("jwk", decoder=jose.JWK.from_json) + cert_fingerprints = jose.Field( + "certFingerprints", omitempty=True, default=()) + certs = jose.Field("certs", omitempty=True, default=()) + subject_key_identifiers = jose.Field( + "subjectKeyIdentifiers", omitempty=True, default=()) + serial_numbers = jose.Field("serialNumbers", omitempty=True, default=()) + issuers = jose.Field("issuers", omitempty=True, default=()) + authorized_for = jose.Field("authorizedFor", omitempty=True, default=()) - def to_json(self): - fields = {"jwk": self.jwk} - add = functools.partial(_extend_if_not_empty, fields) - add(self.cert_fingerprints, "certFingerprints") - add([util.encode_cert(cert) for cert in self.certs], "certs") - add(self.subject_key_identifiers, "subjectKeyIdentifiers") - add(self.serial_numbers, "serialNumbers") - add(self.issuers, "issuers") - add(self.authorized_for, "authorizedFor") - return fields + @certs.encoder + def certs(value): # pylint: disable=missing-docstring,no-self-argument + return tuple(jose.encode_cert(cert) for cert in value) - @classmethod - def from_valid_json(cls, jobj): - return cls( - jwk=other.JWK.from_valid_json(jobj["jwk"]), - cert_fingerprints=jobj.get("certFingerprints", []), - certs=[util.decode_cert(cert) - for cert in jobj.get("certs", [])], - subject_key_identifiers=jobj.get("subjectKeyIdentifiers", []), - serial_numbers=jobj.get("serialNumbers", []), - issuers=jobj.get("issuers", []), - authorized_for=jobj.get("authorizedFor", [])) + @certs.decoder + def certs(value): # pylint: disable=missing-docstring,no-self-argument + return tuple(jose.decode_cert(cert) for cert in value) - def _fields_to_json(self): - return { - "alg": self.alg, - "nonce": jose.b64encode(self.nonce), - "hints": self.hints, - } - - @classmethod - def from_valid_json(cls, jobj): - return cls(alg=jobj["alg"], - nonce=util.decode_b64jose(jobj["nonce"], cls.NONCE_SIZE), - hints=cls.Hints.from_valid_json(jobj["hints"])) + alg = jose.Field("alg", decoder=jose.JWASignature.from_json) + nonce = jose.Field( + "nonce", encoder=jose.b64encode, decoder=functools.partial( + jose.decode_b64jose, size=NONCE_SIZE)) + hints = jose.Field("hints", decoder=Hints.from_json) @ChallengeResponse.register @@ -312,50 +220,29 @@ class ProofOfPossessionResponse(ChallengeResponse): :ivar signature: :class:`~letsencrypt.acme.other.Signature` of this message. """ - acme_type = "proofOfPossession" - __slots__ = ("nonce", "signature") + typ = "proofOfPossession" NONCE_SIZE = ProofOfPossession.NONCE_SIZE + nonce = jose.Field( + "nonce", encoder=jose.b64encode, decoder=functools.partial( + jose.decode_b64jose, size=NONCE_SIZE)) + signature = jose.Field("signature", decoder=other.Signature.from_json) + def verify(self): """Verify the challenge.""" + # self.signature is not Field | pylint: disable=no-member return self.signature.verify(self.nonce) - def _fields_to_json(self): - return { - "nonce": jose.b64encode(self.nonce), - "signature": self.signature, - } - - @classmethod - def from_valid_json(cls, jobj): - return cls(nonce=util.decode_b64jose(jobj["nonce"], cls.NONCE_SIZE), - signature=other.Signature.from_valid_json(jobj["signature"])) - @Challenge.register class DNS(DVChallenge): """ACME "dns" challenge.""" - acme_type = "dns" - __slots__ = ("token",) - - def _fields_to_json(self): - return {"token": self.token} - - @classmethod - def from_valid_json(cls, jobj): - return cls(token=jobj["token"]) + typ = "dns" + token = jose.Field("token") @ChallengeResponse.register class DNSResponse(ChallengeResponse): """ACME "dns" challenge response.""" - acme_type = "dns" - __slots__ = () - - def _fields_to_json(self): - return {} - - @classmethod - def from_valid_json(cls, jobj): - return cls() + typ = "dns" diff --git a/letsencrypt/acme/challenges_test.py b/letsencrypt/acme/challenges_test.py index 53b3ff3f..081560fe 100644 --- a/letsencrypt/acme/challenges_test.py +++ b/letsencrypt/acme/challenges_test.py @@ -6,13 +6,11 @@ import unittest import Crypto.PublicKey.RSA import M2Crypto -from letsencrypt.acme import errors from letsencrypt.acme import jose from letsencrypt.acme import other -from letsencrypt.acme import util -CERT = util.ComparableX509(M2Crypto.X509.load_cert( +CERT = jose.ComparableX509(M2Crypto.X509.load_cert( pkg_resources.resource_filename( 'letsencrypt.client.tests', 'testdata/cert.pem'))) KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( @@ -35,7 +33,7 @@ class SimpleHTTPSTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import SimpleHTTPS - self.assertEqual(self.msg, SimpleHTTPS.from_valid_json(self.jmsg)) + self.assertEqual(self.msg, SimpleHTTPS.from_json(self.jmsg)) class SimpleHTTPSResponseTest(unittest.TestCase): @@ -58,7 +56,7 @@ class SimpleHTTPSResponseTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import SimpleHTTPSResponse self.assertEqual( - self.msg, SimpleHTTPSResponse.from_valid_json(self.jmsg)) + self.msg, SimpleHTTPSResponse.from_json(self.jmsg)) class DVSNITest(unittest.TestCase): @@ -84,19 +82,19 @@ class DVSNITest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import DVSNI - self.assertEqual(self.msg, DVSNI.from_valid_json(self.jmsg)) + self.assertEqual(self.msg, DVSNI.from_json(self.jmsg)) def test_from_json_invalid_r_length(self): from letsencrypt.acme.challenges import DVSNI self.jmsg['r'] = 'abcd' self.assertRaises( - errors.ValidationError, DVSNI.from_valid_json, self.jmsg) + jose.DeserializationError, DVSNI.from_json, self.jmsg) def test_from_json_invalid_nonce_length(self): from letsencrypt.acme.challenges import DVSNI self.jmsg['nonce'] = 'abcd' self.assertRaises( - errors.ValidationError, DVSNI.from_valid_json, self.jmsg) + jose.DeserializationError, DVSNI.from_json, self.jmsg) class DVSNIResponseTest(unittest.TestCase): @@ -129,7 +127,7 @@ class DVSNIResponseTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import DVSNIResponse - self.assertEqual(self.msg, DVSNIResponse.from_valid_json(self.jmsg)) + self.assertEqual(self.msg, DVSNIResponse.from_json(self.jmsg)) class RecoveryContactTest(unittest.TestCase): @@ -152,7 +150,7 @@ class RecoveryContactTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import RecoveryContact - self.assertEqual(self.msg, RecoveryContact.from_valid_json(self.jmsg)) + self.assertEqual(self.msg, RecoveryContact.from_json(self.jmsg)) def test_json_without_optionals(self): del self.jmsg['activationURL'] @@ -160,7 +158,7 @@ class RecoveryContactTest(unittest.TestCase): del self.jmsg['contact'] from letsencrypt.acme.challenges import RecoveryContact - msg = RecoveryContact.from_valid_json(self.jmsg) + msg = RecoveryContact.from_json(self.jmsg) self.assertTrue(msg.activation_url is None) self.assertTrue(msg.success_url is None) @@ -181,13 +179,13 @@ class RecoveryContactResponseTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import RecoveryContactResponse self.assertEqual( - self.msg, RecoveryContactResponse.from_valid_json(self.jmsg)) + self.msg, RecoveryContactResponse.from_json(self.jmsg)) def test_json_without_optionals(self): del self.jmsg['token'] from letsencrypt.acme.challenges import RecoveryContactResponse - msg = RecoveryContactResponse.from_valid_json(self.jmsg) + msg = RecoveryContactResponse.from_json(self.jmsg) self.assertTrue(msg.token is None) self.assertEqual(self.jmsg, msg.to_json()) @@ -205,7 +203,7 @@ class RecoveryTokenTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import RecoveryToken - self.assertEqual(self.msg, RecoveryToken.from_valid_json(self.jmsg)) + self.assertEqual(self.msg, RecoveryToken.from_json(self.jmsg)) class RecoveryTokenResponseTest(unittest.TestCase): @@ -221,13 +219,13 @@ class RecoveryTokenResponseTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import RecoveryTokenResponse self.assertEqual( - self.msg, RecoveryTokenResponse.from_valid_json(self.jmsg)) + self.msg, RecoveryTokenResponse.from_json(self.jmsg)) def test_json_without_optionals(self): del self.jmsg['token'] from letsencrypt.acme.challenges import RecoveryTokenResponse - msg = RecoveryTokenResponse.from_valid_json(self.jmsg) + msg = RecoveryTokenResponse.from_json(self.jmsg) self.assertTrue(msg.token is None) self.assertEqual(self.jmsg, msg.to_json()) @@ -236,37 +234,37 @@ class RecoveryTokenResponseTest(unittest.TestCase): class ProofOfPossessionHintsTest(unittest.TestCase): def setUp(self): - jwk = other.JWK(key=KEY.publickey()) - issuers = [ + jwk = jose.JWKRSA(key=KEY.publickey()) + issuers = ( 'C=US, O=SuperT LLC, CN=SuperTrustworthy Public CA', 'O=LessTrustworthy CA Inc, CN=LessTrustworthy But StillSecure', - ] - cert_fingerprints = [ + ) + cert_fingerprints = ( '93416768eb85e33adc4277f4c9acd63e7418fcfe', '16d95b7b63f1972b980b14c20291f3c0d1855d95', '48b46570d9fc6358108af43ad1649484def0debf', - ] - subject_key_identifiers = ['d0083162dcc4c8a23ecb8aecbd86120e56fd24e5'] - authorized_for = ['www.example.com', 'example.net'] - serial_numbers = [34234239832, 23993939911, 17] + ) + subject_key_identifiers = ('d0083162dcc4c8a23ecb8aecbd86120e56fd24e5') + authorized_for = ('www.example.com', 'example.net') + serial_numbers = (34234239832, 23993939911, 17) from letsencrypt.acme.challenges import ProofOfPossession self.msg = ProofOfPossession.Hints( jwk=jwk, issuers=issuers, cert_fingerprints=cert_fingerprints, - certs=[CERT], subject_key_identifiers=subject_key_identifiers, + certs=(CERT,), subject_key_identifiers=subject_key_identifiers, authorized_for=authorized_for, serial_numbers=serial_numbers) self.jmsg_to = { 'jwk': jwk, 'certFingerprints': cert_fingerprints, - 'certs': [jose.b64encode(CERT.as_der())], + 'certs': (jose.b64encode(CERT.as_der()),), 'subjectKeyIdentifiers': subject_key_identifiers, 'serialNumbers': serial_numbers, 'issuers': issuers, 'authorizedFor': authorized_for, } self.jmsg_from = self.jmsg_to.copy() - self.jmsg_from.update({'jwk': jwk.to_json()}) + self.jmsg_from.update({'jwk': jwk.fully_serialize()}) def test_to_json(self): self.assertEqual(self.jmsg_to, self.msg.to_json()) @@ -274,7 +272,7 @@ class ProofOfPossessionHintsTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import ProofOfPossession self.assertEqual( - self.msg, ProofOfPossession.Hints.from_valid_json(self.jmsg_from)) + self.msg, ProofOfPossession.Hints.from_json(self.jmsg_from)) def test_json_without_optionals(self): for optional in ['certFingerprints', 'certs', 'subjectKeyIdentifiers', @@ -283,14 +281,14 @@ class ProofOfPossessionHintsTest(unittest.TestCase): del self.jmsg_to[optional] from letsencrypt.acme.challenges import ProofOfPossession - msg = ProofOfPossession.Hints.from_valid_json(self.jmsg_from) + msg = ProofOfPossession.Hints.from_json(self.jmsg_from) - self.assertEqual(msg.cert_fingerprints, []) - self.assertEqual(msg.certs, []) - self.assertEqual(msg.subject_key_identifiers, []) - self.assertEqual(msg.serial_numbers, []) - self.assertEqual(msg.issuers, []) - self.assertEqual(msg.authorized_for, []) + self.assertEqual(msg.cert_fingerprints, ()) + self.assertEqual(msg.certs, ()) + self.assertEqual(msg.subject_key_identifiers, ()) + self.assertEqual(msg.serial_numbers, ()) + self.assertEqual(msg.issuers, ()) + self.assertEqual(msg.authorized_for, ()) self.assertEqual(self.jmsg_to, msg.to_json()) @@ -300,27 +298,25 @@ class ProofOfPossessionTest(unittest.TestCase): def setUp(self): from letsencrypt.acme.challenges import ProofOfPossession hints = ProofOfPossession.Hints( - jwk=other.JWK(key=KEY.publickey()), cert_fingerprints=[], certs=[], - serial_numbers=[], subject_key_identifiers=[], issuers=[], - authorized_for=[]) + jwk=jose.JWKRSA(key=KEY.publickey()), cert_fingerprints=(), + certs=(), serial_numbers=(), subject_key_identifiers=(), + issuers=(), authorized_for=()) self.msg = ProofOfPossession( - alg='RS256', nonce='xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ', - hints=hints) + alg=jose.RS256, hints=hints, + nonce='xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ') self.jmsg_to = { 'type': 'proofOfPossession', - 'alg': 'RS256', + 'alg': jose.RS256, 'nonce': 'eET5udtV7aoX8Xl8gYiZIA', 'hints': hints, } self.jmsg_from = { 'type': 'proofOfPossession', - 'alg': 'RS256', + 'alg': jose.RS256.fully_serialize(), 'nonce': 'eET5udtV7aoX8Xl8gYiZIA', - 'hints': hints.to_json(), + 'hints': hints.fully_serialize(), } - self.jmsg_from['hints']['jwk'] = self.jmsg_from[ - 'hints']['jwk'].to_json() def test_to_json(self): self.assertEqual(self.jmsg_to, self.msg.to_json()) @@ -328,7 +324,7 @@ class ProofOfPossessionTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import ProofOfPossession self.assertEqual( - self.msg, ProofOfPossession.from_valid_json(self.jmsg_from)) + self.msg, ProofOfPossession.from_json(self.jmsg_from)) class ProofOfPossessionResponseTest(unittest.TestCase): @@ -338,7 +334,7 @@ class ProofOfPossessionResponseTest(unittest.TestCase): # nonce and challenge nonce are the same, don't make the same # mistake here... signature = other.Signature( - alg='RS256', jwk=other.JWK(key=KEY.publickey()), + alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()), sig='\xa7\xc1\xe7\xe82o\xbc\xcd\xd0\x1e\x010#Z|\xaf\x15\x83' '\x94\x8f#\x9b\nQo(\x80\x15,\x08\xfcz\x1d\xfd\xfd.\xaap' '\xfa\x06\xd1\xa2f\x8d8X2>%d\xbd%\xe1T\xdd\xaa0\x18\xde' @@ -359,11 +355,8 @@ class ProofOfPossessionResponseTest(unittest.TestCase): self.jmsg_from = { 'type': 'proofOfPossession', 'nonce': 'eET5udtV7aoX8Xl8gYiZIA', - 'signature': signature.to_json(), + 'signature': signature.fully_serialize(), } - self.jmsg_from['signature']['jwk'] = self.jmsg_from[ - 'signature']['jwk'].to_json() - def test_verify(self): self.assertTrue(self.msg.verify()) @@ -374,7 +367,7 @@ class ProofOfPossessionResponseTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import ProofOfPossessionResponse self.assertEqual( - self.msg, ProofOfPossessionResponse.from_valid_json(self.jmsg_from)) + self.msg, ProofOfPossessionResponse.from_json(self.jmsg_from)) class DNSTest(unittest.TestCase): @@ -389,7 +382,7 @@ class DNSTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import DNS - self.assertEqual(self.msg, DNS.from_valid_json(self.jmsg)) + self.assertEqual(self.msg, DNS.from_json(self.jmsg)) class DNSResponseTest(unittest.TestCase): @@ -404,7 +397,7 @@ class DNSResponseTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.challenges import DNSResponse - self.assertEqual(self.msg, DNSResponse.from_valid_json(self.jmsg)) + self.assertEqual(self.msg, DNSResponse.from_json(self.jmsg)) if __name__ == '__main__': diff --git a/letsencrypt/acme/errors.py b/letsencrypt/acme/errors.py index c8888141..d69efda1 100644 --- a/letsencrypt/acme/errors.py +++ b/letsencrypt/acme/errors.py @@ -1,13 +1,8 @@ """ACME errors.""" +from letsencrypt.acme.jose import errors as jose_errors class Error(Exception): """Generic ACME error.""" -class ValidationError(Error): - """ACME object validation error.""" - -class UnrecognizedTypeError(ValidationError): - """Unrecognized ACME object type error.""" - -class SchemaValidationError(ValidationError): +class SchemaValidationError(jose_errors.DeserializationError): """JSON schema ACME object validation error.""" diff --git a/letsencrypt/acme/interfaces.py b/letsencrypt/acme/interfaces.py deleted file mode 100644 index e49956b4..00000000 --- a/letsencrypt/acme/interfaces.py +++ /dev/null @@ -1,69 +0,0 @@ -"""ACME interfaces. - -Separation between :class:`IJSONSerializable` and :class:`IJSONDeserializable` -is necessary because we want to use ``cls.from_valid_json`` -classmethod on class and ``cls().to_json()`` on object, i.e. class -instance. ``cls.to_json()`` doesn't make much sense. Therefore a class -definition that requires both must call -``zope.interface.implements(IJSONSerializable)`` and -``zope.interface.classImplements(IJSONDeSerializable)`` (note the -difference btween `implements` and `classImplements`) and -:class:`letsencrypt.acme.util.ACMEObject` definition is an example. - -""" -import zope.interface - -# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class -# pylint: disable=too-few-public-methods - - -class IJSONSerializable(zope.interface.Interface): - # pylint: disable=too-few-public-methods - """JSON serializable object.""" - - def to_json(): - """Prepare JSON serializable object. - - Note, however, that this method might return other - :class:`letsencrypt.acme.interfaces.IJSONSerializable` - objects that haven't been serialized yet, which is fine as - long as :func:`letsencrypt.acme.util.dump_ijsonserializable` - is used. For example:: - - class Foo(object): - zope.interface.implements(IJSONSerializable) - - def to_json(self): - return 'foo' - - class Bar(object): - zope.interface.implements(IJSONSerializable) - - def to_json(self): - return [Foo(), Foo()] - - bar = Bar() - assert isinstance(bar.to_json()[0], Foo) - assert isinstance(bar.to_json()[1], Foo) - assert json.dumps( - bar, default=dump_ijsonserializable) == ['foo', 'foo'] - - :returns: JSON object ready to be serialized. - - """ - -class IJSONDeserializable(zope.interface.Interface): - """JSON deserializable class.""" - - def from_valid_json(jobj): - """Deserialize valid JSON object. - - :param jobj: JSON object validated against JSON schema (found in - schemata/ directory). - - :raises letsencrypt.acme.errors.ValidationError: It might be the - case that ``jobj`` validates against schema, but still is not - valid (e.g. unparseable X509 certificate, or wrong padding in - JOSE base64 encoded string). - - """ diff --git a/letsencrypt/acme/jose/__init__.py b/letsencrypt/acme/jose/__init__.py new file mode 100644 index 00000000..48877581 --- /dev/null +++ b/letsencrypt/acme/jose/__init__.py @@ -0,0 +1,68 @@ +"""Javascript Object Signing and Encryption (jose). + +This package is a Python implementation of the stadards developed by +IETF `Javascript Object Signing and Encryption (Active WG)`_, in +particular the following RFCs: + + - `JSON Web Algorithms (JWA)`_ + - `JSON Web Key (JWK)`_ + + +.. _`Javascript Object Signing and Encryption (Active WG)`: + https://tools.ietf.org/wg/jose/ + +.. _`JSON Web Algorithms (JWA)`: + https://datatracker.ietf.org/doc/draft-ietf-jose-json-web-algorithms/ + +.. _`JSON Web Key (JWK)`: + https://datatracker.ietf.org/doc/draft-ietf-jose-json-web-key/ + +""" +from letsencrypt.acme.jose.b64 import ( + b64decode, + b64encode, +) + +from letsencrypt.acme.jose.errors import ( + DeserializationError, + SerializationError, + Error, + UnrecognizedTypeError, +) + +from letsencrypt.acme.jose.interfaces import JSONDeSerializable + +from letsencrypt.acme.jose.json_util import ( + Field, + JSONObjectWithFields, + TypedJSONObjectWithFields, + decode_b64jose, + decode_cert, + decode_csr, + decode_hex16, + encode_cert, + encode_csr, +) + +from letsencrypt.acme.jose.jwa import ( + HS256, + HS384, + HS512, + JWASignature, + PS256, + PS384, + PS512, + RS256, + RS384, + RS512, +) + +from letsencrypt.acme.jose.jwk import ( + JWK, + JWKRSA, +) + +from letsencrypt.acme.jose.util import ( + ComparableX509, + ImmutableMap, +) diff --git a/letsencrypt/acme/jose.py b/letsencrypt/acme/jose/b64.py similarity index 79% rename from letsencrypt/acme/jose.py rename to letsencrypt/acme/jose/b64.py index 81c1abbf..8f2d284c 100644 --- a/letsencrypt/acme/jose.py +++ b/letsencrypt/acme/jose/b64.py @@ -1,13 +1,19 @@ -"""JOSE.""" -import base64 +"""JOSE Base64. -# https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C -# -# Jose Base64: -# -# - URL-safe Base64 -# -# - padding stripped +`JOSE Base64`_ is defined as: + + - URL-safe Base64 + - padding stripped + + +.. _`JOSE Base64`: + https://tools.ietf.org/html/draft-ietf-jose-json-web-signature-37#appendix-C + +.. warning:: Do NOT try to call this module "base64", + as it will "shadow" the standard library. + +""" +import base64 def b64encode(data): diff --git a/letsencrypt/acme/jose_test.py b/letsencrypt/acme/jose/b64_test.py similarity index 87% rename from letsencrypt/acme/jose_test.py rename to letsencrypt/acme/jose/b64_test.py index 42cf8051..89ff27f5 100644 --- a/letsencrypt/acme/jose_test.py +++ b/letsencrypt/acme/jose/b64_test.py @@ -1,4 +1,4 @@ -"""Tests for letsencrypt.acme.jose.""" +"""Tests for letsencrypt.acme.jose.b64.""" import unittest @@ -19,11 +19,11 @@ B64_URL_UNSAFE_EXAMPLES = { class B64EncodeTest(unittest.TestCase): - """Tests for letsencrypt.acme.jose.b64encode.""" + """Tests for letsencrypt.acme.jose.b64.b64encode.""" @classmethod def _call(cls, data): - from letsencrypt.acme.jose import b64encode + from letsencrypt.acme.jose.b64 import b64encode return b64encode(data) def test_unsafe_url(self): @@ -39,11 +39,11 @@ class B64EncodeTest(unittest.TestCase): class B64DecodeTest(unittest.TestCase): - """Tests for letsencrypt.acme.jose.b64decode.""" + """Tests for letsencrypt.acme.jose.b64.b64decode.""" @classmethod def _call(cls, data): - from letsencrypt.acme.jose import b64decode + from letsencrypt.acme.jose.b64 import b64decode return b64decode(data) def test_unsafe_url(self): diff --git a/letsencrypt/acme/jose/errors.py b/letsencrypt/acme/jose/errors.py new file mode 100644 index 00000000..74708c4a --- /dev/null +++ b/letsencrypt/acme/jose/errors.py @@ -0,0 +1,31 @@ +"""JOSE errors.""" + + +class Error(Exception): + """Generic JOSE Error.""" + + +class DeserializationError(Error): + """JSON deserialization error.""" + + +class SerializationError(Error): + """JSON serialization error.""" + + +class UnrecognizedTypeError(DeserializationError): + """Unrecognized type error. + + :ivar str typ: The unrecognized type of the JSON object. + :ivar jobj: Full JSON object. + + """ + + def __init__(self, typ, jobj): + self.typ = typ + self.jobj = jobj + super(UnrecognizedTypeError, self).__init__(str(self)) + + def __str__(self): + return '{0} was not recognized, full message: {1}'.format( + self.typ, self.jobj) diff --git a/letsencrypt/acme/jose/errors_test.py b/letsencrypt/acme/jose/errors_test.py new file mode 100644 index 00000000..dd6af6c1 --- /dev/null +++ b/letsencrypt/acme/jose/errors_test.py @@ -0,0 +1,17 @@ +"""Tests for letsencrypt.acme.jose.errors.""" +import unittest + + +class UnrecognizedTypeErrorTest(unittest.TestCase): + def setUp(self): + from letsencrypt.acme.jose.errors import UnrecognizedTypeError + self.error = UnrecognizedTypeError('foo', {'type': 'foo'}) + + def test_str(self): + self.assertEqual( + "foo was not recognized, full message: {'type': 'foo'}", + str(self.error)) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/acme/jose/interfaces.py b/letsencrypt/acme/jose/interfaces.py new file mode 100644 index 00000000..446a5d2b --- /dev/null +++ b/letsencrypt/acme/jose/interfaces.py @@ -0,0 +1,198 @@ +"""JOSE interfaces.""" +import abc +import collections +import json + +from letsencrypt.acme.jose import util + +# pylint: disable=no-self-argument,no-method-argument,no-init,inherit-non-class +# pylint: disable=too-few-public-methods + + +class JSONDeSerializable(object): + # pylint: disable=too-few-public-methods + """Interface for (de)serializable JSON objects. + + Please recall, that standard Python library implements + :class:`json.JSONEncoder` and :class:`json.JSONDecoder` that perform + translations based on respective :ref:`conversion tables + ` that look pretty much like the one below (for + complete tables see relevant Python documentation): + + .. _conversion-table: + + ====== ====== + JSON Python + ====== ====== + object dict + ... ... + ====== ====== + + While the above **conversion table** is about translation of JSON + documents to/from the basic Python types only, + :class:`JSONDeSerializable` introduces the following two concepts: + + serialization + Turning an arbitrary Python object into Python object that can + be encoded into a JSON document. **Full serialization** produces + a Python object composed of only basic types as required by the + :ref:`conversion table `. + **Partial serialization** (acomplished by :meth:`to_json`) + produces a Python object that might also be built from other + :class:`JSONDeSerializable` objects. + + deserialization + Turning a decoded Python object (necessarily one of the basic + types as required by the :ref:`conversion table + `) into an arbitrary Python object. + + Serialization produces **serialized object** ("partially serialized + object" or "fully serialized object" for partial and full + serialization respectively) and deserialization produces + **deserialized object**, both usually denoted in the source code as + ``jobj``. + + Wording in the official Python documentation might be confusing + after reading the above, but in the light of those definitions, one + can view :meth:`json.JSONDecoder.decode` as decoder and + deserializer of basic types, :meth:`json.JSONEncoder.default` as + serializer of basic types, :meth:`json.JSONEncoder.encode` as + serializer and encoder of basic types. + + One could extend :mod:`json` to support arbitrary object + (de)serialization either by: + + - overriding :meth:`json.JSONDecoder.decode` and + :meth:`json.JSONEncoder.default` in subclasses + + - or passing ``object_hook`` argument (or ``object_hook_pairs``) + to :func:`json.load`/:func:`json.loads` or ``default`` argument + for :func:`json.dump`/:func:`json.dumps`. + + Interestingly, ``default`` is required to perform only partial + serialization, as :func:`json.dumps` applies ``default`` + recursively. This is the idea behind making :meth:`to_json` produce + only partial serialization, while providing custom :meth:`json_dumps` + that dumps with ``default`` set to :meth:`json_dump_default`. + + To make further documentation a bit more concrete, please, consider + the following imaginatory implementation example:: + + class Foo(JSONDeSerializable): + def to_json(self): + return 'foo' + + @classmethod + def from_json(cls, jobj): + return Foo() + + class Bar(JSONDeSerializable): + def to_json(self): + return [Foo(), Foo()] + + @classmethod + def from_json(cls, jobj): + return Bar() + + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def to_json(self): # pragma: no cover + """Partially serialize. + + Following the example, **partial serialization** means the following:: + + assert isinstance(Bar().to_json()[0], Foo) + assert isinstance(Bar().to_json()[1], Foo) + + # in particular... + assert Bar().to_json() != ['foo', 'foo'] + + :raises letsencrypt.acme.jose.errors.SerializationError: + in case of any serialization error. + :returns: Partially serializable object. + + """ + raise NotImplementedError() + + def fully_serialize(self): + """Fully serialize. + + Again, following the example from before, **full serialization** + means the following:: + + assert Bar().fully_serialize() == ['foo', 'foo'] + + :raises letsencrypt.acme.jose.errors.SerializationError: + in case of any serialization error. + :returns: Fully serialized object. + + """ + partial = self.to_json() + try_serialize = (lambda x: x.fully_serialize() + if isinstance(x, JSONDeSerializable) else x) + if isinstance(partial, basestring): # strings are sequences + return partial + if isinstance(partial, collections.Sequence): + return [try_serialize(elem) for elem in partial] + elif isinstance(partial, collections.Mapping): + return dict([(try_serialize(key), try_serialize(value)) + for key, value in partial.iteritems()]) + else: + return partial + + @util.abstractclassmethod + def from_json(cls, unused_jobj): + """Deserialize a decoded JSON document. + + :param jobj: Python object, composed of only other basic data + types, as decoded from JSON document. Not necessarily + :class:`dict` (as decoded from "JSON object" document). + + :raises letsencrypt.acme.jose.errors.DeserializationError: + if decoding was unsuccessful, e.g. in case of unparseable + X509 certificate, or wrong padding in JOSE base64 encoded + string, etc. + + """ + # TypeError: Can't instantiate abstract class with + # abstract methods from_json, to_json + return cls() # pylint: disable=abstract-class-instantiated + + @classmethod + def json_loads(cls, json_string): + """Deserialize from JSON document string.""" + return cls.from_json(json.loads(json_string)) + + def json_dumps(self, **kwargs): + """Dump to JSON string using proper serializer. + + :returns: JSON document string. + :rtype: str + + """ + return json.dumps(self, default=self.json_dump_default, **kwargs) + + def json_dumps_pretty(self): + """Dump the object to pretty JSON document string.""" + return self.json_dumps(sort_keys=True, indent=4, separators=(',', ': ')) + + @classmethod + def json_dump_default(cls, python_object): + """Serialize Python object. + + This function is meant to be passed as ``default`` to + :func:`json.load` or :func:`json.loads`. They call + ``default(python_object)`` only for non-basic Python types, so + this function necessarily raises :class:`TypeError` if + ``python_object`` is not an instance of + :class:`IJSONSerializable`. + + Please read the class docstring for more information. + + """ + if isinstance(python_object, JSONDeSerializable): + return python_object.to_json() + else: # this branch is necessary, cannot just "return" + raise TypeError(repr(python_object) + ' is not JSON serializable') diff --git a/letsencrypt/acme/jose/interfaces_test.py b/letsencrypt/acme/jose/interfaces_test.py new file mode 100644 index 00000000..2e5606bc --- /dev/null +++ b/letsencrypt/acme/jose/interfaces_test.py @@ -0,0 +1,106 @@ +"""Tests for letsencrypt.acme.jose.interfaces.""" +import unittest + + +class JSONDeSerializableTest(unittest.TestCase): + + def setUp(self): + from letsencrypt.acme.jose.interfaces import JSONDeSerializable + + # pylint: disable=missing-docstring,invalid-name + + class Basic(JSONDeSerializable): + def __init__(self, v): + self.v = v + + def to_json(self): + return self.v + + @classmethod + def from_json(cls, jobj): + return cls(jobj) + + class Sequence(JSONDeSerializable): + def __init__(self, x, y): + self.x = x + self.y = y + + def to_json(self): + return [self.x, self.y] + + @classmethod + def from_json(cls, jobj): + return cls( + Basic.from_json(jobj[0]), Basic.from_json(jobj[1])) + + class Mapping(JSONDeSerializable): + def __init__(self, x, y): + self.x = x + self.y = y + + def to_json(self): + return {self.x: self.y} + + @classmethod + def from_json(cls, jobj): + return cls(Basic.from_json(jobj.keys()[0]), + Basic.from_json(jobj.values()[0])) + + self.basic1 = Basic('foo1') + self.basic2 = Basic('foo2') + self.seq = Sequence(self.basic1, self.basic2) + self.mapping = Mapping(self.basic1, self.basic2) + + # pylint: disable=invalid-name + self.Basic = Basic + self.Sequence = Sequence + self.Mapping = Mapping + + def test_fully_serialize_sequence(self): + self.assertEqual(self.seq.fully_serialize(), ['foo1', 'foo2']) + + def test_fully_serialize_mapping(self): + self.assertEqual(self.mapping.fully_serialize(), {'foo1': 'foo2'}) + + def test_fully_serialize_other(self): + mock_value = object() + self.assertTrue(self.Basic(mock_value).fully_serialize() is mock_value) + + def test_from_json_not_implemented(self): + from letsencrypt.acme.jose.interfaces import JSONDeSerializable + self.assertRaises(TypeError, JSONDeSerializable.from_json, 'xxx') + + def test_json_loads(self): + seq = self.Sequence.json_loads('["foo1", "foo2"]') + self.assertTrue(isinstance(seq, self.Sequence)) + self.assertTrue(isinstance(seq.x, self.Basic)) + self.assertTrue(isinstance(seq.y, self.Basic)) + self.assertEqual(seq.x.v, 'foo1') + self.assertEqual(seq.y.v, 'foo2') + + def test_json_dumps(self): + self.assertEqual('["foo1", "foo2"]', self.seq.json_dumps()) + + def test_json_dumps_pretty(self): + self.assertEqual( + self.seq.json_dumps_pretty(), '[\n "foo1",\n "foo2"\n]') + + def test_json_dump_default(self): + from letsencrypt.acme.jose.interfaces import JSONDeSerializable + + self.assertEqual( + 'foo1', JSONDeSerializable.json_dump_default(self.basic1)) + + jobj = JSONDeSerializable.json_dump_default(self.seq) + self.assertEqual(len(jobj), 2) + self.assertTrue(jobj[0] is self.basic1) + self.assertTrue(jobj[1] is self.basic2) + + def test_json_dump_default_type_error(self): + from letsencrypt.acme.jose.interfaces import JSONDeSerializable + self.assertRaises( + TypeError, JSONDeSerializable.json_dump_default, object()) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/acme/jose/json_util.py b/letsencrypt/acme/jose/json_util.py new file mode 100644 index 00000000..8abcf5e3 --- /dev/null +++ b/letsencrypt/acme/jose/json_util.py @@ -0,0 +1,426 @@ +"""JSON (de)serialization framework. + +The framework presented here is somewhat based on `Go's "json" package`_ +(especially the ``omitempty`` functionality). + +.. _`Go's "json" package`: http://golang.org/pkg/encoding/json/ + +""" +import abc +import binascii +import logging + +import M2Crypto + +from letsencrypt.acme.jose import b64 +from letsencrypt.acme.jose import errors +from letsencrypt.acme.jose import interfaces +from letsencrypt.acme.jose import util + + +class Field(object): + """JSON object field. + + :class:`Field` is meant to be used together with + :class:`JSONObjectWithFields`. + + ``encoder`` (``decoder``) is a callable that accepts a single + parameter, i.e. a value to be encoded (decoded), and returns the + serialized (deserialized) value. In case of errors it should raise + :class:`~letsencrypt.acme.jose.errors.SerializationError` + (:class:`~letsencrypt.acme.jose.errors.DeserializationError`). + + For greater flexibility, ``encoder2`` and ``decoder2`` accept two + parameters: the whole object ("``self``" in case of encoding, and + JSON serialized object ``jobj`` in case of decoding) and the value + to be encoded/decoded. + + Note, that ``decoder`` and ``decoder2`` should perform partial + serialization only. + + :ivar str json_name: Name of the field when encoded to JSON. + :ivar default: Default value (used when not present in JSON object). + :ivar bool omitempty: If ``True`` and the field value is empty, then + it will not be included in the serialized JSON object, and + ``default`` will be used for deserialization. Otherwise, if ``False``, + field is considered as required, value will always be included in the + serialized JSON objected, and it must also be present when + deserializing. + + """ + __slots__ = ('json_name', 'default', 'omitempty', + 'fdec', 'fenc', 'fdec2', 'fenc2') + + def __init__(self, json_name, default=None, omitempty=False, + decoder=None, encoder=None, decoder2=None, encoder2=None): + # pylint: disable=too-many-arguments + self.json_name = json_name + self.default = default + self.omitempty = omitempty + + self.fdec2 = decoder2 + self.fenc2 = encoder2 + self.fdec = self.default_decoder if decoder is None else decoder + self.fenc = self.default_encoder if encoder is None else encoder + + @classmethod + def _empty(cls, value): + """Is the provided value cosidered "empty" for this field? + + This is useful for subclasses that might want to override the + definition of being empty, e.g. for some more exotic data types. + + """ + return not value + + def omit(self, value): + """Omit the value in output?""" + return self._empty(value) and self.omitempty + + def _update_params(self, **kwargs): + current = dict(json_name=self.json_name, default=self.default, + omitempty=self.omitempty, + decoder=self.fdec, encoder=self.fenc, + decoder2=self.fdec2, encoder2=self.fenc2) + current.update(kwargs) + return type(self)(**current) # pylint: disable=star-args + + def decoder(self, fdec): + """Descriptor to change the decoder on JSON object field.""" + return self._update_params(decoder=fdec, decoder2=None) + + def encoder(self, fenc): + """Descriptor to change the encoder on JSON object field.""" + return self._update_params(encoder=fenc, encoder2=None) + + def decoder2(self, fdec2): + """Descriptor to change the decoder2 on JSON object field.""" + return self._update_params(decoder2=fdec2, decoder=None) + + def encoder2(self, fenc2): + """Descriptor to change the encoder2 on JSON object field.""" + return self._update_params(encoder2=fenc2, encoder=None) + + def decode(self, value, jobj=None): + """Decode a value, optionally with context JSON object.""" + if self.fdec2 is not None: + return self.fdec2(jobj, value) + return self.fdec(value) + + def encode(self, value, obj=None): + """Encode a value, optionally with context JSON object.""" + if self.fenc2 is not None: + return self.fenc2(obj, value) + return self.fenc(value) + + @classmethod + def default_decoder(cls, value): + """Default decoder. + + Recursively deserialize into immutable types ( + :class:`letsencrypt.acme.jose.util.frozendict` instead of + :func:`dict`, :func:`tuple` instead of :func:`list`). + + """ + # bases cases for different types returned by json.loads + if isinstance(value, list): + return tuple(cls.default_decoder(subvalue) for subvalue in value) + elif isinstance(value, dict): + return util.frozendict( + dict((cls.default_decoder(key), cls.default_decoder(value)) + for key, value in value.iteritems())) + else: # integer or string + return value + + @classmethod + def default_encoder(cls, value): + """Default (passthrough) encoder.""" + # field.to_json() is no good as encoder has to do partial + # serialization only + return value + + +class JSONObjectWithFieldsMeta(abc.ABCMeta): + """Metaclass for :class:`JSONObjectWithFields` and its subclasses. + + It makes sure that, for any class ``cls`` with ``__metaclass__`` + set to ``JSONObjectWithFieldsMeta``: + + 1. All fields (attributes of type :class:`Field`) in the class + definition are moved to the ``cls._fields`` dictionary, where + keys are field attribute names and values are fields themselves. + + 2. ``cls.__slots__`` is extended by all field attribute names + (i.e. not :attr:`Field.json_name`). + + In a consequence, for a field attribute name ``some_field``, + ``cls.some_field`` will be a slot descriptor and not an instance + of :class:`Field`. For example:: + + some_field = Field('someField', default=()) + + class Foo(object): + __metaclass__ = JSONObjectWithFieldsMeta + __slots__ = ('baz',) + some_field = some_field + + assert Foo.__slots__ == ('some_field', 'baz') + assert Foo.some_field is not Field + + assert Foo._fields.keys() == ['some_field'] + assert Foo._fields['some_field'] is some_field + + As an implementation note, this metaclass inherits from + :class:`abc.ABCMeta` (and not the usual :class:`type`) to mitigate + the metaclass conflict (:class:`ImmutableMap` and + :class:`JSONDeSerializable`, parents of :class:`JSONObjectWithFields`, + use :class:`abc.ABCMeta` as its metaclass). + + """ + + def __new__(mcs, name, bases, dikt): + fields = {} + for key, value in dikt.items(): # not iterkeys() (in-place edit!) + if isinstance(value, Field): + fields[key] = dikt.pop(key) + + dikt['__slots__'] = tuple( + list(dikt.get('__slots__', ())) + fields.keys()) + dikt['_fields'] = fields + + return abc.ABCMeta.__new__(mcs, name, bases, dikt) + + +class JSONObjectWithFields(util.ImmutableMap, interfaces.JSONDeSerializable): + # pylint: disable=too-few-public-methods + """JSON object with fields. + + Example:: + + class Foo(JSONObjectWithFields): + bar = Field('Bar') + empty = Field('Empty', omitempty=True) + + @bar.encoder + def bar(value): + return value + 'bar' + + @bar.decoder + def bar(value): + if not value.endswith('bar'): + raise errors.DeserializationError('No bar suffix!') + return value[:-3] + + assert Foo(bar='baz').to_json() == {'Bar': 'bazbar'} + assert Foo.from_json({'Bar': 'bazbar'}) == Foo(bar='baz') + assert (Foo.from_json({'Bar': 'bazbar', 'Empty': '!'}) + == Foo(bar='baz', empty='!')) + assert Foo(bar='baz').bar == 'baz' + + """ + __metaclass__ = JSONObjectWithFieldsMeta + + @classmethod + def _defaults(cls): + """Get default fields values.""" + return dict([(slot, field.default) for slot, field + in cls._fields.iteritems() if field.omitempty]) + + def __init__(self, **kwargs): + # pylint: disable=star-args + super(JSONObjectWithFields, self).__init__( + **(dict(self._defaults(), **kwargs))) + + def fields_to_json(self): + """Serialize fields to JSON.""" + jobj = {} + for slot, field in self._fields.iteritems(): + value = getattr(self, slot) + + if field.omit(value): + logging.debug('Ommiting empty field "%s" (%s)', slot, value) + else: + try: + jobj[field.json_name] = field.encode(value, self) + except errors.SerializationError as error: + raise errors.SerializationError( + 'Could not encode {0} ({1}): {2}'.format( + slot, value, error)) + return jobj + + def to_json(self): + return self.fields_to_json() + + @classmethod + def _check_required(cls, jobj): + missing = set() + for _, field in cls._fields.iteritems(): + if not field.omitempty and field.json_name not in jobj: + missing.add(field.json_name) + + if missing: + raise errors.DeserializationError( + 'The following field are required: {0}'.format( + ','.join(missing))) + + @classmethod + def fields_from_json(cls, jobj): + """Deserialize fields from JSON.""" + cls._check_required(jobj) + fields = {} + for slot, field in cls._fields.iteritems(): + if field.json_name not in jobj and field.omitempty: + fields[slot] = field.default + else: + value = jobj[field.json_name] + try: + fields[slot] = field.decode(value, jobj) + except errors.DeserializationError as error: + raise errors.DeserializationError( + 'Could not decode {0!r} ({1!r}): {2}'.format( + slot, value, error)) + return fields + + @classmethod + def from_json(cls, jobj): + return cls(**cls.fields_from_json(jobj)) + + +def decode_b64jose(data, size=None, minimum=False): + """Decode JOSE Base-64 field. + + :param int size: Required length (after decoding). + :param bool minimum: If ``True``, then `size` will be treated as + minimum required length, as opposed to exact equality. + + """ + try: + decoded = b64.b64decode(data) + except TypeError as error: + raise errors.DeserializationError(error) + + if size is not None and ((not minimum and len(decoded) != size) + or (minimum and len(decoded) < size)): + raise errors.DeserializationError() + + return decoded + + +def decode_hex16(value, size=None, minimum=False): + """Decode hexlified field. + + :param int size: Required length (after decoding). + :param bool minimum: If ``True``, then `size` will be treated as + minimum required length, as opposed to exact equality. + + """ + if size is not None and ((not minimum and len(value) != size * 2) + or (minimum and len(value) < size * 2)): + raise errors.DeserializationError() + try: + return binascii.unhexlify(value) + except TypeError as error: + raise errors.DeserializationError(error) + +def encode_cert(cert): + """Encode certificate as JOSE Base-64 DER. + + :param cert: Certificate. + :type cert: :class:`letsencrypt.acme.jose.util.ComparableX509` + + """ + return b64.b64encode(cert.as_der()) + +def decode_cert(b64der): + """Decode JOSE Base-64 DER-encoded certificate.""" + try: + return util.ComparableX509(M2Crypto.X509.load_cert_der_string( + decode_b64jose(b64der))) + except M2Crypto.X509.X509Error as error: + raise errors.DeserializationError(error) + +def encode_csr(csr): + """Encode CSR as JOSE Base-64 DER.""" + return encode_cert(csr) + +def decode_csr(b64der): + """Decode JOSE Base-64 DER-encoded CSR.""" + try: + return util.ComparableX509(M2Crypto.X509.load_request_der_string( + decode_b64jose(b64der))) + except M2Crypto.X509.X509Error as error: + raise errors.DeserializationError(error) + + +class TypedJSONObjectWithFields(JSONObjectWithFields): + """JSON object with type.""" + + typ = NotImplemented + """Type of the object. Subclasses must override.""" + + type_field_name = "type" + """Field name used to distinguish different object types. + + Subclasses will probably have to override this. + + """ + + TYPES = NotImplemented + """Types registered for JSON deserialization""" + + @classmethod + def register(cls, type_cls, typ=None): + """Register class for JSON deserialization.""" + typ = type_cls.typ if typ is None else typ + cls.TYPES[typ] = type_cls + return type_cls + + @classmethod + def get_type_cls(cls, jobj): + """Get the registered class for ``jobj``.""" + if cls in cls.TYPES.itervalues(): + assert jobj[cls.type_field_name] + # cls is already registered type_cls, force to use it + # so that, e.g Revocation.from_json(jobj) fails if + # jobj["type"] != "revocation". + return cls + + if not isinstance(jobj, dict): + raise errors.DeserializationError( + "{0} is not a dictionary object".format(jobj)) + try: + typ = jobj[cls.type_field_name] + except KeyError: + raise errors.DeserializationError("missing type field") + + try: + type_cls = cls.TYPES[typ] + except KeyError: + raise errors.UnrecognizedTypeError(typ, jobj) + + return type_cls + + def to_json(self): + """Get JSON serializable object. + + :returns: Serializable JSON object representing ACME typed object. + :meth:`validate` will almost certianly not work, due to reasons + explained in :class:`letsencrypt.acme.interfaces.IJSONSerializable`. + :rtype: dict + + """ + jobj = self.fields_to_json() + jobj[self.type_field_name] = self.typ + return jobj + + @classmethod + def from_json(cls, jobj): + """Deserialize ACME object from valid JSON object. + + :raises letsencrypt.acme.errors.UnrecognizedTypeError: if type + of the ACME object has not been registered. + + """ + # make sure subclasses don't cause infinite recursive from_json calls + type_cls = cls.get_type_cls(jobj) + return type_cls(**type_cls.fields_from_json(jobj)) diff --git a/letsencrypt/acme/jose/json_util_test.py b/letsencrypt/acme/jose/json_util_test.py new file mode 100644 index 00000000..da548aae --- /dev/null +++ b/letsencrypt/acme/jose/json_util_test.py @@ -0,0 +1,319 @@ +"""Tests for letsencrypt.acme.jose.json_util.""" +import os +import pkg_resources +import unittest + +import M2Crypto +import mock + +from letsencrypt.acme.jose import errors +from letsencrypt.acme.jose import interfaces +from letsencrypt.acme.jose import util + + +CERT = M2Crypto.X509.load_cert(pkg_resources.resource_filename( + 'letsencrypt.client.tests', os.path.join('testdata', 'cert.pem'))) +CSR = M2Crypto.X509.load_request(pkg_resources.resource_filename( + 'letsencrypt.client.tests', os.path.join('testdata', 'csr.pem'))) + + +class FieldTest(unittest.TestCase): + """Tests for letsencrypt.acme.jose.json_util.Field.""" + + def test_descriptors(self): + mock_jobj = mock.MagicMock() + mock_obj = mock.MagicMock() + mock_value = mock.MagicMock() + + # pylint: disable=missing-docstring + + def decoder(unused_value): + return 'd' + + def encoder(unused_value): + return 'e' + + def decoder2(jobj, unused_value): + self.assertTrue(jobj is mock_jobj) + return 'd2' + + def encoder2(obj, unused_value): + self.assertTrue(obj is mock_obj) + return 'e2' + + from letsencrypt.acme.jose.json_util import Field + field = Field('foo', decoder=decoder, encoder=encoder, + decoder2=decoder2, encoder2=encoder2) + + self.assertEqual('e2', field.encode(mock_value, mock_obj)) + self.assertEqual('d2', field.decode(mock_value, mock_jobj)) + + field = field.encoder(encoder) + self.assertEqual('e', field.encode(mock_value, mock_obj)) + self.assertEqual('d2', field.decode(mock_value, mock_jobj)) + + field = field.decoder(decoder) + self.assertEqual('e', field.encode(mock_value, mock_obj)) + self.assertEqual('d', field.decode(mock_value, mock_jobj)) + + field = field.encoder2(encoder2) + self.assertEqual('e2', field.encode(mock_value, mock_obj)) + self.assertEqual('d', field.decode(mock_value, mock_jobj)) + + field = field.decoder2(decoder2) + self.assertEqual('e2', field.encode(mock_value, mock_obj)) + self.assertEqual('d2', field.decode(mock_value, mock_jobj)) + + def test_default_encoder_is_partial(self): + class MockField(interfaces.JSONDeSerializable): + # pylint: disable=missing-docstring + def to_json(self): + return 'foo' + @classmethod + def from_json(cls, jobj): + pass + mock_field = MockField() + + from letsencrypt.acme.jose.json_util import Field + self.assertTrue(Field.default_encoder(mock_field) is mock_field) + # in particular... + self.assertNotEqual('foo', Field.default_encoder(mock_field)) + + def test_default_encoder_passthrough(self): + mock_value = mock.MagicMock() + from letsencrypt.acme.jose.json_util import Field + self.assertTrue(Field.default_encoder(mock_value) is mock_value) + + def test_default_decoder_list_to_tuple(self): + from letsencrypt.acme.jose.json_util import Field + self.assertEqual((1, 2, 3), Field.default_decoder([1, 2, 3])) + + def test_default_decoder_dict_to_frozendict(self): + from letsencrypt.acme.jose.json_util import Field + obj = Field.default_decoder({'x': 2}) + self.assertTrue(isinstance(obj, util.frozendict)) + self.assertEqual(obj, util.frozendict(x=2)) + + def test_default_decoder_passthrough(self): + mock_value = mock.MagicMock() + from letsencrypt.acme.jose.json_util import Field + self.assertTrue(Field.default_decoder(mock_value) is mock_value) + + +class JSONObjectWithFieldsTest(unittest.TestCase): + """Tests for letsencrypt.acme.jose.json_util.JSONObjectWithFields.""" + # pylint: disable=protected-access + + def setUp(self): + from letsencrypt.acme.jose.json_util import JSONObjectWithFields + from letsencrypt.acme.jose.json_util import Field + + class MockJSONObjectWithFields(JSONObjectWithFields): + # pylint: disable=invalid-name,missing-docstring,no-self-argument + # pylint: disable=too-few-public-methods + x = Field('x', omitempty=True, + encoder=(lambda x: x * 2), + decoder=(lambda x: x / 2)) + y = Field('y') + z = Field('Z') # on purpose uppercase + + @y.encoder + def y(value): + if value == 500: + raise errors.SerializationError() + return value + + @y.decoder + def y(value): + if value == 500: + raise errors.DeserializationError() + return value + + # pylint: disable=invalid-name + self.MockJSONObjectWithFields = MockJSONObjectWithFields + self.mock = MockJSONObjectWithFields(x=None, y=2, z=3) + + def test_init_defaults(self): + self.assertEqual(self.mock, self.MockJSONObjectWithFields(y=2, z=3)) + + def test_fields_to_json_omits_empty(self): + self.assertEqual(self.mock.fields_to_json(), {'y': 2, 'Z': 3}) + + def test_fields_from_json_fills_default_for_empty(self): + self.assertEqual( + {'x': None, 'y': 2, 'z': 3}, + self.MockJSONObjectWithFields.fields_from_json({'y': 2, 'Z': 3})) + + def test_fields_from_json_fails_on_missing(self): + self.assertRaises( + errors.DeserializationError, + self.MockJSONObjectWithFields.fields_from_json, {'y': 0}) + self.assertRaises( + errors.DeserializationError, + self.MockJSONObjectWithFields.fields_from_json, {'Z': 0}) + self.assertRaises( + errors.DeserializationError, + self.MockJSONObjectWithFields.fields_from_json, {'x': 0, 'y': 0}) + self.assertRaises( + errors.DeserializationError, + self.MockJSONObjectWithFields.fields_from_json, {'x': 0, 'Z': 0}) + + def test_fields_to_json_encoder(self): + self.assertEqual(self.MockJSONObjectWithFields(x=1, y=2, z=3).to_json(), + {'x': 2, 'y': 2, 'Z': 3}) + + def test_fields_from_json_decoder(self): + self.assertEqual( + {'x': 2, 'y': 2, 'z': 3}, + self.MockJSONObjectWithFields.fields_from_json( + {'x': 4, 'y': 2, 'Z': 3})) + + def test_fields_to_json_error_passthrough(self): + self.assertRaises( + errors.SerializationError, self.MockJSONObjectWithFields( + x=1, y=500, z=3).to_json) + + def test_fields_from_json_error_passthrough(self): + self.assertRaises( + errors.DeserializationError, + self.MockJSONObjectWithFields.from_json, + {'x': 4, 'y': 500, 'Z': 3}) + + +class DeEncodersTest(unittest.TestCase): + def setUp(self): + self.b64_cert = ( + 'MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhM' + 'CVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKz' + 'ApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxF' + 'DASBgNVBAMMC2V4YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIx' + 'ODIyMzQ0NVowdzELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRI' + 'wEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTW' + 'ljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4YW1wbGUuY29tMFwwD' + 'QYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR7R_drnBSQ_zfx1vQLHUbFLh1' + 'AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c-pVE6K-EdE_twuUCAwE' + 'AATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksllvr6zJepBH5fMnd' + 'fk3XJp10jT6VE-14KNtjh02a56GoraAvJAT5_H67E8GvJ_ocNnB_o' + ) + self.b64_csr = ( + 'MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2F' + 'uMRIwEAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECw' + 'wWVW5pdmVyc2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb' + '20wXDANBgkqhkiG9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD_N_HW9As' + 'dRsUuHUBBBDlHwNlRd3fp580rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3' + 'C5QIDAQABoCkwJwYJKoZIhvcNAQkOMRowGDAWBgNVHREEDzANggtleGFtcG' + 'xlLmNvbTANBgkqhkiG9w0BAQsFAANBAHJH_O6BtC9aGzEVCMGOZ7z9iIRHW' + 'Szr9x_bOzn7hLwsbXPAgO1QxEwL-X-4g20Gn9XBE1N9W6HCIEut2d8wACg' + ) + + def test_decode_b64_jose_padding_error(self): + from letsencrypt.acme.jose.json_util import decode_b64jose + self.assertRaises(errors.DeserializationError, decode_b64jose, 'x') + + def test_decode_b64_jose_size(self): + from letsencrypt.acme.jose.json_util import decode_b64jose + self.assertEqual('foo', decode_b64jose('Zm9v', size=3)) + self.assertRaises( + errors.DeserializationError, decode_b64jose, 'Zm9v', size=2) + self.assertRaises( + errors.DeserializationError, decode_b64jose, 'Zm9v', size=4) + + def test_decode_b64_jose_minimum_size(self): + from letsencrypt.acme.jose.json_util import decode_b64jose + self.assertEqual('foo', decode_b64jose('Zm9v', size=3, minimum=True)) + self.assertEqual('foo', decode_b64jose('Zm9v', size=2, minimum=True)) + self.assertRaises(errors.DeserializationError, decode_b64jose, + 'Zm9v', size=4, minimum=True) + + def test_decode_hex16(self): + from letsencrypt.acme.jose.json_util import decode_hex16 + self.assertEqual('foo', decode_hex16('666f6f')) + + def test_decode_hex16_minimum_size(self): + from letsencrypt.acme.jose.json_util import decode_hex16 + self.assertEqual('foo', decode_hex16('666f6f', size=3, minimum=True)) + self.assertEqual('foo', decode_hex16('666f6f', size=2, minimum=True)) + self.assertRaises(errors.DeserializationError, decode_hex16, + '666f6f', size=4, minimum=True) + + def test_decode_hex16_odd_length(self): + from letsencrypt.acme.jose.json_util import decode_hex16 + self.assertRaises(errors.DeserializationError, decode_hex16, 'x') + + def test_encode_cert(self): + from letsencrypt.acme.jose.json_util import encode_cert + self.assertEqual(self.b64_cert, encode_cert(CERT)) + + def test_decode_cert(self): + from letsencrypt.acme.jose.json_util import decode_cert + cert = decode_cert(self.b64_cert) + self.assertTrue(isinstance(cert, util.ComparableX509)) + self.assertEqual(cert, CERT) + self.assertRaises(errors.DeserializationError, decode_cert, '') + + def test_encode_csr(self): + from letsencrypt.acme.jose.json_util import encode_csr + self.assertEqual(self.b64_cert, encode_csr(CERT)) + + def test_decode_csr(self): + from letsencrypt.acme.jose.json_util import decode_csr + csr = decode_csr(self.b64_csr) + self.assertTrue(isinstance(csr, util.ComparableX509)) + self.assertEqual(csr, CSR) + self.assertRaises(errors.DeserializationError, decode_csr, '') + + +class TypedJSONObjectWithFieldsTest(unittest.TestCase): + + def setUp(self): + from letsencrypt.acme.jose.json_util import TypedJSONObjectWithFields + + # pylint: disable=missing-docstring,abstract-method + # pylint: disable=too-few-public-methods + + class MockParentTypedJSONObjectWithFields(TypedJSONObjectWithFields): + TYPES = {} + type_field_name = 'type' + + @MockParentTypedJSONObjectWithFields.register + class MockTypedJSONObjectWithFields( + MockParentTypedJSONObjectWithFields): + typ = 'test' + __slots__ = ('foo',) + + @classmethod + def fields_from_json(cls, jobj): + return {'foo': jobj['foo']} + + def fields_to_json(self): + return {'foo': self.foo} + + self.parent_cls = MockParentTypedJSONObjectWithFields + self.msg = MockTypedJSONObjectWithFields(foo='bar') + + def test_to_json(self): + self.assertEqual(self.msg.to_json(), { + 'type': 'test', + 'foo': 'bar', + }) + + def test_from_json_non_dict_fails(self): + for value in [[], (), 5, "asd"]: # all possible input types + self.assertRaises( + errors.DeserializationError, self.parent_cls.from_json, value) + + def test_from_json_dict_no_type_fails(self): + self.assertRaises( + errors.DeserializationError, self.parent_cls.from_json, {}) + + def test_from_json_unknown_type_fails(self): + self.assertRaises(errors.UnrecognizedTypeError, + self.parent_cls.from_json, {'type': 'bar'}) + + def test_from_json_returns_obj(self): + self.assertEqual({'foo': 'bar'}, self.parent_cls.from_json( + {'type': 'test', 'foo': 'bar'})) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/acme/jose/jwa.py b/letsencrypt/acme/jose/jwa.py new file mode 100644 index 00000000..99c9a863 --- /dev/null +++ b/letsencrypt/acme/jose/jwa.py @@ -0,0 +1,125 @@ +"""JSON Web Algorithm. + +https://tools.ietf.org/html/draft-ietf-jose-json-web-algorithms-40 + +""" +import abc + +from Crypto.Hash import HMAC +from Crypto.Hash import SHA256 +from Crypto.Hash import SHA384 +from Crypto.Hash import SHA512 + +from Crypto.Signature import PKCS1_PSS +from Crypto.Signature import PKCS1_v1_5 + +from letsencrypt.acme.jose import errors +from letsencrypt.acme.jose import interfaces +from letsencrypt.acme.jose import jwk + + +class JWA(interfaces.JSONDeSerializable): # pylint: disable=abstract-method,too-few-public-methods + """JSON Web Algorithm.""" + + +class JWASignature(JWA): + """JSON Web Signature Algorithm.""" + SIGNATURES = {} + + def __init__(self, name): + self.name = name + + def __eq__(self, other): + return isinstance(other, JWASignature) and self.name == other.name + + @classmethod + def register(cls, signature_cls): + """Register class for JSON deserialization.""" + cls.SIGNATURES[signature_cls.name] = signature_cls + return signature_cls + + def to_json(self): + return self.name + + @classmethod + def from_json(cls, jobj): + return cls.SIGNATURES[jobj] + + @abc.abstractmethod + def sign(self, key, msg): # pragma: no cover + """Sign the ``msg`` using ``key``.""" + raise NotImplementedError() + + @abc.abstractmethod + def verify(self, key, msg, sig): # pragma: no cover + """Verify the ``msg` and ``sig`` using ``key``.""" + raise NotImplementedError() + + def __repr__(self): + return self.name + + +class _JWAHS(JWASignature): + + kty = jwk.JWKOct + + def __init__(self, name, digestmod): + super(_JWAHS, self).__init__(name) + self.digestmod = digestmod + + def sign(self, key, msg): + return HMAC.new(key, msg, self.digestmod).digest() + + def verify(self, key, msg, sig): + # TODO: use constant compare to mitigate timing attack? + return self.sign(key, msg) == sig + + +class _JWARS(JWASignature): + + kty = jwk.JWKRSA + + def __init__(self, name, padding, digestmod): + super(_JWARS, self).__init__(name) + self.padding = padding + self.digestmod = digestmod + + def sign(self, key, msg): + try: + return self.padding.new(key).sign(self.digestmod.new(msg)) + except TypeError as error: # key has no private part + raise errors.Error(error) + except (AttributeError, ValueError) as error: + # key is too small: ValueError for PS, AttributeError for RS + raise errors.Error(error) + + def verify(self, key, msg, sig): + return self.padding.new(key).verify(self.digestmod.new(msg), sig) + + +class _JWAES(JWASignature): # pylint: disable=abstract-class-not-used + + # TODO: implement ES signatures + + def sign(self, key, msg): # pragma: no cover + raise NotImplementedError() + + def verify(self, key, msg, sig): # pragma: no cover + raise NotImplementedError() + + +HS256 = JWASignature.register(_JWAHS('HS256', SHA256)) +HS384 = JWASignature.register(_JWAHS('HS384', SHA384)) +HS512 = JWASignature.register(_JWAHS('HS512', SHA512)) + +RS256 = JWASignature.register(_JWARS('RS256', PKCS1_v1_5, SHA256)) +RS384 = JWASignature.register(_JWARS('RS384', PKCS1_v1_5, SHA384)) +RS512 = JWASignature.register(_JWARS('RS512', PKCS1_v1_5, SHA512)) + +PS256 = JWASignature.register(_JWARS('PS256', PKCS1_PSS, SHA256)) +PS384 = JWASignature.register(_JWARS('PS384', PKCS1_PSS, SHA384)) +PS512 = JWASignature.register(_JWARS('PS512', PKCS1_PSS, SHA512)) + +ES256 = JWASignature.register(_JWAES('ES256')) +ES256 = JWASignature.register(_JWAES('ES384')) +ES256 = JWASignature.register(_JWAES('ES512')) diff --git a/letsencrypt/acme/jose/jwa_test.py b/letsencrypt/acme/jose/jwa_test.py new file mode 100644 index 00000000..712b5051 --- /dev/null +++ b/letsencrypt/acme/jose/jwa_test.py @@ -0,0 +1,105 @@ +"""Tests for letsencrypt.acme.jose.jwa.""" +import os +import pkg_resources +import unittest + +from Crypto.PublicKey import RSA + +from letsencrypt.acme.jose import errors + + +RSA256_KEY = RSA.importKey(pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa256_key.pem'))) +RSA512_KEY = RSA.importKey(pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa512_key.pem'))) +RSA1024_KEY = RSA.importKey(pkg_resources.resource_string( + __name__, os.path.join('testdata', 'rsa1024_key.pem'))) + + +class JWASignatureTest(unittest.TestCase): + """Tests for letsencrypt.acme.jose.jwa.JWASignature.""" + + def setUp(self): + from letsencrypt.acme.jose.jwa import JWASignature + + class MockSig(JWASignature): + # pylint: disable=missing-docstring,too-few-public-methods + # pylint: disable=abstract-class-not-used + def sign(self, key, msg): + raise NotImplementedError() + + def verify(self, key, msg, sig): + raise NotImplementedError() + + # pylint: disable=invalid-name + self.Sig1 = MockSig('Sig1') + self.Sig2 = MockSig('Sig2') + + def test_eq(self): + self.assertEqual(self.Sig1, self.Sig1) + self.assertNotEqual(self.Sig1, self.Sig2) + + def test_repr(self): + self.assertEqual('Sig1', repr(self.Sig1)) + self.assertEqual('Sig2', repr(self.Sig2)) + + def test_to_json(self): + self.assertEqual(self.Sig1.to_json(), 'Sig1') + self.assertEqual(self.Sig2.to_json(), 'Sig2') + + def test_from_json(self): + from letsencrypt.acme.jose.jwa import JWASignature + from letsencrypt.acme.jose.jwa import RS256 + self.assertTrue(JWASignature.from_json('RS256') is RS256) + + +class JWAHSTest(unittest.TestCase): # pylint: disable=too-few-public-methods + + def test_it(self): + from letsencrypt.acme.jose.jwa import HS256 + sig = ( + "\xceR\xea\xcd\x94\xab\xcf\xfb\xe0\xacA.:\x1a'\x08i\xe2\xc4" + "\r\x85+\x0e\x85\xaeUZ\xd4\xb3\x97zO" + ) + self.assertEqual(HS256.sign('some key', 'foo'), sig) + self.assertTrue(HS256.verify('some key', 'foo', sig) is True) + self.assertTrue(HS256.verify('some key', 'foo', sig + '!') is False) + + +class JWARSTest(unittest.TestCase): + + def test_sign_no_private_part(self): + from letsencrypt.acme.jose.jwa import RS256 + self.assertRaises( + errors.Error, RS256.sign, RSA512_KEY.publickey(), 'foo') + + def test_sign_key_too_small(self): + from letsencrypt.acme.jose.jwa import RS256 + from letsencrypt.acme.jose.jwa import PS256 + self.assertRaises(errors.Error, RS256.sign, RSA256_KEY, 'foo') + self.assertRaises(errors.Error, PS256.sign, RSA256_KEY, 'foo') + self.assertRaises(errors.Error, PS256.sign, RSA512_KEY, 'foo') + + def test_rs(self): + from letsencrypt.acme.jose.jwa import RS256 + sig = ( + '\x13\xf0\xe5\x83\x91\xd8~\x02q\xdf\xbdwX\x97\xecn\xe4UH\xb0' + '\xe1oq\x94\x9f\xf4\x0f\xcb0\x05\xa9\x0fs\xea\xf3\xe3\xe7' + '\x1cAh\xb3@\xb8\xe4UnG\xa0\xb2K\xac-\x1c1\x1c\xe9dw}2@\xa7' + '\xf0\xe8' + ) + self.assertEqual(RS256.sign(RSA512_KEY, 'foo'), sig) + # next tests guard that only True/False are return as oppossed + # to e.g. 1/0 + self.assertTrue(RS256.verify(RSA512_KEY, 'foo', sig) is True) + self.assertFalse(RS256.verify(RSA512_KEY, 'foo', sig + '!') is False) + + def test_ps(self): + from letsencrypt.acme.jose.jwa import PS256 + sig = PS256.sign(RSA1024_KEY, 'foo') + self.assertTrue(PS256.verify(RSA1024_KEY, 'foo', sig) is True) + self.assertTrue(PS256.verify(RSA1024_KEY, 'foo', sig + '!') is False) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/acme/jose/jwk.py b/letsencrypt/acme/jose/jwk.py new file mode 100644 index 00000000..ccdef790 --- /dev/null +++ b/letsencrypt/acme/jose/jwk.py @@ -0,0 +1,100 @@ +"""JSON Web Key.""" +import binascii + +import Crypto.PublicKey.RSA + +from letsencrypt.acme.jose import b64 +from letsencrypt.acme.jose import errors +from letsencrypt.acme.jose import json_util + + +class JWK(json_util.TypedJSONObjectWithFields): + # pylint: disable=too-few-public-methods + """JSON Web Key.""" + type_field_name = 'kty' + TYPES = {} + + +@JWK.register +class JWKES(JWK): # pragma: no cover + # pylint: disable=abstract-class-not-used + """ES JWK. + + .. warning:: This is not yet implemented! + + """ + typ = 'ES' + + def fields_to_json(self): + raise NotImplementedError() + + @classmethod + def fields_from_json(cls, jobj): + raise NotImplementedError() + + +@JWK.register +class JWKOct(JWK): + """Symmetric JWK.""" + typ = 'oct' + __slots__ = ('key',) + + def fields_to_json(self): + # TODO: An "alg" member SHOULD also be present to identify the + # algorithm intended to be used with the key, unless the + # application uses another means or convention to determine + # the algorithm used. + return {'k': self.key} + + @classmethod + def fields_from_json(cls, jobj): + return cls(key=jobj['k']) + + +@JWK.register +class JWKRSA(JWK): + """RSA JWK.""" + typ = 'RSA' + __slots__ = ('key',) + + @classmethod + def _encode_param(cls, data): + def _leading_zeros(arg): + if len(arg) % 2: + return '0' + arg + return arg + + return b64.b64encode(binascii.unhexlify( + _leading_zeros(hex(data)[2:].rstrip('L')))) + + @classmethod + def _decode_param(cls, data): + try: + return long(binascii.hexlify(json_util.decode_b64jose(data)), 16) + except ValueError: # invalid literal for long() with base 16 + raise errors.DeserializationError() + + @classmethod + def load(cls, key): + """Load RSA key from string. + + :param str key: RSA key in string form. + + :returns: + :rtype: :class:`JWKRSA` + + """ + return cls(key=Crypto.PublicKey.RSA.importKey(key)) + + @classmethod + def fields_from_json(cls, jobj): + return cls(key=Crypto.PublicKey.RSA.construct( + (cls._decode_param(jobj['n']), + cls._decode_param(jobj['e'])))) + + def fields_to_json(self): + return { + 'n': self._encode_param(self.key.n), + 'e': self._encode_param(self.key.e), + } + diff --git a/letsencrypt/acme/jose/jwk_test.py b/letsencrypt/acme/jose/jwk_test.py new file mode 100644 index 00000000..7f851e65 --- /dev/null +++ b/letsencrypt/acme/jose/jwk_test.py @@ -0,0 +1,88 @@ +"""Tests for letsencrypt.acme.jose.jwk.""" +import os +import pkg_resources +import unittest + +from Crypto.PublicKey import RSA + +from letsencrypt.acme.jose import errors + + +RSA256_KEY = RSA.importKey(pkg_resources.resource_string( + 'letsencrypt.client.tests', os.path.join('testdata', 'rsa256_key.pem'))) +RSA512_KEY = RSA.importKey(pkg_resources.resource_string( + 'letsencrypt.client.tests', os.path.join('testdata', 'rsa512_key.pem'))) + + +class JWKOctTest(unittest.TestCase): + """Tests for letsencrypt.acme.jose.jwk.JWKOct.""" + + def setUp(self): + from letsencrypt.acme.jose.jwk import JWKOct + self.jwk = JWKOct(key='foo') + self.jobj = {'kty': 'oct', 'k': 'foo'} + + def test_to_json(self): + self.assertEqual(self.jwk.to_json(), self.jobj) + + def test_from_json(self): + from letsencrypt.acme.jose.jwk import JWKOct + self.assertEqual(self.jwk, JWKOct.from_json(self.jobj)) + + +class JWKRSATest(unittest.TestCase): + """Tests for letsencrypt.acme.jose.jwk.JWKRSA.""" + + def setUp(self): + from letsencrypt.acme.jose.jwk import JWKRSA + self.jwk256 = JWKRSA(key=RSA256_KEY.publickey()) + self.jwk256json = { + 'kty': 'RSA', + 'e': 'AQAB', + 'n': 'rHVztFHtH92ucFJD_N_HW9AsdRsUuHUBBBDlHwNlRd3fp5' + '80rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3C5Q', + } + self.jwk512 = JWKRSA(key=RSA512_KEY.publickey()) + self.jwk512json = { + 'kty': 'RSA', + 'e': 'AQAB', + 'n': '9LYRcVE3Nr-qleecEcX8JwVDnjeG1X7ucsCasuuZM0e09c' + 'mYuUzxIkMjO_9x4AVcvXXRXPEV-LzWWkfkTlzRMw', + } + + def test_equals(self): + self.assertEqual(self.jwk256, self.jwk256) + self.assertEqual(self.jwk512, self.jwk512) + + def test_not_equals(self): + self.assertNotEqual(self.jwk256, self.jwk512) + self.assertNotEqual(self.jwk512, self.jwk256) + + def test_load(self): + from letsencrypt.acme.jose.jwk import JWKRSA + self.assertEqual(JWKRSA(key=RSA256_KEY), JWKRSA.load( + pkg_resources.resource_string( + 'letsencrypt.client.tests', + os.path.join('testdata', 'rsa256_key.pem')))) + + def test_to_json(self): + self.assertEqual(self.jwk256.to_json(), self.jwk256json) + self.assertEqual(self.jwk512.to_json(), self.jwk512json) + + def test_from_json(self): + from letsencrypt.acme.jose.jwk import JWK + self.assertEqual(self.jwk256, JWK.from_json(self.jwk256json)) + # TODO: fix schemata to allow RSA512 + #self.assertEqual(self.jwk512, JWK.from_json(self.jwk512json)) + + def test_from_json_non_schema_errors(self): + # valid against schema, but still failing + from letsencrypt.acme.jose.jwk import JWK + self.assertRaises(errors.DeserializationError, JWK.from_json, + {'kty': 'RSA', 'e': 'AQAB', 'n': ''}) + self.assertRaises(errors.DeserializationError, JWK.from_json, + {'kty': 'RSA', 'e': 'AQAB', 'n': '1'}) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/acme/jose/testdata/README b/letsencrypt/acme/jose/testdata/README new file mode 100644 index 00000000..9e0f2059 --- /dev/null +++ b/letsencrypt/acme/jose/testdata/README @@ -0,0 +1,10 @@ +The following commands has been used to generate test keys: + + for x in 256 512 1024; do openssl genrsa -out rsa${k}_key.pem $k; done + +and for the CSR: + + python -c from letsencrypt.client.crypto_util import make_csr; + import pkg_resources; open("csr2.pem", + "w").write(make_csr(pkg_resources.resource_string("letsencrypt.client.tests", + "testdata/rsa512_key.pem"), ["example2.com"])[0]) diff --git a/letsencrypt/acme/jose/testdata/csr2.pem b/letsencrypt/acme/jose/testdata/csr2.pem new file mode 100644 index 00000000..bd059a44 --- /dev/null +++ b/letsencrypt/acme/jose/testdata/csr2.pem @@ -0,0 +1,10 @@ +-----BEGIN CERTIFICATE REQUEST----- +MIIBXzCCAQkCAQAwejELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIw +EAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECwwWVW5pdmVy +c2l0eSBvZiBNaWNoaWdhbjEVMBMGA1UEAwwMZXhhbXBsZTIuY29tMFwwDQYJKoZI +hvcNAQEBBQADSwAwSAJBAPS2EXFRNza/qpXnnBHF/CcFQ543htV+7nLAmrLrmTNH +tPXJmLlM8SJDIzv/ceAFXL110VzxFfi81lpH5E5c0TMCAwEAAaAqMCgGCSqGSIb3 +DQEJDjEbMBkwFwYDVR0RBBAwDoIMZXhhbXBsZTIuY29tMA0GCSqGSIb3DQEBCwUA +A0EAwsdL4FLIgISKV4vXFmc6QTV7CjBiP4XmPFbeN+gMFdR7QcnRyyxSpXxB0v8Z +oqYboP5LGFt9zC6/9GyjcI9/IQ== +-----END CERTIFICATE REQUEST----- diff --git a/letsencrypt/acme/jose/testdata/rsa1024_key.pem b/letsencrypt/acme/jose/testdata/rsa1024_key.pem new file mode 100644 index 00000000..de5339d0 --- /dev/null +++ b/letsencrypt/acme/jose/testdata/rsa1024_key.pem @@ -0,0 +1,15 @@ +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQCaifO0fGlcAcjjcYEAPYcIL0Hf0KiNa9VCJ14RBdlZxLWRrVFi +4tdNCKSKqzKuKrrA8DWd4PHFD7UpLyRrPPXY6GozAyCT+5UFBClGJ2KyNKu+eU6/ +w4C1kpO4lpeXs8ptFc1lA9P8V1M/MkWzTE402nPNK0uUmZNo2tsFpGJUSQIDAQAB +AoGAFjLWxQhSAhtnhfRZ+XTdHrnbFpFchOQGgDgzdPKIJDLzefeRh0jacIBbUmgB +Ia+Vn/1hVkpnsEzvUvkonBbnoYWlYVQdpNTmrrew7SOztf8/1fYCsSkyDAvqGTXc +TmHM0PaLS+junoWcKOvQRVb0N3k+43OnBkr2b393Sx30qGECQQDNO2IBWOsYs8cB +CZQAZs8zBlbwBFZibqovqpLwXt9adBIsT9XzgagGbJMpzSuoHTUn3QqqJd9uHD8X +UTmmoh4NAkEAwMRauo+PlNj8W1lusflko52KL17+E5cmeOERM2xvhZNpO7d3/1ak +Co9dxVMicrYSh7jXbcXFNt3xNDTv6Dg8LQJAPuJwMDt/pc0IMCAwMkNOP7M0lkyt +73E7QmnAplhblcq0+tDnnLpgsr84BHnyY4u3iuRm7SW3pXSQPGPOB2nrTQJANBXa +HgakWSe4KEal7ljgpITwzZPxOwHgV1EZALgP+hu2l3gfaFLUyDWstKCd8jjYEOwU +6YhCnWyiu+SB3lEzkQJBAJapJpfypFyY8kQNYlYILLBcPu5fmy3QUZKHJ4L3rIVJ +c2UTLMeBBgGFHT04CtWntmjwzSv+V6lwiCxKXsIUySc= +-----END RSA PRIVATE KEY----- diff --git a/letsencrypt/acme/jose/testdata/rsa256_key.pem b/letsencrypt/acme/jose/testdata/rsa256_key.pem new file mode 100644 index 00000000..659274d1 --- /dev/null +++ b/letsencrypt/acme/jose/testdata/rsa256_key.pem @@ -0,0 +1,6 @@ +-----BEGIN RSA PRIVATE KEY----- +MIGrAgEAAiEAm2Fylv+Uz7trgTW8EBHP3FQSMeZs2GNQ6VRo1sIVJEkCAwEAAQIh +AJT0BA/xD01dFCAXzSNyj9nfSZa3NpqzJZZn/eOm7vghAhEAzUVNZn4lLLBD1R6N +E8TKNQIRAMHHyn3O5JeY36lwKwkUlEUCEAliRauN0L0+QZuYjfJ9aJECEGx4dru3 +rTPCyighdqWNlHUCEQCiLjlwSRtWgmMBudCkVjzt +-----END RSA PRIVATE KEY----- diff --git a/letsencrypt/acme/jose/testdata/rsa512_key.pem b/letsencrypt/acme/jose/testdata/rsa512_key.pem new file mode 100644 index 00000000..77627dcd --- /dev/null +++ b/letsencrypt/acme/jose/testdata/rsa512_key.pem @@ -0,0 +1,9 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIBPAIBAAJBAJ+afYCLq33YTZumktV+Lg9LpDGKCv/DxuXkXc40mFc+82KbsyR8 +5/S2pmNQrKzL/jLmenQT67PnRaVNqEsvj2UCAwEAAQJAJWqOaYhU19fRud+/JJXE +LonJIGQAWB2Jj3OOGj1ySWF13ahdsQxXKQoVSUTnrvLJkrQwXwNFck9BnZ1otL6u +MQIhAMw84RdsMJufn7bCMe6ppVukoGKRbjxE8ar/tBGUOOFrAiEAyA2ysBdOXF8z +FweoKED11siyJbHuuavMaoL1ZI779m8CIQCWuf8seA3PbBhEmkCbb9u3LGGpHMcL +952aoydTKd5ojQIhAKuSA+O9uTjDdL+Vk4QiYjS4nwBxH3ohewkGE4sQjcsFAiEA +uToAFyz5vUHnk8vME9y+ZIHSePBqckGwXVOfgIbATF0= +-----END RSA PRIVATE KEY----- diff --git a/letsencrypt/acme/jose/util.py b/letsencrypt/acme/jose/util.py new file mode 100644 index 00000000..5f516884 --- /dev/null +++ b/letsencrypt/acme/jose/util.py @@ -0,0 +1,123 @@ +"""JOSE utilities.""" +import collections + + +class abstractclassmethod(classmethod): + # pylint: disable=invalid-name,too-few-public-methods + """Descriptor for an abstract classmethod. + + It augments the :mod:`abc` framework with an abstract + classmethod. This is implemented as :class:`abc.abstractclassmethod` + in the standard Python library starting with version 3.2. + + This particular implementation, allegedly based on Python 3.3 source + code, is stolen from + http://stackoverflow.com/questions/11217878/python-2-7-combine-abc-abstractmethod-and-classmethod. + + """ + __isabstractmethod__ = True + + def __init__(self, target): + target.__isabstractmethod__ = True + super(abstractclassmethod, self).__init__(target) + + +class ComparableX509(object): # pylint: disable=too-few-public-methods + """Wrapper for M2Crypto.X509.* objects that supports __eq__. + + Wraps around: + + - :class:`M2Crypto.X509.X509` + - :class:`M2Crypto.X509.Request` + + """ + def __init__(self, wrapped): + self._wrapped = wrapped + + def __getattr__(self, name): + return getattr(self._wrapped, name) + + def __eq__(self, other): + return self.as_der() == other.as_der() + + +class ImmutableMap(collections.Mapping, collections.Hashable): + # pylint: disable=too-few-public-methods + """Immutable key to value mapping with attribute access.""" + + __slots__ = () + """Must be overriden in subclasses.""" + + def __init__(self, **kwargs): + if set(kwargs) != set(self.__slots__): + raise TypeError( + '__init__() takes exactly the following arguments: {0} ' + '({1} given)'.format(', '.join(self.__slots__), + ', '.join(kwargs) if kwargs else 'none')) + for slot in self.__slots__: + object.__setattr__(self, slot, kwargs.pop(slot)) + + def __getitem__(self, key): + try: + return getattr(self, key) + except AttributeError: + raise KeyError(key) + + def __iter__(self): + return iter(self.__slots__) + + def __len__(self): + return len(self.__slots__) + + def __hash__(self): + return hash(tuple(getattr(self, slot) for slot in self.__slots__)) + + def __setattr__(self, name, value): + raise AttributeError("can't set attribute") + + def __repr__(self): + return '{0}({1})'.format(self.__class__.__name__, ', '.join( + '{0}={1!r}'.format(key, value) for key, value in self.iteritems())) + + +class frozendict(collections.Mapping, collections.Hashable): + # pylint: disable=invalid-name,too-few-public-methods + """Frozen dictionary.""" + __slots__ = ('_items', '_keys') + + def __init__(self, *args, **kwargs): + if kwargs and not args: + items = dict(kwargs) + elif len(args) == 1 and isinstance(args[0], collections.Mapping): + items = args[0] + else: + raise TypeError() + # TODO: support generators/iterators + + object.__setattr__(self, '_items', items) + object.__setattr__(self, '_keys', tuple(sorted(items.iterkeys()))) + + def __getitem__(self, key): + return self._items[key] + + def __iter__(self): + return iter(self._keys) + + def __len__(self): + return len(self._items) + + def __hash__(self): + return hash(tuple((key, value) for key, value in self.items())) + + def __getattr__(self, name): + try: + return self._items[name] + except KeyError: + raise AttributeError(name) + + def __setattr__(self, name, value): + raise AttributeError("can't set attribute") + + def __repr__(self): + return 'frozendict({0})'.format(', '.join( + '{0}={1!r}'.format(key, value) for key, value in self.iteritems())) diff --git a/letsencrypt/acme/jose/util_test.py b/letsencrypt/acme/jose/util_test.py new file mode 100644 index 00000000..671b4547 --- /dev/null +++ b/letsencrypt/acme/jose/util_test.py @@ -0,0 +1,107 @@ +"""Tests for letsencrypt.acme.jose.util.""" +import functools +import unittest + + +class ImmutableMapTest(unittest.TestCase): + """Tests for letsencrypt.acme.jose.util.ImmutableMap.""" + + def setUp(self): + # pylint: disable=invalid-name,too-few-public-methods + # pylint: disable=missing-docstring + from letsencrypt.acme.jose.util import ImmutableMap + + class A(ImmutableMap): + __slots__ = ('x', 'y') + + class B(ImmutableMap): + __slots__ = ('x', 'y') + + self.A = A + self.B = B + + self.a1 = self.A(x=1, y=2) + self.a1_swap = self.A(y=2, x=1) + self.a2 = self.A(x=3, y=4) + self.b = self.B(x=1, y=2) + + def test_get_missing_item_raises_key_error(self): + self.assertRaises(KeyError, self.a1.__getitem__, 'z') + + def test_order_of_args_does_not_matter(self): + self.assertEqual(self.a1, self.a1_swap) + + def test_type_error_on_missing(self): + self.assertRaises(TypeError, self.A, x=1) + self.assertRaises(TypeError, self.A, y=2) + + def test_type_error_on_unrecognized(self): + self.assertRaises(TypeError, self.A, x=1, z=2) + self.assertRaises(TypeError, self.A, x=1, y=2, z=3) + + def test_get_attr(self): + self.assertEqual(1, self.a1.x) + self.assertEqual(2, self.a1.y) + self.assertEqual(1, self.a1_swap.x) + self.assertEqual(2, self.a1_swap.y) + + def test_set_attr_raises_attribute_error(self): + self.assertRaises( + AttributeError, functools.partial(self.a1.__setattr__, 'x'), 10) + + def test_equal(self): + self.assertEqual(self.a1, self.a1) + self.assertEqual(self.a2, self.a2) + self.assertNotEqual(self.a1, self.a2) + + def test_hash(self): + self.assertEqual(hash((1, 2)), hash(self.a1)) + + def test_unhashable(self): + self.assertRaises(TypeError, self.A(x=1, y={}).__hash__) + + def test_repr(self): + self.assertEqual('A(x=1, y=2)', repr(self.a1)) + self.assertEqual('A(x=1, y=2)', repr(self.a1_swap)) + self.assertEqual('B(x=1, y=2)', repr(self.b)) + self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar'))) + + +class frozendictTest(unittest.TestCase): # pylint: disable=invalid-name + """Tests for letsencrypt.acme.jose.util.frozendict.""" + + def setUp(self): + from letsencrypt.acme.jose.util import frozendict + self.fdict = frozendict(x=1, y='2') + + def test_init_dict(self): + from letsencrypt.acme.jose.util import frozendict + self.assertEqual(self.fdict, frozendict({'x': 1, 'y': '2'})) + + def test_init_other_raises_type_error(self): + from letsencrypt.acme.jose.util import frozendict + # specifically fail for generators... + self.assertRaises(TypeError, frozendict, {'a': 'b'}.iteritems()) + + def test_len(self): + self.assertEqual(2, len(self.fdict)) + + def test_hash(self): + self.assertEqual(1278944519403861804, hash(self.fdict)) + + def test_getattr_proxy(self): + self.assertEqual(1, self.fdict.x) + self.assertEqual('2', self.fdict.y) + + def test_getattr_raises_attribute_error(self): + self.assertRaises(AttributeError, self.fdict.__getattr__, 'z') + + def test_setattr_immutable(self): + self.assertRaises(AttributeError, self.fdict.__setattr__, 'z', 3) + + def test_repr(self): + self.assertEqual("frozendict(x=1, y='2')", repr(self.fdict)) + + +if __name__ == '__main__': + unittest.main() diff --git a/letsencrypt/acme/messages.py b/letsencrypt/acme/messages.py index 64f7a035..b3a376c8 100644 --- a/letsencrypt/acme/messages.py +++ b/letsencrypt/acme/messages.py @@ -1,6 +1,4 @@ """ACME protocol messages.""" -import json - import jsonschema from letsencrypt.acme import challenges @@ -9,11 +7,16 @@ from letsencrypt.acme import jose from letsencrypt.acme import other from letsencrypt.acme import util +from letsencrypt.acme.jose import errors as jose_errors +from letsencrypt.acme.jose import json_util -class Message(util.TypedACMEObject): + +class Message(jose.TypedJSONObjectWithFields): # _fields_to_json | pylint: disable=abstract-method + # pylint: disable=too-few-public-methods """ACME message.""" TYPES = {} + type_field_name = "type" schema = NotImplemented """JSON schema the object is tested against in :meth:`from_json`. @@ -24,28 +27,6 @@ class Message(util.TypedACMEObject): """ - @classmethod - def get_msg_cls(cls, jobj): - """Get the registered class for ``jobj``.""" - if cls in cls.TYPES.itervalues(): - # cls is already registered Message type, force to use it - # so that, e.g Revocation.from_json(jobj) fails if - # jobj["type"] != "revocation". - return cls - - if not isinstance(jobj, dict): - raise errors.ValidationError( - "{0} is not a dictionary object".format(jobj)) - try: - msg_type = jobj["type"] - except KeyError: - raise errors.ValidationError("missing type field") - - try: - return cls.TYPES[msg_type] - except KeyError: - raise errors.UnrecognizedTypeError(msg_type) - @classmethod def from_json(cls, jobj): """Deserialize from (possibly invalid) JSON object. @@ -57,35 +38,21 @@ class Message(util.TypedACMEObject): :raises letsencrypt.acme.errors.SchemaValidationError: if the input JSON object could not be validated against JSON schema specified in :attr:`schema`. - :raises letsencrypt.acme.errors.ValidationError: for any other generic - error in decoding. + :raises letsencrypt.acme.jose.errors.DeserializationError: for any + other generic error in decoding. :returns: instance of the class """ - msg_cls = cls.get_msg_cls(jobj) + msg_cls = cls.get_type_cls(jobj) + # TODO: is that schema testing still relevant? try: jsonschema.validate(jobj, msg_cls.schema) except jsonschema.ValidationError as error: raise errors.SchemaValidationError(error) - return cls.from_valid_json(jobj) - - @classmethod - def json_loads(cls, json_string): - """Load JSON string.""" - return cls.from_json(json.loads(json_string)) - - def json_dumps(self, *args, **kwargs): - """Dump to JSON string using proper serializer. - - :returns: JSON serialized string. - :rtype: str - - """ - return json.dumps( - self, *args, default=util.dump_ijsonserializable, **kwargs) + return super(Message, cls).from_json(jobj) @Message.register # pylint: disable=too-few-public-methods @@ -96,86 +63,55 @@ class Challenge(Message): :ivar list challenges: List of :class:`~letsencrypt.acme.challenges.Challenge` objects. - """ - acme_type = "challenge" - schema = util.load_schema(acme_type) - __slots__ = ("session_id", "nonce", "challenges", "combinations") + .. todo:: + 1. can challenges contain two challenges of the same type? + 2. can challenges contain duplicates? + 3. check "combinations" indices are in valid range + 4. turn "combinations" elements into sets? + 5. turn "combinations" into set? - def _fields_to_json(self): - fields = { - "sessionID": self.session_id, - "nonce": jose.b64encode(self.nonce), - "challenges": self.challenges, - } - if self.combinations: - fields["combinations"] = self.combinations - return fields + """ + typ = "challenge" + schema = util.load_schema(typ) + + session_id = jose.Field("sessionID") + nonce = jose.Field("nonce", encoder=jose.b64encode, + decoder=jose.decode_b64jose) + challenges = jose.Field("challenges") + combinations = jose.Field("combinations", omitempty=True, default=()) + + @challenges.decoder + def challenges(value): # pylint: disable=missing-docstring,no-self-argument + return tuple(challenges.Challenge.from_json(chall) for chall in value) @property def resolved_combinations(self): """Combinations with challenges instead of indices.""" - return [[self.challenges[idx] for idx in combo] - for combo in self.combinations] - - @classmethod - def from_valid_json(cls, jobj): - # TODO: can challenges contain two challenges of the same type? - # TODO: can challenges contain duplicates? - # TODO: check "combinations" indices are in valid range - # TODO: turn "combinations" elements into sets? - # TODO: turn "combinations" into set? - return cls(session_id=jobj["sessionID"], - nonce=util.decode_b64jose(jobj["nonce"]), - challenges=[challenges.Challenge.from_valid_json(chall) - for chall in jobj["challenges"]], - combinations=jobj.get("combinations", [])) + return tuple(tuple(self.challenges[idx] for idx in combo) + for combo in self.combinations) @Message.register # pylint: disable=too-few-public-methods class ChallengeRequest(Message): """ACME "challengeRequest" message.""" - acme_type = "challengeRequest" - schema = util.load_schema(acme_type) - __slots__ = ("identifier",) - - def _fields_to_json(self): - return { - "identifier": self.identifier, - } - - @classmethod - def from_valid_json(cls, jobj): - return cls(identifier=jobj["identifier"]) + typ = "challengeRequest" + schema = util.load_schema(typ) + identifier = jose.Field("identifier") @Message.register # pylint: disable=too-few-public-methods class Authorization(Message): """ACME "authorization" message. - :ivar jwk: :class:`letsencrypt.acme.other.JWK` + :ivar jwk: :class:`letsencrypt.acme.jose.JWK` """ - acme_type = "authorization" - schema = util.load_schema(acme_type) - __slots__ = ("recovery_token", "identifier", "jwk") + typ = "authorization" + schema = util.load_schema(typ) - def _fields_to_json(self): - fields = {} - if self.recovery_token is not None: - fields["recoveryToken"] = self.recovery_token - if self.identifier is not None: - fields["identifier"] = self.identifier - if self.jwk is not None: - fields["jwk"] = self.jwk - return fields - - @classmethod - def from_valid_json(cls, jobj): - jwk = jobj.get("jwk") - if jwk is not None: - jwk = other.JWK.from_valid_json(jwk) - return cls(recovery_token=jobj.get("recoveryToken"), - identifier=jobj.get("identifier"), jwk=jwk) + recovery_token = jose.Field("recoveryToken", omitempty=True) + identifier = jose.Field("identifier", omitempty=True) + jwk = jose.Field("jwk", decoder=jose.JWK.from_json, omitempty=True) @Message.register @@ -189,9 +125,20 @@ class AuthorizationRequest(Message): :ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`). """ - acme_type = "authorizationRequest" - schema = util.load_schema(acme_type) - __slots__ = ("session_id", "nonce", "responses", "signature", "contact") + typ = "authorizationRequest" + schema = util.load_schema(typ) + + session_id = jose.Field("sessionID") + nonce = jose.Field("nonce", encoder=jose.b64encode, + decoder=jose.decode_b64jose) + responses = jose.Field("responses") + signature = jose.Field("signature", decoder=other.Signature.from_json) + contact = jose.Field("contact", omitempty=True, default=()) + + @responses.decoder + def responses(value): # pylint: disable=missing-docstring,no-self-argument + return tuple(challenges.ChallengeResponse.from_json(chall) + for chall in value) @classmethod def create(cls, name, key, sig_nonce=None, **kwargs): @@ -213,7 +160,7 @@ class AuthorizationRequest(Message): signature = other.Signature.from_msg( name + kwargs["nonce"], key, sig_nonce) return cls( - signature=signature, contact=kwargs.pop("contact", []), **kwargs) + signature=signature, contact=kwargs.pop("contact", ()), **kwargs) def verify(self, name): """Verify signature. @@ -228,29 +175,9 @@ class AuthorizationRequest(Message): :rtype: bool """ + # self.signature is not Field | pylint: disable=no-member return self.signature.verify(name + self.nonce) - def _fields_to_json(self): - fields = { - "sessionID": self.session_id, - "nonce": jose.b64encode(self.nonce), - "responses": self.responses, - "signature": self.signature, - } - if self.contact: - fields["contact"] = self.contact - return fields - - @classmethod - def from_valid_json(cls, jobj): - return cls( - session_id=jobj["sessionID"], - nonce=util.decode_b64jose(jobj["nonce"]), - responses=[challenges.ChallengeResponse.from_valid_json(chall) - for chall in jobj["responses"]], - signature=other.Signature.from_valid_json(jobj["signature"]), - contact=jobj.get("contact", [])) - @Message.register # pylint: disable=too-few-public-methods class Certificate(Message): @@ -263,24 +190,21 @@ class Certificate(Message): wrapped in :class:`letsencrypt.acme.util.ComparableX509` ). """ - acme_type = "certificate" - schema = util.load_schema(acme_type) - __slots__ = ("certificate", "chain", "refresh") + typ = "certificate" + schema = util.load_schema(typ) - def _fields_to_json(self): - fields = {"certificate": util.encode_cert(self.certificate)} - if self.chain: - fields["chain"] = [util.encode_cert(cert) for cert in self.chain] - if self.refresh is not None: - fields["refresh"] = self.refresh - return fields + certificate = jose.Field("certificate", encoder=jose.encode_cert, + decoder=jose.decode_cert) + chain = jose.Field("chain", omitempty=True, default=()) + refresh = jose.Field("refresh", omitempty=True) - @classmethod - def from_valid_json(cls, jobj): - return cls(certificate=util.decode_cert(jobj["certificate"]), - chain=[util.decode_cert(cert) for cert in - jobj.get("chain", [])], - refresh=jobj.get("refresh")) + @chain.decoder + def chain(value): # pylint: disable=missing-docstring,no-self-argument + return tuple(jose.decode_cert(cert) for cert in value) + + @chain.encoder + def chain(value): # pylint: disable=missing-docstring,no-self-argument + return tuple(jose.encode_cert(cert) for cert in value) @Message.register @@ -292,9 +216,26 @@ class CertificateRequest(Message): :ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`). """ - acme_type = "certificateRequest" - schema = util.load_schema(acme_type) - __slots__ = ("csr", "signature") + typ = "certificateRequest" + schema = util.load_schema(typ) + + csr = jose.Field("csr", encoder=jose.encode_csr, + decoder=jose.decode_csr) + signature = jose.Field("signature") + + @classmethod + def fields_from_json(cls, jobj): + cls._check_required(jobj) + + sig = other.Signature.from_json( + jobj[cls._fields['signature'].json_name]) + if not sig.verify(json_util.decode_b64jose(jobj["csr"])): + raise jose_errors.DeserializationError( + 'Signature could not be verified') + # verify signature before decoding principle! + csr = jose.decode_csr(jobj[cls._fields['csr'].json_name]) + + return {'signature': sig, 'csr': csr} @classmethod def create(cls, key, sig_nonce=None, **kwargs): @@ -324,49 +265,32 @@ class CertificateRequest(Message): :rtype: bool """ + # self.signature is not Field | pylint: disable=no-member return self.signature.verify(self.csr.as_der()) - def _fields_to_json(self): - return { - "csr": util.encode_csr(self.csr), - "signature": self.signature, - } - - @classmethod - def from_valid_json(cls, jobj): - return cls(csr=util.decode_csr(jobj["csr"]), - signature=other.Signature.from_valid_json(jobj["signature"])) - @Message.register # pylint: disable=too-few-public-methods class Defer(Message): """ACME "defer" message.""" - acme_type = "defer" - schema = util.load_schema(acme_type) - __slots__ = ("token", "interval", "message") + typ = "defer" + schema = util.load_schema(typ) - def _fields_to_json(self): - fields = {"token": self.token} - if self.interval is not None: - fields["interval"] = self.interval - if self.message is not None: - fields["message"] = self.message - return fields - - @classmethod - def from_valid_json(cls, jobj): - return cls(token=jobj["token"], interval=jobj.get("interval"), - message=jobj.get("message")) + token = jose.Field("token") + interval = jose.Field("interval", omitempty=True) + message = jose.Field("message", omitempty=True) @Message.register # pylint: disable=too-few-public-methods class Error(Message): """ACME "error" message.""" - acme_type = "error" - schema = util.load_schema(acme_type) - __slots__ = ("error", "message", "more_info") + typ = "error" + schema = util.load_schema(typ) - CODES = { + error = jose.Field("error") + message = jose.Field("message", omitempty=True) + more_info = jose.Field("moreInfo", omitempty=True) + + MESSAGE_CODES = { "malformed": "The request message was malformed", "unauthorized": "The client lacks sufficient authorization", "serverInternal": "The server experienced an internal error", @@ -375,33 +299,12 @@ class Error(Message): "badCSR": "The CSR is unacceptable (e.g., due to a short key)", } - def _fields_to_json(self): - fields = {"error": self.error} - if self.message is not None: - fields["message"] = self.message - if self.more_info is not None: - fields["moreInfo"] = self.more_info - return fields - - @classmethod - def from_valid_json(cls, jobj): - return cls(error=jobj["error"], message=jobj.get("message"), - more_info=jobj.get("moreInfo")) - @Message.register # pylint: disable=too-few-public-methods class Revocation(Message): """ACME "revocation" message.""" - acme_type = "revocation" - schema = util.load_schema(acme_type) - __slots__ = () - - def _fields_to_json(self): - return {} - - @classmethod - def from_valid_json(cls, jobj): - return cls() + typ = "revocation" + schema = util.load_schema(typ) @Message.register @@ -413,9 +316,12 @@ class RevocationRequest(Message): :ivar signature: Signature (:class:`letsencrypt.acme.other.Signature`). """ - acme_type = "revocationRequest" - schema = util.load_schema(acme_type) - __slots__ = ("certificate", "signature") + typ = "revocationRequest" + schema = util.load_schema(typ) + + certificate = jose.Field("certificate", decoder=jose.decode_cert, + encoder=jose.encode_cert) + signature = jose.Field("signature", decoder=other.Signature.from_json) @classmethod def create(cls, key, sig_nonce=None, **kwargs): @@ -445,34 +351,13 @@ class RevocationRequest(Message): :rtype: bool """ + # self.signature is not Field | pylint: disable=no-member return self.signature.verify(self.certificate.as_der()) - def _fields_to_json(self): - return { - "certificate": util.encode_cert(self.certificate), - "signature": self.signature, - } - - @classmethod - def from_valid_json(cls, jobj): - return cls(certificate=util.decode_cert(jobj["certificate"]), - signature=other.Signature.from_valid_json(jobj["signature"])) - @Message.register # pylint: disable=too-few-public-methods class StatusRequest(Message): - """ACME "statusRequest" message. - - :ivar unicode token: Token provided in ACME "defer" message. - - """ - acme_type = "statusRequest" - schema = util.load_schema(acme_type) - __slots__ = ("token",) - - def _fields_to_json(self): - return {"token": self.token} - - @classmethod - def from_valid_json(cls, jobj): - return cls(token=jobj["token"]) + """ACME "statusRequest" message.""" + typ = "statusRequest" + schema = util.load_schema(typ) + token = jose.Field("token") diff --git a/letsencrypt/acme/messages_test.py b/letsencrypt/acme/messages_test.py index ab9f4f64..ba1962b4 100644 --- a/letsencrypt/acme/messages_test.py +++ b/letsencrypt/acme/messages_test.py @@ -9,17 +9,21 @@ from letsencrypt.acme import challenges from letsencrypt.acme import errors from letsencrypt.acme import jose from letsencrypt.acme import other -from letsencrypt.acme import util + +from letsencrypt.acme.jose import errors as jose_errors KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( 'letsencrypt.client.tests', 'testdata/rsa256_key.pem')) -CERT = util.ComparableX509(M2Crypto.X509.load_cert( +CERT = jose.ComparableX509(M2Crypto.X509.load_cert( pkg_resources.resource_filename( 'letsencrypt.client.tests', 'testdata/cert.pem'))) -CSR = util.ComparableX509(M2Crypto.X509.load_request( +CSR = jose.ComparableX509(M2Crypto.X509.load_request( pkg_resources.resource_filename( 'letsencrypt.client.tests', 'testdata/csr.pem'))) +CSR2 = jose.ComparableX509(M2Crypto.X509.load_request( + pkg_resources.resource_filename( + 'letsencrypt.acme.jose', 'testdata/csr2.pem'))) class MessageTest(unittest.TestCase): @@ -35,7 +39,7 @@ class MessageTest(unittest.TestCase): @MockParentMessage.register class MockMessage(MockParentMessage): - acme_type = 'test' + typ = 'test' schema = { 'type': 'object', 'properties': { @@ -56,43 +60,21 @@ class MessageTest(unittest.TestCase): self.parent_cls = MockParentMessage self.msg = MockMessage(price=123, name='foo') - def test_from_json_non_dict_fails(self): - self.assertRaises(errors.ValidationError, self.parent_cls.from_json, []) - - def test_from_json_dict_no_type_fails(self): - self.assertRaises(errors.ValidationError, self.parent_cls.from_json, {}) - - def test_from_json_unrecognized_type(self): - self.assertRaises(errors.UnrecognizedTypeError, - self.parent_cls.from_json, {'type': 'foo'}) - def test_from_json_validates(self): self.assertRaises(errors.SchemaValidationError, self.parent_cls.from_json, {'type': 'test', 'price': 'asd'}) - def test_from_json(self): - self.assertEqual(self.msg, self.parent_cls.from_json( - {'type': 'test', 'name': 'foo', 'price': 123})) - - def test_json_loads(self): - self.assertEqual(self.msg, self.parent_cls.json_loads( - '{"type": "test", "name": "foo", "price": 123}')) - - def test_json_dumps(self): - self.assertEqual(self.msg.json_dumps(sort_keys=True), - '{"name": "foo", "price": 123, "type": "test"}') - class ChallengeTest(unittest.TestCase): def setUp(self): - challs = [ + challs = ( challenges.SimpleHTTPS(token='IlirfxKKXAsHtmzK29Pj8A'), challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA'), challenges.RecoveryToken(), - ] - combinations = [[0, 2], [1, 2]] + ) + combinations = ((0, 2), (1, 2)) from letsencrypt.acme.messages import Challenge self.msg = Challenge( @@ -112,21 +94,21 @@ class ChallengeTest(unittest.TestCase): 'type': 'challenge', 'sessionID': 'aefoGaavieG9Wihuk2aufai3aeZ5EeW4', 'nonce': '7Nbyb1lI6xPVI3Hg3aKSqQ', - 'challenges': [chall.to_json() for chall in challs], - 'combinations': combinations, + 'challenges': [chall.fully_serialize() for chall in challs], + 'combinations': [[0, 2], [1, 2]], # TODO array tuples } def test_resolved_combinations(self): - self.assertEqual(self.msg.resolved_combinations, [ - [ + self.assertEqual(self.msg.resolved_combinations, ( + ( challenges.SimpleHTTPS(token='IlirfxKKXAsHtmzK29Pj8A'), challenges.RecoveryToken() - ], - [ + ), + ( challenges.DNS(token='DGyRejmCefe7v4NfDGDKfA'), challenges.RecoveryToken(), - ] - ]) + ) + )) def test_to_json(self): self.assertEqual(self.msg.to_json(), self.jmsg_to) @@ -142,7 +124,7 @@ class ChallengeTest(unittest.TestCase): from letsencrypt.acme.messages import Challenge msg = Challenge.from_json(self.jmsg_from) - self.assertEqual(msg.combinations, []) + self.assertEqual(msg.combinations, ()) self.assertEqual(msg.to_json(), self.jmsg_to) @@ -168,7 +150,7 @@ class ChallengeRequestTest(unittest.TestCase): class AuthorizationTest(unittest.TestCase): def setUp(self): - jwk = other.JWK(key=KEY.publickey()) + jwk = jose.JWKRSA(key=KEY.publickey()) from letsencrypt.acme.messages import Authorization self.msg = Authorization(recovery_token='tok', jwk=jwk, @@ -207,14 +189,14 @@ class AuthorizationTest(unittest.TestCase): class AuthorizationRequestTest(unittest.TestCase): def setUp(self): - self.responses = [ + self.responses = ( challenges.SimpleHTTPSResponse(path='Hf5GrX4Q7EBax9hc2jJnfw'), None, # null challenges.RecoveryTokenResponse(token='23029d88d9e123e'), - ] - self.contact = ["mailto:cert-admin@example.com", "tel:+12025551212"] + ) + self.contact = ("mailto:cert-admin@example.com", "tel:+12025551212") signature = other.Signature( - alg='RS256', jwk=other.JWK(key=KEY.publickey()), + alg=jose.RS256, jwk=jose.JWKRSA(key=KEY.publickey()), sig='-v\xd8\xc2\xa3\xba0\xd6\x92\x16\xb5.\xbe\xa1[\x04\xbe' '\x1b\xa1X\xd2)\x18\x94\x8f\xd7\xd0\xc0\xbbcI`W\xdf v' '\xe4\xed\xe8\x03J\xe8\xc8l#\x10<\x96\xd2\xcdr\xa3' '\x1b\xa1\xf5!f\xef\xc64\xb6\x13') self.nonce = '\xec\xd6\xf2oYH\xeb\x13\xd5#q\xe0\xdd\xa2\x92\xa9' - from letsencrypt.acme.other import JWK - self.jwk = JWK(key=RSA256_KEY.publickey()) + self.alg = jose.RS256 + self.jwk = jose.JWKRSA(key=RSA256_KEY.publickey()) b64sig = ('SUPYKucUnhlTt8_sMxLiigOYdf_wlOLXPI-o7aRLTsOquVjDd6r' 'AX9AFJHk-bCMQPJbSzXKjG6H1IWbvxjS2Ew') @@ -88,7 +40,7 @@ class SignatureTest(unittest.TestCase): self.jsig_from = { 'nonce': b64nonce, - 'alg': self.alg, + 'alg': self.alg.to_json(), 'jwk': self.jwk.to_json(), 'sig': b64sig, } @@ -130,15 +82,17 @@ class SignatureTest(unittest.TestCase): def test_from_json(self): from letsencrypt.acme.other import Signature self.assertEqual( - self.signature, Signature.from_valid_json(self.jsig_from)) + self.signature, Signature.from_json(self.jsig_from)) def test_from_json_non_schema_errors(self): from letsencrypt.acme.other import Signature jwk = self.jwk.to_json() - self.assertRaises(errors.ValidationError, Signature.from_valid_json, { - 'alg': 'RS256', 'sig': 'x', 'nonce': '', 'jwk': jwk}) - self.assertRaises(errors.ValidationError, Signature.from_valid_json, { - 'alg': 'RS256', 'sig': '', 'nonce': 'x', 'jwk': jwk}) + self.assertRaises( + jose.DeserializationError, Signature.from_json, { + 'alg': 'RS256', 'sig': 'x', 'nonce': '', 'jwk': jwk}) + self.assertRaises( + jose.DeserializationError, Signature.from_json, { + 'alg': 'RS256', 'sig': '', 'nonce': 'x', 'jwk': jwk}) if __name__ == '__main__': diff --git a/letsencrypt/acme/util.py b/letsencrypt/acme/util.py index cc00dc2b..8bea9091 100644 --- a/letsencrypt/acme/util.py +++ b/letsencrypt/acme/util.py @@ -1,247 +1,9 @@ """ACME utilities.""" -import binascii import json import pkg_resources -import M2Crypto -import zope.interface - -from letsencrypt.acme import errors -from letsencrypt.acme import interfaces -from letsencrypt.acme import jose - - -class ComparableX509(object): # pylint: disable=too-few-public-methods - """Wrapper for M2Crypto.X509.* objects that supports __eq__. - - Wraps around: - - - :class:`M2Crypto.X509.X509` - - :class:`M2Crypto.X509.Request` - - """ - def __init__(self, wrapped): - self._wrapped = wrapped - - def __getattr__(self, name): - return getattr(self._wrapped, name) - - def __eq__(self, other): - return self.as_der() == other.as_der() - def load_schema(name): """Load JSON schema from distribution.""" return json.load(open(pkg_resources.resource_filename( __name__, "schemata/%s.json" % name))) - - -def dump_ijsonserializable(python_object): - """Serialize IJSONSerializable to JSON. - - This is meant to be passed to :func:`json.dumps` as ``default`` - argument in order to facilitate recursive calls to - :meth:`~letsencrypt.acme.interfaces.IJSONSerializable.to_json`. - Please see :meth:`letsencrypt.acme.interfaces.IJSONSerializable.to_json` - for an example. - - """ - # providedBy | pylint: disable=no-member - if interfaces.IJSONSerializable.providedBy(python_object): - return python_object.to_json() - else: - raise TypeError(repr(python_object) + ' is not JSON serializable') - - -class ImmutableMap(object): # pylint: disable=too-few-public-methods - """Immutable key to value mapping with attribute access.""" - - __slots__ = () - """Must be overriden in subclasses.""" - - def __init__(self, **kwargs): - if set(kwargs) != set(self.__slots__): - raise TypeError( - '__init__() takes exactly the following arguments: {0} ' - '({1} given)'.format(', '.join(self.__slots__), - ', '.join(kwargs) if kwargs else 'none')) - for slot in self.__slots__: - object.__setattr__(self, slot, kwargs.pop(slot)) - - def __setattr__(self, name, value): - raise AttributeError("can't set attribute") - - def __eq__(self, other): - return isinstance(other, self.__class__) and all( - getattr(self, slot) == getattr(other, slot) - for slot in self.__slots__) - - def __hash__(self): - return hash(tuple(getattr(self, slot) for slot in self.__slots__)) - - def __repr__(self): - return '{0}({1})'.format(self.__class__.__name__, ', '.join( - '{0}={1!r}'.format(slot, getattr(self, slot)) - for slot in self.__slots__)) - - -class ACMEObject(ImmutableMap): # pylint: disable=too-few-public-methods - """ACME object.""" - zope.interface.implements(interfaces.IJSONSerializable) - zope.interface.classImplements(interfaces.IJSONDeserializable) - - def to_json(self): # pragma: no cover - """Serialize to JSON.""" - raise NotImplementedError() - - @classmethod - def from_valid_json(cls, jobj): # pragma: no cover - """Deserialize from valid JSON object.""" - raise NotImplementedError() - - -def decode_b64jose(value, size=None, minimum=False): - """Decode ACME object JOSE Base64 encoded field. - - :param str value: Encoded field value. - :param int size: If specified, this function will check if data size - (after decoding) matches. - :param bool minimum: If ``True``, then ``size`` is the minimum required - size, otherwise ``size`` must be exact. - - :raises letsencrypt.acme.errors.ValidationError: if anything goes wrong - :returns: Decoded value. - - """ - try: - decoded = jose.b64decode(value) - except TypeError: - raise errors.ValidationError() - - if size is not None and ((not minimum and len(decoded) != size) - or (minimum and len(decoded) < size)): - raise errors.ValidationError() - - return decoded - - -def decode_hex16(value, size=None, minimum=False): - """Decode ACME object hex16-encoded field. - - :param str value: Encoded field value. - :param int size: If specified, this function will check if data size - (after decoding) matches. - :param bool minimum: If ``True``, then ``size`` is the minimum required - size, otherwise ``size`` must be exact. - - """ - # binascii.hexlify.__doc__: "The resulting string is therefore twice - # as long as the length of data." - if size is not None and ((not minimum and len(value) != size * 2) - or (minimum and len(value) < size * 2)): - raise errors.ValidationError() - try: - return binascii.unhexlify(value) - except TypeError as error: # odd-length string (binascci.unhexlify.__doc__) - raise errors.ValidationError(error) - - -def encode_cert(cert): - """Encode ACME object X509 certificate field.""" - return jose.b64encode(cert.as_der()) - - -def decode_cert(b64der): - """Decode ACME object X509 certificate field. - - :param str b64der: Input data that's meant to be valid base64 - DER-encoded certificate. - - :raises letsencrypt.acme.errors.ValidationError: if anything goes wrong - - :returns: Decoded certificate. - :rtype: :class:`M2Crypto.X509.X509` wrapped in :class:`ComparableX509`. - - """ - try: - return ComparableX509(M2Crypto.X509.load_cert_der_string( - decode_b64jose(b64der))) - except M2Crypto.X509.X509Error: - raise errors.ValidationError() - - -def encode_csr(csr): - """Encode ACME object CSR field.""" - return encode_cert(csr) - - -def decode_csr(b64der): - """Decode ACME object CSR field. - - :param str b64der: Input data that's meant to be valid base64 - DER-encoded CSR. - - :raises letsencrypt.acme.errors.ValidationError: if anything goes wrong - - :returns: Decoded certificate. - :rtype: :class:`M2Crypto.X509.X509` wrapped in :class:`ComparableX509`. - - """ - try: - return ComparableX509(M2Crypto.X509.load_request_der_string( - decode_b64jose(b64der))) - except M2Crypto.X509.X509Error: - raise errors.ValidationError() - - -class TypedACMEObject(ACMEObject): - """ACME object with type (immutable).""" - - acme_type = NotImplemented - """ACME "type" field. Subclasses must override.""" - - TYPES = NotImplemented - """Types registered for JSON deserialization""" - - @classmethod - def register(cls, msg_cls): - """Register class for JSON deserialization.""" - cls.TYPES[msg_cls.acme_type] = msg_cls - return msg_cls - - def to_json(self): - """Get JSON serializable object. - - :returns: Serializable JSON object representing ACME typed object. - :rtype: dict - - """ - jobj = self._fields_to_json() - jobj["type"] = self.acme_type - return jobj - - def _fields_to_json(self): # pragma: no cover - """Prepare ACME object fields for JSON serialiazation. - - Subclasses must override this method. - - :returns: Serializable JSON object containg all ACME object fields - apart from "type". - :rtype: dict - - """ - raise NotImplementedError() - - @classmethod - def from_valid_json(cls, jobj): - """Deserialize ACME object from valid JSON object. - - :raises letsencrypt.acme.errors.UnrecognizedTypeError: if type - of the ACME object has not been registered. - - """ - try: - msg_cls = cls.TYPES[jobj["type"]] - except KeyError: - raise errors.UnrecognizedTypeError(jobj["type"]) - return msg_cls.from_valid_json(jobj) diff --git a/letsencrypt/acme/util_test.py b/letsencrypt/acme/util_test.py deleted file mode 100644 index 0b500a2c..00000000 --- a/letsencrypt/acme/util_test.py +++ /dev/null @@ -1,240 +0,0 @@ -"""Tests for letsencrypt.acme.util.""" -import functools -import json -import os -import pkg_resources -import unittest - -import M2Crypto -import zope.interface - -from letsencrypt.acme import errors -from letsencrypt.acme import interfaces - - -CERT = M2Crypto.X509.load_cert(pkg_resources.resource_filename( - 'letsencrypt.client.tests', os.path.join('testdata', 'cert.pem'))) -CSR = M2Crypto.X509.load_request(pkg_resources.resource_filename( - 'letsencrypt.client.tests', os.path.join('testdata', 'csr.pem'))) - - -class DumpIJSONSerializableTest(unittest.TestCase): - """Tests for letsencrypt.acme.util.dump_ijsonserializable.""" - - class MockJSONSerialiazable(object): - # pylint: disable=missing-docstring,too-few-public-methods,no-self-use - zope.interface.implements(interfaces.IJSONSerializable) - - def to_json(self): - return [3, 2, 1] - - @classmethod - def _call(cls, obj): - from letsencrypt.acme.util import dump_ijsonserializable - return json.dumps(obj, default=dump_ijsonserializable) - - def test_json_type(self): - self.assertEqual('5', self._call(5)) - - def test_ijsonserializable(self): - self.assertEqual('[3, 2, 1]', self._call(self.MockJSONSerialiazable())) - - def test_raises_type_error(self): - self.assertRaises(TypeError, self._call, object()) - - -class ImmutableMapTest(unittest.TestCase): - """Tests for letsencrypt.acme.util.ImmutableMap.""" - - def setUp(self): - # pylint: disable=invalid-name,too-few-public-methods - # pylint: disable=missing-docstring - from letsencrypt.acme.util import ImmutableMap - - class A(ImmutableMap): - __slots__ = ('x', 'y') - - class B(ImmutableMap): - __slots__ = ('x', 'y') - - self.A = A - self.B = B - - self.a1 = self.A(x=1, y=2) - self.a1_swap = self.A(y=2, x=1) - self.a2 = self.A(x=3, y=4) - self.b = self.B(x=1, y=2) - - def test_order_of_args_does_not_matter(self): - self.assertEqual(self.a1, self.a1_swap) - - def test_type_error_on_missing(self): - self.assertRaises(TypeError, self.A, x=1) - self.assertRaises(TypeError, self.A, y=2) - - def test_type_error_on_unrecognized(self): - self.assertRaises(TypeError, self.A, x=1, z=2) - self.assertRaises(TypeError, self.A, x=1, y=2, z=3) - - def test_get_attr(self): - self.assertEqual(1, self.a1.x) - self.assertEqual(2, self.a1.y) - self.assertEqual(1, self.a1_swap.x) - self.assertEqual(2, self.a1_swap.y) - - def test_set_attr_raises_attribute_error(self): - self.assertRaises( - AttributeError, functools.partial(self.a1.__setattr__, 'x'), 10) - - def test_equal(self): - self.assertEqual(self.a1, self.a1) - self.assertEqual(self.a2, self.a2) - self.assertNotEqual(self.a1, self.a2) - - def test_same_slots_diff_cls_not_equal(self): - self.assertEqual(self.a1.x, self.b.x) - self.assertEqual(self.a1.y, self.b.y) - self.assertNotEqual(self.a1, self.b) - - def test_hash(self): - self.assertEqual(hash((1, 2)), hash(self.a1)) - - def test_unhashable(self): - self.assertRaises(TypeError, self.A(x=1, y={}).__hash__) - - def test_repr(self): - self.assertEqual('A(x=1, y=2)', repr(self.a1)) - self.assertEqual('A(x=1, y=2)', repr(self.a1_swap)) - self.assertEqual('B(x=1, y=2)', repr(self.b)) - self.assertEqual("B(x='foo', y='bar')", repr(self.B(x='foo', y='bar'))) - - -class EncodersAndDecodersTest(unittest.TestCase): - """Tests for encoders and decoders from letsencrypt.acme.util""" - # pylint: disable=protected-access - - def setUp(self): - self.b64_cert = ( - 'MIIB3jCCAYigAwIBAgICBTkwDQYJKoZIhvcNAQELBQAwdzELMAkGA1UEBhM' - 'CVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRIwEAYDVQQHDAlBbm4gQXJib3IxKz' - 'ApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTWljaGlnYW4gYW5kIHRoZSBFRkYxF' - 'DASBgNVBAMMC2V4YW1wbGUuY29tMB4XDTE0MTIxMTIyMzQ0NVoXDTE0MTIx' - 'ODIyMzQ0NVowdzELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2FuMRI' - 'wEAYDVQQHDAlBbm4gQXJib3IxKzApBgNVBAoMIlVuaXZlcnNpdHkgb2YgTW' - 'ljaGlnYW4gYW5kIHRoZSBFRkYxFDASBgNVBAMMC2V4YW1wbGUuY29tMFwwD' - 'QYJKoZIhvcNAQEBBQADSwAwSAJBAKx1c7RR7R_drnBSQ_zfx1vQLHUbFLh1' - 'AQQQ5R8DZUXd36efNK79vukFhN9HFoHZiUvOjm0c-pVE6K-EdE_twuUCAwE' - 'AATANBgkqhkiG9w0BAQsFAANBAC24z0IdwIVKSlntksllvr6zJepBH5fMnd' - 'fk3XJp10jT6VE-14KNtjh02a56GoraAvJAT5_H67E8GvJ_ocNnB_o' - ) - self.b64_csr = ( - 'MIIBXTCCAQcCAQAweTELMAkGA1UEBhMCVVMxETAPBgNVBAgMCE1pY2hpZ2F' - 'uMRIwEAYDVQQHDAlBbm4gQXJib3IxDDAKBgNVBAoMA0VGRjEfMB0GA1UECw' - 'wWVW5pdmVyc2l0eSBvZiBNaWNoaWdhbjEUMBIGA1UEAwwLZXhhbXBsZS5jb' - '20wXDANBgkqhkiG9w0BAQEFAANLADBIAkEArHVztFHtH92ucFJD_N_HW9As' - 'dRsUuHUBBBDlHwNlRd3fp580rv2-6QWE30cWgdmJS86ObRz6lUTor4R0T-3' - 'C5QIDAQABoCkwJwYJKoZIhvcNAQkOMRowGDAWBgNVHREEDzANggtleGFtcG' - 'xlLmNvbTANBgkqhkiG9w0BAQsFAANBAHJH_O6BtC9aGzEVCMGOZ7z9iIRHW' - 'Szr9x_bOzn7hLwsbXPAgO1QxEwL-X-4g20Gn9XBE1N9W6HCIEut2d8wACg' - ) - - def test_decode_b64_jose_padding_error(self): - from letsencrypt.acme.util import decode_b64jose - self.assertRaises(errors.ValidationError, decode_b64jose, 'x') - - def test_decode_b64_jose_size(self): - from letsencrypt.acme.util import decode_b64jose - self.assertEqual('foo', decode_b64jose('Zm9v', size=3)) - self.assertRaises( - errors.ValidationError, decode_b64jose, 'Zm9v', size=2) - self.assertRaises( - errors.ValidationError, decode_b64jose, 'Zm9v', size=4) - - def test_decode_b64_jose_minimum_size(self): - from letsencrypt.acme.util import decode_b64jose - self.assertEqual('foo', decode_b64jose('Zm9v', size=3, minimum=True)) - self.assertEqual('foo', decode_b64jose('Zm9v', size=2, minimum=True)) - self.assertRaises(errors.ValidationError, decode_b64jose, - 'Zm9v', size=4, minimum=True) - - def test_decode_hex16(self): - from letsencrypt.acme.util import decode_hex16 - self.assertEqual('foo', decode_hex16('666f6f')) - - def test_decode_hex16_minimum_size(self): - from letsencrypt.acme.util import decode_hex16 - self.assertEqual('foo', decode_hex16('666f6f', size=3, minimum=True)) - self.assertEqual('foo', decode_hex16('666f6f', size=2, minimum=True)) - self.assertRaises(errors.ValidationError, decode_hex16, - '666f6f', size=4, minimum=True) - - def test_decode_hex16_odd_length(self): - from letsencrypt.acme.util import decode_hex16 - self.assertRaises(errors.ValidationError, decode_hex16, 'x') - - def test_encode_cert(self): - from letsencrypt.acme.util import encode_cert - self.assertEqual(self.b64_cert, encode_cert(CERT)) - - def test_decode_cert(self): - from letsencrypt.acme.util import ComparableX509 - from letsencrypt.acme.util import decode_cert - cert = decode_cert(self.b64_cert) - self.assertTrue(isinstance(cert, ComparableX509)) - self.assertEqual(cert, CERT) - self.assertRaises(errors.ValidationError, decode_cert, '') - - def test_encode_csr(self): - from letsencrypt.acme.util import encode_csr - self.assertEqual(self.b64_csr, encode_csr(CSR)) - - def test_decode_csr(self): - from letsencrypt.acme.util import ComparableX509 - from letsencrypt.acme.util import decode_csr - csr = decode_csr(self.b64_csr) - self.assertTrue(isinstance(csr, ComparableX509)) - self.assertEqual(csr, CSR) - self.assertRaises(errors.ValidationError, decode_csr, '') - - -class TypedACMEObjectTest(unittest.TestCase): - - def setUp(self): - from letsencrypt.acme.util import TypedACMEObject - - # pylint: disable=missing-docstring,abstract-method - # pylint: disable=too-few-public-methods - - class MockParentTypedACMEObject(TypedACMEObject): - TYPES = {} - - @MockParentTypedACMEObject.register - class MockTypedACMEObject(MockParentTypedACMEObject): - acme_type = 'test' - - @classmethod - def from_valid_json(cls, unused_obj): - return '!' - - def _fields_to_json(self): - return {'foo': 'bar'} - - self.parent_cls = MockParentTypedACMEObject - self.msg = MockTypedACMEObject() - - def test_to_json(self): - self.assertEqual(self.msg.to_json(), { - 'type': 'test', - 'foo': 'bar', - }) - - def test_from_json_unknown_type_fails(self): - self.assertRaises(errors.UnrecognizedTypeError, - self.parent_cls.from_valid_json, {'type': 'bar'}) - - def test_from_json_returns_obj(self): - self.assertEqual(self.parent_cls.from_valid_json({'type': 'test'}), '!') - - -if __name__ == '__main__': - unittest.main() diff --git a/letsencrypt/client/achallenges.py b/letsencrypt/client/achallenges.py index 835bd1e8..cc7c322f 100644 --- a/letsencrypt/client/achallenges.py +++ b/letsencrypt/client/achallenges.py @@ -17,7 +17,7 @@ Note, that all annotated challenges act as a proxy objects:: """ from letsencrypt.acme import challenges -from letsencrypt.acme import util as acme_util +from letsencrypt.acme.jose import util as jose_util from letsencrypt.client import crypto_util @@ -25,7 +25,7 @@ from letsencrypt.client import crypto_util # pylint: disable=too-few-public-methods -class AnnotatedChallenge(acme_util.ImmutableMap): +class AnnotatedChallenge(jose_util.ImmutableMap): """Client annotated challenge. Wraps around :class:`~letsencrypt.acme.challenges.Challenge` and @@ -88,7 +88,7 @@ class ProofOfPossession(AnnotatedChallenge): acme_type = challenges.ProofOfPossession -class Indexed(acme_util.ImmutableMap): +class Indexed(jose_util.ImmutableMap): """Indexed and annotated ACME challenge. Wraps around :class:`AnnotatedChallenge` and annotates with an diff --git a/letsencrypt/client/auth_handler.py b/letsencrypt/client/auth_handler.py index 4e3b5f68..7ded4322 100644 --- a/letsencrypt/client/auth_handler.py +++ b/letsencrypt/client/auth_handler.py @@ -299,8 +299,7 @@ class AuthHandler(object): # pylint: disable=too-many-instance-attributes else: raise errors.LetsEncryptClientError( - "Received unsupported challenge of type: " - "%s" % chall.acme_type) + "Received unsupported challenge of type: %s", chall.typ) ichall = achallenges.Indexed(achall=achall, index=index) diff --git a/letsencrypt/client/client.py b/letsencrypt/client/client.py index d415403f..2f3f9a76 100644 --- a/letsencrypt/client/client.py +++ b/letsencrypt/client/client.py @@ -7,7 +7,7 @@ import Crypto.PublicKey.RSA import M2Crypto from letsencrypt.acme import messages -from letsencrypt.acme import util as acme_util +from letsencrypt.acme.jose import util as jose_util from letsencrypt.client import auth_handler from letsencrypt.client import client_authenticator @@ -130,7 +130,7 @@ class Client(object): logging.info("Preparing and sending CSR...") return self.network.send_and_receive_expected( messages.CertificateRequest.create( - csr=acme_util.ComparableX509( + csr=jose_util.ComparableX509( M2Crypto.X509.load_request_der_string(csr_der)), key=Crypto.PublicKey.RSA.importKey(self.authkey.pem)), messages.Certificate) diff --git a/letsencrypt/client/network.py b/letsencrypt/client/network.py index b61a8a2f..de6db575 100644 --- a/letsencrypt/client/network.py +++ b/letsencrypt/client/network.py @@ -5,7 +5,7 @@ import time import requests -from letsencrypt.acme import errors as acme_errors +from letsencrypt.acme import jose from letsencrypt.acme import messages from letsencrypt.client import errors @@ -57,7 +57,7 @@ class Network(object): json_string = response.json() try: return messages.Message.from_json(json_string) - except acme_errors.ValidationError as error: + except jose.DeserializationError as error: logging.error(json_string) raise # TODO diff --git a/letsencrypt/client/revoker.py b/letsencrypt/client/revoker.py index 98cf1704..c18b5ffa 100644 --- a/letsencrypt/client/revoker.py +++ b/letsencrypt/client/revoker.py @@ -17,7 +17,7 @@ import Crypto.PublicKey.RSA import M2Crypto from letsencrypt.acme import messages -from letsencrypt.acme import util as acme_util +from letsencrypt.acme.jose import util as jose_util from letsencrypt.client import errors from letsencrypt.client import le_util @@ -240,7 +240,7 @@ class Revoker(object): """ # These will both have to change in the future away from M2Crypto # pylint: disable=protected-access - certificate = acme_util.ComparableX509(cert._cert) + certificate = jose_util.ComparableX509(cert._cert) try: with open(cert.backup_key_path, "rU") as backup_key_file: key = Crypto.PublicKey.RSA.importKey(backup_key_file.read()) diff --git a/letsencrypt/client/tests/acme_util.py b/letsencrypt/client/tests/acme_util.py index 23343636..aba839f8 100644 --- a/letsencrypt/client/tests/acme_util.py +++ b/letsencrypt/client/tests/acme_util.py @@ -5,7 +5,7 @@ import pkg_resources import Crypto.PublicKey.RSA from letsencrypt.acme import challenges -from letsencrypt.acme import other +from letsencrypt.acme import jose KEY = Crypto.PublicKey.RSA.importKey(pkg_resources.resource_string( @@ -26,7 +26,7 @@ RECOVERY_TOKEN = challenges.RecoveryToken() POP = challenges.ProofOfPossession( alg="RS256", nonce="xD\xf9\xb9\xdbU\xed\xaa\x17\xf1y|\x81\x88\x99 ", hints=challenges.ProofOfPossession.Hints( - jwk=other.JWK(key=KEY.publickey()), + jwk=jose.JWKRSA(key=KEY.publickey()), cert_fingerprints=[ "93416768eb85e33adc4277f4c9acd63e7418fcfe", "16d95b7b63f1972b980b14c20291f3c0d1855d95", diff --git a/letsencrypt/client/tests/auth_handler_test.py b/letsencrypt/client/tests/auth_handler_test.py index 91874dc0..abf7032b 100644 --- a/letsencrypt/client/tests/auth_handler_test.py +++ b/letsencrypt/client/tests/auth_handler_test.py @@ -335,7 +335,7 @@ class SatisfyChallengesTest(unittest.TestCase): # pylint: disable=no-self-use exp_resp = [None] * len(challs) for i in path: - exp_resp[i] = TRANSLATE[challs[i].acme_type] + str(domain) + exp_resp[i] = TRANSLATE[challs[i].typ] + str(domain) return exp_resp diff --git a/linter_plugin.py b/linter_plugin.py index ac2a01f6..9a165d81 100644 --- a/linter_plugin.py +++ b/linter_plugin.py @@ -21,5 +21,9 @@ def _transform(cls): for slot in cls.slots(): cls.locals[slot.value] = [nodes.EmptyNode()] + if cls.name == 'JSONObjectWithFields': + # _fields is magically introduced by JSONObjectWithFieldsMeta + cls.locals['_fields'] = [nodes.EmptyNode()] + MANAGER.register_transform(nodes.Class, _transform)