Skip to content

Commit e2ce77f

Browse files
committed
Lint fixes
1 parent b1383de commit e2ce77f

File tree

2 files changed

+134
-2
lines changed

2 files changed

+134
-2
lines changed

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/common.py

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,92 @@ def get_platform(self) -> str:
648648
"""Get the platform for this connector instance. Override in subclasses."""
649649
return "unknown"
650650

651+
def _apply_replace_field_transform(
652+
self, source_columns: List[str]
653+
) -> Dict[str, Optional[str]]:
654+
"""
655+
Apply ReplaceField SMT transformations to column mappings.
656+
657+
ReplaceField transform can filter, rename, or drop fields:
658+
- include: Keep only specified fields (all others dropped)
659+
- exclude: Drop specified fields (all others kept)
660+
- rename: Rename fields using from:to format
661+
662+
Reference: https://docs.confluent.io/platform/current/connect/transforms/replacefield.html
663+
664+
Args:
665+
source_columns: List of source column names
666+
667+
Returns:
668+
Dictionary mapping source column -> target column name (None if dropped)
669+
"""
670+
# Parse transforms from connector config
671+
transforms_config = self.connector_manifest.config.get("transforms", "")
672+
if not transforms_config:
673+
# No transforms - return 1:1 mapping
674+
return {col: col for col in source_columns}
675+
676+
transform_names = parse_comma_separated_list(transforms_config)
677+
678+
# Build column mapping (source -> target, None means dropped)
679+
column_mapping: Dict[str, Optional[str]] = {col: col for col in source_columns}
680+
681+
# Apply each ReplaceField transform in order
682+
for transform_name in transform_names:
683+
transform_type = self.connector_manifest.config.get(
684+
f"transforms.{transform_name}.type", ""
685+
)
686+
687+
# Check if this is a ReplaceField$Value transform
688+
# We only support Value transforms since those affect the column data
689+
if (
690+
transform_type
691+
!= "org.apache.kafka.connect.transforms.ReplaceField$Value"
692+
):
693+
continue
694+
695+
# Get transform configuration
696+
include_config = self.connector_manifest.config.get(
697+
f"transforms.{transform_name}.include", ""
698+
)
699+
exclude_config = self.connector_manifest.config.get(
700+
f"transforms.{transform_name}.exclude", ""
701+
)
702+
rename_config = self.connector_manifest.config.get(
703+
f"transforms.{transform_name}.renames", ""
704+
)
705+
706+
# Apply include filter (keep only specified fields)
707+
if include_config:
708+
include_fields = set(parse_comma_separated_list(include_config))
709+
for col in list(column_mapping.keys()):
710+
if column_mapping[col] not in include_fields:
711+
column_mapping[col] = None
712+
713+
# Apply exclude filter (drop specified fields)
714+
if exclude_config:
715+
exclude_fields = set(parse_comma_separated_list(exclude_config))
716+
for col in list(column_mapping.keys()):
717+
if column_mapping[col] in exclude_fields:
718+
column_mapping[col] = None
719+
720+
# Apply renames (format: "from:to,from2:to2")
721+
if rename_config:
722+
rename_pairs = parse_comma_separated_list(rename_config)
723+
rename_map = {}
724+
for pair in rename_pairs:
725+
if ":" in pair:
726+
from_field, to_field = pair.split(":", 1)
727+
rename_map[from_field.strip()] = to_field.strip()
728+
729+
# Apply renames to the column mapping
730+
for col in list(column_mapping.keys()):
731+
current_name = column_mapping[col]
732+
if current_name and current_name in rename_map:
733+
column_mapping[col] = rename_map[current_name]
734+
735+
return column_mapping
736+
651737
def _extract_fine_grained_lineage(
652738
self,
653739
source_dataset: str,
@@ -706,16 +792,30 @@ def _extract_fine_grained_lineage(
706792
env=self.config.env,
707793
)
708794

795+
# Apply ReplaceField transforms to column mappings
796+
# source_schema is Dict[str, str] mapping column names to types
797+
column_mapping = self._apply_replace_field_transform(
798+
list(source_schema.keys())
799+
)
800+
709801
# Create fine-grained lineage for each source column
710-
# Assume 1:1 mapping (column names are preserved)
711802
fine_grained_lineages: List[FineGrainedLineageDict] = []
712803

713804
for source_col in source_schema:
805+
target_col = column_mapping.get(source_col)
806+
807+
# Skip if field was dropped by ReplaceField transform
808+
if target_col is None:
809+
logger.debug(
810+
f"Skipping column '{source_col}' - dropped by ReplaceField transform"
811+
)
812+
continue
813+
714814
fine_grained_lineage: FineGrainedLineageDict = {
715815
"upstreamType": "FIELD_SET",
716816
"downstreamType": "FIELD",
717817
"upstreams": [make_schema_field_urn(source_urn_str, source_col)],
718-
"downstreams": [make_schema_field_urn(str(target_urn), source_col)],
818+
"downstreams": [make_schema_field_urn(str(target_urn), target_col)],
719819
}
720820
fine_grained_lineages.append(fine_grained_lineage)
721821

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/transform_plugins.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,37 @@ def should_apply_automatically(cls) -> bool:
189189
return False # Complex transforms require explicit user configuration
190190

191191

192+
class ReplaceFieldPlugin(TransformPlugin):
193+
"""
194+
Plugin for ReplaceField transforms.
195+
196+
ReplaceField transforms only affect message field names (include/exclude/rename),
197+
not topic names, so they're a no-op for topic transformation but need to be
198+
registered as known transforms to avoid warnings.
199+
"""
200+
201+
SUPPORTED_TYPES = {
202+
"org.apache.kafka.connect.transforms.ReplaceField$Value",
203+
"org.apache.kafka.connect.transforms.ReplaceField$Key",
204+
}
205+
206+
@classmethod
207+
def supports_transform_type(cls, transform_type: str) -> bool:
208+
return transform_type in cls.SUPPORTED_TYPES
209+
210+
def apply_forward(self, topics: List[str], config: TransformConfig) -> List[str]:
211+
"""ReplaceField doesn't affect topic names, only field names within messages."""
212+
return topics
213+
214+
def apply_reverse(self, topics: List[str], config: TransformConfig) -> List[str]:
215+
"""ReplaceField doesn't affect topic names, only field names within messages."""
216+
return topics
217+
218+
@classmethod
219+
def should_apply_automatically(cls) -> bool:
220+
return True # Safe to apply automatically - it's a no-op for topic names
221+
222+
192223
class TransformPluginRegistry:
193224
"""Registry for transform plugins."""
194225

@@ -200,6 +231,7 @@ def _register_default_plugins(self):
200231
"""Register default transform plugins."""
201232
self.register(RegexRouterPlugin())
202233
self.register(ComplexTransformPlugin())
234+
self.register(ReplaceFieldPlugin())
203235

204236
def register(self, plugin: TransformPlugin) -> None:
205237
"""Register a transform plugin."""

0 commit comments

Comments
 (0)