Skip to content

Commit 9fda92a

Browse files
feat: AWS X-Ray Remote Sampler Part 3 - Add Rate Limiter and Sampling Targets Poller Logic (#1536)
* Add Rate Limiter and Sampling Targets Poller Logic * fix test cases related to rate limiter and timecop usage * add example for x-ray sampling on rails * address comments * address comments --------- Co-authored-by: Kayla Reopelle <87386821+kaylareopelle@users.noreply.github.com>
1 parent e1b0400 commit 9fda92a

12 files changed

+886
-16
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
require 'bundler/inline'
8+
9+
gemfile(true) do
10+
source 'https://rubygems.org'
11+
12+
gem 'concurrent-ruby', '1.3.4'
13+
gem 'rails', '~> 7.0.4'
14+
gem 'puma'
15+
16+
gem 'opentelemetry-sdk'
17+
gem 'opentelemetry-instrumentation-rails'
18+
gem 'opentelemetry-sampler-xray', path: './../' # Use local version of the X-Ray Sampler
19+
# gem 'opentelemetry-sampler-xray' # Use RubyGems version of the X-Ray Sampler
20+
end
21+
22+
require "action_controller/railtie"
23+
require "action_mailer/railtie"
24+
require "rails/test_unit/railtie"
25+
26+
class App < Rails::Application
27+
config.root = __dir__
28+
config.consider_all_requests_local = true
29+
30+
routes.append do
31+
root to: 'welcome#index'
32+
get "/test" => 'welcome#test'
33+
end
34+
end
35+
36+
class WelcomeController < ActionController::Base
37+
def index
38+
render inline: 'Successfully called "/" endpoint'
39+
end
40+
41+
def test
42+
render inline: 'Successfully called "/test" endpoint'
43+
end
44+
end
45+
46+
ENV['OTEL_TRACES_EXPORTER'] ||= 'console'
47+
ENV['OTEL_SERVICE_NAME'] ||= 'xray-sampler-on-rails-service'
48+
49+
OpenTelemetry::SDK.configure do |c|
50+
c.use_all
51+
end
52+
53+
OpenTelemetry.tracer_provider.sampler = OpenTelemetry::Sampler::XRay::AWSXRayRemoteSampler.new(resource:OpenTelemetry::SDK::Resources::Resource.create({
54+
"service.name"=>"xray-sampler-on-rails-service"
55+
}))
56+
57+
App.initialize!
58+
59+
run App
60+
61+
#### Running and using the Sample App
62+
# To run this example run the `rackup` command with this file
63+
# Example: rackup xray_sampling_on_rails_demonstration.ru
64+
# Navigate to http://localhost:9292/
65+
# Spans for any requests sampled by the X-Ray Sampler will appear in the console
66+
67+
#### Required configuration in the OpenTelemetry Collector
68+
# In order for sampling rules to be obtained from AWS X-Ray, the awsproxy extension
69+
# must be configured in the OpenTelemetry Collector, which will use your AWS credentials.
70+
# - https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/awsproxy#aws-proxy
71+
# Without the awsproxy extension, the X-Ray Sampler will use a fallback sampler
72+
# with a sampling strategy of "1 request/second, plus 5% of any additional requests"
73+
74+
#### Testing out configurable X-Ray Sampling Rules against the "service.name" resource attribute.
75+
# Create a new Sampling Rule with the following matching criteria in AWS CloudWatch Settings for X-Ray Traces.
76+
# - https://console.aws.amazon.com/cloudwatch/home#xray:settings/sampling-rules
77+
# Matching Criteria
78+
# ServiceName = xray-sampler-on-rails-service
79+
# ServiceType = *
80+
# Host = *
81+
# ResourceARN = *
82+
# HTTPMethod = *
83+
# URLPath = *
84+
# For the above matching criteria, try out the following settings to sample or not sample requests
85+
# - Limit to 0r/sec then 0 fixed rate
86+
# - Limit to 1r/sec then 0 fixed rate (May take 30 seconds for this setting to apply)
87+
# - Limit to 0r/sec then 100% fixed rate
88+
89+
#### Testing out configurable X-Ray Sampling Rules against the "/test" endpoint in this sample app.
90+
# Create a new Sampling Rule with the following matching criteria in AWS CloudWatch Settings for X-Ray Traces.
91+
# - https://console.aws.amazon.com/cloudwatch/home#xray:settings/sampling-rules
92+
# Matching Criteria
93+
# ServiceName = *
94+
# ServiceType = *
95+
# Host = *
96+
# ResourceARN = *
97+
# HTTPMethod = *
98+
# URLPath = /test
99+
# For the above matching criteria, try out the following settings to sample or not sample requests
100+
# - Limit to 0r/sec then 0 fixed rate
101+
# - Limit to 1r/sec then 0 fixed rate (May take 30 seconds for this setting to apply)
102+
# - Limit to 0r/sec then 100% fixed rate

sampler/xray/lib/opentelemetry/sampler/xray/aws_xray_remote_sampler.rb

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7-
require 'net/http'
87
require 'json'
98
require 'opentelemetry/sdk'
10-
require_relative 'sampling_rule'
9+
require_relative 'aws_xray_sampling_client'
1110
require_relative 'fallback_sampler'
12-
require_relative 'sampling_rule_applier'
1311
require_relative 'rule_cache'
14-
require_relative 'aws_xray_sampling_client'
12+
require_relative 'sampling_rule'
13+
require_relative 'sampling_rule_applier'
1514

1615
module OpenTelemetry
1716
module Sampler
@@ -68,7 +67,8 @@ def initialize(endpoint: '127.0.0.1:2000', polling_interval: DEFAULT_RULES_POLLI
6867
# Start the Sampling Rules poller
6968
start_sampling_rules_poller
7069

71-
# TODO: Start the Sampling Targets poller
70+
# Start the Sampling Targets poller
71+
start_sampling_targets_poller
7272
end
7373

7474
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
@@ -113,6 +113,15 @@ def start_sampling_rules_poller
113113
end
114114
end
115115

116+
def start_sampling_targets_poller
117+
@target_poller = Thread.new do
118+
loop do
119+
sleep(((@target_polling_interval * 1000) + @target_polling_jitter_millis) / 1000.0)
120+
retrieve_and_update_sampling_targets
121+
end
122+
end
123+
end
124+
116125
def retrieve_and_update_sampling_rules
117126
sampling_rules_response = @sampling_client.fetch_sampling_rules
118127
if sampling_rules_response&.body && sampling_rules_response.body != ''
@@ -125,6 +134,19 @@ def retrieve_and_update_sampling_rules
125134
OpenTelemetry.handle_error(exception: e, message: 'Error occurred when retrieving or updating Sampling Rules')
126135
end
127136

137+
def retrieve_and_update_sampling_targets
138+
request_body = {
139+
SamplingStatisticsDocuments: @rule_cache.create_sampling_statistics_documents(@client_id)
140+
}
141+
sampling_targets_response = @sampling_client.fetch_sampling_targets(request_body)
142+
if sampling_targets_response&.body && sampling_targets_response.body != ''
143+
response_body = JSON.parse(sampling_targets_response.body)
144+
update_sampling_targets(response_body)
145+
else
146+
OpenTelemetry.logger.debug('SamplingTargets Response is falsy')
147+
end
148+
end
149+
128150
def update_sampling_rules(response_object)
129151
sampling_rules = []
130152
if response_object && response_object['SamplingRuleRecords']
@@ -140,6 +162,33 @@ def update_sampling_rules(response_object)
140162
end
141163
end
142164

165+
def update_sampling_targets(response_object)
166+
if response_object && response_object['SamplingTargetDocuments']
167+
target_documents = {}
168+
169+
response_object['SamplingTargetDocuments'].each do |new_target|
170+
target_documents[new_target['RuleName']] = new_target
171+
end
172+
173+
refresh_sampling_rules, next_polling_interval = @rule_cache.update_targets(
174+
target_documents,
175+
response_object['LastRuleModification']
176+
)
177+
178+
@target_polling_interval = next_polling_interval
179+
180+
if refresh_sampling_rules
181+
OpenTelemetry.logger.debug('Performing out-of-band sampling rule polling to fetch updated rules.')
182+
@rule_poller&.kill
183+
start_sampling_rules_poller
184+
end
185+
else
186+
OpenTelemetry.logger.debug('SamplingTargetDocuments from SamplingTargets request is not defined')
187+
end
188+
rescue StandardError => e
189+
OpenTelemetry.logger.debug("Error occurred when updating Sampling Targets: #{e}")
190+
end
191+
143192
class << self
144193
def generate_client_id
145194
hex_chars = ('0'..'9').to_a + ('a'..'f').to_a

sampler/xray/lib/opentelemetry/sampler/xray/fallback_sampler.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,24 @@
44
#
55
# SPDX-License-Identifier: Apache-2.0
66

7+
require_relative 'rate_limiting_sampler'
8+
79
module OpenTelemetry
810
module Sampler
911
module XRay
1012
# FallbackSampler samples 1 req/sec and additional 5% of requests using TraceIdRatioBasedSampler.
1113
class FallbackSampler
1214
def initialize
1315
@fixed_rate_sampler = OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(0.05)
16+
@rate_limiting_sampler = RateLimitingSampler.new(1)
1417
end
1518

1619
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
17-
# TODO: implement and use Rate Limiting Sampler
20+
sampling_result = @rate_limiting_sampler.should_sample?(
21+
trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes
22+
)
23+
24+
return sampling_result if sampling_result.instance_variable_get(:@decision) != OpenTelemetry::SDK::Trace::Samplers::Decision::DROP
1825

1926
@fixed_rate_sampler.should_sample?(trace_id: trace_id, parent_context: parent_context, links: links, name: name, kind: kind, attributes: attributes)
2027
end
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module Sampler
9+
module XRay
10+
# RateLimiter keeps track of the current reservoir quota balance available (measured via available time)
11+
# If enough time has elapsed, the RateLimiter will allow quota balance to be consumed/taken (decrease available time)
12+
# A RateLimitingSampler uses this RateLimiter to determine if it should sample or not based on the quota balance available.
13+
class RateLimiter
14+
def initialize(quota, max_balance_in_seconds = 1)
15+
@max_balance_millis = max_balance_in_seconds * 1000.0
16+
@quota = quota
17+
@wallet_floor_millis = Time.now.to_f * 1000
18+
# current "balance" would be `ceiling - floor`
19+
@lock = Mutex.new
20+
end
21+
22+
def take(cost = 1)
23+
return false if @quota <= 0
24+
25+
quota_per_millis = @quota / 1000.0
26+
27+
# assume divide by zero not possible
28+
cost_in_millis = cost / quota_per_millis
29+
30+
@lock.synchronize do
31+
wallet_ceiling_millis = Time.now.to_f * 1000
32+
current_balance_millis = wallet_ceiling_millis - @wallet_floor_millis
33+
current_balance_millis = [current_balance_millis, @max_balance_millis].min
34+
pending_remaining_balance_millis = current_balance_millis - cost_in_millis
35+
36+
if pending_remaining_balance_millis >= 0
37+
@wallet_floor_millis = wallet_ceiling_millis - pending_remaining_balance_millis
38+
return true
39+
end
40+
41+
# No changes to the wallet state
42+
false
43+
end
44+
end
45+
end
46+
end
47+
end
48+
end
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
require_relative 'rate_limiter'
8+
9+
module OpenTelemetry
10+
module Sampler
11+
module XRay
12+
# RateLimitingSampler is a Sampler that uses a RateLimiter to determine
13+
# if it should sample or not based on the quota balance available.
14+
class RateLimitingSampler
15+
def initialize(quota)
16+
@quota = quota
17+
@reservoir = RateLimiter.new(quota)
18+
end
19+
20+
def should_sample?(trace_id:, parent_context:, links:, name:, kind:, attributes:)
21+
tracestate = OpenTelemetry::Trace.current_span(parent_context).context.tracestate
22+
if @reservoir.take(1)
23+
OpenTelemetry::SDK::Trace::Samplers::Result.new(
24+
decision: OpenTelemetry::SDK::Trace::Samplers::Decision::RECORD_AND_SAMPLE,
25+
tracestate: tracestate,
26+
attributes: attributes
27+
)
28+
else
29+
OpenTelemetry::SDK::Trace::Samplers::Result.new(
30+
decision: OpenTelemetry::SDK::Trace::Samplers::Decision::DROP,
31+
tracestate: tracestate,
32+
attributes: attributes
33+
)
34+
end
35+
end
36+
37+
def to_s
38+
"RateLimitingSampler{rate limiting sampling with sampling config of #{@quota} req/sec and 0% of additional requests}"
39+
end
40+
end
41+
end
42+
end
43+
end

sampler/xray/lib/opentelemetry/sampler/xray/rule_cache.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,52 @@ def update_rules(new_rule_appliers)
5353
end
5454
end
5555

56+
def create_sampling_statistics_documents(client_id)
57+
statistics_documents = []
58+
59+
@cache_lock.synchronize do
60+
@rule_appliers.each do |rule|
61+
statistics = rule.snapshot_statistics
62+
now_in_seconds = Time.now.to_i
63+
64+
sampling_statistics_doc = {
65+
ClientID: client_id,
66+
RuleName: rule.sampling_rule.rule_name,
67+
Timestamp: now_in_seconds,
68+
RequestCount: statistics.request_count,
69+
BorrowCount: statistics.borrow_count,
70+
SampledCount: statistics.sample_count
71+
}
72+
73+
statistics_documents << sampling_statistics_doc
74+
end
75+
end
76+
77+
statistics_documents
78+
end
79+
80+
def update_targets(target_documents, last_rule_modification)
81+
min_polling_interval = nil
82+
next_polling_interval = DEFAULT_TARGET_POLLING_INTERVAL_SECONDS
83+
84+
@cache_lock.synchronize do
85+
@rule_appliers.each_with_index do |rule, index|
86+
target = target_documents[rule.sampling_rule.rule_name]
87+
if target
88+
@rule_appliers[index] = rule.with_target(target)
89+
min_polling_interval = target['Interval'] if target['Interval'] && (min_polling_interval.nil? || min_polling_interval > target['Interval'])
90+
else
91+
OpenTelemetry.logger.debug('Invalid sampling target: missing rule name')
92+
end
93+
end
94+
95+
next_polling_interval = min_polling_interval if min_polling_interval
96+
97+
refresh_sampling_rules = last_rule_modification * 1000 > @last_updated_epoch_millis
98+
return [refresh_sampling_rules, next_polling_interval]
99+
end
100+
end
101+
56102
private
57103

58104
def sort_rules_by_priority

0 commit comments

Comments
 (0)