Skip to content

Commit bf95de8

Browse files
committed
Lint fixes
1 parent 8fd3573 commit bf95de8

File tree

2 files changed

+134
-2
lines changed

2 files changed

+134
-2
lines changed

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/common.py

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -647,6 +647,92 @@ def get_platform(self) -> str:
647647
"""Get the platform for this connector instance. Override in subclasses."""
648648
return "unknown"
649649

650+
def _apply_replace_field_transform(
651+
self, source_columns: List[str]
652+
) -> Dict[str, Optional[str]]:
653+
"""
654+
Apply ReplaceField SMT transformations to column mappings.
655+
656+
ReplaceField transform can filter, rename, or drop fields:
657+
- include: Keep only specified fields (all others dropped)
658+
- exclude: Drop specified fields (all others kept)
659+
- rename: Rename fields using from:to format
660+
661+
Reference: https://docs.confluent.io/platform/current/connect/transforms/replacefield.html
662+
663+
Args:
664+
source_columns: List of source column names
665+
666+
Returns:
667+
Dictionary mapping source column -> target column name (None if dropped)
668+
"""
669+
# Parse transforms from connector config
670+
transforms_config = self.connector_manifest.config.get("transforms", "")
671+
if not transforms_config:
672+
# No transforms - return 1:1 mapping
673+
return {col: col for col in source_columns}
674+
675+
transform_names = parse_comma_separated_list(transforms_config)
676+
677+
# Build column mapping (source -> target, None means dropped)
678+
column_mapping: Dict[str, Optional[str]] = {col: col for col in source_columns}
679+
680+
# Apply each ReplaceField transform in order
681+
for transform_name in transform_names:
682+
transform_type = self.connector_manifest.config.get(
683+
f"transforms.{transform_name}.type", ""
684+
)
685+
686+
# Check if this is a ReplaceField$Value transform
687+
# We only support Value transforms since those affect the column data
688+
if (
689+
transform_type
690+
!= "org.apache.kafka.connect.transforms.ReplaceField$Value"
691+
):
692+
continue
693+
694+
# Get transform configuration
695+
include_config = self.connector_manifest.config.get(
696+
f"transforms.{transform_name}.include", ""
697+
)
698+
exclude_config = self.connector_manifest.config.get(
699+
f"transforms.{transform_name}.exclude", ""
700+
)
701+
rename_config = self.connector_manifest.config.get(
702+
f"transforms.{transform_name}.renames", ""
703+
)
704+
705+
# Apply include filter (keep only specified fields)
706+
if include_config:
707+
include_fields = set(parse_comma_separated_list(include_config))
708+
for col in list(column_mapping.keys()):
709+
if column_mapping[col] not in include_fields:
710+
column_mapping[col] = None
711+
712+
# Apply exclude filter (drop specified fields)
713+
if exclude_config:
714+
exclude_fields = set(parse_comma_separated_list(exclude_config))
715+
for col in list(column_mapping.keys()):
716+
if column_mapping[col] in exclude_fields:
717+
column_mapping[col] = None
718+
719+
# Apply renames (format: "from:to,from2:to2")
720+
if rename_config:
721+
rename_pairs = parse_comma_separated_list(rename_config)
722+
rename_map = {}
723+
for pair in rename_pairs:
724+
if ":" in pair:
725+
from_field, to_field = pair.split(":", 1)
726+
rename_map[from_field.strip()] = to_field.strip()
727+
728+
# Apply renames to the column mapping
729+
for col in list(column_mapping.keys()):
730+
current_name = column_mapping[col]
731+
if current_name and current_name in rename_map:
732+
column_mapping[col] = rename_map[current_name]
733+
734+
return column_mapping
735+
650736
def _extract_fine_grained_lineage(
651737
self,
652738
source_dataset: str,
@@ -705,16 +791,30 @@ def _extract_fine_grained_lineage(
705791
env=self.config.env,
706792
)
707793

794+
# Apply ReplaceField transforms to column mappings
795+
# source_schema is Dict[str, str] mapping column names to types
796+
column_mapping = self._apply_replace_field_transform(
797+
list(source_schema.keys())
798+
)
799+
708800
# Create fine-grained lineage for each source column
709-
# Assume 1:1 mapping (column names are preserved)
710801
fine_grained_lineages: List[FineGrainedLineageDict] = []
711802

712803
for source_col in source_schema:
804+
target_col = column_mapping.get(source_col)
805+
806+
# Skip if field was dropped by ReplaceField transform
807+
if target_col is None:
808+
logger.debug(
809+
f"Skipping column '{source_col}' - dropped by ReplaceField transform"
810+
)
811+
continue
812+
713813
fine_grained_lineage: FineGrainedLineageDict = {
714814
"upstreamType": "FIELD_SET",
715815
"downstreamType": "FIELD",
716816
"upstreams": [make_schema_field_urn(source_urn_str, source_col)],
717-
"downstreams": [make_schema_field_urn(str(target_urn), source_col)],
817+
"downstreams": [make_schema_field_urn(str(target_urn), target_col)],
718818
}
719819
fine_grained_lineages.append(fine_grained_lineage)
720820

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/transform_plugins.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,37 @@ def should_apply_automatically(cls) -> bool:
189189
return False # Complex transforms require explicit user configuration
190190

191191

192+
class ReplaceFieldPlugin(TransformPlugin):
193+
"""
194+
Plugin for ReplaceField transforms.
195+
196+
ReplaceField transforms only affect message field names (include/exclude/rename),
197+
not topic names, so they're a no-op for topic transformation but need to be
198+
registered as known transforms to avoid warnings.
199+
"""
200+
201+
SUPPORTED_TYPES = {
202+
"org.apache.kafka.connect.transforms.ReplaceField$Value",
203+
"org.apache.kafka.connect.transforms.ReplaceField$Key",
204+
}
205+
206+
@classmethod
207+
def supports_transform_type(cls, transform_type: str) -> bool:
208+
return transform_type in cls.SUPPORTED_TYPES
209+
210+
def apply_forward(self, topics: List[str], config: TransformConfig) -> List[str]:
211+
"""ReplaceField doesn't affect topic names, only field names within messages."""
212+
return topics
213+
214+
def apply_reverse(self, topics: List[str], config: TransformConfig) -> List[str]:
215+
"""ReplaceField doesn't affect topic names, only field names within messages."""
216+
return topics
217+
218+
@classmethod
219+
def should_apply_automatically(cls) -> bool:
220+
return True # Safe to apply automatically - it's a no-op for topic names
221+
222+
192223
class TransformPluginRegistry:
193224
"""Registry for transform plugins."""
194225

@@ -200,6 +231,7 @@ def _register_default_plugins(self):
200231
"""Register default transform plugins."""
201232
self.register(RegexRouterPlugin())
202233
self.register(ComplexTransformPlugin())
234+
self.register(ReplaceFieldPlugin())
203235

204236
def register(self, plugin: TransformPlugin) -> None:
205237
"""Register a transform plugin."""

0 commit comments

Comments
 (0)