Skip to content

Commit 0721ea5

Browse files
committed
feat(nonblocking) implement non blocking mode for connections
1 parent 9b92685 commit 0721ea5

File tree

2 files changed

+80
-10
lines changed

2 files changed

+80
-10
lines changed

src/asyncresults.jl

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,8 @@ function _consume(jl_conn::Connection)
8484
# this is important?
8585
# https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c#L1266
8686
# if we used non-blocking connections we would need to check for `1` as well
87-
if libpq_c.PQflush(jl_conn.conn) < 0
88-
error(LOGGER, Errors.PQConnectionError(jl_conn))
89-
end
87+
# See flush(jl_conn::Connection) in connections.jl
88+
flush(jl_conn)
9089

9190
async_result = jl_conn.async_result
9291
result_ptrs = Ptr{libpq_c.PGresult}[]
@@ -231,7 +230,7 @@ end
231230

232231
function _multi_async_execute(jl_conn::Connection, query::AbstractString; kwargs...)
233232
async_result = _async_execute(jl_conn; kwargs...) do jl_conn
234-
_async_submit(jl_conn.conn, query)
233+
_async_submit(jl_conn, query)
235234
end
236235

237236
return async_result
@@ -254,7 +253,7 @@ function async_execute(
254253

255254
async_result = _async_execute(jl_conn; binary_format=binary_format, kwargs...) do jl_conn
256255
GC.@preserve string_params _async_submit(
257-
jl_conn.conn, query, pointer_params; binary_format=binary_format
256+
jl_conn, query, pointer_params; binary_format=binary_format
258257
)
259258
end
260259

@@ -289,16 +288,22 @@ function _async_execute(
289288
return async_result
290289
end
291290

292-
function _async_submit(conn_ptr::Ptr{libpq_c.PGconn}, query::AbstractString)
293-
return libpq_c.PQsendQuery(conn_ptr, query) == 1
291+
function _async_submit(jl_conn::Connection, query::AbstractString)
292+
send_status = libpq_c.PQsendQuery(jl_conn.conn::Ptr{libpq_c.PGconn}, query)
293+
if isnonblocking(jl_conn) == 0
294+
return send_status == 1
295+
else
296+
return flush(jl_conn)
297+
end
294298
end
295299

296300
function _async_submit(
297-
conn_ptr::Ptr{libpq_c.PGconn},
301+
jl_conn::Connection,
298302
query::AbstractString,
299303
parameters::Vector{Ptr{UInt8}};
300304
binary_format::Bool=false,
301305
)
306+
conn_ptr::Ptr{libpq_c.PGconn} = jl_conn.conn
302307
num_params = length(parameters)
303308

304309
send_status = libpq_c.PQsendQueryParams(
@@ -312,5 +317,9 @@ function _async_submit(
312317
Cint(binary_format), # return result in text or binary format
313318
)
314319

315-
return send_status == 1
320+
if isnonblocking(jl_conn) == 0
321+
return send_status == 1
322+
else
323+
return flush(jl_conn)
324+
end
316325
end

src/connections.jl

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -672,7 +672,7 @@ end
672672
"""
673673
ConnectionOption(pq_opt::libpq_c.PQconninfoOption) -> ConnectionOption
674674
675-
Construct a `ConnectionOption` from a `libpg_c.PQconninfoOption`.
675+
Construct a `ConnectionOption` from a `libpq_c.PQconninfoOption`.
676676
"""
677677
function ConnectionOption(pq_opt::libpq_c.PQconninfoOption)
678678
return ConnectionOption(
@@ -789,3 +789,64 @@ function socket(conn::Ptr{libpq_c.PGconn})
789789
end
790790

791791
socket(jl_conn::Connection) = socket(jl_conn.conn)
792+
793+
"""
794+
Sets the nonblocking connection status of the PG connections.
795+
While async_execute is non-blocking on the receiving side,
796+
the sending side is still nonblockign without this
797+
Returns true on success, false on failure
798+
799+
https://www.postgresql.org/docs/current/libpq-async.html
800+
"""
801+
function setnonblocking(jl_conn::Connection; nonblock=true)
802+
return libpq_c.PQsetnonblocking(jl_conn.conn, convert(Cint, nonblock)) == 0
803+
end
804+
805+
"""
806+
Checks whether the connection is non-blocking.
807+
Returns true if the connection is set to non-blocking, false otherwise
808+
809+
https://www.postgresql.org/docs/current/libpq-async.html
810+
"""
811+
function isnonblocking(jl_conn)
812+
return libpq_c.PQisnonblocking(jl_conn.conn) == 1
813+
end
814+
815+
"""
816+
Do the flush dance described in the libpq docs. Required when the
817+
connections are set to nonblocking and we want do send queries/data
818+
without blocking.
819+
820+
https://www.postgresql.org/docs/current/libpq-async.html#LIBPQ-PQFlush
821+
"""
822+
function flush(jl_conn)
823+
watcher = FDWatcher(socket(jl_conn), true, true) # can wait for reads and writes
824+
try
825+
while true # Iterators.repeated(true) # would make me more comfotable I think
826+
flushstatus = libpq_c.PQflush(jl_conn.conn)
827+
# 0 indicates success
828+
flushstatus == 0 && return true
829+
# -1 indicates error
830+
flushstatus < 0 && error(LOGGER, Errors.PQConnectionError(jl_conn))
831+
# Could not send all data without blocking, need to wait FD
832+
flushstatus == 1 && begin
833+
wait(watcher) # Wait for the watcher
834+
# If it becomes write-ready, call PQflush again.
835+
if watcher.mask.writable
836+
continue # Call PGflush again, to send more data
837+
end
838+
if watcher.mask.readable
839+
# if the stream is readable, we have to consume data from the server first.
840+
success = libpq_c.PQconsumeInput(jl_conn.conn) == 1
841+
!success && error(LOGGER, Errors.PQConnectionError(jl_conn))
842+
end
843+
end
844+
end
845+
catch
846+
# We don't want to manage anything here
847+
rethrow()
848+
finally
849+
# Just close the watcher
850+
close(watcher)
851+
end
852+
end

0 commit comments

Comments
 (0)