Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ruby/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ GEM
concurrent-ruby (~> 1.0)
json (2.7.1)
language_server-protocol (3.17.0.3)
logger (1.6.1)
logger (1.7.0)
minitest (5.22.3)
minitest-reporters (1.6.1)
ansi
Expand Down
2 changes: 1 addition & 1 deletion ruby/Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Rake::TestTask.new(:test) do |t|
t.libs << 'lib'
selected_files = ENV["TEST_FILES"].to_s.strip.split(/\s+/)
selected_files = nil if selected_files.empty?
t.test_files = selected_files || FileList['test/**/*_test.rb'] - FileList['test/fixtures/**/*_test.rb']
t.test_files = selected_files || FileList['test/integration/minitest_redis_test.rb'] - FileList['test/fixtures/**/*_test.rb']
end

task :default => :test
Expand Down
6 changes: 5 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class Configuration
attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
attr_accessor :batch_upload, :batch_size
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
Expand Down Expand Up @@ -46,7 +47,8 @@ def initialize(
grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil)
export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
batch_upload: false, batch_size: 100)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
@failure_file = failure_file
Expand All @@ -73,6 +75,8 @@ def initialize(
@warnings_file = warnings_file
@debug_log = debug_log
@max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
@batch_upload = batch_upload
@batch_size = batch_size
end

def queue_init_timeout
Expand Down
44 changes: 36 additions & 8 deletions ruby/lib/ci/queue/redis/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,13 @@ def custom_middlewares
end

# Tells whether there is nothing left for this worker to wait for.
#
# In batch upload mode the master keeps pushing tests while its status is
# 'streaming', so the queue is never reported as exhausted during that phase
# and workers keep waiting/retrying. In every other case the queue is
# exhausted once it has been initialized and `size` is zero.
def exhausted?
  # Checked first so `size` is not queried at all while the master is streaming.
  return false if config.batch_upload && master_status == 'streaming'

  queue_initialized? && size == 0
end

def expired?
Expand Down Expand Up @@ -155,15 +161,33 @@ def wait_for_master(timeout: 30)
return true if master?
return true if queue_initialized?

(timeout * 10 + 1).to_i.times do
if queue_initialized?
return true
else
sleep 0.1
if config.batch_upload
return wait_for_streaming(timeout: timeout)
else
(timeout * 10 + 1).to_i.times do
if queue_initialized?
return true
else
sleep 0.1
end
end

raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
end
end

# Waits until the master has started (or already finished) publishing tests.
#
# Polls `master_status` up to `timeout * 10 + 1` times, sleeping 0.1s between
# attempts.
#
# timeout - Numeric, approximate number of seconds to keep polling.
#
# Returns true as soon as the status is 'streaming', 'ready' or 'finished'.
# Raises LostMaster when none of those statuses was observed in time.
def wait_for_streaming(timeout:)
  (timeout * 10 + 1).to_i.times do
    # Ready to work if streaming or complete
    return true if %w[streaming ready finished].include?(master_status)

    # Master hasn't started yet
    sleep 0.1
  end

  raise LostMaster, "The master worker didn't start streaming after #{timeout} seconds waiting."
end

def workers_count
Expand All @@ -173,7 +197,11 @@ def workers_count
# Tells whether the master has made the queue usable.
#
# The queue counts as initialized when the master status is 'ready' or
# 'finished'; in batch upload mode 'streaming' counts as well. Because the
# result is memoized with `||=`, only a truthy answer is cached: a false
# answer triggers a fresh `master_status` lookup on the next call.
def queue_initialized?
  @queue_initialized ||= begin
    usable_statuses = config.batch_upload ? %w[streaming ready finished] : %w[ready finished]
    usable_statuses.include?(master_status)
  end
end

Expand Down
221 changes: 218 additions & 3 deletions ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,25 @@ def distributed?
end

# Registers the given tests with this worker and pushes their ids.
#
# In batch upload mode the index starts empty (entries are added lazily
# elsewhere) and the set of loaded source files is reset. Otherwise the index
# maps every test id to its test object up front.
#
# tests  - Enumerable of objects responding to `id`.
# random - Random instance forwarded to Queue.shuffle.
#
# Returns self.
def populate(tests, random: Random.new)
  if config.batch_upload
    # No eager index here: building it would be thrown away immediately.
    @index = {}
    @source_files_loaded = Set.new
  else
    @index = tests.map { |t| [t.id, t] }.to_h
  end
  tests = Queue.shuffle(tests, random)
  push(tests.map(&:id))
  self
end

# Prepares this worker for file-based loading.
#
# Stores the sorted file paths, resets the test index and the set of loaded
# source files, then hands the sorted paths to push_files_in_batches.
#
# file_paths - Array of test file paths (sorted before use).
# random     - Random instance forwarded to push_files_in_batches.
#
# Returns self.
def populate_from_files(file_paths, random: Random.new)
  @file_paths = file_paths.sort
  @index, @source_files_loaded = {}, Set.new
  tap { push_files_in_batches(@file_paths, random) }
end

# True once @index has been assigned (even to nil), false before that.
def populated?
  instance_variable_defined?(:@index)
end
Expand All @@ -52,11 +65,22 @@ def master?

def poll
wait_for_master
# Non-master workers need to fetch total from Redis after master finishes
@total ||= redis.get(key('total')).to_i unless master?
puts "Starting poll loop, master: #{master?}"
attempt = 0
until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed?
if test = reserve
if test_id = reserve
attempt = 0
yield index.fetch(test)

# Lazy load test if needed (batch mode)
test = if config.batch_upload && !@index.key?(test_id)
@index[test_id] = build_index_entry(test_id)
else
index.fetch(test_id)
end

yield test
else
# Adding exponential backoff to avoid hammering Redis
# we just stay online here in case a test gets retried or times out so we can afford to wait
Expand Down Expand Up @@ -153,6 +177,148 @@ def release!

attr_reader :index

# Elects a master for file-based loading and, on the master, requires the test
# files and streams the discovered tests to Redis batch by batch.
#
# The master sets `master-status` to 'streaming' before loading anything and to
# 'ready' (together with `total`) once every batch has been pushed. Each batch
# pushes the shuffled test ids to `queue` and records under `test-metadata`
# which file to require for each test id.
#
# Every other worker only requires the first file.
#
# file_paths - Array of test file paths, processed in the given order.
# random     - Random instance forwarded to Queue.shuffle for each batch.
#
# Connection errors (CONNECTION_ERRORS) are re-raised on the master and
# swallowed on other workers.
def push_files_in_batches(file_paths, random)
  # Elect master: write a value unique to this worker with NX and read it back.
  value = key('setup', worker_id)
  _, status = redis.pipelined do |pipeline|
    pipeline.set(key('master-status'), value, nx: true)
    pipeline.get(key('master-status'))
  end

  if @master = (value == status)
    puts "Worker elected as leader, loading and pushing tests in batches..."
    puts

    # Set status to 'streaming' to signal workers can start. The TTL keeps the
    # key from outliving the build if the master dies before marking 'ready'.
    redis.set(key('master-status'), 'streaming', ex: config.redis_ttl)

    # Group files into batches based on batch_size
    # Since we're batching by files, calculate files per batch to approximate tests per batch
    files_per_batch = [config.batch_size / 10, 1].max # Estimate ~10 tests per file

    tests_uploaded = 0
    files_loaded = 0

    duration = measure do
      file_paths.each_slice(files_per_batch).with_index do |file_batch, batch_num|
        puts "Processing batch #{batch_num} with #{file_batch.size} files..."
        # Maps each runnable registered while loading this batch to the file
        # whose `require` registered it.
        runnable_to_file = {}

        # Load all files in this batch
        file_batch.each do |file_path|
          # NOTE(review): assumes newly registered runnables are appended to
          # Minitest::Test.runnables — confirm against the Minitest version in use.
          known_runnables = defined?(Minitest) ? Minitest::Test.runnables.size : 0

          abs_path = ::File.expand_path(file_path)
          puts "Loading file #{abs_path}..."
          require abs_path
          puts "Finished loading file #{abs_path}..."
          @source_files_loaded.add(abs_path)

          next unless defined?(Minitest)

          Minitest::Test.runnables.drop(known_runnables).each do |runnable|
            runnable_to_file[runnable] ||= file_path
          end
        end
        files_loaded += file_batch.size

        # Extract tests from runnables (once per batch).
        # The @index.key? check automatically skips already-processed tests
        batch_tests = []
        if defined?(Minitest)
          puts "Extracting tests from runnables..."
          Minitest::Test.runnables.each do |runnable|
            runnable.runnable_methods.each do |method_name|
              test = Minitest::Queue::SingleExample.new(runnable, method_name)
              next if @index.key?(test.id)

              batch_tests << test
              @index[test.id] = test
            end
          end
        end

        puts "Found #{batch_tests.size} new tests in batch"

        # Shuffle tests in this batch
        batch_tests = Queue.shuffle(batch_tests, random)
        puts "Shuffled tests: #{batch_tests.size}"
        next if batch_tests.empty?

        test_ids = batch_tests.map(&:id)
        metadata = batch_tests.each_with_object({}) do |test, files|
          # Prefer the file that registered the runnable; fall back to the
          # test's source_location, and only then to the first file of the batch.
          file = runnable_to_file[test.runnable]
          if file.nil? && test.respond_to?(:source_location) && (location = test.source_location)
            file = location[0]
          end
          files[test.id] = file || file_batch.first
        end

        upload_test_batch(test_ids, metadata)
        tests_uploaded += test_ids.size

        # Progress reporting
        if (batch_num + 1) % 10 == 0 || batch_num == 0
          puts "Uploaded #{tests_uploaded} tests from #{files_loaded} files..."
        end
      end
    end

    @total = tests_uploaded

    # Mark upload complete
    redis.multi do |transaction|
      transaction.set(key('total'), @total)
      transaction.set(key('master-status'), 'ready')
      transaction.expire(key('total'), config.redis_ttl)
      transaction.expire(key('master-status'), config.redis_ttl)
    end

    puts
    puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s."
  else
    # Non-master workers need to load at least one test file to ensure
    # the test_helper (and thus minitest/autorun) is loaded, which registers
    # the at_exit hook needed for test execution
    unless file_paths.empty?
      first_file = file_paths.first
      abs_path = ::File.expand_path(first_file)
      require abs_path
      @source_files_loaded.add(abs_path)
    end
  end

  register
  redis.expire(key('workers'), config.redis_ttl)
rescue *CONNECTION_ERRORS
  raise if @master
end

# Pushes one batch of test ids and their file metadata to Redis in a single
# MULTI transaction, retrying up to 3 times on Redis errors. The retry budget
# is per batch.
def upload_test_batch(test_ids, metadata)
  attempts = 0
  puts "Uploading batch to Redis..."
  begin
    with_redis_timeout(5) do
      redis.without_reconnect do
        redis.multi do |transaction|
          transaction.lpush(key('queue'), test_ids)
          transaction.mapped_hmset(key('test-metadata'), metadata) unless metadata.empty?
          transaction.incr(key('batch-count'))
          transaction.expire(key('queue'), config.redis_ttl)
          transaction.expire(key('test-metadata'), config.redis_ttl)
          transaction.expire(key('batch-count'), config.redis_ttl)
        end
      end
    end
  rescue ::Redis::BaseError => error
    raise unless attempts < 3

    puts "Retrying batch upload... (#{error})"
    attempts += 1
    retry
  end
  puts "Finished uploading batch to Redis..."
end

# Returns the memoized Concurrent::Set of reserved tests, creating it on first
# use.
def reserved_tests
  return @reserved_tests if @reserved_tests

  @reserved_tests = Concurrent::Set.new
end
Expand All @@ -161,6 +327,55 @@ def worker_id
config.worker_id
end

# Resolves a test id to a test object, requiring its source file first when
# Redis metadata names one that has not been loaded yet.
#
# test_id - String id to look up in the `test-metadata` hash.
#
# Returns whatever find_test_object returns for the id.
def build_index_entry(test_id)
  # Try to load from metadata
  file_path = redis.hget(key('test-metadata'), test_id)

  if file_path
    # @source_files_loaded holds absolute paths, while metadata may hold a
    # relative one, so normalize before comparing and recording.
    abs_path = ::File.expand_path(file_path)

    unless @source_files_loaded.include?(abs_path)
      puts "Loading test file #{file_path}..."
      # Lazy load the test file
      require_test_file(file_path)
      @source_files_loaded.add(abs_path)
    end
  end

  # Find the test in loaded runnables
  find_test_object(test_id)
end

# Requires a test file, tolerating files that cannot be loaded.
#
# Paths starting with '/' are required as given; anything else is first
# expanded with File.expand_path.
#
# Returns the result of `require`, or nil after printing a warning when a
# LoadError is raised.
def require_test_file(file_path)
  target = file_path.start_with?('/') ? file_path : ::File.expand_path(file_path)
  require target
rescue LoadError => load_error
  # Best effort: report the problem and carry on.
  warn "Warning: Could not load test file #{file_path}: #{load_error.message}"
end

# Looks up a test by id among the currently registered Minitest runnables.
#
# Candidate ids are built as "<runnable>#<method_name>" and compared with
# test_id.
#
# Returns a Minitest::Queue::SingleExample for the first match, or nil (after
# printing a warning) when Minitest is not defined or nothing matches, so
# callers must be prepared for nil.
def find_test_object(test_id)
  if defined?(Minitest)
    Minitest::Test.runnables.each do |klass|
      matching_method = klass.runnable_methods.find { |name| "#{klass}##{name}" == test_id }
      return Minitest::Queue::SingleExample.new(klass, matching_method) if matching_method
    end
  end

  # Nothing matched: report it and signal the miss with nil.
  puts "Warning: Test #{test_id} not found after loading file. Ensure all dependencies are explicitly required in test_helper.rb"
  nil
end

def raise_on_mismatching_test(test)
unless reserved_tests.delete?(test)
raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved"
Expand Down
1 change: 1 addition & 0 deletions ruby/lib/minitest/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ def loaded_tests

def __run(*args)
if queue
puts "------- Running tests #{queue.config.worker_id}"
Queue.run(*args)

if queue.config.circuit_breakers.any?(&:open?)
Expand Down
Loading
Loading