From a2c3885a044cf0fdbfa7705e42f5e1246dc8720f Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 11 Nov 2025 16:14:08 +0100 Subject: [PATCH] properly add data platform instance to browsepath for DataFlow --- .../src/datahub/sdk/dataflow.py | 25 +++++++++++++++- .../test_dataflow_basic_golden.json | 11 +++++++ .../test_dataflow_complex_golden.json | 16 ++++++++++ .../test_datajob_complex_golden.json | 4 +++ ...est_datajob_init_with_flow_urn_golden.json | 4 +++ .../tests/unit/sdk_v2/test_dataflow.py | 29 +++++++++++++++++++ .../tests/unit/sdk_v2/test_datajob.py | 5 +++- 7 files changed, 92 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/sdk/dataflow.py b/metadata-ingestion/src/datahub/sdk/dataflow.py index 7af266c21a409..dc1d7e9f68c2c 100644 --- a/metadata-ingestion/src/datahub/sdk/dataflow.py +++ b/metadata-ingestion/src/datahub/sdk/dataflow.py @@ -12,7 +12,7 @@ from datahub.errors import ( IngestionAttributionWarning, ) -from datahub.metadata.urns import DataFlowUrn, Urn +from datahub.metadata.urns import DataFlowUrn, DataPlatformInstanceUrn, Urn from datahub.sdk._attribution import is_ingestion_attribution from datahub.sdk._shared import ( DomainInputType, @@ -123,6 +123,7 @@ def __init__( self._set_extra_aspects(extra_aspects) self._set_platform_instance(urn.orchestrator, platform_instance) + self._set_default_browse_path(platform_instance) # Initialize DataFlowInfoClass directly with name self._setdefault_aspect(models.DataFlowInfoClass(name=display_name or name)) @@ -307,3 +308,25 @@ def set_last_modified(self, last_modified: datetime) -> None: def env(self) -> Optional[Union[str, models.FabricTypeClass]]: """Get the environment of the dataflow.""" return self._ensure_dataflow_props().env + + def _set_default_browse_path(self, platform_instance: Optional[str]) -> None: + """Set a default browse path for the dataflow based on platform and instance. + + This creates a browse path with the platform instance as the root if provided, + preventing the dataflow from appearing in a generic "Default" folder. + + Args: + platform_instance: Optional platform instance identifier. + """ + browse_path = [] + if platform_instance: + platform_instance_urn = DataPlatformInstanceUrn( + self.urn.orchestrator, platform_instance + ) + browse_path.append( + models.BrowsePathEntryClass( + id=platform_instance_urn.urn(), urn=platform_instance_urn.urn() + ) + ) + + self._set_aspect(models.BrowsePathsV2Class(path=browse_path)) diff --git a/metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_basic_golden.json b/metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_basic_golden.json index 1453d8dc8e720..226e17ebfdecf 100644 --- a/metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_basic_golden.json +++ b/metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_basic_golden.json @@ -10,6 +10,17 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)", diff --git a/metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_complex_golden.json b/metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_complex_golden.json index 14c1ba397c2a5..559fa33124a6a 100644 --- a/metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_complex_golden.json +++ b/metadata-ingestion/tests/unit/sdk_v2/dataflow_golden/test_dataflow_complex_golden.json @@ -11,6 +11,22 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)" + } + ] + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)", diff --git a/metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_complex_golden.json b/metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_complex_golden.json index cdd8b6ff98c19..071ee067e9321 100644 --- a/metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_complex_golden.json +++ b/metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_complex_golden.json @@ -19,6 +19,10 @@ "aspect": { "json": { "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)" + }, { "id": "my_instance.example_dag", "urn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)" diff --git a/metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_init_with_flow_urn_golden.json b/metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_init_with_flow_urn_golden.json index ddd1687ed6706..cd86ce7270923 100644 --- a/metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_init_with_flow_urn_golden.json +++ b/metadata-ingestion/tests/unit/sdk_v2/datajob_golden/test_datajob_init_with_flow_urn_golden.json @@ -19,6 +19,10 @@ "aspect": { "json": { "path": [ + { + "id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)", + "urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)" + }, { "id": "my_instance.example_dag", "urn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)" diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_dataflow.py b/metadata-ingestion/tests/unit/sdk_v2/test_dataflow.py index b12b322c5785b..05c7b3186079c 100644 --- a/metadata-ingestion/tests/unit/sdk_v2/test_dataflow.py +++ b/metadata-ingestion/tests/unit/sdk_v2/test_dataflow.py @@ -204,3 +204,32 @@ def test_dataflow_with_container() -> None: ) assert flow.parent_container == container.urn assert flow.browse_path == [container.urn] + + +def test_dataflow_browse_path_with_platform_instance() -> None: + """Test that DataFlow with platform_instance has a correct browse path.""" + flow = DataFlow( + platform="fivetran", + name="my_connector", + platform_instance="prod_instance", + ) + + assert flow.platform_instance is not None + assert flow.browse_path is not None + assert len(flow.browse_path) == 1 + assert ( + str(flow.browse_path[0]) + == "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,prod_instance)" + ) + + +def test_dataflow_browse_path_without_platform_instance() -> None: + """Test that DataFlow without platform_instance has an empty browse path.""" + flow = DataFlow( + platform="fivetran", + name="my_connector", + ) + + assert flow.platform_instance is None + assert flow.browse_path is not None + assert len(flow.browse_path) == 0 diff --git a/metadata-ingestion/tests/unit/sdk_v2/test_datajob.py b/metadata-ingestion/tests/unit/sdk_v2/test_datajob.py index ea591e823179e..c1d7eca411f72 100644 --- a/metadata-ingestion/tests/unit/sdk_v2/test_datajob.py +++ b/metadata-ingestion/tests/unit/sdk_v2/test_datajob.py @@ -116,7 +116,10 @@ def test_datajob_complex() -> None: assert job.platform == DataPlatformUrn("airflow") assert job.platform_instance == flow.platform_instance assert job.platform_instance == DataPlatformInstanceUrn("airflow", "my_instance") - assert job.browse_path == [flow.urn] + assert job.browse_path == [ + DataPlatformInstanceUrn("airflow", "my_instance"), + flow.urn, + ] # Validate golden file assert_entity_golden(job, GOLDEN_DIR / "test_datajob_complex_golden.json")