
Conversation


@SarthakB11 SarthakB11 commented Apr 22, 2025

Overview

Implementation of a comprehensive Airbyte integration test suite (Issue #1014) that enables end-to-end testing of Airbyte operations within Dalgo, providing confidence during upgrades and maintenance.

Changes Made

Files Added/Modified:

  1. ddpui/tests/integration_tests/test_airbyte_full_integration.py (new)

    • Full integration test suite with pytest
    • Implements workspace creation, sources, destinations, connections, syncs, and validation
    • Includes error handling and graceful skipping when Airbyte is unavailable
  2. .env.airbyte_integration_test (new)

    • Configuration template for integration tests
    • Settings for test enablement, source/destination selection, timeouts
  3. README.md (modified)

    • Documentation for running Airbyte integration tests
    • Prerequisites, configuration, and usage examples

Key Features

  • Configurability: Environment variable configuration, runtime source/destination selection
  • Robustness: Handles unavailable servers, cleanup after failures, comprehensive logging
  • Coverage: Full pipeline from workspace creation to sync validation
  • Extensibility: Easy addition of new source/destination types
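The configurability above rests on parsing environment variables; a minimal sketch of robust connector-list parsing (the helper name is hypothetical, but the variable names match the template this PR adds):

```python
import os

def parse_connector_list(var_name: str, default: str) -> list[str]:
    """Parse a comma-separated connector list from the environment.

    Entries are stripped and lowercased so human-edited values like
    "Postgres, s3" still match connector definitions reliably.
    """
    raw = os.getenv(var_name, default)
    return [item.strip().lower() for item in raw.split(",") if item.strip()]

os.environ["AIRBYTE_TEST_SOURCES"] = "Postgres, s3"
sources = parse_connector_list("AIRBYTE_TEST_SOURCES", "file")
fallback = parse_connector_list("SOME_UNSET_VAR", "file")
```

Normalizing at parse time keeps the rest of the suite free of case and whitespace handling.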

Testing Instructions

  1. Local testing:

    python -m pytest ddpui/tests/integration_tests/test_airbyte_full_integration.py -v
    
  2. Production testing:

    • Configure Airbyte server in .env
    • Set AIRBYTE_TEST_ENABLED=true
    • Run test suite
  3. Individual tests:

    python -m pytest ddpui/tests/integration_tests/test_airbyte_full_integration.py::TestAirbyteFullIntegration::test_01_create_destinations -v
    

The test suite is ready for CI/CD integration or manual runs during Airbyte upgrades.

Closes #1014

Summary by CodeRabbit

  • New Features

    • Introduced a comprehensive integration test suite for Airbyte, covering end-to-end workflows such as creating workspaces, sources, destinations, connections, running syncs, and validating sync history.
    • Added a dedicated environment configuration file for Airbyte integration testing, supporting multiple source and destination types.
  • Documentation

    • Added detailed instructions to the README on how to configure and run Airbyte integration tests, including prerequisites and environment variable setup.


coderabbitai bot commented Apr 22, 2025

Walkthrough

A new comprehensive Airbyte integration test suite has been introduced, including a dedicated environment configuration file and detailed documentation. The .env.airbyte_integration_test file defines environment variables for configuring test enablement, source and destination types, and connection details for various connectors. The README.md has been updated with instructions for running these integration tests. A new pytest-based test suite implements the full end-to-end workflow: creating an Airbyte workspace, configuring sources and destinations, establishing connections, running syncs, validating sync history, and cleaning up all created resources.

Changes

  • .env.airbyte_integration_test: Added new environment configuration file specifying variables for Airbyte integration testing, including test enablement, source/destination types, timeouts, and connection details for PostgreSQL, Google Sheets, KoboToolbox, and S3.
  • README.md: Added a section detailing how to run Airbyte integration tests, including prerequisites, environment variable setup, and pytest commands.
  • ddpui/tests/integration_tests/test_airbyte_full_integration.py: Added a new pytest test suite implementing full end-to-end Airbyte integration testing: workspace creation, source/destination setup, connection creation, sync execution, sync history/log validation, and teardown. Includes configuration loading, server availability checks, and structured test steps for each workflow phase.

Sequence Diagram(s)

sequenceDiagram
    participant Tester
    participant AirbyteAPI

    Tester->>AirbyteAPI: Create Workspace
    AirbyteAPI-->>Tester: Workspace ID

    loop For each Destination
        Tester->>AirbyteAPI: Create Destination
        AirbyteAPI-->>Tester: Destination ID
    end

    loop For each Source
        Tester->>AirbyteAPI: Create Source
        AirbyteAPI-->>Tester: Source ID
    end

    loop For each Source-Destination Pair
        Tester->>AirbyteAPI: Get Source Schema Catalog
        AirbyteAPI-->>Tester: Catalog
        Tester->>AirbyteAPI: Create Connection (Full Refresh Overwrite)
        AirbyteAPI-->>Tester: Connection ID
    end

    loop For each Connection
        Tester->>AirbyteAPI: Run Sync
        AirbyteAPI-->>Tester: Sync Job Status (polling until complete)
        Tester->>AirbyteAPI: Fetch Sync History and Logs
        AirbyteAPI-->>Tester: Sync History/Logs
    end

    Tester->>AirbyteAPI: Delete Connections, Sources, Destinations, Workspace
    AirbyteAPI-->>Tester: Confirmation
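The flow in the sequence diagram can be sketched as a simple driver loop. The `FakeAirbyteApi` below is an invented in-memory stand-in so the sketch runs without a server; the real tests call the airbyte_service helpers listed later in this review.

```python
class FakeAirbyteApi:
    """In-memory stand-in for the Airbyte API, used only to make this
    sketch runnable; the real suite talks to a live Airbyte server."""

    def __init__(self):
        self.counter = 0
        self.synced = []

    def _new_id(self, kind: str) -> str:
        self.counter += 1
        return f"{kind}-{self.counter}"

    def create_workspace(self, name): return self._new_id("ws")
    def create_destination(self, ws, dtype): return self._new_id("dest")
    def create_source(self, ws, stype): return self._new_id("src")
    def get_source_schema_catalog(self, src): return {"streams": []}
    def create_connection(self, src, dest, catalog): return self._new_id("conn")
    def run_sync(self, conn): self.synced.append(conn)

def run_full_integration(api, source_types, destination_types):
    """Workspace -> destinations -> sources -> one connection per
    source/destination pair -> syncs. Returns every created id so a
    teardown step can delete them in reverse order."""
    ws = api.create_workspace("integration-test-ws")
    dests = [api.create_destination(ws, d) for d in destination_types]
    srcs = [api.create_source(ws, s) for s in source_types]
    conns = [
        api.create_connection(s, d, api.get_source_schema_catalog(s))
        for s in srcs for d in dests
    ]
    for conn in conns:
        api.run_sync(conn)
    return ws, srcs, dests, conns

api = FakeAirbyteApi()
ws, srcs, dests, conns = run_full_integration(api, ["file"], ["postgres", "s3"])
```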

Assessment against linked issues (#1014)

  • End-to-end Airbyte integration test suite: workspace, destinations, sources, connections, syncs, validation, teardown
  • Configurable sources and destinations via environment variables
  • Use of pytest for test implementation
  • Validation of sync history and logs for connections

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (7)
.env.airbyte_integration_test (2)

4-8: Consider leaving the test-suite disabled by default in committed templates

Committing the template with AIRBYTE_TEST_ENABLED=false is good; however, once a developer flips the flag to true and forgets to revert it, the CI pipeline may accidentally create real resources on every run.
A safer pattern is to omit the variable (or comment it out) and have the test‑suite default to false when the variable is missing, forcing an explicit opt‑in per run.
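A minimal sketch of that opt-in default (the helper name and the accepted truthy tokens are illustrative):

```python
import os

def airbyte_tests_enabled() -> bool:
    """Default to False when AIRBYTE_TEST_ENABLED is absent, so running
    the integration suite always requires an explicit per-run opt-in."""
    return os.getenv("AIRBYTE_TEST_ENABLED", "false").strip().lower() in ("1", "true", "yes")

os.environ.pop("AIRBYTE_TEST_ENABLED", None)
enabled_by_default = airbyte_tests_enabled()

os.environ["AIRBYTE_TEST_ENABLED"] = "true"
enabled_after_optin = airbyte_tests_enabled()
```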


19-25: Mask credentials & add explicit ‘_EXAMPLE’ suffixes

Even though the values are local‑only, it is best to ship templates with obviously fake placeholders (e.g. PG_PASSWORD=changeme) or suffix each key with _EXAMPLE.
That prevents casual copy‑paste of real secrets into VC and makes secret‑scanning tools happier.

README.md (2)

248-252: Clarify comma‑separated values must be lowercase

The test‑suite performs a str.lower() comparison when matching connector names.
Mentioning that AIRBYTE_TEST_SOURCES / DESTINATIONS should be lowercase avoids mysterious “Skipping unknown …” log lines.


269-273: Add warning about production data safety

The note is helpful, but emphasising that the tests use destructive operations (overwrite sync mode, workspace deletion) will prevent accidental runs against customer instances.
Suggest adding “DO NOT run against production workspaces” in bold.

ddpui/tests/integration_tests/test_airbyte_full_integration.py (3)

169-188: Workspace leak on early failure

If create_workspace succeeds but setup_class later raises (e.g. during destination creation) the workspace id is stored only on the class, yet teardown_class will still run. Good.
However, if the process dies (Ctrl‑C) before teardown, the workspace remains.
Add a try/finally around resource creation or tag the workspace name with CI_BUILD_ID so that a nightly cleanup job can purge leftovers.
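One way to implement the suggested tagging (a sketch; whether CI_BUILD_ID is set depends on the CI system):

```python
import os
import time

def make_test_workspace_name(prefix: str = "dalgo-integration") -> str:
    """Embed the CI build id (or 'local') and a timestamp in the workspace
    name, so a nightly cleanup job can recognise and purge leftovers when
    teardown never runs (e.g. after a Ctrl-C mid-suite)."""
    build_id = os.getenv("CI_BUILD_ID", "local")
    return f"{prefix}-{build_id}-{int(time.time())}"

name = make_test_workspace_name()
```

A cleanup job can then safely delete any workspace matching the prefix whose timestamp is older than, say, a day.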


372-390: Polling loop can exceed global timeout

You sleep AIRBYTE_SYNC_WAIT_INTERVAL (default 10 s) for up to AIRBYTE_MAX_WAIT_CYCLES (default 30) → 5 minutes max, regardless of AIRBYTE_TEST_TIMEOUT.
Either honour AIRBYTE_TEST_TIMEOUT or document the precedence.
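A sketch of a loop that gives the global timeout precedence (variable names mirror the env settings; the status getter is injected so the sketch needs no server):

```python
import time

def wait_for_sync(get_status, interval: float, max_cycles: int, timeout: float) -> str:
    """Poll until a terminal status, the cycle budget, or the global
    timeout is reached; the timeout always bounds total waiting."""
    deadline = time.monotonic() + timeout
    status = get_status()
    cycles = 0
    while status not in ("succeeded", "failed", "cancelled"):
        if cycles >= max_cycles or time.monotonic() >= deadline:
            break
        time.sleep(interval)
        status = get_status()
        cycles += 1
    return status

statuses = iter(["running", "running", "succeeded"])
result = wait_for_sync(lambda: next(statuses), 0.0, 30, 5.0)
stuck = wait_for_sync(lambda: "running", 0.0, 3, 5.0)  # gives up after 3 cycles
```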


429-430: Do not execute tests on import

The __main__ section runs pytest recursively, which is surprising behaviour if someone imports the module in another script.
Consider removing it; developers can still run pytest -k airbyte_full_integration.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 11dfa1c and 8961d26.

📒 Files selected for processing (3)
  • .env.airbyte_integration_test (1 hunks)
  • README.md (1 hunks)
  • ddpui/tests/integration_tests/test_airbyte_full_integration.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
ddpui/tests/integration_tests/test_airbyte_full_integration.py (2)
ddpui/ddpairbyte/airbyte_service.py (12)
  • create_workspace (156-168)
  • delete_workspace (171-174)
  • create_destination (588-611)
  • create_source (331-366)
  • get_source_definitions (199-221)
  • get_destination_definitions (501-510)
  • sync_connection (920-929)
  • get_jobs_for_connection (950-976)
  • get_job_info (932-938)
  • get_logs_for_job (1009-1028)
  • delete_destination (580-585)
  • get_source_schema_catalog (459-498)
ddpui/ddpairbyte/schema.py (1)
  • AirbyteConnectionCreate (59-66)
🪛 Ruff (0.8.2)
ddpui/tests/integration_tests/test_airbyte_full_integration.py

20-20: typing.List imported but unused

Remove unused import

(F401)


20-20: typing.Dict imported but unused

Remove unused import

(F401)


20-20: typing.Optional imported but unused

Remove unused import

(F401)

🔇 Additional comments (2)
ddpui/tests/integration_tests/test_airbyte_full_integration.py (2)

141-150: Hard‑coded /health endpoint may be wrong for Airbyte > 0.50

Recent Airbyte versions expose /health/ (note the trailing slash) and return JSON with "available":true.
Consider checking both variants or parsing the JSON for backward compatibility.
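A version-tolerant probe might look like this (a sketch with an injected `fetch` callable instead of real HTTP, since endpoint behaviour varies by Airbyte version):

```python
def check_airbyte_available(fetch) -> bool:
    """Probe /health and /health/ (newer Airbyte versions use the trailing
    slash and return {"available": true}). `fetch` is an injected callable
    returning (status_code, json_body), so no server is needed here."""
    for path in ("/health", "/health/"):
        try:
            code, body = fetch(path)
        except Exception:
            continue  # treat connection errors as "this variant not served"
        if code == 200 and body.get("available", True):
            return True
    return False

def fake_new_server(path):
    # Simulates a newer API where only the trailing-slash route exists.
    if path == "/health/":
        return 200, {"available": True}
    raise ConnectionError(f"no route {path}")

def fake_down_server(path):
    raise ConnectionError("server down")

reachable = check_airbyte_available(fake_new_server)
unreachable = check_airbyte_available(fake_down_server)
```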


282-321: get_source_definitions may return dict, code expects list

get_source_definitions returns a dict with key "sourceDefinitions".
You build source_def_map correctly from it, but later when skipping you rely on `source_def_key in key`, which is lenient.
The bigger issue appears in test 05 (see below).

Comment on lines 44 to 48

    AIRBYTE_TEST_S3_BUCKET=test-bucket
    AIRBYTE_TEST_S3_PATH=test-path
    AIRBYTE_TEST_S3_REGION=us-east-1
    AIRBYTE_TEST_S3_ACCESS_KEY=
    AIRBYTE_TEST_S3_SECRET_KEY= 
    (no newline at end of file)

⚠️ Potential issue

Trailing space may break some dotenv loaders

Line 48 ends with a space after the = which is preserved by some implementations and leads to authentication failures.
Remove trailing whitespace from all keys that expect secret values.

-AIRBYTE_TEST_S3_SECRET_KEY= 
+AIRBYTE_TEST_S3_SECRET_KEY=
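A loader-side guard for the same issue (a sketch; real dotenv libraries differ in how they treat surrounding whitespace):

```python
def parse_dotenv_line(line: str) -> tuple[str, str]:
    """Split KEY=VALUE and strip surrounding whitespace from both sides,
    so a stray trailing space after '=' cannot end up inside a secret."""
    key, _, value = line.rstrip("\n").partition("=")
    return key.strip(), value.strip()

stripped = parse_dotenv_line("AIRBYTE_TEST_S3_SECRET_KEY= \n")
normal = parse_dotenv_line("PG_PASSWORD=changeme")
```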

@SarthakB11 (Author)

incorporated coderabbit feedback

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (2)
ddpui/tests/integration_tests/test_airbyte_full_integration.py (2)

385-395: Correct Airbyte catalog payload – good job!

The re‑written list‐comprehension now meets the API contract by embedding stream and config keys. This addresses the earlier review feedback.


480-481: Correct argument order for get_logs_for_job – fixed

Using 0 as the attempt index matches the helper’s signature and resolves the earlier bug.

🧹 Nitpick comments (4)
ddpui/tests/integration_tests/test_airbyte_full_integration.py (4)

54-59: Strip whitespace when parsing env‐driven connector lists

split(",") keeps surrounding whitespace, so values like "postgres, s3" will contain " s3" and silently skip the matching definition a few lines later.
A small strip() makes the test suite less brittle to human editing errors.

-AIRBYTE_TEST_DESTINATIONS = os.getenv("AIRBYTE_TEST_DESTINATIONS", "postgres").split(",")
-AIRBYTE_TEST_SOURCES = os.getenv("AIRBYTE_TEST_SOURCES", "file").split(",")
+AIRBYTE_TEST_DESTINATIONS = [d.strip() for d in os.getenv("AIRBYTE_TEST_DESTINATIONS", "postgres").split(",") if d.strip()]
+AIRBYTE_TEST_SOURCES = [s.strip() for s in os.getenv("AIRBYTE_TEST_SOURCES", "file").split(",") if s.strip()]

63-68: MAX_SYNC_WAIT_TIME becomes stale after you shrink AIRBYTE_MAX_WAIT_CYCLES

You recalculate AIRBYTE_MAX_WAIT_CYCLES to honour the global timeout but leave the previously computed MAX_SYNC_WAIT_TIME unchanged.
Re‑compute it (or avoid the extra variable altogether) so the later while‑loop reflects the corrected limit.

-    AIRBYTE_MAX_WAIT_CYCLES = AIRBYTE_TEST_TIMEOUT // AIRBYTE_SYNC_WAIT_INTERVAL
-    logger.warning(
-        f"Adjusted max wait cycles to {AIRBYTE_MAX_WAIT_CYCLES} to respect timeout of {AIRBYTE_TEST_TIMEOUT}s"
-    )
+    AIRBYTE_MAX_WAIT_CYCLES = AIRBYTE_TEST_TIMEOUT // AIRBYTE_SYNC_WAIT_INTERVAL
+    MAX_SYNC_WAIT_TIME = AIRBYTE_SYNC_WAIT_INTERVAL * AIRBYTE_MAX_WAIT_CYCLES
+    logger.warning(
+        "Adjusted max wait cycles to %s to respect timeout of %ss",
+        AIRBYTE_MAX_WAIT_CYCLES,
+        AIRBYTE_TEST_TIMEOUT,
+    )

208-231: try/finally comment is misleading – and cleanup can be simpler

The outer try advertises a finally that does not exist, which is confusing to future maintainers.
You can both clarify intent and shrink the code by using contextlib.suppress for the best‑effort delete_workspace in the error path.

-        try:
-            # Use try/finally to ensure cleanup even if setup fails partially
-            try:
-                workspace_result = create_workspace(workspace_name)
-                cls.workspace_id = workspace_result["workspaceId"]
-                logger.info(f"Created workspace with ID: {cls.workspace_id}")
-            except HttpError as e:
-                logger.error(f"Failed to create workspace: {str(e)}")
-                pytest.skip(f"Could not create workspace: {str(e)}")
-                return
-            except Exception as e:
-                logger.error(f"Failed to create workspace: {str(e)}")
-                pytest.skip(f"Unexpected error: {str(e)}")
-                return
-        except Exception as e:
-            # This catch-all ensures we don't leak workspaces even on unexpected errors
-            logger.error(f"Error during setup: {str(e)}")
-            # Try to clean up if we did create a workspace
-            if cls.workspace_id:
-                try:
-                    delete_workspace(cls.workspace_id)
-                except Exception:
-                    pass
-            raise
+        try:
+            workspace_result = create_workspace(workspace_name)
+            cls.workspace_id = workspace_result["workspaceId"]
+            logger.info("Created workspace with ID: %s", cls.workspace_id)
+        except HttpError as e:
+            logger.error("Failed to create workspace: %s", e)
+            pytest.skip(f"Could not create workspace: {e}")
+        except Exception as e:
+            logger.error("Unexpected error creating workspace: %s", e)
+            with contextlib.suppress(Exception):
+                if cls.workspace_id:
+                    delete_workspace(cls.workspace_id)
+            raise

Remember to import contextlib at the top.

🧰 Tools
🪛 Ruff (0.8.2)

227-230: Use contextlib.suppress(Exception) instead of try-except-pass

Replace with contextlib.suppress(Exception)

(SIM105)


282-287: dest_type comparison could be case/space robust

Minor, but normalising both sides (lower().strip()) avoids surprises from environment variables such as " Postgres ".

No diff provided – applies equally to the source loop below.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8961d26 and af6160f.

📒 Files selected for processing (3)
  • .env.airbyte_integration_test (1 hunks)
  • README.md (1 hunks)
  • ddpui/tests/integration_tests/test_airbyte_full_integration.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • README.md
  • .env.airbyte_integration_test
🧰 Additional context used
🧬 Code Graph Analysis (1)
ddpui/tests/integration_tests/test_airbyte_full_integration.py (3)
ddpui/ddpairbyte/airbyte_service.py (12)
  • create_workspace (156-168)
  • delete_workspace (171-174)
  • create_destination (588-611)
  • create_source (331-366)
  • get_source_definitions (199-221)
  • get_destination_definitions (501-510)
  • sync_connection (920-929)
  • get_jobs_for_connection (950-976)
  • get_job_info (932-938)
  • get_logs_for_job (1009-1028)
  • delete_destination (580-585)
  • get_source_schema_catalog (459-498)
ddpui/ddpairbyte/schema.py (1)
  • AirbyteConnectionCreate (59-66)
ddpui/utils/custom_logger.py (3)
  • warning (49-53)
  • info (25-29)
  • error (31-35)
🪛 Ruff (0.8.2)
ddpui/tests/integration_tests/test_airbyte_full_integration.py

227-230: Use contextlib.suppress(Exception) instead of try-except-pass

Replace with contextlib.suppress(Exception)

(SIM105)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
ddpui/tests/integration_tests/test_airbyte_full_integration.py (2)

380-380: Move import statement to top of file

This import statement should be moved to the top of the file with the other imports rather than inside the function body.

-                    from ddpui.ddpairbyte.airbyte_service import get_source_schema_catalog

Add this to the imports at the top of the file (around line 44):

+    get_source_schema_catalog,

416-454: Consider implementing exponential backoff for sync status polling

The current implementation uses fixed interval polling which works but may not be optimal for varying job durations. Consider implementing exponential backoff to reduce API calls while improving responsiveness for short-running jobs.

-                while (status not in ["succeeded", "failed", "cancelled"] and 
-                       wait_cycles < AIRBYTE_MAX_WAIT_CYCLES and 
-                       (time.time() - start_time) < AIRBYTE_TEST_TIMEOUT):
-                    time.sleep(AIRBYTE_SYNC_WAIT_INTERVAL)
+                wait_interval = AIRBYTE_SYNC_WAIT_INTERVAL
+                while (status not in ["succeeded", "failed", "cancelled"] and 
+                       wait_cycles < AIRBYTE_MAX_WAIT_CYCLES and 
+                       (time.time() - start_time) < AIRBYTE_TEST_TIMEOUT):
+                    time.sleep(wait_interval)
+                    # Apply exponential backoff with a cap
+                    wait_interval = min(wait_interval * 1.5, 60)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between af6160f and c1377b4.

📒 Files selected for processing (1)
  • ddpui/tests/integration_tests/test_airbyte_full_integration.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
ddpui/tests/integration_tests/test_airbyte_full_integration.py (3)
ddpui/ddpairbyte/airbyte_service.py (12)
  • create_workspace (156-168)
  • delete_workspace (171-174)
  • create_destination (588-611)
  • create_source (331-366)
  • get_source_definitions (199-221)
  • get_destination_definitions (501-510)
  • sync_connection (920-929)
  • get_jobs_for_connection (950-976)
  • get_job_info (932-938)
  • get_logs_for_job (1009-1028)
  • delete_destination (580-585)
  • get_source_schema_catalog (459-498)
ddpui/ddpairbyte/schema.py (1)
  • AirbyteConnectionCreate (59-66)
ddpui/utils/custom_logger.py (3)
  • warning (49-53)
  • info (25-29)
  • error (31-35)
🔇 Additional comments (6)
ddpui/tests/integration_tests/test_airbyte_full_integration.py (6)

154-184: Well-implemented server availability check with version compatibility

The check_airbyte_available() function intelligently handles both older and newer Airbyte API versions by trying endpoints with and without trailing slashes. This future-proofs the tests and makes them more robust.


199-226: Good error handling and resource management in setup

The setup_class implementation properly:

  1. Checks Airbyte availability before attempting setup
  2. Creates a unique workspace name with timestamp
  3. Implements proper error handling with graceful skipping
  4. Attempts cleanup when unexpected errors occur

This ensures tests don't fail with cryptic errors when prerequisites aren't met.


227-264: Comprehensive teardown with robust error handling

The teardown process properly cleans up all resources in the correct order (connections, sources, destinations, workspace) and uses contextlib.suppress with detailed warning logs to continue cleanup even if individual deletions fail.


368-415: Well-structured connection creation with proper API payload formatting

The connection creation uses the proper Airbyte API structure for stream configuration, correctly setting up sync modes. This implementation resolves a previous issue where stream configurations were improperly structured.


54-73: Good environment variable handling with validation

The configuration section properly:

  1. Parses boolean flags with lowercasing
  2. Strips whitespace from lists for robustness
  3. Validates and adjusts wait cycles against timeout
  4. Provides clear logging when adjustments are made

This ensures configuration errors are caught early with helpful feedback.


455-484: Fixed get_logs_for_job parameter usage

The implementation correctly uses index 0 for the attempt number parameter to get_logs_for_job(), fixing a previous issue where an attempt ID was incorrectly passed.

@SarthakB11 (Author)

@Ishankoradia kindly review and let me know.

@SarthakB11 (Author)

@fatchat @Ishankoradia reminder!

@Ishankoradia (Contributor)

Hi @SarthakB11, this looks like a solid first attempt. Have you been able to run the test suite? Could you share your findings?

@fatchat (Contributor)

fatchat commented Jun 25, 2025

@Ishankoradia are we merging or closing?

@SarthakB11 (Author)

@Ishankoradia hi, could you guide me in running the test suite? That would be helpful!


Development

Successfully merging this pull request may close these issues.

Airbyte integration test suite
