|
| 1 | +"""2FA Enforcement Middleware. |
| 2 | +
|
| 3 | +SECURITY CRITICAL: This middleware enforces 2FA for authenticated users. |
| 4 | +Previous versions used string prefix matching which allowed bypass via |
| 5 | +any /accounts/* URL. This version uses proper Django URL resolution. |
| 6 | +
|
| 7 | +See GitHub Issue #173 for vulnerability details. |
| 8 | +""" |
| 9 | + |
| 10 | +import logging |
| 11 | +from typing import Any |
| 12 | + |
1 | 13 | from allauth.mfa.adapter import get_adapter as get_mfa_adapter
|
2 | 14 | from asgiref.sync import sync_to_async
|
| 15 | +from django.conf import settings |
3 | 16 | from django.contrib import messages
|
4 | 17 | from django.http import HttpRequest, HttpResponse
|
5 | 18 | from django.shortcuts import redirect
|
6 |
| -from django.urls import reverse |
| 19 | +from django.urls import Resolver404, resolve |
7 | 20 | from django.utils.decorators import sync_and_async_middleware
|
8 | 21 |
|
9 | 22 | from myapp.models import SiteConfiguration
|
10 | 23 |
|
| 24 | +# Set up security logging |
| 25 | +security_logger = logging.getLogger("security.2fa") |
| 26 | + |
11 | 27 |
|
12 | 28 | @sync_and_async_middleware
|
13 | 29 | class Require2FAMiddleware:
|
14 |
| - """Middleware to enforce 2FA for all users based on site configuration.""" |
| 30 | + """Middleware to enforce 2FA for all users based on site configuration. |
| 31 | +
|
| 32 | + Uses secure URL resolution instead of vulnerable string prefix matching. |
| 33 | + """ |
15 | 34 |
|
16 | 35 | def __init__(self, get_response) -> None: # noqa: ANN001, D107
|
17 | 36 | self.get_response = get_response
|
18 | 37 |
|
19 |
| - # URLs that are always accessible without 2FA |
20 |
| - self.exempt_urls = [ |
21 |
| - reverse("account_login"), |
22 |
| - reverse("account_logout"), |
23 |
| - reverse("account_email"), # Email verification page |
24 |
| - "/admin/login/", # Admin login |
25 |
| - "/static/", # Static files |
26 |
| - "/media/", # Media files |
27 |
| - "/accounts/", # Alternative path for email verification |
28 |
| - ] |
29 |
| - |
30 |
| - def is_path_exempt(self, path: str) -> bool: |
31 |
| - """Check if the current path is exempt from 2FA enforcement. |
32 |
| -
|
33 |
| - Args: |
34 |
| - path (str): The current path. |
| 38 | + # URL names that are exempt from 2FA - Django's actual routing |
| 39 | + self.exempt_url_names = { |
| 40 | + "account_login", |
| 41 | + "account_logout", |
| 42 | + "account_email", # Email management page - required for email verification |
| 43 | + "account_confirm_email", |
| 44 | + "account_email_verification_sent", |
| 45 | + "account_reset_password", |
| 46 | + "account_reset_password_done", |
| 47 | + "account_reset_password_from_key", |
| 48 | + "account_reset_password_from_key_done", |
| 49 | + "account_reauthenticate", # Required for 2FA setup flow |
| 50 | + "mfa_activate_totp", # TOTP activation - required to set up 2FA |
| 51 | + "mfa_deactivate_totp", # TOTP deactivation |
| 52 | + "mfa_reauthenticate", # MFA-specific reauthentication |
| 53 | + "mfa_generate_recovery_codes", # Generate recovery codes |
| 54 | + "mfa_view_recovery_codes", # View recovery codes |
| 55 | + "mfa_download_recovery_codes", # Download recovery codes |
| 56 | + "admin:login", |
| 57 | + } |
| 58 | + |
| 59 | + def _is_static_request(self, request: HttpRequest) -> bool: |
| 60 | + """Check if this is a static file request using Django's settings.""" |
| 61 | + static_url = getattr(settings, "STATIC_URL", "/static/") |
| 62 | + media_url = getattr(settings, "MEDIA_URL", "/media/") |
| 63 | + |
| 64 | + # SECURITY: Protect against dangerous root path configurations |
| 65 | + # that would bypass ALL 2FA security checks |
| 66 | + if static_url == "/" or media_url == "/": |
| 67 | + security_logger.error( |
| 68 | + "SECURITY MISCONFIGURATION: STATIC_URL or MEDIA_URL is set to root path '/'. " |
| 69 | + "This bypasses 2FA enforcement. Fix your Django settings." |
| 70 | + ) |
| 71 | + # Don't treat root paths as static - force proper security checking |
| 72 | + return False |
35 | 73 |
|
36 |
| - Returns: |
37 |
| - bool: True if the path is exempt, False otherwise |
| 74 | + return request.path.startswith((static_url, media_url)) |
38 | 75 |
|
39 |
| - """ |
40 |
| - return any(path.startswith(url) for url in self.exempt_urls) |
| 76 | + def _user_has_2fa(self, user: Any) -> bool: |
| 77 | + """Check if user has 2FA enabled.""" |
| 78 | + mfa_adapter = get_mfa_adapter() |
| 79 | + return mfa_adapter.is_mfa_enabled(user) |
| 80 | + |
| 81 | + def _is_exempt_url(self, request: HttpRequest) -> bool: |
| 82 | + """Check if current URL is exempt from 2FA by name.""" |
| 83 | + # First try to get existing resolver_match |
| 84 | + resolver_match = getattr(request, "resolver_match", None) |
| 85 | + |
| 86 | + if not resolver_match: |
| 87 | + try: |
| 88 | + resolver_match = resolve(request.path_info) |
| 89 | + except Resolver404: |
| 90 | + # Can't resolve = probably 404 = let it through |
| 91 | + security_logger.debug("2FA: path %s doesn't resolve, allowing", request.path) |
| 92 | + return True |
| 93 | + except Exception as e: |
| 94 | + # Unexpected error during resolution - log and don't exempt |
| 95 | + security_logger.warning("2FA: error resolving path %s: %s", request.path, str(e)) |
| 96 | + return False |
| 97 | + |
| 98 | + # Check by URL name |
| 99 | + if resolver_match.url_name in self.exempt_url_names: |
| 100 | + security_logger.debug("2FA exemption: URL name '%s' for %s", resolver_match.url_name, request.path) |
| 101 | + return True |
| 102 | + |
| 103 | + # Check namespaced URLs properly |
| 104 | + if resolver_match.namespace and resolver_match.url_name: |
| 105 | + namespaced_name = f"{resolver_match.namespace}:{resolver_match.url_name}" |
| 106 | + if namespaced_name in self.exempt_url_names: |
| 107 | + security_logger.debug("2FA exemption: namespaced URL '%s' for %s", namespaced_name, request.path) |
| 108 | + return True |
| 109 | + |
| 110 | + return False |
41 | 111 |
|
42 | 112 | # Sync version
|
43 | 113 | def __call__(self, request: HttpRequest) -> HttpResponse:
|
44 | 114 | """Process the request and enforce 2FA if required."""
|
45 |
| - if not request.user.is_authenticated or self.is_path_exempt(request.path): |
| 115 | + # Skip static/media files |
| 116 | + if self._is_static_request(request): |
| 117 | + return self.get_response(request) |
| 118 | + |
| 119 | + # Skip if user not authenticated |
| 120 | + if not request.user.is_authenticated: |
| 121 | + return self.get_response(request) |
| 122 | + |
| 123 | + # Skip exempt URLs |
| 124 | + if self._is_exempt_url(request): |
46 | 125 | return self.get_response(request)
|
47 | 126 |
|
48 | 127 | # Check if 2FA is required by site configuration
|
49 | 128 | site_config = SiteConfiguration.objects.get()
|
50 | 129 | if not site_config.required_2fa:
|
51 | 130 | return self.get_response(request)
|
52 | 131 |
|
53 |
| - mfa_adapter = get_mfa_adapter() |
54 |
| - has_2fa = mfa_adapter.is_mfa_enabled(request.user) |
| 132 | + # Check if user has 2FA |
| 133 | + if self._user_has_2fa(request.user): |
| 134 | + return self.get_response(request) |
| 135 | + |
| 136 | + # User needs 2FA - log and redirect |
| 137 | + security_logger.warning( |
| 138 | + "2FA required but not configured for user: %s accessing: %s", |
| 139 | + getattr(request.user, "id", "unknown"), |
| 140 | + request.path, |
| 141 | + ) |
55 | 142 |
|
56 |
| - # If 2FA is not enabled for this user, redirect to 2FA setup |
57 |
| - if not has_2fa: |
58 |
| - # Add a message explaining the redirect |
| 143 | + # Don't redirect if we're already going to 2FA setup to avoid loops |
| 144 | + if not request.path.startswith("/accounts/2fa/"): |
59 | 145 | messages.warning(request, "Two-factor authentication is required. Please set it up now.")
|
60 |
| - # Redirect to 2FA setup page |
61 | 146 | return redirect("/accounts/2fa/")
|
62 | 147 |
|
63 | 148 | return self.get_response(request)
|
64 | 149 |
|
65 | 150 | # Async version
|
66 | 151 | async def __acall__(self, request: HttpRequest) -> HttpResponse:
|
67 | 152 | """Process the request and enforce 2FA if required."""
|
68 |
| - if not request.user.is_authenticated or self.is_path_exempt(request.path): |
| 153 | + # Skip static/media files |
| 154 | + if self._is_static_request(request): |
| 155 | + return await self.get_response(request) |
| 156 | + |
| 157 | + # Skip if user not authenticated |
| 158 | + if not request.user.is_authenticated: |
| 159 | + return await self.get_response(request) |
| 160 | + |
| 161 | + # Skip exempt URLs |
| 162 | + if self._is_exempt_url(request): |
69 | 163 | return await self.get_response(request)
|
70 | 164 |
|
71 | 165 | # Check if 2FA is required by site configuration
|
72 | 166 | site_config = await sync_to_async(SiteConfiguration.objects.get)()
|
73 | 167 | if not site_config.required_2fa:
|
74 | 168 | return await self.get_response(request)
|
75 | 169 |
|
76 |
| - mfa_adapter = get_mfa_adapter() |
77 |
| - # Properly await the async method |
78 |
| - has_2fa = await sync_to_async(mfa_adapter.is_mfa_enabled)(request.user) |
| 170 | + # Check if user has 2FA |
| 171 | + has_2fa = await sync_to_async(self._user_has_2fa)(request.user) |
| 172 | + if has_2fa: |
| 173 | + return await self.get_response(request) |
| 174 | + |
| 175 | + # User needs 2FA - log and redirect |
| 176 | + await sync_to_async(security_logger.warning)( |
| 177 | + "2FA required but not configured for user: %s accessing: %s", |
| 178 | + getattr(request.user, "id", "unknown"), |
| 179 | + request.path, |
| 180 | + ) |
79 | 181 |
|
80 |
| - # If 2FA is not enabled for this user, redirect to 2FA setup |
81 |
| - if not has_2fa: |
82 |
| - # Add a message explaining the redirect |
| 182 | + # Don't redirect if we're already going to 2FA setup to avoid loops |
| 183 | + if not request.path.startswith("/accounts/2fa/"): |
83 | 184 | await sync_to_async(messages.warning)(
|
84 | 185 | request, "Two-factor authentication is required. Please set it up now."
|
85 | 186 | )
|
86 |
| - # Redirect to 2FA setup page |
87 | 187 | return redirect("/accounts/2fa/")
|
88 | 188 |
|
89 | 189 | return await self.get_response(request)
|
0 commit comments