|
| 1 | +"""Authentication Backends for the People core app.""" |
| 2 | + |
| 3 | +import logging |
| 4 | + |
| 5 | +import requests |
| 6 | +from django.conf import settings |
| 7 | +from django.core.exceptions import SuspiciousOperation |
| 8 | +from django.utils.translation import gettext_lazy as _ |
| 9 | +from mozilla_django_oidc.auth import ( |
| 10 | + OIDCAuthenticationBackend as MozillaOIDCAuthenticationBackend, |
| 11 | +) |
| 12 | + |
| 13 | +logger = logging.getLogger(__name__) |
| 14 | + |
| 15 | + |
| 16 | +OIDC_USER_SUB_FIELD = getattr( |
| 17 | + settings, |
| 18 | + "OIDC_USER_SUB_FIELD", |
| 19 | + "sub", |
| 20 | +) # Default to 'sub' if not set in settings |
| 21 | + |
| 22 | + |
| 23 | +class OIDCAuthenticationBackend(MozillaOIDCAuthenticationBackend): |
| 24 | + """ |
| 25 | + Custom OpenID Connect (OIDC) Authentication Backend. |
| 26 | +
|
| 27 | + This class overrides the default OIDC Authentication Backend to accommodate differences |
| 28 | + in the User model, and handles signed and/or encrypted UserInfo response. |
| 29 | + """ |
| 30 | + |
| 31 | + def get_extra_claims(self, user_info): |
| 32 | + """ |
| 33 | + Return extra claims from user_info. |
| 34 | +
|
| 35 | + Args: |
| 36 | + user_info (dict): The user information dictionary. |
| 37 | +
|
| 38 | + Returns: |
| 39 | + dict: A dictionary of extra claims. |
| 40 | +
|
| 41 | + """ |
| 42 | + return { |
| 43 | + "name": self.compute_full_name(user_info), |
| 44 | + } |
| 45 | + |
| 46 | + def post_get_or_create_user(self, user, claims): |
| 47 | + """ |
| 48 | + Post-processing after user creation or retrieval. |
| 49 | +
|
| 50 | + Args: |
| 51 | + user (User): The user instance. |
| 52 | + claims (dict): The claims dictionary. |
| 53 | +
|
| 54 | + Returns: |
| 55 | + - None |
| 56 | +
|
| 57 | + """ |
| 58 | + |
| 59 | + def get_userinfo(self, access_token, id_token, payload): |
| 60 | + """ |
| 61 | + Return user details dictionary. |
| 62 | +
|
| 63 | + Args: |
| 64 | + access_token (str): The access token. |
| 65 | + id_token (str): The id token (unused). |
| 66 | + payload (dict): The token payload (unused). |
| 67 | +
|
| 68 | + Note: The id_token and payload parameters are unused in this implementation, |
| 69 | + but were kept to preserve base method signature. |
| 70 | +
|
| 71 | + Note: It handles signed and/or encrypted UserInfo Response. It is required by |
| 72 | + Agent Connect, which follows the OIDC standard. It forces us to override the |
| 73 | + base method, which deal with 'application/json' response. |
| 74 | +
|
| 75 | + Returns: |
| 76 | + dict: User details dictionary obtained from the OpenID Connect user endpoint. |
| 77 | +
|
| 78 | + """ |
| 79 | + user_response = requests.get( |
| 80 | + self.OIDC_OP_USER_ENDPOINT, |
| 81 | + headers={"Authorization": f"Bearer {access_token}"}, |
| 82 | + verify=self.get_settings("OIDC_VERIFY_SSL", True), |
| 83 | + timeout=self.get_settings("OIDC_TIMEOUT", None), |
| 84 | + proxies=self.get_settings("OIDC_PROXY", None), |
| 85 | + ) |
| 86 | + user_response.raise_for_status() |
| 87 | + return self.verify_token(user_response.text) |
| 88 | + |
| 89 | + def get_or_create_user(self, access_token, id_token, payload): |
| 90 | + """ |
| 91 | + Return a User based on userinfo. Create a new user if no match is found. |
| 92 | +
|
| 93 | + Args: |
| 94 | + access_token (str): The access token. |
| 95 | + id_token (str): The ID token. |
| 96 | + payload (dict): The user payload. |
| 97 | +
|
| 98 | + Returns: |
| 99 | + User: An existing or newly created User instance. |
| 100 | +
|
| 101 | + Raises: |
| 102 | + Exception: Raised when user creation is not allowed and no existing user is found. |
| 103 | +
|
| 104 | + """ |
| 105 | + user_info = self.get_userinfo(access_token, id_token, payload) |
| 106 | + |
| 107 | + sub = user_info.get("sub") |
| 108 | + if not sub: |
| 109 | + raise SuspiciousOperation(_("User info contained no recognizable user identification")) |
| 110 | + |
| 111 | + # Get user's full name from OIDC fields defined in settings |
| 112 | + email = user_info.get("email") |
| 113 | + |
| 114 | + claims = { |
| 115 | + OIDC_USER_SUB_FIELD: sub, |
| 116 | + "email": email, |
| 117 | + } |
| 118 | + claims.update(**self.get_extra_claims(user_info)) |
| 119 | + |
| 120 | + # if sub is absent, try matching on email |
| 121 | + user = self.get_existing_user(sub, email) |
| 122 | + |
| 123 | + if user: |
| 124 | + if not user.is_active: |
| 125 | + raise SuspiciousOperation(_("User account is disabled")) |
| 126 | + self.update_user_if_needed(user, claims) |
| 127 | + |
| 128 | + elif self.get_settings("OIDC_CREATE_USER", True): |
| 129 | + user = self.create_user(claims) |
| 130 | + |
| 131 | + self.post_get_or_create_user(user, claims) |
| 132 | + return user |
| 133 | + |
| 134 | + def create_user(self, claims): |
| 135 | + """Return a newly created User instance.""" |
| 136 | + sub = claims.get("sub") |
| 137 | + if sub is None: |
| 138 | + raise SuspiciousOperation(_("Claims contained no recognizable user identification")) |
| 139 | + |
| 140 | + logger.info("Creating user %s", sub) |
| 141 | + |
| 142 | + user = self.UserModel(**claims) |
| 143 | + user.set_unusable_password() |
| 144 | + user.save() |
| 145 | + |
| 146 | + return user |
| 147 | + |
| 148 | + def compute_full_name(self, user_info): |
| 149 | + """Compute user's full name based on OIDC fields in settings.""" |
| 150 | + name_fields = settings.USER_OIDC_FIELDS_TO_NAME |
| 151 | + full_name = " ".join(user_info[field] for field in name_fields if user_info.get(field)) |
| 152 | + return full_name or None |
| 153 | + |
| 154 | + def get_existing_user(self, sub, email): |
| 155 | + """Fetch existing user by sub or email.""" |
| 156 | + try: |
| 157 | + return self.UserModel.objects.get(sub=sub) |
| 158 | + except self.UserModel.DoesNotExist: |
| 159 | + if email and settings.OIDC_FALLBACK_TO_EMAIL_FOR_IDENTIFICATION: |
| 160 | + try: |
| 161 | + return self.UserModel.objects.get(email=email) |
| 162 | + except self.UserModel.DoesNotExist: |
| 163 | + pass |
| 164 | + return None |
| 165 | + |
| 166 | + def update_user_if_needed(self, user, claims): |
| 167 | + """Update user claims if they have changed.""" |
| 168 | + updated_claims = {} |
| 169 | + for key in claims: |
| 170 | + if not hasattr(user, key): |
| 171 | + continue |
| 172 | + |
| 173 | + claim_value = claims.get(key) |
| 174 | + if claim_value and claim_value != getattr(user, key): |
| 175 | + updated_claims[key] = claim_value |
| 176 | + |
| 177 | + if updated_claims: |
| 178 | + self.UserModel.objects.filter(sub=user.sub).update(**updated_claims) |
0 commit comments