diff --git a/requests_oauth2client/__init__.py b/requests_oauth2client/__init__.py index 030fc19..7231bc5 100644 --- a/requests_oauth2client/__init__.py +++ b/requests_oauth2client/__init__.py @@ -79,6 +79,7 @@ from .dpop import ( DPoPKey, DPoPToken, + DPoPTokenSerializer, InvalidDPoPAccessToken, InvalidDPoPAlg, InvalidDPoPKey, diff --git a/requests_oauth2client/authorization_request.py b/requests_oauth2client/authorization_request.py index 695e0ea..5f4750c 100644 --- a/requests_oauth2client/authorization_request.py +++ b/requests_oauth2client/authorization_request.py @@ -933,6 +933,8 @@ def default_dumper(azr: AuthorizationRequest) -> str: Serialize an AuthorizationRequest as JSON, then compress with deflate, then encodes as base64url. + WARNING: If the `AuthorizationRequest` has `DPoPKey`, this does not serialize custom `jti_generator`, `iat_generator` or `dpop_token_class`! + Args: azr: the `AuthorizationRequest` to serialize diff --git a/requests_oauth2client/dpop.py b/requests_oauth2client/dpop.py index 4e05d48..c99af47 100644 --- a/requests_oauth2client/dpop.py +++ b/requests_oauth2client/dpop.py @@ -9,13 +9,13 @@ from uuid import uuid4 import jwskate -from attrs import define, field, frozen, setters +from attrs import asdict, define, field, frozen, setters from binapy import BinaPy from furl import furl # type: ignore[import-untyped] from requests import codes from typing_extensions import Self -from .tokens import AccessTokenTypes, BearerToken, IdToken, id_token_converter +from .tokens import AccessTokenTypes, BearerToken, BearerTokenSerializer, IdToken, id_token_converter from .utils import accepts_expires_in if TYPE_CHECKING: @@ -349,6 +349,71 @@ def handle_rs_provided_dpop_nonce(self, response: requests.Response) -> None: self.rs_nonce = nonce +class DPoPTokenSerializer(BearerTokenSerializer): + """A helper class to serialize `DPoPToken`s. + + This may be used to store DPoPTokens in session or cookies. + + It needs a `dumper` and a `loader` functions that will respectively serialize and deserialize + DPoPTokens. Default implementations are provided with use gzip and base64url on the serialized + JSON representation. + + Args: + dumper: a function to serialize a token into a `str`. + loader: a function to deserialize a serialized token representation. + + """ + + @staticmethod + def default_dumper(token: DPoPToken) -> str: + """Serialize a token as JSON, then compress with deflate, then encodes as base64url. + + WARNING: This does not serialize custom `jti_generator`, `iat_generator` or `dpop_token_class` in `DPoPKey`! + + Args: + token: the `DPoPToken` to serialize + + Returns: + the serialized value + + """ + d = asdict(token) + d.update(**d.pop("kwargs", {})) + d["dpop_key"]["private_key"] = token.dpop_key.private_key.to_pem() + d["dpop_key"].pop("jti_generator", None) + d["dpop_key"].pop("iat_generator", None) + d["dpop_key"].pop("dpop_token_class", None) + return ( + BinaPy.serialize_to("json", {k: w for k, w in d.items() if w is not None}).to("deflate").to("b64u").ascii() + ) + + @staticmethod + def default_loader(serialized: str, token_class: type[DPoPToken] = DPoPToken) -> DPoPToken: + """Deserialize a `DPoPToken`. + + This does the opposite operations than `default_dumper`. + + Args: + serialized: the serialized token + token_class: class to use to deserialize the Token + + Returns: + a DPoPToken + + """ + attrs = BinaPy(serialized).decode_from("b64u").decode_from("deflate").parse_from("json") + + expires_at = attrs.get("expires_at") + if expires_at: + attrs["expires_at"] = datetime.fromtimestamp(expires_at, tz=timezone.utc) + + if dpop_key := attrs.pop("dpop_key", None): + dpop_key["private_key"] = jwskate.Jwk.from_pem(dpop_key["private_key"]) + attrs["_dpop_key"] = DPoPKey(**dpop_key) + + return token_class(**attrs) + + def validate_dpop_proof( # noqa: C901 proof: str | bytes, *, diff --git a/requests_oauth2client/tokens.py b/requests_oauth2client/tokens.py index 91ec4a2..40185a1 100644 --- a/requests_oauth2client/tokens.py +++ b/requests_oauth2client/tokens.py @@ -645,7 +645,8 @@ def default_dumper(token: BearerToken) -> str: BinaPy.serialize_to("json", {k: w for k, w in d.items() if w is not None}).to("deflate").to("b64u").ascii() ) - def default_loader(self, serialized: str, token_class: type[BearerToken] = BearerToken) -> BearerToken: + @staticmethod + def default_loader(serialized: str, token_class: type[BearerToken] = BearerToken) -> BearerToken: """Deserialize a BearerToken. This does the opposite operations than `default_dumper`. diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index a685b28..81d65fc 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -17,6 +17,8 @@ ClientSecretBasic, ClientSecretJwt, ClientSecretPost, + DPoPKey, + DPoPToken, OAuth2Client, PrivateKeyJwt, PublicApp, @@ -50,6 +52,16 @@ def bearer_auth(access_token: str) -> BearerToken: return BearerToken(access_token) +@pytest.fixture(scope="session") +def dpop_key() -> DPoPKey: + return DPoPKey.generate() + + +@pytest.fixture(scope="session") +def dpop_token(access_token: str, dpop_key: DPoPKey) -> DPoPToken: + return DPoPToken(access_token=access_token, _dpop_key=dpop_key) + + @pytest.fixture(scope="session") def target_api() -> str: return "https://myapi.local/root/" diff --git a/tests/unit_tests/test_dpop.py b/tests/unit_tests/test_dpop.py index be99543..c1b7549 100644 --- a/tests/unit_tests/test_dpop.py +++ b/tests/unit_tests/test_dpop.py @@ -1,14 +1,17 @@ +from datetime import datetime, timedelta, timezone import secrets import pytest import requests from binapy import BinaPy from freezegun import freeze_time +from freezegun.api import FrozenDateTimeFactory from jwskate import Jwk, Jwt, KeyManagementAlgs, SignatureAlgs, SignedJwt from requests_oauth2client import ( DPoPKey, DPoPToken, + DPoPTokenSerializer, InvalidDPoPAccessToken, InvalidDPoPAlg, InvalidDPoPKey, @@ -703,3 +706,24 @@ def test_rs_dpop_nonce_loop( resp = requests.get(target_api, auth=dpop_token) assert resp.status_code == 401 assert resp.headers["DPoP-Nonce"] == "nonce2" + + +def test_token_serializer(dpop_token: DPoPToken, freezer: FrozenDateTimeFactory) -> None: + freezer.move_to("2024-08-01") + serializer = DPoPTokenSerializer() + candidate = serializer.dumps(dpop_token) + freezer.move_to(datetime.now(tz=timezone.utc) + timedelta(days=365)) + deserialized = serializer.loads(candidate) + + # Can't just check deserialized == dpop_token because DPoPKey iat_generator, + # jti_generator, and dpop_token_class defaults are different per instance + assert deserialized.access_token == dpop_token.access_token + assert deserialized.refresh_token == dpop_token.refresh_token + assert deserialized.expires_at == dpop_token.expires_at + assert deserialized.token_type == dpop_token.token_type + + assert deserialized.dpop_key.private_key == dpop_token.dpop_key.private_key + assert deserialized.dpop_key.alg == dpop_token.dpop_key.alg + assert deserialized.dpop_key.jwt_typ == dpop_token.dpop_key.jwt_typ + assert deserialized.dpop_key.as_nonce == dpop_token.dpop_key.as_nonce + assert deserialized.dpop_key.rs_nonce == dpop_token.dpop_key.rs_nonce