diff --git a/redis/push_queue.lua b/redis/push_queue.lua new file mode 100644 index 0000000..5ce4bad --- /dev/null +++ b/redis/push_queue.lua @@ -0,0 +1,32 @@ +local master_status_key = KEYS[1] +local queue_key = KEYS[2] +local total_key = KEYS[3] +local generation_key = KEYS[4] + +local expected_lock = ARGV[1] +local generation_uuid = ARGV[2] +local total_count = ARGV[3] +local redis_ttl = tonumber(ARGV[4]) + +-- CAS: verify we still own the lock +if redis.call('get', master_status_key) ~= expected_lock then + return 0 +end + +-- Push test IDs to queue (ARGV[5] onwards) +for i = 5, #ARGV do + redis.call('lpush', queue_key, ARGV[i]) +end + +-- Set metadata +redis.call('set', total_key, total_count) +redis.call('set', generation_key, generation_uuid) +redis.call('set', master_status_key, 'ready') + +-- Apply TTLs +redis.call('expire', queue_key, redis_ttl) +redis.call('expire', total_key, redis_ttl) +redis.call('expire', generation_key, redis_ttl) +redis.call('expire', master_status_key, redis_ttl) + +return 1 diff --git a/redis/renew_master_lock.lua b/redis/renew_master_lock.lua new file mode 100644 index 0000000..4c5b26a --- /dev/null +++ b/redis/renew_master_lock.lua @@ -0,0 +1,10 @@ +local master_status_key = KEYS[1] +local expected_value = ARGV[1] +local ttl = tonumber(ARGV[2]) + +if redis.call('get', master_status_key) == expected_value then + redis.call('expire', master_status_key, ttl) + return 1 +else + return 0 +end diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb index 9e9c133..8882f94 100644 --- a/ruby/lib/ci/queue/configuration.rb +++ b/ruby/lib/ci/queue/configuration.rb @@ -15,6 +15,7 @@ class Configuration attr_accessor :timing_redis_url attr_accessor :write_duration_averages attr_accessor :heartbeat_grace_period, :heartbeat_interval + attr_accessor :master_lock_ttl, :max_election_attempts attr_reader :circuit_breakers attr_writer :seed, :build_id attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout @@ -66,7 +67,9 @@ def initialize( branch: nil, timing_redis_url: nil, heartbeat_grace_period: 30, - heartbeat_interval: 10 + heartbeat_interval: 10, + master_lock_ttl: 30, + max_election_attempts: 3 ) @build_id = build_id @circuit_breakers = [CircuitBreaker::Disabled] @@ -105,6 +108,8 @@ def initialize( @write_duration_averages = false @heartbeat_grace_period = heartbeat_grace_period @heartbeat_interval = heartbeat_interval + @master_lock_ttl = master_lock_ttl + @max_election_attempts = max_election_attempts end def queue_init_timeout diff --git a/ruby/lib/ci/queue/redis.rb b/ruby/lib/ci/queue/redis.rb index c3876ef..96f7606 100644 --- a/ruby/lib/ci/queue/redis.rb +++ b/ruby/lib/ci/queue/redis.rb @@ -18,6 +18,7 @@ module Queue module Redis Error = Class.new(StandardError) LostMaster = Class.new(Error) + MasterDied = Class.new(Error) class << self diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb index 9ebdd58..cb196cc 100644 --- a/ruby/lib/ci/queue/redis/base.rb +++ b/ruby/lib/ci/queue/redis/base.rb @@ -37,15 +37,15 @@ def created_at=(timestamp) def size redis.multi do |transaction| - transaction.llen(key('queue')) - transaction.zcard(key('running')) + transaction.llen(generation_key('queue')) + transaction.zcard(generation_key('running')) end.inject(:+) end def to_a redis.multi do |transaction| - transaction.lrange(key('queue'), 0, -1) - transaction.zrange(key('running'), 0, -1) + transaction.lrange(generation_key('queue'), 0, -1) + transaction.zrange(generation_key('running'), 0, -1) end.flatten.reverse.map { |k| index.fetch(k) } end @@ -56,11 +56,25 @@ def progress def wait_for_master(timeout: 120) return true if master? - (timeout * 10 + 1).to_i.times do - return true if queue_initialized? + deadline = CI::Queue.time_now + timeout + last_status = nil + while CI::Queue.time_now < deadline + status = master_status + + if status == 'ready' || status == 'finished' + learn_generation unless master? + return true + end + + if status.nil? && last_status == 'setup' + raise MasterDied, "Master lock expired during setup" + end + + last_status = status sleep 0.1 end + raise LostMaster, "The master worker (worker #{master_worker_id}) is still `#{master_status}` after #{timeout} seconds waiting." end @@ -71,7 +85,14 @@ def workers_count def queue_initialized? @queue_initialized ||= begin status = master_status - %w[ready finished].include?(status) + if %w[ready finished].include?(status) + learn_generation unless current_generation + true + else + false + end + rescue MasterDied + false end end @@ -105,12 +126,30 @@ def key(*args) ['build', build_id, *args].join(':') end + def generation_key(*args) + gen = @generation || @current_generation + return key(*args) unless gen + key('gen', gen, *args) + end + + def learn_generation + @current_generation = redis.get(key('current-generation')) + raise MasterDied, "No generation available" unless @current_generation + @current_generation + end + + def current_generation + @generation || @current_generation + end + def build_id config.build_id end def master_status - redis.get(key('master-status')) + status = redis.get(key('master-status')) + return 'setup' if status&.start_with?('setup:') + status end def eval_script(script, *args) diff --git a/ruby/lib/ci/queue/redis/build_record.rb b/ruby/lib/ci/queue/redis/build_record.rb index 0423b05..f398405 100644 --- a/ruby/lib/ci/queue/redis/build_record.rb +++ b/ruby/lib/ci/queue/redis/build_record.rb @@ -27,7 +27,7 @@ def passing_tests TOTAL_KEY = "___total___" def requeued_tests - requeues = redis.hgetall(key('requeues-count')) + requeues = redis.hgetall(generation_key('requeues-count')) requeues.delete(TOTAL_KEY) requeues end @@ -123,6 +123,12 @@ def record_stats(stats, pipeline: redis) end end + def generation_key(*args) + gen = @queue.respond_to?(:current_generation) ? @queue.current_generation : nil + return key(*args) unless gen + key('gen', gen, *args) + end + def key(*args) ['build', config.build_id, *args].join(':') end diff --git a/ruby/lib/ci/queue/redis/supervisor.rb b/ruby/lib/ci/queue/redis/supervisor.rb index c33edcf..9cdcdd6 100644 --- a/ruby/lib/ci/queue/redis/supervisor.rb +++ b/ruby/lib/ci/queue/redis/supervisor.rb @@ -9,7 +9,7 @@ def master? def total wait_for_master(timeout: config.queue_init_timeout) - redis.get(key('total')).to_i + redis.get(generation_key('total')).to_i end def build @@ -53,7 +53,7 @@ def wait_for_workers def active_workers? # if there are running jobs we assume there are still agents active - redis.zrangebyscore(key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0 + redis.zrangebyscore(generation_key('running'), CI::Queue.time_now.to_f - config.timeout, "+inf", limit: [0,1]).count > 0 end end end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 7de1f17..25d80bf 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -2,6 +2,7 @@ require 'ci/queue/static' require 'concurrent/set' +require 'securerandom' module CI module Queue @@ -34,23 +35,40 @@ def distributed? end def populate(tests, random: Random.new) - # All workers need an index of tests to resolve IDs @index = tests.map { |t| [t.id, t] }.to_h @total = tests.size - if acquire_master_role? - executables = reorder_tests(tests, random: random) + election_attempts = 0 + max_attempts = config.max_election_attempts - chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } - individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } + loop do + if acquire_master_role? + executables = reorder_tests(tests, random: random) - store_chunk_metadata(chunks) if chunks.any? + chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } + individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } - all_ids = chunks.map(&:id) + individual_tests.map(&:id) - push(all_ids) - end + store_chunk_metadata(chunks) if chunks.any? + + all_ids = chunks.map(&:id) + individual_tests.map(&:id) + push(all_ids) + end - register_worker_presence + register_worker_presence + + begin + wait_for_master(timeout: config.queue_init_timeout) + break + rescue MasterDied => e + election_attempts += 1 + if election_attempts >= max_attempts + raise LostMaster, "Failed to elect master after #{max_attempts} attempts: #{e.message}" + end + @master = nil + @generation = nil + warn "Previous master died (attempt #{election_attempts}/#{max_attempts}), retrying election..." + end + end self end @@ -87,6 +105,11 @@ def poll idle_state_printed = false attempt = 0 until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed? + if generation_stale? + warn "Generation changed - queue was repopulated by new master. Exiting poll loop." + break + end + if id = reserve attempt = 0 idle_since = nil @@ -104,15 +127,15 @@ def poll idle_since ||= CI::Queue.time_now if CI::Queue.time_now - idle_since > 120 && !idle_state_printed puts "Worker #{worker_id} has been idle for 120 seconds. Printing global state..." - running_tests = redis.zrange(key('running'), 0, -1, withscores: true) - puts " Processed tests: #{redis.scard(key('processed'))}" - puts " Pending tests: #{redis.llen(key('queue'))}. #{redis.lrange(key('queue'), 0, -1)}" + running_tests = redis.zrange(generation_key('running'), 0, -1, withscores: true) + puts " Processed tests: #{redis.scard(generation_key('processed'))}" + puts " Pending tests: #{redis.llen(generation_key('queue'))}. #{redis.lrange(generation_key('queue'), 0, -1)}" puts " Running tests: #{running_tests.size}. #{running_tests}" - puts " Owners: #{redis.hgetall(key('owners'))}" + puts " Owners: #{redis.hgetall(generation_key('owners'))}" unless running_tests.empty? puts ' Checking if running tests are in processed set:' running_tests.each do |test, _score| - puts " #{test}: #{redis.sismember(key('processed'), test)}" + puts " #{test}: #{redis.sismember(generation_key('processed'), test)}" end end idle_state_printed = true @@ -126,7 +149,7 @@ def poll end redis.pipelined do |pipeline| pipeline.expire(key('worker', worker_id, 'queue'), config.redis_ttl) - pipeline.expire(key('processed'), config.redis_ttl) + pipeline.expire(generation_key('processed'), config.redis_ttl) end rescue *CONNECTION_ERRORS end @@ -173,7 +196,7 @@ def acknowledge(test_or_id) begin eval_script( :acknowledge, - keys: [key('running'), key('processed'), key('owners')], + keys: [generation_key('running'), generation_key('processed'), generation_key('owners')], argv: [test_key] ) == 1 rescue StandardError => e @@ -198,12 +221,12 @@ def requeue(test, offset: Redis.requeue_offset, skip_reservation_check: false) requeued = config.max_requeues > 0 && global_max_requeues > 0 && !config.known_flaky?(test_key) && eval_script( :requeue, keys: [ - key('processed'), - key('requeues-count'), - key('queue'), - key('running'), + generation_key('processed'), + generation_key('requeues-count'), + generation_key('queue'), + generation_key('running'), key('worker', worker_id, 'queue'), - key('owners') + generation_key('owners') ], argv: [config.max_requeues, global_max_requeues, test_key, offset] ) == 1 @@ -215,7 +238,7 @@ def requeue(test, offset: Redis.requeue_offset, skip_reservation_check: false) def release! eval_script( :release, - keys: [key('running'), key('worker', worker_id, 'queue'), key('owners')], + keys: [generation_key('running'), key('worker', worker_id, 'queue'), generation_key('owners')], argv: [] ) nil @@ -237,12 +260,12 @@ def heartbeat(test_or_id = nil) result = eval_script( :heartbeat, keys: [ - key('running'), - key('processed'), - key('owners'), + generation_key('running'), + generation_key('processed'), + generation_key('owners'), key('worker', worker_id, 'queue'), - key('heartbeats'), - key('test-group-timeout') + generation_key('heartbeats'), + generation_key('test-group-timeout') ], argv: [current_time, test_key, config.timeout] ) @@ -338,19 +361,19 @@ def try_to_reserve_test test_id = eval_script( :reserve, keys: [ - key('queue'), - key('running'), - key('processed'), + generation_key('queue'), + generation_key('running'), + generation_key('processed'), key('worker', worker_id, 'queue'), - key('owners'), - key('test-group-timeout') + generation_key('owners'), + generation_key('test-group-timeout') ], argv: [current_time, 'true', config.timeout] ) if test_id # Check what timeout was used (dynamic or default) - dynamic_timeout = redis.hget(key('test-group-timeout'), test_id) + dynamic_timeout = redis.hget(generation_key('test-group-timeout'), test_id) timeout_used = dynamic_timeout ? dynamic_timeout.to_f : config.timeout deadline = current_time + timeout_used gap_seconds = timeout_used @@ -386,19 +409,19 @@ def try_to_reserve_lost_test lost_test = eval_script( :reserve_lost, keys: [ - key('running'), - key('completed'), + generation_key('running'), + generation_key('processed'), key('worker', worker_id, 'queue'), - key('owners'), - key('test-group-timeout'), - key('heartbeats') + generation_key('owners'), + generation_key('test-group-timeout'), + generation_key('heartbeats') ], argv: [current_time, timeout, 'true', config.timeout, config.heartbeat_grace_period] ) if lost_test # Check what timeout was used (dynamic or default) - dynamic_timeout = redis.hget(key('test-group-timeout'), lost_test) + dynamic_timeout = redis.hget(generation_key('test-group-timeout'), lost_test) timeout_used = dynamic_timeout ? dynamic_timeout.to_f : config.timeout deadline = current_time + timeout_used gap_seconds = timeout_used @@ -429,7 +452,7 @@ def try_to_reserve_lost_test if lost_test.nil? && idle? puts "Worker #{worker_id} could not reserve a lost test while idle" puts 'Printing running tests:' - puts "#{redis.zrange(key('running'), 0, -1, withscores: true)}" + puts "#{redis.zrange(generation_key('running'), 0, -1, withscores: true)}" end build.record_warning(Warnings::RESERVED_LOST_TEST, test: lost_test, timeout: timeout) if lost_test @@ -441,15 +464,15 @@ def push(tests) @total = tests.size if @master - redis.multi do |transaction| - transaction.lpush(key('queue'), tests) unless tests.empty? - transaction.set(key('total'), @total) - transaction.set(key('master-status'), 'ready') - - transaction.expire(key('queue'), config.redis_ttl) - transaction.expire(key('total'), config.redis_ttl) - transaction.expire(key('master-status'), config.redis_ttl) - end + renew_master_lock! + + argv = ["setup:#{@generation}", @generation, @total.to_s, config.redis_ttl.to_s] + tests + result = eval_script( + :push_queue, + keys: [key('master-status'), generation_key('queue'), generation_key('total'), key('current-generation')], + argv: argv + ) + raise MasterDied, "CAS push failed — lock was lost" unless result == 1 end rescue *CONNECTION_ERRORS raise if @master @@ -462,24 +485,53 @@ def register def acquire_master_role? return true if @master - @master = redis.setnx(key('master-status'), 'setup') + @generation = SecureRandom.uuid + + @master = redis.set( + key('master-status'), + "setup:#{@generation}", + nx: true, + ex: config.master_lock_ttl + ) + if @master begin - redis.set(key('master-worker-id'), worker_id) - redis.expire(key('master-worker-id'), config.redis_ttl) - warn "Worker #{worker_id} elected as master" + redis.multi do |tx| + tx.set(key('master-worker-id'), worker_id) + tx.expire(key('master-worker-id'), config.redis_ttl) + end + warn "Worker #{worker_id} elected as master (generation #{@generation})" rescue *CONNECTION_ERRORS - # If setting master-worker-id fails, we still have master status - # Log but don't lose master role warn("Failed to set master-worker-id: #{$!.message}") end + else + @generation = nil end + @master rescue *CONNECTION_ERRORS @master = nil + @generation = nil false end + def generation_stale? + return false unless @current_generation + return false if @last_generation_check_at && (CI::Queue.time_now - @last_generation_check_at) < 5 + current = redis.get(key('current-generation')) + @last_generation_check_at = CI::Queue.time_now + current && current != @current_generation + end + + def renew_master_lock! + result = eval_script( + :renew_master_lock, + keys: [key('master-status')], + argv: ["setup:#{@generation}", config.master_lock_ttl] + ) + raise MasterDied, "Lock renewal failed — another master may have taken over" unless result == 1 + end + def register_worker_presence register redis.expire(key('workers'), config.redis_ttl) @@ -497,13 +549,13 @@ def store_chunk_metadata(chunks) chunk_batch.each do |chunk| # Store chunk metadata with TTL transaction.set( - key('chunk', chunk.id), + generation_key('chunk', chunk.id), chunk.to_json ) - transaction.expire(key('chunk', chunk.id), config.redis_ttl) + transaction.expire(generation_key('chunk', chunk.id), config.redis_ttl) # Track all chunks for cleanup - transaction.sadd(key('chunks'), chunk.id) + transaction.sadd(generation_key('chunks'), chunk.id) # Store dynamic timeout for this chunk # Timeout = estimated_duration (in ms) converted to seconds + buffer @@ -513,11 +565,12 @@ def store_chunk_metadata(chunks) chunk_timeout = (estimated_duration_seconds * (1 + buffer_percent / 100.0)).round(2) # Format to string to avoid floating point precision issues in Redis # Use %g to remove trailing zeros - transaction.hset(key('test-group-timeout'), chunk.id, format('%g', chunk_timeout)) + transaction.hset(generation_key('test-group-timeout'), chunk.id, format('%g', chunk_timeout)) end - transaction.expire(key('chunks'), config.redis_ttl) - transaction.expire(key('test-group-timeout'), config.redis_ttl) + transaction.expire(generation_key('chunks'), config.redis_ttl) + transaction.expire(generation_key('test-group-timeout'), config.redis_ttl) end + renew_master_lock! end end @@ -537,7 +590,7 @@ def resolve_executable(id) def resolve_chunk(chunk_id) # Fetch chunk metadata from Redis - chunk_json = redis.get(key('chunk', chunk_id)) + chunk_json = redis.get(generation_key('chunk', chunk_id)) unless chunk_json warn "Warning: Chunk metadata not found for #{chunk_id}" return nil diff --git a/ruby/test/ci/queue/redis/dynamic_timeout_test.rb b/ruby/test/ci/queue/redis/dynamic_timeout_test.rb index 4fd3816..15ceab3 100644 --- a/ruby/test/ci/queue/redis/dynamic_timeout_test.rb +++ b/ruby/test/ci/queue/redis/dynamic_timeout_test.rb @@ -25,6 +25,11 @@ def teardown @redis.flushdb if @redis end + def generation_prefix(build_id = '42') + gen = @redis.get("build:#{build_id}:current-generation") + gen ? "build:#{build_id}:gen:#{gen}" : "build:#{build_id}" + end + def test_chunk_timeout_stored_in_redis_hash tests = create_mock_tests(['TestA#test_1', 'TestA#test_2', 'TestA#test_3']) test_ids = ['TestA#test_1', 'TestA#test_2', 'TestA#test_3'] @@ -38,7 +43,7 @@ def test_chunk_timeout_stored_in_redis_hash # Verify timeout was stored in test-group-timeout hash # Timeout should be: estimated_duration (5000ms = 5s) * 1.1 buffer = 5.5s - chunk_timeout = @redis.hget('build:42:test-group-timeout', 'TestA:chunk_0') + chunk_timeout = @redis.hget("#{generation_prefix}:test-group-timeout", 'TestA:chunk_0') refute_nil chunk_timeout assert_equal '5.5', chunk_timeout end @@ -54,7 +59,7 @@ def test_chunk_timeout_scales_with_test_count worker.populate(small_tests) end - small_timeout = @redis.hget('build:42:test-group-timeout', 'SmallSuite:chunk_0') + small_timeout = @redis.hget("#{generation_prefix}:test-group-timeout", 'SmallSuite:chunk_0') assert_equal '1.1', small_timeout # 1000ms = 1s * 1.1 buffer = 1.1s @redis.flushdb @@ -69,7 +74,7 @@ def test_chunk_timeout_scales_with_test_count worker.populate(large_tests) end - large_timeout = @redis.hget('build:42:test-group-timeout', 'LargeSuite:chunk_0') + large_timeout = @redis.hget("#{generation_prefix}:test-group-timeout", 'LargeSuite:chunk_0') assert_equal '5.5', large_timeout # 5000ms = 5s * 1.1 buffer = 5.5s end @@ -92,9 +97,9 @@ def test_multiple_chunks_stored_with_different_timeouts end # Verify each chunk has correct timeout - assert_equal '2.2', @redis.hget('build:42:test-group-timeout', 'TestA:chunk_0') # 2000ms = 2s * 1.1 buffer = 2.2s - assert_equal '3.3', @redis.hget('build:42:test-group-timeout', 'TestB:chunk_0') # 3000ms = 3s * 1.1 buffer = 3.3s - assert_equal '1.1', @redis.hget('build:42:test-group-timeout', 'TestC:chunk_0') # 1000ms = 1s * 1.1 buffer = 1.1s + assert_equal '2.2', @redis.hget("#{generation_prefix}:test-group-timeout", 'TestA:chunk_0') # 2000ms = 2s * 1.1 buffer = 2.2s + assert_equal '3.3', @redis.hget("#{generation_prefix}:test-group-timeout", 'TestB:chunk_0') # 3000ms = 3s * 1.1 buffer = 3.3s + assert_equal '1.1', @redis.hget("#{generation_prefix}:test-group-timeout", 'TestC:chunk_0') # 1000ms = 1s * 1.1 buffer = 1.1s end def test_timeout_hash_has_ttl @@ -108,7 +113,7 @@ def test_timeout_hash_has_ttl @worker.populate(tests) end - ttl = @redis.ttl('build:42:test-group-timeout') + ttl = @redis.ttl("#{generation_prefix}:test-group-timeout") assert ttl > 0, 'test-group-timeout hash should have TTL set' assert ttl <= @config.redis_ttl, 'TTL should not exceed config.redis_ttl' end @@ -122,8 +127,8 @@ def test_single_test_not_in_timeout_hash end # Verify individual tests are NOT in the timeout hash - assert_nil @redis.hget('build:42:test-group-timeout', 'TestA#test_1') - assert_nil @redis.hget('build:42:test-group-timeout', 'TestB#test_1') + assert_nil @redis.hget("#{generation_prefix}:test-group-timeout", 'TestA#test_1') + assert_nil @redis.hget("#{generation_prefix}:test-group-timeout", 'TestB#test_1') end def test_mixed_chunks_and_tests_only_chunks_have_timeouts @@ -145,11 +150,11 @@ def test_mixed_chunks_and_tests_only_chunks_have_timeouts end # Chunks should have timeouts - assert_equal '2.2', @redis.hget('build:42:test-group-timeout', 'TestA:chunk_0') # 2000ms = 2s * 1.1 buffer = 2.2s - assert_equal '3.3', @redis.hget('build:42:test-group-timeout', 'TestC:chunk_0') # 3000ms = 3s * 1.1 buffer = 3.3s + assert_equal '2.2', @redis.hget("#{generation_prefix}:test-group-timeout", 'TestA:chunk_0') # 2000ms = 2s * 1.1 buffer = 2.2s + assert_equal '3.3', @redis.hget("#{generation_prefix}:test-group-timeout", 'TestC:chunk_0') # 3000ms = 3s * 1.1 buffer = 3.3s # Individual test should not - assert_nil @redis.hget('build:42:test-group-timeout', 'TestB#test_1') + assert_nil @redis.hget("#{generation_prefix}:test-group-timeout", 'TestB#test_1') end def test_reserve_test_passes_dynamic_deadline_flag @@ -160,13 +165,14 @@ def test_reserve_test_passes_dynamic_deadline_flag end # Mock the eval_script call to verify correct parameters + gp = generation_prefix expected_keys = [ - 'build:42:queue', - 'build:42:running', - 'build:42:processed', + "#{gp}:queue", + "#{gp}:running", + "#{gp}:processed", 'build:42:worker:1:queue', - 'build:42:owners', - 'build:42:test-group-timeout' # 6th key for dynamic deadline + "#{gp}:owners", + "#{gp}:test-group-timeout" # 6th key for dynamic deadline ] @worker.stub(:eval_script, proc { |script, keys:, argv:| @@ -189,13 +195,14 @@ def test_reserve_lost_test_passes_dynamic_deadline_flag @worker.populate(tests) end + gp = generation_prefix expected_keys = [ - 'build:42:running', - 'build:42:completed', + "#{gp}:running", + "#{gp}:processed", 'build:42:worker:1:queue', - 'build:42:owners', - 'build:42:test-group-timeout', # 5th key for dynamic deadline - 'build:42:heartbeats' # 6th key for heartbeat tracking + "#{gp}:owners", + "#{gp}:test-group-timeout", # 5th key for dynamic deadline + "#{gp}:heartbeats" # 6th key for heartbeat tracking ] @worker.stub(:eval_script, proc { |script, keys:, argv:| @@ -244,6 +251,7 @@ def test_chunk_not_marked_lost_before_dynamic_timeout worker2_config = config.dup worker2_config.instance_variable_set(:@worker_id, '2') worker2 = CI::Queue::Redis.new(@redis_url, worker2_config) + worker2.send(:learn_generation) lost_test = worker2.send(:try_to_reserve_lost_test) assert_nil lost_test, 'Chunk should not be marked as lost before dynamic timeout' @@ -277,6 +285,7 @@ def test_single_test_marked_lost_after_default_timeout worker2_config = config.dup worker2_config.instance_variable_set(:@worker_id, '2') worker2 = CI::Queue::Redis.new(@redis_url, worker2_config) + worker2.send(:learn_generation) lost_test = worker2.send(:try_to_reserve_lost_test) assert_equal 'TestA#test_1', lost_test, 'Single test should be marked as lost after default timeout' @@ -296,7 +305,7 @@ def test_batching_with_many_chunks # Verify all chunks have timeouts stored despite batching chunks.each do |chunk| - timeout = @redis.hget('build:42:test-group-timeout', chunk.id) + timeout = @redis.hget("#{generation_prefix}:test-group-timeout", chunk.id) refute_nil timeout, "Chunk #{chunk.id} should have timeout stored" assert_equal '1.1', timeout # 1000ms = 1s * 1.1 buffer = 1.1s end diff --git a/ruby/test/ci/queue/redis/worker_chunk_test.rb b/ruby/test/ci/queue/redis/worker_chunk_test.rb index e0b2592..d12c1aa 100644 --- a/ruby/test/ci/queue/redis/worker_chunk_test.rb +++ b/ruby/test/ci/queue/redis/worker_chunk_test.rb @@ -23,6 +23,11 @@ def teardown @redis.flushdb if @redis end + def generation_prefix(build_id = '42') + gen = @redis.get("build:#{build_id}:current-generation") + gen ? "build:#{build_id}:gen:#{gen}" : "build:#{build_id}" + end + def test_populate_stores_chunk_metadata_in_redis tests = create_mock_tests(['TestA#test_1', 'TestA#test_2']) test_ids = ['TestA#test_1', 'TestA#test_2'] @@ -36,7 +41,7 @@ def test_populate_stores_chunk_metadata_in_redis end # Verify chunk metadata was stored - chunk_data = @redis.get('build:42:chunk:TestA:chunk_0') + chunk_data = @redis.get("#{generation_prefix}:chunk:TestA:chunk_0") refute_nil chunk_data parsed = JSON.parse(chunk_data) @@ -162,8 +167,8 @@ def test_resolved_chunk_detects_flaky_tests def test_acknowledge_chunk # Set up a chunk as if it were reserved (in running zset) chunk_id = 'TestA:chunk_0' - @redis.zadd('build:42:running', Time.now.to_i, chunk_id) - @redis.hset('build:42:owners', chunk_id, 'build:42:worker:1:queue') + @redis.zadd("#{generation_prefix}:running", Time.now.to_i, chunk_id) + @redis.hset("#{generation_prefix}:owners", chunk_id, 'build:42:worker:1:queue') @worker.instance_variable_set(:@reserved_test, chunk_id) # Acknowledge the chunk @@ -171,9 +176,9 @@ def test_acknowledge_chunk # Verify chunk was removed from running and added to processed assert result - refute @redis.zrank('build:42:running', chunk_id) - assert @redis.sismember('build:42:processed', chunk_id) - refute @redis.hexists('build:42:owners', chunk_id) + refute @redis.zrank("#{generation_prefix}:running", chunk_id) + assert @redis.sismember("#{generation_prefix}:processed", chunk_id) + refute @redis.hexists("#{generation_prefix}:owners", chunk_id) end def test_populate_with_mixed_chunks_and_tests @@ -194,7 +199,7 @@ def test_populate_with_mixed_chunks_and_tests end # Check that both chunk and individual test IDs are in queue - queue_items = @redis.lrange('build:42:queue', 0, -1) + queue_items = @redis.lrange("#{generation_prefix}:queue", 0, -1) assert_includes queue_items, 'TestA:chunk_0' assert_includes queue_items, 'TestB#test_1' end @@ -210,7 +215,7 @@ def test_chunk_metadata_has_ttl @worker.populate(tests) end - ttl = @redis.ttl('build:42:chunk:TestA:chunk_0') + ttl = @redis.ttl("#{generation_prefix}:chunk:TestA:chunk_0") assert ttl > 0, 'Chunk metadata should have TTL set' end @@ -228,7 +233,7 @@ def test_populate_with_many_chunks_uses_batching # Verify all chunks were stored despite batching chunks.each do |chunk| - chunk_data = @redis.get("build:42:chunk:#{chunk.id}") + chunk_data = @redis.get("#{generation_prefix}:chunk:#{chunk.id}") refute_nil chunk_data, "Chunk #{chunk.id} should be stored" parsed = JSON.parse(chunk_data) @@ -237,7 +242,7 @@ def test_populate_with_many_chunks_uses_batching end # Verify all chunk IDs are in the chunks set - stored_chunks = @redis.smembers('build:42:chunks') + stored_chunks = @redis.smembers("#{generation_prefix}:chunks") chunks.each do |chunk| assert_includes stored_chunks, chunk.id end diff --git a/ruby/test/ci/queue/redis_generation_test.rb b/ruby/test/ci/queue/redis_generation_test.rb new file mode 100644 index 0000000..d801006 --- /dev/null +++ b/ruby/test/ci/queue/redis_generation_test.rb @@ -0,0 +1,230 @@ +# frozen_string_literal: true + +require 'test_helper' + +class CI::Queue::Redis::GenerationTest < Minitest::Test + include QueueHelper + + TEST_LIST = %w[ + ATest#test_foo + ATest#test_bar + BTest#test_foo + BTest#test_bar + ].freeze + + class MockTest + attr_reader :id + + def initialize(id) + @id = id + end + + def <=>(other) + id <=> other.id + end + + def flaky? + false + end + + def tests + [self] + end + end + + def setup + @redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') + @redis = ::Redis.new(url: @redis_url) + @redis.flushdb + end + + def teardown + @redis.flushdb + end + + def test_normal_election_and_generation_scoping + w1 = worker(1, build_id: 'gen-1') + assert_predicate w1, :master? + + gen_keys = @redis.keys('*gen:*') + assert gen_keys.any?, "Expected generation-scoped Redis keys, got: #{@redis.keys('*').inspect}" + + w2 = worker(2, build_id: 'gen-1') + refute_predicate w2, :master? + + tests_seen = [] + w1.poll do |test| + tests_seen << test.id + w1.acknowledge(test) + end + w2.poll do |test| + tests_seen << test.id + w2.acknowledge(test) + end + + assert_equal TEST_LIST.sort, tests_seen.sort + end + + def test_acknowledged_test_not_re_stolen + w1 = worker(1, build_id: 'ack-1', timeout: 0.2) + + first_test = nil + w1.poll do |test| + first_test = test + w1.acknowledge(test) + break + end + refute_nil first_test + + sleep 0.3 + + w2 = worker(2, build_id: 'ack-1', timeout: 0.2) + stolen_tests = [] + w2.poll do |test| + stolen_tests << test.id + w2.acknowledge(test) + end + + refute_includes stolen_tests, first_test.id, + "Acknowledged test should not be re-stolen" + end + + def test_master_death_detection + build_id = 'death-1' + @redis.set("build:#{build_id}:master-status", "setup:fake-gen", ex: 1) + + supervisor = CI::Queue::Redis::Supervisor.new( + @redis_url, + CI::Queue::Configuration.new( + build_id: build_id, + timeout: 0.2, + queue_init_timeout: 2, + timing_redis_url: @redis_url, + ), + ) + + assert_raises(CI::Queue::Redis::LostMaster, CI::Queue::Redis::MasterDied) do + supervisor.wait_for_master(timeout: 2) + end + end + + def test_election_retry_on_master_death + build_id = 'retry-1' + # Pre-set master-status so first election NX fails, but TTL is short + @redis.set("build:#{build_id}:master-status", "setup:fake-gen", ex: 1) + + w = CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new( + build_id: build_id, + worker_id: '1', + timeout: 0.2, + master_lock_ttl: 1, + max_election_attempts: 3, + queue_init_timeout: 3, + timing_redis_url: @redis_url, + ), + ) + + tests = TEST_LIST.map { |id| MockTest.new(id) } + # Lock expires after 1s, worker retries election and becomes master + result = w.populate(tests, random: Random.new(0)) + assert_equal w, result + assert_predicate w, :master? + end + + def test_cas_push_rejection + build_id = 'cas-1' + w = CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new( + build_id: build_id, + worker_id: '1', + timeout: 0.2, + master_lock_ttl: 30, + timing_redis_url: @redis_url, + ), + ) + + # Acquire master role manually + assert w.send(:acquire_master_role?) + + # Overwrite master-status to simulate another master taking over + @redis.set("build:#{build_id}:master-status", "setup:other-gen") + + assert_raises(CI::Queue::Redis::MasterDied) do + w.send(:push, %w[test1 test2]) + end + end + + def test_generation_staleness_detection + w1 = worker(1, build_id: 'stale-1') + + # w1 is master, create a non-master worker that learns generation + w2 = worker(2, build_id: 'stale-1') + refute_predicate w2, :master? + + # Overwrite current-generation in Redis + @redis.set("build:stale-1:current-generation", "different-uuid") + + # Force staleness check by clearing the throttle + w2.instance_variable_set(:@last_generation_check_at, nil) + assert w2.send(:generation_stale?), "Expected generation to be stale after overwrite" + end + + def test_supervisor_with_generations + w = worker(1, build_id: 'sup-1') + + supervisor = CI::Queue::Redis::Supervisor.new( + @redis_url, + CI::Queue::Configuration.new( + build_id: 'sup-1', + timeout: 5, + timing_redis_url: @redis_url, + ), + ) + + assert_equal TEST_LIST.size, supervisor.total + end + + def test_max_election_attempts_exceeded + build_id = 'max-elect-1' + # Set master-status with long TTL so it never expires during test + @redis.set("build:#{build_id}:master-status", "setup:fake-gen", ex: 60) + + w = CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new( + build_id: build_id, + worker_id: '1', + timeout: 0.2, + master_lock_ttl: 60, + max_election_attempts: 1, + queue_init_timeout: 1, + timing_redis_url: @redis_url, + ), + ) + + tests = TEST_LIST.map { |id| MockTest.new(id) } + assert_raises(CI::Queue::Redis::LostMaster) do + w.populate(tests, random: Random.new(0)) + end + end + + private + + def worker(id, build_id: '42', timeout: 0.2, **args) + tests = args.delete(:tests) || TEST_LIST.map { |tid| MockTest.new(tid) } + queue = CI::Queue::Redis.new( + @redis_url, + CI::Queue::Configuration.new( + build_id: build_id, + worker_id: id.to_s, + timeout: timeout, + timing_redis_url: @redis_url, + **args, + ), + ) + queue.populate(tests, random: Random.new(0)) + end +end diff --git a/ruby/test/ci/queue/redis_supervisor_test.rb b/ruby/test/ci/queue/redis_supervisor_test.rb index 3692b65..51f0ef8 100644 --- a/ruby/test/ci/queue/redis_supervisor_test.rb +++ b/ruby/test/ci/queue/redis_supervisor_test.rb @@ -40,7 +40,7 @@ def test_wait_for_workers end def test_wait_for_workers_timeout - @supervisor = supervisor(timeout: 10, queue_init_timeout: 0.1) + @supervisor = supervisor(timeout: 0.2, queue_init_timeout: 0.5) io = nil thread = Thread.start do io = capture_io { @supervisor.wait_for_workers }