From 63f0a8fac2b5d04b29d2394127f55792856ecb90 Mon Sep 17 00:00:00 2001 From: Lihao He Date: Tue, 11 Nov 2025 02:33:02 -0800 Subject: [PATCH] [CI] pick back up the leader election logic --- ruby/lib/ci/queue/redis/worker.rb | 41 +++++++++++++------ ruby/test/ci/queue/redis/worker_chunk_test.rb | 25 +++++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 2d02b29..3a6d4cd 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -31,18 +31,21 @@ def distributed? def populate(tests, random: Random.new) # All workers need an index of tests to resolve IDs @index = tests.map { |t| [t.id, t] }.to_h - executables = reorder_tests(tests, random: random) + @total = tests.size + + if acquire_master_role? + executables = reorder_tests(tests, random: random) - # Separate chunks from individual tests - chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } - individual_tests = executables.select { |e| !e.is_a?(CI::Queue::TestChunk) } + chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } + individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) } - # Store chunk metadata in Redis (only master does this) - store_chunk_metadata(chunks) if chunks.any? + store_chunk_metadata(chunks) if chunks.any? + + all_ids = chunks.map(&:id) + individual_tests.map(&:id) + push(all_ids) + end - # Push all IDs to queue (chunks + individual tests) - all_ids = chunks.map(&:id) + individual_tests.map(&:id) - push(all_ids) + register_worker_presence self end @@ -255,7 +258,7 @@ def try_to_reserve_lost_test def push(tests) @total = tests.size - if @master = redis.setnx(key('master-status'), 'setup') + if @master redis.multi do |transaction| transaction.lpush(key('queue'), tests) unless tests.empty? transaction.set(key('total'), @total) @@ -266,8 +269,6 @@ def push(tests) transaction.expire(key('master-status'), config.redis_ttl) end end - register - redis.expire(key('workers'), config.redis_ttl) rescue *CONNECTION_ERRORS raise if @master end @@ -278,6 +279,22 @@ def register private + def acquire_master_role? + return true if @master + + @master = redis.setnx(key('master-status'), 'setup') + rescue *CONNECTION_ERRORS + @master = nil + false + end + + def register_worker_presence + register + redis.expire(key('workers'), config.redis_ttl) + rescue *CONNECTION_ERRORS + raise if master? + end + def store_chunk_metadata(chunks) # Batch operations to avoid exceeding Redis multi operation limits # Each chunk requires 4 commands (set, expire, sadd, hset), so batch conservatively diff --git a/ruby/test/ci/queue/redis/worker_chunk_test.rb b/ruby/test/ci/queue/redis/worker_chunk_test.rb index 62152a6..44f7e4b 100644 --- a/ruby/test/ci/queue/redis/worker_chunk_test.rb +++ b/ruby/test/ci/queue/redis/worker_chunk_test.rb @@ -63,6 +63,31 @@ def test_populate_stores_partial_suite_with_test_ids assert_equal test_ids, parsed['test_ids'] end + def test_non_master_does_not_reorder_tests + tests = create_mock_tests(['TestA#test_1']) + + @worker.stub(:reorder_tests, tests) do + @worker.populate(tests) + end + + secondary_config = @config.dup + secondary_config.instance_variable_set(:@worker_id, '2') + secondary_worker = CI::Queue::Redis.new(@redis_url, secondary_config) + + reorder_calls = 0 + secondary_worker.stub(:reorder_tests, ->(passed_tests, **_) do + reorder_calls += 1 + passed_tests + end) do + secondary_worker.populate(tests) + end + + assert_equal 0, reorder_calls, 'Non-master workers should not reorder tests' + refute secondary_worker.master?, 'Secondary worker should not become master' + assert_equal @worker.total, secondary_worker.total + assert_includes @redis.smembers('build:42:workers'), '2' + end + def test_chunk_id_detection assert @worker.send(:chunk_id?, 'TestA:full_suite') assert @worker.send(:chunk_id?, 'TestB:chunk_0')