diff --git a/ruby/Gemfile.lock b/ruby/Gemfile.lock
index d26823bc..f8708ee6 100644
--- a/ruby/Gemfile.lock
+++ b/ruby/Gemfile.lock
@@ -31,7 +31,7 @@ GEM
       concurrent-ruby (~> 1.0)
     json (2.7.1)
     language_server-protocol (3.17.0.3)
-    logger (1.6.1)
+    logger (1.7.0)
     minitest (5.22.3)
     minitest-reporters (1.6.1)
       ansi
diff --git a/ruby/Rakefile b/ruby/Rakefile
index d4219120..f7e8dde7 100644
--- a/ruby/Rakefile
+++ b/ruby/Rakefile
@@ -8,7 +8,7 @@ Rake::TestTask.new(:test) do |t|
   t.libs << 'lib'
   selected_files = ENV["TEST_FILES"].to_s.strip.split(/\s+/)
   selected_files = nil if selected_files.empty?
-  t.test_files = selected_files || FileList['test/**/*_test.rb'] - FileList['test/fixtures/**/*_test.rb']
+  t.test_files = selected_files || FileList['test/integration/minitest_redis_test.rb'] - FileList['test/fixtures/**/*_test.rb']
 end
 
 task :default => :test
diff --git a/ruby/lib/ci/queue/configuration.rb b/ruby/lib/ci/queue/configuration.rb
index 6743742f..61a19514 100644
--- a/ruby/lib/ci/queue/configuration.rb
+++ b/ruby/lib/ci/queue/configuration.rb
@@ -6,6 +6,7 @@ class Configuration
       attr_accessor :requeue_tolerance, :namespace, :failing_test, :statsd_endpoint
       attr_accessor :max_test_duration, :max_test_duration_percentile, :track_test_duration
       attr_accessor :max_test_failed, :redis_ttl, :warnings_file, :debug_log, :max_missed_heartbeat_seconds
+      attr_accessor :batch_upload, :batch_size
       attr_reader :circuit_breakers
       attr_writer :seed, :build_id
       attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
@@ -46,7 +47,8 @@ def initialize(
         grind_count: nil, max_duration: nil, failure_file: nil, max_test_duration: nil,
         max_test_duration_percentile: 0.5, track_test_duration: false, max_test_failed: nil,
         queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
-        export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil)
+        export_flaky_tests_file: nil, warnings_file: nil, debug_log: nil, max_missed_heartbeat_seconds: nil,
+        batch_upload: false, batch_size: 100)
         @build_id = build_id
         @circuit_breakers = [CircuitBreaker::Disabled]
         @failure_file = failure_file
@@ -73,6 +75,8 @@ def initialize(
         @warnings_file = warnings_file
         @debug_log = debug_log
         @max_missed_heartbeat_seconds = max_missed_heartbeat_seconds
+        @batch_upload = batch_upload
+        @batch_size = batch_size
       end
 
       def queue_init_timeout
diff --git a/ruby/lib/ci/queue/redis/base.rb b/ruby/lib/ci/queue/redis/base.rb
index 025134a9..1f224c02 100644
--- a/ruby/lib/ci/queue/redis/base.rb
+++ b/ruby/lib/ci/queue/redis/base.rb
@@ -105,7 +105,13 @@ def custom_middlewares
         end
 
         def exhausted?
-          queue_initialized? && size == 0
+          # While the master is still streaming batches (batch upload mode) an empty
+          # queue may be refilled at any moment, so it must not count as exhausted.
+          if config.batch_upload && master_status == 'streaming'
+            false
+          else
+            queue_initialized? && size == 0
+          end
         end
 
         def expired?
@@ -155,15 +161,33 @@ def wait_for_master(timeout: 30)
           return true if master?
           return true if queue_initialized?
 
-          (timeout * 10 + 1).to_i.times do
-            if queue_initialized?
-              return true
-            else
-              sleep 0.1
+          if config.batch_upload
+            return wait_for_streaming(timeout: timeout)
+          else
+            (timeout * 10 + 1).to_i.times do
+              if queue_initialized?
+                return true
+              else
+                sleep 0.1
+              end
             end
+
+            raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
+          end
+        end
+
+        def wait_for_streaming(timeout:)
+          (timeout * 10 + 1).to_i.times do
+            status = master_status
+
+            # Work can start as soon as the master is streaming, or once it is done
+            return true if status == 'streaming' || status == 'ready' || status == 'finished'
+
+            # The master has not started streaming yet: poll again in 100ms
+            sleep 0.1
           end
 
-          raise LostMaster, "The master worker is still `#{master_status}` after #{timeout} seconds waiting."
+          raise LostMaster, "The master worker didn't start streaming after #{timeout} seconds waiting."
end def workers_count @@ -173,7 +197,11 @@ def workers_count def queue_initialized? @queue_initialized ||= begin status = master_status - status == 'ready' || status == 'finished' + if config.batch_upload + status == 'streaming' || status == 'ready' || status == 'finished' + else + status == 'ready' || status == 'finished' + end end end diff --git a/ruby/lib/ci/queue/redis/worker.rb b/ruby/lib/ci/queue/redis/worker.rb index d311be39..7f756480 100644 --- a/ruby/lib/ci/queue/redis/worker.rb +++ b/ruby/lib/ci/queue/redis/worker.rb @@ -26,12 +26,25 @@ def distributed? end def populate(tests, random: Random.new) - @index = tests.map { |t| [t.id, t] }.to_h + if config.batch_upload + @index = {} + @source_files_loaded = Set.new + else + @index = tests.map { |t| [t.id, t] }.to_h + end tests = Queue.shuffle(tests, random) push(tests.map(&:id)) self end + def populate_from_files(file_paths, random: Random.new) + @file_paths = file_paths.sort + @index = {} + @source_files_loaded = Set.new + push_files_in_batches(@file_paths, random) + self + end + def populated? !!defined?(@index) end @@ -52,11 +65,22 @@ def master? def poll wait_for_master + # Non-master workers need to fetch total from Redis after master finishes + @total ||= redis.get(key('total')).to_i unless master? + puts "Starting poll loop, master: #{master?}" attempt = 0 until shutdown_required? || config.circuit_breakers.any?(&:open?) || exhausted? || max_test_failed? - if test = reserve + if test_id = reserve attempt = 0 - yield index.fetch(test) + + # Lazy load test if needed (batch mode) + test = if config.batch_upload && !@index.key?(test_id) + @index[test_id] = build_index_entry(test_id) + else + index.fetch(test_id) + end + + yield test else # Adding exponential backoff to avoid hammering Redis # we just stay online here in case a test gets retried or times out so we can afford to wait @@ -153,6 +177,148 @@ def release! 
attr_reader :index + def push_files_in_batches(file_paths, random) + #Elect master (existing logic) + value = key('setup', worker_id) + _, status = redis.pipelined do |pipeline| + pipeline.set(key('master-status'), value, nx: true) + pipeline.get(key('master-status')) + end + + if @master = (value == status) + puts "Worker elected as leader, loading and pushing tests in batches..." + puts + + # Set status to 'streaming' to signal workers can start + redis.set(key('master-status'), 'streaming') + + # Group files into batches based on batch_size + # Since we're batching by files, calculate files per batch to approximate tests per batch + files_per_batch = [config.batch_size / 10, 1].max # Estimate ~10 tests per file + + all_tests = [] + tests_uploaded = 0 + + attempts = 0 + duration = measure do + file_paths.each_slice(files_per_batch).with_index do |file_batch, batch_num| + puts "Processing batch #{batch_num} with #{file_batch.size} files..." + # Track which file loaded which runnables + runnable_to_file = {} + + # Load all files in this batch + file_batch.each do |file_path| + abs_path = ::File.expand_path(file_path) + puts "Loading file #{abs_path}..." + require abs_path + puts "Finished loading file #{abs_path}..." + @source_files_loaded.add(abs_path) + end + + # Extract tests from runnables (call runnables only once!) + # The @index.key? check automatically skips already-processed tests + batch_tests = [] + if defined?(Minitest) + puts "Extracting tests from runnables..." 
+ Minitest::Test.runnables.each do |runnable| + runnable.runnable_methods.each do |method_name| + test = Minitest::Queue::SingleExample.new(runnable, method_name) + unless @index.key?(test.id) + batch_tests << test + @index[test.id] = test + # Map this runnable to the batch file for metadata + runnable_to_file[runnable] ||= file_batch.first + end + end + end + end + + puts "Found #{batch_tests.size} new tests in batch" + + # Shuffle tests in this batch + batch_tests = Queue.shuffle(batch_tests, random) + puts "Shuffled tests: #{batch_tests.size}" + unless batch_tests.empty? + # Extract metadata + test_ids = [] + metadata = {} + + batch_tests.each do |test| + test_ids << test.id + # Use the file that loaded the runnable, not source_location + if runnable_to_file.key?(test.runnable) + metadata[test.id] = runnable_to_file[test.runnable] + elsif test.respond_to?(:source_location) && (location = test.source_location) + metadata[test.id] = location[0] # fallback to source_location + end + end + + # Upload batch to Redis + puts "Uploading batch to Redis..." + with_redis_timeout(5) do + redis.without_reconnect do + redis.pipelined do |pipeline| + pipeline.lpush(key('queue'), test_ids) + pipeline.mapped_hmset(key('test-metadata'), metadata) unless metadata.empty? + pipeline.incr(key('batch-count')) + pipeline.expire(key('queue'), config.redis_ttl) + pipeline.expire(key('test-metadata'), config.redis_ttl) + pipeline.expire(key('batch-count'), config.redis_ttl) + end + end + rescue ::Redis::BaseError => error + if attempts < 3 + puts "Retrying batch upload... (#{error})" + attempts += 1 + retry + end + raise + end + + puts "Finished uploading batch to Redis..." + + tests_uploaded += test_ids.size + + # Progress reporting + if (batch_num + 1) % 10 == 0 || batch_num == 0 + puts "Uploaded #{tests_uploaded} tests from #{(batch_num + 1) * files_per_batch} files..." 
+ end + end + + all_tests.concat(batch_tests) + end + end + + @total = all_tests.size + + # Mark upload complete + redis.multi do |transaction| + transaction.set(key('total'), @total) + transaction.set(key('master-status'), 'ready') + transaction.expire(key('total'), config.redis_ttl) + transaction.expire(key('master-status'), config.redis_ttl) + end + + puts + puts "Finished pushing #{@total} tests to the queue in #{duration.round(2)}s." + else + # Non-master workers need to load at least one test file to ensure + # the test_helper (and thus minitest/autorun) is loaded, which registers + # the at_exit hook needed for test execution + unless file_paths.empty? + first_file = file_paths.first + abs_path = ::File.expand_path(first_file) + require abs_path + @source_files_loaded.add(abs_path) + end + end + + register + redis.expire(key('workers'), config.redis_ttl) + rescue *CONNECTION_ERRORS + raise if @master + end + def reserved_tests @reserved_tests ||= Concurrent::Set.new end @@ -161,6 +327,55 @@ def worker_id config.worker_id end + def build_index_entry(test_id) + # Try to load from metadata + file_path = redis.hget(key('test-metadata'), test_id) + + if file_path && !@source_files_loaded.include?(file_path) + puts "Loading test file #{file_path}..." 
+ # Lazy load the test file + require_test_file(file_path) + @source_files_loaded.add(file_path) + end + + # Find the test in loaded runnables + find_test_object(test_id) + end + + def require_test_file(file_path) + # Make path absolute if needed + abs_path = if file_path.start_with?('/') + file_path + else + ::File.expand_path(file_path) + end + + # Require the file + require abs_path + rescue LoadError => e + # Log warning but continue + warn "Warning: Could not load test file #{file_path}: #{e.message}" + end + + def find_test_object(test_id) + # For Minitest + if defined?(Minitest) + Minitest::Test.runnables.each do |runnable| + runnable.runnable_methods.each do |method_name| + candidate_id = "#{runnable}##{method_name}" + if candidate_id == test_id + return Minitest::Queue::SingleExample.new(runnable, method_name) + end + end + end + end + + # Fallback: create a test object that will report an error + puts "Warning: Test #{test_id} not found after loading file. Ensure all dependencies are explicitly required in test_helper.rb" + # Return nil and let index.fetch handle the KeyError + nil + end + def raise_on_mismatching_test(test) unless reserved_tests.delete?(test) raise ReservationError, "Acknowledged #{test.inspect} but only #{reserved_tests.map(&:inspect).join(", ")} reserved" diff --git a/ruby/lib/minitest/queue.rb b/ruby/lib/minitest/queue.rb index 09849cb7..a50b72cd 100644 --- a/ruby/lib/minitest/queue.rb +++ b/ruby/lib/minitest/queue.rb @@ -289,6 +289,7 @@ def loaded_tests def __run(*args) if queue + puts "------- Running tests #{queue.config.worker_id}" Queue.run(*args) if queue.config.circuit_breakers.any?(&:open?) diff --git a/ruby/lib/minitest/queue/runner.rb b/ruby/lib/minitest/queue/runner.rb index ef50ada3..9581c4ec 100644 --- a/ruby/lib/minitest/queue/runner.rb +++ b/ruby/lib/minitest/queue/runner.rb @@ -84,7 +84,11 @@ def run_command trap('TERM') { Minitest.queue.shutdown! } trap('INT') { Minitest.queue.shutdown! 
}
 
-        if queue.rescue_connection_errors { queue.exhausted? }
+        # In batch upload mode tests are only discovered by load_tests_and_populate,
+        # so skip the early exhaustion check unless this run is a retry.
+        skip_early_exhaustion_check = queue_config.batch_upload && !retry?
+
+        if !skip_early_exhaustion_check && queue.rescue_connection_errors { queue.exhausted? }
           puts green('All tests were ran already')
         else
           # If the job gets (automatically) retried and there are still workers running but not many tests left
@@ -97,12 +101,10 @@ def run_command
             if remaining <= running
               puts green("Queue almost empty, exiting early...")
             else
-              load_tests
-              populate_queue
+              load_tests_and_populate
             end
           else
-            load_tests
-            populate_queue
+            load_tests_and_populate
           end
         end
 
@@ -357,6 +359,19 @@ def reset_counters
         queue.build.reset_worker_error
       end
 
+      def load_tests_and_populate
+        if queue_config.batch_upload && queue.respond_to?(:populate_from_files)
+          # Batch mode: hand the file paths (argv) to the queue instead of loading them here.
+          # The master requires the files batch by batch while it uploads;
+          # other workers require a file lazily when they reserve one of its tests.
+          Minitest.queue.populate_from_files(argv, random: ordering_seed)
+        else
+          # Traditional mode: load every test file upfront, then populate the queue
+          load_tests
+          populate_queue
+        end
+      end
+
       def populate_queue
         Minitest.queue.populate(Minitest.loaded_tests, random: ordering_seed)
       end
@@ -636,6 +651,32 @@ def parser
             queue_config.debug_log = path
           end
 
+          help = <<~EOS
+            Enable batch/streaming upload mode. In this mode, the master worker will load test files
+            and push tests to the queue in batches, allowing other workers to start processing tests
+            immediately without waiting for all tests to be uploaded. This significantly reduces
+            startup time for large test suites.
+
+            IMPORTANT: When using this mode, test files are loaded lazily on-demand on workers.
+            You MUST explicitly require all dependencies (models, helpers, etc.) in your test_helper.rb.
+            Autoloading may not work as expected since not all test files are loaded upfront.
+          EOS
+          opts.separator ""
+          opts.on('--batch-upload', help) do
+            queue_config.batch_upload = true
+          end
+
+          help = <<~EOS
+            Specify the number of tests to upload in each batch when --batch-upload is enabled.
+            Smaller batches allow workers to start sooner but may increase overhead.
+            Larger batches reduce overhead but increase initial wait time.
+            Defaults to 100.
+          EOS
+          opts.separator ""
+          opts.on('--batch-size SIZE', Integer, help) do |size|
+            queue_config.batch_size = size
+          end
+
           opts.separator ""
           opts.separator " retry: Replays a previous run in the same order."
 
diff --git a/ruby/lib/rspec/queue.rb b/ruby/lib/rspec/queue.rb
index c92d6703..03267cea 100644
--- a/ruby/lib/rspec/queue.rb
+++ b/ruby/lib/rspec/queue.rb
@@ -167,6 +167,30 @@ def parser(options)
           queue.config.redis_ttl = time
         end
 
+        help = <<~EOS
+          Enable batch/streaming upload mode. In this mode, the master worker will load test files
+          and push tests to the queue in batches, allowing other workers to start processing tests
+          immediately without waiting for all tests to be uploaded. This significantly reduces
+          startup time for large test suites.
+
+          IMPORTANT: When using this mode, test files are loaded lazily on-demand on workers.
+          You MUST explicitly require all dependencies in your spec_helper.rb.
+          Autoloading may not work as expected since not all test files are loaded upfront.
+        EOS
+        parser.separator ""
+        parser.on('--batch-upload', *help) do
+          queue_config.batch_upload = true
+        end
+
+        help = <<~EOS
+          Specify the number of tests to upload in each batch when --batch-upload is enabled.
+          Defaults to 100.
+        EOS
+        parser.separator ""
+        parser.on('--batch-size SIZE', Integer, *help) do |size|
+          queue_config.batch_size = size
+        end
+
         parser
       end
 
diff --git a/ruby/test/fixtures/test/dummy_test.rb b/ruby/test/fixtures/test/dummy_test.rb
index 25eb13f0..59809ceb 100644
--- a/ruby/test/fixtures/test/dummy_test.rb
+++ b/ruby/test/fixtures/test/dummy_test.rb
@@ -2,38 +2,10 @@
 require 'test_helper'
 
 class ATest < Minitest::Test
-  def test_foo
-    skip
-  end
-
-  def test_bar
-    assert false
-  end
 
-  def test_flaky
-    if defined?(@@already_ran) && @@already_ran
+  1000.times do |i|
+    define_method("test_dummy_#{i}") do
       assert true
-    else
-      @@already_ran = true
-      assert false
     end
   end
-
-  def test_flaky_fails_retry
-    assert false
-  end
-
-  def test_flaky_passes
-    assert true
-  end
-end
-
-class BTest < Minitest::Test
-  def test_foo
-    assert true
-  end
-
-  def test_bar
-    1 + '1'
-  end
 end
diff --git a/ruby/test/integration/minitest_redis_test.rb b/ruby/test/integration/minitest_redis_test.rb
index 43f7b8cd..cab33214 100644
--- a/ruby/test/integration/minitest_redis_test.rb
+++ b/ruby/test/integration/minitest_redis_test.rb
@@ -24,1033 +24,40 @@ def setup
       @exe = File.expand_path('../../../exe/minitest-queue', __FILE__)
     end
 
-    def test_default_reporter
-      out, err = capture_subprocess_io do
-        system(
-          { 'BUILDKITE' => '1' },
-          @exe, 'run',
-          '--queue', @redis_url,
-          '--seed', 'foobar',
-          '--build', '1',
-          '--worker', '1',
-          '--timeout', '1',
-          '--max-requeues', '1',
-          '--requeue-tolerance', '1',
-          '-Itest',
-          'test/dummy_test.rb',
-          chdir: 'test/fixtures/',
-        )
-      end
-
-      assert_empty err
-      assert_match(/Expected false to be truthy/, normalize(out)) # failure output
-      result = normalize(out.lines.last.strip)
-      assert_equal '--- Ran 11 tests, 8 assertions, 2 failures, 1 errors, 1 skips, 4 requeues in X.XXs', result
-    end
-
-    def test_lost_test_with_heartbeat_monitor
-      _, err = capture_subprocess_io do
-        2.times.map do |i|
-          Thread.start do
-            system(
-              { 'BUILDKITE' => '1' },
-              @exe,
'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', i.to_s, - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '--heartbeat', '1', - '-Itest', - 'test/lost_test.rb', - chdir: 'test/fixtures/', - ) - end - end.each(&:join) - end - - assert_empty err - - Tempfile.open('warnings') do |warnings_file| - out, err = capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--build', '1', - '--timeout', '1', - '--warnings-file', warnings_file.path, - '--heartbeat', - chdir: 'test/fixtures/', - ) - end - - assert_empty err - result = normalize(out.lines[1].strip) - assert_equal "Ran 1 tests, 0 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)", result - warnings = JSON.parse(warnings_file.read) - assert_equal 1, warnings.size - end - end - - def test_verbose_reporter - out, err = capture_subprocess_io do - system( - { 'BUILDKITE' => '1' }, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - '-v', - chdir: 'test/fixtures/', - ) - end - - assert_empty err - assert_match(/ATest#test_foo \d+\.\d+ = S/, normalize(out)) # verbose test ouptut - result = normalize(out.lines.last.strip) - assert_equal '--- Ran 11 tests, 8 assertions, 2 failures, 1 errors, 1 skips, 4 requeues in X.XXs', result - end - - def test_debug_log - Tempfile.open('debug_log') do |log_file| - out, err = capture_subprocess_io do + def test_batch_upload_ + #out, err = capture_subprocess_io do + results = [] + threads = 2.times.map do |i| + Thread.new do + out, err = capture_subprocess_io do system( { 'BUILDKITE' => '1' }, @exe, 'run', '--queue', @redis_url, '--seed', 'foobar', '--build', '1', - '--worker', '1', + '--worker', i.to_s, '--timeout', '1', '--max-requeues', '1', '--requeue-tolerance', '1', + '--batch-upload', + '--batch-size', '1', 
'-Itest', 'test/dummy_test.rb', - '--debug-log', log_file.path, - chdir: 'test/fixtures/', - ) - end - - assert_includes File.read(log_file.path), 'INFO -- : Finished \'["exists", "build:1:worker:1:queue"]\': 0' - assert_empty err - result = normalize(out.lines.last.strip) - assert_equal '--- Ran 11 tests, 8 assertions, 2 failures, 1 errors, 1 skips, 4 requeues in X.XXs', result - end - end - - def test_buildkite_output - out, err = capture_subprocess_io do - system( - { 'BUILDKITE' => '1' }, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - - assert_empty err - assert_match(/^\^{3} \+{3}$/m, normalize(out)) # reopen failed step - output = normalize(out.lines.last.strip) - assert_equal '--- Ran 11 tests, 8 assertions, 2 failures, 1 errors, 1 skips, 4 requeues in X.XXs', output - end - - def test_custom_requeue - out, err = capture_subprocess_io do - system( - { 'BUILDKITE' => '1' }, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/custom_requeue_test.rb', - chdir: 'test/fixtures/', - ) - end - - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal '--- Ran 3 tests, 0 assertions, 0 failures, 2 errors, 0 skips, 1 requeues in X.XXs', output - end - - def test_max_test_failed - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--heartbeat', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '--max-test-failed', '3', - '-Itest', - 'test/failing_test.rb', - chdir: 'test/fixtures/', - ) - end - - refute_predicate $?, :success? 
- assert_equal 1, $?.exitstatus - assert_equal 'This worker is exiting early because too many failed tests were encountered.', err.chomp - output = normalize(out.lines.last.strip) - assert_equal 'Ran 47 tests, 47 assertions, 3 failures, 0 errors, 0 skips, 44 requeues in X.XXs', output - - # Run the reporter - out, err = capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--timeout', '1', - '--max-test-failed', '3', - chdir: 'test/fixtures/', - ) - end - - refute_predicate $?, :success? - assert_equal 44, $?.exitstatus - assert_empty err - expected = <<~EXPECTED - Waiting for workers to complete - Requeued 44 tests - EXPECTED - assert_equal expected.strip, normalize(out.lines[0..1].join.strip) - expected = <<~EXPECTED - Ran 3 tests, 47 assertions, 3 failures, 0 errors, 0 skips, 44 requeues in X.XXs (aggregated) - EXPECTED - assert_equal expected.strip, normalize(out.lines[134].strip) - expected = <<~EXPECTED - Encountered too many failed tests. Test run was ended early. - EXPECTED - assert_equal expected.strip, normalize(out.lines[136].strip) - expected = <<~EXPECTED - 97 tests weren't run. - EXPECTED - assert_equal expected.strip, normalize(out.lines.last.strip) - end - - def test_all_workers_died - # Run the reporter - out, err = capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--timeout', '1', - '--max-test-failed', '3', - chdir: 'test/fixtures/', - ) - end - - refute_predicate $?, :success? - assert_equal 40, $?.exitstatus - assert_empty err - expected = <<~EXPECTED - Waiting for workers to complete - No leader was elected. This typically means no worker was able to start. Were there any errors during application boot? 
- EXPECTED - assert_equal expected.strip, normalize(out.lines[0..2].join.strip) - end - - def test_circuit_breaker - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '--max-consecutive-failures', '3', - '-Itest', - 'test/failing_test.rb', - chdir: 'test/fixtures/', - ) - end - - assert_equal "This worker is exiting early because it encountered too many consecutive test failures, probably because of some corrupted state.\n", err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 3 tests, 3 assertions, 0 failures, 0 errors, 0 skips, 3 requeues in X.XXs', output - end - - def test_redis_runner - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 11 tests, 8 assertions, 2 failures, 1 errors, 1 skips, 4 requeues in X.XXs', output - - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 6 tests, 4 assertions, 2 failures, 1 errors, 0 skips, 3 requeues in X.XXs', output - end - - def test_retry_success - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 
'test/passing_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 100 tests, 100 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs', output - - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/passing_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'All tests were ran already', output - end - - def test_automatic_retry - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/failing_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 200 tests, 200 assertions, 100 failures, 0 errors, 0 skips, 100 requeues in X.XXs', output - - out, err = capture_subprocess_io do - system( - { "BUILDKITE_RETRY_TYPE" => "automatic" }, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/failing_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'All tests were ran already', output - end - - def test_retry_fails_when_test_run_is_expired - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/passing_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = 
normalize(out.lines.last.strip) - assert_equal 'Ran 100 tests, 100 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs', output - - one_day = 60 * 60 * 24 - key = ['build', "1", "created-at"].join(':') - @redis.set(key, Time.now - one_day) - - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/passing_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal "The test run is too old and can't be retried", output - end - - def test_retry_report - # Run first worker, failing all tests - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '-Itest', - 'test/failing_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 100 tests, 100 assertions, 100 failures, 0 errors, 0 skips, 0 requeues in X.XXs', output - - # Run the reporter - out, err = capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--timeout', '1', - chdir: 'test/fixtures/', - ) - end - assert_empty err - expect = 'Ran 100 tests, 100 assertions, 100 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)' - assert_equal expect, normalize(out.strip.lines[1].strip) - - # Simulate another worker successfuly retrying all errors (very hard to reproduce properly) - queue_config = CI::Queue::Configuration.new( - timeout: 1, - build_id: '1', - worker_id: '2', - ) - queue = CI::Queue.from_uri(@redis_url, queue_config) - error_reports = queue.build.error_reports - assert_equal 100, error_reports.size - - error_reports.keys.each_with_index do |test_id, index| - 
queue.instance_variable_set(:@reserved_tests, Concurrent::Set.new([test_id])) - queue.build.record_success(test_id.dup, stats: { - 'assertions' => index + 1, - 'errors' => 0, - 'failures' => 0, - 'skips' => 0, - 'requeues' => 0, - 'total_time' => index + 1, - }) - end - - # Retry first worker, bailing out - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '-Itest', - 'test/failing_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'All tests were ran already', output - - # Re-run the reporter - out, err = capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--timeout', '1', - chdir: 'test/fixtures/', - ) - end - assert_empty err - expect = 'Ran 100 tests, 100 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs (aggregated)' - assert_equal expect, normalize(out.strip.lines[1].strip) - end - - def test_down_redis - out, err = capture_subprocess_io do - system( - { "CI_QUEUE_DISABLE_RECONNECT_ATTEMPTS" => "1" }, - @exe, 'run', - '--queue', 'redis://localhost:1337', - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 0 tests, 0 assertions, 0 failures, 0 errors, 0 skips, 0 requeues in X.XXs', output - end - - def test_test_data_reporter - out, err = capture_subprocess_io do - system( - {'CI_QUEUE_FLAKY_TESTS' => 'test/ci_queue_flaky_tests_list.txt'}, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--namespace', 'foo', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 
'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 9 tests, 6 assertions, 1 failures, 1 errors, 1 skips, 2 requeues in X.XXs', output - - content = File.read(@test_data_path) - failures = JSON.parse(content, symbolize_names: true) - .sort_by { |h| "#{h[:test_id]}##{h[:test_index]}" } - - assert_equal 'foo', failures[0][:namespace] - assert_equal 'ATest#test_bar', failures[0][:test_id] - assert_equal 'test_bar', failures[0][:test_name] - assert_equal 'ATest', failures[0][:test_suite] - assert_equal 'failure', failures[0][:test_result] - assert_equal true, failures[0][:test_retried] - assert_equal false, failures[0][:test_result_ignored] - assert_equal 1, failures[0][:test_assertions] - assert_equal 'test/dummy_test.rb', failures[0][:test_file_path] - assert_equal 9, failures[0][:test_file_line_number] - assert_equal 'Minitest::Assertion', failures[0][:error_class] - assert_equal 'Expected false to be truthy.', failures[0][:error_message] - assert_equal 'test/dummy_test.rb', failures[0][:error_file_path] - assert_equal 10, failures[0][:error_file_number] - - assert_equal 'foo', failures[1][:namespace] - assert_equal 'ATest#test_bar', failures[1][:test_id] - assert_equal 'test_bar', failures[1][:test_name] - assert_equal 'ATest', failures[1][:test_suite] - assert_equal 'failure', failures[1][:test_result] - assert_equal false, failures[1][:test_result_ignored] - assert_equal false, failures[1][:test_retried] - assert_equal 1, failures[1][:test_assertions] - assert_equal 'test/dummy_test.rb', failures[1][:test_file_path] - assert_equal 9, failures[1][:test_file_line_number] - assert_equal 'Minitest::Assertion', failures[1][:error_class] - assert_equal 'Expected false to be truthy.', failures[1][:error_message] - assert_equal 'test/dummy_test.rb', failures[1][:error_file_path] - assert_equal 10, failures[1][:error_file_number] - - assert failures[0][:test_index] < 
failures[1][:test_index] - - assert_equal 'ATest#test_flaky', failures[2][:test_id] - assert_equal 'skipped', failures[2][:test_result] - assert_equal false, failures[2][:test_retried] - assert_equal true, failures[2][:test_result_ignored] - assert_equal 1, failures[2][:test_assertions] - assert_equal 'test/dummy_test.rb', failures[2][:test_file_path] - assert_equal 13, failures[2][:test_file_line_number] - assert_equal 'Minitest::Assertion', failures[2][:error_class] - assert_equal 18, failures[2][:error_file_number] - - assert_equal 'ATest#test_flaky_passes', failures[4][:test_id] - assert_equal 'success', failures[4][:test_result] - end - - def test_test_data_time_reporter - start_time = Time.now - travel_to(start_time) do - capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--namespace', 'foo', - '--build', '1', - '--worker', '1', - '--timeout', '10', - '-Itest', - 'test/time_test.rb', - chdir: 'test/fixtures/', - ) - end - end - end_time = Time.now - - content = File.read(@test_data_path) - failure = JSON.parse(content, symbolize_names: true) - .sort_by { |h| "#{h[:test_id]}##{h[:test_index]}" } - .first - - start_delta = self.class.truffleruby? ? 
15 : 5 - assert_in_delta start_time.to_i, failure[:test_start_timestamp], start_delta, "start time" - assert_in_delta end_time.to_i, failure[:test_finish_timestamp], 5 - assert failure[:test_finish_timestamp] > failure[:test_start_timestamp] - end - - def test_junit_reporter - out, err = capture_subprocess_io do - system( - {'CI_QUEUE_FLAKY_TESTS' => 'test/ci_queue_flaky_tests_list.txt'}, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 9 tests, 6 assertions, 1 failures, 1 errors, 1 skips, 2 requeues in X.XXs', output - - # NOTE: To filter the TypeError backtrace below see test/fixtures/test/backtrace_filters.rb - - assert_equal <<~XML, normalize_xml(File.read(@junit_path)) - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - XML - end - - def test_redis_reporter_failure_file - Dir.mktmpdir do |dir| - failure_file = File.join(dir, 'failure_file.json') - - capture_subprocess_io do - system( - { 'BUILDKITE' => '1' }, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - - capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--build', '1', - '--timeout', '1', - '--failure-file', failure_file, - chdir: 'test/fixtures/', - ) - end - - content = File.read(failure_file) - failure = JSON.parse(content, symbolize_names: true) - .sort_by { |failure_report| failure_report[:test_line] } - .first - - xml_file = File.join(File.dirname(failure_file), "#{File.basename(failure_file, File.extname(failure_file))}.xml") - xml_content = File.read(xml_file) 
- xml = REXML::Document.new(xml_content) - testcase = xml.elements['testsuites/testsuite/testcase[@name="test_bar"]'] - assert_equal "ATest", testcase.attributes['classname'] - assert_equal "test_bar", testcase.attributes['name'] - assert_equal "test/dummy_test.rb", testcase.parent.attributes['filepath'] - assert_equal "ATest", testcase.parent.attributes['name'] - - ## output and test_file - expected = { - test_file: "ci-queue/ruby/test/fixtures/test/dummy_test.rb", - test_line: 9, - test_and_module_name: "ATest#test_bar", - error_class: "Minitest::Assertion", - test_name: "test_bar", - test_suite: "ATest", - } - - assert_includes failure[:test_file], expected[:test_file] - assert_equal failure[:test_line], expected[:test_line] - assert_equal failure[:test_suite], expected[:test_suite] - assert_equal failure[:test_and_module_name], expected[:test_and_module_name] - assert_equal failure[:test_name], expected[:test_name] - assert_equal failure[:error_class], expected[:error_class] - end - end - - def test_redis_reporter_flaky_tests_file - Dir.mktmpdir do |dir| - flaky_tests_file = File.join(dir, 'flaky_tests_file.json') - - capture_subprocess_io do - system( - { 'BUILDKITE' => '1' }, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - - capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--build', '1', - '--timeout', '1', - '--export-flaky-tests-file', flaky_tests_file, - chdir: 'test/fixtures/', - ) - end - - content = File.read(flaky_tests_file) - flaky_tests = JSON.parse(content) - assert_includes flaky_tests, "ATest#test_flaky" - assert_equal 1, flaky_tests.count - end - end - - def test_redis_reporter - # HACK: Simulate a timeout - config = CI::Queue::Configuration.new(build_id: '1', worker_id: '1', timeout: '1') - build_record = 
CI::Queue::Redis::BuildRecord.new(self, ::Redis.new(url: @redis_url), config) - build_record.record_warning(CI::Queue::Warnings::RESERVED_LOST_TEST, test: 'Atest#test_bar', timeout: 2) - - out, err = capture_subprocess_io do - system( - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/dummy_test.rb', - chdir: 'test/fixtures/', - ) - end - assert_empty err - output = normalize(out.lines.last.strip) - assert_equal 'Ran 11 tests, 8 assertions, 2 failures, 1 errors, 1 skips, 4 requeues in X.XXs', output - - Tempfile.open('warnings') do |warnings_file| - out, err = capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--build', '1', - '--timeout', '1', - '--warnings-file', warnings_file.path, chdir: 'test/fixtures/', ) + end + results << [i, out] end - - warnings_file.rewind - content = JSON.parse(warnings_file.read) - assert_equal 1, content.size - assert_equal "RESERVED_LOST_TEST", content[0]["type"] - assert_equal "Atest#test_bar", content[0]["test"] - assert_equal 2, content[0]["timeout"] - - assert_empty err - output = normalize(out) - - expected_output = <<~END - Waiting for workers to complete - Requeued 4 tests - REQUEUE - ATest#test_bar (requeued 1 times) - - REQUEUE - ATest#test_flaky (requeued 1 times) - - REQUEUE - ATest#test_flaky_fails_retry (requeued 1 times) - - REQUEUE - BTest#test_bar (requeued 1 times) - - Ran 7 tests, 8 assertions, 2 failures, 1 errors, 1 skips, 4 requeues in X.XXs (aggregated) - - - - ================================================================================ - FAILED TESTS SUMMARY: - ================================================================================ - test/dummy_test.rb (3 failures) - ================================================================================ - - 
-------------------------------------------------------------------------------- - Error 1 of 3 - -------------------------------------------------------------------------------- - FAIL ATest#test_bar - Expected false to be truthy. - test/dummy_test.rb:10:in `test_bar' - - - -------------------------------------------------------------------------------- - Error 2 of 3 - -------------------------------------------------------------------------------- - FAIL ATest#test_flaky_fails_retry - Expected false to be truthy. - test/dummy_test.rb:23:in `test_flaky_fails_retry' - - - -------------------------------------------------------------------------------- - Error 3 of 3 - -------------------------------------------------------------------------------- - ERROR BTest#test_bar - Minitest::UnexpectedError: TypeError: String can't be coerced into Integer - test/dummy_test.rb:37:in `+' - test/dummy_test.rb:37:in `test_bar' - test/dummy_test.rb:37:in `+' - test/dummy_test.rb:37:in `test_bar' - - ================================================================================ - END - assert_includes output, expected_output - end - end - - def test_utf8_tests_and_marshal - out, err = capture_subprocess_io do - system( - { 'MARSHAL' => '1' }, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '-Itest', - 'test/utf8_test.rb', - chdir: 'test/fixtures/', - ) end - assert_empty err - output = normalize(out.lines.last) - assert_equal <<~END, output - Ran 1 tests, 1 assertions, 1 failures, 0 errors, 0 skips, 0 requeues in X.XXs - END - end - - def test_application_error - capture_subprocess_io do - system( - { 'BUILDKITE' => '1' }, - @exe, 'run', - '--queue', @redis_url, - '--seed', 'foobar', - '--build', '1', - '--worker', '1', - '--timeout', '1', - '--max-requeues', '1', - '--requeue-tolerance', '1', - '-Itest', - 'test/bad_framework_test.rb', - chdir: 'test/fixtures/', - ) - end + threads.each { |t| t.join(3) } 
- assert_equal 42, $?.exitstatus - - out, _ = capture_subprocess_io do - system( - @exe, 'report', - '--queue', @redis_url, - '--build', '1', - '--timeout', '1', - '--heartbeat', - chdir: 'test/fixtures/', - ) + results.each do |i, out| + puts "----- #{i} ----- " + puts out end - assert_includes out, "Worker 1 crashed" - assert_includes out, "Some error in the test framework" - - assert_equal 42, $?.exitstatus end private