diff --git a/lib/solarwinds_apm/config.rb b/lib/solarwinds_apm/config.rb index a357497b..c0b55863 100644 --- a/lib/solarwinds_apm/config.rb +++ b/lib/solarwinds_apm/config.rb @@ -181,29 +181,14 @@ def self.[]=(key, value) case key when :sampling_rate SolarWindsAPM.logger.warn do - "[#{name}/#{__method__}] sampling_rate is not a supported setting for SolarWindsAPM::Config. Please use :sample_rate." + '[Deprecated] sampling_rate is not a supported setting for SolarWindsAPM::Config.' end when :sample_rate - unless value.is_a?(Integer) || value.is_a?(Float) - SolarWindsAPM.logger.warn do - "[#{name}/#{__method__}] :sample_rate must be a number between 0 and 1000000 (1m) (provided: #{value}), corrected to 0" - end - value = 0 - end - - # Validate :sample_rate value - unless value.between?(0, 1e6) - new_value = value.negative? ? 0 : 1_000_000 - SolarWindsAPM.logger.warn do - "[#{name}/#{__method__}] :sample_rate must be between 0 and 1000000 (1m) (provided: #{value}), corrected to #{new_value}" - end + SolarWindsAPM.logger.warn do + '[Deprecated] sample_rate is not a supported setting for SolarWindsAPM::Config.' end - # Assure value is an integer - @@config[key.to_sym] = new_value.to_i - SolarWindsAPM.sample_rate(new_value) - when :transaction_settings compile_settings(value) diff --git a/lib/solarwinds_apm/sampling/token_bucket.rb b/lib/solarwinds_apm/sampling/token_bucket.rb index 820e02c9..fc79f9bf 100644 --- a/lib/solarwinds_apm/sampling/token_bucket.rb +++ b/lib/solarwinds_apm/sampling/token_bucket.rb @@ -23,9 +23,10 @@ def initialize(token_bucket_settings) @timer = nil end - # used call from update_settings e.g. bucket.update(bucket_settings) + # oboe sampler update_settings will update the token + # (thread safe as update_settings is guarded by mutex from oboe sampler) def update(settings) - settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_token_bucket_settings(settings) + settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_hash(tb_to_hash(settings)) end def update_from_hash(settings) @@ -36,32 +37,13 @@ def update_from_hash(settings) end self.rate = settings[:rate] if settings[:rate] - - return unless settings[:interval] - - self.interval = settings[:interval] - return unless running - - stop start end - def update_from_token_bucket_settings(settings) - if settings.capacity - difference = settings.capacity - @capacity - self.capacity = settings.capacity - self.tokens = @tokens + difference - end - - self.rate = settings.rate if settings.rate - - return unless settings.interval - - self.interval = settings.interval - return unless running - - stop - start + def tb_to_hash(settings) + { capacity: settings.capacity, + rate: settings.rate, + interval: settings.interval } end def capacity=(capacity) @@ -72,8 +54,11 @@ def rate=(rate) @rate = [0, rate].max end + # self.interval= sets the @interval and @sleep_interval + # @sleep_interval is used in the timer thread to sleep between replenishing the bucket def interval=(interval) @interval = interval.clamp(0, MAX_INTERVAL) + @sleep_interval = @interval / 1000.0 end def tokens=(tokens) @@ -83,11 +68,15 @@ def tokens=(tokens) # Attempts to consume tokens from the bucket # @param n [Integer] Number of tokens to consume # @return [Boolean] Whether there were enough tokens + # TODO: we need to include thread-safety here since sampler is shared across threads + # and we may have multiple threads trying to consume tokens at the same time def consume(token = 1) if @tokens >= token self.tokens = @tokens - token + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Consumed #{token} from total #{@tokens} (#{(@tokens.to_f / @capacity * 100).round(1)}% remaining)" } true else + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } false end end @@ -99,7 +88,7 @@ def start @timer = Thread.new do loop do task - sleep(@interval / 1000.0) + sleep(@sleep_interval) end end end @@ -114,7 +103,7 @@ def stop # Whether the bucket is actively being replenished def running - !@timer.nil? + !@timer.nil? && @timer.alive? end private diff --git a/lib/solarwinds_apm/version.rb b/lib/solarwinds_apm/version.rb index 54f4f657..357aa11a 100644 --- a/lib/solarwinds_apm/version.rb +++ b/lib/solarwinds_apm/version.rb @@ -13,7 +13,7 @@ module SolarWindsAPM module Version MAJOR = 7 # breaking, MINOR = 0 # feature, - PATCH = 1 # fix => BFF + PATCH = 2 # fix => BFF PRE = nil STRING = [MAJOR, MINOR, PATCH, PRE].compact.join('.') diff --git a/test/patch/sw_pg_patch_integrate_test.rb b/test/patch/sw_pg_patch_integrate_test.rb index c1e4070d..6b1a029c 100644 --- a/test/patch/sw_pg_patch_integrate_test.rb +++ b/test/patch/sw_pg_patch_integrate_test.rb @@ -47,6 +47,7 @@ def pg_dbo_integration_verification(sql, finished_spans) _(client_ancestors[2]).must_equal PG::Connection pg_client = PG::Connection.new + exporter.reset args = ['SELECT * FROM ABC;'] diff --git a/test/sampling/token_bucket_test.rb b/test/sampling/token_bucket_test.rb index 4226d1ec..9215a95a 100644 --- a/test/sampling/token_bucket_test.rb +++ b/test/sampling/token_bucket_test.rb @@ -67,7 +67,8 @@ end it 'defaults to zero' do - bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new) + # default interval is ~24 days, not suitable for testing + bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new(0, 0, 1000)) bucket.start sleep(0.1) @@ -75,4 +76,48 @@ refute bucket.consume end + + describe 'when a process fork occurs' do + it 'creates new timer thread when update is called' do + bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new(8, 2, 10)) + bucket.start + + parent_timer_id = bucket.instance_variable_get(:@timer).object_id + parent_pid = Process.pid + + read_pipe, write_pipe = IO.pipe + + pid = fork do + read_pipe.close + # In the child process, call update which should create a new timer thread + bucket.update(rate: 3, capacity: 100) + + current_timer_id = bucket.instance_variable_get(:@timer).object_id + child_pid = Process.pid + + # Write results to parent + write_pipe.puts "#{child_pid},#{current_timer_id},#{bucket.running}" + write_pipe.close + + bucket.stop + exit! + end + + write_pipe.close + result = read_pipe.read + read_pipe.close + Process.wait(pid) + + child_pid, current_timer_id, is_running = result.strip.split(',') + child_pid = child_pid.to_i + current_timer_id = current_timer_id.to_i + + # Verify that we're in a different process and have a new timer thread + refute_equal parent_pid, child_pid + refute_equal parent_timer_id, current_timer_id + assert_equal 'true', is_running + + bucket.stop + end + end end