Skip to content

Commit 8d18f30

Browse files
feat(ingestion): Add secret masking framework (#15188)
1 parent ae1406a commit 8d18f30

21 files changed

+5884
-5
lines changed

metadata-ingestion/src/datahub/cli/ingest_cli.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from datahub.ingestion.graph.config import ClientMode
2222
from datahub.ingestion.run.connection import ConnectionManager
2323
from datahub.ingestion.run.pipeline import Pipeline
24+
from datahub.masking.bootstrap import initialize_secret_masking
2425
from datahub.telemetry import telemetry
2526
from datahub.upgrade import upgrade
2627
from datahub.utilities.ingest_utils import deploy_source_vars
@@ -128,6 +129,9 @@ def run(
128129
) -> None:
129130
"""Ingest metadata into DataHub."""
130131

132+
# Initialize secret masking (before any logging)
133+
initialize_secret_masking()
134+
131135
def run_pipeline_to_completion(pipeline: Pipeline) -> int:
132136
logger.info("Starting metadata ingestion")
133137
with click_spinner.spinner(disable=no_spinner or no_progress):

metadata-ingestion/src/datahub/configuration/common.py

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import contextvars
12
import dataclasses
23
import re
34
import unittest.mock
@@ -21,11 +22,12 @@
2122
import pydantic
2223
import pydantic_core
2324
from cached_property import cached_property
24-
from pydantic import BaseModel, ConfigDict, ValidationError
25+
from pydantic import BaseModel, ConfigDict, SecretStr, ValidationError, model_validator
2526
from pydantic.fields import Field
2627
from typing_extensions import Protocol, Self
2728

2829
from datahub.configuration._config_enum import ConfigEnum as ConfigEnum
30+
from datahub.masking.secret_registry import SecretRegistry, is_masking_enabled
2931
from datahub.utilities.dedup_list import deduplicate_list
3032

3133
REDACT_KEYS = {
@@ -95,6 +97,11 @@ def redact_raw_config(obj: Any) -> Any:
9597

9698
LaxStr = Annotated[str, pydantic.BeforeValidator(lambda v: str(v))]
9799

100+
# Context variable that is True while a ConfigModel's wrap validator
# (`_track_nesting_context`) is running its inner handler, and is restored to its
# previous value afterwards. `_register_secret_fields` reads it and skips secret
# registration when it is set, i.e. when the model is being validated as part of
# another ConfigModel. Defaults to False.
_inside_nested_config: contextvars.ContextVar[bool] = contextvars.ContextVar(
    "_inside_nested_config", default=False
)
104+
98105

99106
@dataclasses.dataclass(frozen=True)
100107
class SupportedSources:
@@ -129,6 +136,96 @@ class ConfigModel(BaseModel):
129136
json_schema_extra=_config_model_schema_extra,
130137
)
131138

139+
@model_validator(mode="wrap")
@classmethod
def _track_nesting_context(
    cls,
    data: Any,
    handler: pydantic.ValidatorFunctionWrapHandler,
    info: pydantic.ValidationInfo,
) -> Self:
    """
    Flag `_inside_nested_config` for the duration of this model's inner validation.

    While `handler(data)` runs, the context variable is True, so any ConfigModel
    validated as one of this model's fields can tell it is not the outermost model.
    The previous value is always restored, including when validation raises.

    Args:
        data: Raw input handed to the validator.
        handler: Callable that performs the wrapped validation.
        info: Validation info supplied by pydantic (unused here).

    Returns:
        Whatever `handler(data)` returns.
    """
    nesting_token = _inside_nested_config.set(True)
    try:
        # Delegate to the wrapped validation while the flag is raised.
        return handler(data)
    finally:
        # Restore the value that was in effect before this validator ran.
        _inside_nested_config.reset(nesting_token)
162+
163+
@model_validator(mode="after")
def _register_secret_fields(self) -> Self:
    """
    Register this model's SecretStr values with the secret masking registry.

    Nothing is registered when masking is disabled, or when `_inside_nested_config`
    is set (the model is being validated as part of another ConfigModel). In the
    latter case the enclosing model is expected to pick the values up through
    `_collect_secrets`, which yields fully qualified field paths.

    All collected values are handed to the registry in a single
    `register_secrets_batch` call.

    Returns:
        The model instance itself, unchanged.
    """
    if not is_masking_enabled() or _inside_nested_config.get():
        return self

    # Gather secrets from this model and everything nested below it.
    collected: Dict[str, str] = {}
    self._collect_secrets(collected, prefix="")

    if collected:
        registry = SecretRegistry.get_instance()
        registry.register_secrets_batch(collected)

    return self
192+
193+
def _collect_secrets(self, secrets: Dict[str, str], prefix: str) -> None:
    """
    Recursively collect SecretStr values from this model into `secrets`.

    Each declared field is inspected. SecretStr values are recorded directly,
    nested ConfigModel instances are recursed into, and lists, tuples, sets and
    dicts are walked element by element, so secrets stored directly inside a
    container (e.g. `List[SecretStr]`, `Dict[str, SecretStr]`) or inside nested
    containers are found as well.

    Args:
        secrets: Dictionary populated in place with field_path -> secret_value.
        prefix: Prefix for nested field names (e.g., "azure_auth." for nested fields).
    """

    def visit(value: Any, path: str) -> None:
        # Record or descend into a single value located at `path`.
        if isinstance(value, SecretStr):
            secret_value = value.get_secret_value()
            # Empty secrets are not worth masking.
            if secret_value:
                secrets[path] = secret_value
        elif isinstance(value, ConfigModel):
            # Go through the method so subclasses overriding it are honoured.
            value._collect_secrets(secrets, prefix=f"{path}.")
        elif isinstance(value, (list, tuple, set, frozenset)):
            for idx, item in enumerate(value):
                visit(item, f"{path}[{idx}]")
        elif isinstance(value, dict):
            for key, item in value.items():
                visit(item, f"{path}[{key}]")

    for field_name in self.__class__.model_fields:
        field_value = getattr(self, field_name, None)
        if field_value is None:
            continue
        # The full path is only used as a label for the registered secret.
        visit(field_value, f"{prefix}{field_name}")
228+
132229
@classmethod
133230
def parse_obj_allow_extras(cls, obj: Any) -> Self:
134231
"""Parse an object while allowing extra fields.

metadata-ingestion/src/datahub/configuration/config_loader.py

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,32 @@
1515
from datahub.configuration.json_loader import JsonConfigurationMechanism
1616
from datahub.configuration.toml import TomlConfigurationMechanism
1717
from datahub.configuration.yaml import YamlConfigurationMechanism
18+
from datahub.masking.secret_registry import SecretRegistry, is_masking_enabled
1819

1920
Environ = Mapping[str, str]
2021

2122

23+
def _extract_env_var_names(text: str) -> Set[str]:
    """
    Extract environment variable names referenced in `text`.

    A name is recognised wherever it directly follows `$` or `${`, regardless of
    what comes after it. This covers `$VAR`, `${VAR}`, operator forms with or
    without a colon (`${VAR:-default}`, `${VAR-default}`), substring forms
    (`${VAR:0:3}`), and variables nested inside a default (`${A:-${B}}`).

    The result is used to decide which values to mask, so the match is
    deliberately generous: an extra name is harmless, a missed one is not.

    Args:
        text: String that may contain variable references.

    Returns:
        The set of referenced variable names (empty if there are none).
    """
    # `\{?` makes the opening brace optional so both syntaxes share one pattern;
    # a name must start with a letter or underscore, as in the shell.
    return set(re.findall(r"\$\{?([A-Za-z_][A-Za-z0-9_]*)", text))
42+
43+
2244
def resolve_env_variables(config: dict, environ: Environ) -> dict:
    """Resolve environment variable references in `config` using `environ`.

    Builds an `EnvResolver` with its default options and returns the result of
    its `resolve` call.
    """
    # TODO: This is kept around for backwards compatibility.
    resolver = EnvResolver(environ)
    return resolver.resolve(config)
@@ -30,9 +52,26 @@ def list_referenced_env_variables(config: dict) -> Set[str]:
3052

3153

3254
class EnvResolver:
33-
def __init__(self, environ: Environ, strict_env_syntax: bool = False):
55+
"""Resolves environment variable references in configuration dictionaries."""
56+
57+
def __init__(
    self,
    environ: Environ,
    strict_env_syntax: bool = False,
    register_secrets: bool = True,
):
    """
    Store the resolver's settings.

    Args:
        environ: Mapping used to look up variable values (os.environ, custom dict,
            or external secrets).
        strict_env_syntax: If True, only match ${VAR} syntax (not $VAR).
        register_secrets: If True, resolved values are registered with the masking
            registry (only if masking is globally enabled).
    """
    self.register_secrets = register_secrets
    self.strict_env_syntax = strict_env_syntax
    self.environ = environ
3675

3776
def resolve(self, config: dict) -> dict:
    """Return `config` as processed by `_resolve_dict`."""
    resolved_config = self._resolve_dict(config)
    return resolved_config
@@ -57,16 +96,42 @@ def mock_get_env(key: str, default: Optional[str] = None) -> str:
5796
mock = unittest.mock.MagicMock()
5897
mock.get.side_effect = mock_get_env
5998

60-
resolver = EnvResolver(environ=mock, strict_env_syntax=strict_env_syntax)
99+
resolver = EnvResolver(
100+
environ=mock, strict_env_syntax=strict_env_syntax, register_secrets=False
101+
)
61102
resolver._resolve_dict(config)
62103

63104
return vars
64105

106+
def _register_env_vars_from_element(self, element: str) -> None:
    """
    Register the values of variables referenced in `element` for secret masking.

    Does nothing unless this resolver was created with `register_secrets` enabled
    and masking is globally enabled. Variables that are unset or empty in
    `self.environ` are skipped.

    Args:
        element: Raw config string that may contain ${VAR} / $VAR references.
    """
    if not (self.register_secrets and is_masking_enabled()):
        return

    # Every variable referenced from the recipe is treated as a secret.
    resolved = {}
    for name in _extract_env_var_names(element):
        env_value = self.environ.get(name)
        if env_value:
            resolved[name] = env_value

    if not resolved:
        return

    # One batch call for everything found in this element.
    SecretRegistry.get_instance().register_secrets_batch(resolved)
125+
65126
def _resolve_element(self, element: str) -> str:
66127
if re.search(r"(\$\{).+(\})", element):
128+
# Register secrets before expansion
129+
self._register_env_vars_from_element(element)
67130
return expand(element, nounset=True, environ=self.environ)
68131
elif not self.strict_env_syntax and element.startswith("$"):
69132
try:
133+
# Register secrets before expansion
134+
self._register_env_vars_from_element(element)
70135
return expand(element, nounset=True, environ=self.environ)
71136
except UnboundVariable:
72137
# TODO: This fallback is kept around for backwards compatibility, but

metadata-ingestion/src/datahub/configuration/env_vars.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,16 @@ def get_debug() -> bool:
151151
return os.getenv("DATAHUB_DEBUG", "").lower() == "true"
152152

153153

154+
def get_disable_secret_masking() -> bool:
    """
    Report whether secret masking has been switched off via the environment.

    Reads DATAHUB_DISABLE_SECRET_MASKING and returns True when its value,
    compared case-insensitively, is "true" or "1".

    WARNING: Only use this in development/debugging scenarios.
    Disabling secret masking will expose sensitive information in logs.
    """
    raw_flag = os.getenv("DATAHUB_DISABLE_SECRET_MASKING", "")
    return raw_flag.lower() in {"true", "1"}
162+
163+
154164
# ============================================================================
155165
# Data Processing Configuration
156166
# ============================================================================
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
"""
2+
Secret masking system for DataHub ingestion.
3+
"""
4+
5+
__all__ = [
6+
"SecretMaskingFilter",
7+
"StreamMaskingWrapper",
8+
"install_masking_filter",
9+
"uninstall_masking_filter",
10+
"SecretRegistry",
11+
"is_masking_enabled",
12+
"initialize_secret_masking",
13+
"get_masking_safe_logger",
14+
]
15+
16+
from datahub.masking.bootstrap import initialize_secret_masking
17+
from datahub.masking.logging_utils import get_masking_safe_logger
18+
from datahub.masking.masking_filter import (
19+
SecretMaskingFilter,
20+
StreamMaskingWrapper,
21+
install_masking_filter,
22+
uninstall_masking_filter,
23+
)
24+
from datahub.masking.secret_registry import SecretRegistry, is_masking_enabled

0 commit comments

Comments
 (0)