Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions lib/solarwinds_apm/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,29 +181,14 @@ def self.[]=(key, value)
case key
when :sampling_rate
SolarWindsAPM.logger.warn do
"[#{name}/#{__method__}] sampling_rate is not a supported setting for SolarWindsAPM::Config. Please use :sample_rate."
'[Deprecated] sampling_rate is not a supported setting for SolarWindsAPM::Config.'
end

when :sample_rate
unless value.is_a?(Integer) || value.is_a?(Float)
SolarWindsAPM.logger.warn do
"[#{name}/#{__method__}] :sample_rate must be a number between 0 and 1000000 (1m) (provided: #{value}), corrected to 0"
end
value = 0
end

# Validate :sample_rate value
unless value.between?(0, 1e6)
new_value = value.negative? ? 0 : 1_000_000
SolarWindsAPM.logger.warn do
"[#{name}/#{__method__}] :sample_rate must be between 0 and 1000000 (1m) (provided: #{value}), corrected to #{new_value}"
end
SolarWindsAPM.logger.warn do
'[Deprecated] sample_rate is not a supported setting for SolarWindsAPM::Config.'
end

# Assure value is an integer
@@config[key.to_sym] = new_value.to_i
SolarWindsAPM.sample_rate(new_value)

when :transaction_settings
compile_settings(value)

Expand Down
43 changes: 16 additions & 27 deletions lib/solarwinds_apm/sampling/token_bucket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ def initialize(token_bucket_settings)
@timer = nil
end

# used call from update_settings e.g. bucket.update(bucket_settings)
# oboe sampler update_settings will update the token
# (thread safe as update_settings is guarded by mutex from oboe sampler)
def update(settings)
settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_token_bucket_settings(settings)
settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_hash(tb_to_hash(settings))
end

def update_from_hash(settings)
Expand All @@ -36,32 +37,13 @@ def update_from_hash(settings)
end

self.rate = settings[:rate] if settings[:rate]

return unless settings[:interval]

self.interval = settings[:interval]
return unless running

stop
start
end

def update_from_token_bucket_settings(settings)
if settings.capacity
difference = settings.capacity - @capacity
self.capacity = settings.capacity
self.tokens = @tokens + difference
end

self.rate = settings.rate if settings.rate

return unless settings.interval

self.interval = settings.interval
return unless running

stop
start
def tb_to_hash(settings)
{ capacity: settings.capacity,
rate: settings.rate,
interval: settings.interval }
end

def capacity=(capacity)
Expand All @@ -72,8 +54,11 @@ def rate=(rate)
@rate = [0, rate].max
end

# self.interval= sets the @interval and @sleep_interval
# @sleep_interval is used in the timer thread to sleep between replenishing the bucket
def interval=(interval)
@interval = interval.clamp(0, MAX_INTERVAL)
@sleep_interval = @interval / 1000.0
end

def tokens=(tokens)
Expand All @@ -83,11 +68,15 @@ def tokens=(tokens)
# Attempts to consume tokens from the bucket
# @param n [Integer] Number of tokens to consume
# @return [Boolean] Whether there were enough tokens
# TODO: we need to include thread-safety here since sampler is shared across threads
# and we may have multiple threads trying to consume tokens at the same time
def consume(token = 1)
if @tokens >= token
self.tokens = @tokens - token
SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Consumed #{token} from total #{@tokens} (#{(@tokens.to_f / @capacity * 100).round(1)}% remaining)" }
true
else
SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" }
false
end
end
Expand All @@ -99,7 +88,7 @@ def start
@timer = Thread.new do
loop do
task
sleep(@interval / 1000.0)
sleep(@sleep_interval)
end
end
end
Expand All @@ -114,7 +103,7 @@ def stop

# Whether the bucket is actively being replenished
def running
!@timer.nil?
!@timer.nil? && @timer.alive?
end

private
Expand Down
2 changes: 1 addition & 1 deletion lib/solarwinds_apm/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module SolarWindsAPM
module Version
MAJOR = 7 # breaking,
MINOR = 0 # feature,
PATCH = 1 # fix => BFF
PATCH = 2 # fix => BFF
PRE = nil

STRING = [MAJOR, MINOR, PATCH, PRE].compact.join('.')
Expand Down
1 change: 1 addition & 0 deletions test/patch/sw_pg_patch_integrate_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def pg_dbo_integration_verification(sql, finished_spans)
_(client_ancestors[2]).must_equal PG::Connection

pg_client = PG::Connection.new
exporter.reset

args = ['SELECT * FROM ABC;']

Expand Down
47 changes: 46 additions & 1 deletion test/sampling/token_bucket_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,57 @@
end

it 'defaults to zero' do
bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new)
# default interval is ~24 days, not suitable for testing
bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new(0, 0, 1000))

bucket.start
sleep(0.1)
bucket.stop

refute bucket.consume
end

describe 'when a process fork occurs' do
it 'creates new timer thread when update is called' do
bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new(8, 2, 10))
bucket.start

parent_timer_id = bucket.instance_variable_get(:@timer).object_id
parent_pid = Process.pid

read_pipe, write_pipe = IO.pipe

pid = fork do
read_pipe.close
# In the child process, call update which should create a new timer thread
bucket.update(rate: 3, capacity: 100)

current_timer_id = bucket.instance_variable_get(:@timer).object_id
child_pid = Process.pid

# Write results to parent
write_pipe.puts "#{child_pid},#{current_timer_id},#{bucket.running}"
write_pipe.close

bucket.stop
exit!
end

write_pipe.close
result = read_pipe.read
read_pipe.close
Process.wait(pid)

child_pid, current_timer_id, is_running = result.strip.split(',')
child_pid = child_pid.to_i
current_timer_id = current_timer_id.to_i

# Verify that we're in a different process and have a new timer thread
refute_equal parent_pid, child_pid
refute_equal parent_timer_id, current_timer_id
assert_equal 'true', is_running

bucket.stop
end
end
end
Loading