Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion metadata-ingestion/src/datahub/sdk/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from datahub.errors import (
IngestionAttributionWarning,
)
from datahub.metadata.urns import DataFlowUrn, Urn
from datahub.metadata.urns import DataFlowUrn, DataPlatformInstanceUrn, Urn
from datahub.sdk._attribution import is_ingestion_attribution
from datahub.sdk._shared import (
DomainInputType,
Expand Down Expand Up @@ -123,6 +123,7 @@ def __init__(
self._set_extra_aspects(extra_aspects)

self._set_platform_instance(urn.orchestrator, platform_instance)
self._set_default_browse_path(platform_instance)

# Initialize DataFlowInfoClass directly with name
self._setdefault_aspect(models.DataFlowInfoClass(name=display_name or name))
Expand Down Expand Up @@ -307,3 +308,25 @@ def set_last_modified(self, last_modified: datetime) -> None:
def env(self) -> Optional[Union[str, models.FabricTypeClass]]:
    """Return the ``env`` value held by this dataflow's properties aspect."""
    dataflow_props = self._ensure_dataflow_props()
    return dataflow_props.env

def _set_default_browse_path(self, platform_instance: Optional[str]) -> None:
    """Install a default browse path derived from the platform instance.

    When ``platform_instance`` is provided, the browse path consists of a
    single entry whose ``id`` and ``urn`` are both the URN of the data
    platform instance (built from this dataflow's orchestrator and the
    instance name). Otherwise the browse path is empty. Per the original
    intent, rooting the path at the platform instance keeps the dataflow
    from appearing in a generic "Default" folder.

    This is only a *default*: the aspect is registered with
    ``_setdefault_aspect`` so that a ``BrowsePathsV2`` aspect already present
    on the entity (for example one passed through ``extra_aspects``, which
    ``__init__`` applies before calling this method) is not replaced.
    NOTE(review): this relies on ``_setdefault_aspect`` leaving an existing
    aspect untouched, as its use for ``DataFlowInfo`` suggests - confirm.

    Args:
        platform_instance: Optional platform instance identifier. ``None``
            and the empty string both result in an empty browse path.
    """
    path_entries = []
    if platform_instance:
        # Render the URN once; the same string serves as both id and urn.
        instance_urn = DataPlatformInstanceUrn(
            self.urn.orchestrator, platform_instance
        ).urn()
        path_entries.append(
            models.BrowsePathEntryClass(id=instance_urn, urn=instance_urn)
        )

    # setdefault (not set): do not clobber a browse path supplied by the caller.
    self._setdefault_aspect(models.BrowsePathsV2Class(path=path_entries))
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,17 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": []
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,example_dag,PROD)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
"changeType": "UPSERT",
"aspectName": "browsePathsV2",
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
}
]
}
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
},
{
"id": "my_instance.example_dag",
"urn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
"aspect": {
"json": {
"path": [
{
"id": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)",
"urn": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:airflow,my_instance)"
},
{
"id": "my_instance.example_dag",
"urn": "urn:li:dataFlow:(airflow,my_instance.example_dag,PROD)"
Expand Down
29 changes: 29 additions & 0 deletions metadata-ingestion/tests/unit/sdk_v2/test_dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,32 @@ def test_dataflow_with_container() -> None:
)
assert flow.parent_container == container.urn
assert flow.browse_path == [container.urn]


def test_dataflow_browse_path_with_platform_instance() -> None:
    """A DataFlow built with a platform_instance has that instance as its sole browse path entry."""
    expected_root = (
        "urn:li:dataPlatformInstance:(urn:li:dataPlatform:fivetran,prod_instance)"
    )

    flow = DataFlow(
        name="my_connector",
        platform="fivetran",
        platform_instance="prod_instance",
    )

    # The instance must be recorded on the entity itself ...
    assert flow.platform_instance is not None

    # ... and be the one and only element of the browse path.
    path = flow.browse_path
    assert path is not None
    assert len(path) == 1
    assert str(path[0]) == expected_root


def test_dataflow_browse_path_without_platform_instance() -> None:
    """A DataFlow built without a platform_instance has a present but empty browse path."""
    flow = DataFlow(name="my_connector", platform="fivetran")

    assert flow.platform_instance is None

    # The browse path exists (not None) yet carries no entries.
    path = flow.browse_path
    assert path is not None
    assert len(path) == 0
5 changes: 4 additions & 1 deletion metadata-ingestion/tests/unit/sdk_v2/test_datajob.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,10 @@ def test_datajob_complex() -> None:
assert job.platform == DataPlatformUrn("airflow")
assert job.platform_instance == flow.platform_instance
assert job.platform_instance == DataPlatformInstanceUrn("airflow", "my_instance")
assert job.browse_path == [flow.urn]
assert job.browse_path == [
DataPlatformInstanceUrn("airflow", "my_instance"),
flow.urn,
]

# Validate golden file
assert_entity_golden(job, GOLDEN_DIR / "test_datajob_complex_golden.json")
Expand Down
Loading