@@ -15,6 +15,9 @@ class TracerMiddleware
include ::Sidekiq::ServerMiddleware if defined?(::Sidekiq::ServerMiddleware)

def call(_worker, msg, _queue)
+ created_at = time_from_timestamp(msg['created_at'])
+ enqueued_at = time_from_timestamp(msg['enqueued_at']) if msg['enqueued_at']

attributes = {
SemanticConventions::Trace::MESSAGING_SYSTEM => 'sidekiq',
'messaging.sidekiq.job_class' => msg['wrapped']&.to_s || msg['class'],
@@ -24,20 +27,20 @@ def call(_worker, msg, _queue)
SemanticConventions::Trace::MESSAGING_OPERATION => 'process'
}
attributes[SemanticConventions::Trace::PEER_SERVICE] = instrumentation_config[:peer_service] if instrumentation_config[:peer_service]
+ attributes['messaging.sidekiq.latency'] = queue_latency(enqueued_at) if enqueued_at

Contributor:

We intentionally avoid adding metric values to span attributes.

Is this something that you can derive in the collector or your own implementation of an exporter?

Author:

What is the rationale for that? Favouring the usage of the Metrics API? Though I'd understand the motivation given the tri-modal definition of OpenTelemetry (Traces, Metrics & Logs), this stipulation might be too restrictive considering the shift happening in the industry, where companies like Honeycomb are normalizing cost-efficient storage and rich retrieval of high-cardinality/high-dimensionality span data. Recently I came across Sentry's decision not to roll out a Metrics product, one year in the making, in favor of Span Metrics.

+ attributes['messaging.sidekiq.retry.count'] = msg['retry_count'] if msg['retry_count']

span_name = case instrumentation_config[:span_naming]
when :job_class then "#{msg['wrapped']&.to_s || msg['class']} process"
else "#{msg['queue']} process"
end

extracted_context = OpenTelemetry.propagation.extract(msg)
- created_at = time_from_timestamp(msg['created_at'])
- enqueued_at = time_from_timestamp(msg['created_at'])
OpenTelemetry::Context.with_current(extracted_context) do
if instrumentation_config[:propagation_style] == :child
tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
span.add_event('created_at', timestamp: created_at)
- span.add_event('enqueued_at', timestamp: enqueued_at)
+ span.add_event('enqueued_at', timestamp: enqueued_at) if enqueued_at
yield
end
else
@@ -47,7 +50,7 @@
span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
OpenTelemetry::Trace.with_span(span) do
span.add_event('created_at', timestamp: created_at)
- span.add_event('enqueued_at', timestamp: enqueued_at)
+ span.add_event('enqueued_at', timestamp: enqueued_at) if enqueued_at
yield
rescue Exception => e # rubocop:disable Lint/RescueException
span.record_exception(e)
@@ -78,6 +81,12 @@ def time_from_timestamp(timestamp)
timestamp/1000r
end
end

+ def queue_latency(time)
+ return unless time
+
+ Time.now.to_f - time
+ end
end
end
end
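As a side note on the review thread above: since the middleware already records an 'enqueued_at' span event, the queue latency could also be derived downstream, for example in a custom exporter, rather than stored as a span attribute. Below is a minimal, hypothetical Ruby sketch of such an exporter wrapper; the class name, the logging destination, and the wrapping approach are illustrative assumptions, not part of this PR.

require 'opentelemetry/sdk'

# Hypothetical wrapper around any span exporter. It derives Sidekiq queue
# latency from the 'enqueued_at' event on each finished span instead of
# reading it from a span attribute. Illustration only; not part of this PR.
class QueueLatencyDerivingExporter
  def initialize(delegate)
    @delegate = delegate
  end

  def export(span_data, timeout: nil)
    span_data.each do |span|
      enqueued = span.events&.find { |event| event.name == 'enqueued_at' }
      next unless enqueued

      # SpanData and event timestamps are integer nanoseconds since epoch, so
      # the gap between span start and the enqueued_at event approximates the
      # time the job spent waiting in the queue.
      latency_seconds = (span.start_timestamp - enqueued.timestamp) / 1_000_000_000.0
      OpenTelemetry.logger.debug("sidekiq queue latency: #{latency_seconds}s (#{span.name})")
    end
    @delegate.export(span_data, timeout: timeout)
  end

  def force_flush(timeout: nil)
    @delegate.force_flush(timeout: timeout)
  end

  def shutdown(timeout: nil)
    @delegate.shutdown(timeout: timeout)
  end
end

A wrapper like this could be passed to the span processor in place of the bare exporter, which keeps the metric-like value out of span attributes as the reviewer suggests.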
@@ -44,6 +44,8 @@
_(job_span.attributes['messaging.destination_kind']).must_equal 'queue'
_(job_span.attributes['messaging.operation']).must_equal 'process'
_(job_span.attributes['peer.service']).must_be_nil
+ _(job_span.attributes['messaging.sidekiq.latency']).wont_be_nil
+ _(job_span.attributes['messaging.sidekiq.retry.count']).must_be_nil
_(job_span.events.size).must_equal(2)

created_event = job_span.events[0]
@@ -65,6 +67,40 @@
_(job_span.attributes['messaging.destination']).must_equal('default')
_(job_span.attributes['messaging.destination_kind']).must_equal('queue')
_(job_span.attributes['messaging.operation']).must_equal 'process'
+ _(job_span.attributes['messaging.sidekiq.latency']).wont_be_nil
+ _(job_span.attributes['messaging.sidekiq.retry.count']).must_be_nil
end

+ it 'records retry count when present in job message' do
+ # Create a job message with retry count
+ job_message = {
+ 'jid' => 'test-retry-job-id',
+ 'class' => 'ExceptionTestingJob',
+ 'queue' => 'default',
+ 'created_at' => Time.now.to_f,
+ 'retry_count' => 2
+ }
+
+ # Create a mock worker
+ worker = ExceptionTestingJob.new
+
+ # Test the middleware directly with the job message
+ middleware = OpenTelemetry::Instrumentation::Sidekiq::Middlewares::Server::TracerMiddleware.new
+
+ begin
+ middleware.call(worker, job_message, 'default') do
+ raise 'a little hell'
+ end
+ rescue RuntimeError
+ # Expected - job will fail
+ end
+
+ # Find the span that was created
+ retry_span = spans.find { |s| s.attributes['messaging.sidekiq.retry.count'] }
+
+ _(retry_span).wont_be_nil
+ _(retry_span.attributes['messaging.sidekiq.retry.count']).must_equal 2
+ _(retry_span.attributes['messaging.sidekiq.job_class']).must_equal 'ExceptionTestingJob'
+ end

it 'defaults to using links to the enqueing span but does not continue the trace' do