Skip to content

Commit ff65c86

Browse files
committed
Example load with no manual source creation
1 parent ce62f48 commit ff65c86

File tree

3 files changed

+81
-45
lines changed

3 files changed

+81
-45
lines changed

sources/pg_replication/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,13 @@ It also needs `CREATE` privilege on the database:
2929
GRANT CREATE ON DATABASE dlt_data TO replication_user;
3030
```
3131

32+
If not a superuser, the user must have ownership of the tables that need to be replicated:
33+
34+
```sql
35+
ALTER TABLE your_table OWNER TO replication_user;
36+
```
37+
38+
3239
### Set up RDS
3340
1. You must enable replication for RDS Postgres instance via **Parameter Group**: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_PostgreSQL.Replication.ReadReplicas.html
3441
2. `WITH LOGIN REPLICATION;` does not work on RDS, instead do:

sources/pg_replication/helpers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ def init_replication(
114114
table_names (Optional[Union[str, Sequence[str]]]): Name(s) of the table(s)
115115
to include in the publication. If not provided, the whole schema with `schema_name` will be replicated
116116
(also tables added to the schema after the publication was created). You need superuser privileges
117-
for the schema replication.
117+
for the whole schema replication. When specifying individual table names, the database role must
118+
own the tables if it's not a superuser.
118119
credentials (ConnectionStringCredentials): Postgres database credentials.
119120
publish (str): Comma-separated string of DML operations. Can be used to
120121
control which changes are included in the publication. Allowed operations

sources/pg_replication_pipeline.py

Lines changed: 72 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from typing import Optional, Union, Sequence
12
import dlt
23

34
from dlt.common.destination import Destination
@@ -9,50 +10,66 @@
910

1011
PG_CREDS = dlt.secrets.get("sources.pg_replication.credentials", PostgresCredentials)
1112

12-
def replicate_with_initial_load(pipeline_name: str, slot_name: str, pub_name: str) -> None:
13-
"""Production example: Sets up replication with initial load.
1413

15-
Demonstrates usage of `persist_snapshots` argument and snapshot resource
16-
returned by `init_replication` helper.
14+
def replicate_single_table_with_initial_load(
15+
schema_name: str, table_names: Optional[Union[str, Sequence[str]]]
16+
) -> None:
17+
"""Sets up replication with initial load for your existing PostgreSQL database.
18+
19+
Unlike the other functions, this function does NOT simulate changes in the source table.
20+
It connects to your actual database and performs initial load from the specified tables
21+
and performs replication if any.
22+
23+
Args:
24+
schema_name (str): Name of the schema containing the tables to replicate.
25+
table_names (Optional[Union[str, Sequence[str]]]): Name(s) of the table(s)
26+
to replicate. Can be a single table name as string or a sequence of table names.
27+
If None, replicates all tables in the schema (requires superuser privileges).
28+
When specifying table names, the Postgres user must own the tables or be a superuser.
29+
30+
Returns:
31+
None
1732
"""
18-
# create source and destination pipelines
33+
# create destination pipeline
1934
dest_pl = dlt.pipeline(
20-
pipeline_name=pipeline_name,
21-
destination='duckdb',
22-
dataset_name="replication_postgres",
35+
pipeline_name="pg_replication_pipeline",
36+
destination="duckdb",
37+
dataset_name="replicate_with_initial_load",
2338
)
24-
schema_name = "public"
2539

26-
creds = dlt.secrets.get(
27-
f"{pipeline_name}.sources.pg_replication.credentials", PostgresCredentials
28-
)
29-
print(creds)
40+
# initialize replication for the source table
41+
slot_name = "example_slot"
42+
pub_name = "example_pub"
3043
snapshot = init_replication( # requires the Postgres user to have the REPLICATION attribute assigned
3144
slot_name=slot_name,
32-
credentials=creds,
3345
pub_name=pub_name,
46+
table_names=table_names, # requires the Postgres user to own the table(s) or be superuser
3447
schema_name=schema_name,
3548
persist_snapshots=True, # persist snapshot table(s) and let function return resource(s) for initial load
36-
reset=True
49+
reset=True,
3750
)
38-
print("replication initialized")
51+
3952
# perform initial load to capture all records present in source table prior to replication initialization
40-
dest_pl.run(snapshot)
41-
print("replication run")
42-
# insert record in source table and propagate change to destination
43-
changes = replication_resource(slot_name, pub_name, credentials=creds)
44-
print("changes initialized")
45-
dest_pl.run(changes)
46-
print("changes run")
47-
53+
load_info = dest_pl.run(snapshot)
54+
print(load_info)
55+
print(dest_pl.last_trace.last_normalize_info)
56+
57+
# assuming there were changes in the source table, propagate change to destination
58+
changes = replication_resource(slot_name, pub_name)
59+
load_info = dest_pl.run(changes)
60+
print(load_info)
61+
print(dest_pl.last_trace.last_normalize_info)
62+
4863

4964
def replicate_single_table_demo() -> None:
50-
"""Sets up replication for a single Postgres table and loads changes into a destination.
65+
"""Demonstrates PostgreSQL replication by simulating a source table and changes.
66+
67+
Shows basic usage of `init_replication` helper and `replication_resource` resource.
68+
This demo creates a source table and simulates INSERT, UPDATE, and DELETE operations
69+
to show how replication works end-to-end. In production, you would have an existing
70+
PostgreSQL database with real changes instead of simulating them.
5171
52-
Demonstrates basic usage of `init_replication` helper and `replication_resource` resource.
53-
Uses `src_pl` to create and change the replicated Postgres table—this
54-
is only for demonstration purposes, you won't need this when you run in production
55-
as you'll probably have another process feeding your Postgres instance.
72+
For production use with existing data, see `replicate_with_initial_load()`.
5673
"""
5774
# create source and destination pipelines
5875
src_pl = get_postgres_pipeline()
@@ -100,16 +117,14 @@ def replicate_single_table_demo() -> None:
100117
show_destination_table(dest_pl)
101118

102119

103-
def demo_initial_load_replication() -> None:
104-
"""Sets up replication with initial load.
120+
def replicate_with_initial_load_demo() -> None:
121+
"""Demonstrates PostgreSQL replication with initial load by simulating a source table and changes.
105122
106-
Demonstrates usage of `persist_snapshots` argument and snapshot resource
107-
returned by `init_replication` helper.
123+
Shows usage of `persist_snapshots` argument and snapshot resource returned by `init_replication` helper.
124+
This demo creates a source table with existing data, then simulates additional changes to show how
125+
initial load captures pre-existing records and replication handles subsequent changes.
108126
109-
Notes:
110-
- This function also creates the source table itself. That’s only useful for demos or
111-
when starting with a brand-new database. In production you normally won’t create tables here,
112-
since your application/database already has them.
127+
For production use with existing data, see `replicate_with_initial_load()`.
113128
"""
114129
# create source and destination pipelines
115130
src_pl = get_postgres_pipeline()
@@ -154,10 +169,16 @@ def demo_initial_load_replication() -> None:
154169

155170

156171
def replicate_entire_schema_demo() -> None:
157-
"""Demonstrates setup and usage of schema replication.
172+
"""Demonstrates schema-level replication by simulating multiple tables and changes.
173+
174+
Shows setup and usage of schema replication, which captures changes across all tables
175+
in a schema. This demo creates multiple source tables and simulates changes to show
176+
how schema replication works, including tables added after replication starts.
158177
159-
Schema replication requires a Postgres server version of 15 or higher. An
160-
exception is raised if that's not the case.
178+
Schema replication requires PostgreSQL server version 15 or higher. An exception
179+
is raised if that's not the case.
180+
181+
For production use with existing schemas, adapt this pattern to your real database.
161182
"""
162183
# create source and destination pipelines
163184
src_pl = get_postgres_pipeline()
@@ -214,9 +235,13 @@ def replicate_entire_schema_demo() -> None:
214235

215236

216237
def replicate_with_column_selection_demo() -> None:
217-
"""Sets up replication with column selection.
238+
"""Demonstrates column selection in replication by simulating tables with selective column capture.
239+
240+
Shows usage of `include_columns` argument to replicate only specific columns from tables.
241+
This demo creates source tables and simulates changes to show how column selection works,
242+
where some tables have filtered columns while others include all columns by default.
218243
219-
Demonstrates usage of `include_columns` argument.
244+
For production use, apply the `include_columns` pattern to your existing tables.
220245
"""
221246
# create source and destination pipelines
222247
src_pl = get_postgres_pipeline()
@@ -327,7 +352,10 @@ def show_destination_table(
327352

328353

329354
if __name__ == "__main__":
330-
replicate_single_table_demo()
331-
# replicate_with_initial_load()
355+
replicate_single_table_with_initial_load(
356+
schema_name="public", table_names="test_table"
357+
)
358+
# replicate_single_table_demo()
359+
# replicate_with_initial_load_demo()
332360
# replicate_entire_schema_demo()
333361
# replicate_with_column_selection_demo()

0 commit comments

Comments
 (0)