-
Notifications
You must be signed in to change notification settings - Fork 3
NH-124861: redo token calculation #238
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
b942e60
5c4633a
9531249
bd6d513
7fef008
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -50,6 +50,7 @@ rubocop_result.txt | |
| git_push.sh | ||
| test_run.sh | ||
| test.js | ||
| review.txt | ||
|
|
||
| # act.secrets | ||
| act.secrets | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,8 +38,6 @@ def initialize(logger) | |
| } | ||
| @settings = {} # parsed setting from swo backend | ||
| @settings_mutex = ::Mutex.new | ||
|
|
||
| @buckets.each_value(&:start) | ||
| end | ||
|
|
||
| # return sampling result | ||
|
|
@@ -289,9 +287,9 @@ def disabled_algo(sample_state) | |
| end | ||
|
|
||
| def update_settings(settings) | ||
| return unless settings[:timestamp] > (@settings[:timestamp] || 0) | ||
|
|
||
| @settings_mutex.synchronize do | ||
| return unless settings[:timestamp] > (@settings[:timestamp] || 0) | ||
|
|
||
| @settings = settings | ||
| @buckets.each do |type, bucket| | ||
| bucket.update(@settings[:buckets][type]) if @settings[:buckets][type] | ||
|
|
@@ -324,18 +322,20 @@ def sw_from_span_and_decision(parent_span, otel_decision) | |
| end | ||
|
|
||
| def get_settings(params) | ||
| return if @settings.empty? | ||
|
|
||
| expiry = (@settings[:timestamp] + @settings[:ttl]) * 1000 | ||
| time_now = Time.now.to_i * 1000 | ||
| if time_now > expiry | ||
| @logger.debug { "[#{self.class}/#{__method__}] settings expired, removing" } | ||
| @settings = {} | ||
| return | ||
| @settings_mutex.synchronize do | ||
| return if @settings.empty? | ||
xuan-cao-swi marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was looking with copilot help on the question of early returns from a mutex synchronized block, whether that would release the lock. Seems yes (at least for MRI) but also seems |
||
|
|
||
| expiry = (@settings[:timestamp] + @settings[:ttl]) * 1000 | ||
| time_now = Time.now.to_i * 1000 | ||
| if time_now > expiry | ||
| @logger.debug { "[#{self.class}/#{__method__}] settings expired, removing" } | ||
| @settings = {} | ||
| return | ||
| end | ||
| sampling_setting = SolarWindsAPM::SamplingSettings.merge(@settings, local_settings(params)) | ||
| @logger.debug { "[#{self.class}/#{__method__}] sampling_setting: #{sampling_setting.inspect}" } | ||
| sampling_setting | ||
| end | ||
| sampling_setting = SolarWindsAPM::SamplingSettings.merge(@settings, local_settings(params)) | ||
| @logger.debug { "[#{self.class}/#{__method__}] sampling_setting: #{sampling_setting.inspect}" } | ||
| sampling_setting | ||
| end | ||
| end | ||
| end | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,126 +10,88 @@ | |
| # capacity is updated through update_settings | ||
| module SolarWindsAPM | ||
| class TokenBucket | ||
| # Maximum value of a signed 32-bit integer | ||
| MAX_INTERVAL = (2**31) - 1 | ||
|
|
||
| attr_reader :capacity, :rate, :interval, :tokens, :type | ||
| attr_reader :type | ||
|
|
||
| def initialize(token_bucket_settings) | ||
| self.capacity = token_bucket_settings.capacity || 0 | ||
| self.rate = token_bucket_settings.rate || 0 | ||
| self.interval = token_bucket_settings.interval || MAX_INTERVAL | ||
| self.tokens = @capacity | ||
| @capacity = token_bucket_settings.capacity || 0 | ||
| @rate = token_bucket_settings.rate || 0 | ||
| @tokens = @capacity | ||
| @last_update_time = Time.now.to_f | ||
| @type = token_bucket_settings.type | ||
| @timer = nil | ||
| @lock = ::Mutex.new | ||
| end | ||
|
|
||
| # oboe sampler update_settings will update the token | ||
| # (thread safe as update_settings is guarded by mutex from oboe sampler) | ||
| def update(settings) | ||
| settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_token_bucket_settings(settings) | ||
| def capacity | ||
| @lock.synchronize { @capacity } | ||
| end | ||
|
|
||
| def update_from_hash(settings) | ||
| if settings[:capacity] | ||
| difference = settings[:capacity] - @capacity | ||
| self.capacity = settings[:capacity] | ||
| self.tokens = @tokens + difference | ||
| end | ||
|
|
||
| self.rate = settings[:rate] if settings[:rate] | ||
|
|
||
| return unless settings[:interval] | ||
|
|
||
| self.interval = settings[:interval] | ||
| return unless running | ||
|
|
||
| stop | ||
| start | ||
| def rate | ||
| @lock.synchronize { @rate } | ||
| end | ||
|
|
||
| def update_from_token_bucket_settings(settings) | ||
| if settings.capacity | ||
| difference = settings.capacity - @capacity | ||
| self.capacity = settings.capacity | ||
| self.tokens = @tokens + difference | ||
| def tokens | ||
| @lock.synchronize do | ||
| calculate_tokens | ||
| @tokens | ||
| end | ||
|
|
||
| self.rate = settings.rate if settings.rate | ||
|
|
||
| return unless settings.interval | ||
|
|
||
| self.interval = settings.interval | ||
| return unless running | ||
|
|
||
| stop | ||
| start | ||
| end | ||
|
|
||
| def capacity=(capacity) | ||
| @capacity = [0, capacity].max | ||
| end | ||
|
|
||
| def rate=(rate) | ||
| @rate = [0, rate].max | ||
| end | ||
|
|
||
| # self.interval= sets the @interval and @sleep_interval | ||
| # @sleep_interval is used in the timer thread to sleep between replenishing the bucket | ||
| def interval=(interval) | ||
| @interval = interval.clamp(0, MAX_INTERVAL) | ||
| @sleep_interval = @interval / 1000.0 | ||
| end | ||
|
|
||
| def tokens=(tokens) | ||
| @tokens = tokens.clamp(0, @capacity) | ||
| # oboe sampler update_settings will update the token | ||
| def update(settings) | ||
| settings.instance_of?(Hash) ? update_from_hash(settings) : update_from_hash(tb_to_hash(settings)) | ||
| end | ||
|
|
||
| # Attempts to consume tokens from the bucket | ||
| # @param n [Integer] Number of tokens to consume | ||
| # @param token [Integer] Number of tokens to consume | ||
xuan-cao-swi marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| # @return [Boolean] Whether there were enough tokens | ||
| # TODO: we need to include thread-safety here since sampler is shared across threads | ||
| # and we may have multiple threads trying to consume tokens at the same time | ||
| def consume(token = 1) | ||
| if @tokens >= token | ||
| self.tokens = @tokens - token | ||
| SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] #{@type} Consumed #{token} from total #{@tokens} (#{(@tokens.to_f / @capacity * 100).round(1)}% remaining)" } | ||
| true | ||
| else | ||
| SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] #{@type} Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } | ||
| false | ||
| end | ||
| end | ||
|
|
||
| # Starts replenishing the bucket | ||
| def start | ||
| return if running | ||
|
|
||
| @timer = Thread.new do | ||
| loop do | ||
| task | ||
| sleep(@sleep_interval) | ||
| @lock.synchronize do | ||
| calculate_tokens | ||
| if @tokens >= token | ||
| @tokens -= token | ||
| SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] #{@type} Consumed #{token} from total #{@tokens} (#{(@tokens.to_f / @capacity * 100).round(1)}% remaining)" } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor, the "from total " would show remaining tokens and imho misleading, i would remove this bit. |
||
| true | ||
| else | ||
| SolarWindsAPM.logger.debug { "[#{self.class}/#{__method__}] #{@type} Token consumption failed: requested=#{token}, available=#{@tokens}, capacity=#{@capacity}" } | ||
| false | ||
| end | ||
| end | ||
| end | ||
|
|
||
| # Stops replenishing the bucket | ||
| def stop | ||
| return unless running | ||
| private | ||
|
|
||
| @timer.kill | ||
| @timer = nil | ||
| def calculate_tokens | ||
| now = Time.now.to_f | ||
| elapsed = now - @last_update_time | ||
| @last_update_time = now | ||
| @tokens += elapsed * @rate | ||
|
||
| @tokens = [@tokens, @capacity].min | ||
| end | ||
|
|
||
| # Whether the bucket is actively being replenished | ||
| def running | ||
| !@timer.nil? | ||
| end | ||
| # settings is from json sampler | ||
| def update_from_hash(settings) | ||
| @lock.synchronize do | ||
| calculate_tokens | ||
|
|
||
| if settings[:capacity] | ||
| new_capacity = [0, settings[:capacity]].max | ||
| difference = new_capacity - @capacity | ||
| @capacity = new_capacity | ||
| @tokens += difference | ||
| @tokens = [0, @tokens].max | ||
| end | ||
|
|
||
| private | ||
| @rate = [0, settings[:rate]].max if settings[:rate] | ||
| end | ||
| end | ||
|
|
||
| def task | ||
| self.tokens = tokens + @rate | ||
| # settings is from http sampler | ||
| def tb_to_hash(settings) | ||
| tb_hash = {} | ||
| tb_hash[:capacity] = settings.capacity if settings.respond_to?(:capacity) | ||
| tb_hash[:rate] = settings.rate if settings.respond_to?(:rate) | ||
| tb_hash[:type] = settings.type if settings.respond_to?(:type) | ||
| tb_hash | ||
| end | ||
| end | ||
| end | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,19 +60,10 @@ RUN echo 'alias be="bundle exec"' >> ~/.profile | |
| # install rubies to build our gem against | ||
| RUN . ~/.profile \ | ||
| && cd /root/.rbenv/plugins/ruby-build && git pull && cd - \ | ||
| && rbenv install 3.1.0 | ||
| && rbenv install 3.2.6 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We still have minimum required ruby version as 3.1.0. Is this Dockerfile used for adhoc local tests and should it really not test against 3.1.0? |
||
|
|
||
| RUN echo 'gem: --no-document' >> ~/.gemrc | ||
|
|
||
| # install swig 4.0.2 | ||
| RUN curl -SL https://github.com/swig/swig/archive/refs/tags/v4.0.2.tar.gz \ | ||
| | tar xzC /tmp \ | ||
| && cd /tmp/swig-4.0.2 \ | ||
| && ./autogen.sh \ | ||
| && ./configure && make && make install \ | ||
| && cd - \ | ||
| && rm -rf /tmp/swig-4.0.2 | ||
|
|
||
| # set up github package credentials for pushing | ||
| RUN mkdir ~/.gem \ | ||
| && echo "---\n:github: Bearer ${BUNDLE_RUBYGEMS__PKG__GITHUB__COM}" >> ~/.gem/credentials \ | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.