
Commit 752c354

Fix for #5620 (#5627)
Signed-off-by: Anand Rajagopal <anrajag@amazon.com>
Parent: ddc2de1

File tree: 4 files changed, +250 -2 lines

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 # Changelog
 
 ## master / unreleased
+* [BUGFIX] Querier: Fix querier limiter bug under multiselect. #5627
 * [CHANGE] Ruler: Add `cortex_ruler_rule_group_load_duration_seconds` and `cortex_ruler_rule_group_sync_duration_seconds` metrics. #5609
 * [CHANGE] Ruler: Add contextual info and query statistics to log
 * [FEATURE] Ruler: Add support for disabling rule groups. #5521
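For context on the BUGFIX entry: "multiselect" here means a single PromQL query whose expression contains more than one selector, so evaluation issues more than one Select call against the querier. Below is a minimal sketch (not part of this commit) that uses the Prometheus promql parser to show that `foo + bar`, one of the expressions exercised in the new test further down, contains two vector selectors; the claim that each selector becomes its own Select call reflects general PromQL evaluation behaviour, not anything stated in this diff.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/promql/parser"
)

func main() {
	expr, err := parser.ParseExpr("foo + bar")
	if err != nil {
		panic(err)
	}

	// Count the vector selectors in the expression; during evaluation each one
	// generally turns into a separate Select call on the querier, which is what
	// makes this a "multiselect" query.
	selects := 0
	parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
		if vs, ok := node.(*parser.VectorSelector); ok {
			selects++
			fmt.Println("selector:", vs.Name)
		}
		return nil
	})
	fmt.Println("selects:", selects) // 2
}

Before this fix, each of those selects was checked against its own fresh QueryLimiter; the change in pkg/querier/querier.go below makes them share one.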

pkg/querier/querier.go

Lines changed: 12 additions & 1 deletion
@@ -242,6 +242,11 @@ type QueryableWithFilter interface {
 	UseQueryable(now time.Time, queryMinT, queryMaxT int64) bool
 }
 
+type limiterHolder struct {
+	limiter            *limiter.QueryLimiter
+	limiterInitializer sync.Once
+}
+
 // NewQueryable creates a new Queryable for cortex.
 func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter, chunkIterFn chunkIteratorFunc, cfg Config, limits *validation.Overrides, tombstonesLoader purger.TombstonesLoader) storage.Queryable {
 	return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
@@ -256,6 +261,7 @@ func NewQueryable(distributor QueryableWithFilter, stores []QueryableWithFilter,
 			queryStoreForLabels: cfg.QueryStoreForLabels,
 			distributor:         distributor,
 			stores:              stores,
+			limiterHolder:       &limiterHolder{},
 		}
 
 		return q, nil
@@ -273,6 +279,7 @@ type querier struct {
 	queryStoreForLabels bool
 	distributor         QueryableWithFilter
 	stores              []QueryableWithFilter
+	limiterHolder       *limiterHolder
 }
 
 func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int64, int64, storage.Querier, []storage.Querier, error) {
@@ -281,7 +288,11 @@ func (q querier) setupFromCtx(ctx context.Context) (context.Context, string, int
 		return ctx, userID, 0, 0, nil, nil, err
 	}
 
-	ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(q.limits.MaxFetchedSeriesPerQuery(userID), q.limits.MaxFetchedChunkBytesPerQuery(userID), q.limits.MaxChunksPerQuery(userID), q.limits.MaxFetchedDataBytesPerQuery(userID)))
+	q.limiterHolder.limiterInitializer.Do(func() {
+		q.limiterHolder.limiter = limiter.NewQueryLimiter(q.limits.MaxFetchedSeriesPerQuery(userID), q.limits.MaxFetchedChunkBytesPerQuery(userID), q.limits.MaxChunksPerQuery(userID), q.limits.MaxFetchedDataBytesPerQuery(userID))
+	})
+
+	ctx = limiter.AddQueryLimiterToContext(ctx, q.limiterHolder.limiter)
 
 	mint, maxt, err := validateQueryTimeRange(ctx, userID, q.mint, q.maxt, q.limits, q.maxQueryIntoFuture)
 	if err != nil {
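The removed line built a brand-new QueryLimiter on every setupFromCtx call, i.e. once per select, so a multi-select query never accumulated its series count across selects. The limiterHolder plus sync.Once initializes the limiter once per querier and hands the same instance to every select. Here is a minimal, self-contained sketch of that initialize-once-and-share pattern; seriesLimiter and holder are hypothetical stand-ins for illustration, not the Cortex types.

package main

import (
	"fmt"
	"sync"
)

// seriesLimiter counts series fetched across every select of one query
// (a hypothetical stand-in for Cortex's limiter.QueryLimiter).
type seriesLimiter struct {
	mu        sync.Mutex
	count     int
	maxSeries int
}

func (l *seriesLimiter) addSeries(n int) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.count += n
	if l.count > l.maxSeries {
		return fmt.Errorf("max series per query exceeded: %d > %d", l.count, l.maxSeries)
	}
	return nil
}

// holder mirrors limiterHolder: it lazily creates exactly one limiter no
// matter how many selects reach for it, even concurrently.
type holder struct {
	once    sync.Once
	limiter *seriesLimiter
}

func (h *holder) get(maxSeries int) *seriesLimiter {
	h.once.Do(func() {
		h.limiter = &seriesLimiter{maxSeries: maxSeries}
	})
	return h.limiter
}

func main() {
	h := &holder{}
	// Two selects of the same query (think "foo + bar") each fetch 2 series.
	// Because they share one limiter, the second select trips the limit of 3;
	// with a fresh limiter per select (the old behaviour) neither would.
	for i, fetched := range []int{2, 2} {
		if err := h.get(3).addSeries(fetched); err != nil {
			fmt.Printf("select %d aborted: %v\n", i+1, err)
			return
		}
	}
	fmt.Println("query within limit")
}

The shared-counter behaviour this sketch demonstrates is exactly what the new TestLimits cases below assert against the real QueryLimiter.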

pkg/querier/querier_test.go

Lines changed: 195 additions & 0 deletions
@@ -311,6 +311,201 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
 	}
 }
 
+type tenantLimit struct {
+	MaxFetchedSeriesPerQuery int
+}
+
+func (t tenantLimit) ByUserID(userID string) *validation.Limits {
+	return &validation.Limits{
+		MaxFetchedSeriesPerQuery: t.MaxFetchedSeriesPerQuery,
+	}
+}
+
+func (t tenantLimit) AllByUserID() map[string]*validation.Limits {
+	defaults := DefaultLimitsConfig()
+	return map[string]*validation.Limits{
+		"0": &defaults,
+	}
+}
+
+// By testing limits, we are ensuring queries with multiple selects share the query limiter
+func TestLimits(t *testing.T) {
+	t.Parallel()
+	start := time.Now().Add(-2 * time.Hour)
+	end := time.Now()
+	ctx := user.InjectOrgID(context.Background(), "0")
+	var cfg Config
+	flagext.DefaultValues(&cfg)
+
+	const chunks = 1
+
+	labelsSets := []labels.Labels{
+		{
+			{Name: model.MetricNameLabel, Value: "foo"},
+			{Name: "order", Value: "1"},
+		},
+		{
+			{Name: model.MetricNameLabel, Value: "foo"},
+			{Name: "order", Value: "2"},
+		},
+		{
+			{Name: model.MetricNameLabel, Value: "foo"},
+			{Name: "orders", Value: "3"},
+		},
+		{
+			{Name: model.MetricNameLabel, Value: "bar"},
+			{Name: "orders", Value: "4"},
+		},
+		{
+			{Name: model.MetricNameLabel, Value: "bar"},
+			{Name: "orders", Value: "5"},
+		},
+	}
+
+	_, samples := mockTSDB(t, labelsSets, model.Time(start.Unix()*1000), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))
+
+	streamResponse := client.QueryStreamResponse{
+		Timeseries: []cortexpb.TimeSeries{
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "foo"},
+					{Name: "order", Value: "2"},
+				},
+				Samples: samples,
+			},
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "foo"},
+					{Name: "order", Value: "1"},
+				},
+				Samples: samples,
+			},
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "foo"},
+					{Name: "orders", Value: "3"},
+				},
+				Samples: samples,
+			},
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "bar"},
+					{Name: "orders", Value: "2"},
+				},
+				Samples: samples,
+			},
+			{
+				Labels: []cortexpb.LabelAdapter{
+					{Name: model.MetricNameLabel, Value: "bar"},
+					{Name: "orders", Value: "1"},
+				},
+				Samples: samples,
+			},
+		},
+	}
+
+	distributor := &MockLimitingDistributor{
+		response: &streamResponse,
+	}
+
+	distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
+
+	tCases := []struct {
+		name                 string
+		description          string
+		distributorQueryable QueryableWithFilter
+		storeQueriables      []QueryableWithFilter
+		tenantLimit          validation.TenantLimits
+		query                string
+		assert               func(t *testing.T, r *promql.Result)
+	}{
+		{
+
+			name:                 "should result in limit failure for multi-select and an individual select hits the series limit",
+			description:          "query results in multi-select but duplicate finger prints get deduped but still results in # of series greater than limit",
+			query:                "foo + foo",
+			distributorQueryable: distributorQueryableStreaming,
+			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
+			tenantLimit: &tenantLimit{
+				MaxFetchedSeriesPerQuery: 2,
+			},
+			assert: func(t *testing.T, r *promql.Result) {
+				require.Error(t, r.Err)
+			},
+		},
+		{
+
+			name:                 "should not result in limit failure for multi-select and the query does not hit the series limit",
+			description:          "query results in multi-select but duplicate series finger prints get deduped resulting in # of series within the limit",
+			query:                "foo + foo",
+			distributorQueryable: distributorQueryableStreaming,
+			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
+			tenantLimit: &tenantLimit{
+				MaxFetchedSeriesPerQuery: 3,
+			},
+			assert: func(t *testing.T, r *promql.Result) {
+				require.NoError(t, r.Err)
+			},
+		},
+		{
+
+			name:                 "should result in limit failure for multi-select and query hits the series limit",
+			description:          "query results in multi-select but each individual select does not hit the limit but cumulatively the query hits the limit",
+			query:                "foo + bar",
+			distributorQueryable: distributorQueryableStreaming,
+			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
+			tenantLimit: &tenantLimit{
+				MaxFetchedSeriesPerQuery: 3,
+			},
+			assert: func(t *testing.T, r *promql.Result) {
+				require.Error(t, r.Err)
+			},
+		},
+		{
+
+			name:                 "should not result in limit failure for multi-select and query does not hit the series limit",
+			description:          "query results in multi-select and the cumulative limit is >= series",
+			query:                "foo + bar",
+			distributorQueryable: distributorQueryableStreaming,
+			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(distributorQueryableStreaming)},
+			tenantLimit: &tenantLimit{
+				MaxFetchedSeriesPerQuery: 5,
+			},
+			assert: func(t *testing.T, r *promql.Result) {
+				require.NoError(t, r.Err)
+			},
+		},
+	}
+
+	for i, tc := range tCases {
+		t.Run(tc.name+fmt.Sprintf(", Test: %d", i), func(t *testing.T) {
+			wDistributorQueriable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: tc.distributorQueryable}
+			var wQueriables []QueryableWithFilter
+			for _, queriable := range tc.storeQueriables {
+				wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queriable})
+			}
+			overrides, err := validation.NewOverrides(DefaultLimitsConfig(), tc.tenantLimit)
+			require.NoError(t, err)
+
+			queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides, purger.NewNoopTombstonesLoader())
+			opts := promql.EngineOpts{
+				Logger:     log.NewNopLogger(),
+				MaxSamples: 1e6,
+				Timeout:    1 * time.Minute,
+			}
+
+			queryEngine := promql.NewEngine(opts)
+
+			query, err := queryEngine.NewRangeQuery(ctx, queryable, nil, tc.query, start, end, 1*time.Minute)
+			require.NoError(t, err)
+
+			r := query.Exec(ctx)
+
+			tc.assert(t, r)
+		})
+	}
+}
+
 func TestQuerier(t *testing.T) {
 	t.Parallel()
 	var cfg Config
pkg/querier/testutils.go

Lines changed: 42 additions & 1 deletion
@@ -2,7 +2,8 @@ package querier
 
 import (
 	"context"
-
+	"github.com/cortexproject/cortex/pkg/cortexpb"
+	"github.com/cortexproject/cortex/pkg/util/limiter"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/scrape"
@@ -60,6 +61,46 @@ func (m *MockDistributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricM
 	return args.Get(0).([]scrape.MetricMetadata), args.Error(1)
 }
 
+type MockLimitingDistributor struct {
+	MockDistributor
+	response *client.QueryStreamResponse
+}
+
+func (m *MockLimitingDistributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*client.QueryStreamResponse, error) {
+	var (
+		queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
+	)
+	s := make([][]cortexpb.LabelAdapter, 0, len(m.response.Chunkseries)+len(m.response.Timeseries))
+
+	response := &client.QueryStreamResponse{}
+	for _, series := range m.response.Chunkseries {
+		for _, label := range series.Labels {
+			for _, matcher := range matchers {
+				if matcher.Matches(label.Value) {
+					s = append(s, series.Labels)
+					response.Chunkseries = append(response.Chunkseries, series)
+				}
+			}
+		}
+	}
+
+	for _, series := range m.response.Timeseries {
+		for _, label := range series.Labels {
+			for _, matcher := range matchers {
+				if matcher.Matches(label.Value) {
+					s = append(s, series.Labels)
+					response.Timeseries = append(response.Timeseries, series)
+				}
+			}
+		}
+	}
+
+	if limitErr := queryLimiter.AddSeries(s...); limitErr != nil {
+		return nil, validation.LimitError(limitErr.Error())
+	}
+	return response, nil
+}
+
 type TestConfig struct {
 	Cfg         Config
 	Distributor Distributor
