diff --git a/src/sempy_labs/_helper_functions.py b/src/sempy_labs/_helper_functions.py index d28b483f..2630c131 100644 --- a/src/sempy_labs/_helper_functions.py +++ b/src/sempy_labs/_helper_functions.py @@ -1929,12 +1929,12 @@ def get_token(audience="pbi"): elif client == "fabric_sp": token = auth.token_provider.get() or get_token c = fabric.FabricRestClient(token_provider=token) - elif client in ["azure", "graph"]: + elif client in ["azure", "graph", "keyvault"]: pass else: raise ValueError(f"{icons.red_dot} The '{client}' client is not supported.") - if client not in ["azure", "graph"]: + if client not in ["azure", "graph", "keyvault"]: if method == "get": response = c.get(request) elif method == "delete": @@ -1947,7 +1947,7 @@ def get_token(audience="pbi"): response = c.put(request, json=payload) else: raise NotImplementedError - else: + elif client in ["azure", "graph"]: headers = _get_headers(auth.token_provider.get(), audience=client) if client == "graph": url = f"https://graph.microsoft.com/v1.0/{request}" @@ -1961,6 +1961,14 @@ def get_token(audience="pbi"): headers=headers, json=payload, ) + elif client == "keyvault": + token = notebookutils.credentials.getToken("keyvault") + headers = {"Authorization": f"Bearer {token}"} + response = requests.request( + method.upper(), f"{request}?api-version=7.4", headers=headers, json=payload + ) + else: + raise NotImplementedError if lro_return_json: return lro(c, response, status_codes).json() diff --git a/src/sempy_labs/keyvault/__init__.py b/src/sempy_labs/keyvault/__init__.py new file mode 100644 index 00000000..54086b84 --- /dev/null +++ b/src/sempy_labs/keyvault/__init__.py @@ -0,0 +1,18 @@ +from ._secrets import ( + get_secret, + set_secret, + delete_secret, + list_secrets, + recover_deleted_secret, + purge_deleted_secret, +) + + +__all__ = [ + "get_secret", + "set_secret", + "delete_secret", + "list_secrets", + "recover_deleted_secret", + "purge_deleted_secret", +] diff --git a/src/sempy_labs/keyvault/_secrets.py b/src/sempy_labs/keyvault/_secrets.py new file mode 100644 index 00000000..fd005616 --- /dev/null +++ b/src/sempy_labs/keyvault/_secrets.py @@ -0,0 +1,267 @@ +from typing import Any +import pandas as pd +from sempy_labs._helper_functions import ( + _base_api, + _create_dataframe, +) +import sempy_labs._icons as icons +from sempy._utils._log import log + + +@log +def get_secret(key_vault_uri: str, secret_name: str) -> Any: + """ + Retrieves the latest version of a secret from the specified Azure Key Vault. + + This is a wrapper function for the following API: `Get Secret - Get Secret `_. + + Parameters + ---------- + key_vault_uri : str + Azure Key Vault URI. + secret_name : str + Name of the secret in the Key Vault. + + Returns + ------- + Any + The value of the latest version of the secret. + """ + + response = _base_api( + request=f"{key_vault_uri}/secrets/{secret_name}/versions", client="keyvault" + ) + url = response.json().get("value")[-1].get("id") + response = _base_api(request=f"{url}", client="keyvault") + return response.json().get("value") + + +@log +def set_secret(key_vault_uri: str, secret_name: str, secret_value: Any): + """ + Sets a secret in the specified Azure Key Vault. + + This is a wrapper function for the following API: `Set Secret - Set Secret `_. + + Parameters + ---------- + key_vault_uri : str + Azure Key Vault URI. + secret_name : str + Name of the secret to be set in the Key Vault. + secret_value : Any + Value of the secret to be set in the Key Vault. + """ + + payload = {"value": secret_value} + _base_api( + request=f"{key_vault_uri}/secrets/{secret_name}", + client="keyvault", + method="put", + payload=payload, + ) + print( + f"{icons.green_dot} The '{secret_name}' secret has been successfully set within the '{key_vault_uri}' Key Vault." + ) + + +@log +def list_secrets(key_vault_uri: str) -> pd.DataFrame: + """ + Lists all secrets in the specified Azure Key Vault. + + This is a wrapper function for the following API: `Get Secrets - Get Secrets `_. + + Parameters + ---------- + key_vault_uri : str + Azure Key Vault URI. + + Returns + ------- + pandas.DataFrame + A pandas DataFrame containing details of all secrets in the Key Vault. + """ + + columns = { + "Secret Id": "str", + "Enabled": "bool", + "Created Date": "datetime", + "Updated Date": "datetime", + "Recovery Level": "str", + "Recoverable Days": "int", + } + + df = _create_dataframe(columns=columns) + + all_items = [] + url = f"{key_vault_uri}/secrets" + while url: + response = _base_api(request=url, client="keyvault") + result = response.json() + items = result.get("value", []) + all_items.extend(items) + url = result.get("nextLink") # Update to next page if it exists + + if not all_items: + return df + + df = pd.json_normalize(all_items) + df.rename( + columns={ + "id": "Secret Id", + "attributes.enabled": "Enabled", + "attributes.created": "Created Date", + "attributes.updated": "Updated Date", + "attributes.recoveryLevel": "Recovery Level", + "attributes.recoverableDays": "Recoverable Days", + }, + inplace=True, + ) + df["Created Date"] = pd.to_datetime(df["Created Date"], unit="s") + df["Updated Date"] = pd.to_datetime(df["Updated Date"], unit="s") + return df + + +@log +def list_deleted_secrets(key_vault_uri: str) -> pd.DataFrame: + """ + Lists all deleted secrets in the specified Azure Key Vault. + + This is a wrapper function for the following API: `Get Deleted Secrets - GetDeleted Secrets `_. + + Parameters + ---------- + key_vault_uri : str + Azure Key Vault URI. + + Returns + ------- + pandas.DataFrame + A pandas DataFrame containing details of all secrets in the Key Vault. + """ + + columns = { + "Recovery Id": "str", + "Deleted Date": "datetime", + "Scheduled Purge Date": "datetime", + "Content Type": "str", + "Secret Id": "str", + "Enabled": "bool", + "Created Date": "datetime", + "Updated Date": "datetime", + "Recovery Level": "str", + } + + df = _create_dataframe(columns=columns) + + all_items = [] + url = f"{key_vault_uri}/deletedSecrets" + while url: + response = _base_api(request=url, client="keyvault") + result = response.json() + items = result.get("value", []) + all_items.extend(items) + url = result.get("nextLink") # Update to next page if it exists + + if not all_items: + return df + + df = pd.json_normalize(all_items) + df.rename( + columns={ + "recoveryId": "Recovery Id", + "deletedDate": "Deleted Date", + "scheduledPurgeDate": "Scheduled Purge Date", + "contentType": "Content Type", + "id": "Secret Id", + "attributes.enabled": "Enabled", + "attributes.created": "Created Date", + "attributes.updated": "Updated Date", + "attributes.recoveryLevel": "Recovery Level", + }, + inplace=True, + ) + df["Scheduled Purge Date"] = pd.to_datetime(df["Scheduled Purge Date"], unit="s") + df["Deleted Date"] = pd.to_datetime(df["Deleted Date"], unit="s") + df["Created Date"] = pd.to_datetime(df["Created Date"], unit="s") + df["Updated Date"] = pd.to_datetime(df["Updated Date"], unit="s") + + return df + + +@log +def recover_deleted_secret(key_vault_uri: str, secret_name: str): + """ + Recovers a deleted secret in the specified Azure Key Vault. + + This is a wrapper function for the following API: `Recover Deleted Secret - Recover Deleted Secret `_. + + Parameters + ---------- + key_vault_uri : str + Azure Key Vault URI. + secret_name : str + Name of the deleted secret to be recovered in the Key Vault. + """ + + _base_api( + request=f"{key_vault_uri}/deletedsecrets/{secret_name}/recover", + client="keyvault", + method="post", + ) + print( + f"{icons.green_dot} The '{secret_name}' secret within the '{key_vault_uri}' Key Vault has been successfully recovered." + ) + + +@log +def purge_deleted_secret(key_vault_uri: str, secret_name: str): + """ + Permanently deletes the specified secret. + The purge deleted secret operation removes the secret permanently, without the possibility of recovery. This operation can only be enabled on a soft-delete enabled vault. This operation requires the secrets/purge permission. + + This is a wrapper function for the following API: `Purge Deleted Secret - Purge Deleted Secret `_. + + Parameters + ---------- + key_vault_uri : str + Azure Key Vault URI. + secret_name : str + Name of the deleted secret to be recovered in the Key Vault. + """ + + _base_api( + request=f"{key_vault_uri}/deletedsecrets/{secret_name}", + client="keyvault", + method="delete", + status_codes=204, + ) + print( + f"{icons.green_dot} The '{secret_name}' secret within the '{key_vault_uri}' Key Vault has been successfully purged." + ) + + +@log +def delete_secret(key_vault_uri: str, secret_name: str): + """ + Deletes a secret in the specified Azure Key Vault. + + This is a wrapper function for the following API: `Delete Secret - Delete Secret `_. + + Parameters + ---------- + key_vault_uri : str + Azure Key Vault URI. + secret_name : str + Name of the secret to be deleted in the Key Vault. + """ + + _base_api( + request=f"{key_vault_uri}/secrets/{secret_name}", + client="keyvault", + method="delete", + ) + print( + f"{icons.green_dot} The '{secret_name}' secret has been successfully deleted within the '{key_vault_uri}' Key Vault." + )