From b674319b561cd6b30a54500709ed49a1a2456bc3 Mon Sep 17 00:00:00 2001 From: Max Linke Date: Mon, 30 Oct 2023 19:55:31 +0100 Subject: [PATCH] move openid testlib helper to cse modules Change-Id: I4f48b349c9b17038a0a7412657c2d93dc584fa57 --- scripts/create_test_idp_cse.sh | 2 +- tests/testlib/openid_oauth_provider.py | 127 ------------------ .../testlib/test_openid_oauth_provider.py | 96 ------------- 3 files changed, 1 insertion(+), 224 deletions(-) delete mode 100644 tests/testlib/openid_oauth_provider.py delete mode 100644 tests/unit/testlib/test_openid_oauth_provider.py diff --git a/scripts/create_test_idp_cse.sh b/scripts/create_test_idp_cse.sh index b803ed8d92e..800f2ab5e3a 100755 --- a/scripts/create_test_idp_cse.sh +++ b/scripts/create_test_idp_cse.sh @@ -36,4 +36,4 @@ export URL="http://localhost:${PORT}" configure_cognito $URL 5000 export PYTHONPATH="${REPO_PATH}/tests/testlib" -"$REPO_PATH"/scripts/run-pipenv run uvicorn openid_oauth_provider:application --port "$PORT" +"$REPO_PATH"/scripts/run-pipenv run uvicorn cse.openid_oauth_provider:application --port "$PORT" diff --git a/tests/testlib/openid_oauth_provider.py b/tests/testlib/openid_oauth_provider.py deleted file mode 100644 index 5662a167cc7..00000000000 --- a/tests/testlib/openid_oauth_provider.py +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (C) 2019 Checkmk GmbH - License: GNU General Public License v2 -# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and -# conditions defined in the file COPYING, which is part of this source code package. -import base64 -from binascii import unhexlify -from dataclasses import dataclass -from typing import Annotated, Sequence -from urllib.parse import urlencode - -import fastapi as fapi -import jwt -from cryptography.hazmat.primitives.asymmetric import rsa -from pydantic import BaseModel - -from cmk.gui.cse.userdb.cognito.oauth2 import load_config, TenantInfo, UserRoleAnswer - -application = fapi.FastAPI() - - -@application.get("/healthz", status_code=200, responses={200: {}}) -def liveliness() -> str: - return "I'm alive" - - -@dataclass(frozen=True) -class Config: - base_url: str - - -def read_config() -> Config: - import os - - return Config(base_url=os.environ["URL"]) - - -class WellKnownReponseModel(BaseModel): - authorization_endpoint: str - token_endpoint: str - jwks_uri: str - issuer: str = "checkmk" - scopes_supported: Sequence[str] = ["openid", "email"] - response_types_supported: Sequence[str] = ["code", "token"] - id_token_signing_alg_values_supported: Sequence[str] = ["RS256"] - subject_types_supported: Sequence[str] = ["public"] - token_endpoint_auth_methods_supported: Sequence[str] = ["client_secret_post"] - grant_types_supported: Sequence[str] = ["authorization_code"] - - -@application.get("/.well-known/openid-configuration", status_code=200) -def well_known(config: Config = fapi.Depends(read_config)) -> WellKnownReponseModel: - return WellKnownReponseModel( - authorization_endpoint=f"{config.base_url}/authorize", - jwks_uri=f"{config.base_url}/.well-known/jwks.json", - token_endpoint=f"{config.base_url}/token", - ) - - -class JWKS: - def __init__(self) -> None: - self.private = rsa.generate_private_key(public_exponent=65537, key_size=1024) - self.public = self.private.public_key() - self.kid = "usethis" - - @property - def n(self) -> str: - n = self.public.public_numbers().n - hexi = hex(n).lstrip("0x") - encoded = base64.urlsafe_b64encode(unhexlify(hexi)) - return encoded.decode("utf-8").rstrip("=") - - -KEY = JWKS() - - -class KeyModel(BaseModel): - n: str - alg: str = "RS256" - e: str = "AQAB" - kid: str - use: str = "sig" - kty: str = "RSA" - - -class JWKSModel(BaseModel): - keys: Sequence[KeyModel] - - -@application.get("/.well-known/jwks.json", response_model=JWKSModel) -def jwks() -> JWKSModel: - key = KeyModel(n=KEY.n, kid=KEY.kid) - return JWKSModel(keys=[key]) - - -class TokenResponse(BaseModel): - id_token: str - - -class TokenPayload(BaseModel): - email: str - aud: str - sub: str = "1234567" - - -@application.post("/token", response_model=TokenResponse) -def token(client_id: Annotated[str, fapi.Form()]) -> TokenResponse: - payload = TokenPayload(email="test@test.com", aud=client_id) - id_token = jwt.encode( - payload.model_dump(), KEY.private, algorithm="RS256", headers={"kid": KEY.kid} - ) - return TokenResponse(id_token=id_token) - - -@application.get("/authorize") -def authorize(state: str, redirect_uri: str) -> fapi.responses.RedirectResponse: - params = {"state": state, "code": "fake"} - url = f"{redirect_uri}?{urlencode(params)}" - return fapi.responses.RedirectResponse(url) - - -# this endpoint is used by checkmk to authorize the user on a site -# given he belongs to the right tenant -@application.get("/api/users/{user_id}/tenants") -def tenant_role_mapping(user_id: str) -> UserRoleAnswer: - config = load_config() - tenant_info = TenantInfo(user_role="admin") - return UserRoleAnswer(tenants={config.tenant_id: tenant_info}) diff --git a/tests/unit/testlib/test_openid_oauth_provider.py b/tests/unit/testlib/test_openid_oauth_provider.py deleted file mode 100644 index 5a1f4bb66f1..00000000000 --- a/tests/unit/testlib/test_openid_oauth_provider.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (C) 2019 Checkmk GmbH - License: GNU General Public License v2 -# This file is part of Checkmk (https://checkmk.com). It is subject to the terms and -# conditions defined in the file COPYING, which is part of this source code package. -import http -from typing import Any - -import jwt -import pytest -from fastapi.testclient import TestClient - -# check if we can import this or abort. Do a separate import later for mypy -pytest.importorskip("tests.testlib.openid_oauth_provider") -import tests.testlib.openid_oauth_provider as app - - -def config_override() -> app.Config: - return app.Config(base_url="http://localhost:6666") - - -@pytest.fixture(name="client") -def _client() -> TestClient: - app.application.dependency_overrides[app.read_config] = config_override - return TestClient(app.application) - - -def test_healthz(client: TestClient) -> None: - # when - response = client.get("/healthz") - - # then - assert response.status_code == http.HTTPStatus.OK - assert response.text == '"I\'m alive"' - - -def test_well_known(client: TestClient) -> None: - response = client.get("/.well-known/openid-configuration") - assert response.status_code == 200 - expected = { - "authorization_endpoint": "http://localhost:6666/authorize", - "token_endpoint": "http://localhost:6666/token", - "jwks_uri": "http://localhost:6666/.well-known/jwks.json", - "grant_types_supported": ["authorization_code"], - "id_token_signing_alg_values_supported": ["RS256"], - "issuer": "checkmk", - "response_types_supported": ["code", "token"], - "scopes_supported": ["openid", "email"], - "subject_types_supported": ["public"], - "token_endpoint_auth_methods_supported": ["client_secret_post"], - } - - assert response.json() == expected - - -@pytest.fixture(name="jwks_client") -def _jwks_client(client: TestClient) -> jwt.PyJWKClient: - jwks_client = jwt.PyJWKClient("I'll mock the fetch_data") - - def fetch_data() -> Any: - resp = client.get("/.well-known/jwks.json") - return resp.json() - - jwks_client.fetch_data = fetch_data # type: ignore[method-assign] - return jwks_client - - -def test_jwks_json(client: TestClient, jwks_client: jwt.PyJWKClient) -> None: - response = client.get("/.well-known/jwks.json") - assert response.status_code == 200 - jwk = jwks_client.get_signing_key(app.KEY.kid) - assert jwk is not None - algo_obj = jwt.api_jws.get_algorithm_by_name("RS256") - read_key = algo_obj.prepare_key(jwk.key) - assert read_key.public_numbers().n == app.KEY.public.public_numbers().n - - -def test_token(client: TestClient, jwks_client: jwt.PyJWKClient) -> None: - audience = "this-test-app" - data = {"client_id": audience} - response = client.post("/token", data=data) - response.raise_for_status() - id_token = response.json()["id_token"] - signing_key = jwks_client.get_signing_key_from_jwt(id_token) - payload = jwt.decode(id_token, signing_key.key, algorithms=["RS256"], audience=audience) - assert "email" in payload - - -def test_authorize(client: TestClient) -> None: - state = "sometext" - redirect_uri = "http://localhost/checkmk" - resp = client.get("/authorize", params={"state": state, "redirect_uri": redirect_uri}) - assert len(resp.history) > 0 - assert str(resp.url).startswith(redirect_uri) - assert f"state={state}" in str(resp.url) - assert "code=" in str(resp.url) - assert "&code" in str(resp.url) or "&state" in str(resp.url)