Skip to content

Commit 28a72d1

Browse files
committed
write copy-test in JS too - meant implementing msg-copy-fail because of porsager/postgres#1016
1 parent 11b488b commit 28a72d1

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

core/src/main/clojure/xtdb/pgwire.clj

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,6 +1402,22 @@
14021402

14031403
(cmd-send-ready conn)))
14041404

1405+
(defmethod handle-msg* :msg-copy-fail [{:keys [conn-state] :as conn} _msg]
1406+
(when-let [{:keys [copy-file write-ch]} (:copy @conn-state)]
1407+
(try
1408+
(util/close write-ch)
1409+
(catch Exception e
1410+
(log/debug e "Error closing COPY write channel during failure")))
1411+
1412+
(try
1413+
(util/delete-file copy-file)
1414+
(catch Exception e
1415+
(log/debug e "Error deleting COPY file during failure")))
1416+
1417+
(swap! conn-state dissoc :copy))
1418+
1419+
(cmd-send-ready conn))
1420+
14051421
;; ignore password messages, we are authenticated when getting here
14061422
(defmethod handle-msg* :msg-password [_conn _msg])
14071423

lang/js/test/copy_test.js

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import assert from 'assert';
2+
import postgres from 'postgres';
3+
import tjs from 'transit-js';
4+
import * as uuid from 'uuid';
5+
import { pipeline } from 'node:stream/promises';
6+
import { Readable } from 'node:stream';
7+
8+
const transitReadHandlers = {
9+
'time/zoned-date-time': (s) => new Date(s.replace(/\[.+\]$/, '')),
10+
}
11+
12+
let sql;
13+
14+
beforeEach(async () => {
15+
sql = postgres({
16+
host: process.env.PG_HOST || "localhost",
17+
port: process.env.PG_PORT || 5439,
18+
database: uuid.v4().toString(),
19+
fetch_types: false,
20+
types: {
21+
bool: {to: 16},
22+
int: {
23+
to: 20,
24+
from: [23, 20],
25+
parse: parseInt
26+
},
27+
transit: {
28+
to: 16384,
29+
from: [16384],
30+
serialize: (v) => tjs.writer('json').write(v),
31+
parse: (v) => tjs.reader('json', { handlers: transitReadHandlers }).read(v)
32+
}
33+
}
34+
})
35+
36+
await sql`SELECT 1`
37+
})
38+
39+
afterEach(async () => {
40+
await sql.end()
41+
})
42+
43+
describe("COPY with transit-json format", function() {
44+
it("should copy records with only _id using transit-json format #4677", async () => {
45+
const conn = await sql.reserve()
46+
47+
try {
48+
const testData = [
49+
{
50+
"_id": "item-3",
51+
"title": "Test Item 3",
52+
"price": 199.99
53+
},
54+
{
55+
"_id": "item-4",
56+
"title": "Test Item 4",
57+
"price": 249.99
58+
}
59+
]
60+
61+
const writer = tjs.writer('json');
62+
const lines = testData.map(record => writer.write(record));
63+
const transitData = lines.join('\n');
64+
65+
const transitStream = Readable.from([transitData])
66+
const query = await conn`COPY test_items_only_id FROM STDIN WITH (FORMAT 'transit-json')`.writable()
67+
await pipeline(transitStream, query)
68+
69+
// See https://github.com/porsager/postgres/pull/1016
70+
await new Promise(resolve => setTimeout(resolve, 50))
71+
72+
const result = await conn`SELECT * FROM test_items_only_id ORDER BY _id`;
73+
74+
assert.deepStrictEqual([...result], [
75+
{"_id": "item-3", "title": "Test Item 3", "price": 199.99},
76+
{"_id": "item-4", "title": "Test Item 4", "price": 249.99}
77+
]);
78+
79+
} finally {
80+
await conn.release()
81+
}
82+
})
83+
})

0 commit comments

Comments
 (0)