Skip to content

Commit c41186e

Browse files
ssteele110 and claude committed
Add leader election recovery with generation isolation
Implement leader election architecture to handle master worker death during queue population.

Key changes:
- Add MasterDied exception for detecting master failures
- Add master_lock_ttl (30s) and max_election_attempts (3) config options
- Use SET NX EX for master lock with TTL instead of SETNX (no TTL)
- Namespace all queue data keys by generation UUID for isolation
- Detect master death in wait_for_master when lock expires during setup
- Add retry loop in populate() to handle MasterDied with configurable attempts
- Add generation staleness check in poll loop
- Update all queue operations to use generation-scoped keys

This allows workers to recover when a master dies mid-population by:
1. Detecting the lock expiry (MasterDied exception)
2. Electing a new master with a new generation
3. Repopulating the queue in an isolated namespace
4. Letting old workers detect staleness and exit gracefully

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 80997b2 commit c41186e

6 files changed

Lines changed: 185 additions & 74 deletions

File tree

ruby/lib/ci/queue/configuration.rb

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ class Configuration
1515
attr_accessor :timing_redis_url
1616
attr_accessor :write_duration_averages
1717
attr_accessor :heartbeat_grace_period, :heartbeat_interval
18+
attr_accessor :master_lock_ttl, :max_election_attempts
1819
attr_reader :circuit_breakers
1920
attr_writer :seed, :build_id
2021
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
@@ -66,7 +67,9 @@ def initialize(
6667
branch: nil,
6768
timing_redis_url: nil,
6869
heartbeat_grace_period: 30,
69-
heartbeat_interval: 10
70+
heartbeat_interval: 10,
71+
master_lock_ttl: 30,
72+
max_election_attempts: 3
7073
)
7174
@build_id = build_id
7275
@circuit_breakers = [CircuitBreaker::Disabled]
@@ -105,6 +108,8 @@ def initialize(
105108
@write_duration_averages = false
106109
@heartbeat_grace_period = heartbeat_grace_period
107110
@heartbeat_interval = heartbeat_interval
111+
@master_lock_ttl = master_lock_ttl
112+
@max_election_attempts = max_election_attempts
108113
end
109114

110115
def queue_init_timeout

ruby/lib/ci/queue/redis.rb

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ module Queue
1818
module Redis
1919
Error = Class.new(StandardError)
2020
LostMaster = Class.new(Error)
21+
MasterDied = Class.new(Error)
2122

2223
class << self
2324

ruby/lib/ci/queue/redis/base.rb

Lines changed: 43 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -37,15 +37,15 @@ def created_at=(timestamp)
3737

3838
def size
3939
redis.multi do |transaction|
40-
transaction.llen(key('queue'))
41-
transaction.zcard(key('running'))
40+
transaction.llen(generation_key('queue'))
41+
transaction.zcard(generation_key('running'))
4242
end.inject(:+)
4343
end
4444

4545
def to_a
4646
redis.multi do |transaction|
47-
transaction.lrange(key('queue'), 0, -1)
48-
transaction.zrange(key('running'), 0, -1)
47+
transaction.lrange(generation_key('queue'), 0, -1)
48+
transaction.zrange(generation_key('running'), 0, -1)
4949
end.flatten.reverse.map { |k| index.fetch(k) }
5050
end
5151

@@ -56,11 +56,28 @@ def progress
5656
def wait_for_master(timeout: 120)
5757
return true if master?
5858

59-
(timeout * 10 + 1).to_i.times do
60-
return true if queue_initialized?
59+
deadline = CI::Queue.time_now + timeout
60+
last_status = nil
6161

62+
while CI::Queue.time_now < deadline
63+
status = master_status
64+
65+
# Success - queue is ready
66+
if status == 'ready' || status == 'finished'
67+
learn_generation unless master?
68+
return true
69+
end
70+
71+
# Master lock expired during setup (died mid-population)
72+
# Status will be nil if lock expired
73+
if status.nil? && last_status == 'setup'
74+
raise MasterDied, "Master lock expired during setup - master may have died"
75+
end
76+
77+
last_status = status
6278
sleep 0.1
6379
end
80+
6481
raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting."
6582
end
6683

@@ -110,7 +127,26 @@ def build_id
110127
end
111128

112129
def master_status
113-
redis.get(key('master-status'))
130+
status = redis.get(key('master-status'))
131+
# Handle new format "setup:#{generation}" - return just "setup" for compatibility
132+
return 'setup' if status&.start_with?('setup:')
133+
status
134+
end
135+
136+
def generation_key(*args)
137+
gen = @generation || @current_generation
138+
return key(*args) unless gen # Fallback for backwards compatibility
139+
key('gen', gen, *args)
140+
end
141+
142+
def learn_generation
143+
@current_generation = redis.get(key('current-generation'))
144+
raise MasterDied, "No generation available - master may have died" unless @current_generation
145+
@current_generation
146+
end
147+
148+
def current_generation
149+
@generation || @current_generation
114150
end
115151

116152
def eval_script(script, *args)

ruby/lib/ci/queue/redis/build_record.rb

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ def passing_tests
2727

2828
TOTAL_KEY = "___total___"
2929
def requeued_tests
30-
requeues = redis.hgetall(key('requeues-count'))
30+
requeues = redis.hgetall(generation_key('requeues-count'))
3131
requeues.delete(TOTAL_KEY)
3232
requeues
3333
end
@@ -126,6 +126,12 @@ def record_stats(stats, pipeline: redis)
126126
def key(*args)
127127
['build', config.build_id, *args].join(':')
128128
end
129+
130+
def generation_key(*args)
131+
gen = @queue.respond_to?(:current_generation) ? @queue.current_generation : nil
132+
return key(*args) unless gen # Fallback for backwards compatibility
133+
key('gen', gen, *args)
134+
end
129135
end
130136
end
131137
end

ruby/lib/ci/queue/redis/supervisor.rb

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@ def master?
99

1010
def total
1111
wait_for_master(timeout: config.queue_init_timeout)
12-
redis.get(key('total')).to_i
12+
redis.get(generation_key('total')).to_i
1313
end
1414

1515
def build
@@ -53,7 +53,7 @@ def wait_for_workers
5353

5454
def active_workers?
5555
# if there are running jobs we assume there are still agents active
56-
redis.zrangebyscore(key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0
56+
redis.zrangebyscore(generation_key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0
5757
end
5858
end
5959
end

0 commit comments

Comments
 (0)