Skip to content

Commit 690a99f

Browse files
committed
test(protocol): Introduce account management test
Refs: #141
1 parent e08593e commit 690a99f

File tree

3 files changed

+134
-37
lines changed

3 files changed

+134
-37
lines changed

automatoes/model.py

+21-5
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,18 @@
2424

2525
class Account:
2626

27-
def __init__(self, key, uri=None):
27+
def __init__(self, key, **kwargs):
28+
self.contact = kwargs.get("contact")
29+
self.orders = kwargs.get("orders")
30+
# TODO: Maybe store the key format
2831
self.key = key
29-
self.uri = uri
32+
self.uri = kwargs.get("uri")
3033

3134
def serialize(self):
3235
return json.dumps({
36+
'contact': self.contact,
3337
'key': export_private_key(self.key).decode("utf-8"),
38+
'orders': self.orders,
3439
'uri': self.uri,
3540
}).encode("utf-8")
3641

@@ -46,8 +51,17 @@ def deserialize(data):
4651
data = json.loads(data)
4752
if "key" not in data or "uri" not in data:
4853
raise ValueError("Missing 'key' or 'uri' fields.")
49-
return Account(key=load_private_key(data['key'].encode("utf8")),
50-
uri=data['uri'])
54+
if "contact" in data:
55+
raise ValueError("Missing 'key' or 'uri' fields.")
56+
account = Account(key=load_private_key(data['key'].encode("utf8")),
57+
uri=data['uri'])
58+
# TODO: Check how to deal with this after migration from 0.9.x to
59+
# 1.x finishes
60+
if "contact" in data:
61+
account.contact = data['contact']
62+
if "orders" in data:
63+
account.orders = data['orders']
64+
return account
5165
except (TypeError, ValueError, AttributeError) as e:
5266
raise IOError("Invalid account structure: {}".format(e))
5367

@@ -142,7 +156,9 @@ def deserialize(data):
142156
raise IOError("Invalid account structure: {}".format(e))
143157

144158

145-
RegistrationResult = namedtuple("RegistrationResult", "contents uri terms")
159+
AccountResponse = namedtuple("AccountResponse", "orders status terms")
160+
RegistrationResponse = namedtuple("RegistrationResponse",
161+
"contact orders status")
146162
NewAuthorizationResult = namedtuple("NewAuthorizationResult", "contents uri")
147163
IssuanceResult = namedtuple("IssuanceResult",
148164
"certificate location intermediate")

automatoes/protocol.py

+52-32
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515
from automatoes import get_version
1616
from automatoes.crypto import generate_protected_header, sign_request_v2
1717
from automatoes.errors import AccountAlreadyExistsError, AcmeError
18-
from automatoes.model import Account, RegistrationResult
18+
from automatoes.model import (Account, AccountResponse, RegistrationResponse)
1919

2020
import copy
2121

2222
from peasant.client.protocol import Peasant
2323
from peasant.client.transport import METHOD_POST
2424
from peasant.client.transport_requests import RequestsTransport
2525

26-
import requests
26+
import warnings
2727

2828

2929
class AcmeV2Pesant(Peasant):
@@ -47,11 +47,11 @@ def directory_path(self, path):
4747
def verify(self):
4848
return self._verify
4949

50-
def get_registration(self, account: Account):
50+
def get_registration(self, account: Account) -> RegistrationResponse:
5151
return self.transport.get_registration(account)
5252

5353
def new_account(self, account: Account, contacts: list,
54-
terms_agreed: bool = False) -> RegistrationResult:
54+
terms_agreed: bool = False) -> AccountResponse:
5555
return self.transport.new_account(account, contacts, terms_agreed)
5656

5757

@@ -84,8 +84,7 @@ def __kwargs_updater(self, method, **kwargs):
8484
protected['kid'] = kid
8585
protected.pop("jwk")
8686
data = kwargs.get("data")
87-
if data:
88-
kwargs['data'] = sign_request_v2(key, protected, data)
87+
kwargs['data'] = sign_request_v2(key, protected, data)
8988
if self.peasant.verify:
9089
kwargs['verify'] = self.peasant.verify
9190
return kwargs
@@ -102,21 +101,34 @@ def get_protected_headers(self, key, uri=None):
102101
return protected_header
103102

104103
def post_as_get(self, path, **kwargs):
105-
"""Send a POST request.
104+
"""Send a POST as GET request.
105+
106+
This method enforces no data/body being passed into kwargs.
107+
108+
If either data or body is present into kwargs a warning will be
109+
displayed and the value will be discarded.
110+
111+
This post will be sent without body into.
106112
107113
:param path: absolute or relative URL for the new
108114
:class:`requests.Request` object.
109115
:param **kwargs: Optional arguments that ``request`` takes.
110116
:return: :class:`requests.Response <Response>` object
111117
:rtype: requests.Response
112118
"""
113-
url = self.get_url(path, **kwargs)
114-
headers = self.get_headers(**kwargs)
115-
kwargs['headers'] = headers
116-
kwargs = self.update_kwargs(METHOD_POST, **kwargs)
117-
with requests.post(url, **kwargs) as result:
118-
result.raise_for_status()
119-
return result
119+
if "data" in kwargs:
120+
kwargs.pop("data")
121+
warnings.warn("Do not pass the key 'data' in kwargs. The "
122+
"post_as_get method won't add body to the request "
123+
"and the value will be ignored!",
124+
category=RuntimeWarning)
125+
if "body" in kwargs:
126+
kwargs.pop("body")
127+
warnings.warn("Do not pass the key 'body' in kwargs. The "
128+
"post_as_get method won't add body to the request "
129+
"and the value will be ignored!",
130+
category=RuntimeWarning)
131+
return self.post(path, **kwargs)
120132

121133
def set_directory(self):
122134
response = self.get("/%s" % self.peasant.directory_path)
@@ -125,15 +137,21 @@ def set_directory(self):
125137
else:
126138
raise Exception
127139

128-
def get_registration(self, account: Account):
140+
def get_registration(self, account: Account) -> RegistrationResponse:
129141
"""
130142
Get available account information from the server.
131143
"""
132144
headers = {'Content-Type': "application/jose+json"}
133145
response = self.post_as_get(account.uri, headers=headers,
134-
kid=self.account.uri,
135-
key=account.key, uri=account.uri)
146+
kid=account.uri, key=account.key,
147+
uri=account.uri)
136148
if str(response.status_code).startswith("2"):
149+
response_json = _json(response)
150+
return RegistrationResponse(
151+
contact=response_json['contact'],
152+
orders=response_json['orders'],
153+
status=response_json['status'],
154+
)
137155
return _json(response)
138156
raise AcmeError(response)
139157

@@ -146,37 +164,39 @@ def new_nonce(self):
146164

147165
# TODO: Document that this method used to be called register
148166
def new_account(self, account: Account, contacts: list,
149-
terms_agreed: bool = False) -> RegistrationResult:
167+
terms_agreed: bool = False) -> AccountResponse:
150168
""" Create a new account in the Acme Server """
151169
payload = {
152170
"termsOfServiceAgreed": terms_agreed,
153171
"contact": [f"mailto:{contact}" for contact in contacts],
154172
}
155173
headers = {'Content-Type': "application/jose+json"}
156174
path = self.peasant.directory()['newAccount']
157-
response = self.post_as_get(path, data=payload, headers=headers,
158-
key=account.key, uri=path)
159-
160-
uri = response.headers.get("Location")
161-
175+
response = self.post(path, data=payload, headers=headers,
176+
key=account.key, uri=path)
162177
if response.status_code == 201:
178+
uri = response.headers.get("Location")
163179
# Find terms of service from link headers
164180
terms = self.terms_from_directory()
165-
return RegistrationResult(
166-
contents=_json(response),
167-
uri=uri,
168-
terms=terms
181+
response_json = _json(response)
182+
account.contact = response_json['contact']
183+
account.orders = response_json['orders']
184+
account.uri = uri
185+
return AccountResponse(
186+
orders=response_json['orders'],
187+
status=response_json['status'],
188+
terms=terms,
169189
)
170190
elif response.status_code == 409:
171191
raise AccountAlreadyExistsError(response, uri)
172192
raise AcmeError(response)
173193

174194
def terms_from_directory(self):
175-
response = self.get_directory()
176-
if response.status_code == 200:
177-
if "meta" in response.json():
178-
if "termsOfService" in response.json()['meta']:
179-
return response.json()['meta']['termsOfService']
195+
# TODO: Check how to trigger an error here
196+
directory = self.peasant.directory()
197+
if "meta" in directory:
198+
if "termsOfService" in directory['meta']:
199+
return directory['meta']['termsOfService']
180200
return None
181201

182202

tests/account_management_test.py

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Copyright 2019-2025 Flavio Garcia
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from automatoes.crypto import generate_rsa_key
16+
from automatoes.model import Account
17+
from cartola import security
18+
from tests import get_protocol, get_transport, PEEBLE_URL
19+
import unittest
20+
21+
22+
class AccountTestCase(unittest.TestCase):
23+
""" Test letsencrypt nonce
24+
"""
25+
26+
def setUp(self) -> None:
27+
self.protocol = get_protocol(get_transport())
28+
29+
def test_new_acount(self):
30+
contact = "candango_{}_{}@candango.org".format(
31+
security.random_string(5, False, False),
32+
security.random_string(5, False, False)
33+
)
34+
# To check against the get_registration method after
35+
# TODO: check against more than one emails in the contacts
36+
peeble_term = ("data:text/plain,Do%20what%20thou%20wilt")
37+
account = Account(key=generate_rsa_key(4096))
38+
response = self.protocol.new_account(account, [contact], True)
39+
self.assertEqual(peeble_term, response.terms)
40+
self.assertEqual("valid", response.status)
41+
urix = account.uri.split("/")
42+
self.assertEqual(PEEBLE_URL, "/".join(urix[0:3]))
43+
self.assertEqual("my-account", "/".join(urix[3:4]))
44+
self.assertIsInstance(urix[4:5][0], str)
45+
ordersx = account.orders.split("/")
46+
self.assertEqual(PEEBLE_URL, "/".join(ordersx[0:3]))
47+
self.assertEqual("list-orderz", "/".join(ordersx[3:4]))
48+
self.assertIsInstance(ordersx[4:5][0], str)
49+
self.assertEqual([f"mailto:{contact}"], account.contact)
50+
51+
def test_get_acount(self):
52+
contact = "candango_{}_{}@candango.org".format(
53+
security.random_string(5, False, False),
54+
security.random_string(5, False, False)
55+
)
56+
account = Account(key=generate_rsa_key(4096))
57+
self.protocol.new_account(account, [contact], True)
58+
response = self.protocol.get_registration(account)
59+
self.assertEqual("valid", response.status)
60+
self.assertEqual(account.contact, response.contact)
61+
self.assertEqual(account.orders, response.orders)

0 commit comments

Comments
 (0)