Skip to content

Commit a447013

Browse files
committed
initial
0 parents  commit a447013

File tree

6 files changed

+340
-0
lines changed

6 files changed

+340
-0
lines changed

.editorconfig

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
root = true
2+
3+
[*]
4+
end_of_line = lf
5+
trim_trailing_whitespace = true
6+
insert_final_newline = true
7+
charset = utf-8
8+
indent_style = space
9+
indent_size = 2
10+
11+
[makefile]
12+
indent_style = tab

dockerfile

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
FROM alpine:3.10
2+
3+
ENV PGDATA=/var/lib/postgresql/data
4+
5+
RUN set -x \
6+
&& install -o postgres -g postgres -m 755 -d $PGDATA /var/lib/postgresql/conf \
7+
&& cd /tmp \
8+
&& wget -qO- https://github.com/postgres/postgres/archive/REL_12_0.tar.gz | tar xz \
9+
\
10+
&& apk add --no-cache --virtual .build-deps \
11+
--repositories-file /dev/null \
12+
--repository https://mirror.ps.kz/alpine/v3.10/main \
13+
--repository https://mirror.ps.kz/alpine/v3.10/community \
14+
build-base \
15+
linux-headers \
16+
bison \
17+
flex \
18+
libxml2-dev \
19+
libxslt-dev \
20+
icu-dev \
21+
openssl-dev \
22+
autoconf \
23+
automake \
24+
libtool \
25+
clang-dev \
26+
llvm8-dev \
27+
\
28+
&& cd /tmp/postgres-* \
29+
&& ./configure \
30+
--prefix=/usr/local \
31+
--without-readline \
32+
--with-libxml \
33+
--with-libxslt \
34+
--with-icu \
35+
--with-openssl \
36+
--with-llvm \
37+
&& make \
38+
&& make install \
39+
\
40+
&& apk add --no-cache \
41+
--repositories-file /dev/null \
42+
--repository https://mirror.ps.kz/alpine/v3.10/main \
43+
--repository https://mirror.ps.kz/alpine/v3.10/community \
44+
libxml2 libxslt icu openssl llvm8
45+
46+
USER postgres
47+
RUN set -x \
48+
&& initdb \
49+
\
50+
&& printf %s\\n \
51+
# db user addr method
52+
"local all all trust" \
53+
"local replication all trust" \
54+
"host all all all trust" \
55+
"host replication all all trust" \
56+
> $PGDATA/pg_hba.conf \
57+
\
58+
&& printf %s\\n \
59+
wal_level=logical \
60+
>> $PGDATA/postgresql.conf
61+
62+
USER root
63+
RUN set -x \
64+
&& apk add --no-cache --virtual .build-deps \
65+
--repositories-file /dev/null \
66+
--repository https://mirror.ps.kz/alpine/v3.10/main \
67+
--repository https://mirror.ps.kz/alpine/v3.10/community \
68+
build-base clang
69+
70+
# COPY *.c makefile /tmp/pg_json_decoding/
71+
# USER postgres
72+
CMD set -x \
73+
&& cp -r /src /tmp/pg_json_decoding \
74+
&& cd /tmp/pg_json_decoding \
75+
&& make \
76+
&& make install \
77+
&& su postgres sh -c \
78+
'pg_ctl -w start \
79+
&& psql -v ON_ERROR_STOP=1 -f test.sql \
80+
&& pg_ctl -w stop'

makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
MODULES = pg_json_decoding
2+
PG_CONFIG = pg_config
3+
PGXS := $(shell $(PG_CONFIG) --pgxs)
4+
include $(PGXS)

pg_json_decoding.c

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
#include "postgres.h"
2+
#include "catalog/namespace.h"
3+
#include "replication/logical.h"
4+
#include "common/base64.h"
5+
#include "utils/lsyscache.h"
6+
#include "utils/memutils.h"
7+
#include "utils/rel.h"
8+
#include "utils/relcache.h"
9+
#include "utils/syscache.h"
10+
#include "utils/json.h"
11+
#include "utils/builtins.h"
12+
13+
PG_MODULE_MAGIC;
14+
15+
extern void _PG_init(void);
16+
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
17+
18+
typedef struct _JsonDecodingData {
19+
MemoryContext context;
20+
char *pubname;
21+
Oid pubid;
22+
bool puballtables;
23+
} JsonDecodingData;
24+
25+
static void pg_decode_startup(
26+
LogicalDecodingContext *ctx,
27+
OutputPluginOptions *opt,
28+
bool is_init
29+
) {
30+
JsonDecodingData *data;
31+
ListCell *option;
32+
33+
data = palloc0(sizeof(JsonDecodingData));
34+
data->pubname = NULL;
35+
data->pubid = InvalidOid;
36+
data->puballtables = false;
37+
data->context = AllocSetContextCreate(
38+
ctx->context,
39+
"pg_json_decoding context",
40+
ALLOCSET_DEFAULT_MINSIZE,
41+
ALLOCSET_DEFAULT_INITSIZE,
42+
ALLOCSET_DEFAULT_MAXSIZE
43+
);
44+
ctx->output_plugin_private = data;
45+
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
46+
47+
if (is_init) {
48+
return;
49+
}
50+
51+
foreach(option, ctx->output_plugin_options) {
52+
DefElem *elem = lfirst(option);
53+
Assert(elem->arg == NULL || IsA(elem->arg, String));
54+
if (strcmp(elem->defname, "publication") == 0) {
55+
data->pubname = strVal(elem->arg);
56+
}
57+
}
58+
59+
if (data->pubname == NULL) {
60+
ereport(ERROR, (
61+
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
62+
errmsg("publication parameter missing")
63+
));
64+
}
65+
}
66+
67+
static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
68+
JsonDecodingData *data = ctx->output_plugin_private;
69+
MemoryContextDelete(data->context);
70+
}
71+
72+
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) {
73+
OutputPluginPrepareWrite(ctx, true);
74+
appendStringInfo(
75+
ctx->out,
76+
"{\"kind\":\"begin\",\"committed\":\"%ld\"}",
77+
txn->commit_time + (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY
78+
);
79+
OutputPluginWrite(ctx, true);
80+
}
81+
82+
static void pg_decode_commit_txn(
83+
LogicalDecodingContext *ctx,
84+
ReorderBufferTXN *txn,
85+
XLogRecPtr commit_lsn
86+
) {
87+
OutputPluginPrepareWrite(ctx, true);
88+
appendStringInfoString(ctx->out, "{\"kind\":\"commit\"}");
89+
OutputPluginWrite(ctx, true);
90+
}
91+
92+
static void tuple_to_json(StringInfo out, TupleDesc tupdesc, HeapTuple tuple) {
93+
Datum tupdat = heap_copy_tuple_as_datum(tuple, tupdesc);
94+
Datum tupjson = DirectFunctionCall1(row_to_json, tupdat);
95+
char *tupjsonstr = TextDatumGetCString(tupjson);
96+
appendStringInfoString(out, tupjsonstr);
97+
}
98+
99+
static void pg_decode_change(
100+
LogicalDecodingContext *ctx,
101+
ReorderBufferTXN *txn,
102+
Relation relation,
103+
ReorderBufferChange *change
104+
) {
105+
JsonDecodingData *data;
106+
Form_pg_class class_form;
107+
TupleDesc tupdesc;
108+
MemoryContext old;
109+
char *table_name;
110+
111+
data = ctx->output_plugin_private;
112+
113+
if (data->pubid == InvalidOid) {
114+
data->pubid = get_publication_oid(data->pubname, false);
115+
data->puballtables = GetPublication(data->pubid)->alltables;
116+
}
117+
118+
if (!data->puballtables && !SearchSysCacheExists2(
119+
PUBLICATIONRELMAP,
120+
ObjectIdGetDatum(RelationGetRelid(relation)),
121+
ObjectIdGetDatum(data->pubid)
122+
)) {
123+
return;
124+
}
125+
126+
class_form = RelationGetForm(relation);
127+
tupdesc = RelationGetDescr(relation);
128+
table_name = NameStr(class_form->relname);
129+
130+
old = MemoryContextSwitchTo(data->context);
131+
OutputPluginPrepareWrite(ctx, true);
132+
133+
appendStringInfoString(ctx->out, "{\"kind\":");
134+
switch (change->action) {
135+
case REORDER_BUFFER_CHANGE_INSERT:
136+
appendStringInfoString(ctx->out, "\"insert\"");
137+
break;
138+
case REORDER_BUFFER_CHANGE_UPDATE:
139+
appendStringInfoString(ctx->out, "\"update\"");
140+
break;
141+
case REORDER_BUFFER_CHANGE_DELETE:
142+
appendStringInfoString(ctx->out, "\"delete\"");
143+
break;
144+
default:
145+
appendStringInfoString(ctx->out, "\"unknown\"");
146+
break;
147+
}
148+
appendStringInfoString(ctx->out, ",\"schema\":");
149+
escape_json(ctx->out, get_namespace_name(
150+
get_rel_namespace(RelationGetRelid(relation))
151+
));
152+
appendStringInfoString(ctx->out, ",\"table\":");
153+
escape_json(ctx->out, table_name);
154+
155+
if (change->data.tp.oldtuple != NULL) {
156+
appendStringInfoString(ctx->out, ",\"oldtuple\":");
157+
tuple_to_json(ctx->out, tupdesc, &change->data.tp.oldtuple->tuple);
158+
}
159+
if (change->data.tp.newtuple != NULL) {
160+
appendStringInfoString(ctx->out, ",\"newtuple\":");
161+
tuple_to_json(ctx->out, tupdesc, &change->data.tp.newtuple->tuple);
162+
}
163+
appendStringInfoChar(ctx->out, '}');
164+
165+
MemoryContextSwitchTo(old);
166+
MemoryContextReset(data->context);
167+
168+
OutputPluginWrite(ctx, true);
169+
}
170+
171+
static void pg_decode_truncate(
172+
LogicalDecodingContext *ctx,
173+
ReorderBufferTXN *txn,
174+
int nrelations,
175+
Relation relations[],
176+
ReorderBufferChange *change
177+
) {
178+
OutputPluginPrepareWrite(ctx, true);
179+
appendStringInfoString(ctx->out, "{\"kind\":\"truncate\"}");
180+
OutputPluginWrite(ctx, true);
181+
}
182+
183+
static void pg_decode_message(
184+
LogicalDecodingContext *ctx,
185+
ReorderBufferTXN *txn,
186+
XLogRecPtr lsn,
187+
bool transactional,
188+
const char *prefix,
189+
Size sz,
190+
const char *message
191+
) {
192+
JsonDecodingData *data;
193+
MemoryContext old;
194+
char *message_b64;
195+
196+
data = ctx->output_plugin_private;
197+
old = MemoryContextSwitchTo(data->context);
198+
OutputPluginPrepareWrite(ctx, true);
199+
if (transactional) {
200+
appendStringInfoString(ctx->out, "{\"kind\":\"xmessage\"");
201+
} else {
202+
appendStringInfoString(ctx->out, "{\"kind\":\"message\"");
203+
}
204+
appendStringInfoString(ctx->out, ",\"prefix\":");
205+
escape_json(ctx->out, prefix);
206+
message_b64 = palloc0(pg_b64_enc_len(sz) + 1);
207+
pg_b64_encode(message, sz, message_b64);
208+
appendStringInfo(ctx->out, ",\"content\":\"%s\"}", message_b64);
209+
pfree(message_b64);
210+
MemoryContextSwitchTo(old);
211+
MemoryContextReset(data->context);
212+
OutputPluginWrite(ctx, true);
213+
}
214+
215+
void _PG_output_plugin_init(OutputPluginCallbacks *cb) {
216+
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
217+
cb->startup_cb = pg_decode_startup;
218+
cb->begin_cb = pg_decode_begin_txn;
219+
cb->change_cb = pg_decode_change;
220+
cb->commit_cb = pg_decode_commit_txn;
221+
cb->shutdown_cb = pg_decode_shutdown;
222+
cb->message_cb = pg_decode_message;
223+
cb->truncate_cb = pg_decode_truncate;
224+
}
225+
226+
void _PG_init(void) {
227+
}

test.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/bin/sh
2+
set -e
3+
docker build --tag pg_json_decoding_test .
4+
docker run --rm -it -v "$PWD":/src:ro pg_json_decoding_test

test.sql

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
create table foo(a text, b text);
2+
alter table foo replica identity full;
3+
create publication test for table foo;
4+
-- create publication test for all tables;
5+
6+
select pg_create_logical_replication_slot('test', 'pg_json_decoding');
7+
insert into foo values ('1', 'a');
8+
update foo set a = '2';
9+
delete from foo;
10+
select pg_logical_emit_message(true, 'message', 'hello');
11+
12+
select *
13+
from pg_logical_slot_get_changes('test', null, null, 'publication', 'test');

0 commit comments

Comments
 (0)