Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ class << self
self.requeue_offset = 42

class Worker < Base
attr_reader :total
# Number of tests in the build, as recorded under the Redis `total` key.
#
# A missing key reads as nil and is coerced to 0 by `to_i`. Because 0 is
# truthy in Ruby, a plain `||=` would cache that placeholder forever, so a
# worker that asked before the count was published would keep reporting 0.
# Only a positive count is therefore memoized; a zero is re-read on the next
# call.
#
# @return [Integer] the test count, or 0 when none has been published yet
def total
  return @total if @total && @total > 0

  @total = redis.get(key('total')).to_i
end

def initialize(redis, config)
@reserved_test = nil
Expand All @@ -29,29 +31,21 @@ def distributed?
end

def populate(tests, random: Random.new)
# All workers need an index of tests to resolve IDs
@index = tests.map { |t| [t.id, t] }.to_h

@total = tests.size

# After populating the index, "register" the worker with the build
register

# Only the master worker reorders tests and pushes them to the queue
return self unless master?
if acquire_master_role?
executables = reorder_tests(tests, random: random)

executables = reorder_tests(tests, random: random)
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) }

# Separate chunks from individual tests
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
individual_tests = executables.select { |e| !e.is_a?(CI::Queue::TestChunk) }
store_chunk_metadata(chunks) if chunks.any?

# Store chunk metadata in Redis
store_chunk_metadata(chunks) if chunks.any?
all_ids = chunks.map(&:id) + individual_tests.map(&:id)
push(all_ids)
end

# Push all IDs to queue (chunks + individual tests)
all_ids = chunks.map(&:id) + individual_tests.map(&:id)
push(all_ids)
register_worker_presence

self
end
Expand All @@ -69,7 +63,7 @@ def shutdown_required?
end

def master?
@config.worker_id.to_s == '0'
@master
end

def idle?
Expand Down Expand Up @@ -262,9 +256,9 @@ def try_to_reserve_lost_test
end

def push(tests)
return unless master?
@total = tests.size

if redis.setnx(key('master-status'), 'setup')
if @master
redis.multi do |transaction|
transaction.lpush(key('queue'), tests) unless tests.empty?
transaction.set(key('total'), @total)
Expand All @@ -275,15 +269,32 @@ def push(tests)
transaction.expire(key('master-status'), config.redis_ttl)
end
end
rescue *CONNECTION_ERRORS
raise if @master
end

def register
redis.sadd(key('workers'), [worker_id])
redis.expire(key('workers'), config.redis_ttl)
end

private

# Tries to claim the master role for this worker.
#
# If this worker already holds the role, answers true without touching
# Redis. Otherwise it issues SETNX on the `master-status` key with the value
# 'setup' and remembers the outcome in @master. A connection error clears
# @master and is reported as "not master" rather than raised.
#
# @return [Boolean] whether this worker is the master
def acquire_master_role?
  if @master
    true
  else
    begin
      @master = redis.setnx(key('master-status'), 'setup')
    rescue *CONNECTION_ERRORS
      @master = nil
      false
    end
  end
end

# Registers this worker and refreshes the TTL on the `workers` key.
#
# Connection errors are re-raised only when this worker is the master;
# for any other worker they are swallowed and the method returns nil.
def register_worker_presence
  begin
    register
    workers_key = key('workers')
    redis.expire(workers_key, config.redis_ttl)
  rescue *CONNECTION_ERRORS
    # Best-effort for non-masters: only the master propagates the failure.
    raise if master?
  end
end

def store_chunk_metadata(chunks)
# Batch operations to avoid exceeding Redis multi operation limits
# Each chunk requires 4 commands (set, expire, sadd, hset), so batch conservatively
Expand Down
10 changes: 5 additions & 5 deletions ruby/test/ci/queue/redis/dynamic_timeout_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def setup

@config = CI::Queue::Configuration.new(
build_id: '42',
worker_id: '0',
worker_id: '1',
timeout: 30, # 30 seconds default timeout
strategy: :suite_bin_packing,
suite_max_duration: 120_000,
Expand Down Expand Up @@ -157,7 +157,7 @@ def test_reserve_test_passes_dynamic_deadline_flag
'build:42:queue',
'build:42:running',
'build:42:processed',
'build:42:worker:0:queue',
'build:42:worker:1:queue',
'build:42:owners',
'build:42:test-group-timeout', # 6th key for dynamic deadline
]
Expand Down Expand Up @@ -185,7 +185,7 @@ def test_reserve_lost_test_passes_dynamic_deadline_flag
expected_keys = [
'build:42:running',
'build:42:completed',
'build:42:worker:0:queue',
'build:42:worker:1:queue',
'build:42:owners',
'build:42:test-group-timeout', # 5th key for dynamic deadline
]
Expand All @@ -208,7 +208,7 @@ def test_chunk_not_marked_lost_before_dynamic_timeout
# Create worker with short timeout for faster test
config = CI::Queue::Configuration.new(
build_id: 'timeout-test',
worker_id: '0',
worker_id: '1',
timeout: 0.5, # 0.5 seconds
strategy: :suite_bin_packing
)
Expand Down Expand Up @@ -243,7 +243,7 @@ def test_single_test_marked_lost_after_default_timeout
# Create worker with short timeout
config = CI::Queue::Configuration.new(
build_id: 'single-timeout-test',
worker_id: '0',
worker_id: '1',
timeout: 0.5, # 0.5 seconds
)

Expand Down
27 changes: 26 additions & 1 deletion ruby/test/ci/queue/redis/worker_chunk_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def setup

@config = CI::Queue::Configuration.new(
build_id: '42',
worker_id: '0',
worker_id: '1',
timeout: 0.2,
strategy: :suite_bin_packing,
suite_max_duration: 120_000,
Expand Down Expand Up @@ -63,6 +63,31 @@ def test_populate_stores_partial_suite_with_test_ids
assert_equal test_ids, parsed['test_ids']
end

# A worker that populates after another worker has already populated must
# not reorder the suite, must not report itself as master, must see the same
# total, and must still be recorded in the build's `workers` set.
def test_non_master_does_not_reorder_tests
  tests = create_mock_tests(['TestA#test_1'])

  # Populate with the primary worker first, with reordering stubbed to a no-op.
  @worker.stub(:reorder_tests, tests) { @worker.populate(tests) }

  # Second worker on the same build, differing only by worker id.
  secondary_config = @config.dup
  secondary_config.instance_variable_set(:@worker_id, '2')
  secondary_worker = CI::Queue::Redis.new(@redis_url, secondary_config)

  # Count every reorder attempt made by the secondary worker.
  reorder_calls = 0
  counting_reorder = lambda do |passed_tests, **_|
    reorder_calls += 1
    passed_tests
  end
  secondary_worker.stub(:reorder_tests, counting_reorder) { secondary_worker.populate(tests) }

  assert_equal 0, reorder_calls, 'Non-master workers should not reorder tests'
  refute secondary_worker.master?, 'Secondary worker should not become master'
  assert_equal @worker.total, secondary_worker.total
  assert_includes @redis.smembers('build:42:workers'), '2'
end

def test_chunk_id_detection
assert @worker.send(:chunk_id?, 'TestA:full_suite')
assert @worker.send(:chunk_id?, 'TestB:chunk_0')
Expand Down
8 changes: 4 additions & 4 deletions ruby/test/ci/queue/redis_supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def test_wait_for_master
master_found = @supervisor.wait_for_master
end
thread.wakeup
worker(0)
worker(1)
thread.join
assert_equal true, master_found
end
Expand All @@ -34,7 +34,7 @@ def test_wait_for_workers
workers_done = @supervisor.wait_for_workers
end
thread.wakeup
poll(worker(0))
poll(worker(1))
thread.join
assert_equal true, workers_done
end
Expand All @@ -46,14 +46,14 @@ def test_wait_for_workers_timeout
io = capture_io { @supervisor.wait_for_workers }
end
thread.wakeup
worker(0)
worker(1)
thread.join
assert_includes io, "Aborting, it seems all workers died.\n"
end

def test_num_workers
assert_equal 0, @supervisor.workers_count
worker(0)
worker(1)
assert_equal 1, @supervisor.workers_count
end

Expand Down
Loading