Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ def distributed?
end

def populate(tests, random: Random.new)
# All workers need an index of tests to resolve IDs
@index = tests.map { |t| [t.id, t] }.to_h
<<<<<<< HEAD

@total = tests.size

Expand All @@ -40,13 +40,15 @@ def populate(tests, random: Random.new)
# Only the master worker reorders tests and pushes them to the queue
return self unless master?

=======
>>>>>>> parent of aa33ed7 (Merge pull request #62 from figma/ebarajas/only-populate-if-master)
executables = reorder_tests(tests, random: random)

# Separate chunks from individual tests
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
individual_tests = executables.select { |e| !e.is_a?(CI::Queue::TestChunk) }

# Store chunk metadata in Redis
# Store chunk metadata in Redis (only master does this)
store_chunk_metadata(chunks) if chunks.any?

# Push all IDs to queue (chunks + individual tests)
Expand All @@ -69,7 +71,7 @@ def shutdown_required?
end

def master?
@config.worker_id.to_s == '0'
@master
end

def idle?
Expand Down Expand Up @@ -262,9 +264,15 @@ def try_to_reserve_lost_test
end

def push(tests)
<<<<<<< HEAD
return unless master?

if redis.setnx(key('master-status'), 'setup')
=======
@total = tests.size

if @master = redis.setnx(key('master-status'), 'setup')
>>>>>>> parent of aa33ed7 (Merge pull request #62 from figma/ebarajas/only-populate-if-master)
redis.multi do |transaction|
transaction.lpush(key('queue'), tests) unless tests.empty?
transaction.set(key('total'), @total)
Expand All @@ -275,11 +283,14 @@ def push(tests)
transaction.expire(key('master-status'), config.redis_ttl)
end
end
register
redis.expire(key('workers'), config.redis_ttl)
rescue *CONNECTION_ERRORS
raise if @master
end

def register
redis.sadd(key('workers'), [worker_id])
redis.expire(key('workers'), config.redis_ttl)
end

private
Expand Down
10 changes: 5 additions & 5 deletions ruby/test/ci/queue/redis/dynamic_timeout_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def setup

@config = CI::Queue::Configuration.new(
build_id: '42',
worker_id: '0',
worker_id: '1',
timeout: 30, # 30 seconds default timeout
strategy: :suite_bin_packing,
suite_max_duration: 120_000,
Expand Down Expand Up @@ -157,7 +157,7 @@ def test_reserve_test_passes_dynamic_deadline_flag
'build:42:queue',
'build:42:running',
'build:42:processed',
'build:42:worker:0:queue',
'build:42:worker:1:queue',
'build:42:owners',
'build:42:test-group-timeout', # 6th key for dynamic deadline
]
Expand Down Expand Up @@ -185,7 +185,7 @@ def test_reserve_lost_test_passes_dynamic_deadline_flag
expected_keys = [
'build:42:running',
'build:42:completed',
'build:42:worker:0:queue',
'build:42:worker:1:queue',
'build:42:owners',
'build:42:test-group-timeout', # 5th key for dynamic deadline
]
Expand All @@ -208,7 +208,7 @@ def test_chunk_not_marked_lost_before_dynamic_timeout
# Create worker with short timeout for faster test
config = CI::Queue::Configuration.new(
build_id: 'timeout-test',
worker_id: '0',
worker_id: '1',
timeout: 0.5, # 0.5 seconds
strategy: :suite_bin_packing
)
Expand Down Expand Up @@ -243,7 +243,7 @@ def test_single_test_marked_lost_after_default_timeout
# Create worker with short timeout
config = CI::Queue::Configuration.new(
build_id: 'single-timeout-test',
worker_id: '0',
worker_id: '1',
timeout: 0.5, # 0.5 seconds
)

Expand Down
2 changes: 1 addition & 1 deletion ruby/test/ci/queue/redis/worker_chunk_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def setup

@config = CI::Queue::Configuration.new(
build_id: '42',
worker_id: '0',
worker_id: '1',
timeout: 0.2,
strategy: :suite_bin_packing,
suite_max_duration: 120_000,
Expand Down
8 changes: 4 additions & 4 deletions ruby/test/ci/queue/redis_supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_wait_for_master
master_found = @supervisor.wait_for_master
end
thread.wakeup
worker(0)
worker(1)
thread.join
assert_equal true, master_found
end
Expand All @@ -34,7 +34,7 @@ def test_wait_for_workers
workers_done = @supervisor.wait_for_workers
end
thread.wakeup
poll(worker(0))
poll(worker(1))
thread.join
assert_equal true, workers_done
end
Expand All @@ -46,14 +46,14 @@ def test_wait_for_workers_timeout
io = capture_io { @supervisor.wait_for_workers }
end
thread.wakeup
worker(0)
worker(1)
thread.join
assert_includes io, "Aborting, it seems all workers died.\n"
end

def test_num_workers
assert_equal 0, @supervisor.workers_count
worker(0)
worker(1)
assert_equal 1, @supervisor.workers_count
end

Expand Down
49 changes: 26 additions & 23 deletions ruby/test/ci/queue/redis_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,17 @@ def test_shutdown

def test_master_election
assert_predicate @queue, :master?
refute_predicate worker(2), :master?

@redis.flushdb
assert_predicate worker(2), :master?
refute_predicate worker(1), :master?
end

def test_exhausted_while_not_populated
assert_predicate @queue, :populated?

second_worker = worker(1, populate: false)
second_worker = worker(2, populate: false)

refute_predicate second_worker, :populated?
refute_predicate second_worker, :exhausted?
Expand All @@ -90,7 +94,7 @@ def test_exhausted_while_not_populated
end

def test_timed_out_test_are_picked_up_by_other_workers
second_queue = worker(1)
second_queue = worker(2)
acquired = false
done = false
monitor = Monitor.new
Expand Down Expand Up @@ -119,7 +123,7 @@ def test_timed_out_test_are_picked_up_by_other_workers
end

def test_release_immediately_timeout_the_lease
second_queue = worker(1)
second_queue = worker(2)

reserved_test = nil
poll(@queue) do |test|
Expand All @@ -128,7 +132,7 @@ def test_release_immediately_timeout_the_lease
end
refute_nil reserved_test

worker(0).release! # Use a new instance to ensure we don't depend on in-memory state
worker(1).release! # Use a new instance to ensure we don't depend on in-memory state

poll(second_queue) do |test|
assert_equal reserved_test, test
Expand All @@ -137,7 +141,7 @@ def test_release_immediately_timeout_the_lease
end

def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker
second_queue = worker(1)
second_queue = worker(2)
acquired = false
done = false
monitor = Monitor.new
Expand Down Expand Up @@ -165,7 +169,7 @@ def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker
end

def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker
second_queue = worker(1)
second_queue = worker(2)
acquired = false
done = false
monitor = Monitor.new
Expand Down Expand Up @@ -197,8 +201,7 @@ def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker

def test_workers_register
assert_equal 1, @redis.scard(('build:42:workers'))
worker(1)
puts @redis.scard(('build:42:workers'))
worker(2)
assert_equal 2, @redis.scard(('build:42:workers'))
end

Expand Down Expand Up @@ -258,9 +261,9 @@ def test_chunk_with_dynamic_timeout_not_stolen_by_other_worker
# Create a chunk with 10 tests from same suite -> timeout = 0.2s * 10 = 2.0s
tests = (1..10).map { |i| MockTest.new("ChunkSuite#test_#{i}") }

worker1 = worker(0, tests: tests, build_id: '100', strategy: :suite_bin_packing,
worker1 = worker(1, tests: tests, build_id: '100', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 100.0)
worker2 = worker(1, tests: tests, build_id: '100', strategy: :suite_bin_packing,
worker2 = worker(2, tests: tests, build_id: '100', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 100.0, populate: false)

acquired = false
Expand Down Expand Up @@ -319,9 +322,9 @@ def test_chunk_with_dynamic_timeout_picked_up_after_timeout

tests = (1..5).map { |i| MockTest.new("TimeoutSuite#test_#{i}") }

worker1 = worker(0, tests: tests, build_id: '101', strategy: :suite_bin_packing,
worker1 = worker(1, tests: tests, build_id: '101', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 100.0)
worker2 = worker(1, tests: tests, build_id: '101', strategy: :suite_bin_packing,
worker2 = worker(2, tests: tests, build_id: '101', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 100.0)

acquired = false
Expand Down Expand Up @@ -383,8 +386,8 @@ def test_individual_test_uses_default_timeout_after_requeue
MockTest.new("SuiteC#test_1")
]

worker1 = worker(0, tests: tests, build_id: '102', timeout: 0.2)
worker2 = worker(1, tests: tests, build_id: '102', timeout: 0.2)
worker1 = worker(1, tests: tests, build_id: '102', timeout: 0.2)
worker2 = worker(2, tests: tests, build_id: '102', timeout: 0.2)

acquired = false
done = false
Expand Down Expand Up @@ -440,7 +443,7 @@ def test_suite_bin_packing_uses_moving_average_for_duration
MockTest.new('TestSuite#test_2')
]

worker = worker(0, tests: tests, build_id: '200', strategy: :suite_bin_packing,
worker = worker(1, tests: tests, build_id: '200', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 100.0)

chunks = []
Expand Down Expand Up @@ -471,7 +474,7 @@ def test_moving_average_takes_precedence_over_timing_file
file.write(JSON.generate(timing_data))
file.close

worker = worker(0, tests: tests, build_id: '201', strategy: :suite_bin_packing,
worker = worker(1, tests: tests, build_id: '201', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 100.0,
timing_file: file.path)

Expand All @@ -497,7 +500,7 @@ def test_falls_back_to_timing_file_when_no_moving_average
file.write(JSON.generate(timing_data))
file.close

worker = worker(0, tests: tests, build_id: '202', strategy: :suite_bin_packing,
worker = worker(1, tests: tests, build_id: '202', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 100.0,
timing_file: file.path)

Expand All @@ -517,7 +520,7 @@ def test_falls_back_to_default_when_no_moving_average_or_timing_data

tests = [MockTest.new('UnknownTest#test_1')]

worker = worker(0, tests: tests, build_id: '203', strategy: :suite_bin_packing,
worker = worker(1, tests: tests, build_id: '203', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 500.0)

chunks = []
Expand Down Expand Up @@ -553,7 +556,7 @@ def test_mixed_duration_sources_in_suite_splitting
file.write(JSON.generate(timing_data))
file.close

worker = worker(0, tests: tests, build_id: '204', strategy: :suite_bin_packing,
worker = worker(1, tests: tests, build_id: '204', strategy: :suite_bin_packing,
suite_max_duration: 120_000, suite_buffer_percent: 10,
timing_fallback_duration: 100.0, timing_file: file.path)

Expand Down Expand Up @@ -585,7 +588,7 @@ def test_moving_average_ordering_by_duration
MockTest.new('MediumTest#test_1')
]

worker = worker(0, tests: tests, build_id: '205', strategy: :suite_bin_packing,
worker = worker(1, tests: tests, build_id: '205', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 100.0)

chunks = []
Expand Down Expand Up @@ -617,7 +620,7 @@ def test_moving_average_with_partial_coverage
MockTest.new('PartialTest#test_3')
]

worker = worker(0, tests: tests, build_id: '206', strategy: :suite_bin_packing,
worker = worker(1, tests: tests, build_id: '206', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 500.0)

chunks = []
Expand All @@ -639,7 +642,7 @@ def test_moving_average_updates_persist_across_workers

# New worker should see the persisted moving average
tests = [MockTest.new('PersistTest#test_1')]
worker1 = worker(0, tests: tests, build_id: '207', strategy: :suite_bin_packing,
worker1 = worker(1, tests: tests, build_id: '207', strategy: :suite_bin_packing,
suite_max_duration: 120_000, timing_fallback_duration: 1000.0)

chunks = []
Expand Down Expand Up @@ -680,7 +683,7 @@ def shuffled_test_list
end

def build_queue
worker(0, max_requeues: 1, requeue_tolerance: 0.1, populate: false, max_consecutive_failures: 10)
worker(1, max_requeues: 1, requeue_tolerance: 0.1, populate: false, max_consecutive_failures: 10)
end

def populate(worker, tests: TEST_LIST.dup)
Expand Down
Loading
Loading