From 837efd2ed0717d155b4daf7af4ef7f171ef9943c Mon Sep 17 00:00:00 2001 From: Lihao He Date: Tue, 11 Nov 2025 02:17:18 -0800 Subject: [PATCH] [CI] revert default worker 0 logic --- ruby/lib/ci/queue/redis/worker.rb | 19 ++++-- .../ci/queue/redis/dynamic_timeout_test.rb | 10 +-- ruby/test/ci/queue/redis/worker_chunk_test.rb | 2 +- ruby/test/ci/queue/redis_supervisor_test.rb | 8 +-- ruby/test/ci/queue/redis_test.rb | 49 ++++++++------- ruby/test/integration/minitest_redis_test.rb | 61 +++++++++++++------ ruby/test/integration/rspec_redis_test.rb | 17 +++--- 7 files changed, 100 insertions(+), 66 deletions(-) diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index 61767dce..a52fe682 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -29,8 +29,8 @@ def distributed? end def populate(tests, random: Random.new) - # All workers need an index of tests to resolve IDs @index = tests.map { |t| [t.id, t] }.to_h +<<<<<<< HEAD @total = tests.size @@ -40,13 +40,15 @@ def populate(tests, random: Random.new) # Only the master worker reorders tests and pushes them to the queue return self unless master? +======= +>>>>>>> parent of aa33ed7 (Merge pull request #62 from figma/ebarajas/only-populate-if-master) executables = reorder_tests(tests, random: random) # Separate chunks from individual tests chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) } individual_tests = executables.select { |e| !e.is_a?(CI::Queue::TestChunk) } - # Store chunk metadata in Redis + # Store chunk metadata in Redis (only master does this) store_chunk_metadata(chunks) if chunks.any? # Push all IDs to queue (chunks + individual tests) @@ -69,7 +71,7 @@ def shutdown_required? end def master? - @config.worker_id.to_s == '0' + @master end def idle? @@ -262,9 +264,15 @@ def try_to_reserve_lost_test end def push(tests) +<<<<<<< HEAD return unless master? if redis.setnx(key('master-status'), 'setup') +======= + @total = tests.size + + if @master = redis.setnx(key('master-status'), 'setup') +>>>>>>> parent of aa33ed7 (Merge pull request #62 from figma/ebarajas/only-populate-if-master) redis.multi do |transaction| transaction.lpush(key('queue'), tests) unless tests.empty? transaction.set(key('total'), @total) @@ -275,11 +283,14 @@ def push(tests) transaction.expire(key('master-status'), config.redis_ttl) end end + register + redis.expire(key('workers'), config.redis_ttl) + rescue *CONNECTION_ERRORS + raise if @master end def register redis.sadd(key('workers'), [worker_id]) - redis.expire(key('workers'), config.redis_ttl) end private diff --git a/ruby/test/ci/queue/redis/dynamic_timeout_test.rb b/ruby/test/ci/queue/redis/dynamic_timeout_test.rb index d6c5b4d8..c747187d 100644 --- a/ruby/test/ci/queue/redis/dynamic_timeout_test.rb +++ b/ruby/test/ci/queue/redis/dynamic_timeout_test.rb @@ -9,7 +9,7 @@ def setup @config = CI::Queue::Configuration.new( build_id: '42', - worker_id: '0', + worker_id: '1', timeout: 30, # 30 seconds default timeout strategy: :suite_bin_packing, suite_max_duration: 120_000, @@ -157,7 +157,7 @@ def test_reserve_test_passes_dynamic_deadline_flag 'build:42:queue', 'build:42:running', 'build:42:processed', - 'build:42:worker:0:queue', + 'build:42:worker:1:queue', 'build:42:owners', 'build:42:test-group-timeout', # 6th key for dynamic deadline ] @@ -185,7 +185,7 @@ def test_reserve_lost_test_passes_dynamic_deadline_flag expected_keys = [ 'build:42:running', 'build:42:completed', - 'build:42:worker:0:queue', + 'build:42:worker:1:queue', 'build:42:owners', 'build:42:test-group-timeout', # 5th key for dynamic deadline ] @@ -208,7 +208,7 @@ def test_chunk_not_marked_lost_before_dynamic_timeout # Create worker with short timeout for faster test config = CI::Queue::Configuration.new( build_id: 'timeout-test', - worker_id: '0', + worker_id: '1', timeout: 0.5, # 0.5 seconds strategy: :suite_bin_packing ) @@ -243,7 +243,7 @@ def test_single_test_marked_lost_after_default_timeout # Create worker with short timeout config = CI::Queue::Configuration.new( build_id: 'single-timeout-test', - worker_id: '0', + worker_id: '1', timeout: 0.5, # 0.5 seconds ) diff --git a/ruby/test/ci/queue/redis/worker_chunk_test.rb b/ruby/test/ci/queue/redis/worker_chunk_test.rb index f840e54c..62152a63 100644 --- a/ruby/test/ci/queue/redis/worker_chunk_test.rb +++ b/ruby/test/ci/queue/redis/worker_chunk_test.rb @@ -9,7 +9,7 @@ def setup @config = CI::Queue::Configuration.new( build_id: '42', - worker_id: '0', + worker_id: '1', timeout: 0.2, strategy: :suite_bin_packing, suite_max_duration: 120_000, diff --git a/ruby/test/ci/queue/redis_supervisor_test.rb b/ruby/test/ci/queue/redis_supervisor_test.rb index e37b757b..3692b657 100644 --- a/ruby/test/ci/queue/redis_supervisor_test.rb +++ b/ruby/test/ci/queue/redis_supervisor_test.rb @@ -23,7 +23,7 @@ def test_wait_for_master master_found = @supervisor.wait_for_master end thread.wakeup - worker(0) + worker(1) thread.join assert_equal true, master_found end @@ -34,7 +34,7 @@ def test_wait_for_workers workers_done = @supervisor.wait_for_workers end thread.wakeup - poll(worker(0)) + poll(worker(1)) thread.join assert_equal true, workers_done end @@ -46,14 +46,14 @@ def test_wait_for_workers_timeout io = capture_io { @supervisor.wait_for_workers } end thread.wakeup - worker(0) + worker(1) thread.join assert_includes io, "Aborting, it seems all workers died.\n" end def test_num_workers assert_equal 0, @supervisor.workers_count - worker(0) + worker(1) assert_equal 1, @supervisor.workers_count end diff --git a/ruby/test/ci/queue/redis_test.rb b/ruby/test/ci/queue/redis_test.rb index f8c7568c..58a73605 100644 --- a/ruby/test/ci/queue/redis_test.rb +++ b/ruby/test/ci/queue/redis_test.rb @@ -72,13 +72,17 @@ def test_shutdown def test_master_election assert_predicate @queue, :master? + refute_predicate worker(2), :master? + + @redis.flushdb + assert_predicate worker(2), :master? refute_predicate worker(1), :master? end def test_exhausted_while_not_populated assert_predicate @queue, :populated? - second_worker = worker(1, populate: false) + second_worker = worker(2, populate: false) refute_predicate second_worker, :populated? refute_predicate second_worker, :exhausted? @@ -90,7 +94,7 @@ def test_exhausted_while_not_populated end def test_timed_out_test_are_picked_up_by_other_workers - second_queue = worker(1) + second_queue = worker(2) acquired = false done = false monitor = Monitor.new @@ -119,7 +123,7 @@ def test_timed_out_test_are_picked_up_by_other_workers end def test_release_immediately_timeout_the_lease - second_queue = worker(1) + second_queue = worker(2) reserved_test = nil poll(@queue) do |test| @@ -128,7 +132,7 @@ def test_release_immediately_timeout_the_lease end refute_nil reserved_test - worker(0).release! # Use a new instance to ensure we don't depend on in-memory state + worker(1).release! # Use a new instance to ensure we don't depend on in-memory state poll(second_queue) do |test| assert_equal reserved_test, test @@ -137,7 +141,7 @@ def test_release_immediately_timeout_the_lease end def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker - second_queue = worker(1) + second_queue = worker(2) acquired = false done = false monitor = Monitor.new @@ -165,7 +169,7 @@ def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker end def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker - second_queue = worker(1) + second_queue = worker(2) acquired = false done = false monitor = Monitor.new @@ -197,8 +201,7 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker def test_workers_register assert_equal 1, @redis.scard(('build:42:workers')) - worker(1) - puts @redis.scard(('build:42:workers')) + worker(2) assert_equal 2, @redis.scard(('build:42:workers')) end @@ -258,9 +261,9 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker # Create a chunk with 10 tests from same suite -> timeout = 0.2s * 10 = 2.0s tests = (1..10).map { |i| MockTest.new("ChunkSuite#test_#{i}") } - worker1 = worker(0, tests: tests, build_id: '100', strategy: :suite_bin_packing, + worker1 = worker(1, tests: tests, build_id: '100', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0) - worker2 = worker(1, tests: tests, build_id: '100', strategy: :suite_bin_packing, + worker2 = worker(2, tests: tests, build_id: '100', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0, populate: false) acquired = false @@ -319,9 +322,9 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout tests = (1..5).map { |i| MockTest.new("TimeoutSuite#test_#{i}") } - worker1 = worker(0, tests: tests, build_id: '101', strategy: :suite_bin_packing, + worker1 = worker(1, tests: tests, build_id: '101', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0) - worker2 = worker(1, tests: tests, build_id: '101', strategy: :suite_bin_packing, + worker2 = worker(2, tests: tests, build_id: '101', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0) acquired = false @@ -383,8 +386,8 @@ def test_individual_test_uses_default_timeout_after_requeue MockTest.new("SuiteC#test_1") ] - worker1 = worker(0, tests: tests, build_id: '102', timeout: 0.2) - worker2 = worker(1, tests: tests, build_id: '102', timeout: 0.2) + worker1 = worker(1, tests: tests, build_id: '102', timeout: 0.2) + worker2 = worker(2, tests: tests, build_id: '102', timeout: 0.2) acquired = false done = false @@ -440,7 +443,7 @@ def test_suite_bin_packing_uses_moving_average_for_duration MockTest.new('TestSuite#test_2') ] - worker = worker(0, tests: tests, build_id: '200', strategy: :suite_bin_packing, + worker = worker(1, tests: tests, build_id: '200', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0) chunks = [] @@ -471,7 +474,7 @@ def test_moving_average_takes_precedence_over_timing_file file.write(JSON.generate(timing_data)) file.close - worker = worker(0, tests: tests, build_id: '201', strategy: :suite_bin_packing, + worker = worker(1, tests: tests, build_id: '201', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0, timing_file: file.path) @@ -497,7 +500,7 @@ def test_falls_back_to_timing_file_when_no_moving_average file.write(JSON.generate(timing_data)) file.close - worker = worker(0, tests: tests, build_id: '202', strategy: :suite_bin_packing, + worker = worker(1, tests: tests, build_id: '202', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0, timing_file: file.path) @@ -517,7 +520,7 @@ def test_falls_back_to_default_when_no_moving_average_or_timing_data tests = [MockTest.new('UnknownTest#test_1')] - worker = worker(0, tests: tests, build_id: '203', strategy: :suite_bin_packing, + worker = worker(1, tests: tests, build_id: '203', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 500.0) chunks = [] @@ -553,7 +556,7 @@ def test_mixed_duration_sources_in_suite_splitting file.write(JSON.generate(timing_data)) file.close - worker = worker(0, tests: tests, build_id: '204', strategy: :suite_bin_packing, + worker = worker(1, tests: tests, build_id: '204', strategy: :suite_bin_packing, suite_max_duration: 120_000, suite_buffer_percent: 10, timing_fallback_duration: 100.0, timing_file: file.path) @@ -585,7 +588,7 @@ def test_moving_average_ordering_by_duration MockTest.new('MediumTest#test_1') ] - worker = worker(0, tests: tests, build_id: '205', strategy: :suite_bin_packing, + worker = worker(1, tests: tests, build_id: '205', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 100.0) chunks = [] @@ -617,7 +620,7 @@ def test_moving_average_with_partial_coverage MockTest.new('PartialTest#test_3') ] - worker = worker(0, tests: tests, build_id: '206', strategy: :suite_bin_packing, + worker = worker(1, tests: tests, build_id: '206', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 500.0) chunks = [] @@ -639,7 +642,7 @@ def test_moving_average_updates_persist_across_workers # New worker should see the persisted moving average tests = [MockTest.new('PersistTest#test_1')] - worker1 = worker(0, tests: tests, build_id: '207', strategy: :suite_bin_packing, + worker1 = worker(1, tests: tests, build_id: '207', strategy: :suite_bin_packing, suite_max_duration: 120_000, timing_fallback_duration: 1000.0) chunks = [] @@ -680,7 +683,7 @@ def shuffled_test_list end def build_queue - worker(0, max_requeues: 1, requeue_tolerance: 0.1, populate: false, max_consecutive_failures: 10) + worker(1, max_requeues: 1, requeue_tolerance: 0.1, populate: false, max_consecutive_failures: 10) end def populate(worker, tests: TEST_LIST.dup) diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb index 803299fe..d58b8c43 100644 --- a/ruby/test/integration/minitest_redis_test.rb +++ b/ruby/test/integration/minitest_redis_test.rb @@ -30,7 +30,7 @@ def test_buildkite_output '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -53,7 +53,7 @@ def test_custom_requeue '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -75,7 +75,7 @@ def test_max_test_failed '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -121,7 +121,7 @@ def test_circuit_breaker '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -144,7 +144,7 @@ def test_redis_runner '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -164,7 +164,7 @@ def test_redis_runner '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -186,7 +186,7 @@ def test_retry_success '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -205,7 +205,7 @@ def test_retry_success '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -226,7 +226,7 @@ def test_retry_fails_when_test_run_is_expired '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -249,7 +249,7 @@ def test_retry_fails_when_test_run_is_expired '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -271,7 +271,7 @@ def test_retry_report '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '-Itest', 'test/failing_test.rb', @@ -325,7 +325,7 @@ def test_retry_report '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '-Itest', 'test/failing_test.rb', @@ -352,6 +352,27 @@ def test_retry_report assert_equal expect, normalize(out.strip.lines[1].strip) end + def test_down_redis + out, err = capture_subprocess_io do + system( + @exe, 'run', + '--queue', 'redis://localhost:1337', + '--seed', 'foobar', + '--build', '1', + '--worker', '1', + '--timeout', '1', + '--max-requeues', '1', + '--requeue-tolerance', '1', + '-Itest', + 'test/dummy_test.rb', + chdir: 'test/fixtures/', + ) + end + assert_empty err + output = normalize(out.lines.last.strip) + assert_equal 'Ran 0 tests, 0 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs', output + end + def test_test_data_reporter out, err = capture_subprocess_io do system( @@ -361,7 +382,7 @@ def test_test_data_reporter '--seed', 'foobar', '--namespace', 'foo', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -434,7 +455,7 @@ def test_test_data_time_reporter '--seed', 'foobar', '--namespace', 'foo', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '10', '-Itest', '--record-duration-averages=true', @@ -463,7 +484,7 @@ def test_junit_reporter '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -549,7 +570,7 @@ def test_redis_reporter_failure_file '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -606,7 +627,7 @@ def test_redis_reporter_flaky_tests_file '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -644,7 +665,7 @@ def test_redis_reporter_export_timing_file '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--track-test-duration', '-Itest', @@ -694,7 +715,7 @@ def test_redis_reporter '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -748,7 +769,7 @@ def test_utf8_tests_and_marshal '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '-Itest', 'test/utf8_test.rb', diff --git a/ruby/test/integration/rspec_redis_test.rb b/ruby/test/integration/rspec_redis_test.rb index 7a08e88e..ab365615 100644 --- a/ruby/test/integration/rspec_redis_test.rb +++ b/ruby/test/integration/rspec_redis_test.rb @@ -23,7 +23,7 @@ def test_redis_runner '--queue', @redis_url, '--seed', '123', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -78,7 +78,7 @@ def test_redis_runner_retry '--queue', @redis_url, '--seed', '123', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -120,7 +120,7 @@ def test_redis_runner_retry '--queue', @redis_url, '--seed', '123', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -154,7 +154,7 @@ def test_retry_report '--queue', @redis_url, '--seed', '123', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', chdir: 'test/fixtures/', ) @@ -205,7 +205,7 @@ def test_retry_report '--seed', '123', '--build', '1', '--timeout', '1', - '--worker', '0', + '--worker', '1', chdir: 'test/fixtures/', ) end @@ -257,7 +257,7 @@ def test_before_suite_errors '--queue', @redis_url, '--seed', '123', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', @@ -297,7 +297,7 @@ def test_report '--queue', @redis_url, '--seed', '123', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', '--max-requeues', '0', '--requeue-tolerance', '0', @@ -365,7 +365,6 @@ def test_report end def test_world_wants_to_quit - skip "This test is broken with our changes to worker IDs" out, err = capture_subprocess_io do system( { 'EARLY_EXIT' => '1' }, @@ -373,7 +372,7 @@ def test_world_wants_to_quit '--queue', @redis_url, '--seed', '123', '--build', '1', - '--worker', '0', + '--worker', '1', '--timeout', '1', chdir: 'test/fixtures/early_exit_suite', )