Skip to content

Commit 0d7562e

Browse files
committed
No ambiguous source and destination pipelines
1 parent cce8d1b commit 0d7562e

File tree

2 files changed

+59
-59
lines changed

2 files changed

+59
-59
lines changed

sources/pg_replication/helpers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
from dlt.sources.credentials import ConnectionStringCredentials
4343
from dlt.sources.sql_database import (
4444
sql_table as core_sql_table,
45-
sql_database as core_sql_datbase,
45+
sql_database as core_sql_database,
4646
)
4747

4848
from .schema_types import _to_dlt_column_schema, _to_dlt_val
@@ -185,7 +185,7 @@ def init_replication(
185185
# do not include dlt tables
186186
table_names = [
187187
table_name
188-
for table_name in core_sql_datbase(
188+
for table_name in core_sql_database(
189189
credentials, schema=schema_name, reflection_level="minimal"
190190
).resources.keys()
191191
if not table_name.lower().startswith(DLT_NAME_PREFIX)

sources/pg_replication_pipeline.py

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ def replicate_single_table_with_initial_load(
3030
Returns:
3131
None
3232
"""
33-
# create destination pipeline
34-
dest_pl = dlt.pipeline(
33+
# create replication pipeline
34+
repl_pl = dlt.pipeline(
3535
pipeline_name="pg_replication_pipeline",
3636
destination="duckdb",
3737
dataset_name="replicate_with_initial_load",
@@ -50,15 +50,15 @@ def replicate_single_table_with_initial_load(
5050
)
5151

5252
# perform initial load to capture all records present in source table prior to replication initialization
53-
load_info = dest_pl.run(snapshot)
53+
load_info = repl_pl.run(snapshot)
5454
print(load_info)
55-
print(dest_pl.last_trace.last_normalize_info)
55+
print(repl_pl.last_trace.last_normalize_info)
5656

5757
# assuming there were changes in the source table, propagate change to destination
5858
changes = replication_resource(slot_name, pub_name)
59-
load_info = dest_pl.run(changes)
59+
load_info = repl_pl.run(changes)
6060
print(load_info)
61-
print(dest_pl.last_trace.last_normalize_info)
61+
print(repl_pl.last_trace.last_normalize_info)
6262

6363

6464
def replicate_single_table_demo() -> None:
@@ -69,9 +69,9 @@ def replicate_single_table_demo() -> None:
6969
to show how replication works end-to-end. In production, you would have an existing
7070
PostgreSQL database with real changes instead of simulating them.
7171
"""
72-
# create source and destination pipelines
73-
src_pl = get_postgres_pipeline()
74-
dest_pl = dlt.pipeline(
72+
# create simulation and replication pipelines
73+
sim_pl = get_postgres_pipeline()
74+
repl_pl = dlt.pipeline(
7575
pipeline_name="pg_replication_pipeline",
7676
destination="duckdb",
7777
dataset_name="replicate_single_table",
@@ -80,7 +80,7 @@ def replicate_single_table_demo() -> None:
8080

8181
# create table "my_source_table" in source to demonstrate replication
8282
create_source_table(
83-
src_pl, "CREATE TABLE {table_name} (id integer PRIMARY KEY, val bool);"
83+
sim_pl, "CREATE TABLE {table_name} (id integer PRIMARY KEY, val bool);"
8484
)
8585

8686
# initialize replication for the source table—this creates a replication slot and publication
@@ -89,7 +89,7 @@ def replicate_single_table_demo() -> None:
8989
init_replication( # requires the Postgres user to have the REPLICATION attribute assigned
9090
slot_name=slot_name,
9191
pub_name=pub_name,
92-
schema_name=src_pl.dataset_name,
92+
schema_name=sim_pl.dataset_name,
9393
table_names="my_source_table",
9494
reset=True,
9595
)
@@ -99,20 +99,20 @@ def replicate_single_table_demo() -> None:
9999

100100
# insert two records in source table and propagate changes to destination
101101
change_source_table(
102-
src_pl, "INSERT INTO {table_name} VALUES (1, true), (2, false);"
102+
sim_pl, "INSERT INTO {table_name} VALUES (1, true), (2, false);"
103103
)
104-
dest_pl.run(changes)
105-
show_destination_table(dest_pl)
104+
repl_pl.run(changes)
105+
show_destination_table(repl_pl)
106106

107107
# update record in source table and propagate change to destination
108-
change_source_table(src_pl, "UPDATE {table_name} SET val = true WHERE id = 2;")
109-
dest_pl.run(changes)
110-
show_destination_table(dest_pl)
108+
change_source_table(sim_pl, "UPDATE {table_name} SET val = true WHERE id = 2;")
109+
repl_pl.run(changes)
110+
show_destination_table(repl_pl)
111111

112112
# delete record from source table and propagate change to destination
113-
change_source_table(src_pl, "DELETE FROM {table_name} WHERE id = 2;")
114-
dest_pl.run(changes)
115-
show_destination_table(dest_pl)
113+
change_source_table(sim_pl, "DELETE FROM {table_name} WHERE id = 2;")
114+
repl_pl.run(changes)
115+
show_destination_table(repl_pl)
116116

117117

118118
def replicate_with_initial_load_demo() -> None:
@@ -122,9 +122,9 @@ def replicate_with_initial_load_demo() -> None:
122122
This demo creates a source table with existing data, then simulates additional changes to show how
123123
initial load captures pre-existing records and replication handles subsequent changes.
124124
"""
125-
# create source and destination pipelines
126-
src_pl = get_postgres_pipeline()
127-
dest_pl = dlt.pipeline(
125+
# create simulation and replication pipelines
126+
sim_pl = get_postgres_pipeline()
127+
repl_pl = dlt.pipeline(
128128
pipeline_name="pg_replication_pipeline",
129129
destination="duckdb",
130130
dataset_name="replicate_with_initial_load",
@@ -133,12 +133,12 @@ def replicate_with_initial_load_demo() -> None:
133133

134134
# create table "my_source_table" in source to demonstrate replication
135135
create_source_table(
136-
src_pl, "CREATE TABLE {table_name} (id integer PRIMARY KEY, val bool);"
136+
sim_pl, "CREATE TABLE {table_name} (id integer PRIMARY KEY, val bool);"
137137
)
138138

139139
# insert records before initializing replication
140140
change_source_table(
141-
src_pl, "INSERT INTO {table_name} VALUES (1, true), (2, false);"
141+
sim_pl, "INSERT INTO {table_name} VALUES (1, true), (2, false);"
142142
)
143143

144144
# initialize replication for the source table
@@ -147,21 +147,21 @@ def replicate_with_initial_load_demo() -> None:
147147
snapshot = init_replication( # requires the Postgres user to have the REPLICATION attribute assigned
148148
slot_name=slot_name,
149149
pub_name=pub_name,
150-
schema_name=src_pl.dataset_name,
150+
schema_name=sim_pl.dataset_name,
151151
table_names="my_source_table",
152152
persist_snapshots=True, # persist snapshot table(s) and let function return resource(s) for initial load
153153
reset=True,
154154
)
155155

156156
# perform initial load to capture all records present in source table prior to replication initialization
157-
dest_pl.run(snapshot)
158-
show_destination_table(dest_pl)
157+
repl_pl.run(snapshot)
158+
show_destination_table(repl_pl)
159159

160160
# insert record in source table and propagate change to destination
161-
change_source_table(src_pl, "INSERT INTO {table_name} VALUES (3, true);")
161+
change_source_table(sim_pl, "INSERT INTO {table_name} VALUES (3, true);")
162162
changes = replication_resource(slot_name, pub_name)
163-
dest_pl.run(changes)
164-
show_destination_table(dest_pl)
163+
repl_pl.run(changes)
164+
show_destination_table(repl_pl)
165165

166166

167167
def replicate_entire_schema_demo() -> None:
@@ -174,9 +174,9 @@ def replicate_entire_schema_demo() -> None:
174174
Schema replication requires PostgreSQL server version 15 or higher. An exception
175175
is raised if that's not the case.
176176
"""
177-
# create source and destination pipelines
178-
src_pl = get_postgres_pipeline()
179-
dest_pl = dlt.pipeline(
177+
# create simulation and replication pipelines
178+
sim_pl = get_postgres_pipeline()
179+
repl_pl = dlt.pipeline(
180180
pipeline_name="pg_replication_pipeline",
181181
destination="duckdb",
182182
dataset_name="replicate_entire_schema",
@@ -185,12 +185,12 @@ def replicate_entire_schema_demo() -> None:
185185

186186
# create two source tables to demonstrate schema replication
187187
create_source_table(
188-
src_pl,
188+
sim_pl,
189189
"CREATE TABLE {table_name} (id integer PRIMARY KEY, val bool);",
190190
"tbl_x",
191191
)
192192
create_source_table(
193-
src_pl,
193+
sim_pl,
194194
"CREATE TABLE {table_name} (id integer PRIMARY KEY, val varchar);",
195195
"tbl_y",
196196
)
@@ -201,7 +201,7 @@ def replicate_entire_schema_demo() -> None:
201201
init_replication( # initializing schema replication requires the Postgres user to be a superuser
202202
slot_name=slot_name,
203203
pub_name=pub_name,
204-
schema_name=src_pl.dataset_name,
204+
schema_name=sim_pl.dataset_name,
205205
reset=True,
206206
)
207207

@@ -210,22 +210,22 @@ def replicate_entire_schema_demo() -> None:
210210

211211
# insert records in source tables and propagate changes to destination
212212
change_source_table(
213-
src_pl, "INSERT INTO {table_name} VALUES (1, true), (2, false);", "tbl_x"
213+
sim_pl, "INSERT INTO {table_name} VALUES (1, true), (2, false);", "tbl_x"
214214
)
215-
change_source_table(src_pl, "INSERT INTO {table_name} VALUES (1, 'foo');", "tbl_y")
216-
dest_pl.run(changes)
217-
show_destination_table(dest_pl, "tbl_x")
218-
show_destination_table(dest_pl, "tbl_y")
215+
change_source_table(sim_pl, "INSERT INTO {table_name} VALUES (1, 'foo');", "tbl_y")
216+
repl_pl.run(changes)
217+
show_destination_table(repl_pl, "tbl_x")
218+
show_destination_table(repl_pl, "tbl_y")
219219

220220
# tables added to the schema later are also included in the replication
221221
create_source_table(
222-
src_pl, "CREATE TABLE {table_name} (id integer PRIMARY KEY, val date);", "tbl_z"
222+
sim_pl, "CREATE TABLE {table_name} (id integer PRIMARY KEY, val date);", "tbl_z"
223223
)
224224
change_source_table(
225-
src_pl, "INSERT INTO {table_name} VALUES (1, '2023-03-18');", "tbl_z"
225+
sim_pl, "INSERT INTO {table_name} VALUES (1, '2023-03-18');", "tbl_z"
226226
)
227-
dest_pl.run(changes)
228-
show_destination_table(dest_pl, "tbl_z")
227+
repl_pl.run(changes)
228+
show_destination_table(repl_pl, "tbl_z")
229229

230230

231231
def replicate_with_column_selection_demo() -> None:
@@ -235,9 +235,9 @@ def replicate_with_column_selection_demo() -> None:
235235
This demo creates source tables and simulates changes to show how column selection works,
236236
where some tables have filtered columns while others include all columns by default.
237237
"""
238-
# create source and destination pipelines
239-
src_pl = get_postgres_pipeline()
240-
dest_pl = dlt.pipeline(
238+
# create simulation and replication pipelines
239+
sim_pl = get_postgres_pipeline()
240+
repl_pl = dlt.pipeline(
241241
pipeline_name="pg_replication_pipeline",
242242
destination="duckdb",
243243
dataset_name="replicate_with_column_selection",
@@ -246,12 +246,12 @@ def replicate_with_column_selection_demo() -> None:
246246

247247
# create two source tables to demonstrate schema replication
248248
create_source_table(
249-
src_pl,
249+
sim_pl,
250250
"CREATE TABLE {table_name} (c1 integer PRIMARY KEY, c2 bool, c3 varchar);",
251251
"tbl_x",
252252
)
253253
create_source_table(
254-
src_pl,
254+
sim_pl,
255255
"CREATE TABLE {table_name} (c1 integer PRIMARY KEY, c2 bool, c3 varchar);",
256256
"tbl_y",
257257
)
@@ -262,7 +262,7 @@ def replicate_with_column_selection_demo() -> None:
262262
init_replication( # requires the Postgres user to have the REPLICATION attribute assigned
263263
slot_name=slot_name,
264264
pub_name=pub_name,
265-
schema_name=src_pl.dataset_name,
265+
schema_name=sim_pl.dataset_name,
266266
table_names=("tbl_x", "tbl_y"),
267267
reset=True,
268268
)
@@ -278,18 +278,18 @@ def replicate_with_column_selection_demo() -> None:
278278

279279
# insert records in source tables and propagate changes to destination
280280
change_source_table(
281-
src_pl, "INSERT INTO {table_name} VALUES (1, true, 'foo');", "tbl_x"
281+
sim_pl, "INSERT INTO {table_name} VALUES (1, true, 'foo');", "tbl_x"
282282
)
283283
change_source_table(
284-
src_pl, "INSERT INTO {table_name} VALUES (1, false, 'bar');", "tbl_y"
284+
sim_pl, "INSERT INTO {table_name} VALUES (1, false, 'bar');", "tbl_y"
285285
)
286-
dest_pl.run(changes)
286+
repl_pl.run(changes)
287287

288288
# show columns in schema for both tables
289289
# column c3 is not in the schema for tbl_x because we did not include it
290290
# tbl_y does have column c3 because we didn't specify include columns for this table and by default all columns are included
291-
print("tbl_x", ":", list(dest_pl.default_schema.get_table_columns("tbl_x").keys()))
292-
print("tbl_y", ":", list(dest_pl.default_schema.get_table_columns("tbl_y").keys()))
291+
print("tbl_x", ":", list(repl_pl.default_schema.get_table_columns("tbl_x").keys()))
292+
print("tbl_y", ":", list(repl_pl.default_schema.get_table_columns("tbl_y").keys()))
293293

294294

295295
# define some helper methods to make examples more readable

0 commit comments

Comments (0)