Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions sources/pg_replication/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ It also needs `CREATE` privilege on the database:
GRANT CREATE ON DATABASE dlt_data TO replication_user;
```

If not a superuser, the user must have ownership of the tables that need to be replicated:

```sql
ALTER TABLE your_table OWNER TO replication_user;
```


### Set up RDS
1. You must enable replication for the RDS Postgres instance via **Parameter Group**: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html
2. `WITH LOGIN REPLICATION;` does not work on RDS; instead, do:
Expand Down
3 changes: 2 additions & 1 deletion sources/pg_replication/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ def init_replication(
table_names (Optional[Union[str, Sequence[str]]]): Name(s) of the table(s)
to include in the publication. If not provided, the whole schema with `schema_name` will be replicated
(also tables added to the schema after the publication was created). You need superuser privileges
for the schema replication.
for the whole schema replication. When specifying individual table names, the database role must
own the tables if it's not a superuser.
credentials (ConnectionStringCredentials): Postgres database credentials.
publish (str): Comma-separated string of DML operations. Can be used to
control which changes are included in the publication. Allowed operations
Expand Down
103 changes: 82 additions & 21 deletions sources/pg_replication_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Optional, Union, Sequence
import dlt

from dlt.common.destination import Destination
Expand All @@ -10,13 +11,63 @@
PG_CREDS = dlt.secrets.get("sources.pg_replication.credentials", PostgresCredentials)


def replicate_single_table() -> None:
"""Sets up replication for a single Postgres table and loads changes into a destination.
def replicate_single_table_with_initial_load(
schema_name: str, table_names: Optional[Union[str, Sequence[str]]]
) -> None:
"""Sets up replication with initial load for your existing PostgreSQL database.

Unlike the other functions, this function does NOT simulate changes in the source table.
It connects to your actual database, performs an initial load of the specified tables,
and then replicates any subsequent changes.

Args:
schema_name (str): Name of the schema containing the tables to replicate.
table_names (Optional[Union[str, Sequence[str]]]): Name(s) of the table(s)
to replicate. Can be a single table name as string or a sequence of table names.
If None, replicates all tables in the schema (requires superuser privileges).
When specifying table names, the Postgres user must own the tables or be a superuser.

Returns:
None
"""
# create destination pipeline
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not call it the destination pipeline! Pipeline is a connection between source and destination, can we just call it replication_pipeline?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about we call them simulation and replication pipelines - i finally understand from the user perspective how source and destination pipelines sounds odd 🫡

dest_pl = dlt.pipeline(
pipeline_name="pg_replication_pipeline",
destination="duckdb",
dataset_name="replicate_with_initial_load",
)

# initialize replication for the source table
slot_name = "example_slot"
pub_name = "example_pub"
snapshot = init_replication( # requires the Postgres user to have the REPLICATION attribute assigned
slot_name=slot_name,
pub_name=pub_name,
table_names=table_names, # requires the Postgres user to own the table(s) or be superuser
schema_name=schema_name,
persist_snapshots=True, # persist snapshot table(s) and let function return resource(s) for initial load
reset=True,
)

# perform initial load to capture all records present in source table prior to replication initialization
load_info = dest_pl.run(snapshot)
print(load_info)
print(dest_pl.last_trace.last_normalize_info)

# assuming there were changes in the source table, propagate them to the destination
changes = replication_resource(slot_name, pub_name)
load_info = dest_pl.run(changes)
print(load_info)
print(dest_pl.last_trace.last_normalize_info)


Demonstrates basic usage of `init_replication` helper and `replication_resource` resource.
Uses `src_pl` to create and change the replicated Postgres table—this
is only for demonstration purposes, you won't need this when you run in production
as you'll probably have another process feeding your Postgres instance.
def replicate_single_table_demo() -> None:
"""Demonstrates PostgreSQL replication by simulating a source table and changes.

Shows basic usage of `init_replication` helper and `replication_resource` resource.
This demo creates a source table and simulates INSERT, UPDATE, and DELETE operations
to show how replication works end-to-end. In production, you would have an existing
PostgreSQL database with real changes instead of simulating them.
"""
# create source and destination pipelines
src_pl = get_postgres_pipeline()
Expand Down Expand Up @@ -64,11 +115,12 @@ def replicate_single_table() -> None:
show_destination_table(dest_pl)


def replicate_with_initial_load() -> None:
"""Sets up replication with initial load.
def replicate_with_initial_load_demo() -> None:
"""Demonstrates PostgreSQL replication with initial load by simulating a source table and changes.

Demonstrates usage of `persist_snapshots` argument and snapshot resource
returned by `init_replication` helper.
Shows usage of `persist_snapshots` argument and snapshot resource returned by `init_replication` helper.
This demo creates a source table with existing data, then simulates additional changes to show how
initial load captures pre-existing records and replication handles subsequent changes.
"""
# create source and destination pipelines
src_pl = get_postgres_pipeline()
Expand Down Expand Up @@ -112,11 +164,15 @@ def replicate_with_initial_load() -> None:
show_destination_table(dest_pl)


def replicate_entire_schema() -> None:
"""Demonstrates setup and usage of schema replication.
def replicate_entire_schema_demo() -> None:
"""Demonstrates schema-level replication by simulating multiple tables and changes.

Shows setup and usage of schema replication, which captures changes across all tables
in a schema. This demo creates multiple source tables and simulates changes to show
how schema replication works, including tables added after replication starts.

Schema replication requires a Postgres server version of 15 or higher. An
exception is raised if that's not the case.
Schema replication requires PostgreSQL server version 15 or higher. An exception
is raised if that's not the case.
"""
# create source and destination pipelines
src_pl = get_postgres_pipeline()
Expand Down Expand Up @@ -172,10 +228,12 @@ def replicate_entire_schema() -> None:
show_destination_table(dest_pl, "tbl_z")


def replicate_with_column_selection() -> None:
"""Sets up replication with column selection.
def replicate_with_column_selection_demo() -> None:
"""Demonstrates column selection in replication by simulating tables with selective column capture.

Demonstrates usage of `include_columns` argument.
Shows usage of `include_columns` argument to replicate only specific columns from tables.
This demo creates source tables and simulates changes to show how column selection works,
where some tables have filtered columns while others include all columns by default.
"""
# create source and destination pipelines
src_pl = get_postgres_pipeline()
Expand Down Expand Up @@ -286,7 +344,10 @@ def show_destination_table(


if __name__ == "__main__":
replicate_single_table()
# replicate_with_initial_load()
# replicate_entire_schema()
# replicate_with_column_selection()
replicate_single_table_with_initial_load(
schema_name="public", table_names="test_table"
)
# replicate_single_table_demo()
# replicate_with_initial_load_demo()
# replicate_entire_schema_demo()
# replicate_with_column_selection_demo()