From 7dd0373f0dcb3a46a641df017ac1ddcfb3a6ee4e Mon Sep 17 00:00:00 2001
From: Michael
Date: Tue, 2 Sep 2025 20:49:46 +0300
Subject: [PATCH 1/3] 837

---
 src/sempy_labs/_authentication.py         |  10 +-
 src/sempy_labs/lakehouse/_partitioning.py | 165 ++++++++++++++++++++++
 2 files changed, 172 insertions(+), 3 deletions(-)
 create mode 100644 src/sempy_labs/lakehouse/_partitioning.py

diff --git a/src/sempy_labs/_authentication.py b/src/sempy_labs/_authentication.py
index 747876a5..94e2b7a0 100644
--- a/src/sempy_labs/_authentication.py
+++ b/src/sempy_labs/_authentication.py
@@ -19,7 +19,7 @@ class ServicePrincipalTokenProvider(TokenCredential):
         "azure": "https://management.azure.com/.default",
         "graph": "https://graph.microsoft.com/.default",
         "asazure": "https://{region}.asazure.windows.net/.default",
-        "keyvault": "https://vault.azure.net/.default"
+        "keyvault": "https://vault.azure.net/.default",
     }
 
     def __init__(self, credential: ClientSecretCredential):
@@ -152,10 +152,14 @@ def get_token(self, *scopes, **kwargs) -> AccessToken:
             scopes = ("pbi",)
 
         region = kwargs.pop("region", None)
-        scopes = [self._get_fully_qualified_scope(scope, region=region) for scope in scopes]
+        scopes = [
+            self._get_fully_qualified_scope(scope, region=region) for scope in scopes
+        ]
 
         return self.credential.get_token(*scopes, **kwargs)
 
-    def _get_fully_qualified_scope(self, scope: str, region: Optional[str] = None) -> str:
+    def _get_fully_qualified_scope(
+        self, scope: str, region: Optional[str] = None
+    ) -> str:
         """
         Resolve to fully qualified scope if Fabric short-handed scope is given. Otherwise, return the original scope.
diff --git a/src/sempy_labs/lakehouse/_partitioning.py b/src/sempy_labs/lakehouse/_partitioning.py
new file mode 100644
index 00000000..b57aec29
--- /dev/null
+++ b/src/sempy_labs/lakehouse/_partitioning.py
@@ -0,0 +1,165 @@
+from typing import Optional, List
+from uuid import UUID
+from sempy_labs._helper_functions import (
+    _create_spark_session,
+    create_abfss_path,
+    resolve_workspace_id,
+    resolve_lakehouse_id,
+    _get_delta_table,
+)
+from sempy._utils._log import log
+
+
+@log
+def _get_partitions(
+    table_name: str,
+    schema_name: Optional[str] = None,
+    lakehouse: Optional[str | UUID] = None,
+    workspace: Optional[str | UUID] = None,
+):
+
+    workspace_id = resolve_workspace_id(workspace)
+    lakehouse_id = resolve_lakehouse_id(lakehouse, workspace)
+    path = create_abfss_path(lakehouse_id, workspace_id, table_name, schema_name)
+
+    delta_table = _get_delta_table(path)
+    details_df = delta_table.detail()
+
+    return details_df.collect()[0].asDict()
+
+
+@log
+def is_partitioned(
+    table: str,
+    schema: Optional[str] = None,
+    lakehouse: Optional[str | UUID] = None,
+    workspace: Optional[str | UUID] = None,
+) -> bool:
+    """
+    Checks if a delta table is partitioned.
+
+    Parameters
+    ----------
+    table : str
+        The name of the delta table.
+    schema : str, optional
+        The schema of the table to check. If not provided, the default schema is used.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
+        Defaults to None which resolves to the lakehouse attached to the notebook.
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID used by the lakehouse.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+
+    Returns
+    -------
+    bool
+        True if the table is partitioned, False otherwise.
+    """
+
+    details = _get_partitions(
+        table_name=table, schema_name=schema, lakehouse=lakehouse, workspace=workspace
+    )
+    return len(details["partitionColumns"]) > 0
+
+
+@log
+def list_partitioned_columns(
+    table: str,
+    schema: Optional[str] = None,
+    lakehouse: Optional[str | UUID] = None,
+    workspace: Optional[str | UUID] = None,
+) -> List[str]:
+    """
+    Lists the partitioned columns of a delta table.
+
+    Parameters
+    ----------
+    table : str
+        The name of the delta table.
+    schema : str, optional
+        The schema of the table to check. If not provided, the default schema is used.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
+        Defaults to None which resolves to the lakehouse attached to the notebook.
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID used by the lakehouse.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+
+    Returns
+    -------
+    List[str]
+        The list of partitioned columns.
+    """
+
+    details = _get_partitions(
+        table_name=table, schema_name=schema, lakehouse=lakehouse, workspace=workspace
+    )
+
+    return details["partitionColumns"]
+
+
+@log
+def is_over_partitioned(
+    table: str,
+    schema: Optional[str] = None,
+    lakehouse: Optional[str | UUID] = None,
+    workspace: Optional[str | UUID] = None,
+    total_table_size_gb: int = 1000,
+    average_partition_size_gb: int = 1,
+) -> bool:
+    """
+    Checks if a delta table is over-partitioned.
+
+    Parameters
+    ----------
+    table : str
+        The name of the delta table.
+    schema : str, optional
+        The schema of the table to check. If not provided, the default schema is used.
+    lakehouse : str | uuid.UUID, default=None
+        The Fabric lakehouse name or ID.
+        Defaults to None which resolves to the lakehouse attached to the notebook.
+    workspace : str | uuid.UUID, default=None
+        The Fabric workspace name or ID used by the lakehouse.
+        Defaults to None which resolves to the workspace of the attached lakehouse
+        or if no lakehouse attached, resolves to the workspace of the notebook.
+    total_table_size_gb : int, default=1000
+        Threshold for total table size in GB (default 1TB).
+    average_partition_size_gb : int, default=1
+        Threshold for average partition size in GB.
+
+    Returns
+    -------
+    bool
+        True if the table is over-partitioned, False otherwise.
+    """
+
+    workspace_id = resolve_workspace_id(workspace)
+    lakehouse_id = resolve_lakehouse_id(lakehouse, workspace)
+    path = create_abfss_path(lakehouse_id, workspace_id, table, schema)
+    # Get DeltaTable details
+    spark = _create_spark_session()
+    details_df = spark.sql(f"DESCRIBE DETAIL delta.`{path}`")
+    details = details_df.collect()[0].asDict()
+
+    # Extract relevant fields
+    size_bytes = details["sizeInBytes"]
+    partition_cols = details["partitionColumns"]
+    num_files = details["numFiles"]
+
+    total_size_gb = size_bytes / (1024**3)
+
+    # Only check if the table is partitioned
+    if len(partition_cols) > 0 and num_files > 0:
+        avg_partition_size_gb = total_size_gb / num_files
+
+        if (
+            total_size_gb < total_table_size_gb
+            or avg_partition_size_gb < average_partition_size_gb
+        ):
+            return True
+
+    return False

From eb64fb1d5aba0799a373c460fde8e764f5e641d2 Mon Sep 17 00:00:00 2001
From: Michael
Date: Wed, 3 Sep 2025 09:06:09 +0300
Subject: [PATCH 2/3] fix spelling

---
 src/sempy_labs/report/_report_rebind.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/sempy_labs/report/_report_rebind.py b/src/sempy_labs/report/_report_rebind.py
index bceb1387..41d278c9 100644
--- a/src/sempy_labs/report/_report_rebind.py
+++ b/src/sempy_labs/report/_report_rebind.py
@@ -80,7 +80,7 @@ def report_rebind_all(
     dataset: str,
     new_dataset: str,
     dataset_workspace: Optional[str] = None,
-    new_dataset_workpace: Optional[str] = None,
+    new_dataset_workspace: Optional[str] = None,
     report_workspace: Optional[str | List[str]] = None,
 ):
     """
@@ -148,5 +148,5 @@ def report_rebind_all(
             report=rpt_name,
             dataset=new_dataset,
             report_workspace=rpt_wksp,
-            dataset_workspace=new_dataset_workpace,
+            dataset_workspace=new_dataset_workspace,
         )

From 5e6b3d32d5dffa2bd500ec340f9e2b908a549415 Mon Sep 17 00:00:00 2001
From: Michael
Date: Wed, 3 Sep 2025 09:06:42 +0300
Subject: [PATCH 3/3] fix

---
 src/sempy_labs/report/_report_rebind.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/sempy_labs/report/_report_rebind.py b/src/sempy_labs/report/_report_rebind.py
index 41d278c9..bceb1387 100644
--- a/src/sempy_labs/report/_report_rebind.py
+++ b/src/sempy_labs/report/_report_rebind.py
@@ -80,7 +80,7 @@ def report_rebind_all(
     dataset: str,
     new_dataset: str,
     dataset_workspace: Optional[str] = None,
-    new_dataset_workspace: Optional[str] = None,
+    new_dataset_workpace: Optional[str] = None,
     report_workspace: Optional[str | List[str]] = None,
 ):
     """
@@ -148,5 +148,5 @@ def report_rebind_all(
             report=rpt_name,
             dataset=new_dataset,
             report_workspace=rpt_wksp,
-            dataset_workspace=new_dataset_workspace,
+            dataset_workspace=new_dataset_workpace,
         )
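Usage sketch (not part of the patch series above): a minimal example of how the partitioning helpers added in PATCH 1/3 might be called from a Fabric notebook. It imports straight from the new private module, since the patch does not show a public re-export, and the table, lakehouse, and workspace names below are hypothetical placeholders.

from sempy_labs.lakehouse._partitioning import (
    is_partitioned,
    list_partitioned_columns,
    is_over_partitioned,
)

# Hypothetical identifiers; replace with a real table, lakehouse, and workspace.
table = "sales"
lakehouse = "MyLakehouse"
workspace = "MyWorkspace"

if is_partitioned(table=table, lakehouse=lakehouse, workspace=workspace):
    # Report which columns the delta table is partitioned by.
    cols = list_partitioned_columns(table=table, lakehouse=lakehouse, workspace=workspace)
    print(f"'{table}' is partitioned by: {cols}")
    # Flag tables whose total size or average per-file size falls below the default thresholds.
    if is_over_partitioned(table=table, lakehouse=lakehouse, workspace=workspace):
        print(f"'{table}' looks over-partitioned for its size.")
else:
    print(f"'{table}' is not partitioned.")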