Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions ruby/lib/ci/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module CI
module Queue
extend self

attr_accessor :shuffler, :requeueable
attr_accessor :requeueable

module Warnings
RESERVED_LOST_TEST = :RESERVED_LOST_TEST
Expand All @@ -42,13 +42,9 @@ def requeueable?(test_result)
requeueable.nil? || requeueable.call(test_result)
end

def shuffle(tests, random, config: nil)
if shuffler
shuffler.call(tests, random)
else
strategy = get_strategy(config&.strategy)
strategy.order_tests(tests, random: random, config: config)
end
def reorder_tests(tests, random, config: nil, redis: nil)
strategy = get_strategy(config&.strategy, config)
strategy.order_tests(tests, random: random, redis: redis)
end

def from_uri(url, config)
Expand All @@ -69,14 +65,14 @@ def from_uri(url, config)

private

def get_strategy(strategy_name)
def get_strategy(strategy_name, config)
case strategy_name&.to_sym
when :timing_based
Strategy::TimingBased.new
Strategy::TimingBased.new(config)
when :suite_bin_packing
Strategy::SuiteBinPacking.new
Strategy::SuiteBinPacking.new(config)
else
Strategy::Random.new
Strategy::Random.new(config)
end
end
end
Expand Down
7 changes: 6 additions & 1 deletion ruby/lib/ci/queue/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class Configuration
attr_accessor :max_test_failed, :redis_ttl
attr_accessor :strategy, :timing_file, :timing_fallback_duration, :export_timing_file
attr_accessor :suite_max_duration, :suite_buffer_percent
attr_accessor :timing_redis_key, :timing_ema_alpha, :timing_hscan_count
attr_reader :circuit_breakers
attr_writer :seed, :build_id
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
Expand Down Expand Up @@ -55,7 +56,8 @@ def initialize(
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
export_flaky_tests_file: nil, known_flaky_tests: [],
strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil,
suite_max_duration: 120_000, suite_buffer_percent: 10
suite_max_duration: 120_000, suite_buffer_percent: 10,
timing_redis_key: 'timing_data', timing_ema_alpha: 0.2, timing_hscan_count: 1000
)
@build_id = build_id
@circuit_breakers = [CircuitBreaker::Disabled]
Expand Down Expand Up @@ -87,6 +89,9 @@ def initialize(
@export_timing_file = export_timing_file
@suite_max_duration = suite_max_duration
@suite_buffer_percent = suite_buffer_percent
@timing_redis_key = timing_redis_key
@timing_ema_alpha = timing_ema_alpha
@timing_hscan_count = timing_hscan_count
end

def queue_init_timeout
Expand Down
83 changes: 83 additions & 0 deletions ruby/lib/ci/queue/redis/exponential_moving_average.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# frozen_string_literal: true

module CI
module Queue
module Redis
class ExponentialMovingAverage
LUA_SCRIPT= <<~LUA
local hash_key = KEYS[1]
local test_id = ARGV[1]
local new_duration = tonumber(ARGV[2])
local alpha = tonumber(ARGV[3])

local current_avg = redis.call('HGET', hash_key, test_id)

if current_avg then
current_avg = tonumber(current_avg)
local new_avg = alpha * new_duration + (1 - alpha) * current_avg
redis.call('HSET', hash_key, test_id, new_avg)
return new_avg
else
redis.call('HSET', hash_key, test_id, new_duration)
return new_duration
end
LUA

attr_reader :redis, :hash_key, :alpha

def initialize(redis, hash_key: 'timing_data', alpha: 0.2)
@redis = redis
@hash_key = hash_key
@alpha = alpha
end

def update(test_id, duration)
redis.eval(
LUA_SCRIPT,
keys: [hash_key],
argv: [test_id, duration.to_f, alpha]
)
end

def load_all(count: 1000)
timing_data = {}
cursor = '0'

loop do
cursor, fields = redis.hscan(hash_key, cursor, count: count)

# fields is array: ["key1", "val1", "key2", "val2", ...]
# Convert to hash and parse floats
Hash[*fields].each do |test_id, duration_str|
timing_data[test_id] = duration_str.to_f
end

break if cursor == '0'
end

timing_data
end

def import(timing_hash)
return if timing_hash.nil? || timing_hash.empty?

# HMSET expects flattened array: ["key1", "val1", "key2", "val2", ...]
flattened = timing_hash.to_a.flatten
redis.hmset(hash_key, *flattened)
end

def export_all
load_all
end

def exists?
redis.exists?(hash_key) && redis.hlen(hash_key) > 0
end

def size
redis.hlen(hash_key)
end
end
end
end
end
14 changes: 14 additions & 0 deletions ruby/lib/ci/queue/redis/test_time_record.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
# frozen_string_literal: true
require_relative 'exponential_moving_average'

module CI
module Queue
module Redis
class TestTimeRecord < Worker
def record(test_name, duration)
record_test_time(test_name, duration)
record_test_name(test_name)
record_test_time_ema(test_name, duration)
end

def fetch
Expand Down Expand Up @@ -59,6 +62,17 @@ def all_test_names_key
def test_time_key(test_name)
"build:#{config.build_id}:#{test_name}".dup.force_encoding(Encoding::BINARY)
end

def record_test_time_ema(test_name, duration)
timing_ema = ExponentialMovingAverage.new(
redis,
hash_key: config.timing_redis_key || 'timing_data',
alpha: config.timing_ema_alpha || 0.2
)
timing_ema.update(test_name, duration)
rescue ::Redis::BaseError => e
warn "Warning: Failed to update EMA timing for #{test_name}: #{e.message}"
end
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/redis/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def distributed?

def populate(tests, random: Random.new)
@index = tests.map { |t| [t.id, t] }.to_h
executables = Queue.shuffle(tests, random, config: config)
executables = Queue.reorder_tests(tests, random, config: config, redis: redis)

# Separate chunks from individual tests
chunks = executables.select { |e| e.is_a?(CI::Queue::TestChunk) }
Expand Down
10 changes: 8 additions & 2 deletions ruby/lib/ci/queue/strategy/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ module CI
module Queue
module Strategy
class Base
def order_tests(tests, random: Random.new, config: nil)
def initialize(config)
@config = config
end

attr_accessor :config

def order_tests(tests, random: Random.new, config: nil, redis: nil)
raise NotImplementedError, "#{self.class} must implement #order_tests"
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion ruby/lib/ci/queue/strategy/random.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module CI
module Queue
module Strategy
class Random < Base
def order_tests(tests, random: Random.new, config: nil)
def order_tests(tests, random: Random.new, config: nil, redis: nil)
tests.sort.shuffle(random: random)
end
end
Expand Down
54 changes: 46 additions & 8 deletions ruby/lib/ci/queue/strategy/suite_bin_packing.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
# frozen_string_literal: true
require_relative 'base'
require_relative '../redis/exponential_moving_average'
require 'json'

module CI
module Queue
module Strategy
class SuiteBinPacking < Base
def order_tests(tests, random: Random.new, config: nil)
timing_data = load_timing_data(config&.timing_file)
def order_tests(tests, random: Random.new, config: nil, redis: nil)
timing_data = load_timing_data(config, redis)
pp timing_data if ENV['VERBOSE']
max_duration = config&.suite_max_duration || 120_000
fallback_duration = config&.timing_fallback_duration || 100.0
buffer_percent = config&.suite_buffer_percent || 10
Expand Down Expand Up @@ -40,13 +42,33 @@ def extract_suite_name(test_id)
test_id.split('#').first
end

def load_timing_data(file_path)
return {} unless file_path && ::File.exist?(file_path)
def load_timing_data(config, redis)
timing_data = {}

JSON.parse(::File.read(file_path))
rescue JSON::ParserError => e
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
{}
if redis
begin
timing_ema = CI::Queue::Redis::ExponentialMovingAverage.new(
redis,
hash_key: config&.timing_redis_key || 'timing_data'
)

timing_data = timing_ema.load_all(count: config&.timing_hscan_count || 1000)
puts "Loaded #{timing_data.size} timing entries from Redis via HSCAN" if ENV['VERBOSE']
rescue ::Redis::BaseError => e
warn "Warning: Failed to load timing data from Redis: #{e.message}"
end
end

if timing_data.empty? && config&.timing_file && ::File.exist?(config.timing_file)
begin
timing_data = JSON.parse(::File.read(config.timing_file))
puts "Loaded #{timing_data.size} timing entries from file #{config.timing_file}" if ENV['VERBOSE']
rescue JSON::ParserError => e
warn "Warning: Could not parse timing file #{config.timing_file}: #{e.message}"
end
end

timing_data
end

def get_test_duration(test_id, timing_data, fallback_duration)
Expand Down Expand Up @@ -132,6 +154,22 @@ def split_suite_into_chunks(suite_name, suite_tests, max_duration, buffer_percen

chunks
end

private def load_timing_data_from_redis
if redis
begin
timing_ema = CI::Queue::Redis::ExponentialMovingAverage.new(
redis,
hash_key: config&.timing_redis_key || 'timing_data'
)

timing_data = timing_ema.load_all(count: config&.timing_hscan_count || 1000)
puts "Loaded #{timing_data.size} timing entries from Redis via HSCAN" if ENV['VERBOSE']
rescue ::Redis::BaseError => e
warn "Warning: Failed to load timing data from Redis: #{e.message}"
end
end
end
end
end
end
Expand Down
48 changes: 35 additions & 13 deletions ruby/lib/ci/queue/strategy/timing_based.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# frozen_string_literal: true
require_relative 'base'
require_relative '../redis/exponential_moving_average'
require 'json'

module CI
module Queue
module Strategy
class TimingBased < Base
def order_tests(tests, random: Random.new, config: nil)
timing_data = load_timing_data(config&.timing_file)
def order_tests(tests, random: Random.new, config: nil, redis: nil)
timing_data = load_timing_data(config, redis)
fallback_duration = config&.timing_fallback_duration || 100.0

tests.sort_by do |test|
Expand All @@ -18,18 +19,39 @@ def order_tests(tests, random: Random.new, config: nil)

private

def load_timing_data(file_path)
return {} unless file_path && ::File.exist?(file_path)

JSON.parse(::File.read(file_path))
rescue JSON::ParserError => e
warn "Warning: Could not parse timing file #{file_path}: #{e.message}"
{}
rescue => e
warn "Warning: Could not read timing file #{file_path}: #{e.message}"
{}
def load_timing_data(config, redis)
timing_data = {}

# Strategy 1: Try Redis via HSCAN (non-blocking)
if redis
begin
timing_ema = CI::Queue::Redis::ExponentialMovingAverage.new(
redis,
hash_key: config&.timing_redis_key || 'timing_data'
)

timing_data = timing_ema.load_all(count: config&.timing_hscan_count || 1000)
puts "Loaded #{timing_data.size} timing entries from Redis via HSCAN" if ENV['VERBOSE']
rescue ::Redis::BaseError => e
warn "Warning: Failed to load timing data from Redis: #{e.message}"
end
end

# Strategy 2: Fallback to JSON file if Redis unavailable or empty
if timing_data.empty? && config&.timing_file && ::File.exist?(config.timing_file)
begin
timing_data = JSON.parse(::File.read(config.timing_file))
puts "Loaded #{timing_data.size} timing entries from file #{config.timing_file}" if ENV['VERBOSE']
rescue JSON::ParserError => e
warn "Warning: Could not parse timing file #{config.timing_file}: #{e.message}"
rescue => e
warn "Warning: Could not read timing file #{config.timing_file}: #{e.message}"
end
end

timing_data
end
end
end
end
end
end
Loading
Loading