diff --git a/CHANGELOG.md b/CHANGELOG.md index 966c5f2411f..0f25e45c25e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed +- [#8378](https://github.com/thanos-io/thanos/pull/8378): Store: fix the reuse of dirty posting slices + ### Added ### Changed diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 668cd5b9138..c097214811a 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -27,7 +27,10 @@ const ( // tenantRetentionRegex is the regex pattern for parsing tenant retention. // valid format is `:(|d)(:all)?` where > 0. // Default behavior is to delete only level 1 blocks, use :all to delete all blocks. - tenantRetentionRegex = `^([\w-]+):((\d{4}-\d{2}-\d{2})|(\d+d))(:all)?$` + // Use `*` as tenant name to apply policy to all tenants (as a default/fallback). + // Specific tenant policies take precedence over the wildcard policy. + tenantRetentionRegex = `^([\w-]+|\*):((\d{4}-\d{2}-\d{2})|(\d+d))(:all)?$` + wildCardTenant = "*" Level1 = 1 // compaction level 1 indicating a new block Level2 = 2 // compaction level 2 indicating a compacted block @@ -120,6 +123,8 @@ func ParesRetentionPolicyByTenant(logger log.Logger, retentionTenants []string) } // ApplyRetentionPolicyByTenant removes blocks depending on the specified retentionByTenant based on blocks MaxTime. +// The wildcard policy ("*") applies to all tenants as a default/fallback. +// Specific tenant policies take precedence over the wildcard policy. func ApplyRetentionPolicyByTenant( ctx context.Context, logger log.Logger, @@ -133,11 +138,20 @@ func ApplyRetentionPolicyByTenant( } level.Info(logger).Log("msg", "start tenant retention", "total", len(metas)) deleted, skipped, notExpired := 0, 0, 0 + // Check if wildcard policy exists + wildcardPolicy, hasWildcard := retentionByTenant[wildCardTenant] for id, m := range metas { - policy, ok := retentionByTenant[m.Thanos.GetTenant()] + tenant := m.Thanos.GetTenant() + // First try to find tenant-specific policy + policy, ok := retentionByTenant[tenant] if !ok { - skipped++ - continue + // Fallback to wildcard policy if tenant-specific policy not found + if hasWildcard { + policy = wildcardPolicy + } else { + skipped++ + continue + } } maxTime := time.Unix(m.MaxTime/1000, 0) // Default behavior: only delete level 1 blocks unless IsAll is true @@ -145,7 +159,7 @@ func ApplyRetentionPolicyByTenant( continue } if policy.isExpired(maxTime) { - level.Info(logger).Log("msg", "deleting blocks applying retention policy", "id", id, "maxTime", maxTime.String()) + level.Info(logger).Log("msg", "deleting blocks applying retention policy", "id", id, "tenant", tenant, "maxTime", maxTime.String()) if err := block.Delete(ctx, logger, bkt, id); err != nil { level.Error(logger).Log("msg", "failed to delete block", "id", id, "err", err) continue // continue to next block to clean up backlogs diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index 0e43da89e72..2524414894a 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -374,6 +374,47 @@ func TestParseRetentionPolicyByTenant(t *testing.T) { nil, true, }, + { + "wildcard tenant with duration", + []string{"*:30d"}, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 30 * 24 * time.Hour, + IsAll: false, + }, + }, + false, + }, + { + "wildcard tenant with cutoff date and all flag", + []string{"*:2024-01-01:all"}, + 
map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), + RetentionDuration: time.Duration(0), + IsAll: true, + }, + }, + false, + }, + { + "wildcard with specific tenant override", + []string{"*:90d", "tenant-special:30d:all"}, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 90 * 24 * time.Hour, + IsAll: false, + }, + "tenant-special": { + CutoffDate: time.Time{}, + RetentionDuration: 30 * 24 * time.Hour, + IsAll: true, + }, + }, + false, + }, } { t.Run(tt.name, func(t *testing.T) { got, err := compact.ParesRetentionPolicyByTenant(log.NewNopLogger(), tt.retentionTenants) @@ -573,6 +614,172 @@ func TestApplyRetentionPolicyByTenant(t *testing.T) { }, false, }, + { + "wildcard tenant applies to all tenants", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-a", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-b", + time.Now().Add(-2 * 24 * time.Hour), + time.Now().Add(-24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-c", + time.Now().Add(-24 * time.Hour), + time.Now().Add(-23 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW51", + "tenant-d", + time.Now().Add(-5 * time.Hour), + time.Now().Add(-4 * time.Hour), + compact.Level1, + }, + }, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * time.Hour, + IsAll: false, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW51/", + }, + false, + }, + { + "wildcard tenant with all flag applies to all levels", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-a", + time.Now().Add(-3 * 24 * time.Hour), + time.Now().Add(-2 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-b", + time.Now().Add(-2 * 24 * time.Hour), + time.Now().Add(-24 * time.Hour), + compact.Level2, + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-c", + time.Now().Add(-5 * time.Hour), + time.Now().Add(-4 * time.Hour), + compact.Level1, + }, + }, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * time.Hour, + IsAll: true, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW50/", + }, + false, + }, + { + "wildcard with specific tenant override - wildcard longer retention, specific shorter", + []testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-a", + time.Now().Add(-50 * 24 * time.Hour), + time.Now().Add(-49 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-cleanup", + time.Now().Add(-15 * 24 * time.Hour), + time.Now().Add(-14 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW50", + "tenant-b", + time.Now().Add(-20 * 24 * time.Hour), + time.Now().Add(-19 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW51", + "tenant-cleanup", + time.Now().Add(-5 * time.Hour), + time.Now().Add(-4 * time.Hour), + compact.Level1, + }, + }, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 30 * 24 * time.Hour, // 30 days for most tenants + IsAll: false, + }, + "tenant-cleanup": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * 24 * time.Hour, // 10 days for cleanup tenant + IsAll: false, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW50/", + "01CPHBEX20729MJQZXE3W0BW51/", + }, + false, + }, + { + "wildcard precedence - specific policy takes priority over wildcard", + 
[]testBlock{ + { + "01CPHBEX20729MJQZXE3W0BW48", + "tenant-override", + time.Now().Add(-15 * 24 * time.Hour), + time.Now().Add(-14 * 24 * time.Hour), + compact.Level1, + }, + { + "01CPHBEX20729MJQZXE3W0BW49", + "tenant-normal", + time.Now().Add(-15 * 24 * time.Hour), + time.Now().Add(-14 * 24 * time.Hour), + compact.Level1, + }, + }, + map[string]compact.RetentionPolicy{ + "*": { + CutoffDate: time.Time{}, + RetentionDuration: 10 * 24 * time.Hour, // 10 days wildcard + IsAll: false, + }, + "tenant-override": { + CutoffDate: time.Time{}, + RetentionDuration: 20 * 24 * time.Hour, // 20 days specific override + IsAll: false, + }, + }, + []string{ + "01CPHBEX20729MJQZXE3W0BW48/", // kept due to 20-day specific policy + }, + false, + }, } { t.Run(tt.name, func(t *testing.T) { bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) diff --git a/pkg/queryfrontend/query_logger.go b/pkg/queryfrontend/query_logger.go index c97c9c3b990..3629d23fd53 100644 --- a/pkg/queryfrontend/query_logger.go +++ b/pkg/queryfrontend/query_logger.go @@ -36,6 +36,8 @@ type UserInfo struct { Tenant string ForwardedFor string UserAgent string + Groups string + Email string } // ResponseStats holds statistics extracted from query response. @@ -91,6 +93,10 @@ func ExtractUserInfoFromHeaders(headers []*RequestHeader) UserInfo { if userInfo.Source == "" { userInfo.Source = headerValue } + case "x-auth-request-groups": + userInfo.Groups = headerValue + case "x-auth-request-email": + userInfo.Email = headerValue } } @@ -102,29 +108,6 @@ func ExtractUserInfoFromHeaders(headers []*RequestHeader) UserInfo { return userInfo } -// ExtractEmailFromResponse extracts the email from response headers (works for both range and instant queries). -func ExtractEmailFromResponse(resp queryrange.Response) string { - if resp == nil { - return "" - } - - // Check both response types using OR condition - var headers []*queryrange.PrometheusResponseHeader - if promResp, ok := resp.(*queryrange.PrometheusResponse); ok { - headers = promResp.GetHeaders() - } else if promResp, ok := resp.(*queryrange.PrometheusInstantQueryResponse); ok { - headers = promResp.GetHeaders() - } - - for _, header := range headers { - if strings.ToLower(header.Name) == "x-auth-request-email" && len(header.Values) > 0 { - return header.Values[0] - } - } - - return "" -} - // ConvertStoreMatchers converts internal store matchers to logging format. func ConvertStoreMatchers(storeMatchers [][]*labels.Matcher) []StoreMatcherSet { if len(storeMatchers) == 0 { @@ -174,6 +157,59 @@ func GetResponseStats(resp queryrange.Response) ResponseStats { return stats } +// ExtractMetricNames extracts all unique __name__ labels from query response (works for both range and instant queries). 
+func ExtractMetricNames(resp queryrange.Response) []string { + if resp == nil { + return nil + } + + metricNamesMap := make(map[string]struct{}) + + // Handle range query response (resultType: matrix) + if r, ok := resp.(*queryrange.PrometheusResponse); ok { + for _, stream := range r.Data.Result { + for _, label := range stream.Labels { + if label.Name == "__name__" { + metricNamesMap[label.Value] = struct{}{} + break + } + } + } + } else if r, ok := resp.(*queryrange.PrometheusInstantQueryResponse); ok { + // Handle instant query response - check all result types + if vector := r.Data.Result.GetVector(); vector != nil { + // resultType: vector + for _, sample := range vector.Samples { + for _, label := range sample.Labels { + if label.Name == "__name__" { + metricNamesMap[label.Value] = struct{}{} + break + } + } + } + } else if matrix := r.Data.Result.GetMatrix(); matrix != nil { + // resultType: matrix (subqueries in instant queries) + for _, stream := range matrix.SampleStreams { + for _, label := range stream.Labels { + if label.Name == "__name__" { + metricNamesMap[label.Value] = struct{}{} + break + } + } + } + } + // Scalar and StringSample don't have __name__ labels + } + + // Convert map to slice + metricNames := make([]string, 0, len(metricNamesMap)) + for name := range metricNamesMap { + metricNames = append(metricNames, name) + } + + return metricNames +} + // WriteJSONLogToFile writes query logs to file in JSON format. func WriteJSONLogToFile(logger log.Logger, writer interface{}, queryLog interface{}, queryType string) error { if writer == nil { diff --git a/pkg/queryfrontend/queryinstant_logger.go b/pkg/queryfrontend/queryinstant_logger.go index a317e9ae886..50a875816e0 100644 --- a/pkg/queryfrontend/queryinstant_logger.go +++ b/pkg/queryfrontend/queryinstant_logger.go @@ -37,6 +37,7 @@ type MetricsInstantQueryLogging struct { ForwardedFor string `json:"forwarded_for"` UserAgent string `json:"user_agent"` EmailId string `json:"email_id"` + Groups string `json:"groups"` // Query-related fields (instant query specific) QueryTimestampMs int64 `json:"query_timestamp_ms"` // Query timestamp for instant queries Path string `json:"path"` @@ -50,6 +51,8 @@ type MetricsInstantQueryLogging struct { Analyze bool `json:"analyze"` // Whether query analysis is enabled Engine string `json:"engine"` // Query engine being used Stats string `json:"stats"` // Query statistics information + MetricNames []string `json:"metric_names"` // Unique metric names (__name__ labels) in response + Shard string `json:"shard"` // Pantheon shard name // Store-matcher details StoreMatchers []StoreMatcherSet `json:"store_matchers"` } @@ -134,18 +137,12 @@ func (m *instantQueryLoggingMiddleware) logInstantQuery(req *ThanosQueryInstantR success := err == nil userInfo := ExtractUserInfoFromHeaders(req.Headers) - // Extract email from response headers - email := ExtractEmailFromResponse(resp) - - // This is to avoid logging queries that come from rule manager. - if userInfo.UserAgent == "Databricks-RuleManager/1.0" { - return - } - // Calculate stats (only for successful queries). var stats ResponseStats + var metricNames []string if success && resp != nil { stats = GetResponseStats(resp) + metricNames = ExtractMetricNames(resp) } // Create the instant query log entry. 
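
The new ExtractMetricNames helper in pkg/queryfrontend/query_logger.go walks every result type and dedupes `__name__` values through a map before flattening to a slice, so repeated series of the same metric are logged once. A minimal, self-contained sketch of that dedup pattern, using a stand-in label type rather than the actual queryrange response types:

```go
package main

import "fmt"

// label is a stand-in for the queryrange label type used in the diff.
type label struct {
	Name, Value string
}

// uniqueMetricNames collects every distinct __name__ value across label sets.
func uniqueMetricNames(labelSets [][]label) []string {
	seen := make(map[string]struct{})
	for _, ls := range labelSets {
		for _, l := range ls {
			if l.Name == "__name__" {
				seen[l.Value] = struct{}{}
				break // a series carries at most one __name__ label
			}
		}
	}
	names := make([]string, 0, len(seen))
	for n := range seen {
		names = append(names, n)
	}
	return names
}

func main() {
	sets := [][]label{
		{{Name: "__name__", Value: "up"}, {Name: "job", Value: "node"}},
		{{Name: "__name__", Value: "up"}, {Name: "job", Value: "api"}},
		{{Name: "__name__", Value: "http_requests_total"}},
	}
	fmt.Println(uniqueMetricNames(sets)) // order is not deterministic
}
```

Because the names come out of a map, their order is not deterministic; sorting the slice before logging would make the metric_names / metricNames fields stable across identical queries.
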
@@ -166,7 +163,8 @@ func (m *instantQueryLoggingMiddleware) logInstantQuery(req *ThanosQueryInstantR Tenant: userInfo.Tenant, ForwardedFor: userInfo.ForwardedFor, UserAgent: userInfo.UserAgent, - EmailId: email, + EmailId: userInfo.Email, + Groups: userInfo.Groups, // Query-related fields (instant query specific) QueryTimestampMs: req.Time, Path: req.Path, @@ -180,6 +178,8 @@ func (m *instantQueryLoggingMiddleware) logInstantQuery(req *ThanosQueryInstantR Analyze: req.Analyze, Engine: req.Engine, Stats: req.Stats, + MetricNames: metricNames, + Shard: os.Getenv("PANTHEON_SHARDNAME"), // Store-matcher details StoreMatchers: ConvertStoreMatchers(req.StoreMatchers), } diff --git a/pkg/queryfrontend/queryrange_logger.go b/pkg/queryfrontend/queryrange_logger.go index c6a555f45d3..665a857e127 100644 --- a/pkg/queryfrontend/queryrange_logger.go +++ b/pkg/queryfrontend/queryrange_logger.go @@ -37,6 +37,7 @@ type MetricsRangeQueryLogging struct { ForwardedFor string `json:"forwardedFor"` UserAgent string `json:"userAgent"` EmailId string `json:"emailId"` + Groups string `json:"groups"` // Query-related fields StartTimestampMs int64 `json:"startTimestampMs"` EndTimestampMs int64 `json:"endTimestampMs"` @@ -53,6 +54,8 @@ type MetricsRangeQueryLogging struct { Engine string `json:"engine"` // Query engine being used SplitIntervalMs int64 `json:"splitIntervalMs"` // Query splitting interval in milliseconds Stats string `json:"stats"` // Query statistics information + MetricNames []string `json:"metricNames"` // Unique metric names (__name__ labels) in response + Shard string `json:"shard"` // Pantheon shard name // Store-matcher details StoreMatchers []StoreMatcherSet `json:"storeMatchers"` } @@ -137,13 +140,12 @@ func (m *rangeQueryLoggingMiddleware) logRangeQuery(req *ThanosQueryRangeRequest success := err == nil userInfo := ExtractUserInfoFromHeaders(req.Headers) - // Extract email from response headers - email := ExtractEmailFromResponse(resp) - // Calculate stats (only for successful queries). var stats ResponseStats + var metricNames []string if success && resp != nil { stats = GetResponseStats(resp) + metricNames = ExtractMetricNames(resp) } // Create the range query log entry. 
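
Both loggers now read the shard with os.Getenv("PANTHEON_SHARDNAME") while building each log entry, and these hunks add os.Getenv calls without a visible `os` import, so it is worth confirming the package already imports os. Since the shard name cannot change while the process runs, a common variant (a sketch only, not part of this diff) resolves it once at startup:

```go
package main

import (
	"fmt"
	"os"
)

// shardName is resolved once at process start; PANTHEON_SHARDNAME is assumed
// to be static for the lifetime of the query-frontend.
var shardName = os.Getenv("PANTHEON_SHARDNAME")

func main() {
	// The log constructors would then use shardName directly instead of
	// calling os.Getenv for every logged query.
	fmt.Printf("shard=%q\n", shardName)
}
```
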
@@ -164,7 +166,8 @@ func (m *rangeQueryLoggingMiddleware) logRangeQuery(req *ThanosQueryRangeRequest Tenant: userInfo.Tenant, ForwardedFor: userInfo.ForwardedFor, UserAgent: userInfo.UserAgent, - EmailId: email, + EmailId: userInfo.Email, + Groups: userInfo.Groups, // Query-related fields StartTimestampMs: req.Start, EndTimestampMs: req.End, @@ -181,6 +184,8 @@ func (m *rangeQueryLoggingMiddleware) logRangeQuery(req *ThanosQueryRangeRequest Engine: req.Engine, SplitIntervalMs: req.SplitInterval.Milliseconds(), Stats: req.Stats, + MetricNames: metricNames, + Shard: os.Getenv("PANTHEON_SHARDNAME"), // Store-matcher details StoreMatchers: ConvertStoreMatchers(req.StoreMatchers), } diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 7417b496420..d85b202705f 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -34,6 +34,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/util/zeropool" "github.com/weaveworks/common/httpgrpc" "golang.org/x/exp/slices" "golang.org/x/sync/errgroup" @@ -125,8 +126,22 @@ const ( var ( errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.") hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }} + postingsPool zeropool.Pool[[]storage.SeriesRef] ) +func getPostingsSlice() []storage.SeriesRef { + if p := postingsPool.Get(); p != nil { + return p + } + + // Pre-allocate slice with initial capacity. + return make([]storage.SeriesRef, 0, 1024) +} + +func putPostingsSlice(p []storage.SeriesRef) { + postingsPool.Put(p[:0]) +} + type bucketStoreMetrics struct { blocksLoaded prometheus.Gauge blockLoads prometheus.Counter @@ -1693,6 +1708,9 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store err = g.Wait() }) if err != nil { + for _, resp := range respSets { + resp.Close() + } code := codes.Aborted if s, ok := status.FromError(errors.Cause(err)); ok { code = s.Code() @@ -2539,6 +2557,10 @@ type bucketIndexReader struct { indexVersion int logger log.Logger + + // Posting slice to return to the postings pool on close. + // A single bucketIndexReader should have at most 1 postings slice to return. + postings []storage.SeriesRef } func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader { @@ -2659,13 +2681,13 @@ func (r *bucketIndexReader) ExpandedPostings(ctx context.Context, ms sortedMatch // ExpandPostingsWithContext returns the postings expanded as a slice and considers context. func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage.SeriesRef, error) { - res := make([]storage.SeriesRef, 0, 1024) // Pre-allocate slice with initial capacity + res := getPostingsSlice() i := 0 for p.Next() { i++ if i%checkContextEveryNIterations == 0 { if err := ctx.Err(); err != nil { - return nil, err + return res, err } } res = append(res, p.At()) @@ -2958,6 +2980,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, } ps, err := ExpandPostingsWithContext(ctx, p) + r.postings = ps if err != nil { level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) return false, nil, nil @@ -3394,6 +3417,10 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym // Close released the underlying resources of the reader. 
func (r *bucketIndexReader) Close() error { r.block.pendingReaders.Done() + + if r.postings != nil { + putPostingsSlice(r.postings) + } return nil } diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go index bee87898c6c..12a970437d9 100644 --- a/pkg/store/bucket_e2e_test.go +++ b/pkg/store/bucket_e2e_test.go @@ -131,7 +131,7 @@ func prepareTestBlocks(t testing.TB, now time.Time, count int, dir string, bkt o return } -func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig) *storeSuite { +func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, manyParts bool, chunksLimiterFactory ChunksLimiterFactory, seriesLimiterFactory SeriesLimiterFactory, bytesLimiterFactory BytesLimiterFactory, relabelConfig []*relabel.Config, filterConf *FilterConfig, opts ...BucketStoreOption) *storeSuite { series := []labels.Labels{ labels.FromStrings("a", "1", "b", "1"), labels.FromStrings("a", "1", "b", "2"), @@ -176,10 +176,10 @@ func prepareStoreWithTestBlocks(t testing.TB, dir string, bkt objstore.Bucket, m true, true, time.Minute, - WithLogger(s.logger), - WithIndexCache(s.cache), - WithFilterConfig(filterConf), - WithRegistry(reg), + append(opts, WithLogger(s.logger), + WithIndexCache(s.cache), + WithFilterConfig(filterConf), + WithRegistry(reg))..., ) testutil.Ok(t, err) defer func() { testutil.Ok(t, store.Close()) }() @@ -619,6 +619,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { maxChunksLimit uint64 maxSeriesLimit uint64 maxBytesLimit int64 + storeOpts []BucketStoreOption expectedErr string code codes.Code }{ @@ -630,12 +631,25 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { expectedErr: "exceeded chunks limit", code: codes.ResourceExhausted, }, + "should fail if the max chunks limit is exceeded - ResourceExhausted (sortingStrategyNone)": { + maxChunksLimit: expectedChunks - 1, + expectedErr: "exceeded chunks limit", + storeOpts: []BucketStoreOption{WithDontResort(true)}, + code: codes.ResourceExhausted, + }, "should fail if the max series limit is exceeded - ResourceExhausted": { maxChunksLimit: expectedChunks, expectedErr: "exceeded series limit", maxSeriesLimit: 1, code: codes.ResourceExhausted, }, + "should fail if the max series limit is exceeded - ResourceExhausted (sortingStrategyNone)": { + maxChunksLimit: expectedChunks, + expectedErr: "exceeded series limit", + maxSeriesLimit: 1, + storeOpts: []BucketStoreOption{WithDontResort(true)}, + code: codes.ResourceExhausted, + }, "should fail if the max bytes limit is exceeded - ResourceExhausted": { maxChunksLimit: expectedChunks, expectedErr: "exceeded bytes limit", @@ -643,6 +657,14 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { maxBytesLimit: 1, code: codes.ResourceExhausted, }, + "should fail if the max bytes limit is exceeded - ResourceExhausted (sortingStrategyNone)": { + maxChunksLimit: expectedChunks, + expectedErr: "exceeded bytes limit", + maxSeriesLimit: 2, + maxBytesLimit: 1, + storeOpts: []BucketStoreOption{WithDontResort(true)}, + code: codes.ResourceExhausted, + }, } for testName, testData := range cases { @@ -653,7 +675,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) { dir := t.TempDir() - s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), 
NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf) + s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf, testData.storeOpts...) testutil.Ok(t, s.store.SyncBlocks(ctx)) req := &storepb.SeriesRequest{ diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index 8027c093de7..dfa9932b93c 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -2971,7 +2971,7 @@ func TestExpandPostingsWithContextCancel(t *testing.T) { res, err := ExpandPostingsWithContext(ctx, p) testutil.NotOk(t, err) testutil.Equals(t, context.Canceled, err) - testutil.Equals(t, []storage.SeriesRef(nil), res) + testutil.Equals(t, true, cap(res) == 1024) } func samePostingGroup(a, b *postingGroup) bool { diff --git a/pkg/store/lazy_postings.go b/pkg/store/lazy_postings.go index f8363ab477e..ba8e8fec65e 100644 --- a/pkg/store/lazy_postings.go +++ b/pkg/store/lazy_postings.go @@ -279,6 +279,19 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post return nil, nil, errors.Wrap(err, "get postings") } + result := mergeFetchedPostings(ctx, fetchedPostings, postingGroups) + if err := ctx.Err(); err != nil { + return nil, nil, err + } + ps, err := ExpandPostingsWithContext(ctx, result) + r.postings = ps + if err != nil { + return nil, nil, errors.Wrap(err, "expand") + } + return ps, lazyMatchers, nil +} + +func mergeFetchedPostings(ctx context.Context, fetchedPostings []index.Postings, postingGroups []*postingGroup) index.Postings { // Get "add" and "remove" postings from groups. We iterate over postingGroups and their keys // again, and this is exactly the same order as before (when building the groups), so we can simply // use one incrementing index to fetch postings from returned slice. 
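
The postings-pool change above (pkg/store/bucket.go and pkg/store/lazy_postings.go) is about ownership: ExpandPostingsWithContext now hands out a pooled slice, the bucketIndexReader records it in r.postings, and only Close() puts it back, so a slice can never be recycled while a caller is still reading it, which is the dirty posting slice reuse the CHANGELOG entry refers to. A minimal sketch of the same lifecycle built on the standard library's sync.Pool (the diff uses prometheus/util/zeropool, which avoids the interface boxing that sync.Pool forces on slice values):

```go
package main

import (
	"fmt"
	"sync"
)

var refPool = sync.Pool{
	New: func() interface{} { return make([]uint64, 0, 1024) },
}

// reader owns at most one pooled slice and gives it back on Close.
type reader struct {
	refs []uint64
}

func (r *reader) expand(n int) []uint64 {
	r.refs = refPool.Get().([]uint64)
	for i := 0; i < n; i++ {
		r.refs = append(r.refs, uint64(i))
	}
	return r.refs
}

// Close returns the slice to the pool; callers must not use the result of
// expand after Close, which is what ties recycling to the reader lifecycle.
func (r *reader) Close() {
	if r.refs != nil {
		refPool.Put(r.refs[:0])
		r.refs = nil
	}
}

func main() {
	r := &reader{}
	ps := r.expand(3)
	fmt.Println(ps) // [0 1 2]
	r.Close()       // only now is the backing array eligible for reuse
}
```

Returning res instead of nil on context cancellation in ExpandPostingsWithContext follows from the same ownership rule: the partially filled slice still has to reach r.postings so it can be pooled again, which is also why the test now asserts cap(res) == 1024 rather than a nil result.
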
@@ -306,14 +319,5 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post } } - result := index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...)) - - if err := ctx.Err(); err != nil { - return nil, nil, err - } - ps, err := ExpandPostingsWithContext(ctx, result) - if err != nil { - return nil, nil, errors.Wrap(err, "expand") - } - return ps, lazyMatchers, nil + return index.Without(index.Intersect(groupAdds...), index.Merge(ctx, groupRemovals...)) } diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index d128487d0c5..25dc335951a 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -19,6 +19,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" + grpc_opentracing "github.com/thanos-io/thanos/pkg/tracing/tracing_middleware" "github.com/thanos-io/thanos/pkg/losertree" @@ -277,6 +278,8 @@ type lazyRespSet struct { initialized bool shardMatcher *storepb.ShardMatcher + + donec chan struct{} } func (l *lazyRespSet) isEmpty() bool { @@ -385,6 +388,7 @@ func newLazyRespSet( ringHead: 0, ringTail: 0, closed: false, + donec: make(chan struct{}), } respSet.storeLabels = make(map[string]struct{}) for _, ls := range storeLabelSets { @@ -403,6 +407,8 @@ func newLazyRespSet( l.span.SetTag("processed.samples", seriesStats.Samples) l.span.SetTag("processed.bytes", bytesProcessed) l.span.Finish() + + close(l.donec) }() numResponses := 0 @@ -611,13 +617,14 @@ func newAsyncRespSet( func (l *lazyRespSet) Close() { l.bufferedResponsesMtx.Lock() - defer l.bufferedResponsesMtx.Unlock() - l.closeSeries() l.closed = true l.bufferSlotEvent.Signal() l.noMoreData = true l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + + <-l.donec // Wait for the internal goroutine to complete its work. l.shardMatcher.Close() _ = l.cl.CloseSend() @@ -806,11 +813,15 @@ func sortWithoutLabels(set []*storepb.SeriesResponse, labelsToRemove map[string] } func (l *eagerRespSet) Close() { + l.wg.Wait() + if l.closeSeries != nil { l.closeSeries() } + l.wg.Wait() l.shardMatcher.Close() _ = l.cl.CloseSend() + } func (l *eagerRespSet) At() *storepb.SeriesResponse { diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 7617f48b296..9d4cae6c1e9 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -2055,10 +2055,6 @@ func TestProxyStore_SeriesSlowStores(t *testing.T) { return } } - - // Wait until the last goroutine exits which is stuck on time.Sleep(). - // Otherwise, goleak complains. - time.Sleep(5 * time.Second) } func TestProxyStore_Series_RequestParamsProxied(t *testing.T) {
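
In pkg/store/proxy_merge.go, lazyRespSet gains a donec channel that its receive goroutine closes in the same deferred block that finishes the span; Close() now signals shutdown, releases bufferedResponsesMtx, and then blocks on <-l.donec until that goroutine has actually exited. Moving the Unlock ahead of the wait matters, because the goroutine takes the same mutex, so waiting while still holding it could deadlock. This is also why the 5-second time.Sleep goleak workaround in TestProxyStore_SeriesSlowStores can go, and it lines up with the new cleanup loop in BucketStore.Series that closes every respSet when g.Wait() fails. A minimal sketch of the signal-then-wait handshake, with invented names and a polling goroutine standing in for the condition-variable loop in the real code:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// respSet mimics the shutdown handshake: Close signals, then waits on donec.
type respSet struct {
	mtx    sync.Mutex
	closed bool
	donec  chan struct{}
}

func newRespSet() *respSet {
	r := &respSet{donec: make(chan struct{})}
	go func() {
		defer close(r.donec) // always closed, even on early exit
		for {
			r.mtx.Lock()
			stop := r.closed
			r.mtx.Unlock()
			if stop {
				return
			}
			time.Sleep(10 * time.Millisecond) // stand-in for receiving a response
		}
	}()
	return r
}

func (r *respSet) Close() {
	r.mtx.Lock()
	r.closed = true
	r.mtx.Unlock()
	<-r.donec // wait for the goroutine to observe the signal and exit
	fmt.Println("closed cleanly, no goroutine left behind")
}

func main() {
	r := newRespSet()
	time.Sleep(30 * time.Millisecond)
	r.Close()
}
```
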
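Back in pkg/compact/retention.go at the top of the diff, the regex alternation ([\w-]+|\*) is what lets a literal * parse as a tenant, and ApplyRetentionPolicyByTenant then resolves policies in two steps: exact tenant first, wildcard second, skip otherwise. The diff hoists the wildcard lookup out of the block loop (wildcardPolicy, hasWildcard), saving one map lookup per block; the standalone sketch below keeps it inline and reduces RetentionPolicy to a bare duration just to show the precedence:

```go
package main

import (
	"fmt"
	"time"
)

const wildcardTenant = "*"

// resolvePolicy prefers a tenant-specific policy and falls back to "*".
func resolvePolicy(policies map[string]time.Duration, tenant string) (time.Duration, bool) {
	if p, ok := policies[tenant]; ok {
		return p, true
	}
	if p, ok := policies[wildcardTenant]; ok {
		return p, true
	}
	return 0, false // no policy: the block is skipped, matching the skipped++ branch
}

func main() {
	policies := map[string]time.Duration{
		"*":              90 * 24 * time.Hour,
		"tenant-cleanup": 10 * 24 * time.Hour,
	}
	for _, tenant := range []string{"tenant-cleanup", "tenant-a"} {
		if d, ok := resolvePolicy(policies, tenant); ok {
			fmt.Printf("%s -> %v\n", tenant, d)
		}
	}
}
```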