Skip to content

Commit f022fce

Browse files
committed
chore(crypto): add ari tests
Refs: #121
1 parent d80ae4c commit f022fce

File tree

6 files changed

+106
-54
lines changed

6 files changed

+106
-54
lines changed

automatoes/errors.py

+3-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
#!/usr/bin/env python
2-
#
3-
# Copyright 2019 Flavio Garcia
1+
# Copyright 2019-2025 Flavio Garcia
42
# Copyright 2016-2017 Veeti Paananen under MIT License
53
#
64
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -25,8 +23,8 @@ def __init__(self, response):
2523
try:
2624
details = response.json()
2725
self.type = details.get('type', 'unknown')
28-
message = "{} (type {}, HTTP {})".format(details.get('detail'),
29-
self.type, response.status_code)
26+
message = (f"{details.get('detail')} (type {self.type}, HTTP "
27+
f"{response.status_code})")
3028
except (ValueError, TypeError, AttributeError):
3129
pass
3230
super().__init__(message)

automatoes/protocol.py

+43-18
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2024 Flavio Garcia
1+
# Copyright 2019-2025 Flavio Garcia
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -12,8 +12,12 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from . import get_version
15+
from automatoes import get_version
16+
from automatoes.errors import AccountAlreadyExistsError, AcmeError
17+
from automatoes.model import Account, RegistrationResult
18+
1619
from peasant.client.protocol import Peasant
20+
from peasant.client.transport import METHOD_POST
1721
from peasant.client.transport_requests import RequestsTransport
1822

1923

@@ -23,24 +27,9 @@ def __init__(self, transport, **kwargs):
2327
"""
2428
"""
2529
super().__init__(transport)
26-
self._url = kwargs.get("url")
27-
self._account = kwargs.get("account")
2830
self._directory_path = kwargs.get("directory", "directory")
2931
self._verify = kwargs.get("verify")
3032

31-
@property
32-
def url(self):
33-
return self._url
34-
35-
@property
36-
def account(self):
37-
return self.account
38-
39-
@account.setter
40-
def account(self, account):
41-
# TODO: Throw an error right here if account is None
42-
self._account = account
43-
4433
@property
4534
def directory_path(self):
4635
return self._directory_path
@@ -80,8 +69,44 @@ def set_directory(self):
8069
raise Exception
8170

8271
def new_nonce(self):
83-
""" Returns a new nonce """
72+
""" Return a new nonce """
8473
return self.head(self.peasant.directory()['newNonce'], headers={
8574
'resource': "new-reg",
8675
'payload': None,
8776
}).headers.get('Replay-Nonce')
77+
78+
def new_account(self, account: Account, contacts: list,
79+
terms_agreed: bool = False):
80+
""" Create a new account in the Acme Server """
81+
payload = {
82+
"termsOfServiceAgreed": terms_agreed,
83+
"contact": [f"mailto:{contact}" for contact in contacts],
84+
}
85+
response = self.post(
86+
self.peasant.directory()['newAccount'],
87+
payload
88+
)
89+
90+
uri = response.headers.get("Location")
91+
92+
if response.status_code == 201:
93+
self.account.uri = uri
94+
95+
# Find terms of service from link headers
96+
terms = self.terms_from_directory()
97+
98+
return RegistrationResult(
99+
contents=_json(response),
100+
uri=uri,
101+
terms=terms
102+
)
103+
elif response.status_code == 409:
104+
raise AccountAlreadyExistsError(response, uri)
105+
raise AcmeError(response)
106+
107+
108+
def _json(response):
109+
try:
110+
return response.json()
111+
except ValueError as e:
112+
raise AcmeError("Invalid JSON response. {}".format(e))

tests/__init__.py

+18-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2024 Flavio Garcia
1+
# Copyright 2019-2025 Flavio Garcia
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -12,13 +12,30 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from automatoes.protocol import AcmeV2Pesant, AcmeRequestsTransport
1516
import os
1617

1718

1819
TEST_ROOT = os.path.dirname(os.path.abspath(__file__))
1920
FIXTURES_ROOT = os.path.abspath(os.path.join(TEST_ROOT, "fixtures"))
2021
PROJECT_ROOT = os.path.abspath(os.path.join(TEST_ROOT, ".."))
2122

23+
PEEBLE_URL = "https://localhost:14000"
24+
25+
26+
def get_transport() -> AcmeRequestsTransport:
27+
return AcmeRequestsTransport(PEEBLE_URL)
28+
29+
30+
def get_protocol(transport: AcmeRequestsTransport = None) -> AcmeV2Pesant:
31+
if transport is None:
32+
transport = get_transport()
33+
return AcmeV2Pesant(
34+
transport,
35+
directory="dir",
36+
verify=get_absolute_path("certs/candango.minica.pem")
37+
)
38+
2239

2340
def get_absolute_path(directory):
2441
return os.path.realpath(

tests/ari_test.py

+34-18
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from . import FIXTURES_ROOT
15+
from tests import FIXTURES_ROOT, get_protocol
1616
from automatoes.crypto import (generate_ari_data,
1717
get_certificate_aki,
1818
get_certificate_serial,
@@ -21,33 +21,49 @@
2121
from cartola import fs
2222
import unittest
2323
import os
24+
from tornado import testing
2425

2526

26-
class ARITestCase(unittest.TestCase):
27-
""" Tests the crypto module from automatoes
27+
class ARITestCase(testing.AsyncTestCase):
28+
""" Test letsencrypt nonce
2829
"""
2930

30-
def test_aki(self):
31-
""" Test the strip_certificate function """
31+
@testing.gen_test
32+
async def test_nonce(self):
33+
protocol = get_protocol()
34+
# print(protocol.directory()['renewalInfo'])
35+
self.assertIsNotNone(protocol.new_nonce())
36+
37+
38+
class AKISerialTestCase(unittest.TestCase):
39+
""" Test aki and serial fucntion to create the data to be sent for aki
40+
request
41+
"""
42+
43+
def setUp(self) -> None:
3244
key_directory = os.path.join(FIXTURES_ROOT, "keys", "candango.org",
3345
"another")
34-
3546
key_crt = fs.read(
3647
os.path.join(key_directory, "another.candango.org.crt"),
3748
True
3849
)
50+
self.cert = load_pem_certificate(key_crt)
3951

40-
pem_crt = load_pem_certificate(key_crt)
52+
self.expected_aki = ("c0:cc:03:46:b9:58:20:cc:5c:72:70:f3:e1:2e:cb:"
53+
"20:a6:f5:68:3a")
54+
self.expected_serial = ("fa:f3:97:73:26:ea:e8:44:e7:14:00:20:ae:90:"
55+
"60:af:ba:44")
4156

42-
aki = get_certificate_aki(pem_crt)
43-
serial = get_certificate_serial(pem_crt)
44-
expected_aki = ("c0:cc:03:46:b9:58:20:cc:5c:72:70:f3:e1:2e:cb:20:a6:"
45-
"f5:68:3a")
46-
expected_serial = ("fa:f3:97:73:26:ea:e8:44:e7:14:00:20:ae:90:60:af:"
47-
"ba:44")
48-
aki_b64 = base64.urlsafe_b64encode(expected_aki.encode())
49-
serial_b64 = base64.urlsafe_b64encode(expected_serial.encode())
57+
def test_aki_serial(self):
58+
""" Test cert aki and serial """
59+
aki = get_certificate_aki(self.cert)
60+
serial = get_certificate_serial(self.cert)
61+
self.assertEqual(self.expected_aki, aki)
62+
self.assertEqual(self.expected_serial, serial)
5063

51-
self.assertEqual(expected_aki, aki)
52-
self.assertEqual(expected_serial, serial)
53-
self.assertEqual(f"{aki_b64}.{serial_b64}", generate_ari_data(pem_crt))
64+
def test_ari_data(self):
65+
""" Test ari_data from aki and serial """
66+
aki_b64 = base64.urlsafe_b64encode(self.expected_aki.encode())
67+
serial_b64 = base64.urlsafe_b64encode(self.expected_serial.encode())
68+
self.assertEqual(f"{aki_b64}.{serial_b64}",
69+
generate_ari_data(self.cert))

tests/nonce_test.py

+5-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright 2019-2024 Flavio Garcia
1+
# Copyright 2019-2025 Flavio Garcia
22
#
33
# Licensed under the Apache License, Version 2.0 (the "License");
44
# you may not use this file except in compliance with the License.
@@ -12,8 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from . import get_absolute_path
16-
from automatoes.protocol import AcmeV2Pesant, AcmeRequestsTransport
15+
from tests import get_protocol, get_transport
1716
from tornado import testing
1817

1918

@@ -22,11 +21,7 @@ class NonceTestCase(testing.AsyncTestCase):
2221
"""
2322

2423
@testing.gen_test
25-
async def test_auth(self):
26-
transport = AcmeRequestsTransport("https://localhost:14000")
27-
protocol = AcmeV2Pesant(
28-
transport,
29-
directory="dir",
30-
verify=get_absolute_path("certs/candango.minica.pem")
31-
)
24+
async def test_nonce(self):
25+
protocol = get_protocol(get_transport())
26+
print(protocol.new_nonce())
3227
self.assertIsNotNone(protocol.new_nonce())

tests/runtests.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22
#
3-
# Copyright 2019-2024 Flavio Garcia
3+
# Copyright 2019-2025 Flavio Garcia
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");
66
# you may not use this file except in compliance with the License.
@@ -15,12 +15,13 @@
1515
# limitations under the License.
1616

1717
import unittest
18-
from tests import crypto_test
18+
from tests import ari_test, crypto_test
1919

2020

2121
def suite():
2222
testLoader = unittest.TestLoader()
2323
alltests = unittest.TestSuite()
24+
alltests.addTests(testLoader.loadTestsFromModule(ari_test))
2425
alltests.addTests(testLoader.loadTestsFromModule(crypto_test))
2526
return alltests
2627

0 commit comments

Comments
 (0)