From 514c3dbfc3069811ce89f3549e6aec83901e5069 Mon Sep 17 00:00:00 2001 From: Sammy Steele Date: Mon, 16 Mar 2026 23:41:58 +0000 Subject: [PATCH] fix: address review findings for leader election recovery - Make current_generation public so BuildRecord can access it - Remove duplicate generation_key/learn_generation from Worker (inherit from Base) - Base generation_key now raises instead of silent fallback - Fence push() with Lua script to prevent stale master overwrite - Add lock refresh during population to prevent TTL expiry - Supervisor rescues MasterDied alongside LostMaster - wait_for_master detects immediate nil status (no prior 'setup' needed) - Move generation_stale? check to idle path to reduce Redis GETs - Increase default master_lock_ttl from 30s to 120s - Add 8 tests covering election retry, fenced push, generation staleness, and error handling paths --- redis/push.lua | 32 ++++++ redis/refresh_master_lock.lua | 9 ++ ruby/lib/ci/queue/configuration.rb | 2 +- ruby/lib/ci/queue/redis/base.rb | 25 +++-- ruby/lib/ci/queue/redis/supervisor.rb | 2 +- ruby/lib/ci/queue/redis/worker.rb | 74 +++++++------ ruby/test/ci/queue/redis_test.rb | 152 ++++++++++++++++++++++++++ 7 files changed, 252 insertions(+), 44 deletions(-) create mode 100644 redis/push.lua create mode 100644 redis/refresh_master_lock.lua diff --git a/redis/push.lua b/redis/push.lua new file mode 100644 index 0000000..67e72f7 --- /dev/null +++ b/redis/push.lua @@ -0,0 +1,32 @@ +local master_status_key = KEYS[1] +local queue_key = KEYS[2] +local total_key = KEYS[3] +local current_generation_key = KEYS[4] + +local expected_lock_value = ARGV[1] +local total_count = ARGV[2] +local generation_uuid = ARGV[3] +local redis_ttl = ARGV[4] + +if redis.call('GET', master_status_key) ~= expected_lock_value then + return 0 +end + +if #ARGV > 4 then + local tests = {} + for i = 5, #ARGV do + tests[#tests + 1] = ARGV[i] + end + redis.call('LPUSH', queue_key, unpack(tests)) +end + +redis.call('SET', total_key, total_count) +redis.call('SET', master_status_key, 'ready') +redis.call('SET', current_generation_key, generation_uuid) + +redis.call('EXPIRE', queue_key, redis_ttl) +redis.call('EXPIRE', total_key, redis_ttl) +redis.call('EXPIRE', master_status_key, redis_ttl) +redis.call('EXPIRE', current_generation_key, redis_ttl) + +return 1 diff --git a/redis/refresh_master_lock.lua b/redis/refresh_master_lock.lua new file mode 100644 index 0000000..117a99f --- /dev/null +++ b/redis/refresh_master_lock.lua @@ -0,0 +1,9 @@ +local master_status_key = KEYS[1] +local expected_value = ARGV[1] +local ttl = ARGV[2] + +if redis.call('GET', master_status_key) == expected_value then + redis.call('SET', master_status_key, expected_value, 'EX', ttl) + return 1 +end +return 0 diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 8882f94..a4620d2 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -68,7 +68,7 @@ def initialize( timing_redis_url: nil, heartbeat_grace_period: 30, heartbeat_interval: 10, - master_lock_ttl: 30, + master_lock_ttl: 120, max_election_attempts: 3 ) @build_id = build_id diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 221ac94..08db082 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -19,6 +19,7 @@ def initialize(redis_url, config) end def exhausted? + return false unless current_generation queue_initialized? && size == 0 end @@ -70,8 +71,12 @@ def wait_for_master(timeout: 120) # Master lock expired during setup (died mid-population) # Status will be nil if lock expired - if status.nil? && last_status == 'setup' - raise MasterDied, "Master lock expired during setup - master may have died" + if status.nil? + if last_status == 'setup' + raise MasterDied, "Master lock expired during setup - master may have died" + elsif !redis.exists?(key('current-generation')) + raise MasterDied, "No master has completed setup" + end end last_status = status @@ -114,6 +119,16 @@ def master_worker_id redis.get(key('master-worker-id')) end + def current_generation + @generation || @current_generation + end + + def generation_stale? + return false unless @current_generation + current = redis.get(key('current-generation')) + current && current != @current_generation + end + private attr_reader :redis, :redis_url @@ -135,7 +150,7 @@ def master_status def generation_key(*args) gen = @generation || @current_generation - return key(*args) unless gen # Fallback for backwards compatibility + raise "Generation not set - call learn_generation first" unless gen key('gen', gen, *args) end @@ -145,10 +160,6 @@ def learn_generation @current_generation end - def current_generation - @generation || @current_generation - end - def eval_script(script, *args) redis.evalsha(load_script(script), *args) end diff --git a/ruby/lib/ci/queue/redis/supervisor.rb b/ruby/lib/ci/queue/redis/supervisor.rb index 9cdcdd6..ec815b3 100644 --- a/ruby/lib/ci/queue/redis/supervisor.rb +++ b/ruby/lib/ci/queue/redis/supervisor.rb @@ -45,7 +45,7 @@ def wait_for_workers puts "Aborting, it seems all workers died." if time_left_with_no_workers <= 0 exhausted? - rescue CI::Queue::Redis::LostMaster + rescue CI::Queue::Redis::LostMaster, CI::Queue::Redis::MasterDied false end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index a0852f4..1bcca08 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -51,6 +51,13 @@ def populate(tests, random: Random.new) store_chunk_metadata(chunks) if chunks.any? + # Refresh lock TTL before push to prevent expiry during population + unless refresh_master_lock + @master = false + warn "Lock expired during population — another master may have taken over" + next + end + all_ids = chunks.map(&:id) + individual_tests.map(&:id) push(all_ids) end @@ -106,12 +113,6 @@ def poll idle_state_printed = false attempt = 0 until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed? - # Check for generation staleness - another master may have taken over - if generation_stale? - warn "Generation changed - queue was repopulated by new master. Exiting poll loop." - break - end - if id = reserve attempt = 0 idle_since = nil @@ -126,6 +127,12 @@ def poll acknowledge(id) end else + # Check for generation staleness when idle - another master may have taken over + if generation_stale? + warn "Generation changed - queue was repopulated by new master. Exiting poll loop." + break + end + idle_since ||= CI::Queue.time_now if CI::Queue.time_now - idle_since > 120 && !idle_state_printed puts "Worker #{worker_id} has been idle for 120 seconds. Printing global state..." @@ -289,25 +296,6 @@ def heartbeat(test_or_id = nil) attr_reader :index - def generation_key(*args) - gen = @generation || @current_generation - raise "Generation not set - call learn_generation first" unless gen - key('gen', gen, *args) - end - - def learn_generation - @current_generation = redis.get(key('current-generation')) - raise MasterDied, "No generation available - master may have died" unless @current_generation - @current_generation - end - - # Check if our cached generation is stale - def generation_stale? - return false unless @current_generation - current = redis.get(key('current-generation')) - current && current != @current_generation - end - # Runs a block while sending periodic heartbeats in a background thread. # This prevents other workers from stealing the test while it's being executed. def with_heartbeat(test_id) @@ -485,16 +473,22 @@ def push(tests) @total = tests.size if @master - redis.multi do |transaction| - transaction.lpush(generation_key('queue'), tests) unless tests.empty? - transaction.set(generation_key('total'), @total) - transaction.set(key('master-status'), 'ready') - transaction.set(key('current-generation'), @generation) - - transaction.expire(generation_key('queue'), config.redis_ttl) - transaction.expire(generation_key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) - transaction.expire(key('current-generation'), config.redis_ttl) + argv = [ + "setup:#{@generation}", + @total.to_s, + @generation, + config.redis_ttl.to_s, + ] + tests + + result = eval_script( + :push, + keys: [key('master-status'), generation_key('queue'), generation_key('total'), key('current-generation')], + argv: argv, + ) + + if result == 0 + @master = false + warn "Lock lost during push — another master took over (generation #{@generation})" end end rescue *CONNECTION_ERRORS @@ -543,6 +537,16 @@ def acquire_master_role? false end + def refresh_master_lock + eval_script( + :refresh_master_lock, + keys: [key('master-status')], + argv: ["setup:#{@generation}", config.master_lock_ttl.to_s], + ) == 1 + rescue *CONNECTION_ERRORS + false + end + def register_worker_presence register redis.expire(key('workers'), config.redis_ttl) diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index cca154a..d481ad6 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -657,6 +657,158 @@ def test_moving_average_updates_persist_across_workers assert_equal 5500.0, chunks.first.estimated_duration end + # --- Election recovery tests --- + + def test_master_death_triggers_re_election + build = 'election-death' + w1 = worker(1, build_id: build, master_lock_ttl: 1, max_election_attempts: 3) + assert_predicate w1, :master? + + # Simulate master death: delete the master-status key (lock expires) + @redis.del("build:#{build}:master-status") + + # Worker 2 should detect the dead master and become the new master + w2 = worker(2, build_id: build, master_lock_ttl: 1, max_election_attempts: 3) + assert_predicate w2, :master? + + # Both workers should be able to poll successfully + poll(w2) + assert_predicate w2, :exhausted? + end + + def test_fenced_push_rejects_stale_master + build = 'fenced-push' + w1 = worker(1, build_id: build, master_lock_ttl: 30) + assert_predicate w1, :master? + + # Overwrite master-status to simulate another master winning election + other_gen = 'other-generation-uuid' + @redis.set("build:#{build}:master-status", "setup:#{other_gen}", ex: 30) + + # w1 still thinks it's master, but push.lua should reject because lock value changed + w1.send(:push, ['ATest#test_foo']) + + # master-status should still be the other master's value + status = @redis.get("build:#{build}:master-status") + assert_equal "setup:#{other_gen}", status, "Fenced push should not overwrite another master's status" + end + + def test_generation_stale_exits_poll + build = 'gen-stale' + w1 = worker(1, build_id: build) + assert_predicate w1, :master? + + w2 = worker(2, build_id: build) + refute_predicate w2, :master? + + # Drain the queue with w1 so w2 sees no tests (idle), but not exhausted + poll(w1) + + # Change generation before w2 polls — w2 will be idle and detect staleness + @redis.set("build:#{build}:current-generation", "new-generation-uuid") + + tests_seen = [] + w2.poll do |test| + tests_seen << test + w2.acknowledge(test) + end + + assert_equal 0, tests_seen.size, "Worker should exit poll immediately when generation is stale" + end + + def test_learn_generation_raises_when_key_missing + build = 'learn-gen-missing' + w1 = worker(1, build_id: build) + assert_predicate w1, :master? + + # Delete the current-generation key + @redis.del("build:#{build}:current-generation") + + # A non-master worker trying to learn generation should raise MasterDied + assert_raises(CI::Queue::Redis::MasterDied) do + w2 = worker(2, build_id: build, populate: false) + w2.send(:learn_generation) + end + end + + def test_max_election_attempts_raises_lost_master + build = 'max-attempts' + + # Worker 1 wins master and starts setup, then dies (lock expires) + # We simulate this by having another process hold setup then expire + # To prevent the test worker from winning master, keep re-setting the key + # so it always sees "setup" then nil (death) + t = Thread.new do + loop do + # Keep setting a short-lived setup lock so the worker always sees a dying master + @redis.set("build:#{build}:master-status", "setup:dead-gen", px: 50) + sleep 0.06 + end + end + + assert_raises(CI::Queue::Redis::LostMaster) do + worker(1, build_id: build, max_election_attempts: 1, queue_init_timeout: 0.5) + end + ensure + t&.kill + end + + def test_build_record_reads_generation_scoped_requeue_key + build = 'requeue-gen' + w1 = worker(1, build_id: build, max_requeues: 1, requeue_tolerance: 1.0) + + w1.poll do |test| + w1.report_failure! + unless w1.requeue(test) + w1.acknowledge(test) + end + end + + requeues = w1.build.requeued_tests + refute_empty requeues, "Should have requeued at least one test" + end + + def test_supervisor_handles_master_died + build = 'supervisor-died' + supervisor = CI::Queue::Redis::Supervisor.new( + @redis_url, + CI::Queue::Configuration.new( + build_id: build, + worker_id: 'sup', + timeout: 0.2, + queue_init_timeout: 0.3, + timing_redis_url: @redis_url, + ) + ) + + # wait_for_workers should return false (not crash) when no master exists + result = supervisor.wait_for_workers + refute result, "Supervisor should return false when master never appeared" + end + + def test_wait_for_master_detects_immediate_nil_status + build = 'nil-status' + + w = CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new( + build_id: build, + worker_id: '2', + timeout: 0.2, + queue_init_timeout: 0.5, + timing_redis_url: @redis_url, + ) + ) + + # Simulate that status was "setup" then expired (nil) + @redis.set("build:#{build}:master-status", "setup:some-gen", px: 1) + sleep 0.01 # Let it expire + + assert_raises(CI::Queue::Redis::MasterDied) do + w.send(:wait_for_master, timeout: 0.5) + end + end + private class MockTest