From 48d65f280f414bcfd34346d4077a7380027b76a0 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Fri, 11 Jul 2025 14:33:31 +0530 Subject: [PATCH 01/22] docs --- ddpui/api/airbyte_api.py | 249 +++++++++++++++++++++++++++++-- ddpui/api/dashboard_api.py | 25 +++- ddpui/api/data_api.py | 79 +++++++++- ddpui/api/notifications_api.py | 134 +++++++++++++++-- ddpui/api/org_preferences_api.py | 18 ++- ddpui/api/superset_api.py | 20 ++- ddpui/api/task_api.py | 59 +++++++- ddpui/api/transform_api.py | 17 ++- ddpui/api/user_org_api.py | 24 ++- ddpui/api/warehouse_api.py | 54 ++++++- ddpui/api/webhook_api.py | 44 ++++++ 11 files changed, 680 insertions(+), 43 deletions(-) diff --git a/ddpui/api/airbyte_api.py b/ddpui/api/airbyte_api.py index 240bf75b3..82e2cb72c 100644 --- a/ddpui/api/airbyte_api.py +++ b/ddpui/api/airbyte_api.py @@ -48,7 +48,21 @@ @airbyte_router.get("/source_definitions") @has_permission(["can_view_sources"]) def get_airbyte_source_definitions(request): - """Fetch airbyte source definitions in the user organization workspace""" + """ + Fetch all available Airbyte source definitions for the user's organization workspace. + + For demo accounts, filters the available source definitions based on the + DEMO_AIRBYTE_SOURCE_TYPES environment variable. + + Args: + request: HTTP request object containing orguser authentication data + + Returns: + list: List of source definition dictionaries containing connector metadata + + Raises: + HttpError: 400 if no Airbyte workspace exists for the organization + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: raise HttpError(400, "create an airbyte workspace first") @@ -71,8 +85,20 @@ def get_airbyte_source_definitions(request): @has_permission(["can_view_sources"]) def get_airbyte_source_definition_specifications(request, sourcedef_id): """ - Fetch definition specifications for a particular - source definition in the user organization workspace + Fetch configuration specifications for a specific Airbyte source definition. + + Returns the JSON schema specification that defines what configuration + parameters are required and optional for setting up this source connector. + + Args: + request: HTTP request object containing orguser authentication data + sourcedef_id (str): Unique identifier of the source definition + + Returns: + dict: Connection specification schema for the source definition + + Raises: + HttpError: 400 if no Airbyte workspace exists for the organization """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: @@ -88,7 +114,24 @@ def get_airbyte_source_definition_specifications(request, sourcedef_id): @airbyte_router.post("/sources/") @has_permission(["can_create_source"]) def post_airbyte_source(request, payload: AirbyteSourceCreate): - """Create airbyte source in the user organization workspace""" + """ + Create a new Airbyte source connector in the organization's workspace. + + For demo accounts, automatically replaces the provided configuration with + whitelisted demo configuration to prevent access to unauthorized data sources. 
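+
+    Example (illustrative payload; the exact field names are defined by the
+    AirbyteSourceCreate schema and the chosen connector's spec):
+        {
+            "name": "customer-postgres",
+            "sourceDefId": "<source-definition-uuid>",
+            "config": {"host": "db.example.org", "port": 5432, "database": "prod"}
+        }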
+ + Args: + request: HTTP request object containing orguser authentication data + payload (AirbyteSourceCreate): Source configuration including name, + source definition ID, and connection config + + Returns: + dict: Dictionary containing the created source's ID + + Raises: + HttpError: 400 if no Airbyte workspace exists, or if demo account + configuration validation fails + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: raise HttpError(400, "create an airbyte workspace first") @@ -121,7 +164,25 @@ def post_airbyte_source(request, payload: AirbyteSourceCreate): @airbyte_router.put("/sources/{source_id}") @has_permission(["can_edit_source"]) def put_airbyte_source(request, source_id: str, payload: AirbyteSourceUpdate): - """Update airbyte source in the user organization workspace""" + """ + Update an existing Airbyte source connector configuration. + + For demo accounts, automatically replaces the provided configuration with + whitelisted demo configuration to maintain security restrictions. + + Args: + request: HTTP request object containing orguser authentication data + source_id (str): Unique identifier of the source to update + payload (AirbyteSourceUpdate): Updated source configuration including + name, source definition ID, and config + + Returns: + dict: Dictionary containing the updated source's ID + + Raises: + HttpError: 400 if organization or Airbyte workspace doesn't exist, + or if demo account configuration validation fails + """ orguser: OrgUser = request.orguser if orguser.org is None: raise HttpError(400, "create an organization first") @@ -152,7 +213,23 @@ def put_airbyte_source(request, source_id: str, payload: AirbyteSourceUpdate): @airbyte_router.post("/sources/check_connection/") @has_permission(["can_create_source"]) def post_airbyte_check_source(request, payload: AirbyteSourceCreate): - """Test the source connection in the user organization workspace""" + """ + Test connectivity to a source before creating it. + + Validates that the provided source configuration can successfully establish + a connection. For demo accounts, uses whitelisted configuration instead + of the provided config. + + Args: + request: HTTP request object containing orguser authentication data + payload (AirbyteSourceCreate): Source configuration to test + + Returns: + dict: Connection test result with status ('succeeded'/'failed') and logs + + Raises: + HttpError: 400 if no Airbyte workspace exists or demo config validation fails + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: raise HttpError(400, "create an airbyte workspace first") @@ -184,7 +261,24 @@ def post_airbyte_check_source(request, payload: AirbyteSourceCreate): def post_airbyte_check_source_for_update( request, source_id: str, payload: AirbyteSourceUpdateCheckConnection ): - """Test the source connection in the user organization workspace""" + """ + Test connectivity for an existing source with updated configuration. + + Validates that updated source configuration can successfully establish + a connection before applying the changes. For demo accounts, uses + whitelisted configuration. 
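+
+    Example response (shape is indicative only; values come from Airbyte's
+    check-connection job):
+        {"status": "succeeded", "logs": []}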
+ + Args: + request: HTTP request object containing orguser authentication data + source_id (str): Unique identifier of the existing source + payload (AirbyteSourceUpdateCheckConnection): Updated configuration to test + + Returns: + dict: Connection test result with status ('succeeded'/'failed') and logs + + Raises: + HttpError: 400 if no Airbyte workspace exists or demo config validation fails + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: raise HttpError(400, "create an airbyte workspace first") @@ -213,7 +307,21 @@ def post_airbyte_check_source_for_update( @airbyte_router.get("/sources") @has_permission(["can_view_sources"]) def get_airbyte_sources(request): - """Fetch all airbyte sources in the user organization workspace""" + """ + Fetch all Airbyte sources configured in the organization's workspace. + + Returns a list of all source connectors that have been created and + configured within the organization's Airbyte workspace. + + Args: + request: HTTP request object containing orguser authentication data + + Returns: + list: List of source dictionaries containing source metadata and configuration + + Raises: + HttpError: 400 if no Airbyte workspace exists for the organization + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: raise HttpError(400, "create an airbyte workspace first") @@ -226,7 +334,22 @@ def get_airbyte_sources(request): @airbyte_router.get("/sources/{source_id}") @has_permission(["can_view_source"]) def get_airbyte_source(request, source_id): - """Fetch a single airbyte source in the user organization workspace""" + """ + Fetch details of a specific Airbyte source by its ID. + + Returns complete configuration and metadata for a single source connector + including its current status, configuration parameters, and connection details. + + Args: + request: HTTP request object containing orguser authentication data + source_id (str): Unique identifier of the source to retrieve + + Returns: + dict: Source details including configuration, status, and metadata + + Raises: + HttpError: 400 if no Airbyte workspace exists for the organization + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: raise HttpError(400, "create an airbyte workspace first") @@ -239,7 +362,23 @@ def get_airbyte_source(request, source_id): @airbyte_router.get("/sources/{source_id}/schema_catalog") @has_permission(["can_view_source"]) def get_airbyte_source_schema_catalog(request, source_id): - """Fetch schema catalog for a source in the user organization workspace""" + """ + Fetch the discovered schema catalog for a specific source. + + Returns the complete data schema that Airbyte has discovered from the source, + including all available streams, fields, and data types. This is used to + configure which data to sync and how to structure it. 
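+
+    Example (heavily trimmed sketch of Airbyte's catalog shape; real responses
+    include a full JSON schema per stream):
+        {
+            "catalog": {
+                "streams": [
+                    {"stream": {"name": "customers", "jsonSchema": {...}}, "config": {...}}
+                ]
+            }
+        }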
+ + Args: + request: HTTP request object containing orguser authentication data + source_id (str): Unique identifier of the source + + Returns: + dict: Schema catalog containing streams, fields, and data type information + + Raises: + HttpError: 400 if no Airbyte workspace exists for the organization + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: raise HttpError(400, "create an airbyte workspace first") @@ -366,7 +505,23 @@ def get_airbyte_destination(request, destination_id): @airbyte_router.get("/jobs/{job_id}") @has_permission(["can_view_connection"]) def get_job_status(request, job_id): - """get the job info from airbyte""" + """ + Retrieve the status and logs for a specific Airbyte job. + + Returns comprehensive job information including current status and complete + log output from the latest attempt. Used for monitoring sync progress and + debugging connection issues. + + Args: + request: HTTP request object containing orguser authentication data + job_id (str): Unique identifier of the job to query + + Returns: + dict: Job status and log lines from the most recent attempt + + Raises: + HttpError: 400 if user lacks permission to view connections + """ result = airbyte_service.get_job_info(job_id) logs = result["attempts"][-1]["logs"]["logLines"] return { @@ -378,7 +533,22 @@ def get_job_status(request, job_id): @airbyte_router.get("/jobs/{job_id}/status") @has_permission(["can_view_connection"]) def get_job_status_without_logs(request, job_id): - """get the job info from airbyte""" + """ + Retrieve only the status of a specific Airbyte job without logs. + + Lightweight endpoint to check job status without downloading potentially + large log files. Useful for polling job completion status. + + Args: + request: HTTP request object containing orguser authentication data + job_id (str): Unique identifier of the job to query + + Returns: + dict: Job status only, without log data + + Raises: + HttpError: 400 if user lacks permission to view connections + """ result = airbyte_service.get_job_info_without_logs(job_id) print(result) return { @@ -393,7 +563,23 @@ def get_job_status_without_logs(request, job_id): @airbyte_router.post("/v1/workspace/", response=AirbyteWorkspace) @has_permission(["can_create_org"]) def post_airbyte_workspace_v1(request, payload: AirbyteWorkspaceCreate): - """Create an airbyte workspace""" + """ + Create a new Airbyte workspace for the organization. + + Initializes a dedicated Airbyte workspace for the organization and adds + custom connectors configured in system settings. Each organization can + have only one workspace. + + Args: + request: HTTP request object containing orguser authentication data + payload (AirbyteWorkspaceCreate): Workspace configuration including name + + Returns: + AirbyteWorkspace: Created workspace details including workspace ID + + Raises: + HttpError: 400 if organization already has an existing workspace + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is not None: raise HttpError(400, "org already has a workspace") @@ -413,7 +599,25 @@ def post_airbyte_workspace_v1(request, payload: AirbyteWorkspaceCreate): ) @has_permission(["can_create_connection"]) def post_airbyte_connection_v1(request, payload: AirbyteConnectionCreate): - """Create an airbyte connection in the user organization workspace""" + """ + Create a new Airbyte connection between a source and destination. 
+ + Establishes a data pipeline connection that defines which streams to sync, + how frequently to sync them, and how to handle the data transformation. + At least one stream must be specified. + + Args: + request: HTTP request object containing orguser authentication data + payload (AirbyteConnectionCreate): Connection configuration including + source, destination, streams, and sync settings + + Returns: + AirbyteConnectionCreateResponse: Created connection details including connection ID + + Raises: + HttpError: 400 if no Airbyte workspace exists, no streams specified, + or connection creation fails + """ orguser: OrgUser = request.orguser org = orguser.org if org.airbyte_workspace_id is None: @@ -436,7 +640,22 @@ def post_airbyte_connection_v1(request, payload: AirbyteConnectionCreate): ) @has_permission(["can_view_connections"]) def get_airbyte_connections_v1(request): - """Fetch all airbyte connections in the user organization workspace""" + """ + Fetch all Airbyte connections in the organization's workspace. + + Returns a comprehensive list of all data pipeline connections configured + within the organization, including their current status, configuration, + and sync schedules. + + Args: + request: HTTP request object containing orguser authentication data + + Returns: + List[AirbyteGetConnectionsResponse]: List of all connections with their details + + Raises: + HttpError: 400 if no Airbyte workspace exists or connection retrieval fails + """ orguser: OrgUser = request.orguser if orguser.org.airbyte_workspace_id is None: raise HttpError(400, "create an airbyte workspace first") diff --git a/ddpui/api/dashboard_api.py b/ddpui/api/dashboard_api.py index 6e4b790ce..0e1d6ec07 100644 --- a/ddpui/api/dashboard_api.py +++ b/ddpui/api/dashboard_api.py @@ -15,7 +15,30 @@ @dashboard_router.get("/v1") @has_permission(["can_view_dashboard"]) def get_dashboard_v1(request): - """Fetch all flows/pipelines created in an organization""" + """ + Fetch dashboard data including all data flows/pipelines and their execution status. + + Returns comprehensive dashboard information including: + - All orchestration data flows for the organization + - Recent flow run history (last 50 runs per flow) + - Task lock status for each flow + - Deployment and scheduling information + + Args: + request: HTTP request object containing orguser authentication data + + Returns: + list: List of flow dictionaries containing: + - name: Flow name + - deploymentId: Prefect deployment ID + - cron: Scheduling expression + - deploymentName: Human-readable deployment name + - runs: Recent execution history + - lock: Current lock status if any user has locked the flow + + Raises: + HttpError: 403 if user lacks dashboard view permissions + """ orguser = request.orguser org_data_flows = OrgDataFlowv1.objects.filter(org=orguser.org, dataflow_type="orchestrate") diff --git a/ddpui/api/data_api.py b/ddpui/api/data_api.py index 3f0e6b1de..ff15801ab 100644 --- a/ddpui/api/data_api.py +++ b/ddpui/api/data_api.py @@ -22,7 +22,23 @@ @data_router.get("/tasks/") @has_permission(["can_view_master_tasks"]) def get_tasks(request): - """Fetch master list of tasks related to transformation""" + """ + Fetch the master list of available transformation tasks. + + Returns all system-defined tasks that can be used for data transformations, + including dbt, git, and dbtcloud task types. Each task contains configuration + parameters that define how it can be executed. 
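+
+    Example (illustrative; the real keys mirror the Task model, minus "id"):
+        [
+            {"type": "dbt", "slug": "dbt-run", ...},
+            {"type": "git", "slug": "git-pull", ...},
+        ]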
+ + Args: + request: HTTP request object containing orguser authentication data + + Returns: + list: List of task dictionaries (excluding internal IDs) containing + task definitions for transformation operations + + Raises: + HttpError: 403 if user lacks permission to view master tasks + """ tasks = [ model_to_dict(task, exclude=["id"]) for task in Task.objects.filter(type__in=["dbt", "git", "dbtcloud"]).all() @@ -33,7 +49,24 @@ def get_tasks(request): @data_router.get("/tasks/{slug}/config/") @has_permission(["can_view_master_task"]) def get_task_config(request, slug): - """Get task config which details about the parameters that can be added/used while running it""" + """ + Get detailed configuration parameters for a specific task. + + Returns the complete configuration schema for a task, including all + parameters that can be customized when executing the task. This includes + required parameters, optional settings, and their expected data types. + + Args: + request: HTTP request object containing orguser authentication data + slug (str): Unique slug identifier of the task + + Returns: + dict: Task configuration parameters and schema + + Raises: + HttpError: 404 if task not found + HttpError: 403 if user lacks permission to view the task + """ task = Task.objects.filter(slug=slug).first() if not task: @@ -44,7 +77,19 @@ def get_task_config(request, slug): @data_router.get("/roles/") def get_roles(request): - """Fetch master list of roles""" + """ + Fetch available user roles based on the requesting user's permission level. + + Returns roles that are at or below the requesting user's role level, + ensuring users can only assign roles they have permission to manage. + + Args: + request: HTTP request object containing orguser authentication data + + Returns: + list: List of role dictionaries containing uuid, slug, and name + for roles the user can access or assign + """ orguser: OrgUser = request.orguser roles = Role.objects.filter(level__lte=orguser.new_role.level).all() @@ -54,11 +99,35 @@ def get_roles(request): @data_router.get("/user_prompts/") def get_user_prompts(request): - """Fetch master list of roles""" + """ + Fetch all available user prompts for LLM interactions. + + Returns the master list of predefined prompts that users can utilize + when interacting with the Large Language Model features of the platform. + + Args: + request: HTTP request object containing authentication data + + Returns: + list: List of user prompt dictionaries containing prompt templates + and metadata for LLM interactions + """ return list(map(model_to_dict, UserPrompt.objects.all())) @data_router.get("/llm_data_analysis_query_limit/") def get_row_limit(request): - """Fetch master list of roles""" + """ + Get the maximum number of rows that can be sent to LLM for data analysis. + + Returns the configured limit for the number of data rows that will be + included when sending datasets to the Large Language Model for analysis. + This helps control costs and API limits. 
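+
+    Example (illustrative; the path prefix depends on deployment and the value
+    on LIMIT_ROWS_TO_SEND_TO_LLM):
+        GET .../llm_data_analysis_query_limit/  ->  500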
+ + Args: + request: HTTP request object containing authentication data + + Returns: + int: Maximum number of rows allowed for LLM data analysis queries + """ return LIMIT_ROWS_TO_SEND_TO_LLM diff --git a/ddpui/api/notifications_api.py b/ddpui/api/notifications_api.py index 43547b536..e6eb3e2ee 100644 --- a/ddpui/api/notifications_api.py +++ b/ddpui/api/notifications_api.py @@ -14,7 +14,31 @@ @notification_router.post("/") def post_create_notification(request, payload: CreateNotificationPayloadSchema): - """Handle the task of creating a notification""" + """ + Create a new notification to be sent to specified recipients. + + Creates a notification that can be sent immediately or scheduled for later. + Recipients are determined based on the sent_to criteria, organization slug, + and user roles within the organization. + + Args: + request: HTTP request object containing authentication data + payload (CreateNotificationPayloadSchema): Notification data including: + - author: Notification sender + - message: Notification content + - urgent: Priority flag + - scheduled_time: When to send (optional, immediate if not provided) + - sent_to: Recipient criteria + - org_slug: Target organization + - user_email: Specific user email (optional) + - manager_or_above: Whether to include managers (optional) + + Returns: + dict: Created notification details including ID and delivery status + + Raises: + HttpError: 400 if recipient filtering fails or notification creation fails + """ # Filter OrgUser data based on sent_to field error, recipients = notifications_service.get_recipients( @@ -43,8 +67,24 @@ def post_create_notification(request, payload: CreateNotificationPayloadSchema): @notification_router.get("/history") def get_notification_history(request, page: int = 1, limit: int = 10, read_status: int = None): """ - Returns all the notifications including the - past and the future scheduled notifications + Retrieve the complete notification history including past and scheduled notifications. + + Returns a paginated list of all notifications in the system, including those + that have been sent and those scheduled for future delivery. Supports filtering + by read status. + + Args: + request: HTTP request object containing authentication data + page (int, optional): Page number for pagination. Defaults to 1 + limit (int, optional): Number of notifications per page. Defaults to 10 + read_status (int, optional): Filter by read status (0=unread, 1=read). + None returns all notifications + + Returns: + dict: Paginated notification history with metadata + + Raises: + HttpError: 400 if history retrieval fails """ error, result = notifications_service.get_notification_history(page, limit, read_status=None) if error is not None: @@ -56,7 +96,20 @@ def get_notification_history(request, page: int = 1, limit: int = 10, read_statu @notification_router.get("/recipients") def get_notification_recipients(request, notification_id: int): """ - Returns all the recipients for a notification + Retrieve all recipients for a specific notification. + + Returns the complete list of users who received or will receive a particular + notification, including their user details and delivery status. 
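+
+    Example response (indicative shape; the exact recipient fields come from
+    notifications_service.get_notification_recipients):
+        {"success": true, "res": [{"username": "user@example.org", "read_status": false}, ...]}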
+ + Args: + request: HTTP request object containing authentication data + notification_id (int): Unique identifier of the notification + + Returns: + dict: List of recipients with their user details and delivery status + + Raises: + HttpError: 400 if notification not found or recipient retrieval fails """ error, result = notifications_service.get_notification_recipients(notification_id) if error is not None: @@ -68,9 +121,24 @@ def get_notification_recipients(request, notification_id: int): @notification_router.get("/v1") def get_user_notifications_v1(request, page: int = 1, limit: int = 10, read_status: int = None): """ - Returns all the notifications for a particular user. - It returns only the past notifications,i.e,notifications - which are already sent + Retrieve notifications for the authenticated user. + + Returns a paginated list of notifications that have been sent to the current + user. Only includes past notifications that have already been delivered, + not future scheduled notifications. Supports filtering by read status. + + Args: + request: HTTP request object containing orguser authentication data + page (int, optional): Page number for pagination. Defaults to 1 + limit (int, optional): Number of notifications per page. Defaults to 10 + read_status (int, optional): Filter by read status (0=unread, 1=read). + None returns all notifications + + Returns: + dict: Paginated user notifications with metadata + + Raises: + HttpError: 400 if notification retrieval fails """ orguser = request.orguser error, result = notifications_service.fetch_user_notifications_v1( @@ -85,7 +153,22 @@ def get_user_notifications_v1(request, page: int = 1, limit: int = 10, read_stat @notification_router.put("/v1") def mark_as_read_v1(request, payload: UpdateReadStatusSchemav1): """ - Bulk update of read status of notifications + Bulk update the read status of multiple notifications. + + Allows marking multiple notifications as read or unread in a single operation. + Only notifications belonging to the authenticated user can be updated. + + Args: + request: HTTP request object containing orguser authentication data + payload (UpdateReadStatusSchemav1): Update data including: + - notification_ids: List of notification IDs to update + - read_status: New read status (0=unread, 1=read) + + Returns: + dict: Update operation result including number of notifications affected + + Raises: + HttpError: 400 if status update fails or user lacks permission """ orguser: OrgUser = request.orguser error, result = notifications_service.mark_notifications_as_read_or_unread( @@ -100,9 +183,21 @@ def mark_as_read_v1(request, payload: UpdateReadStatusSchemav1): @notification_router.delete("/") def delete_notification(request, notification_id: int): """ - Used to delete past notifications,i.e, notifications - which are already sent. Accepts notification_id in the - payload and deletes it. + Delete a scheduled notification before it is sent. + + Removes a notification that was scheduled for future delivery but has not + yet been sent. Only works for scheduled notifications, not for notifications + that have already been delivered. 
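+
+    Example (illustrative; notification_id is passed as a query parameter):
+        DELETE .../notifications/?notification_id=123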
+ + Args: + request: HTTP request object containing authentication data + notification_id (int): Unique identifier of the notification to delete + + Returns: + dict: Deletion operation result + + Raises: + HttpError: 400 if notification not found, already sent, or deletion fails """ error, result = notifications_service.delete_scheduled_notification(notification_id) if error is not None: @@ -113,7 +208,22 @@ def delete_notification(request, notification_id: int): @notification_router.get("/unread_count") def get_unread_notifications_count(request): - """Get count of unread notifications""" + """ + Get the count of unread notifications for the authenticated user. + + Returns the total number of notifications that the current user has + received but not yet marked as read. Useful for displaying notification + badges in the UI. + + Args: + request: HTTP request object containing orguser authentication data + + Returns: + dict: Count of unread notifications for the user + + Raises: + HttpError: 400 if count retrieval fails + """ orguser: OrgUser = request.orguser error, result = notifications_service.get_unread_notifications_count(orguser) if error is not None: diff --git a/ddpui/api/org_preferences_api.py b/ddpui/api/org_preferences_api.py index 071f2e771..11749c925 100644 --- a/ddpui/api/org_preferences_api.py +++ b/ddpui/api/org_preferences_api.py @@ -29,7 +29,23 @@ @orgpreference_router.post("/") def create_org_preferences(request, payload: CreateOrgPreferencesSchema): - """Creates preferences for an organization""" + """ + Create initial preferences for an organization. + + Establishes the default preferences configuration for an organization. + Each organization can only have one preferences record, so this will + fail if preferences already exist. + + Args: + request: HTTP request object containing orguser authentication data + payload (CreateOrgPreferencesSchema): Initial preferences configuration + + Returns: + dict: Success status and created preferences data + + Raises: + HttpError: 400 if organization preferences already exist + """ orguser: OrgUser = request.orguser org = orguser.org payload.org = org diff --git a/ddpui/api/superset_api.py b/ddpui/api/superset_api.py index 729740fa6..9da62ba66 100644 --- a/ddpui/api/superset_api.py +++ b/ddpui/api/superset_api.py @@ -18,7 +18,25 @@ @superset_router.post("embed_token/{dashboard_uuid}/") @has_permission(["can_view_usage_dashboard"]) def post_fetch_embed_token(request, dashboard_uuid): # pylint: disable=unused-argument - """endpoint to fetch the embed token of a dashboard from superset""" + """ + Fetch an embed token for a Superset dashboard to enable embedded viewing. + + Authenticates with Superset using stored credentials and generates an embed + token that allows the specified dashboard to be embedded in the application. + Requires the organization to have an active Superset subscription and warehouse. 
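+
+    Example (illustrative call; the response shape ultimately depends on
+    Superset's embedded-dashboard API):
+        POST .../embed_token/<dashboard-uuid>/  ->  {"token": "<guest-token>", ...}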
+ + Args: + request: HTTP request object containing orguser authentication data + dashboard_uuid (str): UUID of the Superset dashboard to embed + + Returns: + dict: Embed token and related authentication information for dashboard embedding + + Raises: + HttpError: 400 if organization, warehouse, or Superset subscription not configured, + or if credential retrieval fails + HttpError: 403 if user lacks permission to view usage dashboard + """ orguser: OrgUser = request.orguser if orguser.org is None: raise HttpError(400, "create an organization first") diff --git a/ddpui/api/task_api.py b/ddpui/api/task_api.py index 4fb2035b6..7b9721ae8 100644 --- a/ddpui/api/task_api.py +++ b/ddpui/api/task_api.py @@ -13,7 +13,24 @@ @task_router.get("/{task_id}") @has_permission(["can_view_task_progress"]) def get_task(request, task_id, hashkey: str = "taskprogress"): # pylint: disable=unused-argument - """returns the progress for a celery task""" + """ + Retrieve progress information for a task stored in Redis. + + Fetches the current progress status for a task using the custom TaskProgress + system that stores progress information in Redis with a specific hash key. + + Args: + request: HTTP request object containing orguser authentication data + task_id (str): Unique identifier of the task + hashkey (str, optional): Redis hash key for task progress. Defaults to "taskprogress" + + Returns: + dict: Task progress information including status and progress details + + Raises: + HttpError: 400 if no task found with the specified ID + HttpError: 403 if user lacks permission to view task progress + """ result = TaskProgress.fetch(task_id=task_id, hashkey=hashkey) if result: return {"progress": result} @@ -23,7 +40,23 @@ def get_task(request, task_id, hashkey: str = "taskprogress"): # pylint: disabl @task_router.get("/stp/{task_key}") @has_permission(["can_view_task_progress"]) def get_singletask(request, task_key): # pylint: disable=unused-argument - """returns the progress for a celery task""" + """ + Retrieve progress information for a single task using a task key. + + Fetches progress status for tasks managed by the SingleTaskProgress system, + which handles individual task tracking with unique task keys. + + Args: + request: HTTP request object containing orguser authentication data + task_key (str): Unique key identifier for the single task + + Returns: + dict: Task progress information including current status and progress data + + Raises: + HttpError: 400 if no task found with the specified key + HttpError: 403 if user lacks permission to view task progress + """ result = SingleTaskProgress.fetch(task_key=task_key) if result is not None: return {"progress": result} @@ -33,7 +66,27 @@ def get_singletask(request, task_key): # pylint: disable=unused-argument @task_router.get("/celery/{task_id}") @has_permission(["can_view_task_progress"]) def get_celerytask(request, task_id): # pylint: disable=unused-argument - """Get the celery task progress and not the one we create in redis""" + """ + Get the native Celery task progress and status information. + + Retrieves task status directly from Celery's result backend rather than + the custom Redis-based progress tracking system. Provides access to + Celery's built-in task state management. + + Args: + request: HTTP request object containing orguser authentication data + task_id (str): Celery task ID to query + + Returns: + dict: Celery task information including: + - id: Task ID + - status: Current task status (PENDING, SUCCESS, FAILURE, etc.) 
+ - result: Task result if completed + - error: Error information if task failed + + Raises: + HttpError: 403 if user lacks permission to view task progress + """ task_result = AsyncResult(task_id) result = { "id": task_id, diff --git a/ddpui/api/transform_api.py b/ddpui/api/transform_api.py index 2484bc06d..19781ffea 100644 --- a/ddpui/api/transform_api.py +++ b/ddpui/api/transform_api.py @@ -50,7 +50,22 @@ @has_permission(["can_create_dbt_workspace"]) def create_dbt_project(request, payload: DbtProjectSchema): """ - Create a new dbt project. + Create a new dbt (data build tool) project for the organization. + + Initializes a local dbt workspace for the organization with the specified + default schema. Creates the necessary directory structure and configuration + files required for dbt transformations. + + Args: + request: HTTP request object containing orguser authentication data + payload (DbtProjectSchema): Project configuration including default_schema + + Returns: + dict: Success message confirming project creation + + Raises: + HttpError: 422 if dbt workspace setup fails + HttpError: 403 if user lacks permission to create dbt workspace """ orguser: OrgUser = request.orguser org = orguser.org diff --git a/ddpui/api/user_org_api.py b/ddpui/api/user_org_api.py index 8012f5386..059708ca0 100644 --- a/ddpui/api/user_org_api.py +++ b/ddpui/api/user_org_api.py @@ -59,7 +59,29 @@ @user_org_router.get("/currentuserv2", response=List[OrgUserResponse]) @has_permission(["can_view_orgusers"]) def get_current_user_v2(request, org_slug: str = None): - """return all the OrgUsers for the User making this request""" + """ + Retrieve comprehensive information about the current authenticated user. + + Returns detailed information about the user's organization memberships, + permissions, role, and organization preferences. Can be filtered by + organization slug to get info for a specific organization. + + Args: + request: HTTP request object containing orguser authentication data + org_slug (str, optional): Filter results for a specific organization slug + + Returns: + list: List of organization memberships with detailed user information including: + - User profile data + - Organization details and settings + - Role and permissions + - Warehouse configuration + - Terms and conditions acceptance status + + Raises: + HttpError: 400 if requestor is not an OrgUser + HttpError: 403 if user lacks permission to view organization users + """ if request.orguser is None: raise HttpError(400, "requestor is not an OrgUser") orguser: OrgUser = request.orguser diff --git a/ddpui/api/warehouse_api.py b/ddpui/api/warehouse_api.py index 241354484..b89171e30 100644 --- a/ddpui/api/warehouse_api.py +++ b/ddpui/api/warehouse_api.py @@ -50,21 +50,69 @@ @warehouse_router.get("/tables/{schema_name}") @has_permission(["can_view_warehouse_data"]) def get_table(request, schema_name: str): - """Fetches table names from a warehouse""" + """ + Fetch all table names from a specific schema in the data warehouse. + + Returns a list of table names available within the specified schema + of the organization's data warehouse. 
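+
+    Example (illustrative):
+        GET .../tables/staging  ->  ["customers", "orders", "payments"]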
+ + Args: + request: HTTP request object containing orguser authentication data + schema_name (str): Name of the database schema to query + + Returns: + list: List of table names in the specified schema + + Raises: + HttpError: 403 if user lacks permission to view warehouse data + HttpError: 400 if warehouse connection fails or schema not found + """ return get_warehouse_data(request, "tables", schema_name=schema_name) @warehouse_router.get("/schemas") @has_permission(["can_view_warehouse_data"]) def get_schema(request): - """Fetches schema names from a warehouse""" + """ + Fetch all schema names from the organization's data warehouse. + + Returns a list of all database schemas available in the organization's + configured data warehouse. Schemas organize tables and views into logical groups. + + Args: + request: HTTP request object containing orguser authentication data + + Returns: + list: List of schema names available in the warehouse + + Raises: + HttpError: 403 if user lacks permission to view warehouse data + HttpError: 400 if warehouse connection fails + """ return get_warehouse_data(request, "schemas") @warehouse_router.get("/table_columns/{schema_name}/{table_name}") @has_permission(["can_view_warehouse_data"]) def get_table_columns(request, schema_name: str, table_name: str): - """Fetches column names for a specific table from a warehouse""" + """ + Fetch column information for a specific table in the data warehouse. + + Returns detailed information about all columns in the specified table, + including column names, data types, and other metadata. + + Args: + request: HTTP request object containing orguser authentication data + schema_name (str): Name of the database schema containing the table + table_name (str): Name of the table to inspect + + Returns: + list: List of column definitions including names, types, and metadata + + Raises: + HttpError: 403 if user lacks permission to view warehouse data + HttpError: 400 if warehouse connection fails or table not found + """ return get_warehouse_data( request, "table_columns", schema_name=schema_name, table_name=table_name ) diff --git a/ddpui/api/webhook_api.py b/ddpui/api/webhook_api.py index f71e929f3..505fdb948 100644 --- a/ddpui/api/webhook_api.py +++ b/ddpui/api/webhook_api.py @@ -9,6 +9,9 @@ FLOW_RUN, ) from ddpui.celeryworkers.tasks import handle_prefect_webhook +from ddpui.models.llm import LlmSession, LlmSessionStatus +from ddpui.auth import has_permission +from ddpui.models.org_user import OrgUser webhook_router = Router() logger = CustomLogger("ddpui") @@ -66,3 +69,44 @@ def post_notification_v1(request): # pylint: disable=unused-argument # 5. 
requests.patch(f'http://localhost:4200/api/flow_run_notification_policies/{frnp}', json={ # 'message_template': 'Flow run {flow_run_name} with id {flow_run_id} entered state {flow_run_state_name}' # }) + + +@webhook_router.get("/failure-summary/{flow_run_id}") +@has_permission(["can_view_logs"]) +def get_failure_summary(request, flow_run_id: str): + """Get LLM failure summary for a specific flow run""" + try: + orguser: OrgUser = request.orguser + + # Find the LLM session for this flow run + llm_session = ( + LlmSession.objects.filter( + org=orguser.org, flow_run_id=flow_run_id, session_status=LlmSessionStatus.COMPLETED + ) + .order_by("-created_at") + .first() + ) + + if not llm_session: + return { + "status": "not_found", + "message": "No LLM failure summary found for this flow run", + "flow_run_id": flow_run_id, + } + + return { + "status": "success", + "flow_run_id": flow_run_id, + "summary": { + "session_id": llm_session.session_id, + "created_at": llm_session.created_at, + "user_prompts": llm_session.user_prompts, + "assistant_prompt": llm_session.assistant_prompt, + "response": llm_session.response, + "session_status": llm_session.session_status, + }, + } + + except Exception as err: + logger.error(f"Error retrieving failure summary for flow run {flow_run_id}: {str(err)}") + raise HttpError(500, f"Failed to retrieve failure summary: {str(err)}") From 8de4ab11984c29eb9a4574cd1a390973cf6080b6 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Fri, 11 Jul 2025 18:13:24 +0530 Subject: [PATCH 02/22] forge notifications v2 --- NOTIFICATIONS_V2_DOCUMENTATION.md | 303 +++++++++++++++++++++ agent_forge.yaml | 110 ++++++++ ddpui/api/notifications_api.py | 130 ++++++++- ddpui/api/user_preferences_api.py | 30 ++ ddpui/core/notifications_service.py | 119 +++++++- ddpui/models/notifications.py | 22 ++ ddpui/models/userpreferences.py | 34 +++ ddpui/schemas/notifications_api_schemas.py | 30 ++ ddpui/schemas/userpreferences_schema.py | 10 + 9 files changed, 775 insertions(+), 13 deletions(-) create mode 100644 NOTIFICATIONS_V2_DOCUMENTATION.md create mode 100644 agent_forge.yaml diff --git a/NOTIFICATIONS_V2_DOCUMENTATION.md b/NOTIFICATIONS_V2_DOCUMENTATION.md new file mode 100644 index 000000000..ca91ee132 --- /dev/null +++ b/NOTIFICATIONS_V2_DOCUMENTATION.md @@ -0,0 +1,303 @@ +# Notifications v2 Enhancement Documentation + +## Overview + +This document describes the enhancements made to the Dalgo notification system to support categorized notifications, category-based subscriptions, and urgent notification management. + +## New Features + +### 1. Notification Categories + +Notifications are now categorized into the following types: + +- **incident**: Platform incidents, downtime notifications (sent by platform admins) +- **schema_change**: Database schema changes (sent by platform) +- **job_failure**: Job execution failures (sent by platform) +- **late_runs**: Pipeline runs that are running late (sent by platform) +- **dbt_test_failure**: DBT test failures (sent by platform) + +### 2. Category-based Subscriptions + +Users can now subscribe/unsubscribe from specific notification categories: + +- Each user has individual subscription preferences for each category +- By default, users are subscribed to all categories +- Notifications are only sent to users who are subscribed to that category +- Subscription preferences are stored in the UserPreferences model + +### 3. 
Urgent Notification Bar + +Urgent notifications now support dismissal functionality: + +- Urgent notifications can be dismissed by individual users +- Dismissed notifications won't appear in the urgent notification bar +- The system tracks which users have dismissed each urgent notification + +## Database Changes + +### Notification Model Updates + +```python +class Notification(models.Model): + # ... existing fields ... + category = models.CharField( + max_length=20, + choices=NotificationCategory.choices, + default=NotificationCategory.INCIDENT, + help_text="Category of the notification" + ) + dismissed_by = models.ManyToManyField( + OrgUser, + blank=True, + related_name="dismissed_notifications", + help_text="Users who have dismissed this urgent notification" + ) +``` + +### UserPreferences Model Updates + +```python +class UserPreferences(models.Model): + # ... existing fields ... + subscribe_incident_notifications = models.BooleanField(default=True) + subscribe_schema_change_notifications = models.BooleanField(default=True) + subscribe_job_failure_notifications = models.BooleanField(default=True) + subscribe_late_runs_notifications = models.BooleanField(default=True) + subscribe_dbt_test_failure_notifications = models.BooleanField(default=True) +``` + +## API Endpoints + +### Enhanced Existing Endpoints + +#### 1. Create Notification +**POST** `/notifications/` + +Now accepts a `category` field: + +```json +{ + "author": "admin@example.com", + "message": "System maintenance scheduled", + "sent_to": "all_users", + "urgent": true, + "category": "incident" +} +``` + +#### 2. Get Notification History +**GET** `/notifications/history` + +Now supports category filtering: + +``` +GET /notifications/history?category=incident&page=1&limit=10 +``` + +#### 3. Get User Notifications +**GET** `/notifications/v1` + +Now supports category filtering: + +``` +GET /notifications/v1?category=job_failure&read_status=0 +``` + +### New Endpoints + +#### 1. Get Urgent Notifications +**GET** `/notifications/urgent` + +Returns urgent notifications that haven't been dismissed by the user: + +```json +{ + "success": true, + "res": [ + { + "id": 123, + "author": "admin@example.com", + "message": "System maintenance in progress", + "timestamp": "2024-01-15T10:00:00Z", + "category": "incident", + "read_status": false + } + ] +} +``` + +#### 2. Dismiss Urgent Notification +**POST** `/notifications/urgent/dismiss` + +Dismisses an urgent notification for the current user: + +```json +{ + "notification_id": 123 +} +``` + +#### 3. Get Notifications by Category +**GET** `/notifications/categories/{category}` + +Returns notifications for a specific category: + +``` +GET /notifications/categories/job_failure?page=1&limit=10 +``` + +#### 4. 
Update Category Subscriptions +**PUT** `/notifications/category-subscriptions` + +Updates user's category subscription preferences: + +```json +{ + "subscribe_incident_notifications": true, + "subscribe_job_failure_notifications": false, + "subscribe_late_runs_notifications": true +} +``` + +### User Preferences API Updates + +#### Get User Preferences +**GET** `/user-preferences/` + +Now returns category subscription preferences: + +```json +{ + "success": true, + "res": { + "enable_email_notifications": true, + "disclaimer_shown": true, + "subscribe_incident_notifications": true, + "subscribe_schema_change_notifications": true, + "subscribe_job_failure_notifications": false, + "subscribe_late_runs_notifications": true, + "subscribe_dbt_test_failure_notifications": true + } +} +``` + +#### Update User Preferences +**PUT** `/user-preferences/` + +Now accepts category subscription fields: + +```json +{ + "enable_email_notifications": true, + "subscribe_incident_notifications": false, + "subscribe_job_failure_notifications": true +} +``` + +## Business Logic Changes + +### Notification Delivery + +The notification delivery logic now includes category subscription checking: + +1. When a notification is created, the system checks each recipient's subscription preferences +2. Notifications are only sent to users who are subscribed to that category +3. Users who are not subscribed to a category will not receive notifications of that type + +### Urgent Notification Management + +1. Urgent notifications are tracked separately for dismissal +2. Each user can dismiss urgent notifications independently +3. Dismissed urgent notifications won't appear in the urgent notification bar +4. The dismissal state is persistent across sessions + +## Frontend Integration Points + +### Urgent Notification Bar + +The frontend should: + +1. Call `GET /notifications/urgent` to get urgent notifications +2. Display them in a prominent horizontal bar at the top of the page +3. Allow users to dismiss notifications via `POST /notifications/urgent/dismiss` +4. Remove dismissed notifications from the bar + +### Category Management + +The frontend should: + +1. Provide a settings page for category subscriptions +2. Use `GET /user-preferences/` to get current subscription settings +3. Use `PUT /user-preferences/` or `PUT /notifications/category-subscriptions` to update settings +4. Allow filtering notifications by category in the notification list + +### Notification Display + +The frontend should: + +1. Display the category for each notification +2. Allow filtering by category using the enhanced API endpoints +3. Group notifications by category if desired + +## Migration Notes + +**Important**: Database migrations need to be created and run to add the new fields: + +1. Add `category` field to Notification model +2. Add `dismissed_by` ManyToMany field to Notification model +3. Add category subscription fields to UserPreferences model + +## Testing + +A test script (`test_notifications_v2.py`) has been created to verify the implementation. 
Run it with: + +```bash +source .venv/bin/activate +python test_notifications_v2.py +``` + +## Backward Compatibility + +- All existing API endpoints continue to work as before +- New fields have sensible defaults +- Existing notifications will have the default category "incident" +- Existing users will be subscribed to all categories by default + +## Usage Examples + +### Creating Category-specific Notifications + +```python +# Job failure notification +notification_data = { + "author": "system@dalgo.com", + "message": "Pipeline 'daily_etl' failed", + "category": "job_failure", + "urgent": False, + "recipients": [user_id] +} + +# Incident notification +notification_data = { + "author": "admin@dalgo.com", + "message": "System maintenance starting in 30 minutes", + "category": "incident", + "urgent": True, + "recipients": all_user_ids +} +``` + +### Managing User Subscriptions + +```python +# Unsubscribe from job failure notifications +user_prefs.subscribe_job_failure_notifications = False +user_prefs.save() + +# Check if user is subscribed to a category +if user_prefs.is_subscribed_to_category("incident"): + # Send notification + pass +``` + +This enhancement provides a much more flexible and user-friendly notification system that allows users to control what types of notifications they receive while ensuring important urgent messages are prominently displayed. \ No newline at end of file diff --git a/agent_forge.yaml b/agent_forge.yaml new file mode 100644 index 000000000..a51c01fe2 --- /dev/null +++ b/agent_forge.yaml @@ -0,0 +1,110 @@ +# yaml-language-server: $schema=https://raw.githubusercontent.com/antinomyhq/forge/refs/heads/main/forge.schema.json +agents: +- id: code_og + title: Answers questions about the codebase + description: |- + Provides detailed understanding of the codebase. + Use it to ask questions about the code, its structure, and how different components interact. + system_prompt: | + You are Forge, a specialized AI assistant with deep knowledge of software architecture, programming patterns, and code analysis. + + Your primary responsibilities: + - Analyze and explain code structure, architecture, and design patterns + - Trace data flow and execution paths through the application + - Identify relationships between different modules, classes, and functions + - Explain business logic and implementation details + - Answer questions about dependencies, imports, and external integrations + - Help understand database schemas, API endpoints, and data models + - Provide insights into performance implications and potential bottlenecks + - Identify security considerations and potential vulnerabilities + - Explain configuration files, environment variables, and deployment setup + + When answering questions: + 1. **Be Specific**: Reference exact file paths, line numbers, function names, and class names + 2. **Show Context**: Explain how the code fits into the larger system architecture + 3. **Trace Connections**: Identify how different parts of the codebase interact + 4. **Explain Purpose**: Describe not just what the code does, but why it exists + 5. **Highlight Patterns**: Point out design patterns, coding conventions, and architectural decisions + 6. **Consider Impact**: Explain how changes might affect other parts of the system + 7. **Provide Examples**: Use concrete code snippets to illustrate explanations + 8. **Flag Issues**: Identify potential bugs, code smells, or improvement opportunities + + Always base your answers on the actual codebase content. 
If you need to see specific files or need more context to provide a complete answer, ask for clarification. + tools: + - forge_tool_fs_read + - forge_tool_net_fetch + - forge_tool_fs_search + - forge_tool_fs_list +- id: spec_og + title: Specification Specialist + description: "An expert agent that helps translate high-level feature requests, \nenhancement ideas, and bug reports into detailed, actionable specifications. \nThis agent excels at breaking down complex requirements, identifying edge cases,\n defining acceptance criteria, and creating comprehensive technical specifications \n that development teams can implement effectively.\n" + system_prompt: "You are Forge, an expert at creating detailed, actionable specifications for software features, enhancements, and fixes.\n\nYour primary responsibilities:\n- Analyze high-level requirements and break them into detailed specifications\n- Identify and document edge cases, error scenarios, and boundary conditions\n- Define clear acceptance criteria and success metrics\n- Create user stories with proper format (As a... I want... So that...)\n- Specify technical requirements, dependencies, and constraints\n- Outline testing scenarios and validation approaches\n- Consider security, performance, and scalability implications\n- Document API contracts, data models, and integration points\n\nWhen creating specifications, always include:\n1. **Overview**: Clear problem statement and solution summary\n2. **User Stories**: Well-formed user stories with acceptance criteria\n3. **Technical Requirements**: Detailed implementation requirements\n4. **Edge Cases**: Potential failure scenarios and error handling\n5. **Dependencies**: Required systems, data, or external services\n6. **Testing Strategy**: How the feature should be validated\n7. **Performance Criteria**: Expected performance benchmarks\n8. **Security Considerations**: Auth, permissions, data protection\n9. **Migration/Rollback Plan**: For changes to existing features\n\nAsk clarifying questions when requirements are ambiguous. Be thorough but concise. \nFocus on creating specifications that minimize back-and-forth during development.\n\nWhen you are ready with the final spec, ask the user to confirm \nif they want to proceed with the implementation.\n" + tools: + - code_og + - forge_tool_fs_read + - forge_tool_net_fetch + - forge_tool_fs_search + - forge_tool_fs_create + - forge_tool_fs_patch + - forge_tool_display_show_user +- id: forge + title: Implementation Specialist + description: "A comprehensive implementation agent that takes specifications and transforms them into working code. \nThis agent follows a structured approach: first gathering or creating detailed specifications, \nthen developing an implementation plan, and finally executing the changes with user approval at each step.\nHandles everything from small bug fixes to complex feature implementations." + system_prompt: | + You are Forge, a senior software engineer AI specialized in implementing features, enhancements, and fixes based on detailed specifications. + + Your implementation workflow: + 1. **Specification Gathering**: Ensure you have a complete, detailed specification + - If no spec exists, use the spec_og agent to create one + - Verify the spec covers all requirements, edge cases, and acceptance criteria + - Ask clarifying questions if anything is unclear + + 2. 
**Implementation Planning**: Create a comprehensive implementation plan + - Break down the work into logical phases/steps + - Identify all files that need to be created, modified, or removed + - Determine the order of implementation to avoid conflicts + - Consider database migrations, API changes, and configuration updates + - Plan for testing and validation approaches + + 3. **User Approval**: Present the plan and get explicit approval + - Summarize what will be implemented + - List all files that will be changed + - Highlight any potential risks or breaking changes + - Wait for user confirmation before proceeding + + 4. **Implementation**: Execute the plan methodically + - Implement changes in the planned order + - Write clean, maintainable code following existing patterns + - Add appropriate error handling and logging + - Include necessary tests and documentation + - Validate each step before moving to the next + + 5. **Verification**: Ensure the implementation meets requirements + - Test the implemented functionality + - Verify all acceptance criteria are met + - Check for any regressions or side effects + + Best practices to follow: + - Maintain consistency with existing code style and patterns + - Add comprehensive error handling and input validation + - Include appropriate logging and monitoring + - Write unit tests for new functionality + - Update documentation when necessary + - Consider backward compatibility and migration paths + - Follow security best practices + - Optimize for performance where relevant + + Always ask for user approval before making significant changes. Be transparent about what you're doing and why. + tools: + - forge_tool_fs_read + - forge_tool_fs_create + - forge_tool_fs_remove + - forge_tool_fs_patch + - forge_tool_process_shell + - forge_tool_net_fetch + - forge_tool_fs_search + - forge_tool_fs_undo + - forge_tool_display_show_user + - forge_tool_fs_list + - spec_og +model: anthropic/claude-sonnet-4 diff --git a/ddpui/api/notifications_api.py b/ddpui/api/notifications_api.py index e6eb3e2ee..04baecfca 100644 --- a/ddpui/api/notifications_api.py +++ b/ddpui/api/notifications_api.py @@ -6,6 +6,8 @@ CreateNotificationPayloadSchema, UpdateReadStatusSchema, UpdateReadStatusSchemav1, + DismissUrgentNotificationSchema, + CategorySubscriptionSchema, ) from ddpui.models.org_user import OrgUser @@ -51,9 +53,11 @@ def post_create_notification(request, payload: CreateNotificationPayloadSchema): notification_data = { "author": payload.author, "message": payload.message, + "email_subject": payload.message, # Use message as email subject if not provided "urgent": payload.urgent, "scheduled_time": payload.scheduled_time, "recipients": recipients, + "category": payload.category, } error, result = notifications_service.create_notification(notification_data) @@ -65,13 +69,15 @@ def post_create_notification(request, payload: CreateNotificationPayloadSchema): @notification_router.get("/history") -def get_notification_history(request, page: int = 1, limit: int = 10, read_status: int = None): +def get_notification_history( + request, page: int = 1, limit: int = 10, read_status: int = None, category: str = None +): """ Retrieve the complete notification history including past and scheduled notifications. Returns a paginated list of all notifications in the system, including those that have been sent and those scheduled for future delivery. Supports filtering - by read status. + by read status and category. 
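+
+    Example (category values are defined by NotificationCategory, e.g. "incident",
+    "job_failure"):
+        GET /notifications/history?page=1&limit=10&read_status=0&category=incident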
Args: request: HTTP request object containing authentication data @@ -79,6 +85,7 @@ def get_notification_history(request, page: int = 1, limit: int = 10, read_statu limit (int, optional): Number of notifications per page. Defaults to 10 read_status (int, optional): Filter by read status (0=unread, 1=read). None returns all notifications + category (str, optional): Filter by notification category Returns: dict: Paginated notification history with metadata @@ -86,7 +93,9 @@ def get_notification_history(request, page: int = 1, limit: int = 10, read_statu Raises: HttpError: 400 if history retrieval fails """ - error, result = notifications_service.get_notification_history(page, limit, read_status=None) + error, result = notifications_service.get_notification_history( + page, limit, read_status, category + ) if error is not None: raise HttpError(400, error) @@ -119,13 +128,15 @@ def get_notification_recipients(request, notification_id: int): @notification_router.get("/v1") -def get_user_notifications_v1(request, page: int = 1, limit: int = 10, read_status: int = None): +def get_user_notifications_v1( + request, page: int = 1, limit: int = 10, read_status: int = None, category: str = None +): """ Retrieve notifications for the authenticated user. Returns a paginated list of notifications that have been sent to the current user. Only includes past notifications that have already been delivered, - not future scheduled notifications. Supports filtering by read status. + not future scheduled notifications. Supports filtering by read status and category. Args: request: HTTP request object containing orguser authentication data @@ -133,6 +144,7 @@ def get_user_notifications_v1(request, page: int = 1, limit: int = 10, read_stat limit (int, optional): Number of notifications per page. Defaults to 10 read_status (int, optional): Filter by read status (0=unread, 1=read). None returns all notifications + category (str, optional): Filter by notification category Returns: dict: Paginated user notifications with metadata @@ -142,7 +154,7 @@ def get_user_notifications_v1(request, page: int = 1, limit: int = 10, read_stat """ orguser = request.orguser error, result = notifications_service.fetch_user_notifications_v1( - orguser, page, limit, read_status + orguser, page, limit, read_status, category ) if error is not None: raise HttpError(400, error) @@ -230,3 +242,109 @@ def get_unread_notifications_count(request): raise HttpError(400, error) return result + + +@notification_router.get("/urgent") +def get_urgent_notifications(request): + """ + Get urgent notifications that haven't been dismissed by the user. + + Returns urgent notifications that should be displayed in the prominent + notification bar at the top of the page. + + Args: + request: HTTP request object containing orguser authentication data + + Returns: + dict: List of urgent notifications that haven't been dismissed + + Raises: + HttpError: 400 if retrieval fails + """ + orguser: OrgUser = request.orguser + error, result = notifications_service.get_urgent_notifications(orguser) + if error is not None: + raise HttpError(400, error) + + return result + + +@notification_router.post("/urgent/dismiss") +def dismiss_urgent_notification(request, payload: DismissUrgentNotificationSchema): + """ + Dismiss an urgent notification for the authenticated user. + + Marks an urgent notification as dismissed so it won't appear in the + urgent notifications bar anymore. 
+ + Args: + request: HTTP request object containing orguser authentication data + payload: Schema containing the notification_id to dismiss + + Returns: + dict: Success message + + Raises: + HttpError: 400 if dismissal fails + """ + orguser: OrgUser = request.orguser + error, result = notifications_service.dismiss_urgent_notification( + orguser, payload.notification_id + ) + if error is not None: + raise HttpError(400, error) + + return result + + +@notification_router.get("/categories/{category}") +def get_notifications_by_category(request, category: str, page: int = 1, limit: int = 10): + """ + Get notifications for a specific category for the authenticated user. + + Args: + request: HTTP request object containing orguser authentication data + category: The notification category to filter by + page (int, optional): Page number for pagination. Defaults to 1 + limit (int, optional): Number of notifications per page. Defaults to 10 + + Returns: + dict: Paginated notifications for the specified category + + Raises: + HttpError: 400 if retrieval fails + """ + orguser: OrgUser = request.orguser + error, result = notifications_service.get_notifications_by_category( + orguser, category, page, limit + ) + if error is not None: + raise HttpError(400, error) + + return result + + +@notification_router.put("/category-subscriptions") +def update_category_subscriptions(request, payload: CategorySubscriptionSchema): + """ + Update the user's category subscription preferences. + + Allows users to subscribe or unsubscribe from specific notification categories. + + Args: + request: HTTP request object containing orguser authentication data + payload: Schema containing the subscription preferences to update + + Returns: + dict: Updated preferences and success message + + Raises: + HttpError: 400 if update fails + """ + orguser: OrgUser = request.orguser + subscription_data = payload.dict(exclude_none=True) + error, result = notifications_service.update_category_subscriptions(orguser, subscription_data) + if error is not None: + raise HttpError(400, error) + + return result diff --git a/ddpui/api/user_preferences_api.py b/ddpui/api/user_preferences_api.py index 60d6635fc..5961f2dc4 100644 --- a/ddpui/api/user_preferences_api.py +++ b/ddpui/api/user_preferences_api.py @@ -28,6 +28,11 @@ def create_user_preferences(request, payload: CreateUserPreferencesSchema): orguser=orguser, enable_email_notifications=payload.enable_email_notifications, disclaimer_shown=payload.disclaimer_shown, + subscribe_incident_notifications=payload.subscribe_incident_notifications, + subscribe_schema_change_notifications=payload.subscribe_schema_change_notifications, + subscribe_job_failure_notifications=payload.subscribe_job_failure_notifications, + subscribe_late_runs_notifications=payload.subscribe_late_runs_notifications, + subscribe_dbt_test_failure_notifications=payload.subscribe_dbt_test_failure_notifications, ) return {"success": True, "res": user_preferences.to_json()} @@ -44,6 +49,25 @@ def update_user_preferences(request, payload: UpdateUserPreferencesSchema): user_preferences.enable_email_notifications = payload.enable_email_notifications if payload.disclaimer_shown is not None: user_preferences.disclaimer_shown = payload.disclaimer_shown + if payload.subscribe_incident_notifications is not None: + user_preferences.subscribe_incident_notifications = payload.subscribe_incident_notifications + if payload.subscribe_schema_change_notifications is not None: + user_preferences.subscribe_schema_change_notifications = ( + 
payload.subscribe_schema_change_notifications + ) + if payload.subscribe_job_failure_notifications is not None: + user_preferences.subscribe_job_failure_notifications = ( + payload.subscribe_job_failure_notifications + ) + if payload.subscribe_late_runs_notifications is not None: + user_preferences.subscribe_late_runs_notifications = ( + payload.subscribe_late_runs_notifications + ) + if payload.subscribe_dbt_test_failure_notifications is not None: + user_preferences.subscribe_dbt_test_failure_notifications = ( + payload.subscribe_dbt_test_failure_notifications + ) + user_preferences.save() return {"success": True, "res": user_preferences.to_json()} @@ -59,6 +83,11 @@ def get_user_preferences(request): res = { "enable_email_notifications": user_preferences.enable_email_notifications, "disclaimer_shown": user_preferences.disclaimer_shown, + "subscribe_incident_notifications": user_preferences.subscribe_incident_notifications, + "subscribe_schema_change_notifications": user_preferences.subscribe_schema_change_notifications, + "subscribe_job_failure_notifications": user_preferences.subscribe_job_failure_notifications, + "subscribe_late_runs_notifications": user_preferences.subscribe_late_runs_notifications, + "subscribe_dbt_test_failure_notifications": user_preferences.subscribe_dbt_test_failure_notifications, "is_llm_active": org_preferences.llm_optin, "enable_llm_requested": org_preferences.enable_llm_request, } @@ -88,6 +117,7 @@ def post_request_llm_analysis_feature_enabled(request): urgent=False, scheduled_time=None, recipients=[acc_manager.id for acc_manager in acc_managers], + category="incident", # Adding category field ) error, res = create_notification(notification_data) diff --git a/ddpui/core/notifications_service.py b/ddpui/core/notifications_service.py index 0b289ba49..77a0fb18f 100644 --- a/ddpui/core/notifications_service.py +++ b/ddpui/core/notifications_service.py @@ -68,6 +68,11 @@ def handle_recipient( """ recipient = OrgUser.objects.get(id=recipient_id) user_preference, created = UserPreferences.objects.get_or_create(orguser=recipient) + + # Check if user is subscribed to this notification category + if not user_preference.is_subscribed_to_category(notification.category): + return None # Skip sending notification if user is not subscribed + notification_recipient = NotificationRecipient.objects.create( notification=notification, recipient=recipient ) @@ -112,6 +117,7 @@ def create_notification( urgent = notification_data.urgent scheduled_time = notification_data.scheduled_time recipients = notification_data.recipients + category = notification_data.category errors = [] notification = Notification.objects.create( @@ -120,6 +126,7 @@ def create_notification( email_subject=email_subject, urgent=urgent, scheduled_time=scheduled_time, + category=category, ) if not notification: @@ -151,6 +158,7 @@ def create_notification( "sent_time": notification.sent_time, "scheduled_time": notification.scheduled_time, "author": notification.author, + "category": notification.category, } return None, { @@ -161,7 +169,7 @@ def create_notification( # get notification history def get_notification_history( - page: int, limit: int, read_status: Optional[int] = None + page: int, limit: int, read_status: Optional[int] = None, category: Optional[str] = None ) -> Tuple[Optional[None], Dict[str, Any]]: """returns history of sent notifications""" notifications = Notification.objects @@ -169,6 +177,9 @@ def get_notification_history( if read_status: notifications = 
notifications.filter(read_status=(read_status == 1)) + if category: + notifications = notifications.filter(category=category) + notifications = notifications.all().order_by("-timestamp") paginator = Paginator(notifications, limit) @@ -183,6 +194,7 @@ def get_notification_history( "urgent": notification.urgent, "scheduled_time": notification.scheduled_time, "sent_time": notification.sent_time, + "category": notification.category, } for notification in paginated_notifications ] @@ -264,16 +276,23 @@ def fetch_user_notifications( def fetch_user_notifications_v1( - orguser: OrgUser, page: int, limit: int, read_status: int = None + orguser: OrgUser, page: int, limit: int, read_status: int = None, category: Optional[str] = None ) -> Tuple[Optional[None], Dict[str, Any]]: """returns all notifications for a specific user""" + filter_kwargs = { + "recipient": orguser, + "notification__sent_time__isnull": False, + } + + if read_status is not None: + filter_kwargs["read_status"] = read_status == 1 + + if category: + filter_kwargs["notification__category"] = category + notifications = ( - NotificationRecipient.objects.filter( - recipient=orguser, - notification__sent_time__isnull=False, - **({"read_status": read_status == 1} if read_status is not None else {}), - ) + NotificationRecipient.objects.filter(**filter_kwargs) .select_related("notification") .order_by("-notification__timestamp") ) @@ -295,6 +314,7 @@ def fetch_user_notifications_v1( "scheduled_time": notification.scheduled_time, "sent_time": notification.sent_time, "read_status": recipient.read_status, + "category": notification.category, } ) @@ -380,3 +400,88 @@ def get_unread_notifications_count( ).count() return None, {"success": True, "res": unread_count} + + +# get urgent notifications that haven't been dismissed +def get_urgent_notifications( + orguser: OrgUser, +) -> Tuple[Optional[None], Dict[str, Any]]: + """ + Returns urgent notifications that haven't been dismissed by the user. + """ + urgent_notifications = ( + NotificationRecipient.objects.filter( + recipient=orguser, + notification__urgent=True, + notification__sent_time__isnull=False, + ) + .exclude(notification__dismissed_by=orguser) + .select_related("notification") + .order_by("-notification__timestamp") + ) + + notifications_list = [] + for recipient in urgent_notifications: + notification = recipient.notification + notifications_list.append( + { + "id": notification.id, + "author": notification.author, + "message": notification.message, + "timestamp": notification.timestamp, + "category": notification.category, + "read_status": recipient.read_status, + } + ) + + return None, {"success": True, "res": notifications_list} + + +# dismiss urgent notification +def dismiss_urgent_notification( + orguser: OrgUser, notification_id: int +) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: + """ + Dismiss an urgent notification for a specific user. + """ + try: + notification = Notification.objects.get(id=notification_id, urgent=True) + notification.dismissed_by.add(orguser) + return None, {"success": True, "message": "Urgent notification dismissed successfully"} + except Notification.DoesNotExist: + return "Urgent notification not found", None + + +# update category subscription preferences +def update_category_subscriptions( + orguser: OrgUser, subscription_data: Dict[str, bool] +) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: + """ + Update user's category subscription preferences. 
+ """ + try: + user_preference, created = UserPreferences.objects.get_or_create(orguser=orguser) + + for field, value in subscription_data.items(): + if hasattr(user_preference, field) and value is not None: + setattr(user_preference, field, value) + + user_preference.save() + + return None, { + "success": True, + "message": "Category subscriptions updated successfully", + "preferences": user_preference.to_json(), + } + except Exception as e: + return f"Error updating category subscriptions: {str(e)}", None + + +# get notifications by category +def get_notifications_by_category( + orguser: OrgUser, category: str, page: int = 1, limit: int = 10 +) -> Tuple[Optional[None], Dict[str, Any]]: + """ + Get notifications for a specific category for the user. + """ + return fetch_user_notifications_v1(orguser, page, limit, category=category) diff --git a/ddpui/models/notifications.py b/ddpui/models/notifications.py index 47bd16976..39ffd0d2d 100644 --- a/ddpui/models/notifications.py +++ b/ddpui/models/notifications.py @@ -2,6 +2,16 @@ from ddpui.models.org_user import OrgUser +class NotificationCategory(models.TextChoices): + """Choices for notification categories""" + + INCIDENT = "incident", "Incident" + SCHEMA_CHANGE = "schema_change", "Schema Change" + JOB_FAILURE = "job_failure", "Job Failure" + LATE_RUNS = "late_runs", "Late Runs" + DBT_TEST_FAILURE = "dbt_test_failure", "DBT Test Failure" + + class Notification(models.Model): """Model to store notifications for users""" @@ -12,6 +22,18 @@ class Notification(models.Model): urgent = models.BooleanField(default=False) scheduled_time = models.DateTimeField(null=True, blank=True) sent_time = models.DateTimeField(null=True, blank=True) + category = models.CharField( + max_length=20, + choices=NotificationCategory.choices, + default=NotificationCategory.INCIDENT, + help_text="Category of the notification", + ) + dismissed_by = models.ManyToManyField( + OrgUser, + blank=True, + related_name="dismissed_notifications", + help_text="Users who have dismissed this urgent notification", + ) class NotificationRecipient(models.Model): diff --git a/ddpui/models/userpreferences.py b/ddpui/models/userpreferences.py index 0c36c91a5..d90aef15c 100644 --- a/ddpui/models/userpreferences.py +++ b/ddpui/models/userpreferences.py @@ -11,6 +11,24 @@ class UserPreferences(models.Model): discord_webhook = models.URLField(blank=True, null=True) # deprecated enable_email_notifications = models.BooleanField(default=False) disclaimer_shown = models.BooleanField(default=False) + + # Category-based subscription preferences + subscribe_incident_notifications = models.BooleanField( + default=True, help_text="Subscribe to incident notifications" + ) + subscribe_schema_change_notifications = models.BooleanField( + default=True, help_text="Subscribe to schema change notifications" + ) + subscribe_job_failure_notifications = models.BooleanField( + default=True, help_text="Subscribe to job failure notifications" + ) + subscribe_late_runs_notifications = models.BooleanField( + default=True, help_text="Subscribe to late runs notifications" + ) + subscribe_dbt_test_failure_notifications = models.BooleanField( + default=True, help_text="Subscribe to DBT test failure notifications" + ) + created_at = models.DateTimeField(default=timezone.now) updated_at = models.DateTimeField(default=timezone.now) @@ -19,4 +37,20 @@ def to_json(self) -> dict: return { "enable_email_notifications": self.enable_email_notifications, "disclaimer_shown": self.disclaimer_shown, + 
"subscribe_incident_notifications": self.subscribe_incident_notifications, + "subscribe_schema_change_notifications": self.subscribe_schema_change_notifications, + "subscribe_job_failure_notifications": self.subscribe_job_failure_notifications, + "subscribe_late_runs_notifications": self.subscribe_late_runs_notifications, + "subscribe_dbt_test_failure_notifications": self.subscribe_dbt_test_failure_notifications, + } + + def is_subscribed_to_category(self, category: str) -> bool: + """Check if user is subscribed to a specific notification category""" + category_mapping = { + "incident": self.subscribe_incident_notifications, + "schema_change": self.subscribe_schema_change_notifications, + "job_failure": self.subscribe_job_failure_notifications, + "late_runs": self.subscribe_late_runs_notifications, + "dbt_test_failure": self.subscribe_dbt_test_failure_notifications, } + return category_mapping.get(category, True) diff --git a/ddpui/schemas/notifications_api_schemas.py b/ddpui/schemas/notifications_api_schemas.py index 1c6ffd723..17572a430 100644 --- a/ddpui/schemas/notifications_api_schemas.py +++ b/ddpui/schemas/notifications_api_schemas.py @@ -16,6 +16,18 @@ class SentToEnum(str, Enum): SINGLE_USER = "single_user" +class NotificationCategoryEnum(str, Enum): + """ + Schema for notification categories + """ + + INCIDENT = "incident" + SCHEMA_CHANGE = "schema_change" + JOB_FAILURE = "job_failure" + LATE_RUNS = "late_runs" + DBT_TEST_FAILURE = "dbt_test_failure" + + class CreateNotificationPayloadSchema(BaseModel): """Schema for creating a new notification api.""" @@ -27,6 +39,7 @@ class CreateNotificationPayloadSchema(BaseModel): user_email: Optional[str] = None manager_or_above: Optional[bool] = False org_slug: Optional[str] = None + category: Optional[NotificationCategoryEnum] = NotificationCategoryEnum.INCIDENT class Config: use_enum_values = True @@ -55,3 +68,20 @@ class NotificationDataSchema(Schema): urgent: Optional[bool] = False scheduled_time: Optional[datetime] = None recipients: List[int] # list of orguser ids + category: Optional[str] = "incident" + + +class DismissUrgentNotificationSchema(Schema): + """Schema for dismissing urgent notifications""" + + notification_id: int + + +class CategorySubscriptionSchema(Schema): + """Schema for updating category subscription preferences""" + + subscribe_incident_notifications: Optional[bool] = None + subscribe_schema_change_notifications: Optional[bool] = None + subscribe_job_failure_notifications: Optional[bool] = None + subscribe_late_runs_notifications: Optional[bool] = None + subscribe_dbt_test_failure_notifications: Optional[bool] = None diff --git a/ddpui/schemas/userpreferences_schema.py b/ddpui/schemas/userpreferences_schema.py index 7651686f2..a95c5ff89 100644 --- a/ddpui/schemas/userpreferences_schema.py +++ b/ddpui/schemas/userpreferences_schema.py @@ -7,6 +7,11 @@ class CreateUserPreferencesSchema(Schema): enable_email_notifications: bool disclaimer_shown: Optional[bool] = None + subscribe_incident_notifications: Optional[bool] = True + subscribe_schema_change_notifications: Optional[bool] = True + subscribe_job_failure_notifications: Optional[bool] = True + subscribe_late_runs_notifications: Optional[bool] = True + subscribe_dbt_test_failure_notifications: Optional[bool] = True class UpdateUserPreferencesSchema(Schema): @@ -14,3 +19,8 @@ class UpdateUserPreferencesSchema(Schema): enable_email_notifications: Optional[bool] = None disclaimer_shown: Optional[bool] = None + subscribe_incident_notifications: Optional[bool] = None 
+ subscribe_schema_change_notifications: Optional[bool] = None + subscribe_job_failure_notifications: Optional[bool] = None + subscribe_late_runs_notifications: Optional[bool] = None + subscribe_dbt_test_failure_notifications: Optional[bool] = None From 19177c1e9386188ea484eb409d9fad8fa9fa8966 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Tue, 5 Aug 2025 17:10:10 +0530 Subject: [PATCH 03/22] migrations --- ...gory_notification_dismissed_by_and_more.py | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 ddpui/migrations/0126_notification_category_notification_dismissed_by_and_more.py diff --git a/ddpui/migrations/0126_notification_category_notification_dismissed_by_and_more.py b/ddpui/migrations/0126_notification_category_notification_dismissed_by_and_more.py new file mode 100644 index 000000000..1fe9cb2f1 --- /dev/null +++ b/ddpui/migrations/0126_notification_category_notification_dismissed_by_and_more.py @@ -0,0 +1,73 @@ +# Generated by Django 4.2 on 2025-08-05 11:25 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("ddpui", "0125_remove_invitation_invited_role_remove_orguser_role"), + ] + + operations = [ + migrations.AddField( + model_name="notification", + name="category", + field=models.CharField( + choices=[ + ("incident", "Incident"), + ("schema_change", "Schema Change"), + ("job_failure", "Job Failure"), + ("late_runs", "Late Runs"), + ("dbt_test_failure", "DBT Test Failure"), + ], + default="incident", + help_text="Category of the notification", + max_length=20, + ), + ), + migrations.AddField( + model_name="notification", + name="dismissed_by", + field=models.ManyToManyField( + blank=True, + help_text="Users who have dismissed this urgent notification", + related_name="dismissed_notifications", + to="ddpui.orguser", + ), + ), + migrations.AddField( + model_name="userpreferences", + name="subscribe_dbt_test_failure_notifications", + field=models.BooleanField( + default=True, help_text="Subscribe to DBT test failure notifications" + ), + ), + migrations.AddField( + model_name="userpreferences", + name="subscribe_incident_notifications", + field=models.BooleanField( + default=True, help_text="Subscribe to incident notifications" + ), + ), + migrations.AddField( + model_name="userpreferences", + name="subscribe_job_failure_notifications", + field=models.BooleanField( + default=True, help_text="Subscribe to job failure notifications" + ), + ), + migrations.AddField( + model_name="userpreferences", + name="subscribe_late_runs_notifications", + field=models.BooleanField( + default=True, help_text="Subscribe to late runs notifications" + ), + ), + migrations.AddField( + model_name="userpreferences", + name="subscribe_schema_change_notifications", + field=models.BooleanField( + default=True, help_text="Subscribe to schema change notifications" + ), + ), + ] From d90eeb988a94bede6753e8f66453744755aaacc8 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Thu, 7 Aug 2025 15:06:32 +0530 Subject: [PATCH 04/22] Remove update category subscriptions functionality from notifications API and service as this can be done from user preference. 
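After this change the category flags are managed only through the existing user-preferences flow. A minimal sketch of the equivalent update, assuming an `orguser` instance is already in scope and using the preference fields introduced earlier in this series (no additional endpoint is implied):

```python
# Sketch only: category subscriptions are now edited via UserPreferences,
# not through the removed category-subscriptions route.
from ddpui.models.userpreferences import UserPreferences

prefs, _ = UserPreferences.objects.get_or_create(orguser=orguser)  # orguser assumed in scope
prefs.subscribe_job_failure_notifications = False   # opt out of job-failure notifications
prefs.subscribe_late_runs_notifications = True      # keep late-run notifications on
prefs.save()
```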
--- ddpui/api/notifications_api.py | 26 -------------------------- ddpui/core/notifications_service.py | 25 ------------------------- 2 files changed, 51 deletions(-) diff --git a/ddpui/api/notifications_api.py b/ddpui/api/notifications_api.py index 04baecfca..d8a944a00 100644 --- a/ddpui/api/notifications_api.py +++ b/ddpui/api/notifications_api.py @@ -322,29 +322,3 @@ def get_notifications_by_category(request, category: str, page: int = 1, limit: raise HttpError(400, error) return result - - -@notification_router.put("/category-subscriptions") -def update_category_subscriptions(request, payload: CategorySubscriptionSchema): - """ - Update the user's category subscription preferences. - - Allows users to subscribe or unsubscribe from specific notification categories. - - Args: - request: HTTP request object containing orguser authentication data - payload: Schema containing the subscription preferences to update - - Returns: - dict: Updated preferences and success message - - Raises: - HttpError: 400 if update fails - """ - orguser: OrgUser = request.orguser - subscription_data = payload.dict(exclude_none=True) - error, result = notifications_service.update_category_subscriptions(orguser, subscription_data) - if error is not None: - raise HttpError(400, error) - - return result diff --git a/ddpui/core/notifications_service.py b/ddpui/core/notifications_service.py index 77a0fb18f..0bf4b494f 100644 --- a/ddpui/core/notifications_service.py +++ b/ddpui/core/notifications_service.py @@ -452,31 +452,6 @@ def dismiss_urgent_notification( return "Urgent notification not found", None -# update category subscription preferences -def update_category_subscriptions( - orguser: OrgUser, subscription_data: Dict[str, bool] -) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: - """ - Update user's category subscription preferences. 
- """ - try: - user_preference, created = UserPreferences.objects.get_or_create(orguser=orguser) - - for field, value in subscription_data.items(): - if hasattr(user_preference, field) and value is not None: - setattr(user_preference, field, value) - - user_preference.save() - - return None, { - "success": True, - "message": "Category subscriptions updated successfully", - "preferences": user_preference.to_json(), - } - except Exception as e: - return f"Error updating category subscriptions: {str(e)}", None - - # get notifications by category def get_notifications_by_category( orguser: OrgUser, category: str, page: int = 1, limit: int = 10 From 2c88e70ad498c67761bce8e7ee2109ed0c981d6f Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Thu, 7 Aug 2025 15:23:52 +0530 Subject: [PATCH 05/22] Add category argument to create_notification command and validate input --- ddpui/management/commands/create_notification.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/ddpui/management/commands/create_notification.py b/ddpui/management/commands/create_notification.py index 6a69aab17..e59bced67 100644 --- a/ddpui/management/commands/create_notification.py +++ b/ddpui/management/commands/create_notification.py @@ -7,6 +7,7 @@ NotificationDataSchema, ) from ddpui.models.org import Org +from ddpui.models.notifications import NotificationCategory class Command(BaseCommand): @@ -52,6 +53,7 @@ def add_arguments(self, parser): help="The scheduled time of the notification in ISO 8601 format (e.g., 2024-08-30T19:51:51Z)", ) parser.add_argument("--dry-run", action="store_true", help="Dry run mode") + parser.add_argument("--category", type=str, help="Category of the notification") def handle(self, *args, **options): # Parse scheduled_time @@ -100,6 +102,14 @@ def handle(self, *args, **options): print(f"Error in getting recipients: {error}") sys.exit(1) + # Validate category + category = options.get("category") + if category and category not in NotificationCategory.values: + print( + f"Invalid category: '{category}'. Must be one of: {', '.join(NotificationCategory.values)}" + ) + sys.exit(1) + # Create notification data notification_data = NotificationDataSchema( author=options["author"], @@ -108,6 +118,8 @@ def handle(self, *args, **options): message=message, urgent=options.get("urgent", False), scheduled_time=scheduled_time, + category=options.get("category") + or NotificationCategory.INCIDENT, # Default to 'incident' if not provided ) if options["dry_run"]: From 0630021effc8b37ddf0a95862083e2ddd8bf90ed Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Thu, 7 Aug 2025 15:58:37 +0530 Subject: [PATCH 06/22] Filte recipients based on category subscription. 
--- ddpui/api/notifications_api.py | 7 ++++++- ddpui/core/notifications_service.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/ddpui/api/notifications_api.py b/ddpui/api/notifications_api.py index d8a944a00..e76fd09ef 100644 --- a/ddpui/api/notifications_api.py +++ b/ddpui/api/notifications_api.py @@ -34,6 +34,7 @@ def post_create_notification(request, payload: CreateNotificationPayloadSchema): - org_slug: Target organization - user_email: Specific user email (optional) - manager_or_above: Whether to include managers (optional) + - category: Notification category (optional, defaults to INCIDENT) Returns: dict: Created notification details including ID and delivery status @@ -44,7 +45,11 @@ def post_create_notification(request, payload: CreateNotificationPayloadSchema): # Filter OrgUser data based on sent_to field error, recipients = notifications_service.get_recipients( - payload.sent_to, payload.org_slug, payload.user_email, payload.manager_or_above + payload.sent_to, + payload.org_slug, + payload.user_email, + payload.manager_or_above, + payload.category, ) if error is not None: diff --git a/ddpui/core/notifications_service.py b/ddpui/core/notifications_service.py index 0bf4b494f..7c62e79df 100644 --- a/ddpui/core/notifications_service.py +++ b/ddpui/core/notifications_service.py @@ -18,7 +18,11 @@ def get_recipients( - sent_to: str, org_slug: str, user_email: str, manager_or_above: bool + sent_to: str, + org_slug: str, + user_email: str, + manager_or_above: bool, + category: Optional[str] = None, ) -> Tuple[Optional[str], Optional[List[int]]]: """Returns the list of recipients based on the request parameters""" @@ -52,6 +56,29 @@ def get_recipients( "id", flat=True ) + # Map category string to the correct field name + category_field_mapping = { + "incident": "subscribe_incident_notifications", + "schema_change": "subscribe_schema_change_notifications", + "job_failure": "subscribe_job_failure_notifications", + "late_runs": "subscribe_late_runs_notifications", + "dbt_test_failure": "subscribe_dbt_test_failure_notifications", + } + + preference_field = category_field_mapping.get(category) + + if preference_field: + # Filter recipients by subscription preference + recipients = ( + OrgUser.objects.filter( + id__in=recipients, + preferences__isnull=False, + ) + .filter(**{f"preferences__{preference_field}": True}) + .values_list("id", flat=True) + ) + + # If no recipients found, return an error message if not recipients: return "No users found for the given information", None From d1c68795023054ff14ed23f14e39977f860c3790 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 12 Aug 2025 16:36:26 +0530 Subject: [PATCH 07/22] Remove dismissed by, allow null category and update the get urgent notifications logic. 
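After this change the urgent bar is driven entirely by unread urgent notifications; there is no separate dismissal state. A minimal sketch of the resulting behaviour, mirroring the query used in the service (an `OrgUser` instance is assumed to be in scope):

```python
# Sketch: what the urgent bar now shows: unread, already-sent, urgent
# notifications for this user; marking the row as read removes it from the bar.
from ddpui.models.notifications import NotificationRecipient

pending_urgent = NotificationRecipient.objects.filter(
    recipient=orguser,                        # assumed OrgUser in scope
    notification__urgent=True,
    notification__sent_time__isnull=False,
    read_status=False,
).select_related("notification")

for nr in pending_urgent:
    print(nr.notification.message, nr.notification.category)

# "Dismissal" is now just the normal read-status update:
first = pending_urgent.first()
if first:
    first.read_status = True
    first.save()
```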
--- ddpui/api/notifications_api.py | 29 ---------------- ddpui/core/notifications_service.py | 17 +--------- .../commands/create_notification.py | 3 +- ...move_notification_dismissed_by_and_more.py | 34 +++++++++++++++++++ ddpui/migrations/0128_allow_null_category.py | 30 ++++++++++++++++ ddpui/models/notifications.py | 10 ++---- ddpui/schemas/notifications_api_schemas.py | 6 ---- 7 files changed, 69 insertions(+), 60 deletions(-) create mode 100644 ddpui/migrations/0127_remove_notification_dismissed_by_and_more.py create mode 100644 ddpui/migrations/0128_allow_null_category.py diff --git a/ddpui/api/notifications_api.py b/ddpui/api/notifications_api.py index e76fd09ef..2f1c875a7 100644 --- a/ddpui/api/notifications_api.py +++ b/ddpui/api/notifications_api.py @@ -6,7 +6,6 @@ CreateNotificationPayloadSchema, UpdateReadStatusSchema, UpdateReadStatusSchemav1, - DismissUrgentNotificationSchema, CategorySubscriptionSchema, ) from ddpui.models.org_user import OrgUser @@ -274,34 +273,6 @@ def get_urgent_notifications(request): return result -@notification_router.post("/urgent/dismiss") -def dismiss_urgent_notification(request, payload: DismissUrgentNotificationSchema): - """ - Dismiss an urgent notification for the authenticated user. - - Marks an urgent notification as dismissed so it won't appear in the - urgent notifications bar anymore. - - Args: - request: HTTP request object containing orguser authentication data - payload: Schema containing the notification_id to dismiss - - Returns: - dict: Success message - - Raises: - HttpError: 400 if dismissal fails - """ - orguser: OrgUser = request.orguser - error, result = notifications_service.dismiss_urgent_notification( - orguser, payload.notification_id - ) - if error is not None: - raise HttpError(400, error) - - return result - - @notification_router.get("/categories/{category}") def get_notifications_by_category(request, category: str, page: int = 1, limit: int = 10): """ diff --git a/ddpui/core/notifications_service.py b/ddpui/core/notifications_service.py index 7c62e79df..1492e9eeb 100644 --- a/ddpui/core/notifications_service.py +++ b/ddpui/core/notifications_service.py @@ -441,8 +441,8 @@ def get_urgent_notifications( recipient=orguser, notification__urgent=True, notification__sent_time__isnull=False, + read_status=False, ) - .exclude(notification__dismissed_by=orguser) .select_related("notification") .order_by("-notification__timestamp") ) @@ -464,21 +464,6 @@ def get_urgent_notifications( return None, {"success": True, "res": notifications_list} -# dismiss urgent notification -def dismiss_urgent_notification( - orguser: OrgUser, notification_id: int -) -> Tuple[Optional[str], Optional[Dict[str, Any]]]: - """ - Dismiss an urgent notification for a specific user. 
- """ - try: - notification = Notification.objects.get(id=notification_id, urgent=True) - notification.dismissed_by.add(orguser) - return None, {"success": True, "message": "Urgent notification dismissed successfully"} - except Notification.DoesNotExist: - return "Urgent notification not found", None - - # get notifications by category def get_notifications_by_category( orguser: OrgUser, category: str, page: int = 1, limit: int = 10 diff --git a/ddpui/management/commands/create_notification.py b/ddpui/management/commands/create_notification.py index e59bced67..e8c13ed9b 100644 --- a/ddpui/management/commands/create_notification.py +++ b/ddpui/management/commands/create_notification.py @@ -118,8 +118,7 @@ def handle(self, *args, **options): message=message, urgent=options.get("urgent", False), scheduled_time=scheduled_time, - category=options.get("category") - or NotificationCategory.INCIDENT, # Default to 'incident' if not provided + category=options.get("category"), # Default to None if not provided ) if options["dry_run"]: diff --git a/ddpui/migrations/0127_remove_notification_dismissed_by_and_more.py b/ddpui/migrations/0127_remove_notification_dismissed_by_and_more.py new file mode 100644 index 000000000..475d21430 --- /dev/null +++ b/ddpui/migrations/0127_remove_notification_dismissed_by_and_more.py @@ -0,0 +1,34 @@ +# Generated by Django 4.2 on 2025-08-12 10:57 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("ddpui", "0126_notification_category_notification_dismissed_by_and_more"), + ] + + operations = [ + migrations.RemoveField( + model_name="notification", + name="dismissed_by", + ), + migrations.AlterField( + model_name="notification", + name="category", + field=models.CharField( + blank=True, + choices=[ + ("incident", "Incident"), + ("schema_change", "Schema Change"), + ("job_failure", "Job Failure"), + ("late_runs", "Late Runs"), + ("dbt_test_failure", "DBT Test Failure"), + ], + default=None, + help_text="Category of the notification", + max_length=20, + null=True, + ), + ), + ] diff --git a/ddpui/migrations/0128_allow_null_category.py b/ddpui/migrations/0128_allow_null_category.py new file mode 100644 index 000000000..2a63b42b5 --- /dev/null +++ b/ddpui/migrations/0128_allow_null_category.py @@ -0,0 +1,30 @@ +# Generated by Django 4.2 on 2025-08-12 10:58 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("ddpui", "0127_remove_notification_dismissed_by_and_more"), + ] + + operations = [ + migrations.AlterField( + model_name="notification", + name="category", + field=models.CharField( + blank=True, + choices=[ + ("incident", "Incident"), + ("schema_change", "Schema Change"), + ("job_failure", "Job Failure"), + ("late_runs", "Late Runs"), + ("dbt_test_failure", "DBT Test Failure"), + ], + default=None, + help_text="Category of the notification", + max_length=20, + null=True, + ), + ), + ] diff --git a/ddpui/models/notifications.py b/ddpui/models/notifications.py index 39ffd0d2d..ce48de571 100644 --- a/ddpui/models/notifications.py +++ b/ddpui/models/notifications.py @@ -25,14 +25,10 @@ class Notification(models.Model): category = models.CharField( max_length=20, choices=NotificationCategory.choices, - default=NotificationCategory.INCIDENT, - help_text="Category of the notification", - ) - dismissed_by = models.ManyToManyField( - OrgUser, + null=True, blank=True, - related_name="dismissed_notifications", - help_text="Users who have dismissed this urgent 
notification", + default=None, + help_text="Category of the notification", ) diff --git a/ddpui/schemas/notifications_api_schemas.py b/ddpui/schemas/notifications_api_schemas.py index 17572a430..79e086fc7 100644 --- a/ddpui/schemas/notifications_api_schemas.py +++ b/ddpui/schemas/notifications_api_schemas.py @@ -71,12 +71,6 @@ class NotificationDataSchema(Schema): category: Optional[str] = "incident" -class DismissUrgentNotificationSchema(Schema): - """Schema for dismissing urgent notifications""" - - notification_id: int - - class CategorySubscriptionSchema(Schema): """Schema for updating category subscription preferences""" From 1e747f4dc38e1b42629e58b093f70c116fca0e37 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 12 Aug 2025 16:38:37 +0530 Subject: [PATCH 08/22] Delete agent_forge.yaml --- agent_forge.yaml | 110 ----------------------------------------------- 1 file changed, 110 deletions(-) delete mode 100644 agent_forge.yaml diff --git a/agent_forge.yaml b/agent_forge.yaml deleted file mode 100644 index a51c01fe2..000000000 --- a/agent_forge.yaml +++ /dev/null @@ -1,110 +0,0 @@ -# yaml-language-server: $schema=https://raw.githubusercontent.com/antinomyhq/forge/refs/heads/main/forge.schema.json -agents: -- id: code_og - title: Answers questions about the codebase - description: |- - Provides detailed understanding of the codebase. - Use it to ask questions about the code, its structure, and how different components interact. - system_prompt: | - You are Forge, a specialized AI assistant with deep knowledge of software architecture, programming patterns, and code analysis. - - Your primary responsibilities: - - Analyze and explain code structure, architecture, and design patterns - - Trace data flow and execution paths through the application - - Identify relationships between different modules, classes, and functions - - Explain business logic and implementation details - - Answer questions about dependencies, imports, and external integrations - - Help understand database schemas, API endpoints, and data models - - Provide insights into performance implications and potential bottlenecks - - Identify security considerations and potential vulnerabilities - - Explain configuration files, environment variables, and deployment setup - - When answering questions: - 1. **Be Specific**: Reference exact file paths, line numbers, function names, and class names - 2. **Show Context**: Explain how the code fits into the larger system architecture - 3. **Trace Connections**: Identify how different parts of the codebase interact - 4. **Explain Purpose**: Describe not just what the code does, but why it exists - 5. **Highlight Patterns**: Point out design patterns, coding conventions, and architectural decisions - 6. **Consider Impact**: Explain how changes might affect other parts of the system - 7. **Provide Examples**: Use concrete code snippets to illustrate explanations - 8. **Flag Issues**: Identify potential bugs, code smells, or improvement opportunities - - Always base your answers on the actual codebase content. If you need to see specific files or need more context to provide a complete answer, ask for clarification. - tools: - - forge_tool_fs_read - - forge_tool_net_fetch - - forge_tool_fs_search - - forge_tool_fs_list -- id: spec_og - title: Specification Specialist - description: "An expert agent that helps translate high-level feature requests, \nenhancement ideas, and bug reports into detailed, actionable specifications. 
\nThis agent excels at breaking down complex requirements, identifying edge cases,\n defining acceptance criteria, and creating comprehensive technical specifications \n that development teams can implement effectively.\n" - system_prompt: "You are Forge, an expert at creating detailed, actionable specifications for software features, enhancements, and fixes.\n\nYour primary responsibilities:\n- Analyze high-level requirements and break them into detailed specifications\n- Identify and document edge cases, error scenarios, and boundary conditions\n- Define clear acceptance criteria and success metrics\n- Create user stories with proper format (As a... I want... So that...)\n- Specify technical requirements, dependencies, and constraints\n- Outline testing scenarios and validation approaches\n- Consider security, performance, and scalability implications\n- Document API contracts, data models, and integration points\n\nWhen creating specifications, always include:\n1. **Overview**: Clear problem statement and solution summary\n2. **User Stories**: Well-formed user stories with acceptance criteria\n3. **Technical Requirements**: Detailed implementation requirements\n4. **Edge Cases**: Potential failure scenarios and error handling\n5. **Dependencies**: Required systems, data, or external services\n6. **Testing Strategy**: How the feature should be validated\n7. **Performance Criteria**: Expected performance benchmarks\n8. **Security Considerations**: Auth, permissions, data protection\n9. **Migration/Rollback Plan**: For changes to existing features\n\nAsk clarifying questions when requirements are ambiguous. Be thorough but concise. \nFocus on creating specifications that minimize back-and-forth during development.\n\nWhen you are ready with the final spec, ask the user to confirm \nif they want to proceed with the implementation.\n" - tools: - - code_og - - forge_tool_fs_read - - forge_tool_net_fetch - - forge_tool_fs_search - - forge_tool_fs_create - - forge_tool_fs_patch - - forge_tool_display_show_user -- id: forge - title: Implementation Specialist - description: "A comprehensive implementation agent that takes specifications and transforms them into working code. \nThis agent follows a structured approach: first gathering or creating detailed specifications, \nthen developing an implementation plan, and finally executing the changes with user approval at each step.\nHandles everything from small bug fixes to complex feature implementations." - system_prompt: | - You are Forge, a senior software engineer AI specialized in implementing features, enhancements, and fixes based on detailed specifications. - - Your implementation workflow: - 1. **Specification Gathering**: Ensure you have a complete, detailed specification - - If no spec exists, use the spec_og agent to create one - - Verify the spec covers all requirements, edge cases, and acceptance criteria - - Ask clarifying questions if anything is unclear - - 2. **Implementation Planning**: Create a comprehensive implementation plan - - Break down the work into logical phases/steps - - Identify all files that need to be created, modified, or removed - - Determine the order of implementation to avoid conflicts - - Consider database migrations, API changes, and configuration updates - - Plan for testing and validation approaches - - 3. 
**User Approval**: Present the plan and get explicit approval - - Summarize what will be implemented - - List all files that will be changed - - Highlight any potential risks or breaking changes - - Wait for user confirmation before proceeding - - 4. **Implementation**: Execute the plan methodically - - Implement changes in the planned order - - Write clean, maintainable code following existing patterns - - Add appropriate error handling and logging - - Include necessary tests and documentation - - Validate each step before moving to the next - - 5. **Verification**: Ensure the implementation meets requirements - - Test the implemented functionality - - Verify all acceptance criteria are met - - Check for any regressions or side effects - - Best practices to follow: - - Maintain consistency with existing code style and patterns - - Add comprehensive error handling and input validation - - Include appropriate logging and monitoring - - Write unit tests for new functionality - - Update documentation when necessary - - Consider backward compatibility and migration paths - - Follow security best practices - - Optimize for performance where relevant - - Always ask for user approval before making significant changes. Be transparent about what you're doing and why. - tools: - - forge_tool_fs_read - - forge_tool_fs_create - - forge_tool_fs_remove - - forge_tool_fs_patch - - forge_tool_process_shell - - forge_tool_net_fetch - - forge_tool_fs_search - - forge_tool_fs_undo - - forge_tool_display_show_user - - forge_tool_fs_list - - spec_og -model: anthropic/claude-sonnet-4 From 5af572607b12398fc5f1d044260404e7811c1b8c Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 12 Aug 2025 16:39:56 +0530 Subject: [PATCH 09/22] Delete NOTIFICATIONS_V2_DOCUMENTATION.md --- NOTIFICATIONS_V2_DOCUMENTATION.md | 303 ------------------------------ 1 file changed, 303 deletions(-) delete mode 100644 NOTIFICATIONS_V2_DOCUMENTATION.md diff --git a/NOTIFICATIONS_V2_DOCUMENTATION.md b/NOTIFICATIONS_V2_DOCUMENTATION.md deleted file mode 100644 index ca91ee132..000000000 --- a/NOTIFICATIONS_V2_DOCUMENTATION.md +++ /dev/null @@ -1,303 +0,0 @@ -# Notifications v2 Enhancement Documentation - -## Overview - -This document describes the enhancements made to the Dalgo notification system to support categorized notifications, category-based subscriptions, and urgent notification management. - -## New Features - -### 1. Notification Categories - -Notifications are now categorized into the following types: - -- **incident**: Platform incidents, downtime notifications (sent by platform admins) -- **schema_change**: Database schema changes (sent by platform) -- **job_failure**: Job execution failures (sent by platform) -- **late_runs**: Pipeline runs that are running late (sent by platform) -- **dbt_test_failure**: DBT test failures (sent by platform) - -### 2. Category-based Subscriptions - -Users can now subscribe/unsubscribe from specific notification categories: - -- Each user has individual subscription preferences for each category -- By default, users are subscribed to all categories -- Notifications are only sent to users who are subscribed to that category -- Subscription preferences are stored in the UserPreferences model - -### 3. 
Urgent Notification Bar - -Urgent notifications now support dismissal functionality: - -- Urgent notifications can be dismissed by individual users -- Dismissed notifications won't appear in the urgent notification bar -- The system tracks which users have dismissed each urgent notification - -## Database Changes - -### Notification Model Updates - -```python -class Notification(models.Model): - # ... existing fields ... - category = models.CharField( - max_length=20, - choices=NotificationCategory.choices, - default=NotificationCategory.INCIDENT, - help_text="Category of the notification" - ) - dismissed_by = models.ManyToManyField( - OrgUser, - blank=True, - related_name="dismissed_notifications", - help_text="Users who have dismissed this urgent notification" - ) -``` - -### UserPreferences Model Updates - -```python -class UserPreferences(models.Model): - # ... existing fields ... - subscribe_incident_notifications = models.BooleanField(default=True) - subscribe_schema_change_notifications = models.BooleanField(default=True) - subscribe_job_failure_notifications = models.BooleanField(default=True) - subscribe_late_runs_notifications = models.BooleanField(default=True) - subscribe_dbt_test_failure_notifications = models.BooleanField(default=True) -``` - -## API Endpoints - -### Enhanced Existing Endpoints - -#### 1. Create Notification -**POST** `/notifications/` - -Now accepts a `category` field: - -```json -{ - "author": "admin@example.com", - "message": "System maintenance scheduled", - "sent_to": "all_users", - "urgent": true, - "category": "incident" -} -``` - -#### 2. Get Notification History -**GET** `/notifications/history` - -Now supports category filtering: - -``` -GET /notifications/history?category=incident&page=1&limit=10 -``` - -#### 3. Get User Notifications -**GET** `/notifications/v1` - -Now supports category filtering: - -``` -GET /notifications/v1?category=job_failure&read_status=0 -``` - -### New Endpoints - -#### 1. Get Urgent Notifications -**GET** `/notifications/urgent` - -Returns urgent notifications that haven't been dismissed by the user: - -```json -{ - "success": true, - "res": [ - { - "id": 123, - "author": "admin@example.com", - "message": "System maintenance in progress", - "timestamp": "2024-01-15T10:00:00Z", - "category": "incident", - "read_status": false - } - ] -} -``` - -#### 2. Dismiss Urgent Notification -**POST** `/notifications/urgent/dismiss` - -Dismisses an urgent notification for the current user: - -```json -{ - "notification_id": 123 -} -``` - -#### 3. Get Notifications by Category -**GET** `/notifications/categories/{category}` - -Returns notifications for a specific category: - -``` -GET /notifications/categories/job_failure?page=1&limit=10 -``` - -#### 4. 
Update Category Subscriptions -**PUT** `/notifications/category-subscriptions` - -Updates user's category subscription preferences: - -```json -{ - "subscribe_incident_notifications": true, - "subscribe_job_failure_notifications": false, - "subscribe_late_runs_notifications": true -} -``` - -### User Preferences API Updates - -#### Get User Preferences -**GET** `/user-preferences/` - -Now returns category subscription preferences: - -```json -{ - "success": true, - "res": { - "enable_email_notifications": true, - "disclaimer_shown": true, - "subscribe_incident_notifications": true, - "subscribe_schema_change_notifications": true, - "subscribe_job_failure_notifications": false, - "subscribe_late_runs_notifications": true, - "subscribe_dbt_test_failure_notifications": true - } -} -``` - -#### Update User Preferences -**PUT** `/user-preferences/` - -Now accepts category subscription fields: - -```json -{ - "enable_email_notifications": true, - "subscribe_incident_notifications": false, - "subscribe_job_failure_notifications": true -} -``` - -## Business Logic Changes - -### Notification Delivery - -The notification delivery logic now includes category subscription checking: - -1. When a notification is created, the system checks each recipient's subscription preferences -2. Notifications are only sent to users who are subscribed to that category -3. Users who are not subscribed to a category will not receive notifications of that type - -### Urgent Notification Management - -1. Urgent notifications are tracked separately for dismissal -2. Each user can dismiss urgent notifications independently -3. Dismissed urgent notifications won't appear in the urgent notification bar -4. The dismissal state is persistent across sessions - -## Frontend Integration Points - -### Urgent Notification Bar - -The frontend should: - -1. Call `GET /notifications/urgent` to get urgent notifications -2. Display them in a prominent horizontal bar at the top of the page -3. Allow users to dismiss notifications via `POST /notifications/urgent/dismiss` -4. Remove dismissed notifications from the bar - -### Category Management - -The frontend should: - -1. Provide a settings page for category subscriptions -2. Use `GET /user-preferences/` to get current subscription settings -3. Use `PUT /user-preferences/` or `PUT /notifications/category-subscriptions` to update settings -4. Allow filtering notifications by category in the notification list - -### Notification Display - -The frontend should: - -1. Display the category for each notification -2. Allow filtering by category using the enhanced API endpoints -3. Group notifications by category if desired - -## Migration Notes - -**Important**: Database migrations need to be created and run to add the new fields: - -1. Add `category` field to Notification model -2. Add `dismissed_by` ManyToMany field to Notification model -3. Add category subscription fields to UserPreferences model - -## Testing - -A test script (`test_notifications_v2.py`) has been created to verify the implementation. 
Run it with: - -```bash -source .venv/bin/activate -python test_notifications_v2.py -``` - -## Backward Compatibility - -- All existing API endpoints continue to work as before -- New fields have sensible defaults -- Existing notifications will have the default category "incident" -- Existing users will be subscribed to all categories by default - -## Usage Examples - -### Creating Category-specific Notifications - -```python -# Job failure notification -notification_data = { - "author": "system@dalgo.com", - "message": "Pipeline 'daily_etl' failed", - "category": "job_failure", - "urgent": False, - "recipients": [user_id] -} - -# Incident notification -notification_data = { - "author": "admin@dalgo.com", - "message": "System maintenance starting in 30 minutes", - "category": "incident", - "urgent": True, - "recipients": all_user_ids -} -``` - -### Managing User Subscriptions - -```python -# Unsubscribe from job failure notifications -user_prefs.subscribe_job_failure_notifications = False -user_prefs.save() - -# Check if user is subscribed to a category -if user_prefs.is_subscribed_to_category("incident"): - # Send notification - pass -``` - -This enhancement provides a much more flexible and user-friendly notification system that allows users to control what types of notifications they receive while ensuring important urgent messages are prominently displayed. \ No newline at end of file From 22a5bdd2282a769c60991450e761751610a3152c Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 12 Aug 2025 16:49:25 +0530 Subject: [PATCH 10/22] Add MAINTENANCE category to notification models and update default category in payload schema --- ddpui/models/notifications.py | 1 + ddpui/schemas/notifications_api_schemas.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ddpui/models/notifications.py b/ddpui/models/notifications.py index ce48de571..3d0ab614f 100644 --- a/ddpui/models/notifications.py +++ b/ddpui/models/notifications.py @@ -10,6 +10,7 @@ class NotificationCategory(models.TextChoices): JOB_FAILURE = "job_failure", "Job Failure" LATE_RUNS = "late_runs", "Late Runs" DBT_TEST_FAILURE = "dbt_test_failure", "DBT Test Failure" + MAINTENANCE = "maintenance", "Maintenance" class Notification(models.Model): diff --git a/ddpui/schemas/notifications_api_schemas.py b/ddpui/schemas/notifications_api_schemas.py index 79e086fc7..fd17e85c0 100644 --- a/ddpui/schemas/notifications_api_schemas.py +++ b/ddpui/schemas/notifications_api_schemas.py @@ -26,6 +26,7 @@ class NotificationCategoryEnum(str, Enum): JOB_FAILURE = "job_failure" LATE_RUNS = "late_runs" DBT_TEST_FAILURE = "dbt_test_failure" + MAINTENANCE = "maintenance" class CreateNotificationPayloadSchema(BaseModel): @@ -39,7 +40,7 @@ class CreateNotificationPayloadSchema(BaseModel): user_email: Optional[str] = None manager_or_above: Optional[bool] = False org_slug: Optional[str] = None - category: Optional[NotificationCategoryEnum] = NotificationCategoryEnum.INCIDENT + category: Optional[NotificationCategoryEnum] = None class Config: use_enum_values = True From d35b5943c4d8ebd7ecf5b5f764c0747b5a9a0604 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 12 Aug 2025 16:56:06 +0530 Subject: [PATCH 11/22] Update gitignore for dbt-venv folder --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 5353b10a5..c0f203952 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,8 @@ __pycache__/ # C extensions *.so +dbt-venv/ + # Distribution / packaging .Python build/ From 
fbb98a108ab18f0823762f4d1d92c54226da225b Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 18 Aug 2025 12:13:26 +0530 Subject: [PATCH 12/22] cleanup --- ddpui/api/webhook_api.py | 41 ---------------------------------------- 1 file changed, 41 deletions(-) diff --git a/ddpui/api/webhook_api.py b/ddpui/api/webhook_api.py index 505fdb948..1a32ea4b8 100644 --- a/ddpui/api/webhook_api.py +++ b/ddpui/api/webhook_api.py @@ -69,44 +69,3 @@ def post_notification_v1(request): # pylint: disable=unused-argument # 5. requests.patch(f'http://localhost:4200/api/flow_run_notification_policies/{frnp}', json={ # 'message_template': 'Flow run {flow_run_name} with id {flow_run_id} entered state {flow_run_state_name}' # }) - - -@webhook_router.get("/failure-summary/{flow_run_id}") -@has_permission(["can_view_logs"]) -def get_failure_summary(request, flow_run_id: str): - """Get LLM failure summary for a specific flow run""" - try: - orguser: OrgUser = request.orguser - - # Find the LLM session for this flow run - llm_session = ( - LlmSession.objects.filter( - org=orguser.org, flow_run_id=flow_run_id, session_status=LlmSessionStatus.COMPLETED - ) - .order_by("-created_at") - .first() - ) - - if not llm_session: - return { - "status": "not_found", - "message": "No LLM failure summary found for this flow run", - "flow_run_id": flow_run_id, - } - - return { - "status": "success", - "flow_run_id": flow_run_id, - "summary": { - "session_id": llm_session.session_id, - "created_at": llm_session.created_at, - "user_prompts": llm_session.user_prompts, - "assistant_prompt": llm_session.assistant_prompt, - "response": llm_session.response, - "session_status": llm_session.session_status, - }, - } - - except Exception as err: - logger.error(f"Error retrieving failure summary for flow run {flow_run_id}: {str(err)}") - raise HttpError(500, f"Failed to retrieve failure summary: {str(err)}") From d48016fdc6ff3376a8793eb9b64430684fd9f534 Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 18 Aug 2025 12:13:56 +0530 Subject: [PATCH 13/22] cleanup --- ddpui/api/webhook_api.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/ddpui/api/webhook_api.py b/ddpui/api/webhook_api.py index 1a32ea4b8..f71e929f3 100644 --- a/ddpui/api/webhook_api.py +++ b/ddpui/api/webhook_api.py @@ -9,9 +9,6 @@ FLOW_RUN, ) from ddpui.celeryworkers.tasks import handle_prefect_webhook -from ddpui.models.llm import LlmSession, LlmSessionStatus -from ddpui.auth import has_permission -from ddpui.models.org_user import OrgUser webhook_router = Router() logger = CustomLogger("ddpui") From d6c87e3a294016f5d3c657ce7c815a781cd00b6e Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 18 Aug 2025 12:27:37 +0530 Subject: [PATCH 14/22] merge migration files --- ...=> 0126_notification_category_and_more.py} | 17 +++------- ...move_notification_dismissed_by_and_more.py | 34 ------------------- ddpui/migrations/0128_allow_null_category.py | 30 ---------------- 3 files changed, 5 insertions(+), 76 deletions(-) rename ddpui/migrations/{0126_notification_category_notification_dismissed_by_and_more.py => 0126_notification_category_and_more.py} (83%) delete mode 100644 ddpui/migrations/0127_remove_notification_dismissed_by_and_more.py delete mode 100644 ddpui/migrations/0128_allow_null_category.py diff --git a/ddpui/migrations/0126_notification_category_notification_dismissed_by_and_more.py b/ddpui/migrations/0126_notification_category_and_more.py similarity index 83% rename from ddpui/migrations/0126_notification_category_notification_dismissed_by_and_more.py 
rename to ddpui/migrations/0126_notification_category_and_more.py index 1fe9cb2f1..f42725380 100644 --- a/ddpui/migrations/0126_notification_category_notification_dismissed_by_and_more.py +++ b/ddpui/migrations/0126_notification_category_and_more.py @@ -1,4 +1,4 @@ -# Generated by Django 4.2 on 2025-08-05 11:25 +# Generated by Django 4.2 on 2025-08-18 06:55 from django.db import migrations, models @@ -13,26 +13,19 @@ class Migration(migrations.Migration): model_name="notification", name="category", field=models.CharField( + blank=True, choices=[ ("incident", "Incident"), ("schema_change", "Schema Change"), ("job_failure", "Job Failure"), ("late_runs", "Late Runs"), ("dbt_test_failure", "DBT Test Failure"), + ("maintenance", "Maintenance"), ], - default="incident", + default=None, help_text="Category of the notification", max_length=20, - ), - ), - migrations.AddField( - model_name="notification", - name="dismissed_by", - field=models.ManyToManyField( - blank=True, - help_text="Users who have dismissed this urgent notification", - related_name="dismissed_notifications", - to="ddpui.orguser", + null=True, ), ), migrations.AddField( diff --git a/ddpui/migrations/0127_remove_notification_dismissed_by_and_more.py b/ddpui/migrations/0127_remove_notification_dismissed_by_and_more.py deleted file mode 100644 index 475d21430..000000000 --- a/ddpui/migrations/0127_remove_notification_dismissed_by_and_more.py +++ /dev/null @@ -1,34 +0,0 @@ -# Generated by Django 4.2 on 2025-08-12 10:57 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("ddpui", "0126_notification_category_notification_dismissed_by_and_more"), - ] - - operations = [ - migrations.RemoveField( - model_name="notification", - name="dismissed_by", - ), - migrations.AlterField( - model_name="notification", - name="category", - field=models.CharField( - blank=True, - choices=[ - ("incident", "Incident"), - ("schema_change", "Schema Change"), - ("job_failure", "Job Failure"), - ("late_runs", "Late Runs"), - ("dbt_test_failure", "DBT Test Failure"), - ], - default=None, - help_text="Category of the notification", - max_length=20, - null=True, - ), - ), - ] diff --git a/ddpui/migrations/0128_allow_null_category.py b/ddpui/migrations/0128_allow_null_category.py deleted file mode 100644 index 2a63b42b5..000000000 --- a/ddpui/migrations/0128_allow_null_category.py +++ /dev/null @@ -1,30 +0,0 @@ -# Generated by Django 4.2 on 2025-08-12 10:58 - -from django.db import migrations, models - - -class Migration(migrations.Migration): - dependencies = [ - ("ddpui", "0127_remove_notification_dismissed_by_and_more"), - ] - - operations = [ - migrations.AlterField( - model_name="notification", - name="category", - field=models.CharField( - blank=True, - choices=[ - ("incident", "Incident"), - ("schema_change", "Schema Change"), - ("job_failure", "Job Failure"), - ("late_runs", "Late Runs"), - ("dbt_test_failure", "DBT Test Failure"), - ], - default=None, - help_text="Category of the notification", - max_length=20, - null=True, - ), - ), - ] From 65ad2229184e60a178411be4f546f39461b3e47b Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 18 Aug 2025 12:31:46 +0530 Subject: [PATCH 15/22] dont set defaults --- ddpui/schemas/notifications_api_schemas.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddpui/schemas/notifications_api_schemas.py b/ddpui/schemas/notifications_api_schemas.py index fd17e85c0..d2db13d63 100644 --- a/ddpui/schemas/notifications_api_schemas.py +++ 
b/ddpui/schemas/notifications_api_schemas.py @@ -69,7 +69,7 @@ class NotificationDataSchema(Schema): urgent: Optional[bool] = False scheduled_time: Optional[datetime] = None recipients: List[int] # list of orguser ids - category: Optional[str] = "incident" + category: Optional[str] class CategorySubscriptionSchema(Schema): From 0e822aac5b21933b8411a6d8c025b2b59db08f8e Mon Sep 17 00:00:00 2001 From: Ishankoradia Date: Mon, 18 Aug 2025 13:03:28 +0530 Subject: [PATCH 16/22] merge migration from the main branch --- ddpui/migrations/0128_merge_20250818_0733.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 ddpui/migrations/0128_merge_20250818_0733.py diff --git a/ddpui/migrations/0128_merge_20250818_0733.py b/ddpui/migrations/0128_merge_20250818_0733.py new file mode 100644 index 000000000..0bce5686b --- /dev/null +++ b/ddpui/migrations/0128_merge_20250818_0733.py @@ -0,0 +1,12 @@ +# Generated by Django 4.2 on 2025-08-18 07:33 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("ddpui", "0126_notification_category_and_more"), + ("ddpui", "0127_alter_orgwarehouse_name"), + ] + + operations = [] From 1fc39f48a215d197c161eff2217a0766da1f746f Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 19 Aug 2025 14:44:36 +0530 Subject: [PATCH 17/22] refactor: remove incident and late runs notification preferences; update notification category handling --- ddpui/api/user_preferences_api.py | 8 -------- ddpui/celeryworkers/tasks.py | 2 ++ ddpui/core/notifications_service.py | 13 ++++--------- ddpui/schemas/notifications_api_schemas.py | 2 -- ddpui/schemas/userpreferences_schema.py | 4 ---- ddpui/utils/webhook_helpers.py | 13 ++++++++----- 6 files changed, 14 insertions(+), 28 deletions(-) diff --git a/ddpui/api/user_preferences_api.py b/ddpui/api/user_preferences_api.py index 5961f2dc4..6ae6254c1 100644 --- a/ddpui/api/user_preferences_api.py +++ b/ddpui/api/user_preferences_api.py @@ -49,8 +49,6 @@ def update_user_preferences(request, payload: UpdateUserPreferencesSchema): user_preferences.enable_email_notifications = payload.enable_email_notifications if payload.disclaimer_shown is not None: user_preferences.disclaimer_shown = payload.disclaimer_shown - if payload.subscribe_incident_notifications is not None: - user_preferences.subscribe_incident_notifications = payload.subscribe_incident_notifications if payload.subscribe_schema_change_notifications is not None: user_preferences.subscribe_schema_change_notifications = ( payload.subscribe_schema_change_notifications @@ -59,10 +57,6 @@ def update_user_preferences(request, payload: UpdateUserPreferencesSchema): user_preferences.subscribe_job_failure_notifications = ( payload.subscribe_job_failure_notifications ) - if payload.subscribe_late_runs_notifications is not None: - user_preferences.subscribe_late_runs_notifications = ( - payload.subscribe_late_runs_notifications - ) if payload.subscribe_dbt_test_failure_notifications is not None: user_preferences.subscribe_dbt_test_failure_notifications = ( payload.subscribe_dbt_test_failure_notifications @@ -83,10 +77,8 @@ def get_user_preferences(request): res = { "enable_email_notifications": user_preferences.enable_email_notifications, "disclaimer_shown": user_preferences.disclaimer_shown, - "subscribe_incident_notifications": user_preferences.subscribe_incident_notifications, "subscribe_schema_change_notifications": user_preferences.subscribe_schema_change_notifications, "subscribe_job_failure_notifications": 
user_preferences.subscribe_job_failure_notifications, - "subscribe_late_runs_notifications": user_preferences.subscribe_late_runs_notifications, "subscribe_dbt_test_failure_notifications": user_preferences.subscribe_dbt_test_failure_notifications, "is_llm_active": org_preferences.llm_optin, "enable_llm_requested": org_preferences.enable_llm_request, diff --git a/ddpui/celeryworkers/tasks.py b/ddpui/celeryworkers/tasks.py index 735c099a5..32a091a87 100644 --- a/ddpui/celeryworkers/tasks.py +++ b/ddpui/celeryworkers/tasks.py @@ -54,6 +54,7 @@ LogsSummarizationType, LlmSessionStatus, ) +from ddpui.models.notifications import NotificationCategory from ddpui.utils.helpers import runcmd, runcmd_with_output, subprocess, get_integer_env_var from ddpui.utils import secretsmanager from ddpui.utils.taskprogress import TaskProgress @@ -501,6 +502,7 @@ def detect_schema_changes_for_org(org: Org, delay=0): f' schema changes have been detected in your Dalgo sources for "{connection_name}".' f"\n\nPlease visit {connections_page} and review the Pending Actions", f"{org.name}: Schema changes detected in your Dalgo sources", + category=NotificationCategory.SCHEMA_CHANGE.value, ) except Exception as err: logger.error(err) diff --git a/ddpui/core/notifications_service.py b/ddpui/core/notifications_service.py index 62025ebd7..d252cf2ff 100644 --- a/ddpui/core/notifications_service.py +++ b/ddpui/core/notifications_service.py @@ -2,10 +2,7 @@ from datetime import datetime from celery.result import AsyncResult from django.core.paginator import Paginator -from ddpui.models.notifications import ( - Notification, - NotificationRecipient, -) +from ddpui.models.notifications import Notification, NotificationRecipient, NotificationCategory from ddpui.models.userpreferences import UserPreferences from ddpui.models.org import Org from ddpui.models.org_user import OrgUser @@ -58,11 +55,9 @@ def get_recipients( # Map category string to the correct field name category_field_mapping = { - "incident": "subscribe_incident_notifications", - "schema_change": "subscribe_schema_change_notifications", - "job_failure": "subscribe_job_failure_notifications", - "late_runs": "subscribe_late_runs_notifications", - "dbt_test_failure": "subscribe_dbt_test_failure_notifications", + NotificationCategory.SCHEMA_CHANGE: "subscribe_schema_change_notifications", + NotificationCategory.JOB_FAILURE: "subscribe_job_failure_notifications", + NotificationCategory.DBT_TEST_FAILURE: "subscribe_dbt_test_failure_notifications", } preference_field = category_field_mapping.get(category) diff --git a/ddpui/schemas/notifications_api_schemas.py b/ddpui/schemas/notifications_api_schemas.py index d2db13d63..bfc6ae1e3 100644 --- a/ddpui/schemas/notifications_api_schemas.py +++ b/ddpui/schemas/notifications_api_schemas.py @@ -75,8 +75,6 @@ class NotificationDataSchema(Schema): class CategorySubscriptionSchema(Schema): """Schema for updating category subscription preferences""" - subscribe_incident_notifications: Optional[bool] = None subscribe_schema_change_notifications: Optional[bool] = None subscribe_job_failure_notifications: Optional[bool] = None - subscribe_late_runs_notifications: Optional[bool] = None subscribe_dbt_test_failure_notifications: Optional[bool] = None diff --git a/ddpui/schemas/userpreferences_schema.py b/ddpui/schemas/userpreferences_schema.py index a95c5ff89..e07e14628 100644 --- a/ddpui/schemas/userpreferences_schema.py +++ b/ddpui/schemas/userpreferences_schema.py @@ -7,10 +7,8 @@ class CreateUserPreferencesSchema(Schema): 
enable_email_notifications: bool disclaimer_shown: Optional[bool] = None - subscribe_incident_notifications: Optional[bool] = True subscribe_schema_change_notifications: Optional[bool] = True subscribe_job_failure_notifications: Optional[bool] = True - subscribe_late_runs_notifications: Optional[bool] = True subscribe_dbt_test_failure_notifications: Optional[bool] = True @@ -19,8 +17,6 @@ class UpdateUserPreferencesSchema(Schema): enable_email_notifications: Optional[bool] = None disclaimer_shown: Optional[bool] = None - subscribe_incident_notifications: Optional[bool] = None subscribe_schema_change_notifications: Optional[bool] = None subscribe_job_failure_notifications: Optional[bool] = None - subscribe_late_runs_notifications: Optional[bool] = None subscribe_dbt_test_failure_notifications: Optional[bool] = None diff --git a/ddpui/utils/webhook_helpers.py b/ddpui/utils/webhook_helpers.py index fb184f2d7..7e0f13422 100644 --- a/ddpui/utils/webhook_helpers.py +++ b/ddpui/utils/webhook_helpers.py @@ -10,6 +10,7 @@ from ddpui.models.tasks import OrgTask from ddpui.models.org_user import OrgUser from ddpui.models.flow_runs import PrefectFlowRun +from ddpui.models.notifications import NotificationCategory from ddpui.utils.awsses import send_text_message from ddpui.models.tasks import ( TaskLock, @@ -147,7 +148,7 @@ def email_flowrun_logs_to_superadmins(org: Org, flow_run_id: str): email_superadmins(org, email_body) -def notify_org_managers(org: Org, message: str, email_subject: str): +def notify_org_managers(org: Org, message: str, email_subject: str, category: str = None): """send a notification to all users in the org""" error, recipients = get_recipients( SentToEnum.ALL_ORG_USERS, org.slug, None, manager_or_above=True @@ -157,7 +158,11 @@ def notify_org_managers(org: Org, message: str, email_subject: str): return error, response = create_notification( NotificationDataSchema( - author="Dalgo", message=message, email_subject=email_subject, recipients=recipients + author="Dalgo", + message=message, + email_subject=email_subject, + recipients=recipients, + category=category, ) ) if error: @@ -368,9 +373,7 @@ def send_failure_emails(org: Org, odf: OrgDataFlowv1 | None, flow_run: dict, sta email_body.append(f"\nPlease visit {os.getenv('FRONTEND_URL')} for more details") notify_org_managers( - org, - "\n".join(email_body), - email_subject, + org, "\n".join(email_body), email_subject, category=NotificationCategory.JOB_FAILURE.value ) From f980c680cb63f6ec024f102b0249e1dd30c2da95 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 19 Aug 2025 15:03:30 +0530 Subject: [PATCH 18/22] refactor: remove incident and late runs notification subscriptions; update notification manager calls to include category --- ddpui/api/user_preferences_api.py | 2 -- ddpui/tests/api_tests/test_webhook_api.py | 22 +++++++++++++++++++--- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/ddpui/api/user_preferences_api.py b/ddpui/api/user_preferences_api.py index 6ae6254c1..741dcc43d 100644 --- a/ddpui/api/user_preferences_api.py +++ b/ddpui/api/user_preferences_api.py @@ -28,10 +28,8 @@ def create_user_preferences(request, payload: CreateUserPreferencesSchema): orguser=orguser, enable_email_notifications=payload.enable_email_notifications, disclaimer_shown=payload.disclaimer_shown, - subscribe_incident_notifications=payload.subscribe_incident_notifications, subscribe_schema_change_notifications=payload.subscribe_schema_change_notifications, 
subscribe_job_failure_notifications=payload.subscribe_job_failure_notifications, - subscribe_late_runs_notifications=payload.subscribe_late_runs_notifications, subscribe_dbt_test_failure_notifications=payload.subscribe_dbt_test_failure_notifications, ) diff --git a/ddpui/tests/api_tests/test_webhook_api.py b/ddpui/tests/api_tests/test_webhook_api.py index 7d68a57e9..63aadf940 100644 --- a/ddpui/tests/api_tests/test_webhook_api.py +++ b/ddpui/tests/api_tests/test_webhook_api.py @@ -37,6 +37,7 @@ from ddpui.models.role_based_access import Role, RolePermission, Permission from ddpui.models.flow_runs import PrefectFlowRun from ddpui.models.tasks import Task, OrgTask, DataflowOrgTask, TaskLock +from ddpui.models.notifications import NotificationCategory from ddpui.settings import PRODUCTION from ddpui.tests.api_tests.test_user_org_api import seed_db from ddpui.ddpprefect import ( @@ -235,7 +236,12 @@ def test_post_notification_v1_orchestrate(): mock_email_flowrun_logs_to_superadmins_2.assert_not_called() mock_notify_platform_admins.assert_not_called() mock_email_orgusers_ses_whitelisted.assert_called_once() - mock_notify_org_managers.assert_called_once_with(org, "\n".join(email_body), email_subject) + mock_notify_org_managers.assert_called_once_with( + org, + "\n".join(email_body), + email_subject, + category=NotificationCategory.JOB_FAILURE.value, + ) def test_post_notification_v1_manual_with_connection_id(): @@ -284,7 +290,12 @@ def test_post_notification_v1_manual_with_connection_id(): mock_email_flowrun_logs_to_superadmins_2.assert_not_called() mock_notify_platform_admins.assert_not_called() mock_email_orgusers_ses_whitelisted.assert_called_once() - mock_notify_org_managers.assert_called_once_with(org, "\n".join(email_body), email_subject) + mock_notify_org_managers.assert_called_once_with( + org, + "\n".join(email_body), + email_subject, + category=NotificationCategory.JOB_FAILURE.value, + ) def test_post_notification_v1_manual_with_orgtask_id(seed_master_tasks): @@ -333,7 +344,12 @@ def test_post_notification_v1_manual_with_orgtask_id(seed_master_tasks): mock_email_flowrun_logs_to_superadmins_2.assert_not_called() mock_notify_platform_admins.assert_not_called() mock_email_orgusers_ses_whitelisted.assert_called_once() - mock_notify_org_managers.assert_called_once_with(org, "\n".join(email_body), email_subject) + mock_notify_org_managers.assert_called_once_with( + org, + "\n".join(email_body), + email_subject, + category=NotificationCategory.JOB_FAILURE.value, + ) def test_post_notification_v1_manual_with_orgtask_id_generate_edr(seed_master_tasks): From 528b4c65e6ebf32c863e6ff910f2fd89ce00aa71 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 19 Aug 2025 15:24:16 +0530 Subject: [PATCH 19/22] refactor: add category parameter in get_recipients function call --- ddpui/utils/webhook_helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddpui/utils/webhook_helpers.py b/ddpui/utils/webhook_helpers.py index 7e0f13422..b1b4929bb 100644 --- a/ddpui/utils/webhook_helpers.py +++ b/ddpui/utils/webhook_helpers.py @@ -151,7 +151,7 @@ def email_flowrun_logs_to_superadmins(org: Org, flow_run_id: str): def notify_org_managers(org: Org, message: str, email_subject: str, category: str = None): """send a notification to all users in the org""" error, recipients = get_recipients( - SentToEnum.ALL_ORG_USERS, org.slug, None, manager_or_above=True + SentToEnum.ALL_ORG_USERS, org.slug, None, manager_or_above=True, category=category ) if error: logger.error(f"Error getting recipients: 
{error}") From 8b30361085bdfc0cd030858328cdee3539ea1b71 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 19 Aug 2025 15:33:31 +0530 Subject: [PATCH 20/22] refactor: remove get_notifications_by_category function and its related code --- ddpui/api/notifications_api.py | 23 ----------------------- ddpui/core/notifications_service.py | 10 ---------- 2 files changed, 33 deletions(-) diff --git a/ddpui/api/notifications_api.py b/ddpui/api/notifications_api.py index ca2af7464..1f514da2d 100644 --- a/ddpui/api/notifications_api.py +++ b/ddpui/api/notifications_api.py @@ -273,29 +273,6 @@ def get_urgent_notifications(request): return result -@notification_router.get("/categories/{category}") -def get_notifications_by_category(request, category: str, page: int = 1, limit: int = 10): - """ - Get notifications for a specific category for the authenticated user. - - Args: - request: HTTP request object containing orguser authentication data - category: The notification category to filter by - page (int, optional): Page number for pagination. Defaults to 1 - limit (int, optional): Number of notifications per page. Defaults to 10 - - Returns: - dict: Paginated notifications for the specified category - - Raises: - HttpError: 400 if retrieval fails - """ - orguser: OrgUser = request.orguser - error, result = notifications_service.get_notifications_by_category( - orguser, category, page, limit - ) - - @notification_router.put("/mark_all_as_read") def mark_all_notifications_as_read(request): """Mark all notifications as read for the user""" diff --git a/ddpui/core/notifications_service.py b/ddpui/core/notifications_service.py index d252cf2ff..90c037055 100644 --- a/ddpui/core/notifications_service.py +++ b/ddpui/core/notifications_service.py @@ -473,13 +473,3 @@ def get_urgent_notifications( ) return None, {"success": True, "res": notifications_list} - - -# get notifications by category -def get_notifications_by_category( - orguser: OrgUser, category: str, page: int = 1, limit: int = 10 -) -> Tuple[Optional[None], Dict[str, Any]]: - """ - Get notifications for a specific category for the user. 
- """ - return fetch_user_notifications_v1(orguser, page, limit, category=category) From 3278a683f3f21cf290aaf4891c418e247ec15be7 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 19 Aug 2025 16:21:01 +0530 Subject: [PATCH 21/22] refactor: add handling for DBT test failure notifications in webhook processing --- ddpui/tests/api_tests/test_webhook_api.py | 37 +++++++++++++++++++++++ ddpui/utils/webhook_helpers.py | 16 ++++++++++ 2 files changed, 53 insertions(+) diff --git a/ddpui/tests/api_tests/test_webhook_api.py b/ddpui/tests/api_tests/test_webhook_api.py index 63aadf940..fa8d70e28 100644 --- a/ddpui/tests/api_tests/test_webhook_api.py +++ b/ddpui/tests/api_tests/test_webhook_api.py @@ -702,3 +702,40 @@ def test_get_flow_run_times_no_expected_start_time(): start_time, expected_start_time = get_flow_run_times(flow_run) assert str(start_time) == flow_run["start_time"] assert expected_start_time is not None + + +def test_post_notification_v1_dbt_test_failed(): + """tests the api endpoint /notifications/ for DBT test failures""" + org = Org.objects.create(name="temp", slug="temp") + deployment_id = "test-deployment-id" + flow_run = { + "parameters": { + "config": {"org_slug": org.slug}, + }, + "deployment_id": deployment_id, + "id": "test-run-id", + "name": "test-flow-run-name", + "start_time": str(datetime.now()), + "expected_start_time": str(datetime.now()), + "total_run_time": 12, + "status": "COMPLETED", + "state_name": "DBT_TEST_FAILED", + } + odf = OrgDataFlowv1.objects.create( + org=org, name=deployment_id, dataflow_type="orchestrate", deployment_id=deployment_id + ) + email_body = f"To the admins of {org.name},\n\nDBT tests have failed in pipeline {odf.name}\n\nPlease visit {os.getenv('FRONTEND_URL')} for more details" + email_subject = f"{org.name}: DBT test failure for {odf.name}" + + with patch("ddpui.ddpprefect.prefect_service.get_flow_run_poll") as mock_get_flow_run, patch( + "ddpui.utils.webhook_helpers.notify_org_managers" + ) as mock_notify_org_managers: + mock_get_flow_run.return_value = flow_run + user = User.objects.create(email="email", username="username") + new_role = Role.objects.filter(slug=SUPER_ADMIN_ROLE).first() + OrgUser.objects.create(org=org, user=user, new_role=new_role) + do_handle_prefect_webhook(flow_run["id"], flow_run["state_name"]) + assert PrefectFlowRun.objects.filter(flow_run_id="test-run-id").count() == 1 + mock_notify_org_managers.assert_called_once_with( + org, email_body, email_subject, category=NotificationCategory.DBT_TEST_FAILURE.value + ) diff --git a/ddpui/utils/webhook_helpers.py b/ddpui/utils/webhook_helpers.py index b1b4929bb..aa790609d 100644 --- a/ddpui/utils/webhook_helpers.py +++ b/ddpui/utils/webhook_helpers.py @@ -398,6 +398,7 @@ def do_handle_prefect_webhook(flow_run_id: str, state: str): FLOW_RUN_FAILED_STATE_NAME, FLOW_RUN_CRASHED_STATE_NAME, FLOW_RUN_COMPLETED_STATE_NAME, + "DBT_TEST_FAILED", ]: org = get_org_from_flow_run(flow_run) if org: @@ -413,6 +414,21 @@ def do_handle_prefect_webhook(flow_run_id: str, state: str): odf = OrgDataFlowv1.objects.filter(org=org, deployment_id=deployment_id).first() send_failure_emails(org, odf, flow_run, state) + elif state == "DBT_TEST_FAILED": + # Handle DBT test failures specifically + odf = OrgDataFlowv1.objects.filter(org=org, deployment_id=deployment_id).first() + pipeline_name = odf.name if odf else "Unknown pipeline" + + message = f"To the admins of {org.name},\n\nDBT tests have failed in pipeline {pipeline_name}\n\nPlease visit {os.getenv('FRONTEND_URL')} for more details" + 
email_subject = f"{org.name}: DBT test failure for {pipeline_name}" + + notify_org_managers( + org, + message, + email_subject, + category=NotificationCategory.DBT_TEST_FAILURE.value, + ) + elif state in [FLOW_RUN_COMPLETED_STATE_NAME]: email_orgusers_ses_whitelisted(org, "Your pipeline completed successfully") except Exception as err: From 2c69732a81ffda86cea1270de189a2537b3c1882 Mon Sep 17 00:00:00 2001 From: Mohit Agrawal Date: Tue, 19 Aug 2025 17:34:00 +0530 Subject: [PATCH 22/22] refactor: update category mapping to use value attributes for notification preferences --- ddpui/core/notifications_service.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddpui/core/notifications_service.py b/ddpui/core/notifications_service.py index 90c037055..72dcec056 100644 --- a/ddpui/core/notifications_service.py +++ b/ddpui/core/notifications_service.py @@ -55,9 +55,9 @@ def get_recipients( # Map category string to the correct field name category_field_mapping = { - NotificationCategory.SCHEMA_CHANGE: "subscribe_schema_change_notifications", - NotificationCategory.JOB_FAILURE: "subscribe_job_failure_notifications", - NotificationCategory.DBT_TEST_FAILURE: "subscribe_dbt_test_failure_notifications", + NotificationCategory.SCHEMA_CHANGE.value: "subscribe_schema_change_notifications", + NotificationCategory.JOB_FAILURE.value: "subscribe_job_failure_notifications", + NotificationCategory.DBT_TEST_FAILURE.value: "subscribe_dbt_test_failure_notifications", } preference_field = category_field_mapping.get(category)