
Commit 2ede18b

Add a couple of fixes to Debezium source
1 parent 966e4f4 commit 2ede18b

File tree

4 files changed: +856 −131 lines changed

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/kafka_connect.py

Lines changed: 15 additions & 2 deletions
@@ -1,6 +1,5 @@
 import base64
 import logging
-from functools import lru_cache
 from typing import Dict, Iterable, List, Optional
 
 import jpype
@@ -114,6 +113,9 @@ def __init__(self, config: KafkaConnectSourceConfig, ctx: PipelineContext):
             self.session, self.config, self.report
         )
 
+        # Cache for all Kafka topics (single ingestion run)
+        self._all_kafka_topics_cache: Optional[List[str]] = None
+
         if not jpype.isJVMStarted():
             jpype.startJVM()
 
@@ -445,18 +447,23 @@ def _get_topics_confluent_cloud_from_manifest(
         # Final fallback to config-based approach
         return self._get_topics_from_connector_config(connector_manifest)
 
-    @lru_cache(maxsize=1)
     def _get_all_topics_from_kafka_api(self) -> List[str]:
         """
         Get all topics from Confluent Cloud Kafka REST API v3.
 
         This provides the comprehensive topic list needed for the reverse transform
         pipeline strategy to work effectively.
 
+        Uses instance variable caching to avoid repeated API calls during a single ingestion run.
+
         Returns:
             List of all topic names from the Kafka cluster.
             Empty list if API is not accessible or fails.
         """
+        # Return cached result if available
+        if self._all_kafka_topics_cache is not None:
+            return self._all_kafka_topics_cache
+
         try:
             # Extract cluster information from Connect URI
             kafka_rest_endpoint, cluster_id = self._parse_confluent_cloud_info()
@@ -507,13 +514,19 @@ def _get_all_topics_from_kafka_api(self) -> List[str]:
                 logger.info(
                     f"Retrieved {len(all_topics)} topics from Confluent Cloud Kafka REST API v3"
                 )
+                # Cache the result for subsequent calls
+                self._all_kafka_topics_cache = all_topics
                 return all_topics
             else:
                 logger.warning("Unexpected response format from Kafka REST API")
+                # Cache empty result to avoid repeated failures
+                self._all_kafka_topics_cache = []
                 return []
 
         except Exception as e:
             logger.debug(f"Failed to get topics from Kafka REST API: {e}")
+            # Cache empty result to avoid repeated API calls on failure
+            self._all_kafka_topics_cache = []
            return []
 
     def _parse_confluent_cloud_info(self) -> tuple[Optional[str], Optional[str]]:
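
A note on the caching change above: @lru_cache on an instance method includes self in the cache key and holds a strong reference to it, so the cache outlives the ingestion run, and with maxsize=1 the single entry is evicted whenever another instance calls the method. Memoizing on the instance avoids both problems. A minimal sketch of the pattern, assuming that rationale (the class and fetch helper are illustrative, not the actual source):

from typing import List, Optional


class TopicLister:
    """Illustrative stand-in for the Kafka Connect source class."""

    def __init__(self) -> None:
        # None means "not fetched yet"; [] is a valid cached result.
        self._topics_cache: Optional[List[str]] = None

    def get_all_topics(self) -> List[str]:
        if self._topics_cache is not None:
            return self._topics_cache
        try:
            topics = self._fetch_topics()  # hypothetical network call
        except Exception:
            topics = []  # cache failures too, so every later call is free
        self._topics_cache = topics
        return topics

    def _fetch_topics(self) -> List[str]:
        # Stand-in for the Kafka REST API v3 request.
        return ["public.customer_profiles", "app.events.order_created"]

The cache lives and dies with the instance, which matches the "single ingestion run" comment in the diff.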

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/source_connectors.py

Lines changed: 145 additions & 3 deletions
@@ -1692,15 +1692,26 @@ def extract_lineages(self) -> List[KafkaConnectLineage]:
         source_platform = parser.source_platform
         server_name = parser.server_name
         database_name = parser.database_name
+
+        if not self.connector_manifest.topic_names:
+            return lineages
+
+        # Check for EventRouter transform - requires special handling
+        if self._has_event_router_transform():
+            logger.debug(
+                f"Connector {self.connector_manifest.name} uses EventRouter transform - using table-based lineage extraction"
+            )
+            return self._extract_lineages_for_event_router(
+                source_platform, database_name
+            )
+
+        # Standard Debezium topic processing
         # Escape server_name to handle cases where topic.prefix contains dots
         # Some users configure topic.prefix like "my.server" which breaks the regex
         server_name = server_name or ""
         # Regex pattern (\w+\.\w+(?:\.\w+)?) supports BOTH 2-part and 3-part table names
         topic_naming_pattern = rf"({re.escape(server_name)})\.(\w+\.\w+(?:\.\w+)?)"
 
-        if not self.connector_manifest.topic_names:
-            return lineages
-
         # Handle connectors with 2-level container (database + schema) in topic pattern
         connector_class = self.connector_manifest.config.get(CONNECTOR_CLASS, "")
         maybe_duplicated_database_name = (
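
An aside on the standard path above: topic_naming_pattern accepts both 2-part (db.table) and 3-part (db.schema.table) suffixes, and re.escape keeps any dots inside topic.prefix literal. A quick illustration with made-up names:

import re

server_name = "my.server"  # a topic.prefix containing dots
topic_naming_pattern = rf"({re.escape(server_name)})\.(\w+\.\w+(?:\.\w+)?)"

# 2-part table name
assert re.match(topic_naming_pattern, "my.server.inventory.customers").group(2) == "inventory.customers"
# 3-part table name
assert re.match(topic_naming_pattern, "my.server.db1.public.customers").group(2) == "db1.public.customers"

Without the escaping, the dots in "my.server" would match any character, so topics from an unrelated prefix such as "myxserver" could slip through the match.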
@@ -1749,6 +1760,137 @@ def extract_lineages(self) -> List[KafkaConnectLineage]:
 
         return []
 
+    def _has_event_router_transform(self) -> bool:
+        """Check if connector uses Debezium EventRouter transform."""
+        transforms_config = self.connector_manifest.config.get("transforms", "")
+        if not transforms_config:
+            return False
+
+        transform_names = parse_comma_separated_list(transforms_config)
+        for name in transform_names:
+            transform_type = self.connector_manifest.config.get(
+                f"transforms.{name}.type", ""
+            )
+            if transform_type == "io.debezium.transforms.outbox.EventRouter":
+                return True
+
+        return False
+
+    def _extract_lineages_for_event_router(
+        self, source_platform: str, database_name: Optional[str]
+    ) -> List[KafkaConnectLineage]:
+        """
+        Extract lineages for connectors using EventRouter transform.
+
+        EventRouter is a data-dependent transform that reads fields from row data
+        to determine output topics. We cannot predict output topics from configuration alone,
+        so we extract source tables from table.include.list and try to match them to
+        actual topics using RegexRouter patterns.
+
+        Reference: https://debezium.io/documentation/reference/transformations/outbox-event-router.html
+        """
+        lineages: List[KafkaConnectLineage] = []
+
+        # Extract source tables from configuration
+        table_config = self.connector_manifest.config.get(
+            "table.include.list"
+        ) or self.connector_manifest.config.get("table.whitelist")
+
+        if not table_config:
+            logger.warning(
+                f"EventRouter connector {self.connector_manifest.name} has no table.include.list config"
+            )
+            return lineages
+
+        table_names = parse_comma_separated_list(table_config)
+
+        # Try to filter topics using RegexRouter replacement pattern (if available)
+        filtered_topics = self._filter_topics_for_event_router()
+
+        # For each source table, create lineages to filtered topics
+        for table_name in table_names:
+            # Clean quoted table names
+            clean_table = table_name.strip('"')
+
+            # Apply database name if present
+            if database_name:
+                source_dataset = get_dataset_name(database_name, clean_table)
+            else:
+                source_dataset = clean_table
+
+            # Create lineages from this source table to filtered topics
+            for topic in filtered_topics:
+                lineage = KafkaConnectLineage(
+                    source_dataset=source_dataset,
+                    source_platform=source_platform,
+                    target_dataset=topic,
+                    target_platform=KAFKA,
+                )
+                lineages.append(lineage)
+
+        logger.info(
+            f"Created {len(lineages)} EventRouter lineages from {len(table_names)} source tables "
+            f"to {len(filtered_topics)} topics for connector {self.connector_manifest.name}"
+        )
+
+        return lineages
+
+    def _filter_topics_for_event_router(self) -> List[str]:
+        """
+        Filter topics for EventRouter connectors using RegexRouter replacement pattern.
+
+        EventRouter often works with RegexRouter to rename output topics. We can use
+        the RegexRouter replacement pattern to identify which topics belong to this connector.
+        """
+        # Look for RegexRouter transform configuration
+        transforms_config = self.connector_manifest.config.get("transforms", "")
+        if not transforms_config:
+            return list(self.connector_manifest.topic_names)
+
+        transform_names = parse_comma_separated_list(transforms_config)
+
+        # Find RegexRouter configuration
+        regex_replacement = None
+        for name in transform_names:
+            transform_type = self.connector_manifest.config.get(
+                f"transforms.{name}.type", ""
+            )
+            if transform_type in [
+                "org.apache.kafka.connect.transforms.RegexRouter",
+                "io.confluent.connect.cloud.transforms.TopicRegexRouter",
+            ]:
+                # Extract the replacement pattern
+                # Example: "dev.ern.cashout.$1" -> we want topics starting with "dev.ern.cashout."
+                replacement = self.connector_manifest.config.get(
+                    f"transforms.{name}.replacement", ""
+                )
+                if replacement:
+                    # Extract prefix from replacement pattern (before first $)
+                    # "dev.ern.cashout.$1" -> "dev.ern.cashout."
+                    if "$" in replacement:
+                        regex_replacement = replacement.split("$")[0]
+                    else:
+                        regex_replacement = replacement
+                    break
+
+        # Filter topics using the replacement prefix
+        if regex_replacement:
+            filtered_topics = [
+                topic
+                for topic in self.connector_manifest.topic_names
+                if topic.startswith(regex_replacement)
+            ]
+            logger.debug(
+                f"Filtered EventRouter topics to {len(filtered_topics)} topics matching prefix '{regex_replacement}'"
+            )
+            return filtered_topics
+
+        # No RegexRouter found - use all topics (risky but best effort)
+        logger.warning(
+            f"EventRouter connector {self.connector_manifest.name} has no RegexRouter - cannot filter topics accurately"
+        )
+        return list(self.connector_manifest.topic_names)
+
 
 @dataclass
 class ConfigDrivenSourceConnector(BaseConnector):
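
The prefix heuristic in _filter_topics_for_event_router is easy to exercise in isolation. A standalone sketch using the "dev.ern.cashout.$1" example from the code comments (the helper is illustrative, not part of the source):

from typing import List


def filter_by_replacement_prefix(replacement: str, topics: List[str]) -> List[str]:
    # "dev.ern.cashout.$1" -> keep topics starting with "dev.ern.cashout."
    prefix = replacement.split("$")[0] if "$" in replacement else replacement
    return [topic for topic in topics if topic.startswith(prefix)]


topics = ["dev.ern.cashout.payments", "dev.ern.cashout.refunds", "other.topic"]
assert filter_by_replacement_prefix("dev.ern.cashout.$1", topics) == [
    "dev.ern.cashout.payments",
    "dev.ern.cashout.refunds",
]

One caveat: if the replacement starts with a capture group (for example "$1.events"), the extracted prefix is the empty string and every topic passes the filter, so a missing RegexRouter is not the only way to end up with an unfiltered list.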

metadata-ingestion/tests/integration/kafka-connect-confluent-cloud/confluent_cloud_mock_api.py

Lines changed: 82 additions & 2 deletions
@@ -2,6 +2,7 @@
 import logging
 import os
 from functools import wraps
+from typing import Any, Dict
 
 from flask import Flask, jsonify, request
 
@@ -149,9 +150,64 @@ def log_request_info():
         "id": None,
         "extensions": {},
     },
+    "outbox-source-connector": {
+        "status": None,
+        "info": {
+            "name": "outbox-source-connector",
+            "type": "source",
+            "config": {
+                "after.state.only": "false",
+                "cloud.environment": "prod",
+                "cloud.provider": "aws",
+                "connector.class": "MySqlCdcSource",
+                "database.connectionTimeZone": "UTC",
+                "database.dbname": "outbox_db",
+                "database.hostname": "mysql-primary.us-west-2.rds.amazonaws.com",
+                "database.password": "****************",
+                "database.port": "3306",
+                "database.server.name": "outbox-db-server",
+                "database.ssl.mode": "required",
+                "database.sslmode": "require",
+                "database.user": "outbox_connector",
+                "hstore.handling.mode": "json",
+                "interval.handling.mode": "numeric",
+                "kafka.auth.mode": "SERVICE_ACCOUNT",
+                "kafka.endpoint": "SASL_SSL://pkc-abc123.us-west-2.aws.confluent.cloud:9092",
+                "kafka.region": "us-west-2",
+                "kafka.service.account.id": "sa-123456",
+                "max.batch.size": "2000",
+                "name": "outbox-source-connector",
+                "output.data.format": "JSON",
+                "output.key.format": "JSON",
+                "poll.interval.ms": "500",
+                "provide.transaction.metadata": "true",
+                "snapshot.locking.mode": "none",
+                "snapshot.mode": "schema_only",
+                "table.include.list": "outbox_db.outbox",
+                "tasks.max": "1",
+                "tombstones.on.delete": "false",
+                "transforms": "EventRouter,RegexRouter",
+                "transforms.EventRouter.route.by.field": "_route_suffix",
+                "transforms.EventRouter.table.expand.json.payload": "true",
+                "transforms.EventRouter.table.field.event.id": "event_id",
+                "transforms.EventRouter.table.field.event.key": "aggregate_id",
+                "transforms.EventRouter.table.field.event.payload": "payload",
+                "transforms.EventRouter.table.field.event.type": "event_type",
+                "transforms.EventRouter.table.fields.additional.placement": "event_timestamp:header,event_version:header,aggregate_type:header",
+                "transforms.EventRouter.type": "io.debezium.transforms.outbox.EventRouter",
+                "transforms.RegexRouter.regex": "outbox.event.(.*)",
+                "transforms.RegexRouter.replacement": "app.events.$1",
+                "transforms.RegexRouter.type": "io.confluent.connect.cloud.transforms.TopicRegexRouter",
+            },
+            "tasks": [{"connector": "outbox-source-connector", "task": 0}],
+        },
+        "id": None,
+        "extensions": {},
+    },
 }
 
-TOPICS_DATA = {
+TOPICS_DATA: Dict[str, Any] = {
+    "kind": "KafkaTopicList",
     "data": [
         {
             "topic_name": "public.customer_profiles",
@@ -201,7 +257,31 @@ def log_request_info():
             "is_internal": False,
             "cluster_id": "4k0R9d1GTS5tI9f4Y2xZ0Q",
         },
-    ]
+        {
+            "topic_name": "app.events.order_created",
+            "partitions": {
+                "related": "https://api.confluent.cloud/kafka/v3/clusters/4k0R9d1GTS5tI9f4Y2xZ0Q/topics/app.events.order_created/partitions"
+            },
+            "configs": {
+                "related": "https://api.confluent.cloud/kafka/v3/clusters/4k0R9d1GTS5tI9f4Y2xZ0Q/topics/app.events.order_created/configs"
+            },
+            "replication_factor": 3,
+            "is_internal": False,
+            "cluster_id": "4k0R9d1GTS5tI9f4Y2xZ0Q",
+        },
+        {
+            "topic_name": "app.events.order_updated",
+            "partitions": {
+                "related": "https://api.confluent.cloud/kafka/v3/clusters/4k0R9d1GTS5tI9f4Y2xZ0Q/topics/app.events.order_updated/partitions"
+            },
+            "configs": {
+                "related": "https://api.confluent.cloud/kafka/v3/clusters/4k0R9d1GTS5tI9f4Y2xZ0Q/topics/app.events.order_updated/configs"
+            },
+            "replication_factor": 3,
+            "is_internal": False,
+            "cluster_id": "4k0R9d1GTS5tI9f4Y2xZ0Q",
+        },
+    ],
 }
 
 
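With the "kind" field and the extra entries in place, TOPICS_DATA matches the Kafka REST API v3 topic-list shape the ingestion code expects. A minimal sketch of pulling topic names out of such a payload (an assumption about how the response is consumed, not a copy of the source):

from typing import Any, Dict, List


def extract_topic_names(payload: Dict[str, Any]) -> List[str]:
    # Collect "topic_name" from every entry under "data"; an unexpected
    # shape yields an empty list, mirroring the source's warning path.
    if payload.get("kind") != "KafkaTopicList":
        return []
    return [item["topic_name"] for item in payload.get("data", []) if "topic_name" in item]

Run against the mock data above, this returns the original topics plus app.events.order_created and app.events.order_updated, which is exactly what the EventRouter prefix filter needs to see.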