Skip to content

Commit c4b574f

Browse files
authored
Merge pull request #74 from hundio/master
Add proper support for tailable cursors and awaitData
2 parents 5bef73b + 4ce1f46 commit c4b574f

File tree

3 files changed

+31
-8
lines changed

3 files changed

+31
-8
lines changed

lib/mongo.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,6 @@ defmodule Mongo do
657657
showRecordId: opts[:show_record_id],
658658
tailable: opts[:tailable],
659659
oplogReplay: opts[:oplog_replay],
660-
tailable: opts[:tailable],
661660
noCursorTimeout: opts[:no_cursor_timeout],
662661
awaitData: opts[:await_data],
663662
batchSize: opts[:batch_size],

lib/mongo/stream.ex

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ defmodule Mongo.Stream do
55

66
import Record, only: [defrecordp: 2]
77

8-
defstruct [:topology_pid, :session, :cursor, :coll, :docs, :opts]
8+
defstruct [:topology_pid, :session, :cursor, :coll, :docs, :cmd, :opts]
99

1010
alias Mongo.Session
1111

@@ -22,7 +22,7 @@ defmodule Mongo.Stream do
2222
"ns" => coll,
2323
"firstBatch" => docs}}} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
2424

25-
%Mongo.Stream{topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, docs: docs, opts: opts}
25+
%Mongo.Stream{topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, docs: docs, cmd: cmd, opts: opts}
2626
else
2727
{:error, error} ->
2828
case Error.should_retry_read(error, cmd, opts) do
@@ -35,11 +35,11 @@ defmodule Mongo.Stream do
3535

3636
defimpl Enumerable do
3737

38-
defrecordp :state, [:topology_pid, :session, :cursor, :coll, :docs]
38+
defrecordp :state, [:topology_pid, :session, :cursor, :coll, :cmd, :docs]
3939

40-
def reduce(%Mongo.Stream{topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, docs: docs, opts: opts}, acc, reduce_fun) do
40+
def reduce(%Mongo.Stream{topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, docs: docs, cmd: cmd, opts: opts}, acc, reduce_fun) do
4141

42-
start_fun = fn -> state(topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, docs: docs) end
42+
start_fun = fn -> state(topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, cmd: cmd, docs: docs) end
4343
next_fun = next_fun(opts)
4444
after_fun = after_fun(opts)
4545

@@ -51,9 +51,9 @@ defmodule Mongo.Stream do
5151
state(docs: [], cursor: 0) = state -> {:halt, state}
5252

5353
# this is a regular cursor
54-
state(docs: [], topology_pid: topology_pid, session: session, cursor: cursor, coll: coll) = state ->
54+
state(docs: [], topology_pid: topology_pid, session: session, cursor: cursor, coll: coll, cmd: cmd) = state ->
5555
case get_more(topology_pid, session, only_coll(coll), cursor, nil, opts) do
56-
{:ok, %{cursor_id: cursor_id, docs: []}} -> {:halt, state(state, cursor: cursor_id)}
56+
{:ok, %{cursor_id: cursor_id, docs: []}} -> {if(cmd[:tailable], do: [], else: :halt), state(state, cursor: cursor_id)}
5757
{:ok, %{cursor_id: cursor_id, docs: docs}} -> {docs, state(state, cursor: cursor_id)}
5858
{:error, error} -> raise error
5959
end

test/mongo/cursor_test.exs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,30 @@
11
defmodule Mongo.CursorTest do
22
use CollectionCase, async: false
33

4+
test "tailable cursors with awaitData", c do
5+
6+
coll = "tailable_cursors"
7+
init_docs = Stream.cycle([%{"foo" => 42}]) |> Enum.take(5)
8+
tail_docs = Stream.cycle([%{"foo" => 10}]) |> Enum.take(10)
9+
10+
assert :ok = Mongo.create(c.pid, coll, capped: true, size: 1_000_000)
11+
assert {:ok, _} = Mongo.insert_many(c.pid, coll, init_docs)
12+
13+
tailing_task = Task.async fn ->
14+
Mongo.find(c.pid, coll, %{}, tailable: true, await_data: true)
15+
|> Enum.take(15)
16+
end
17+
18+
Enum.each tail_docs, fn doc ->
19+
Process.sleep 100
20+
Mongo.insert_one(c.pid, coll, doc)
21+
end
22+
23+
expected_docs = init_docs ++ tail_docs
24+
assert ^expected_docs = Task.await(tailing_task) |> Enum.map(fn m -> Map.pop(m, "_id") |> elem(1) end)
25+
26+
end
27+
428
test "checking if killCursor is called properly", c do
529

630
coll = "kill_cursors"

0 commit comments

Comments
 (0)