Skip to content

Add telemetry events #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## v0.2.2 (TBA)

### Changes

* Added `idempotency_plug.track` telemetry span
* Added `idempotency_plug.request_tracker.cache_hit`, `idempotency_plug.request_tracker.cache_miss`, and `idempotency_plug.request_tracker.prune` telemetry events

## v0.2.1 (2023-04-28)

Relaxed dependency requirements for `ecto` and `ecto_sql`.
Expand Down
65 changes: 48 additions & 17 deletions lib/idempotency_plug.ex
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,24 @@ defmodule IdempotencyPlug do
- `{mod, fun, args}` - calls the MFA to process the conn with error, the
connection MUST be halted.

## Telemetry events

The following events are emitted by the Plug:

* `[:idempotency_plug, :track, :start]` - dispatched before request tracking
* Measurement: `%{system_time: System.system_time}`
* Metadata: `%{telemetry_span_context: term(), conn: Plug.Conn.t, tracker: module, idempotency_key: binary}`
tracker: tracker,
idempotency_key: key

* `[:idempotency_plug, :track, :exception]` - dispatched after exceptions on tracking a request
* Measurement: `%{duration: native_time}`
* Metadata: `%{telemetry_span_context: term(), conn: Plug.Conn.t, tracker: module, idempotency_key: binary, kind: :throw | :error | :exit, reason: term(), stacktrace: list()}`

* `[:idempotency_plug, :track, :stop]` - dispatched after successfully tracking a request
* Measurement: `%{duration: native_time}`
* Metadata: `%{telemetry_span_context: term(), conn: Plug.Conn.t, tracker: module, idempotency_key: binary}`

## Examples

plug IdempotencyPlug,
Expand Down Expand Up @@ -218,28 +236,41 @@ defmodule IdempotencyPlug do
idempotency_key_hash = hash_idempotency_key(conn, key, opts)
request_payload_hash = hash_request_payload(conn, opts)

case RequestTracker.track(tracker, idempotency_key_hash, request_payload_hash) do
{:processing, _node_caller, _expires} ->
raise ConcurrentRequestError
metadata = %{
conn: conn,
tracker: tracker,
idempotency_key: key
}

{:mismatch, {:fingerprint, fingerprint}, _expires} ->
raise RequestPayloadFingerprintMismatchError, fingerprint: fingerprint
:telemetry.span([:idempotency_plug, :track], metadata, fn ->
case RequestTracker.track(tracker, idempotency_key_hash, request_payload_hash) do
{:processing, _node_caller, _expires} ->
raise ConcurrentRequestError

{:cache, {:halted, reason}, _expires} ->
raise HaltedResponseError, reason: reason
{:mismatch, {:fingerprint, fingerprint}, _expires} ->
raise RequestPayloadFingerprintMismatchError, fingerprint: fingerprint

{:cache, {:ok, response}, expires} ->
conn
|> put_expires_header(expires)
|> set_resp(response)
|> Conn.halt()
{:cache, {:halted, reason}, _expires} ->
raise HaltedResponseError, reason: reason

{:init, idempotency_key, _expires} ->
update_response_before_send(conn, idempotency_key, opts)
{:cache, {:ok, response}, expires} ->
conn =
conn
|> put_expires_header(expires)
|> set_resp(response)
|> Conn.halt()

{:error, error} ->
raise "failed to track request, got: #{error}"
end
{conn, %{metadata | conn: conn}}

{:init, idempotency_key, _expires} ->
conn = update_response_before_send(conn, idempotency_key, opts)

{conn, %{metadata | conn: conn}}

{:error, error} ->
raise "failed to track request, got: #{error}"
end
end)
end

@doc """
Expand Down
45 changes: 45 additions & 0 deletions lib/idempotency_plug/request_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,22 @@ defmodule IdempotencyPlug.RequestTracker do
* `:store` - the cache store module to use to store the cache objects.
Defaults to `{IdempotencyPlug.ETSStore, [table: #{__MODULE__}]}`.

## Telemetry events

The following events are emitted by the tracker:

* `[:idempotency_plug, :request_tracker, :cache_miss]` - dispatched after request has not found in the cache
* Measurement: `%{}`
* Metadata: `%{telemetry_span_context: term(), request_id: binary, fingerprint: binary, store: atom, expires_at: DateTime.t}`

* `[:idempotency_plug, :request_tracker, :cache_hit]` - dispatched after request has been found in the cache
* Measurement: `%{}`
* Metadata: `%{telemetry_span_context: term(), request_id: binary, fingerprint: binary, store: atom, expires_at: DateTime.t}`

* `[:idempotency_plug, :request_tracker, :prune]` - dispatched before the cache is pruned
* Measurement: `%{}`
* Metadata: `%{telemetry_span_context: term()}`

## Examples

children = [
Expand Down Expand Up @@ -133,11 +149,20 @@ defmodule IdempotencyPlug.RequestTracker do
def handle_call({:track, request_id, fingerprint}, {caller, _}, state) do
{store, store_opts} = fetch_store(state.options)

metadata = %{
request_id: request_id,
fingerprint: fingerprint,
store: store,
expires_at: nil
}

case store.lookup(request_id, store_opts) do
:not_found ->
data = {:processing, {Node.self(), caller}}
expires_at = expires_at(state.options)

execute_telemetry(:cache_miss, %{metadata | expires_at: expires_at})

case store.insert(request_id, data, fingerprint, expires_at, store_opts) do
:ok ->
{:reply, {:init, request_id, expires_at}, put_monitored(state, request_id, caller)}
Expand All @@ -147,15 +172,23 @@ defmodule IdempotencyPlug.RequestTracker do
end

{{:processing, node_caller}, ^fingerprint, expires} ->
execute_telemetry(:cache_hit, %{metadata | expires_at: expires})

{:reply, {:processing, node_caller, expires}, state}

{{:halted, reason}, ^fingerprint, expires} ->
execute_telemetry(:cache_hit, %{metadata | expires_at: expires})

{:reply, {:cache, {:halted, reason}, expires}, state}

{{:ok, response}, ^fingerprint, expires} ->
execute_telemetry(:cache_hit, %{metadata | expires_at: expires})

{:reply, {:cache, {:ok, response}, expires}, state}

{_res, other_fingerprint, expires} ->
execute_telemetry(:cache_hit, %{metadata | expires_at: expires})

{:reply, {:mismatch, {:fingerprint, other_fingerprint}, expires}, state}
end
end
Expand All @@ -172,6 +205,14 @@ defmodule IdempotencyPlug.RequestTracker do
end
end

defp execute_telemetry(event, metadata) do
:telemetry.execute(
[:idempotency_plug, :request_tracker, event],
_measurements = %{},
metadata
)
end

defp put_monitored(state, request_id, caller) do
ref = Process.monitor(caller)

Expand Down Expand Up @@ -201,6 +242,10 @@ defmodule IdempotencyPlug.RequestTracker do
def handle_info(:prune, state) do
{store, store_opts} = fetch_store(state.options)

metadata = %{store: store}

execute_telemetry(:prune, metadata)

store.prune(store_opts)

Process.send_after(self(), :prune, Keyword.fetch!(state.options, :prune))
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ defmodule IdempotencyPlug.MixProject do
[
{:plug, "~> 1.14"},
{:jason, "~> 1.2"},
{:telemetry, "~> 1.0"},
{:ecto, "~> 3.9", optional: true},
{:ecto_sql, "~> 3.9", optional: true},

Expand Down
64 changes: 63 additions & 1 deletion test/idempotency_plug/request_tracker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,33 @@ defmodule IdempotencyPlug.RequestTrackerTest do
test "with no cached response", %{pid: pid} do
expires_after = DateTime.add(DateTime.utc_now(), 24, :hour)

ref =
:telemetry_test.attach_event_handlers(self(), [
[:idempotency_plug, :request_tracker, :cache_miss]
])

assert {:init, key, expires} = RequestTracker.track(pid, "no-cache", "fingerprint")
assert DateTime.compare(expires, expires_after) != :lt

assert_receive {[:idempotency_plug, :request_tracker, :cache_miss], ^ref, measurements,
metadata}

assert measurements == %{}
assert metadata.request_id == key
assert metadata.fingerprint == "fingerprint"
assert metadata.store == IdempotencyPlug.ETSStore
assert metadata.expires_at == expires

assert {:ok, expires} = RequestTracker.put_response(pid, key, "OK")
assert DateTime.compare(expires, expires_after) != :lt
end

test "with concurrent requests", %{pid: pid} do
ref =
:telemetry_test.attach_event_handlers(self(), [
[:idempotency_plug, :request_tracker, :cache_hit]
])

test_pid = self()

task =
Expand All @@ -42,6 +61,11 @@ defmodule IdempotencyPlug.RequestTrackerTest do
{:expires, expires} ->
assert {:processing, _node_caller, ^expires} =
RequestTracker.track(pid, "concurrent-request", "fingerprint")

assert_receive {[:idempotency_plug, :request_tracker, :cache_hit], ^ref, _measurements,
metadata}

assert metadata.expires_at == expires
end

send(task.pid, :continue)
Expand All @@ -57,16 +81,36 @@ defmodule IdempotencyPlug.RequestTrackerTest do
{:init, key, _expires} = RequestTracker.track(pid, "cached-fingerprint", "fingerprint")
{:ok, expires} = RequestTracker.put_response(pid, key, "OK")

ref =
:telemetry_test.attach_event_handlers(self(), [
[:idempotency_plug, :request_tracker, :cache_hit]
])

assert {:mismatch, {:fingerprint, "fingerprint"}, ^expires} =
RequestTracker.track(pid, "cached-fingerprint", "other-fingerprint")

assert_receive {[:idempotency_plug, :request_tracker, :cache_hit], ^ref, _measurements,
metadata}

assert metadata.expires_at == expires
end

test "with cached response", %{pid: pid} do
{:init, key, _expires} = RequestTracker.track(pid, "cached-response", "fingerprint")
{:ok, expires} = RequestTracker.put_response(pid, key, "OK")

ref =
:telemetry_test.attach_event_handlers(self(), [
[:idempotency_plug, :request_tracker, :cache_hit]
])

assert {:cache, {:ok, "OK"}, ^expires} =
RequestTracker.track(pid, "cached-response", "fingerprint")

assert_receive {[:idempotency_plug, :request_tracker, :cache_hit], ^ref, _measurements,
metadata}

assert metadata.expires_at == expires
end

@tag capture_log: true
Expand All @@ -81,8 +125,18 @@ defmodule IdempotencyPlug.RequestTrackerTest do

{{%RuntimeError{message: "oops"}, _}, _} = catch_exit(Task.await(task))

assert {:cache, {:halted, {%RuntimeError{message: "oops"}, _}}, _expires} =
ref =
:telemetry_test.attach_event_handlers(self(), [
[:idempotency_plug, :request_tracker, :cache_hit]
])

assert {:cache, {:halted, {%RuntimeError{message: "oops"}, _}}, expires} =
RequestTracker.track(pid, "halted-request", "fingerprint")

assert_receive {[:idempotency_plug, :request_tracker, :cache_hit], ^ref, _measurements,
metadata}

assert metadata.expires_at == expires
end

test "when no tracked request", %{pid: pid} do
Expand All @@ -97,7 +151,15 @@ defmodule IdempotencyPlug.RequestTrackerTest do
assert {:processing, _node_caller, _expires} =
RequestTracker.track(pid, "prune", "fingerprint")

ref =
:telemetry_test.attach_event_handlers(self(), [
[:idempotency_plug, :request_tracker, :prune]
])

:timer.sleep(20)
assert {:init, _id, _expires} = RequestTracker.track(pid, "prune", "fingerprint")

assert_receive {[:idempotency_plug, :request_tracker, :prune], ^ref, _measurements, metadata}
assert metadata.store == IdempotencyPlug.ETSStore
end
end
Loading
Loading