|
1 | 1 | # Copyright (c) Microsoft Corporation.
|
2 | 2 | # Licensed under the MIT License.
|
3 | 3 | import os
|
| 4 | +import json |
4 | 5 | import threading
|
5 | 6 | import pytest
|
| 7 | +import jwt |
| 8 | +import jwcrypto |
| 9 | +from flask import Flask, jsonify |
6 | 10 | from werkzeug.serving import make_server
|
7 | 11 | from scitt_emulator import cli, server
|
| 12 | +from scitt_emulator.oidc import OIDCAuthMiddleware |
8 | 13 |
|
9 | 14 | issuer = "did:web:example.com"
|
10 | 15 | content_type = "application/json"
|
@@ -149,3 +154,127 @@ def test_client_cli(use_lro: bool, tmp_path):
|
149 | 154 | with open(receipt_path_2, "rb") as f:
|
150 | 155 | receipt_2 = f.read()
|
151 | 156 | assert receipt == receipt_2
|
| 157 | + |
| 158 | + |
| 159 | +def create_flask_app_oidc_server(config): |
| 160 | + app = Flask("oidc_server") |
| 161 | + |
| 162 | + app.config.update(dict(DEBUG=True)) |
| 163 | + app.config.update(config) |
| 164 | + |
| 165 | + @app.route("/.well-known/openid-configuration", methods=["GET"]) |
| 166 | + def openid_configuration(): |
| 167 | + return jsonify( |
| 168 | + { |
| 169 | + "issuer": app.url, |
| 170 | + "jwks_uri": f"{app.url}/.well-known/jwks", |
| 171 | + "response_types_supported": ["id_token"], |
| 172 | + "claims_supported": ["sub", "aud", "exp", "iat", "iss"], |
| 173 | + "id_token_signing_alg_values_supported": app.config["algorithms"], |
| 174 | + "scopes_supported": ["openid"], |
| 175 | + } |
| 176 | + ) |
| 177 | + |
| 178 | + @app.route("/.well-known/jwks", methods=["GET"]) |
| 179 | + def jwks(): |
| 180 | + return jsonify( |
| 181 | + { |
| 182 | + "keys": [ |
| 183 | + { |
| 184 | + **app.config["key"].export_public(as_dict=True), |
| 185 | + "use": "sig", |
| 186 | + "kid": app.config["key"].thumbprint(), |
| 187 | + } |
| 188 | + ] |
| 189 | + } |
| 190 | + ) |
| 191 | + |
| 192 | + return app |
| 193 | + |
| 194 | + |
| 195 | +def test_client_cli_token(tmp_path): |
| 196 | + workspace_path = tmp_path / "workspace" |
| 197 | + |
| 198 | + claim_path = tmp_path / "claim.cose" |
| 199 | + receipt_path = tmp_path / "claim.receipt.cbor" |
| 200 | + entry_id_path = tmp_path / "claim.entry_id.txt" |
| 201 | + retrieved_claim_path = tmp_path / "claim.retrieved.cose" |
| 202 | + |
| 203 | + key = jwcrypto.jwk.JWK.generate(kty="RSA", size=2048) |
| 204 | + algorithm = "RS256" |
| 205 | + audience = "scitt.example.org" |
| 206 | + |
| 207 | + with Service( |
| 208 | + {"key": key, "algorithms": [algorithm]}, |
| 209 | + create_flask_app=create_flask_app_oidc_server, |
| 210 | + ) as oidc_service: |
| 211 | + os.environ["no_proxy"] = ",".join( |
| 212 | + os.environ.get("no_proxy", "").split(",") + [oidc_service.host] |
| 213 | + ) |
| 214 | + middleware_config_path = tmp_path / "oidc-middleware-config.json" |
| 215 | + middleware_config_path.write_text( |
| 216 | + json.dumps({"issuer": oidc_service.url, "audience": audience}) |
| 217 | + ) |
| 218 | + with Service( |
| 219 | + { |
| 220 | + "middleware": OIDCAuthMiddleware, |
| 221 | + "middleware_config_path": middleware_config_path, |
| 222 | + "tree_alg": "CCF", |
| 223 | + "workspace": workspace_path, |
| 224 | + "error_rate": 0.1, |
| 225 | + "use_lro": False, |
| 226 | + } |
| 227 | + ) as service: |
| 228 | + # create claim |
| 229 | + command = [ |
| 230 | + "client", |
| 231 | + "create-claim", |
| 232 | + "--out", |
| 233 | + claim_path, |
| 234 | + "--issuer", |
| 235 | + issuer, |
| 236 | + "--content-type", |
| 237 | + content_type, |
| 238 | + "--payload", |
| 239 | + payload, |
| 240 | + ] |
| 241 | + execute_cli(command) |
| 242 | + assert os.path.exists(claim_path) |
| 243 | + |
| 244 | + # submit claim without token |
| 245 | + command = [ |
| 246 | + "client", |
| 247 | + "submit-claim", |
| 248 | + "--claim", |
| 249 | + claim_path, |
| 250 | + "--out", |
| 251 | + receipt_path, |
| 252 | + "--out-entry-id", |
| 253 | + entry_id_path, |
| 254 | + "--url", |
| 255 | + service.url, |
| 256 | + ] |
| 257 | + check_error = None |
| 258 | + try: |
| 259 | + execute_cli(command) |
| 260 | + except Exception as error: |
| 261 | + check_error = error |
| 262 | + assert check_error |
| 263 | + assert not os.path.exists(receipt_path) |
| 264 | + assert not os.path.exists(entry_id_path) |
| 265 | + |
| 266 | + # create token |
| 267 | + token = jwt.encode( |
| 268 | + {"iss": oidc_service.url, "aud": audience}, |
| 269 | + key.export_to_pem(private_key=True, password=None), |
| 270 | + algorithm=algorithm, |
| 271 | + headers={"kid": key.thumbprint()}, |
| 272 | + ) |
| 273 | + # submit claim with token |
| 274 | + command += [ |
| 275 | + "--token", |
| 276 | + token, |
| 277 | + ] |
| 278 | + execute_cli(command) |
| 279 | + assert os.path.exists(receipt_path) |
| 280 | + assert os.path.exists(entry_id_path) |
0 commit comments