From a8cd537ca96ec16d3ae2a93061ad7c92e2f94d87 Mon Sep 17 00:00:00 2001 From: Sergei Maertens Date: Wed, 14 Feb 2024 17:48:28 +0100 Subject: [PATCH 1/2] :recycle: [#517] Extract JWS/JWK verification and decoding into generic utility This allows us to re-use the verification functionality for both the access token processing and userinfo response data processing without duplicating the low-level implementation. Notable changes: * the 'none' algorithm is blocked as this is a common exploit vector * added some more documentation about the parameters * added type hints to document expected parameter types * did some slight refactoring/restructuring to make the code type-safe --- mozilla_django_oidc/_jose.py | 79 ++++++++++++++++++++++++++++++++++++ mozilla_django_oidc/auth.py | 37 ++++------------- 2 files changed, 87 insertions(+), 29 deletions(-) create mode 100644 mozilla_django_oidc/_jose.py diff --git a/mozilla_django_oidc/_jose.py b/mozilla_django_oidc/_jose.py new file mode 100644 index 00000000..85ce21a4 --- /dev/null +++ b/mozilla_django_oidc/_jose.py @@ -0,0 +1,79 @@ +""" +Helpers for dealing with Javascript Object Signing and Encryption (JOSE) in an OpenID +Connect context. +""" +import json +from typing import Dict, Any, TypeAlias, Union + +from django.core.exceptions import SuspiciousOperation +from django.utils.encoding import smart_bytes + +from josepy.jwk import JWK +from josepy.jws import JWS + +# values could be narrowed down to relevant JSON types. +Payload: TypeAlias = Dict[str, Any] + + +def verify_jws_and_decode( + token: bytes, + key, + signing_algorithm: str = "", + decode_json: bool = False, # for backwards compatibility reasons +) -> Union[Payload, bytes]: + """ + Cryptographically verify the passed in token and return the decoded payload. + + Verification is done with utilities from the josepy library. + + :arg token: the raw binary content of the JWT/token. + :arg key: the key to verify the signature with. This may be a key obtained from + the OIDC_OP_JWKS_ENDPOINT or a shared key as a string (XXX CONFIRM THIS!) + :arg signing_algorithm: If provided, the token must employ this exact signing + algorithm. Values must be valid names from algorithms in :mod:`josepy.jwa`. + :arg decode_json: If true, the payload will be json-decoded and return a Python + object rather than a bytestring. + :return: the extracted payload object, deserialized from JSON. + :raises SuspiciousOperation: if the token verification fails. + """ + jws = JWS.from_compact(token) + + # validate the signing algorithm + if (alg := jws.signature.combined.alg) is None: + msg = "No alg value found in header" + raise SuspiciousOperation(msg) + + if signing_algorithm and signing_algorithm != alg.name: + msg = ( + "The provider algorithm {!r} does not match the client's " + "expected signing algorithm.".format(alg) + ) + raise SuspiciousOperation(msg) + + # one of the most common implementation weaknesses -> attacker can supply 'none' + # algorithm + # XXX: check if this is okay, technically users can now set + # settings.OIDC_RP_SIGN_ALGO = "none" and things should work? + if alg.name == "none": + raise SuspiciousOperation("'none' for alg value is not allowed") + + # process the key parameter which was/may have been loaded from keys endpoint + if isinstance(key, str): + # Use smart_bytes here since the key string comes from settings. + jwk = JWK.load(smart_bytes(key)) + else: + # The key is a json returned from the IDP JWKS endpoint. + jwk = JWK.from_json(key) + # address some missing upstream Self type declarations + assert isinstance(jwk, JWK) + + if not jws.verify(jwk): + msg = "JWS token verification failed." + raise SuspiciousOperation(msg) + + # return the decoded JSON or the raw bytestring payload + payload = jws.payload + if not decode_json: + return payload + else: + return json.loads(payload.decode("utf-8")) diff --git a/mozilla_django_oidc/auth.py b/mozilla_django_oidc/auth.py index f8243fe3..869a3346 100644 --- a/mozilla_django_oidc/auth.py +++ b/mozilla_django_oidc/auth.py @@ -9,15 +9,15 @@ from django.contrib.auth.backends import ModelBackend from django.core.exceptions import ImproperlyConfigured, SuspiciousOperation from django.urls import reverse -from django.utils.encoding import force_bytes, smart_bytes, smart_str +from django.utils.encoding import force_bytes, smart_str from django.utils.module_loading import import_string from josepy.b64 import b64decode -from josepy.jwk import JWK from josepy.jws import JWS, Header from requests.auth import HTTPBasicAuth from requests.exceptions import HTTPError from mozilla_django_oidc.utils import absolutify, import_from_settings +from ._jose import verify_jws_and_decode LOGGER = logging.getLogger(__name__) @@ -127,33 +127,12 @@ def update_user(self, user, claims): def _verify_jws(self, payload, key): """Verify the given JWS payload with the given key and return the payload""" - jws = JWS.from_compact(payload) - - try: - alg = jws.signature.combined.alg.name - except KeyError: - msg = "No alg value found in header" - raise SuspiciousOperation(msg) - - if alg != self.OIDC_RP_SIGN_ALGO: - msg = ( - "The provider algorithm {!r} does not match the client's " - "OIDC_RP_SIGN_ALGO.".format(alg) - ) - raise SuspiciousOperation(msg) - - if isinstance(key, str): - # Use smart_bytes here since the key string comes from settings. - jwk = JWK.load(smart_bytes(key)) - else: - # The key is a json returned from the IDP JWKS endpoint. - jwk = JWK.from_json(key) - - if not jws.verify(jwk): - msg = "JWS token verification failed." - raise SuspiciousOperation(msg) - - return jws.payload + return verify_jws_and_decode( + token=payload, + key=key, + signing_algorithm=self.OIDC_RP_SIGN_ALGO, + decode_json=False, + ) def retrieve_matching_jwk(self, token): """Get the signing key by exploring the JWKS endpoint of the OP.""" From 6bfe75212c18f00166fc890b24ea1c235209dd2b Mon Sep 17 00:00:00 2001 From: Sergei Maertens Date: Wed, 14 Feb 2024 17:59:54 +0100 Subject: [PATCH 2/2] :sparkles: [#517] Implement content-type negotation in userinfo endpoint TODO: address broken tests TODO: add new tests --- mozilla_django_oidc/auth.py | 43 +++++++++++++++++++++++++++++++++--- mozilla_django_oidc/utils.py | 14 ++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/mozilla_django_oidc/auth.py b/mozilla_django_oidc/auth.py index 869a3346..d1073393 100644 --- a/mozilla_django_oidc/auth.py +++ b/mozilla_django_oidc/auth.py @@ -16,8 +16,12 @@ from requests.auth import HTTPBasicAuth from requests.exceptions import HTTPError -from mozilla_django_oidc.utils import absolutify, import_from_settings -from ._jose import verify_jws_and_decode +from mozilla_django_oidc._jose import verify_jws_and_decode +from mozilla_django_oidc.utils import ( + absolutify, + extract_content_type, + import_from_settings, +) LOGGER = logging.getLogger(__name__) @@ -258,7 +262,40 @@ def get_userinfo(self, access_token, id_token, payload): proxies=self.get_settings("OIDC_PROXY", None), ) user_response.raise_for_status() - return user_response.json() + + # From https://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse + # + # > The UserInfo Endpoint MUST return a content-type header to indicate which + # > format is being returned. + # + # XXX: should we .lower() this value to be sure? + content_type = extract_content_type(user_response.headers["Content-Type"]) + if content_type == "application/json": + return user_response.json() + elif content_type == "application/jwt": + token = user_response.content + # get the key from the configured keys endpoint + # XXX: tested with asymmetric encryption. algorithms like HS256 rely on + # out-of-band key exchange and are currently untested. Re-using + # self.OIDC_RP_IDP_SIGN_KEY seems like a bad idea since the endpoint may + # use a separate key alltogether + key = self.retrieve_matching_jwk(token) + payload = verify_jws_and_decode( + token, + key, + # Providers typically control the algorithm which may differ from + # self.OIDC_RP_SIGN_ALGO, e.g. Keycloak allows setting the algorithm + # specfically for the userinfo endpoint. + signing_algorithm="", + decode_json=True, + ) + return payload + else: + raise ValueError( + f"Got an invalid Content-Type header value ({content_type}) " + "according to OpenID Connect Core 1.0 standard. Contact your " + "vendor." + ) def authenticate(self, request, **kwargs): """Authenticates a user based on the OIDC code flow.""" diff --git a/mozilla_django_oidc/utils.py b/mozilla_django_oidc/utils.py index 6e70ee95..de808f0f 100644 --- a/mozilla_django_oidc/utils.py +++ b/mozilla_django_oidc/utils.py @@ -8,6 +8,7 @@ import josepy.b64 from django.conf import settings from django.core.exceptions import ImproperlyConfigured +from requests.utils import _parse_content_type_header # type: ignore LOGGER = logging.getLogger(__name__) @@ -21,6 +22,19 @@ def parse_www_authenticate_header(header): return parse_keqv_list(items) +def extract_content_type(ct_header: str) -> str: + """ + Get the content type + parameters from content type header. + + This is internal API since we use a requests internal utility, which may be + removed/modified at any time. However, this is a deliberate choices since I trust + requests to have a correct implementation more than coming up with one myself. + """ + content_type, _ = _parse_content_type_header(ct_header) + # discard the params, we only want the content type itself + return content_type + + def import_from_settings(attr, *args): """ Load an attribute from the django settings.