Skip to content

Commit c360b7e

Browse files
committed
added support for recovery token
1 parent bddfcf8 commit c360b7e

File tree

1 file changed

+31
-9
lines changed

1 file changed

+31
-9
lines changed

lib/mongo/session.ex

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ defmodule Mongo.Session do
116116
# * `implicit` true or false
117117
# * `causal_consistency` true orfalse
118118
# * `wire_version` current wire version to check if transactions are possible
119-
defstruct [conn: nil, server_session: nil, causal_consistency: false, operation_time: nil, implicit: false, wire_version: 0, state: :no_transaction, opts: []]
119+
# * `recovery_token` tracked recovery token from response in a sharded transaction
120+
defstruct [conn: nil, recovery_token: nil, server_session: nil, causal_consistency: false, operation_time: nil, implicit: false, wire_version: 0, state: :no_transaction, opts: []]
120121

121122
@doc """
122123
Start the generic state machine.
@@ -204,25 +205,35 @@ defmodule Mongo.Session do
204205
"""
205206
@spec update_session(Session.t, %{key: BSON.Timestamp.t}, keyword()) :: BSON.document
206207
def update_session(pid, doc, opts \\ [])
207-
def update_session(pid, %{"operationTime" => operationTime} = doc, opts) do
208+
def update_session(pid, doc, opts) do
208209
case opts |> write_concern() |> acknowledged?() do
209-
true -> advance_operation_time(pid, operationTime)
210+
true -> advance_operation_time(pid, doc["operationTime"])
210211
false -> []
211212
end
212-
doc
213-
end
214-
def update_session(_pid, doc, _opts) do
213+
update_recovery_token(pid, doc["recoveryToken"])
215214
doc
216215
end
217216

218217
@doc """
219218
Advance the `operationTime` for causally consistent read commands
220219
"""
221220
@spec advance_operation_time(Session.t, BSON.Timestamp.t) :: none()
221+
def advance_operation_time(_pid, nil) do
222+
end
222223
def advance_operation_time(pid, timestamp) do
223224
cast(pid, {:advance_operation_time, timestamp})
224225
end
225226

227+
@doc """
228+
Update the `recoveryToken` after each response from mongos
229+
"""
230+
@spec update_recovery_token(Session.t, BSON.document) :: none()
231+
def update_recovery_token(_pid, nil) do
232+
end
233+
def update_recovery_token(pid, recovery_token) do
234+
cast(pid, {:update_recovery_token, recovery_token})
235+
end
236+
226237
@doc """
227238
End implicit session. There is no need to call this function directly. It is called automatically.
228239
"""
@@ -337,6 +348,7 @@ defmodule Mongo.Session do
337348
server_session: server_session,
338349
implicit: (type == :implicit),
339350
wire_version: wire_version,
351+
recovery_token: nil,
340352
causal_consistency: Keyword.get(opts, :causal_consistency, false),
341353
state: :no_transaction,
342354
opts: opts}
@@ -378,7 +390,7 @@ defmodule Mongo.Session do
378390
end
379391

380392
def handle_call_event(:start_transaction, transaction, %Session{server_session: session} = data) when transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
381-
{:next_state, :starting_transaction, %Session{data | server_session: ServerSession.next_txn_num(session)}, :ok}
393+
{:next_state, :starting_transaction, %Session{data | recovery_token: nil, server_session: ServerSession.next_txn_num(session)}, :ok}
382394
end
383395
##
384396
# bind session: only if wire_version >= 6, MongoDB 3.6.x and no transaction is running: only lsid is added
@@ -445,6 +457,9 @@ defmodule Mongo.Session do
445457
def handle_call_event(:server_session, _state, %Session{server_session: session_server, implicit: implicit}) do
446458
{:keep_state_and_data, session_server, implicit}
447459
end
460+
def handle_cast_event({:update_recovery_token, recovery_token}, _state, %Session{} = data) do
461+
%Session{data | recovery_token: recovery_token}
462+
end
448463
def handle_cast_event({:advance_operation_time, timestamp}, _state, %Session{operation_time: nil} = data) do
449464
%Session{data | operation_time: timestamp}
450465
end
@@ -457,7 +472,7 @@ defmodule Mongo.Session do
457472
##
458473
# Run the commit transaction command.
459474
#
460-
defp run_commit_command(%Session{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do
475+
defp run_commit_command(%Session{conn: conn, recovery_token: recovery_token, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do
461476

462477
Logger.debug("Running commit transaction")
463478

@@ -471,14 +486,21 @@ defmodule Mongo.Session do
471486
txnNumber: %BSON.LongNumber{value: txn_num},
472487
autocommit: false,
473488
writeConcern: write_concern(opts),
474-
maxTimeMS: Keyword.get(opts, :max_commit_time_ms)
489+
maxTimeMS: max_time_ms(opts),
490+
recoveryToken: recovery_token
475491
] |> filter_nils()
476492

477493
_doc = Mongo.exec_command(conn, cmd, database: "admin")
478494

479495
:ok
480496
end
481497

498+
defp max_time_ms(opts) do
499+
opts |> Keyword.get(:max_commit_time_ms) |> optional_int64()
500+
end
501+
defp optional_int64(nil), do: nil
502+
defp optional_int64(value), do: %BSON.LongNumber{value: value}
503+
482504
##
483505
# Run the abort transaction command.
484506
#

0 commit comments

Comments
 (0)