17 changes: 14 additions & 3 deletions ddpui/ddpairbyte/airbyte_service.py
@@ -267,7 +267,10 @@ def get_sources(workspace_id: str) -> List[Dict]:
if not isinstance(workspace_id, str):
raise HttpError(400, "Invalid workspace ID")

-    res = abreq("sources/list", {"workspaceId": workspace_id})
+    # TODO: move this to paginated apis
+    res = abreq(
+        "sources/list", {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "actorName_asc"}
+    )
Comment on lines +271 to +273
⚠️ Potential issue | 🟠 Major

Fixed page size can cause silent data truncation.

Setting pageSize: 100 without pagination logic means workspaces with more than 100 sources will return incomplete data. This could lead to sources being invisible in the UI or missing from operations.

The added TODO comment acknowledges that this should move to paginated APIs, as in get_destinations and get_webbackend_connections, but a TODO alone does not prevent truncation. Track it in your issue tracker, or implement pagination that fetches all pages.

If immediate pagination implementation isn't feasible, at minimum (see the sketch after this list):

  1. Add a TODO comment for tracking
  2. Document the 100-item limitation in the function docstring
  3. Log a warning when exactly 100 items are returned (which likely indicates truncation)
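
A minimal sketch of those mitigations, assuming this module's existing abreq, logger, and HttpError helpers (illustrative only, not the committed change):

```python
from typing import Dict, List

# Assumes this module's existing helpers: abreq(), logger, and HttpError.
def get_sources(workspace_id: str) -> List[Dict]:
    """Fetch sources in an airbyte workspace.

    NOTE: currently capped at the first 100 sources (see TODO below).
    """
    if not isinstance(workspace_id, str):
        raise HttpError(400, "Invalid workspace ID")

    # TODO: move this to paginated apis
    res = abreq(
        "sources/list",
        {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "actorName_asc"},
    )
    if "sources" not in res:
        logger.error("Sources not found for workspace: %s", workspace_id)
        raise HttpError(404, "sources not found for workspace")

    if len(res["sources"]) == 100:
        # A full first page likely means there are more sources than we fetched
        logger.warning(
            "get_sources got exactly 100 sources for workspace %s; results may be truncated",
            workspace_id,
        )
    return res
```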

For a complete fix, implement pagination:

```diff
 def get_sources(workspace_id: str) -> List[Dict]:
     """Fetch all sources in an airbyte workspace"""
     if not isinstance(workspace_id, str):
         raise HttpError(400, "Invalid workspace ID")

-    res = abreq(
-        "sources/list", {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "actorName_asc"}
-    )
-    if "sources" not in res:
-        logger.error("Sources not found for workspace: %s", workspace_id)
-        raise HttpError(404, "sources not found for workspace")
-    return res
+    all_sources = []
+    page_size = 100
+    offset = 0
+
+    while True:
+        res = abreq(
+            "sources/list",
+            {
+                "workspaceId": workspace_id,
+                "pageSize": page_size,
+                "sortKey": "actorName_asc",
+                "offset": offset
+            }
+        )
+        if "sources" not in res:
+            logger.error("Sources not found for workspace: %s", workspace_id)
+            raise HttpError(404, "sources not found for workspace")
+
+        all_sources.extend(res["sources"])
+
+        # Break if we got fewer items than page size (last page)
+        if len(res["sources"]) < page_size:
+            break
+
+        offset += page_size
+
+    return {"sources": all_sources}
```

Note: Verify if the Airbyte API uses offset or a different pagination mechanism (e.g., pageToken).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```diff
-    res = abreq(
-        "sources/list", {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "actorName_asc"}
-    )
+def get_sources(workspace_id: str) -> List[Dict]:
+    """Fetch all sources in an airbyte workspace"""
+    if not isinstance(workspace_id, str):
+        raise HttpError(400, "Invalid workspace ID")
+    all_sources = []
+    page_size = 100
+    offset = 0
+    while True:
+        res = abreq(
+            "sources/list",
+            {
+                "workspaceId": workspace_id,
+                "pageSize": page_size,
+                "sortKey": "actorName_asc",
+                "offset": offset
+            }
+        )
+        if "sources" not in res:
+            logger.error("Sources not found for workspace: %s", workspace_id)
+            raise HttpError(404, "sources not found for workspace")
+        all_sources.extend(res["sources"])
+        # Break if we got fewer items than page size (last page)
+        if len(res["sources"]) < page_size:
+            break
+        offset += page_size
+    return {"sources": all_sources}
```
🤖 Prompt for AI Agents
In ddpui/ddpairbyte/airbyte_service.py around lines 270 to 272, the call to
abreq uses a fixed pageSize=100 which can silently truncate results for
workspaces with >100 sources; either implement proper pagination to iterate all
pages according to Airbyte's pagination scheme (confirm whether it uses
offset/page/limit or pageToken and accumulate results), or at minimum add a TODO
noting pagination is required, update the function docstring to document the
100-item limitation, and add a warning log when the API returns exactly 100
items (indicating likely truncation) so truncation is detectable in logs.

if "sources" not in res:
logger.error("Sources not found for workspace: %s", workspace_id)
raise HttpError(404, "sources not found for workspace")
@@ -528,7 +531,11 @@ def get_destinations(workspace_id: str) -> dict:
if not isinstance(workspace_id, str):
raise HttpError(400, "workspace_id must be a string")

-    res = abreq("destinations/list", {"workspaceId": workspace_id})
+    # TODO: move this to paginated apis
+    res = abreq(
+        "destinations/list",
+        {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "actorName_asc"},
+    )
Comment on lines +534 to +538

⚠️ Potential issue | 🟠 Major

Same pagination limitation applies here.

Like get_sources, this returns incomplete data for workspaces with more than 100 destinations. The TODO comment acknowledges the fix is deferred, but make sure it's tracked in your issue tracker so it isn't forgotten.

Consider applying the same pagination solution as suggested for get_sources, adapting it for destinations. Also verify if the Airbyte API returns pagination metadata (like hasNext or totalCount) that could help implement robust pagination.
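
For example, a shared helper along these lines could back both functions. This is a sketch under stated assumptions: `_list_all` is a hypothetical name not in the codebase, and it presumes the Airbyte list endpoints accept an `offset` parameter, which should be verified first.

```python
# Hypothetical helper (not in the codebase); assumes offset-based pagination,
# which must be verified against the Airbyte API before use.
def _list_all(endpoint: str, key: str, params: dict, page_size: int = 100) -> list:
    """Fetch every page of a list endpoint and return the combined items."""
    items: list = []
    offset = 0
    while True:
        res = abreq(endpoint, {**params, "pageSize": page_size, "offset": offset})
        if key not in res:
            raise HttpError(404, f"{key} not found for workspace")
        items.extend(res[key])
        if len(res[key]) < page_size:  # short page means no more pages
            return items
        offset += page_size

# Usage sketch for destinations:
# destinations = _list_all(
#     "destinations/list", "destinations",
#     {"workspaceId": workspace_id, "sortKey": "actorName_asc"},
# )
```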

if "destinations" not in res:
logger.error("Destinations not found for workspace: %s", workspace_id)
raise HttpError(404, "destinations not found for this workspace")
@@ -677,7 +684,11 @@ def get_webbackend_connections(workspace_id: str) -> dict:
if not isinstance(workspace_id, str):
raise HttpError(400, "workspace_id must be a string")

-    res = abreq("web_backend/connections/list", {"workspaceId": workspace_id})
+    # TODO: move this to paginated apis
+    res = abreq(
+        "web_backend/connections/list",
+        {"workspaceId": workspace_id, "pageSize": 100, "sortKey": "connectionName_asc"},
+    )
Comment on lines +687 to +691

⚠️ Potential issue | 🟠 Major

Pagination limitation in web backend connections.

This function has the same critical issue: workspaces with more than 100 connections will have incomplete data. The sort key connectionName_asc is appropriate for connections (vs actorName_asc for sources/destinations).

Note that unlike the other functions, this returns res["connections"] directly rather than the full response object. When implementing proper pagination, ensure the return type remains consistent.
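
If the API turns out to be token-based rather than offset-based, a sketch like the following would keep the current list return type. The `pageToken` and `nextPageToken` names are assumptions to be checked against the Airbyte docs, not confirmed API fields.

```python
# Sketch only: "pageToken" / "nextPageToken" are assumed names; confirm them
# against the Airbyte web_backend API before relying on this.
def get_webbackend_connections(workspace_id: str) -> list:
    if not isinstance(workspace_id, str):
        raise HttpError(400, "workspace_id must be a string")

    connections: list = []
    page_token = None
    while True:
        payload = {
            "workspaceId": workspace_id,
            "pageSize": 100,
            "sortKey": "connectionName_asc",
        }
        if page_token:
            payload["pageToken"] = page_token  # assumed parameter name
        res = abreq("web_backend/connections/list", payload)
        if "connections" not in res:
            logger.error("connections not found for workspace: %s", workspace_id)
            raise HttpError(404, "connections not found for workspace")
        connections.extend(res["connections"])
        page_token = res.get("nextPageToken")  # assumed field name
        if not page_token:
            return connections  # preserves the existing list return type
```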

🤖 Prompt for AI Agents
In ddpui/ddpairbyte/airbyte_service.py around lines 686-690, the code only
fetches the first 100 connections which drops data for workspaces with >100
connections; update this to call the "web_backend/connections/list" endpoint in
a paginated loop using pageSize (keep 100) and the pageToken/next page mechanism
returned by the API while preserving workspaceId and
sortKey="connectionName_asc", accumulate all res["connections"] across pages and
return the combined list (keep the current return type of res["connections"]);
ensure the loop stops when no next page token is returned and propagate any API
errors as before.

if "connections" not in res:
error_message = f"connections not found for workspace: {workspace_id}"
logger.error(error_message)