Skip to content

Commit c38fa94

Browse files
authored
Merge pull request #57 from figma/batch_update_test_duration
[CI] batch update test job duration
2 parents cdd9cb5 + 9f0caf6 commit c38fa94

6 files changed

Lines changed: 81 additions & 58 deletions

File tree

ruby/lib/ci/queue/configuration.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ class Configuration
1010
attr_accessor :max_test_failed, :redis_ttl
1111
attr_accessor :strategy, :timing_file, :timing_fallback_duration, :export_timing_file
1212
attr_accessor :suite_max_duration, :suite_buffer_percent
13+
attr_accessor :branch
1314
attr_reader :circuit_breakers
1415
attr_writer :seed, :build_id
1516
attr_writer :queue_init_timeout, :report_timeout, :inactive_workers_timeout
@@ -24,6 +25,7 @@ def from_env(env)
2425
statsd_endpoint: env['CI_QUEUE_STATSD_ADDR'],
2526
redis_ttl: env['CI_QUEUE_REDIS_TTL']&.to_i || 8 * 60 * 60,
2627
known_flaky_tests: load_known_flaky_tests(env['CI_QUEUE_KNOWN_FLAKY_TESTS']),
28+
branch: env['BUILDKITE_BRANCH'],
2729
)
2830
end
2931

@@ -55,7 +57,8 @@ def initialize(
5557
queue_init_timeout: nil, redis_ttl: 8 * 60 * 60, report_timeout: nil, inactive_workers_timeout: nil,
5658
export_flaky_tests_file: nil, known_flaky_tests: [],
5759
strategy: :random, timing_file: nil, timing_fallback_duration: 100.0, export_timing_file: nil,
58-
suite_max_duration: 120_000, suite_buffer_percent: 10
60+
suite_max_duration: 120_000, suite_buffer_percent: 10,
61+
branch: nil
5962
)
6063
@build_id = build_id
6164
@circuit_breakers = [CircuitBreaker::Disabled]
@@ -87,6 +90,7 @@ def initialize(
8790
@export_timing_file = export_timing_file
8891
@suite_max_duration = suite_max_duration
8992
@suite_buffer_percent = suite_buffer_percent
93+
@branch = branch
9094
end
9195

9296
def queue_init_timeout

ruby/lib/ci/queue/redis/test_time_record.rb

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ module CI
33
module Queue
44
module Redis
55
class TestTimeRecord < Worker
6+
# Runs the parent initializer with the same arguments, then prepares the
# in-memory state used to batch moving-average updates.
#
# @param redis  forwarded unchanged to the parent initializer
# @param config forwarded unchanged to the parent initializer
def initialize(redis, config)
  super
  # Number of buffered samples that triggers a batched write.
  @ema_batch_size = 100
  # Pending samples waiting for the next batched write.
  @ema_buffer = []
end
11+
612
def record(test_name, duration)
713
record_test_time(test_name, duration)
814
record_test_duration_moving_average(test_name, duration)
@@ -18,7 +24,7 @@ def fetch
1824

1925
private
2026

21-
attr_reader :redis
27+
attr_reader :redis, :ema_batch_size
2228

2329
def record_test_time(test_name, duration)
2430
redis.pipelined do |pipeline|
@@ -32,7 +38,21 @@ def record_test_time(test_name, duration)
3238
end
3339

3440
# Queues one duration sample for the moving-average update and flushes the
# queue once it has reached the configured batch size.
#
# Samples are dropped entirely when should_update_moving_average? is false.
#
# @param test_name identifier of the test the sample belongs to
# @param duration  measured duration for that test
# @return [nil] unless flush_ema_buffer returns something else
def record_test_duration_moving_average(test_name, duration)
  return unless should_update_moving_average?

  @ema_buffer.push([test_name, duration])
  return if @ema_buffer.size < ema_batch_size

  flush_ema_buffer
end
45+
46+
# Sends every buffered [test_name, duration] pair to Redis in one batched
# moving-average update, then empties the buffer.
#
# Does nothing when the buffer is empty. The buffer is cleared only after
# update_batch returns, so the pairs are kept if that call raises.
#
# @return [nil]
def flush_ema_buffer
  return nil if @ema_buffer.empty?

  UpdateTestDurationMovingAverage.new(redis).update_batch(@ema_buffer)
  @ema_buffer.clear
  nil
end
# This method is defined below the class's `private` marker, but external
# callers guard their call with `respond_to?(:flush_ema_buffer)`, which does
# not see private methods. Expose it publicly so the last partial batch is
# actually flushed instead of being silently dropped.
public :flush_ema_buffer
52+
53+
# Decides whether duration samples should feed the shared moving average.
#
# True only when the config object exposes `branch` and its string form is
# exactly 'master'; a config without `branch` yields false.
#
# @return [Boolean]
def should_update_moving_average?
  config.respond_to?(:branch) && config.branch.to_s == 'master'
end
3757

3858
def record_test_name(test_name)

ruby/lib/ci/queue/redis/update_test_duration_moving_average.rb

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,25 @@ module CI
44
module Queue
55
module Redis
66
class UpdateTestDurationMovingAverage
7-
LUA_SCRIPT= <<~LUA
7+
LUA_SCRIPT_BATCH = <<~LUA
88
local hash_key = KEYS[1]
9-
local test_id = ARGV[1]
10-
local new_duration = tonumber(ARGV[2])
11-
local smoothing = tonumber(ARGV[3])
12-
local current_avg = redis.call('HGET', hash_key, test_id)
13-
if current_avg then
14-
current_avg = tonumber(current_avg)
15-
local new_avg = smoothing * new_duration + (1 - smoothing) * current_avg
16-
redis.call('HSET', hash_key, test_id, new_avg)
17-
return tostring(new_avg)
18-
else
19-
redis.call('HSET', hash_key, test_id, new_duration)
20-
return tostring(new_duration)
9+
local smoothing = tonumber(ARGV[1])
10+
11+
for i = 2, #ARGV, 2 do
12+
local test_id = ARGV[i]
13+
local new_duration = tonumber(ARGV[i+1])
14+
local current_avg = redis.call('HGET', hash_key, test_id)
15+
16+
if current_avg then
17+
current_avg = tonumber(current_avg)
18+
local new_avg = smoothing * new_duration + (1 - smoothing) * current_avg
19+
redis.call('HSET', hash_key, test_id, new_avg)
20+
else
21+
redis.call('HSET', hash_key, test_id, new_duration)
22+
end
2123
end
24+
25+
return 'OK'
2226
LUA
2327

2428
def initialize(redis, key: "test_duration_moving_averages", smoothing_factor: 0.2)
@@ -27,8 +31,10 @@ def initialize(redis, key: "test_duration_moving_averages", smoothing_factor: 0.
2731
@smoothing_factor = smoothing_factor
2832
end
2933

30-
def update(test_id, duration)
31-
@redis.eval(LUA_SCRIPT, keys: [@key], argv: [test_id, duration, @smoothing_factor]).to_f
34+
# Applies a batch of duration samples to the moving averages with one call
# to the batch Lua script.
#
# The script receives the smoothing factor as its first argument, followed
# by alternating test id / duration values in the order given.
#
# @param pairs [Array<Array>, nil] list of [test_id, duration] pairs
# @return 0 when pairs is nil or empty, otherwise whatever the script
#   evaluation returns
def update_batch(pairs)
  return 0 if pairs.nil? || pairs.empty?

  script_args = pairs.each_with_object([@smoothing_factor]) do |(test_id, duration), acc|
    acc << test_id << duration
  end
  @redis.eval(LUA_SCRIPT_BATCH, keys: [@key], argv: script_args)
end
3339
end
3440
end

ruby/lib/minitest/queue/test_time_recorder.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ def record(test)
1313
test_id = "#{test.klass}##{test.name}"
1414
@build.record(test_id, test_duration_in_milliseconds)
1515
end
16+
17+
# Flushes any samples still buffered in the build object, when that object
# supports it.
#
# NOTE: respond_to? only reports public methods, so the build object must
# expose flush_ema_buffer publicly for the flush to happen.
#
# @return the result of flush_ema_buffer, or nil when it is not available
def report
  return unless @build.respond_to?(:flush_ema_buffer)

  @build.flush_ema_buffer
end
1620
end
1721
end
1822
end

ruby/test/ci/queue/redis/moving_average_test.rb

Lines changed: 23 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ def test_initialize_with_custom_smoothing_factor
2323
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key, smoothing_factor: 0.5)
2424
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)
2525

26-
updater.update('test1', 100.0)
26+
updater.update_batch([["test1", 100.0]])
2727
first_avg = reader['test1']
2828
assert_equal 100.0, first_avg
2929

30-
updater.update('test1', 200.0)
30+
updater.update_batch([["test1", 200.0]])
3131
second_avg = reader['test1']
3232
assert_in_delta 150.0, second_avg, 0.001
3333
end
@@ -36,41 +36,39 @@ def test_update_creates_new_entry
3636
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key)
3737
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)
3838

39-
result = updater.update('test1', 10.5)
40-
39+
updater.update_batch([["test1", 10.5]])
40+
result = reader['test1']
4141
assert_equal 10.5, result
4242
assert_equal 1, reader.size
4343
end
4444

4545
def test_update_calculates_exponential_moving_average
4646
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key, smoothing_factor: 0.2)
47+
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)
4748

48-
first_avg = updater.update('test1', 100.0)
49-
assert_equal 100.0, first_avg
49+
updater.update_batch([["test1", 100.0]])
50+
assert_equal 100.0, reader['test1']
5051

5152
# EMA = 0.2 * 150 + 0.8 * 100 = 30 + 80 = 110
52-
second_avg = updater.update('test1', 150.0)
53-
assert_in_delta 110.0, second_avg, 0.001
53+
updater.update_batch([["test1", 150.0]])
54+
assert_in_delta 110.0, reader['test1'], 0.001
5455

55-
third_avg = updater.update('test1', 200.0)
56-
assert_in_delta 128.0, third_avg, 0.001
56+
updater.update_batch([["test1", 200.0]])
57+
assert_in_delta 128.0, reader['test1'], 0.001
5758
end
5859

5960
def test_update_multiple_tests
6061
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key)
6162
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)
6263

63-
updater.update('test1', 10.0)
64-
updater.update('test2', 20.0)
65-
updater.update('test3', 30.0)
64+
updater.update_batch([["test1", 10.0], ["test2", 20.0], ["test3", 30.0]])
6665

6766
assert_equal 3, reader.size
6867
end
6968

7069
def test_bracket_operator_loads_and_returns_value
7170
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key)
72-
updater.update('test1', 10.5)
73-
updater.update('test2', 20.5)
71+
updater.update_batch([["test1", 10.5], ["test2", 20.5]])
7472

7573
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)
7674
assert_equal 10.5, reader['test1']
@@ -81,16 +79,14 @@ def test_bracket_operator_returns_nil_for_missing_key
8179
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key)
8280
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)
8381

84-
updater.update('test1', 10.5)
82+
updater.update_batch([["test1", 10.5]])
8583

8684
assert_nil reader['nonexistent']
8785
end
8886

8987
def test_load_all_loads_all_values_from_redis
9088
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key)
91-
updater.update('test1', 10.0)
92-
updater.update('test2', 20.0)
93-
updater.update('test3', 30.0)
89+
updater.update_batch([["test1", 10.0], ["test2", 20.0], ["test3", 30.0]])
9490

9591
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)
9692
reader.load_all
@@ -104,7 +100,7 @@ def test_load_all_handles_batches
104100
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key)
105101

106102
1500.times do |i|
107-
updater.update("test_#{i}", i.to_f)
103+
updater.update_batch([["test_#{i}", i.to_f]])
108104
end
109105

110106
# Create new instance and load all
@@ -123,20 +119,19 @@ def test_size_returns_correct_count
123119

124120
assert_equal 0, reader.size
125121

126-
updater.update('test1', 10.0)
122+
updater.update_batch([["test1", 10.0]])
127123
assert_equal 1, reader.size
128124

129-
updater.update('test2', 20.0)
125+
updater.update_batch([["test2", 20.0]])
130126
assert_equal 2, reader.size
131127

132-
updater.update('test1', 15.0)
128+
updater.update_batch([["test1", 15.0]])
133129
assert_equal 2, reader.size
134130
end
135131

136132
def test_updates_persist_to_redis
137133
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key)
138-
updater.update('test1', 10.5)
139-
updater.update('test2', 20.5)
134+
updater.update_batch([["test1", 10.5], ["test2", 20.5]])
140135

141136
# Verify data is actually in Redis
142137
values = @redis.hgetall(@key)
@@ -150,8 +145,8 @@ def test_concurrent_updates_from_different_instances
150145
updater2 = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key, smoothing_factor: 0.2)
151146
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)
152147

153-
updater1.update('test1', 100.0)
154-
updater2.update('test1', 200.0)
148+
updater1.update_batch([["test1", 100.0]])
149+
updater2.update_batch([["test1", 200.0]])
155150

156151
# Expected: 0.2 * 200 + 0.8 * 100 = 120
157152
assert_in_delta 120.0, reader['test1'], 0.001
@@ -161,9 +156,7 @@ def test_handles_floating_point_precision
161156
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis, key: @key)
162157

163158
# Test with various floating point values
164-
updater.update('test1', 0.123456789)
165-
updater.update('test2', 999.999999)
166-
updater.update('test3', 0.000001)
159+
updater.update_batch([["test1", 0.123456789], ["test2", 999.999999], ["test3", 0.000001]])
167160

168161
# Load in new instance
169162
reader = CI::Queue::Redis::TestDurationMovingAverages.new(@redis, key: @key)

ruby/test/ci/queue/redis_test.rb

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -436,8 +436,7 @@ def test_suite_bin_packing_uses_moving_average_for_duration
436436
@redis.flushdb
437437

438438
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis)
439-
updater.update('TestSuite#test_1', 5000.0)
440-
updater.update('TestSuite#test_2', 3000.0)
439+
updater.update_batch([["TestSuite#test_1", 5000.0], ["TestSuite#test_2", 3000.0]])
441440

442441
tests = [
443442
MockTest.new('TestSuite#test_1'),
@@ -467,7 +466,7 @@ def test_moving_average_takes_precedence_over_timing_file
467466
timing_data = { 'TestSuite#test_1' => 10_000.0 }
468467

469468
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis)
470-
updater.update('TestSuite#test_1', 2000.0)
469+
updater.update_batch([["TestSuite#test_1", 2000.0]])
471470

472471
tests = [MockTest.new('TestSuite#test_1')]
473472

@@ -539,8 +538,7 @@ def test_mixed_duration_sources_in_suite_splitting
539538
require 'tempfile'
540539

541540
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis)
542-
updater.update('MixedTest#test_1', 60_000.0)
543-
updater.update('MixedTest#test_2', 50_000.0)
541+
updater.update_batch([["MixedTest#test_1", 60_000.0], ["MixedTest#test_2", 50_000.0]])
544542

545543
timing_data = {
546544
'MixedTest#test_3' => 40_000.0,
@@ -582,9 +580,7 @@ def test_moving_average_ordering_by_duration
582580
@redis.flushdb
583581

584582
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis)
585-
updater.update('FastTest#test_1', 1000.0)
586-
updater.update('SlowTest#test_1', 10_000.0)
587-
updater.update('MediumTest#test_1', 5000.0)
583+
updater.update_batch([["FastTest#test_1", 1000.0], ["SlowTest#test_1", 10_000.0], ["MediumTest#test_1", 5000.0]])
588584

589585
tests = [
590586
MockTest.new('FastTest#test_1'),
@@ -616,7 +612,7 @@ def test_moving_average_with_partial_coverage
616612

617613
# Only one test has moving average data
618614
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis)
619-
updater.update('PartialTest#test_1', 3000.0)
615+
updater.update_batch([["PartialTest#test_1", 3000.0]])
620616

621617
tests = [
622618
MockTest.new('PartialTest#test_1'),
@@ -642,7 +638,7 @@ def test_moving_average_updates_persist_across_workers
642638

643639
# Manually update moving average as if a previous worker completed the test
644640
updater = CI::Queue::Redis::UpdateTestDurationMovingAverage.new(@redis)
645-
updater.update('PersistTest#test_1', 5500.0)
641+
updater.update_batch([["PersistTest#test_1", 5500.0]])
646642

647643
# New worker should see the persisted moving average
648644
tests = [MockTest.new('PersistTest#test_1')]

0 commit comments

Comments
 (0)