From 387576d5c7383c6ce15bd674777f1f6e30800b7e Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Tue, 2 Dec 2025 14:04:02 -0500 Subject: [PATCH 1/5] NH-124861: restart timer thread --- lib/solarwinds_apm/config.rb | 21 +----- lib/solarwinds_apm/sampling/token_bucket.rb | 82 ++++++++++++--------- lib/solarwinds_apm/version.rb | 2 +- test/sampling/token_bucket_test.rb | 3 +- 4 files changed, 52 insertions(+), 56 deletions(-) diff --git a/lib/solarwinds_apm/config.rb b/lib/solarwinds_apm/config.rb index a357497b..e102513d 100644 --- a/lib/solarwinds_apm/config.rb +++ b/lib/solarwinds_apm/config.rb @@ -181,29 +181,14 @@ def self.[]=(key, value) case key when :sampling_rate SolarWindsAPM.logger.warn do - "[#{name}/#{__method__}] sampling_rate is not a supported setting for SolarWindsAPM::Config. Please use :sample_rate." + '[Depreciated] sampling_rate is not a supported setting for SolarWindsAPM::Config.' end when :sample_rate - unless value.is_a?(Integer) || value.is_a?(Float) - SolarWindsAPM.logger.warn do - "[#{name}/#{__method__}] :sample_rate must be a number between 0 and 1000000 (1m) (provided: #{value}), corrected to 0" - end - value = 0 - end - - # Validate :sample_rate value - unless value.between?(0, 1e6) - new_value = value.negative? ? 0 : 1_000_000 - SolarWindsAPM.logger.warn do - "[#{name}/#{__method__}] :sample_rate must be between 0 and 1000000 (1m) (provided: #{value}), corrected to #{new_value}" - end + SolarWindsAPM.logger.warn do + '[Depreciated] sample_rate is not a supported setting for SolarWindsAPM::Config.' end - # Assure value is an integer - @@config[key.to_sym] = new_value.to_i - SolarWindsAPM.sample_rate(new_value) - when :transaction_settings compile_settings(value) diff --git a/lib/solarwinds_apm/sampling/token_bucket.rb b/lib/solarwinds_apm/sampling/token_bucket.rb index 820e02c9..3b9ee02a 100644 --- a/lib/solarwinds_apm/sampling/token_bucket.rb +++ b/lib/solarwinds_apm/sampling/token_bucket.rb @@ -16,68 +16,65 @@ class TokenBucket attr_reader :capacity, :rate, :interval, :tokens def initialize(token_bucket_settings) + @lock = Mutex.new self.capacity = token_bucket_settings.capacity || 0 self.rate = token_bucket_settings.rate || 0 self.interval = token_bucket_settings.interval || MAX_INTERVAL - self.tokens = @capacity + self.tokens = capacity + @stop_requested = false @timer = nil end # used call from update_settings e.g. bucket.update(bucket_settings) def update(settings) - settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_token_bucket_settings(settings) + settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_hash(tb_to_hash(settings)) end def update_from_hash(settings) if settings[:capacity] - difference = settings[:capacity] - @capacity + difference = settings[:capacity] - capacity self.capacity = settings[:capacity] - self.tokens = @tokens + difference + self.tokens = tokens + difference + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated capacity: #{capacity}, tokens: #{tokens}, difference: #{difference}" } end - self.rate = settings[:rate] if settings[:rate] - - return unless settings[:interval] + if settings[:rate] + self.rate = settings[:rate] + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated rate: #{rate}" } + end - self.interval = settings[:interval] - return unless running + if settings[:interval] + self.interval = settings[:interval] + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated interval: #{interval}ms" } + if running + stop + start + end + end - stop start end - def update_from_token_bucket_settings(settings) - if settings.capacity - difference = settings.capacity - @capacity - self.capacity = settings.capacity - self.tokens = @tokens + difference - end - - self.rate = settings.rate if settings.rate - - return unless settings.interval - - self.interval = settings.interval - return unless running - - stop - start + def tb_to_hash(settings) + { capacity: settings.capacity, + rate: settings.rate, + interval: settings.interval } end def capacity=(capacity) - @capacity = [0, capacity].max + @lock.synchronize { @capacity = [0, capacity].max } end def rate=(rate) - @rate = [0, rate].max + @lock.synchronize { @rate = [0, rate].max } end def interval=(interval) - @interval = interval.clamp(0, MAX_INTERVAL) + @lock.synchronize { @interval = interval.clamp(0, MAX_INTERVAL) } end def tokens=(tokens) - @tokens = tokens.clamp(0, @capacity) + @lock.synchronize { @tokens = tokens.clamp(0, @capacity) } end # Attempts to consume tokens from the bucket @@ -86,18 +83,27 @@ def tokens=(tokens) def consume(token = 1) if @tokens >= token self.tokens = @tokens - token + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Consumed #{token} tokens, remaining: #{@tokens}/#{@capacity} (#{(tokens.to_f / @capacity * 100).round(1)}%)" } true else + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } false end end # Starts replenishing the bucket def start - return if running + @lock.synchronize do + return if running + + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Starting replenishment timer (interval: #{interval}ms, rate: #{rate})" } + @stop_requested = false + end @timer = Thread.new do loop do + break if @stop_requested + task sleep(@interval / 1000.0) end @@ -106,21 +112,25 @@ def start # Stops replenishing the bucket def stop - return unless running + @lock.synchronize do + return unless running - @timer.kill - @timer = nil + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Stopping replenishment timer" } + @stop_requested = true + end + @timer.join # Wait for clean exit + @lock.synchronize { @timer = nil } end # Whether the bucket is actively being replenished def running - !@timer.nil? + !@timer.nil? && @timer.alive? end private def task - self.tokens = tokens + @rate + self.tokens = @tokens + rate end end end diff --git a/lib/solarwinds_apm/version.rb b/lib/solarwinds_apm/version.rb index 54f4f657..357aa11a 100644 --- a/lib/solarwinds_apm/version.rb +++ b/lib/solarwinds_apm/version.rb @@ -13,7 +13,7 @@ module SolarWindsAPM module Version MAJOR = 7 # breaking, MINOR = 0 # feature, - PATCH = 1 # fix => BFF + PATCH = 2 # fix => BFF PRE = nil STRING = [MAJOR, MINOR, PATCH, PRE].compact.join('.') diff --git a/test/sampling/token_bucket_test.rb b/test/sampling/token_bucket_test.rb index 4226d1ec..8042c8eb 100644 --- a/test/sampling/token_bucket_test.rb +++ b/test/sampling/token_bucket_test.rb @@ -67,7 +67,8 @@ end it 'defaults to zero' do - bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new) + # default interval is ~24 days, not suitable for testing + bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new(0, 0, 1000)) bucket.start sleep(0.1) From 063c3e30629a8c1b0fc4192899cc8ecaa0c32fb3 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Tue, 2 Dec 2025 14:53:13 -0500 Subject: [PATCH 2/5] more thread safety around update and consume --- lib/solarwinds_apm/sampling/token_bucket.rb | 77 +++++++++++++-------- 1 file changed, 49 insertions(+), 28 deletions(-) diff --git a/lib/solarwinds_apm/sampling/token_bucket.rb b/lib/solarwinds_apm/sampling/token_bucket.rb index 3b9ee02a..f905a89e 100644 --- a/lib/solarwinds_apm/sampling/token_bucket.rb +++ b/lib/solarwinds_apm/sampling/token_bucket.rb @@ -13,8 +13,6 @@ class TokenBucket # Maximum value of a signed 32-bit integer MAX_INTERVAL = (2**31) - 1 - attr_reader :capacity, :rate, :interval, :tokens - def initialize(token_bucket_settings) @lock = Mutex.new self.capacity = token_bucket_settings.capacity || 0 @@ -31,28 +29,31 @@ def update(settings) end def update_from_hash(settings) - if settings[:capacity] - difference = settings[:capacity] - capacity - self.capacity = settings[:capacity] - self.tokens = tokens + difference - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated capacity: #{capacity}, tokens: #{tokens}, difference: #{difference}" } - end + @lock.synchronize do + if settings[:capacity] + difference = settings[:capacity] - @capacity + @capacity = [0, settings[:capacity]].max + @tokens = (@tokens + difference).clamp(0, @capacity) + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated capacity: #{@capacity}, tokens: #{@tokens}, difference: #{difference}" } + end - if settings[:rate] - self.rate = settings[:rate] - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated rate: #{rate}" } - end + if settings[:rate] + @rate = [0, settings[:rate]].max + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated rate: #{@rate}" } + end - if settings[:interval] - self.interval = settings[:interval] - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated interval: #{interval}ms" } - if running - stop - start + if settings[:interval] + @interval = settings[:interval].clamp(0, MAX_INTERVAL) + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated interval: #{@interval}ms" } end end - start + if settings[:interval] && running + stop + start + end + + start unless running end def tb_to_hash(settings) @@ -61,18 +62,34 @@ def tb_to_hash(settings) interval: settings.interval } end + def capacity + @lock.synchronize { @capacity } + end + def capacity=(capacity) @lock.synchronize { @capacity = [0, capacity].max } end + def rate + @lock.synchronize { @rate } + end + def rate=(rate) @lock.synchronize { @rate = [0, rate].max } end + def interval + @lock.synchronize { @interval } + end + def interval=(interval) @lock.synchronize { @interval = interval.clamp(0, MAX_INTERVAL) } end + def tokens + @lock.synchronize { @tokens } + end + def tokens=(tokens) @lock.synchronize { @tokens = tokens.clamp(0, @capacity) } end @@ -81,13 +98,15 @@ def tokens=(tokens) # @param n [Integer] Number of tokens to consume # @return [Boolean] Whether there were enough tokens def consume(token = 1) - if @tokens >= token - self.tokens = @tokens - token - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Consumed #{token} tokens, remaining: #{@tokens}/#{@capacity} (#{(tokens.to_f / @capacity * 100).round(1)}%)" } - true - else - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } - false + @lock.synchronize do + if @tokens >= token + @tokens -= token + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Consumed #{token} tokens, remaining: #{@tokens}/#{@capacity} (#{(@tokens.to_f / @capacity * 100).round(1)}%)" } + true + else + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } + false + end end end @@ -96,7 +115,7 @@ def start @lock.synchronize do return if running - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Starting replenishment timer (interval: #{interval}ms, rate: #{rate})" } + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Starting replenishment timer (interval: #{@interval}ms, rate: #{@rate})" } @stop_requested = false end @@ -130,7 +149,9 @@ def running private def task - self.tokens = @tokens + rate + @lock.synchronize do + @tokens = [@tokens + @rate, @capacity].min + end end end end From db057c461f429dd36cf8e2e2637d0179da03d6eb Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Wed, 3 Dec 2025 13:05:36 -0500 Subject: [PATCH 3/5] fix the test fail --- test/patch/sw_pg_patch_integrate_test.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/test/patch/sw_pg_patch_integrate_test.rb b/test/patch/sw_pg_patch_integrate_test.rb index c1e4070d..6b1a029c 100644 --- a/test/patch/sw_pg_patch_integrate_test.rb +++ b/test/patch/sw_pg_patch_integrate_test.rb @@ -47,6 +47,7 @@ def pg_dbo_integration_verification(sql, finished_spans) _(client_ancestors[2]).must_equal PG::Connection pg_client = PG::Connection.new + exporter.reset args = ['SELECT * FROM ABC;'] From b7f4e5c5e2ea956d0a023b301f123bfd4f40e2f8 Mon Sep 17 00:00:00 2001 From: Xuan <112967240+xuan-cao-swi@users.noreply.github.com> Date: Thu, 4 Dec 2025 14:10:30 -0500 Subject: [PATCH 4/5] Apply suggestions from code review Co-authored-by: Lin Lin --- lib/solarwinds_apm/config.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/solarwinds_apm/config.rb b/lib/solarwinds_apm/config.rb index e102513d..c0b55863 100644 --- a/lib/solarwinds_apm/config.rb +++ b/lib/solarwinds_apm/config.rb @@ -181,12 +181,12 @@ def self.[]=(key, value) case key when :sampling_rate SolarWindsAPM.logger.warn do - '[Depreciated] sampling_rate is not a supported setting for SolarWindsAPM::Config.' + '[Deprecated] sampling_rate is not a supported setting for SolarWindsAPM::Config.' end when :sample_rate SolarWindsAPM.logger.warn do - '[Depreciated] sample_rate is not a supported setting for SolarWindsAPM::Config.' + '[Deprecated] sample_rate is not a supported setting for SolarWindsAPM::Config.' end when :transaction_settings From bad8bf04990cf351810c770fec5f717d61c4ad63 Mon Sep 17 00:00:00 2001 From: xuan-cao-swi Date: Fri, 5 Dec 2025 11:27:56 -0500 Subject: [PATCH 5/5] add test case --- lib/solarwinds_apm/sampling/token_bucket.rb | 108 ++++++-------------- test/sampling/token_bucket_test.rb | 44 ++++++++ 2 files changed, 77 insertions(+), 75 deletions(-) diff --git a/lib/solarwinds_apm/sampling/token_bucket.rb b/lib/solarwinds_apm/sampling/token_bucket.rb index f905a89e..fc79f9bf 100644 --- a/lib/solarwinds_apm/sampling/token_bucket.rb +++ b/lib/solarwinds_apm/sampling/token_bucket.rb @@ -13,47 +13,31 @@ class TokenBucket # Maximum value of a signed 32-bit integer MAX_INTERVAL = (2**31) - 1 + attr_reader :capacity, :rate, :interval, :tokens + def initialize(token_bucket_settings) - @lock = Mutex.new self.capacity = token_bucket_settings.capacity || 0 self.rate = token_bucket_settings.rate || 0 self.interval = token_bucket_settings.interval || MAX_INTERVAL - self.tokens = capacity - @stop_requested = false + self.tokens = @capacity @timer = nil end - # used call from update_settings e.g. bucket.update(bucket_settings) + # oboe sampler update_settings will update the token + # (thread safe as update_settings is guarded by mutex from oboe sampler) def update(settings) settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_hash(tb_to_hash(settings)) end def update_from_hash(settings) - @lock.synchronize do - if settings[:capacity] - difference = settings[:capacity] - @capacity - @capacity = [0, settings[:capacity]].max - @tokens = (@tokens + difference).clamp(0, @capacity) - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated capacity: #{@capacity}, tokens: #{@tokens}, difference: #{difference}" } - end - - if settings[:rate] - @rate = [0, settings[:rate]].max - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated rate: #{@rate}" } - end - - if settings[:interval] - @interval = settings[:interval].clamp(0, MAX_INTERVAL) - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Updated interval: #{@interval}ms" } - end + if settings[:capacity] + difference = settings[:capacity] - @capacity + self.capacity = settings[:capacity] + self.tokens = @tokens + difference end - if settings[:interval] && running - stop - start - end - - start unless running + self.rate = settings[:rate] if settings[:rate] + start end def tb_to_hash(settings) @@ -62,83 +46,59 @@ def tb_to_hash(settings) interval: settings.interval } end - def capacity - @lock.synchronize { @capacity } - end - def capacity=(capacity) - @lock.synchronize { @capacity = [0, capacity].max } - end - - def rate - @lock.synchronize { @rate } + @capacity = [0, capacity].max end def rate=(rate) - @lock.synchronize { @rate = [0, rate].max } - end - - def interval - @lock.synchronize { @interval } + @rate = [0, rate].max end + # self.interval= sets the @interval and @sleep_interval + # @sleep_interval is used in the timer thread to sleep between replenishing the bucket def interval=(interval) - @lock.synchronize { @interval = interval.clamp(0, MAX_INTERVAL) } - end - - def tokens - @lock.synchronize { @tokens } + @interval = interval.clamp(0, MAX_INTERVAL) + @sleep_interval = @interval / 1000.0 end def tokens=(tokens) - @lock.synchronize { @tokens = tokens.clamp(0, @capacity) } + @tokens = tokens.clamp(0, @capacity) end # Attempts to consume tokens from the bucket # @param n [Integer] Number of tokens to consume # @return [Boolean] Whether there were enough tokens + # TODO: we need to include thread-safety here since sampler is shared across threads + # and we may have multiple threads trying to consume tokens at the same time def consume(token = 1) - @lock.synchronize do - if @tokens >= token - @tokens -= token - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Consumed #{token} tokens, remaining: #{@tokens}/#{@capacity} (#{(@tokens.to_f / @capacity * 100).round(1)}%)" } - true - else - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } - false - end + if @tokens >= token + self.tokens = @tokens - token + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Consumed #{token} from total #{@tokens} (#{(@tokens.to_f / @capacity * 100).round(1)}% remaining)" } + true + else + SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } + false end end # Starts replenishing the bucket def start - @lock.synchronize do - return if running - - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Starting replenishment timer (interval: #{@interval}ms, rate: #{@rate})" } - @stop_requested = false - end + return if running @timer = Thread.new do loop do - break if @stop_requested - task - sleep(@interval / 1000.0) + sleep(@sleep_interval) end end end # Stops replenishing the bucket def stop - @lock.synchronize do - return unless running + return unless running - SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] Stopping replenishment timer" } - @stop_requested = true - end - @timer.join # Wait for clean exit - @lock.synchronize { @timer = nil } + @timer.kill + @timer = nil end # Whether the bucket is actively being replenished @@ -149,9 +109,7 @@ def running private def task - @lock.synchronize do - @tokens = [@tokens + @rate, @capacity].min - end + self.tokens = tokens + @rate end end end diff --git a/test/sampling/token_bucket_test.rb b/test/sampling/token_bucket_test.rb index 8042c8eb..9215a95a 100644 --- a/test/sampling/token_bucket_test.rb +++ b/test/sampling/token_bucket_test.rb @@ -76,4 +76,48 @@ refute bucket.consume end + + describe 'when a process fork occurs' do + it 'creates new timer thread when update is called' do + bucket = SolarWindsAPM::TokenBucket.new(SolarWindsAPM::TokenBucketSettings.new(8, 2, 10)) + bucket.start + + parent_timer_id = bucket.instance_variable_get(:@timer).object_id + parent_pid = Process.pid + + read_pipe, write_pipe = IO.pipe + + pid = fork do + read_pipe.close + # In the child process, call update which should create a new timer thread + bucket.update(rate: 3, capacity: 100) + + current_timer_id = bucket.instance_variable_get(:@timer).object_id + child_pid = Process.pid + + # Write results to parent + write_pipe.puts "#{child_pid},#{current_timer_id},#{bucket.running}" + write_pipe.close + + bucket.stop + exit! + end + + write_pipe.close + result = read_pipe.read + read_pipe.close + Process.wait(pid) + + child_pid, current_timer_id, is_running = result.strip.split(',') + child_pid = child_pid.to_i + current_timer_id = current_timer_id.to_i + + # Verify that we're in a different process and have a new timer thread + refute_equal parent_pid, child_pid + refute_equal parent_timer_id, current_timer_id + assert_equal 'true', is_running + + bucket.stop + end + end end