From cf372897dc73cbb8fddf5a8f4bdddc93198b1bd8 Mon Sep 17 00:00:00 2001 From: Christophe Meyer Date: Tue, 28 Dec 2021 17:45:46 +0100 Subject: [PATCH 1/7] Add CopyOut! API --- src/copy.jl | 84 ++++++++++++++++++++++++++++++++++++++++++++++++ test/runtests.jl | 54 +++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) diff --git a/src/copy.jl b/src/copy.jl index 60e8ae56..8e3b5a05 100644 --- a/src/copy.jl +++ b/src/copy.jl @@ -81,3 +81,87 @@ function execute( Result(copy_end_result, jl_conn, kwargs...); throw_error=throw_error ) end + +""" + CopyOut!(data, query) -> CopyOut! + +Create a `CopyOut!` query instance which can be executed to receive data from PostgreSQL via a +`COPY TO STDIN` query. + +`query` must be a `COPY TO STDIN` query as described in the [PostgreSQL documentation](https://www.postgresql.org/docs/10/sql-copy.html). +`COPY TO` queries which use a file or `PROGRAM` source can instead use the standard +[`execute`](@ref) query interface. + +`data` is an IOBuffer where strings of data received from PostgreSQL are written to. +The data is received as text in CSV format. +""" +struct CopyOut! + data::IOBuffer + query::String +end + +""" + execute(jl_conn::Connection, copyout::CopyOut!, args...; + throw_error::Bool=true, kwargs... + ) -> Result + +Runs [`execute`](@ref execute(::Connection, ::String)) on `copyout`'s query, then fills +`copyout`'s data from the server. + +All other arguments are passed through to the `execute` call for the initial query. +""" +function execute( + jl_conn::Connection, + copy::CopyOut!, + parameters=nothing; + throw_error=true, + kwargs..., +) + level = throw_error ? error : warn + if parameters !== nothing + string_params = string_parameters(parameters) + pointer_params = parameter_pointers(string_params) + end + + copy_end_result = lock(jl_conn) do + if parameters === nothing + result = _execute(jl_conn.conn, copy.query) + else + result = _execute(jl_conn.conn, copy.query, pointer_params) + end + result_status = libpq_c.PQresultStatus(result) + + if result_status != libpq_c.PGRES_COPY_OUT + if !(result_status in (libpq_c.PGRES_BAD_RESPONSE, libpq_c.PGRES_FATAL_ERROR)) + level(LOGGER, Errors.JLResultError( + "Expected PGRES_COPY_OUT after COPY query, got $result_status" + )) + end + return result + end + + io = copy.data # store csv string + async::Cint = 0 # blocking call + rowRef = Ref{Cstring}() + status_code = Cint(0) + while (status_code = libpq_c.PQgetCopyData(jl_conn.conn, rowRef, async)) > 0 + rowPtr = rowRef[] + write(io, unsafe_string(rowPtr)) + if rowPtr != C_NULL + libpq_c.PQfreemem(convert(Ptr{Cvoid}, rowPtr)) + end + end + seekstart(io) # rewind iobuffer so future user read will begin from start + if -2 == status_code + level(LOGGER, Errors.JLResultError( + "PQgetCopyData error: $(error_message(jl_conn))" + )) + end + + libpq_c.PQgetResult(jl_conn.conn) + end + + return handle_result( + Result(copy_end_result, jl_conn, kwargs...); throw_error=throw_error + ) +end diff --git a/test/runtests.jl b/test/runtests.jl index 07cc8478..4b504725 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,5 +1,6 @@ using LibPQ using Test +using CSV using Dates using DataFrames using DataFrames: eachrow @@ -444,6 +445,59 @@ end end end + @testset "COPY TO" begin + @testset "Example COPY TO" begin + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + + # make data + no_nulls = map(string, 'a':'z') + yes_nulls = Union{String, Missing}[isodd(Int(c)) ? string(c) : missing for c in 'a':'z'] + data = DataFrame(no_nulls=no_nulls, yes_nulls=yes_nulls) + + row_strings = imap(eachrow(data)) do row + if ismissing(row[:yes_nulls]) + "$(row[:no_nulls]),\n" + else + "$(row[:no_nulls]),$(row[:yes_nulls])\n" + end + end + + # setup db table + result = execute(conn, """ + CREATE TEMPORARY TABLE libpqjl_test ( + no_nulls varchar(10) PRIMARY KEY, + yes_nulls varchar(10) + ); + """) + @test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK + close(result) + + # populate db table + copyin = LibPQ.CopyIn("COPY libpqjl_test FROM STDIN (FORMAT CSV);", row_strings) + + result = execute(conn, copyin) + @test isopen(result) + @test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK + @test isempty(LibPQ.error_message(result)) + close(result) + + # test CopyOut! + databuf = IOBuffer() + copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test) TO STDOUT (FORMAT CSV, HEADER, ENCODING 'UTF8');") + + result = execute(conn, copyout) + @test isopen(result) + @test status(result) == LibPQ.libpq_c.PGRES_COMMAND_OK + @test isempty(LibPQ.error_message(result)) + close(result) + + df = DataFrame(CSV.File(databuf, stringtype=String)) + @test isequal(data, df) + + close(conn) + end + end + @testset "LibPQ.Connection" begin @testset "do" begin local saved_conn From 137a76946b11e610210165194d008f013b095c65 Mon Sep 17 00:00:00 2001 From: Christophe Meyer Date: Wed, 29 Dec 2021 09:59:02 +0100 Subject: [PATCH 2/7] Add CSV to Test deps --- Project.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 8223d27a..c52fb0b9 100644 --- a/Project.toml +++ b/Project.toml @@ -37,8 +37,9 @@ TimeZones = "0.9.2, 0.10, 0.11, 1" julia = "1.6" [extras] +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test", "DataFrames"] +test = ["Test", "CSV", "DataFrames"] From b335cd506915995ed14f2c4f0046464fa3108ff9 Mon Sep 17 00:00:00 2001 From: Christophe Meyer Date: Wed, 29 Dec 2021 15:23:39 +0100 Subject: [PATCH 3/7] Add docs for CopyOut! --- docs/src/index.md | 24 ++++++++++++++++++++++-- docs/src/pages/api.md | 2 ++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/docs/src/index.md b/docs/src/index.md index 3d5c0121..b12803d8 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -75,9 +75,9 @@ LibPQ.load!( execute(conn, "COMMIT;") ``` -### `COPY` +### `COPY FROM` -An alternative to repeated `INSERT` queries is the PostgreSQL `COPY` query. +An alternative to repeated `INSERT` queries is the PostgreSQL `COPY FROM` query. `LibPQ.CopyIn` makes it easier to stream data to the server using a `COPY FROM STDIN` query. ```julia @@ -99,3 +99,23 @@ execute(conn, copyin) close(conn) ``` + +### `COPY TO` + +An alternative to selection for large datasets in `SELECT` queries is the PostgreSQL `COPY TO` query. +`LibPQ.CopyOut!` makes it easier to stream data out of the server using a `COPY TO STDIN` query. + +```julia +using LibPQ, CSV, DataFrames + +conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + +databuf = IOBuffer() +copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test) TO STDOUT (FORMAT CSV, HEADER);") + +execute(conn, copyout) + +df = DataFrame(CSV.File(databuf)) + +close(conn) +``` \ No newline at end of file diff --git a/docs/src/pages/api.md b/docs/src/pages/api.md index 7296e593..198b1021 100644 --- a/docs/src/pages/api.md +++ b/docs/src/pages/api.md @@ -43,6 +43,8 @@ LibPQ.load! ```@docs LibPQ.CopyIn execute(::LibPQ.Connection, ::LibPQ.CopyIn) +LibPQ.CopyOut! +execute(::LibPQ.Connection, ::LibPQ.CopyOut!) ``` ### Asynchronous From fa51788019238ba2de63cb9f58d9f87ee709f1c4 Mon Sep 17 00:00:00 2001 From: Christophe Meyer Date: Wed, 29 Dec 2021 15:43:41 +0100 Subject: [PATCH 4/7] Support Julia 1.0: CSV 0.8 API differs --- test/runtests.jl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/test/runtests.jl b/test/runtests.jl index 4b504725..9529cbe8 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -491,7 +491,12 @@ end @test isempty(LibPQ.error_message(result)) close(result) - df = DataFrame(CSV.File(databuf, stringtype=String)) + @static if VERSION >= v"1.3.0-1" + csvfile = CSV.File(databuf, stringtype=String) + else + csvfile = CSV.File(databuf) + end + df = DataFrame(csvfile) @test isequal(data, df) close(conn) From 671be667063303d81e6c84f4a6664b438a597621 Mon Sep 17 00:00:00 2001 From: Christophe Meyer Date: Wed, 29 Dec 2021 19:39:49 +0100 Subject: [PATCH 5/7] Add COPY TO error test case --- test/runtests.jl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/test/runtests.jl b/test/runtests.jl index 9529cbe8..4a2966eb 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -501,6 +501,24 @@ end close(conn) end + + @testset "Wrong COPY TO" begin + conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") + + # test CopyOut! + databuf = IOBuffer() + copyout = LibPQ.CopyOut!(databuf, "SELECT libpqjl_test;") + + result = execute(conn, copyout; throw_error=false) + @test isopen(result) + @test status(result) == LibPQ.libpq_c.PGRES_FATAL_ERROR + + err_msg = LibPQ.error_message(result) + @test occursin("ERROR", err_msg) + close(result) + + close(conn) + end end @testset "LibPQ.Connection" begin From 79e9e971fc1b1a826b35bb0ebc5e284d841fcb21 Mon Sep 17 00:00:00 2001 From: Christophe Meyer Date: Wed, 29 Dec 2021 20:22:30 +0100 Subject: [PATCH 6/7] COPY with parameter is not supported --- src/copy.jl | 14 +++++--------- test/runtests.jl | 6 +++++- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/copy.jl b/src/copy.jl index 8e3b5a05..7484e827 100644 --- a/src/copy.jl +++ b/src/copy.jl @@ -119,16 +119,12 @@ function execute( ) level = throw_error ? error : warn if parameters !== nothing - string_params = string_parameters(parameters) - pointer_params = parameter_pointers(string_params) + # https://postgrespro.com/list/thread-id/1893680 + throw(ArgumentError("COPY can't take any parameter")) end copy_end_result = lock(jl_conn) do - if parameters === nothing - result = _execute(jl_conn.conn, copy.query) - else - result = _execute(jl_conn.conn, copy.query, pointer_params) - end + result = _execute(jl_conn.conn, copy.query) result_status = libpq_c.PQresultStatus(result) if result_status != libpq_c.PGRES_COPY_OUT @@ -154,8 +150,8 @@ function execute( seekstart(io) # rewind iobuffer so future user read will begin from start if -2 == status_code level(LOGGER, Errors.JLResultError( - "PQgetCopyData error: $(error_message(jl_conn))" - )) + "PQgetCopyData error: $(error_message(jl_conn))" + )) end libpq_c.PQgetResult(jl_conn.conn) diff --git a/test/runtests.jl b/test/runtests.jl index 4a2966eb..69331212 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -505,7 +505,7 @@ end @testset "Wrong COPY TO" begin conn = LibPQ.Connection("dbname=postgres user=$DATABASE_USER") - # test CopyOut! + # test CopyOut! with an error databuf = IOBuffer() copyout = LibPQ.CopyOut!(databuf, "SELECT libpqjl_test;") @@ -517,6 +517,10 @@ end @test occursin("ERROR", err_msg) close(result) + # parameters are not supported + copyout = LibPQ.CopyOut!(databuf, "COPY (SELECT * FROM libpqjl_test WHERE no_nulls = \$1) TO STDOUT (FORMAT CSV, HEADER);") + @test_throws ArgumentError execute(conn, copyout, ['z']) + close(conn) end end From 7400ffcae4e21f5aca84b4b1346f186269612fd7 Mon Sep 17 00:00:00 2001 From: Christophe Meyer Date: Wed, 29 Dec 2021 20:33:35 +0100 Subject: [PATCH 7/7] Simplify error condition --- src/copy.jl | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/copy.jl b/src/copy.jl index 7484e827..166eb50d 100644 --- a/src/copy.jl +++ b/src/copy.jl @@ -128,11 +128,9 @@ function execute( result_status = libpq_c.PQresultStatus(result) if result_status != libpq_c.PGRES_COPY_OUT - if !(result_status in (libpq_c.PGRES_BAD_RESPONSE, libpq_c.PGRES_FATAL_ERROR)) - level(LOGGER, Errors.JLResultError( - "Expected PGRES_COPY_OUT after COPY query, got $result_status" - )) - end + level(LOGGER, Errors.JLResultError( + "Expected PGRES_COPY_OUT after COPY query, got $result_status" + )) return result end @@ -148,11 +146,7 @@ function execute( end end seekstart(io) # rewind iobuffer so future user read will begin from start - if -2 == status_code - level(LOGGER, Errors.JLResultError( - "PQgetCopyData error: $(error_message(jl_conn))" - )) - end + -2 == status_code && level(LOGGER, Errors.JLResultError("PQgetCopyData error: $(error_message(jl_conn))")) libpq_c.PQgetResult(jl_conn.conn) end