From 825cd2a267c502a65b14b432889ca9fa38ab65c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A1vio=20Lucena?= Date: Wed, 24 Sep 2025 23:13:45 -0700 Subject: [PATCH] feat: add latency and retry count to instrumentation --- .../middlewares/server/tracer_middleware.rb | 17 ++++++--- .../server/tracer_middleware_test.rb | 36 +++++++++++++++++++ 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb index 47c3d7dce7..afbb45e319 100644 --- a/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb +++ b/instrumentation/sidekiq/lib/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware.rb @@ -15,6 +15,9 @@ class TracerMiddleware include ::Sidekiq::ServerMiddleware if defined?(::Sidekiq::ServerMiddleware) def call(_worker, msg, _queue) + created_at = time_from_timestamp(msg['created_at']) + enqueued_at = time_from_timestamp(msg['enqueued_at']) if msg['enqueued_at'] + attributes = { SemanticConventions::Trace::MESSAGING_SYSTEM => 'sidekiq', 'messaging.sidekiq.job_class' => msg['wrapped']&.to_s || msg['class'], @@ -24,6 +27,8 @@ def call(_worker, msg, _queue) SemanticConventions::Trace::MESSAGING_OPERATION => 'process' } attributes[SemanticConventions::Trace::PEER_SERVICE] = instrumentation_config[:peer_service] if instrumentation_config[:peer_service] + attributes['messaging.sidekiq.latency'] = queue_latency(enqueued_at) if enqueued_at + attributes['messaging.sidekiq.retry.count'] = msg['retry_count'] if msg['retry_count'] span_name = case instrumentation_config[:span_naming] when :job_class then "#{msg['wrapped']&.to_s || msg['class']} process" @@ -31,13 +36,11 @@ def call(_worker, msg, _queue) end extracted_context = OpenTelemetry.propagation.extract(msg) - created_at = time_from_timestamp(msg['created_at']) - enqueued_at = time_from_timestamp(msg['created_at']) OpenTelemetry::Context.with_current(extracted_context) do if instrumentation_config[:propagation_style] == :child tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span| span.add_event('created_at', timestamp: created_at) - span.add_event('enqueued_at', timestamp: enqueued_at) + span.add_event('enqueued_at', timestamp: enqueued_at) if enqueued_at yield end else @@ -47,7 +50,7 @@ def call(_worker, msg, _queue) span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer) OpenTelemetry::Trace.with_span(span) do span.add_event('created_at', timestamp: created_at) - span.add_event('enqueued_at', timestamp: enqueued_at) + span.add_event('enqueued_at', timestamp: enqueued_at) if enqueued_at yield rescue Exception => e # rubocop:disable Lint/RescueException span.record_exception(e) @@ -78,6 +81,12 @@ def time_from_timestamp(timestamp) timestamp/1000r end end + + def queue_latency(time) + return unless time + + Time.now.to_f - time + end end end end diff --git a/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb b/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb index 5c770bb78c..2066612e55 100644 --- a/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb +++ b/instrumentation/sidekiq/test/opentelemetry/instrumentation/sidekiq/middlewares/server/tracer_middleware_test.rb @@ -44,6 +44,8 @@ _(job_span.attributes['messaging.destination_kind']).must_equal 'queue' _(job_span.attributes['messaging.operation']).must_equal 'process' _(job_span.attributes['peer.service']).must_be_nil + _(job_span.attributes['messaging.sidekiq.latency']).wont_be_nil + _(job_span.attributes['messaging.sidekiq.retry.count']).must_be_nil _(job_span.events.size).must_equal(2) created_event = job_span.events[0] @@ -65,6 +67,40 @@ _(job_span.attributes['messaging.destination']).must_equal('default') _(job_span.attributes['messaging.destination_kind']).must_equal('queue') _(job_span.attributes['messaging.operation']).must_equal 'process' + _(job_span.attributes['messaging.sidekiq.latency']).wont_be_nil + _(job_span.attributes['messaging.sidekiq.retry.count']).must_be_nil + end + + it 'records retry count when present in job message' do + # Create a job message with retry count + job_message = { + 'jid' => 'test-retry-job-id', + 'class' => 'ExceptionTestingJob', + 'queue' => 'default', + 'created_at' => Time.now.to_f, + 'retry_count' => 2 + } + + # Create a mock worker + worker = ExceptionTestingJob.new + + # Test the middleware directly with the job message + middleware = OpenTelemetry::Instrumentation::Sidekiq::Middlewares::Server::TracerMiddleware.new + + begin + middleware.call(worker, job_message, 'default') do + raise 'a little hell' + end + rescue RuntimeError + # Expected - job will fail + end + + # Find the span that was created + retry_span = spans.find { |s| s.attributes['messaging.sidekiq.retry.count'] } + + _(retry_span).wont_be_nil + _(retry_span.attributes['messaging.sidekiq.retry.count']).must_equal 2 + _(retry_span.attributes['messaging.sidekiq.job_class']).must_equal 'ExceptionTestingJob' end it 'defaults to using links to the enqueing span but does not continue the trace' do