Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,21 @@ def distributed?
def populate(tests, random: Random.new)
# All workers need an index of tests to resolve IDs
@index = tests.map { |t| [t.id, t] }.to_h
executables = reorder_tests(tests, random: random)
@total = tests.size

if acquire_master_role?
executables = reorder_tests(tests, random: random)

# Separate chunks from individual tests
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
individual_tests = executables.select { |e| !e.is_a?(CI::Queue::TestChunk) }
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
individual_tests = executables.reject { |e| e.is_a?(CI::Queue::TestChunk) }

# Store chunk metadata in Redis (only master does this)
store_chunk_metadata(chunks) if chunks.any?
store_chunk_metadata(chunks) if chunks.any?

all_ids = chunks.map(&:id) + individual_tests.map(&:id)
push(all_ids)
end

# Push all IDs to queue (chunks + individual tests)
all_ids = chunks.map(&:id) + individual_tests.map(&:id)
push(all_ids)
register_worker_presence

self
end
Expand Down Expand Up @@ -255,7 +258,7 @@ def try_to_reserve_lost_test
def push(tests)
@total = tests.size

if @master = redis.setnx(key('master-status'), 'setup')
if @master
redis.multi do |transaction|
transaction.lpush(key('queue'), tests) unless tests.empty?
transaction.set(key('total'), @total)
Expand All @@ -266,8 +269,6 @@ def push(tests)
transaction.expire(key('master-status'), config.redis_ttl)
end
end
register
redis.expire(key('workers'), config.redis_ttl)
rescue *CONNECTION_ERRORS
raise if @master
end
Expand All @@ -278,6 +279,22 @@ def register

private

def acquire_master_role?
return true if @master

@master = redis.setnx(key('master-status'), 'setup')
rescue *CONNECTION_ERRORS
@master = nil
false
end

def register_worker_presence
register
redis.expire(key('workers'), config.redis_ttl)
rescue *CONNECTION_ERRORS
raise if master?
end

def store_chunk_metadata(chunks)
# Batch operations to avoid exceeding Redis multi operation limits
# Each chunk requires 4 commands (set, expire, sadd, hset), so batch conservatively
Expand Down
25 changes: 25 additions & 0 deletions ruby/test/ci/queue/redis/worker_chunk_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,31 @@ def test_populate_stores_partial_suite_with_test_ids
assert_equal test_ids, parsed['test_ids']
end

def test_non_master_does_not_reorder_tests
tests = create_mock_tests(['TestA#test_1'])

@worker.stub(:reorder_tests, tests) do
@worker.populate(tests)
end

secondary_config = @config.dup
secondary_config.instance_variable_set(:@worker_id, '2')
secondary_worker = CI::Queue::Redis.new(@redis_url, secondary_config)

reorder_calls = 0
secondary_worker.stub(:reorder_tests, ->(passed_tests, **_) do
reorder_calls += 1
passed_tests
end) do
secondary_worker.populate(tests)
end

assert_equal 0, reorder_calls, 'Non-master workers should not reorder tests'
refute secondary_worker.master?, 'Secondary worker should not become master'
assert_equal @worker.total, secondary_worker.total
assert_includes @redis.smembers('build:42:workers'), '2'
end

def test_chunk_id_detection
assert @worker.send(:chunk_id?, 'TestA:full_suite')
assert @worker.send(:chunk_id?, 'TestB:chunk_0')
Expand Down