Commit dcef746

Couple of PR review fixes
1 parent 3710245 commit dcef746

File tree

2 files changed: +66 additions, -52 deletions

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/common.py

Lines changed: 35 additions & 7 deletions
@@ -1,6 +1,6 @@
 import logging
 from dataclasses import dataclass, field
-from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional
+from typing import TYPE_CHECKING, Dict, Final, List, Optional, TypedDict

 from pydantic import model_validator
 from pydantic.fields import Field
@@ -18,6 +18,7 @@
     StatefulIngestionConfigBase,
 )
 from datahub.utilities.lossy_collections import LossyList
+from datahub.utilities.urns.dataset_urn import DatasetUrn

 if TYPE_CHECKING:
     from datahub.sql_parsing.schema_resolver import SchemaResolver
@@ -33,6 +34,15 @@
 DEFAULT_CONNECT_URI: Final[str] = "http://localhost:8083/"


+class FineGrainedLineageDict(TypedDict):
+    """Structure for fine-grained (column-level) lineage mappings."""
+
+    upstreamType: str
+    downstreamType: str
+    upstreams: List[str]
+    downstreams: List[str]
+
+
 class ConnectorConfigKeys:
     """Centralized configuration keys to avoid magic strings throughout the codebase."""

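For context, a minimal sketch of what the new TypedDict buys over the previous plain Dict[str, Any]: a type checker can now validate the keys and value types of each lineage mapping. Field names come from the class above; the URN strings are made-up examples.

from typing import List, TypedDict


class FineGrainedLineageDict(TypedDict):
    upstreamType: str
    downstreamType: str
    upstreams: List[str]
    downstreams: List[str]


# mypy flags a missing key or wrongly typed value here,
# which a plain Dict[str, Any] would silently accept.
lineage: FineGrainedLineageDict = {
    "upstreamType": "FIELD_SET",
    "downstreamType": "FIELD",
    "upstreams": ["urn:li:schemaField:(urn:li:dataset:(...),id)"],    # example URN
    "downstreams": ["urn:li:schemaField:(urn:li:dataset:(...),id)"],  # example URN
}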
@@ -361,7 +371,7 @@ class KafkaConnectLineage:
     target_platform: str
     job_property_bag: Optional[Dict[str, str]] = None
     source_dataset: Optional[str] = None
-    fine_grained_lineages: Optional[List[Dict[str, Any]]] = None
+    fine_grained_lineages: Optional[List[FineGrainedLineageDict]] = None


 @dataclass
@@ -664,7 +674,7 @@ def _extract_fine_grained_lineage(
         source_platform: str,
         target_dataset: str,
         target_platform: str = "kafka",
-    ) -> Optional[List[Dict[str, Any]]]:
+    ) -> Optional[List[FineGrainedLineageDict]]:
         """
         Extract column-level lineage using schema metadata from DataHub.

@@ -681,9 +691,9 @@ def _extract_fine_grained_lineage(
             List of fine-grained lineage dictionaries or None if not available
         """
         # Check if feature is enabled
-        if not getattr(self.config, "use_schema_resolver", False):
+        if not self.config.use_schema_resolver:
             return None
-        if not getattr(self.config, "schema_resolver_finegrained_lineage", False):
+        if not self.config.schema_resolver_finegrained_lineage:
             return None
         if not self.schema_resolver:
             return None
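The switch from getattr(..., False) to direct attribute access works because these options are declared fields on the pydantic config model, so they always exist with a default. A minimal sketch of that pattern; the field names are taken from the diff, while the model name is assumed for illustration.

from pydantic import BaseModel


class KafkaConnectSourceConfig(BaseModel):  # assumed name, for illustration only
    # Declared fields always exist on instances, so direct attribute access is
    # safe and lets mypy catch typos that getattr(..., default) would hide.
    use_schema_resolver: bool = False
    schema_resolver_finegrained_lineage: bool = False
    schema_resolver_expand_patterns: bool = False


config = KafkaConnectSourceConfig()
assert config.use_schema_resolver is False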
@@ -718,10 +728,10 @@

         # Create fine-grained lineage for each source column
         # Assume 1:1 mapping (column names are preserved)
-        fine_grained_lineages = []
+        fine_grained_lineages: List[FineGrainedLineageDict] = []

         for source_col in source_schema:
-            fine_grained_lineage = {
+            fine_grained_lineage: FineGrainedLineageDict = {
                 "upstreamType": "FIELD_SET",
                 "downstreamType": "FIELD",
                 "upstreams": [make_schema_field_urn(source_urn_str, source_col)],
@@ -744,5 +754,23 @@

         return None

+    def _extract_table_name_from_urn(self, urn: str) -> Optional[str]:
+        """
+        Extract table name from DataHub URN using standard DatasetUrn parser.
+
+        Args:
+            urn: DataHub dataset URN
+                Format: urn:li:dataset:(urn:li:dataPlatform:platform,table_name,ENV)
+                Example: urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.table,PROD)
+
+        Returns:
+            Extracted table name (e.g., "database.schema.table") or None if parsing fails
+        """
+        try:
+            return DatasetUrn.from_string(urn).name
+        except Exception as e:
+            logger.debug(f"Failed to extract table name from URN {urn}: {e}")
+            return None
+

 # Removed: TopicResolver and ConnectorTopicHandlerRegistry - logic moved directly to BaseConnector subclasses
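As a quick illustration of the new helper's behavior, a minimal sketch using the DatasetUrn parser on an example URN (the URN values are illustrative; .name is the accessor used in the diff):

from datahub.utilities.urns.dataset_urn import DatasetUrn

# .name is the table portion that the helper returns.
urn = "urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.public.orders,PROD)"
print(DatasetUrn.from_string(urn).name)  # -> analytics.public.orders

# Malformed input raises inside from_string, so the helper logs and returns None
# instead of silently returning a garbled fragment as the old str.split(",") did.
try:
    DatasetUrn.from_string("not-a-valid-urn")
except Exception as e:
    print(f"parse failed: {e}")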

metadata-ingestion/src/datahub/ingestion/source/kafka_connect/source_connectors.py

Lines changed: 31 additions & 45 deletions
@@ -1,6 +1,6 @@
 import logging
 import re
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import Dict, Final, Iterable, List, Optional, Tuple

 from sqlalchemy.engine.url import make_url
@@ -1512,7 +1512,8 @@ def get_platform(self) -> str:
         try:
             parser = self.get_parser(self.connector_manifest)
             return parser.source_platform
-        except Exception:
+        except Exception as e:
+            logger.debug(f"Failed to get platform from parser: {e}")
             # If parser fails, try to infer from JDBC URL directly
             jdbc_url = self.connector_manifest.config.get("connection.url", "")
             if jdbc_url:
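For reference, a minimal sketch of the kind of scheme-based fallback the comment describes; the helper name and the platform mapping below are assumptions for illustration, not the repository's actual code.

from typing import Optional


def infer_platform_from_jdbc_url(jdbc_url: str) -> Optional[str]:
    # "jdbc:postgresql://host:5432/db" -> scheme "postgresql" -> platform "postgres"
    scheme = jdbc_url.removeprefix("jdbc:").split(":", 1)[0].lower()
    scheme_to_platform = {"postgresql": "postgres", "mysql": "mysql", "oracle": "oracle"}
    return scheme_to_platform.get(scheme)


print(infer_platform_from_jdbc_url("jdbc:postgresql://localhost:5432/shop"))  # postgres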
@@ -1531,6 +1532,8 @@ class SnowflakeSourceConnector(BaseConnector):
     Topic naming: <topic.prefix><database.schema.tableName>
     """

+    _cached_expanded_tables: Optional[List[str]] = field(default=None, init=False)
+
     @dataclass
     class SnowflakeSourceParser:
         source_platform: str
17701773
)
17711774
regex = re.compile(regex_pattern)
17721775

1776+
# TODO: Performance optimization - This loops through ALL datasets in DataHub
1777+
# for the platform without filtering. For large DataHub instances with thousands
1778+
# of tables, this could be very slow. Consider using graph.get_urns_by_filter()
1779+
# with more specific filters or implementing pagination.
17731780
for urn in all_urns:
17741781
# URN format: urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.table,PROD)
17751782
table_name = self._extract_table_name_from_urn(urn)
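A rough sketch of the direction that TODO points at, assuming a DataHubGraph client exposing the get_urns_by_filter API it names; the exact filter arguments shown are assumptions, not a verified call from this codebase.

# graph is assumed to be a datahub.ingestion.graph.client.DataHubGraph instance.
urns = graph.get_urns_by_filter(
    entity_types=["dataset"],
    platform="snowflake",  # narrow server-side instead of scanning every dataset
    env="PROD",
)
for urn in urns:
    ...  # match each URN against the compiled regex, as in the loop above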
@@ -1789,31 +1796,20 @@
             )
             return matched_tables

+        except (ConnectionError, TimeoutError) as e:
+            logger.error(f"Failed to connect to DataHub for pattern '{pattern}': {e}")
+            if self.report:
+                self.report.report_failure(
+                    f"datahub_connection_{self.connector_manifest.name}", str(e)
+                )
+            return []
         except Exception as e:
             logger.warning(
-                f"Failed to query tables from DataHub for pattern '{pattern}': {e}"
+                f"Failed to query tables from DataHub for pattern '{pattern}': {e}",
+                exc_info=True,
             )
             return []

-    def _extract_table_name_from_urn(self, urn: str) -> Optional[str]:
-        """
-        Extract table name from DataHub URN.
-
-        URN format: urn:li:dataset:(urn:li:dataPlatform:snowflake,database.schema.table,PROD)
-        Returns: database.schema.table
-        """
-        try:
-            # Simple parsing - extract between second comma and third comma
-            parts = urn.split(",")
-            if len(parts) >= 2:
-                # Second part contains the table name
-                table_name = parts[1]
-                return table_name
-        except Exception as e:
-            logger.debug(f"Failed to extract table name from URN {urn}: {e}")
-
-        return None
-
     def extract_lineages(self) -> List[KafkaConnectLineage]:
         """
         Extract lineage mappings from Snowflake tables to Kafka topics.
@@ -1836,7 +1832,7 @@ def extract_lineages(self) -> List[KafkaConnectLineage]:
             return []

         # Check if we have cached expanded tables from get_topics_from_config()
-        if hasattr(self, "_cached_expanded_tables"):
+        if self._cached_expanded_tables is not None:
             table_names = self._cached_expanded_tables
             if not table_names:
                 logger.debug(
@@ -2386,8 +2382,9 @@ def _expand_table_patterns(
         List of fully expanded table names
         """
         # Check if feature is enabled
-        if not getattr(self.config, "use_schema_resolver", False) or not getattr(
-            self.config, "schema_resolver_expand_patterns", False
+        if (
+            not self.config.use_schema_resolver
+            or not self.config.schema_resolver_expand_patterns
         ):
             # Fall back to original behavior - parse as-is
             return parse_comma_separated_list(table_config)
@@ -2537,31 +2534,20 @@ def _query_tables_from_datahub(
             )
             return matched_tables

+        except (ConnectionError, TimeoutError) as e:
+            logger.error(f"Failed to connect to DataHub for pattern '{pattern}': {e}")
+            if self.report:
+                self.report.report_failure(
+                    f"datahub_connection_{self.connector_manifest.name}", str(e)
+                )
+            return []
         except Exception as e:
             logger.warning(
-                f"Failed to query tables from DataHub for pattern '{pattern}': {e}"
+                f"Failed to query tables from DataHub for pattern '{pattern}': {e}",
+                exc_info=True,
             )
             return []

-    def _extract_table_name_from_urn(self, urn: str) -> Optional[str]:
-        """
-        Extract table name from DataHub URN.
-
-        URN format: urn:li:dataset:(urn:li:dataPlatform:postgres,database.schema.table,PROD)
-        Returns: database.schema.table
-        """
-        try:
-            # Simple parsing - extract between second comma and third comma
-            parts = urn.split(",")
-            if len(parts) >= 2:
-                # Second part contains the table name
-                table_name = parts[1]
-                return table_name
-        except Exception as e:
-            logger.debug(f"Failed to extract table name from URN {urn}: {e}")
-
-        return None
-

 @dataclass
 class ConfigDrivenSourceConnector(BaseConnector):
