Skip to content

Commit 514c3db

Browse files
committed
fix: address review findings for leader election recovery
- Make current_generation public so BuildRecord can access it
- Remove duplicate generation_key/learn_generation from Worker (inherit from Base)
- Base generation_key now raises instead of silent fallback
- Fence push() with Lua script to prevent stale master overwrite
- Add lock refresh during population to prevent TTL expiry
- Supervisor rescues MasterDied alongside LostMaster
- wait_for_master detects immediate nil status (no prior 'setup' needed)
- Move generation_stale? check to idle path to reduce Redis GETs
- Increase default master_lock_ttl from 30s to 120s
- Add 8 tests covering election retry, fenced push, generation staleness, and error handling paths
1 parent c41186e commit 514c3db

7 files changed

Lines changed: 252 additions & 44 deletions

File tree

redis/push.lua

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
-- Publishes a freshly populated queue, fenced on the master lock value.
--
-- KEYS[1]  master status key
-- KEYS[2]  queue list key
-- KEYS[3]  total counter key
-- KEYS[4]  current generation key
--
-- ARGV[1]  lock value the caller expects to find in the master status key
-- ARGV[2]  total test count
-- ARGV[3]  generation identifier
-- ARGV[4]  TTL (seconds) applied to all four keys
-- ARGV[5+] test identifiers to enqueue (may be absent)
--
-- Returns 0 without writing anything when the master status key does not
-- hold ARGV[1]; returns 1 once everything has been written.
local master_status_key = KEYS[1]
local queue_key = KEYS[2]
local total_key = KEYS[3]
local current_generation_key = KEYS[4]

local expected_lock_value = ARGV[1]
local total_count = ARGV[2]
local generation_uuid = ARGV[3]
local redis_ttl = ARGV[4]

-- unpack() puts every value on the Lua stack at once, and Lua aborts with
-- "too many results to unpack" once the stack limit is exceeded. Pushing in
-- bounded batches keeps large suites working.
local BATCH_SIZE = 1000
local FIRST_TEST_INDEX = 5

-- Fence: a missing key (GET yields false) or any other value means the
-- caller no longer owns the lock.
if redis.call('GET', master_status_key) ~= expected_lock_value then
  return 0
end

-- Batches are pushed in argument order, so the resulting list is the same as
-- a single LPUSH of all values. The loop body never runs when no tests were
-- supplied.
local argc = #ARGV
for first = FIRST_TEST_INDEX, argc, BATCH_SIZE do
  local last = math.min(first + BATCH_SIZE - 1, argc)
  redis.call('LPUSH', queue_key, unpack(ARGV, first, last))
end

redis.call('SET', total_key, total_count)
redis.call('SET', master_status_key, 'ready')
redis.call('SET', current_generation_key, generation_uuid)

redis.call('EXPIRE', queue_key, redis_ttl)
redis.call('EXPIRE', total_key, redis_ttl)
redis.call('EXPIRE', master_status_key, redis_ttl)
redis.call('EXPIRE', current_generation_key, redis_ttl)

return 1

redis/refresh_master_lock.lua

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
-- Extends the master lock's TTL, but only for the current holder.
--
-- KEYS[1]  master status key
-- ARGV[1]  lock value the caller expects to find
-- ARGV[2]  new TTL in seconds
--
-- Returns 1 when the lock was rewritten with the new TTL, 0 when the key
-- holds a different value or does not exist.
local lock_key = KEYS[1]
local owner_token = ARGV[1]
local lease_seconds = ARGV[2]

if redis.call('GET', lock_key) ~= owner_token then
  return 0
end

redis.call('SET', lock_key, owner_token, 'EX', lease_seconds)
return 1

ruby/lib/ci/queue/configuration.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ def initialize(
6868
timing_redis_url: nil,
6969
heartbeat_grace_period: 30,
7070
heartbeat_interval: 10,
71-
master_lock_ttl: 30,
71+
master_lock_ttl: 120,
7272
max_election_attempts: 3
7373
)
7474
@build_id = build_id

ruby/lib/ci/queue/redis/base.rb

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ def initialize(redis_url, config)
1919
end
2020

2121
# Whether the queue has been initialized and holds no more entries.
# Always false while this instance has no generation.
def exhausted?
  if current_generation
    queue_initialized? && size == 0
  else
    false
  end
end
2425

@@ -70,8 +71,12 @@ def wait_for_master(timeout: 120)
7071

7172
# Master lock expired during setup (died mid-population)
7273
# Status will be nil if lock expired
73-
if status.nil? && last_status == 'setup'
74-
raise MasterDied, "Master lock expired during setup - master may have died"
74+
if status.nil?
75+
if last_status == 'setup'
76+
raise MasterDied, "Master lock expired during setup - master may have died"
77+
elsif !redis.exists?(key('current-generation'))
78+
raise MasterDied, "No master has completed setup"
79+
end
7580
end
7681

7782
last_status = status
@@ -114,6 +119,16 @@ def master_worker_id
114119
redis.get(key('master-worker-id'))
115120
end
116121

122+
# The generation this instance works against: @generation when set,
# otherwise @current_generation.
def current_generation
  return @generation if @generation

  @current_generation
end
125+
126+
# Compares the cached @current_generation with the 'current-generation'
# key in Redis. Returns false when nothing is cached, and a falsy value
# when the key is absent; truthy only when both exist and differ.
def generation_stale?
  cached = @current_generation
  return false unless cached

  live = redis.get(key('current-generation'))
  live && live != cached
end
131+
117132
private
118133

119134
attr_reader :redis, :redis_url
@@ -135,7 +150,7 @@ def master_status
135150

136151
def generation_key(*args)
137152
gen = @generation || @current_generation
138-
return key(*args) unless gen # Fallback for backwards compatibility
153+
raise "Generation not set - call learn_generation first" unless gen
139154
key('gen', gen, *args)
140155
end
141156

@@ -145,10 +160,6 @@ def learn_generation
145160
@current_generation
146161
end
147162

148-
def current_generation
149-
@generation || @current_generation
150-
end
151-
152163
def eval_script(script, *args)
153164
redis.evalsha(load_script(script), *args)
154165
end

ruby/lib/ci/queue/redis/supervisor.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ def wait_for_workers
4545

4646
puts "Aborting, it seems all workers died." if time_left_with_no_workers <= 0
4747
exhausted?
48-
rescue CI::Queue::Redis::LostMaster
48+
rescue CI::Queue::Redis::LostMaster, CI::Queue::Redis::MasterDied
4949
false
5050
end
5151

ruby/lib/ci/queue/redis/worker.rb

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ def populate(tests, random: Random.new)
5151

5252
store_chunk_metadata(chunks) if chunks.any?
5353

54+
# Refresh lock TTL before push to prevent expiry during population
55+
unless refresh_master_lock
56+
@master = false
57+
warn "Lock expired during population — another master may have taken over"
58+
next
59+
end
60+
5461
all_ids = chunks.map(&:id) + individual_tests.map(&:id)
5562
push(all_ids)
5663
end
@@ -106,12 +113,6 @@ def poll
106113
idle_state_printed = false
107114
attempt = 0
108115
until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
109-
# Check for generation staleness - another master may have taken over
110-
if generation_stale?
111-
warn "Generation changed - queue was repopulated by new master. Exiting poll loop."
112-
break
113-
end
114-
115116
if id = reserve
116117
attempt = 0
117118
idle_since = nil
@@ -126,6 +127,12 @@ def poll
126127
acknowledge(id)
127128
end
128129
else
130+
# Check for generation staleness when idle - another master may have taken over
131+
if generation_stale?
132+
warn "Generation changed - queue was repopulated by new master. Exiting poll loop."
133+
break
134+
end
135+
129136
idle_since ||= CI::Queue.time_now
130137
if CI::Queue.time_now - idle_since > 120 && !idle_state_printed
131138
puts "Worker #{worker_id} has been idle for 120 seconds. Printing global state..."
@@ -289,25 +296,6 @@ def heartbeat(test_or_id = nil)
289296

290297
attr_reader :index
291298

292-
def generation_key(*args)
293-
gen = @generation || @current_generation
294-
raise "Generation not set - call learn_generation first" unless gen
295-
key('gen', gen, *args)
296-
end
297-
298-
def learn_generation
299-
@current_generation = redis.get(key('current-generation'))
300-
raise MasterDied, "No generation available - master may have died" unless @current_generation
301-
@current_generation
302-
end
303-
304-
# Check if our cached generation is stale
305-
def generation_stale?
306-
return false unless @current_generation
307-
current = redis.get(key('current-generation'))
308-
current && current != @current_generation
309-
end
310-
311299
# Runs a block while sending periodic heartbeats in a background thread.
312300
# This prevents other workers from stealing the test while it's being executed.
313301
def with_heartbeat(test_id)
@@ -485,16 +473,22 @@ def push(tests)
485473
@total = tests.size
486474

487475
if @master
488-
redis.multi do |transaction|
489-
transaction.lpush(generation_key('queue'), tests) unless tests.empty?
490-
transaction.set(generation_key('total'), @total)
491-
transaction.set(key('master-status'), 'ready')
492-
transaction.set(key('current-generation'), @generation)
493-
494-
transaction.expire(generation_key('queue'), config.redis_ttl)
495-
transaction.expire(generation_key('total'), config.redis_ttl)
496-
transaction.expire(key('master-status'), config.redis_ttl)
497-
transaction.expire(key('current-generation'), config.redis_ttl)
476+
argv = [
477+
"setup:#{@generation}",
478+
@total.to_s,
479+
@generation,
480+
config.redis_ttl.to_s,
481+
] + tests
482+
483+
result = eval_script(
484+
:push,
485+
keys: [key('master-status'), generation_key('queue'), generation_key('total'), key('current-generation')],
486+
argv: argv,
487+
)
488+
489+
if result == 0
490+
@master = false
491+
warn "Lock lost during push — another master took over (generation #{@generation})"
498492
end
499493
end
500494
rescue *CONNECTION_ERRORS
@@ -543,6 +537,16 @@ def acquire_master_role?
543537
false
544538
end
545539

540+
# Runs the :refresh_master_lock script against the master-status key using
# this worker's "setup:<generation>" lock value and config.master_lock_ttl.
# Returns true only when the script reports 1; connection errors yield false.
def refresh_master_lock
  lock_token = "setup:#{@generation}"
  outcome = eval_script(
    :refresh_master_lock,
    keys: [key('master-status')],
    argv: [lock_token, config.master_lock_ttl.to_s],
  )
  outcome == 1
rescue *CONNECTION_ERRORS
  false
end
549+
546550
def register_worker_presence
547551
register
548552
redis.expire(key('workers'), config.redis_ttl)

ruby/test/ci/queue/redis_test.rb

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,158 @@ def test_moving_average_updates_persist_across_workers
657657
assert_equal 5500.0, chunks.first.estimated_duration
658658
end
659659

660+
# --- Election recovery tests ---
661+
662+
# After the first master's lock disappears, a second worker becomes master
# and drains the queue.
def test_master_death_triggers_re_election
  build_id = 'election-death'

  first = worker(1, build_id: build_id, master_lock_ttl: 1, max_election_attempts: 3)
  assert_predicate first, :master?

  # Deleting master-status simulates the master dying and its lock expiring.
  @redis.del("build:#{build_id}:master-status")

  # The second worker should notice the dead master and win the election.
  second = worker(2, build_id: build_id, master_lock_ttl: 1, max_election_attempts: 3)
  assert_predicate second, :master?

  # The new master must be able to poll the queue to exhaustion.
  poll(second)
  assert_predicate second, :exhausted?
end
678+
679+
# A master whose lock value was replaced must not overwrite master-status
# when it pushes.
def test_fenced_push_rejects_stale_master
  build_id = 'fenced-push'
  status_key = "build:#{build_id}:master-status"

  stale_master = worker(1, build_id: build_id, master_lock_ttl: 30)
  assert_predicate stale_master, :master?

  # Replace the lock value as if another master had won the election.
  rival_generation = 'other-generation-uuid'
  rival_lock = "setup:#{rival_generation}"
  @redis.set(status_key, rival_lock, ex: 30)

  # The first worker still believes it is master; the fenced push must refuse.
  stale_master.send(:push, ['ATest#test_foo'])

  assert_equal rival_lock, @redis.get(status_key), "Fenced push should not overwrite another master's status"
end
695+
696+
# An idle worker leaves its poll loop without yielding any test once the
# current-generation key no longer matches the generation it learned.
def test_generation_stale_exits_poll
  build_id = 'gen-stale'

  leader = worker(1, build_id: build_id)
  assert_predicate leader, :master?

  follower = worker(2, build_id: build_id)
  refute_predicate follower, :master?

  # Empty the queue through the leader so the follower finds nothing to reserve.
  poll(leader)

  # Swap the generation before the follower polls so it sees a mismatch while idle.
  @redis.set("build:#{build_id}:current-generation", "new-generation-uuid")

  observed = []
  follower.poll do |test|
    observed << test
    follower.acknowledge(test)
  end

  assert_equal 0, observed.size, "Worker should exit poll immediately when generation is stale"
end
718+
719+
# learn_generation raises MasterDied when the current-generation key is gone.
def test_learn_generation_raises_when_key_missing
  build_id = 'learn-gen-missing'

  leader = worker(1, build_id: build_id)
  assert_predicate leader, :master?

  # Remove the key a non-master worker would read its generation from.
  @redis.del("build:#{build_id}:current-generation")

  assert_raises(CI::Queue::Redis::MasterDied) do
    worker(2, build_id: build_id, populate: false).send(:learn_generation)
  end
end
733+
734+
# With a master that keeps appearing in "setup" and then vanishing, a worker
# limited to one election attempt gives up with LostMaster.
def test_max_election_attempts_raises_lost_master
  build_id = 'max-attempts'
  status_key = "build:#{build_id}:master-status"

  # Background thread that keeps re-creating a setup lock with a 50ms expiry,
  # so the worker under test always observes a master that dies mid-setup and
  # never gets to win the lock itself.
  flapper = Thread.new do
    loop do
      @redis.set(status_key, "setup:dead-gen", px: 50)
      sleep 0.06
    end
  end

  assert_raises(CI::Queue::Redis::LostMaster) do
    worker(1, build_id: build_id, max_election_attempts: 1, queue_init_timeout: 0.5)
  end
ensure
  flapper&.kill
end
755+
756+
# Requeues made by a worker are visible through its build record.
def test_build_record_reads_generation_scoped_requeue_key
  build_id = 'requeue-gen'
  runner = worker(1, build_id: build_id, max_requeues: 1, requeue_tolerance: 1.0)

  # Fail every test; acknowledge only those that could not be requeued.
  runner.poll do |test|
    runner.report_failure!
    runner.acknowledge(test) unless runner.requeue(test)
  end

  refute_empty runner.build.requeued_tests, "Should have requeued at least one test"
end
770+
771+
# wait_for_workers returns a falsy result instead of raising when no master
# ever shows up for the build.
def test_supervisor_handles_master_died
  settings = CI::Queue::Configuration.new(
    build_id: 'supervisor-died',
    worker_id: 'sup',
    timeout: 0.2,
    queue_init_timeout: 0.3,
    timing_redis_url: @redis_url,
  )
  supervisor = CI::Queue::Redis::Supervisor.new(@redis_url, settings)

  refute supervisor.wait_for_workers, "Supervisor should return false when master never appeared"
end
788+
789+
# wait_for_master raises MasterDied when master-status is already gone by the
# time waiting starts.
def test_wait_for_master_detects_immediate_nil_status
  build_id = 'nil-status'
  settings = CI::Queue::Configuration.new(
    build_id: build_id,
    worker_id: '2',
    timeout: 0.2,
    queue_init_timeout: 0.5,
    timing_redis_url: @redis_url,
  )
  queue = CI::Queue::Redis.new(@redis_url, settings)

  # Write a setup lock with a 1ms expiry and wait for it to lapse.
  @redis.set("build:#{build_id}:master-status", "setup:some-gen", px: 1)
  sleep 0.01

  assert_raises(CI::Queue::Redis::MasterDied) do
    queue.send(:wait_for_master, timeout: 0.5)
  end
end
811+
660812
private
661813

662814
class MockTest

0 commit comments

Comments
 (0)