Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
48d65f2
docs
Ishankoradia Jul 11, 2025
8de4ab1
forge notifications v2
Ishankoradia Jul 11, 2025
19177c1
migrations
Ishankoradia Aug 5, 2025
d90eeb9
Remove update category subscriptions functionality from notifications…
MohitAgrawal16 Aug 7, 2025
2c88e70
Add category argument to create_notification command and validate input
MohitAgrawal16 Aug 7, 2025
0630021
Filte recipients based on category subscription.
MohitAgrawal16 Aug 7, 2025
d1c6879
Remove dismissed by, allow null category and update the get urgent no…
MohitAgrawal16 Aug 12, 2025
1e747f4
Delete agent_forge.yaml
MohitAgrawal16 Aug 12, 2025
5af5726
Delete NOTIFICATIONS_V2_DOCUMENTATION.md
MohitAgrawal16 Aug 12, 2025
22a5bdd
Add MAINTENANCE category to notification models and update default ca…
MohitAgrawal16 Aug 12, 2025
2449c8b
Merge branch 'forge-notifications-v2' of https://github.com/DalgoT4D/…
MohitAgrawal16 Aug 12, 2025
d35b594
Update gitignore for dbt-venv folder
MohitAgrawal16 Aug 12, 2025
fbb98a1
cleanup
Ishankoradia Aug 18, 2025
d48016f
cleanup
Ishankoradia Aug 18, 2025
d6c87e3
merge migration files
Ishankoradia Aug 18, 2025
65ad222
dont set defaults
Ishankoradia Aug 18, 2025
566db78
Merge remote-tracking branch 'origin/main' into forge-notifications-v2
Ishankoradia Aug 18, 2025
0e822aa
merge migration from the main branch
Ishankoradia Aug 18, 2025
1fc39f4
refactor: remove incident and late runs notification preferences; upd…
MohitAgrawal16 Aug 19, 2025
f980c68
refactor: remove incident and late runs notification subscriptions; u…
MohitAgrawal16 Aug 19, 2025
528b4c6
refactor: add category parameter in get_recipients function call
MohitAgrawal16 Aug 19, 2025
8b30361
refactor: remove get_notifications_by_category function and its relat…
MohitAgrawal16 Aug 19, 2025
3278a68
refactor: add handling for DBT test failure notifications in webhook …
MohitAgrawal16 Aug 19, 2025
2c69732
refactor: update category mapping to use value attributes for notific…
MohitAgrawal16 Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ __pycache__/
# C extensions
*.so

dbt-venv/

# Distribution / packaging
.Python
build/
Expand Down
249 changes: 234 additions & 15 deletions ddpui/api/airbyte_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,21 @@
@airbyte_router.get("/source_definitions")
@has_permission(["can_view_sources"])
def get_airbyte_source_definitions(request):
"""Fetch airbyte source definitions in the user organization workspace"""
"""
Fetch all available Airbyte source definitions for the user's organization workspace.

For demo accounts, filters the available source definitions based on the
DEMO_AIRBYTE_SOURCE_TYPES environment variable.

Args:
request: HTTP request object containing orguser authentication data

Returns:
list: List of source definition dictionaries containing connector metadata

Raises:
HttpError: 400 if no Airbyte workspace exists for the organization
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")
Expand All @@ -71,8 +85,20 @@ def get_airbyte_source_definitions(request):
@has_permission(["can_view_sources"])
def get_airbyte_source_definition_specifications(request, sourcedef_id):
"""
Fetch definition specifications for a particular
source definition in the user organization workspace
Fetch configuration specifications for a specific Airbyte source definition.

Returns the JSON schema specification that defines what configuration
parameters are required and optional for setting up this source connector.

Args:
request: HTTP request object containing orguser authentication data
sourcedef_id (str): Unique identifier of the source definition

Returns:
dict: Connection specification schema for the source definition

Raises:
HttpError: 400 if no Airbyte workspace exists for the organization
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
Expand All @@ -88,7 +114,24 @@ def get_airbyte_source_definition_specifications(request, sourcedef_id):
@airbyte_router.post("/sources/")
@has_permission(["can_create_source"])
def post_airbyte_source(request, payload: AirbyteSourceCreate):
"""Create airbyte source in the user organization workspace"""
"""
Create a new Airbyte source connector in the organization's workspace.

For demo accounts, automatically replaces the provided configuration with
whitelisted demo configuration to prevent access to unauthorized data sources.

Args:
request: HTTP request object containing orguser authentication data
payload (AirbyteSourceCreate): Source configuration including name,
source definition ID, and connection config

Returns:
dict: Dictionary containing the created source's ID

Raises:
HttpError: 400 if no Airbyte workspace exists, or if demo account
configuration validation fails
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")
Expand Down Expand Up @@ -121,7 +164,25 @@ def post_airbyte_source(request, payload: AirbyteSourceCreate):
@airbyte_router.put("/sources/{source_id}")
@has_permission(["can_edit_source"])
def put_airbyte_source(request, source_id: str, payload: AirbyteSourceUpdate):
"""Update airbyte source in the user organization workspace"""
"""
Update an existing Airbyte source connector configuration.

For demo accounts, automatically replaces the provided configuration with
whitelisted demo configuration to maintain security restrictions.

Args:
request: HTTP request object containing orguser authentication data
source_id (str): Unique identifier of the source to update
payload (AirbyteSourceUpdate): Updated source configuration including
name, source definition ID, and config

Returns:
dict: Dictionary containing the updated source's ID

Raises:
HttpError: 400 if organization or Airbyte workspace doesn't exist,
or if demo account configuration validation fails
"""
orguser: OrgUser = request.orguser
if orguser.org is None:
raise HttpError(400, "create an organization first")
Expand Down Expand Up @@ -152,7 +213,23 @@ def put_airbyte_source(request, source_id: str, payload: AirbyteSourceUpdate):
@airbyte_router.post("/sources/check_connection/")
@has_permission(["can_create_source"])
def post_airbyte_check_source(request, payload: AirbyteSourceCreate):
"""Test the source connection in the user organization workspace"""
"""
Test connectivity to a source before creating it.

Validates that the provided source configuration can successfully establish
a connection. For demo accounts, uses whitelisted configuration instead
of the provided config.

Args:
request: HTTP request object containing orguser authentication data
payload (AirbyteSourceCreate): Source configuration to test

Returns:
dict: Connection test result with status ('succeeded'/'failed') and logs

Raises:
HttpError: 400 if no Airbyte workspace exists or demo config validation fails
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")
Expand Down Expand Up @@ -184,7 +261,24 @@ def post_airbyte_check_source(request, payload: AirbyteSourceCreate):
def post_airbyte_check_source_for_update(
request, source_id: str, payload: AirbyteSourceUpdateCheckConnection
):
"""Test the source connection in the user organization workspace"""
"""
Test connectivity for an existing source with updated configuration.

Validates that updated source configuration can successfully establish
a connection before applying the changes. For demo accounts, uses
whitelisted configuration.

Args:
request: HTTP request object containing orguser authentication data
source_id (str): Unique identifier of the existing source
payload (AirbyteSourceUpdateCheckConnection): Updated configuration to test

Returns:
dict: Connection test result with status ('succeeded'/'failed') and logs

Raises:
HttpError: 400 if no Airbyte workspace exists or demo config validation fails
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")
Expand Down Expand Up @@ -213,7 +307,21 @@ def post_airbyte_check_source_for_update(
@airbyte_router.get("/sources")
@has_permission(["can_view_sources"])
def get_airbyte_sources(request):
"""Fetch all airbyte sources in the user organization workspace"""
"""
Fetch all Airbyte sources configured in the organization's workspace.

Returns a list of all source connectors that have been created and
configured within the organization's Airbyte workspace.

Args:
request: HTTP request object containing orguser authentication data

Returns:
list: List of source dictionaries containing source metadata and configuration

Raises:
HttpError: 400 if no Airbyte workspace exists for the organization
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")
Expand All @@ -226,7 +334,22 @@ def get_airbyte_sources(request):
@airbyte_router.get("/sources/{source_id}")
@has_permission(["can_view_source"])
def get_airbyte_source(request, source_id):
"""Fetch a single airbyte source in the user organization workspace"""
"""
Fetch details of a specific Airbyte source by its ID.

Returns complete configuration and metadata for a single source connector
including its current status, configuration parameters, and connection details.

Args:
request: HTTP request object containing orguser authentication data
source_id (str): Unique identifier of the source to retrieve

Returns:
dict: Source details including configuration, status, and metadata

Raises:
HttpError: 400 if no Airbyte workspace exists for the organization
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")
Expand All @@ -239,7 +362,23 @@ def get_airbyte_source(request, source_id):
@airbyte_router.get("/sources/{source_id}/schema_catalog")
@has_permission(["can_view_source"])
def get_airbyte_source_schema_catalog(request, source_id):
"""Fetch schema catalog for a source in the user organization workspace"""
"""
Fetch the discovered schema catalog for a specific source.

Returns the complete data schema that Airbyte has discovered from the source,
including all available streams, fields, and data types. This is used to
configure which data to sync and how to structure it.

Args:
request: HTTP request object containing orguser authentication data
source_id (str): Unique identifier of the source

Returns:
dict: Schema catalog containing streams, fields, and data type information

Raises:
HttpError: 400 if no Airbyte workspace exists for the organization
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")
Expand Down Expand Up @@ -366,7 +505,23 @@ def get_airbyte_destination(request, destination_id):
@airbyte_router.get("/jobs/{job_id}")
@has_permission(["can_view_connection"])
def get_job_status(request, job_id):
"""get the job info from airbyte"""
"""
Retrieve the status and logs for a specific Airbyte job.

Returns comprehensive job information including current status and complete
log output from the latest attempt. Used for monitoring sync progress and
debugging connection issues.

Args:
request: HTTP request object containing orguser authentication data
job_id (str): Unique identifier of the job to query

Returns:
dict: Job status and log lines from the most recent attempt

Raises:
HttpError: 400 if user lacks permission to view connections
"""
result = airbyte_service.get_job_info(job_id)
logs = result["attempts"][-1]["logs"]["logLines"]
return {
Expand All @@ -378,7 +533,22 @@ def get_job_status(request, job_id):
@airbyte_router.get("/jobs/{job_id}/status")
@has_permission(["can_view_connection"])
def get_job_status_without_logs(request, job_id):
"""get the job info from airbyte"""
"""
Retrieve only the status of a specific Airbyte job without logs.

Lightweight endpoint to check job status without downloading potentially
large log files. Useful for polling job completion status.

Args:
request: HTTP request object containing orguser authentication data
job_id (str): Unique identifier of the job to query

Returns:
dict: Job status only, without log data

Raises:
HttpError: 400 if user lacks permission to view connections
"""
result = airbyte_service.get_job_info_without_logs(job_id)
print(result)
return {
Expand All @@ -393,7 +563,23 @@ def get_job_status_without_logs(request, job_id):
@airbyte_router.post("/v1/workspace/", response=AirbyteWorkspace)
@has_permission(["can_create_org"])
def post_airbyte_workspace_v1(request, payload: AirbyteWorkspaceCreate):
"""Create an airbyte workspace"""
"""
Create a new Airbyte workspace for the organization.

Initializes a dedicated Airbyte workspace for the organization and adds
custom connectors configured in system settings. Each organization can
have only one workspace.

Args:
request: HTTP request object containing orguser authentication data
payload (AirbyteWorkspaceCreate): Workspace configuration including name

Returns:
AirbyteWorkspace: Created workspace details including workspace ID

Raises:
HttpError: 400 if organization already has an existing workspace
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is not None:
raise HttpError(400, "org already has a workspace")
Expand All @@ -413,7 +599,25 @@ def post_airbyte_workspace_v1(request, payload: AirbyteWorkspaceCreate):
)
@has_permission(["can_create_connection"])
def post_airbyte_connection_v1(request, payload: AirbyteConnectionCreate):
"""Create an airbyte connection in the user organization workspace"""
"""
Create a new Airbyte connection between a source and destination.

Establishes a data pipeline connection that defines which streams to sync,
how frequently to sync them, and how to handle the data transformation.
At least one stream must be specified.

Args:
request: HTTP request object containing orguser authentication data
payload (AirbyteConnectionCreate): Connection configuration including
source, destination, streams, and sync settings

Returns:
AirbyteConnectionCreateResponse: Created connection details including connection ID

Raises:
HttpError: 400 if no Airbyte workspace exists, no streams specified,
or connection creation fails
"""
orguser: OrgUser = request.orguser
org = orguser.org
if org.airbyte_workspace_id is None:
Expand All @@ -436,7 +640,22 @@ def post_airbyte_connection_v1(request, payload: AirbyteConnectionCreate):
)
@has_permission(["can_view_connections"])
def get_airbyte_connections_v1(request):
"""Fetch all airbyte connections in the user organization workspace"""
"""
Fetch all Airbyte connections in the organization's workspace.

Returns a comprehensive list of all data pipeline connections configured
within the organization, including their current status, configuration,
and sync schedules.

Args:
request: HTTP request object containing orguser authentication data

Returns:
List[AirbyteGetConnectionsResponse]: List of all connections with their details

Raises:
HttpError: 400 if no Airbyte workspace exists or connection retrieval fails
"""
orguser: OrgUser = request.orguser
if orguser.org.airbyte_workspace_id is None:
raise HttpError(400, "create an airbyte workspace first")
Expand Down
25 changes: 24 additions & 1 deletion ddpui/api/dashboard_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,30 @@
@dashboard_router.get("/v1")
@has_permission(["can_view_dashboard"])
def get_dashboard_v1(request):
"""Fetch all flows/pipelines created in an organization"""
"""
Fetch dashboard data including all data flows/pipelines and their execution status.

Returns comprehensive dashboard information including:
- All orchestration data flows for the organization
- Recent flow run history (last 50 runs per flow)
- Task lock status for each flow
- Deployment and scheduling information

Args:
request: HTTP request object containing orguser authentication data

Returns:
list: List of flow dictionaries containing:
- name: Flow name
- deploymentId: Prefect deployment ID
- cron: Scheduling expression
- deploymentName: Human-readable deployment name
- runs: Recent execution history
- lock: Current lock status if any user has locked the flow

Raises:
HttpError: 403 if user lacks dashboard view permissions
"""
orguser = request.orguser

org_data_flows = OrgDataFlowv1.objects.filter(org=orguser.org, dataflow_type="orchestrate")
Expand Down
Loading