From d613953350dba6aa017b6bad6eca59fade746059 Mon Sep 17 00:00:00 2001 From: Dan Mayer Date: Fri, 28 Feb 2025 12:05:48 -0700 Subject: [PATCH 1/3] try fill lock via meta protocol improve support for meta flags and fetch_with_lock --- lib/dalli/client.rb | 58 ++++++ lib/dalli/protocol/base.rb | 2 +- lib/dalli/protocol/meta.rb | 8 +- lib/dalli/protocol/meta/response_processor.rb | 43 +++- test/integration/test_operations.rb | 194 +++++++++++++++++- 5 files changed, 287 insertions(+), 18 deletions(-) diff --git a/lib/dalli/client.rb b/lib/dalli/client.rb index a6133d8c..dc29b7f5 100644 --- a/lib/dalli/client.rb +++ b/lib/dalli/client.rb @@ -10,6 +10,8 @@ module Dalli ## # rubocop:disable Metrics/ClassLength class Client + LOCK_TTL = 5 # seconds, default lock TTL + FILL_LOCK_INTERVAL = 0.01 # seconds, default fill lock interval ## # Dalli::Client is the main class which developers will use to interact with # the memcached server. Usage: @@ -148,6 +150,46 @@ def fetch(key, ttl = nil, req_options = nil) new_val end + # Fetch the value associated with the key, along with a lock. + # If a value is found, then it is returned. + # + # If a value is not found and no block is given, then nil is returned. + # + # If a value is not found (or if the found value is nil and :cache_nils is false) + # and a block is given, the block will be invoked and its return value + # written to the cache and returned. + # rubocop:disable Metrics/AbcSize + # rubocop:disable Metrics/CyclomaticComplexity + # rubocop:disable Metrics/PerceivedComplexity + def fetch_with_lock(key, ttl = nil, req_options = nil) + req_options = {} if req_options.nil? + clean_req_options = cache_nils ? req_options.merge(CACHE_NILS) : req_options + lock_ttl, fill_lock_interval, lock_wait_end_time = get_lock_options(req_options) + + req_options = clean_req_options.dup + req_options[:meta_flags] ||= [] + req_options[:meta_flags] << "N#{lock_ttl}" + + loop do + val, meta_flags = get(key, req_options) + + if meta_flags[:w] + new_val = yield + set(key, new_val, ttl_or_default(ttl), clean_req_options) + return new_val + elsif meta_flags[:z] && (!val.nil? || val != '') + break if Time.now.to_f >= lock_wait_end_time + elsif val + return val + end + sleep(fill_lock_interval) + end + yield # fails to read value in wait time, yield back the value + end + # rubocop:enable Metrics/AbcSize + # rubocop:enable Metrics/CyclomaticComplexity + # rubocop:enable Metrics/PerceivedComplexity + ## # compare and swap values using optimistic locking. # Fetch the existing value for key. @@ -390,6 +432,22 @@ def with private + def get_lock_options(req_options) + lock_ttl = req_options.delete(:lock_ttl) || LOCK_TTL + fill_lock_interval = req_options.delete(:fill_lock_interval) || FILL_LOCK_INTERVAL + + raise ArgumentError, 'lock_ttl must be a positive integer' if !lock_ttl.is_a?(Integer) && lock_ttl <= 1 + + if fill_lock_interval.is_a?(Numeric) && fill_lock_interval <= 0 + raise ArgumentError, + 'fill_lock_interval must be a positive number' + end + + lock_wait_end_time = Time.now.to_f + lock_ttl + + [lock_ttl, fill_lock_interval, lock_wait_end_time] + end + def check_positive!(amt) raise ArgumentError, "Positive values only: #{amt}" if amt.negative? end diff --git a/lib/dalli/protocol/base.rb b/lib/dalli/protocol/base.rb index 2fb0fc36..dcaf7c02 100644 --- a/lib/dalli/protocol/base.rb +++ b/lib/dalli/protocol/base.rb @@ -150,10 +150,10 @@ def quiet? alias multi? quiet? # NOTE: Additional public methods should be overridden in Dalli::Threadsafe + ALLOWED_QUIET_OPS = %i[add replace set delete incr decr append prepend flush noop].freeze private - ALLOWED_QUIET_OPS = %i[add replace set delete incr decr append prepend flush noop].freeze def verify_allowed_quiet!(opkey) return if ALLOWED_QUIET_OPS.include?(opkey) diff --git a/lib/dalli/protocol/meta.rb b/lib/dalli/protocol/meta.rb index 9b061d88..416ef591 100644 --- a/lib/dalli/protocol/meta.rb +++ b/lib/dalli/protocol/meta.rb @@ -102,7 +102,8 @@ def get(key, options = nil) if !meta_options && !base64 && !quiet? && @value_marshaller.raw_by_default response_processor.meta_get_with_value(cache_nils: cache_nils?(options), skip_flags: true) elsif meta_options - response_processor.meta_get_with_value_and_meta_flags(cache_nils: cache_nils?(options)) + response_processor.meta_get_with_value_and_meta_flags(cache_nils: cache_nils?(options), + meta_flags: meta_options) else response_processor.meta_get_with_value(cache_nils: cache_nils?(options)) end @@ -122,8 +123,9 @@ def gat(key, ttl, options = nil) req = RequestFormatter.meta_get(key: encoded_key, ttl: ttl, base64: base64, meta_flags: meta_flag_options(options)) write(req) - if meta_flag_options(options) - response_processor.meta_get_with_value_and_meta_flags(cache_nils: cache_nils?(options)) + if (meta_options = meta_flag_options(options)) + response_processor.meta_get_with_value_and_meta_flags(cache_nils: cache_nils?(options), + meta_flags: meta_options) else response_processor.meta_get_with_value(cache_nils: cache_nils?(options)) end diff --git a/lib/dalli/protocol/meta/response_processor.rb b/lib/dalli/protocol/meta/response_processor.rb index 6bb0a011..ba1317e5 100644 --- a/lib/dalli/protocol/meta/response_processor.rb +++ b/lib/dalli/protocol/meta/response_processor.rb @@ -50,11 +50,11 @@ def meta_get_with_value_and_cas [@value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens)), cas] end - def meta_get_with_value_and_meta_flags(cache_nils: false) + def meta_get_with_value_and_meta_flags(cache_nils: false, meta_flags: []) tokens = error_on_unexpected!([VA, EN, HD]) return [(cache_nils ? ::Dalli::NOT_FOUND : nil), {}] if tokens.first == EN - meta_flags = meta_flags_from_tokens(tokens) + meta_flags = meta_flags_from_tokens(tokens, meta_flags) return [(cache_nils ? ::Dalli::NOT_FOUND : nil), meta_flags] unless tokens.first == VA value, bitflag = @value_marshaller.retrieve(read_data(tokens[1].to_i), bitflags_from_tokens(tokens)) @@ -192,14 +192,39 @@ def error_on_unexpected!(expected_codes) raise Dalli::DalliError, "Response error: #{tokens.first}" end - def meta_flags_from_tokens(tokens) - { - c: cas_from_tokens(tokens), - h: hit_from_tokens(tokens), - l: last_accessed_from_tokens(tokens), - t: ttl_remaining_from_tokens(tokens) - } + # rubocop:disable Metrics/MethodLength + def meta_flags_from_tokens(tokens, meta_flags) + flag_values = {} + + meta_flags.each do |flag| + if flag == :c + flag_values[:c] = cas_from_tokens(tokens) + next + end + + if flag == :h + flag_values[:h] = hit_from_tokens(tokens) + next + end + + if flag == :l + flag_values[:l] = last_accessed_from_tokens(tokens) + next + end + + if flag == :t + flag_values[:t] = ttl_remaining_from_tokens(tokens) + next + end + + if flag.match?(/^N\d+$/) + flag_values[:w] = tokens.any?('W') + flag_values[:z] = tokens.any?('Z') + end + end + flag_values end + # rubocop:enable Metrics/MethodLength def bitflags_from_tokens(tokens) value_from_tokens(tokens, 'f')&.to_i diff --git a/test/integration/test_operations.rb b/test/integration/test_operations.rb index c4fca2ed..f91a3927 100644 --- a/test/integration/test_operations.rb +++ b/test/integration/test_operations.rb @@ -56,18 +56,52 @@ val1 = 'meta' dc.set('meta_key', val1) - val2, meta_flags = dc.get('meta_key', meta_flags: %i[l h t]) + val2, meta_flags = dc.get('meta_key', meta_flags: %i[l h t c]) assert_equal val1, val2 # not yet hit, and last accessed 0 from set - assert_equal({ c: 0, l: 0, h: false, t: -1, bitflag: nil }.sort, meta_flags.sort) + assert meta_flags.delete(:c) + assert_equal({ l: 0, h: false, t: -1, bitflag: nil }.sort, meta_flags.sort) sleep 1 # we can't simulate time in memcached so we need to sleep # ensure hit true and last accessed 1 - val2, meta_flags = dc.get('meta_key', meta_flags: %i[l h t]) + val2, meta_flags = dc.get('meta_key', meta_flags: %i[l h t c]) assert_equal val1, val2 - assert_equal({ c: 0, l: 1, h: true, t: -1, bitflag: nil }.sort, meta_flags.sort) + assert meta_flags.delete(:c) + assert_equal({ l: 1, h: true, t: -1, bitflag: nil }.sort, meta_flags.sort) + + assert op_addset_succeeds(dc.set('meta_key', nil)) + assert_nil dc.get('meta_key') + end + end + + it 'return the value and the sparce meta flags' do + memcached_persistent do |dc| + dc.flush + + val1 = 'meta' + dc.set('meta_key', val1) + val2, meta_flags = dc.get('meta_key', meta_flags: %i[h t]) + + assert_equal val1, val2 + # not yet hit, and last accessed 0 from set + assert_equal({ h: false, t: -1, bitflag: nil }.sort, meta_flags.sort) + + val2, meta_flags = dc.get('meta_key', meta_flags: %i[l h]) + + assert_equal val1, val2 + assert_equal({ l: 0, h: true, bitflag: nil }.sort, meta_flags.sort) + + val2, meta_flags = dc.get('meta_key', meta_flags: %i[l h t N10]) + + assert_equal val1, val2 + assert_equal({ l: 0, h: true, t: -1, bitflag: nil, w: false, z: false }.sort, meta_flags.sort) + + val2, meta_flags = dc.get('meta_key', meta_flags: %i[N10]) + + assert_equal val1, val2 + assert_equal({ bitflag: nil, w: false, z: false }.sort, meta_flags.sort) assert op_addset_succeeds(dc.set('meta_key', nil)) assert_nil dc.get('meta_key') @@ -180,7 +214,7 @@ meta_flags = response.last assert_equal val1, val2 - assert_equal({ c: 0, l: 0, h: true, t: 10, bitflag: nil }, meta_flags) + assert_equal({ l: 0, h: true, t: 10, bitflag: nil }.sort, meta_flags.sort) assert op_addset_succeeds(dc.set('meta_key', nil)) assert_nil dc.get('meta_key') @@ -373,6 +407,156 @@ end end + describe 'fetch_with_lock' do + it 'uses default lock TTL and fill lock interval when no options provided' do + memcached_persistent do |dc| + dc.flush + start_time = Time.now.to_f + executed = false + value = dc.fetch_with_lock('lock_key') do + executed = true + 'new_value' + end + end_time = Time.now.to_f + + assert executed + assert_equal 'new_value', value + # Verify execution time is within expected range for default TTL + assert_operator (end_time - start_time), :<, Dalli::Client::LOCK_TTL + end + end + + it 'respects custom lock TTL' do + memcached_persistent do |dc| + dc.flush + custom_ttl = 2 # shorter than default + start_time = Time.now.to_f + executed = false + value = dc.fetch_with_lock('lock_key', nil, { lock_ttl: custom_ttl }) do + executed = true + 'new_value' + end + end_time = Time.now.to_f + + assert executed + assert_equal 'new_value', value + # Verify execution time is within expected range for custom TTL + assert_operator (end_time - start_time), :<, custom_ttl + end + end + + it 'respects custom fill lock interval' do + memcached_persistent do |dc| + dc.flush + custom_interval = 0.05 # longer than default + start_time = Time.now.to_f + executed = false + key = 'custom_fill_lock_key' + + # Set up a concurrent update to force waiting + Thread.new do + dc.fetch_with_lock(key, 1) do + sleep(0.1) # Hold the lock briefly + 'other_value' + end + end + + # Small delay to ensure the other thread gets the lock first + sleep(1.5) + + value = dc.fetch_with_lock(key, 1, { fill_lock_interval: custom_interval }) do + executed = true + 'new_value' + end + end_time = Time.now.to_f + + assert executed + assert_equal 'new_value', value + # Verify we waited at least one interval + assert_operator (end_time - start_time), :>, custom_interval + end + end + + it 'supports both custom lock TTL and fill lock interval' do + memcached_persistent do |dc| + dc.flush + custom_ttl = 3 + custom_interval = 0.05 + executed = false + value = dc.fetch_with_lock('custom_lock_key', nil, { + lock_ttl: custom_ttl, + fill_lock_interval: custom_interval + }) do + executed = true + 'new_value' + end + + assert executed + assert_equal 'new_value', value + assert_equal 'new_value', dc.get('custom_lock_key') + end + end + + it 'raises when givne a bad argument for both custom lock TTL and fill lock interval' do + memcached_persistent do |dc| + dc.flush + custom_ttl = 3 + custom_interval = 0.05 + + assert_raises ArgumentError do + dc.fetch_with_lock('custom_lock_key', nil, { + lock_ttl: 0.1, + fill_lock_interval: custom_interval + }) do + 'new_value' + end + end + + assert_raises ArgumentError do + dc.fetch_with_lock('custom_lock_key', nil, { + lock_ttl: custom_ttl, + fill_lock_interval: -0.1 + }) do + 'new_value' + end + end + end + end + + it 'times out and yields after lock TTL expires' do + memcached_persistent do |dc| + dc.flush + custom_ttl = 1 # Very short TTL + start_time = Time.now.to_f + + # Set up a concurrent update that holds the lock + Thread.new do + dc.fetch_with_lock('lock_key') do + sleep(1.4) # Hold the lock longer than the TTL + 'holding_value' + end + end + + # Small delay to ensure the other thread gets the lock first + sleep(0.05) + + executed = false + value = dc.fetch_with_lock('lock_key', nil, { lock_ttl: custom_ttl }) do + executed = true + 'timeout_value' + end + end_time = Time.now.to_f + + assert executed + assert_equal 'timeout_value', value + # Verify we waited at least the TTL + assert_operator (end_time - start_time), :>, custom_ttl + # But not too much longer + assert_operator (end_time - start_time), :<, custom_ttl + 0.1 + end + end + end + describe 'incr/decr' do it 'supports incrementing and decrementing existing values' do memcached_persistent do |client| From f2e5a731aba2f0cc7bb07575ea13d0de75acc3db Mon Sep 17 00:00:00 2001 From: Dan Mayer Date: Tue, 4 Mar 2025 12:11:13 -0700 Subject: [PATCH 2/3] bump ubuntu version --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index afed8ed3..f74ed298 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -4,7 +4,7 @@ on: [push, pull_request] jobs: test: - runs-on: ubuntu-20.04 + runs-on: ubuntu-latest strategy: fail-fast: false From efb8083df207a36e58f68ffb8bd9f361039bfc8f Mon Sep 17 00:00:00 2001 From: Dan Mayer Date: Mon, 10 Mar 2025 11:11:29 -0600 Subject: [PATCH 3/3] address feedback --- lib/dalli/client.rb | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/dalli/client.rb b/lib/dalli/client.rb index dc29b7f5..8bee531f 100644 --- a/lib/dalli/client.rb +++ b/lib/dalli/client.rb @@ -173,15 +173,16 @@ def fetch_with_lock(key, ttl = nil, req_options = nil) loop do val, meta_flags = get(key, req_options) - if meta_flags[:w] + if val && val != '' + return val + elsif meta_flags[:w] new_val = yield set(key, new_val, ttl_or_default(ttl), clean_req_options) return new_val - elsif meta_flags[:z] && (!val.nil? || val != '') + elsif meta_flags[:z] break if Time.now.to_f >= lock_wait_end_time - elsif val - return val end + sleep(fill_lock_interval) end yield # fails to read value in wait time, yield back the value @@ -436,7 +437,7 @@ def get_lock_options(req_options) lock_ttl = req_options.delete(:lock_ttl) || LOCK_TTL fill_lock_interval = req_options.delete(:fill_lock_interval) || FILL_LOCK_INTERVAL - raise ArgumentError, 'lock_ttl must be a positive integer' if !lock_ttl.is_a?(Integer) && lock_ttl <= 1 + raise ArgumentError, 'lock_ttl must be a positive integer' if !lock_ttl.is_a?(Integer) && lock_ttl < 1 if fill_lock_interval.is_a?(Numeric) && fill_lock_interval <= 0 raise ArgumentError,