Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/sempy_labs/_helper_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1929,12 +1929,12 @@ def get_token(audience="pbi"):
elif client == "fabric_sp":
token = auth.token_provider.get() or get_token
c = fabric.FabricRestClient(token_provider=token)
elif client in ["azure", "graph"]:
elif client in ["azure", "graph", "keyvault"]:
pass
else:
raise ValueError(f"{icons.red_dot} The '{client}' client is not supported.")

if client not in ["azure", "graph"]:
if client not in ["azure", "graph", "keyvault"]:
if method == "get":
response = c.get(request)
elif method == "delete":
Expand All @@ -1947,7 +1947,7 @@ def get_token(audience="pbi"):
response = c.put(request, json=payload)
else:
raise NotImplementedError
else:
elif client in ["azure", "graph"]:
headers = _get_headers(auth.token_provider.get(), audience=client)
if client == "graph":
url = f"https://graph.microsoft.com/v1.0/{request}"
Expand All @@ -1961,6 +1961,14 @@ def get_token(audience="pbi"):
headers=headers,
json=payload,
)
elif client == "keyvault":
token = notebookutils.credentials.getToken("keyvault")
headers = {"Authorization": f"Bearer {token}"}
response = requests.request(
method.upper(), f"{request}?api-version=7.4", headers=headers, json=payload
)
else:
raise NotImplementedError

if lro_return_json:
return lro(c, response, status_codes).json()
Expand Down
16 changes: 16 additions & 0 deletions src/sempy_labs/keyvault/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from ._secrets import (
get_secret,
set_secret,
delete_secret,
list_secrets,
recover_deleted_secret,
)


__all__ = [
"get_secret",
"set_secret",
"delete_secret",
"list_secrets",
"recover_deleted_secret",
]
240 changes: 240 additions & 0 deletions src/sempy_labs/keyvault/_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
from typing import Any
import pandas as pd
from sempy_labs._helper_functions import (
_base_api,
_create_dataframe,
)
import sempy_labs._icons as icons
from sempy._utils._log import log


@log
def get_secret(key_vault_uri: str, secret_name: str) -> Any:
"""
Retrieves the latest version of a secret from the specified Azure Key Vault.

This is a wrapper function for the following API: `Get Secret - Get Secret <https://learn.microsoft.com/rest/api/keyvault/secrets/get-secret/get-secret>`_.

Parameters
----------
key_vault_uri : str
Azure Key Vault URI.
secret_name : str
Name of the secret in the Key Vault.

Returns
-------
Any
The value of the latest version of the secret.
"""

response = _base_api(
request=f"{key_vault_uri}/secrets/{secret_name}/versions", client="keyvault"
)
url = response.json().get("value")[-1].get("id")
response = _base_api(request=f"{url}", client="keyvault")
return response.json().get("value")


@log
def set_secret(key_vault_uri: str, secret_name: str, secret_value: Any):
"""
Sets a secret in the specified Azure Key Vault.

This is a wrapper function for the following API: `Set Secret - Set Secret <https://learn.microsoft.com/rest/api/keyvault/secrets/set-secret/set-secret>`_.

Parameters
----------
key_vault_uri : str
Azure Key Vault URI.
secret_name : str
Name of the secret to be set in the Key Vault.
secret_value : Any
Value of the secret to be set in the Key Vault.
"""

payload = {"value": secret_value}
_base_api(
request=f"{key_vault_uri}/secrets/{secret_name}",
client="keyvault",
method="put",
payload=payload,
)
print(
f"{icons.green_dot} The '{secret_name}' secret has been successfully set within the '{key_vault_uri}' Key Vault."
)


@log
def list_secrets(key_vault_uri: str) -> pd.DataFrame:
"""
Lists all secrets in the specified Azure Key Vault.

This is a wrapper function for the following API: `Get Secrets - Get Secrets <https://learn.microsoft.com/rest/api/keyvault/secrets/get-secrets/get-secrets>`_.

Parameters
----------
key_vault_uri : str
Azure Key Vault URI.

Returns
-------
pandas.DataFrame
A pandas DataFrame containing details of all secrets in the Key Vault.
"""

columns = {
"Secret Id": "str",
"Enabled": "bool",
"Created Date": "datetime",
"Updated Date": "datetime",
"Recovery Level": "str",
"Recoverable Days": "int",
}

df = _create_dataframe(columns=columns)

all_items = []
url = f"{key_vault_uri}/secrets"
while url:
response = _base_api(request=url, client="keyvault")
result = response.json()
items = result.get("value", [])
all_items.extend(items)
url = result.get("nextLink") # Update to next page if it exists

if not all_items:
return df

df = pd.json_normalize(all_items)
df.rename(
columns={
"id": "Secret Id",
"attributes.enabled": "Enabled",
"attributes.created": "Created Date",
"attributes.updated": "Updated Date",
"attributes.recoveryLevel": "Recovery Level",
"attributes.recoverableDays": "Recoverable Days",
},
inplace=True,
)
df["Created Date"] = pd.to_datetime(df["Created Date"], unit="s")
df["Updated Date"] = pd.to_datetime(df["Updated Date"], unit="s")
return df


@log
def list_deleted_secrets(key_vault_uri: str) -> pd.DataFrame:
"""
Lists all deleted secrets in the specified Azure Key Vault.

This is a wrapper function for the following API: `Get Deleted Secrets - GetDeleted Secrets <https://learn.microsoft.com/rest/api/keyvault/secrets/get-deleted-secrets/get-deleted-secrets>`_.

Parameters
----------
key_vault_uri : str
Azure Key Vault URI.

Returns
-------
pandas.DataFrame
A pandas DataFrame containing details of all secrets in the Key Vault.
"""

columns = {
"Recovery Id": "str",
"Deleted Date": "datetime",
"Scheduled Purge Date": "datetime",
"Content Type": "str",
"Secret Id": "str",
"Enabled": "bool",
"Created Date": "datetime",
"Updated Date": "datetime",
"Recovery Level": "str",
}

df = _create_dataframe(columns=columns)

all_items = []
url = f"{key_vault_uri}/deletedSecrets"
while url:
response = _base_api(request=url, client="keyvault")
result = response.json()
items = result.get("value", [])
all_items.extend(items)
url = result.get("nextLink") # Update to next page if it exists

if not all_items:
return df

df = pd.json_normalize(all_items)
df.rename(
columns={
"recoveryId": "Recovery Id",
"deletedDate": "Deleted Date",
"scheduledPurgeDate": "Scheduled Purge Date",
"contentType": "Content Type",
"id": "Secret Id",
"attributes.enabled": "Enabled",
"attributes.created": "Created Date",
"attributes.updated": "Updated Date",
"attributes.recoveryLevel": "Recovery Level",
},
inplace=True,
)
df["Scheduled Purge Date"] = pd.to_datetime(df["Scheduled Purge Date"], unit="s")
df["Deleted Date"] = pd.to_datetime(df["Deleted Date"], unit="s")
df["Created Date"] = pd.to_datetime(df["Created Date"], unit="s")
df["Updated Date"] = pd.to_datetime(df["Updated Date"], unit="s")

return df


@log
def recover_deleted_secret(key_vault_uri: str, secret_name: str):
"""
Recovers a deleted secret in the specified Azure Key Vault.

This is a wrapper function for the following API: `Recover Deleted Secret - Recover Deleted Secret <https://learn.microsoft.com/rest/api/keyvault/secrets/recover-deleted-secret/recover-deleted-secret>`_.

Parameters
----------
key_vault_uri : str
Azure Key Vault URI.
secret_name : str
Name of the deleted secret to be recovered in the Key Vault.
"""

_base_api(
request=f"{key_vault_uri}/deletedsecrets/{secret_name}/recover",
client="keyvault",
method="post",
)
print(
f"{icons.green_dot} The '{secret_name}' secret has been successfully recovered within the '{key_vault_uri}' Key Vault."
)


@log
def delete_secret(key_vault_uri: str, secret_name: str):
"""
Deletes a secret in the specified Azure Key Vault.

This is a wrapper function for the following API: `Delete Secret - Delete Secret <https://learn.microsoft.com/rest/api/keyvault/secrets/delete-secret/delete-secret>`_.

Parameters
----------
key_vault_uri : str
Azure Key Vault URI.
secret_name : str
Name of the secret to be deleted in the Key Vault.
"""

_base_api(
request=f"{key_vault_uri}/secrets/{secret_name}",
client="keyvault",
method="delete",
)
print(
f"{icons.green_dot} The '{secret_name}' secret has been successfully deleted within the '{key_vault_uri}' Key Vault."
)