diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 9676a4a95ee..0f7b1cdd6a7 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -503,7 +503,7 @@ func runCompactForTenant(
 	progress.Set(compact.CleanBlocks)
 	compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures)
-	if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
+	if _, err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
 		return errors.Wrap(err, "cleaning marked blocks")
 	}
 	compactMetrics.cleanups.Inc()
diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go
index 1f2ec22f262..c5858f010e9 100644
--- a/cmd/thanos/tools_bucket.go
+++ b/cmd/thanos/tools_bucket.go
@@ -948,7 +948,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
 			}
 			compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, stubCounter, stubCounter, stubCounter)
-			if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
+			if _, err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil {
 				return errors.Wrap(err, "error cleaning blocks")
 			}
diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go
index 0c08a0ebca9..618c3b2f70a 100644
--- a/pkg/api/query/v1_test.go
+++ b/pkg/api/query/v1_test.go
@@ -30,6 +30,8 @@ import (
 	"testing"
 	"time"
 
+	"go.uber.org/atomic"
+
 	"github.com/efficientgo/core/testutil"
 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
@@ -785,7 +787,7 @@ func TestQueryAnalyzeEndpoints(t *testing.T) {
 func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
 	c := &storetestutil.TestClient{
 		Name:        "1",
-		StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil)),
+		StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil), &atomic.Bool{}),
 		MinTime:     math.MinInt64,
 		MaxTime:     math.MaxInt64,
 	}
diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go
index c0122c80f75..ea645abaaf0 100644
--- a/pkg/block/fetcher.go
+++ b/pkg/block/fetcher.go
@@ -857,18 +857,34 @@ func NewDeduplicateFilter(concurrency int) *DefaultDeduplicateFilter {
 // Filter filters out duplicate blocks that can be formed
 // from two or more overlapping blocks that fully submatches the source blocks of the older blocks.
 func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec, modified GaugeVec) error {
-	f.duplicateIDs = f.duplicateIDs[:0]
-
-	var wg sync.WaitGroup
+	var filterWg, dupWg sync.WaitGroup
 	var groupChan = make(chan []*metadata.Meta)
+	var dupsChan = make(chan ulid.ULID)
+
+	// Collect duplicate IDs from the workers on a single goroutine and prune them from metas.
+	dupWg.Add(1)
+	go func() {
+		defer dupWg.Done()
+		dups := make([]ulid.ULID, 0)
+		for dup := range dupsChan {
+			if metas[dup] != nil {
+				dups = append(dups, dup)
+			}
+			synced.WithLabelValues(duplicateMeta).Inc()
+			delete(metas, dup)
+		}
+		f.mu.Lock()
+		f.duplicateIDs = dups
+		f.mu.Unlock()
+	}()
+
+	// Start workers that deduplicate groups of blocks as they become ready.
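+	// Each worker sends duplicate block IDs to dupsChan; the collector goroutine above is
+	// the only writer to metas and duplicateIDs, so the workers themselves need no locking.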
for i := 0; i < f.concurrency; i++ { - wg.Add(1) + filterWg.Add(1) go func() { - defer wg.Done() + defer filterWg.Done() for group := range groupChan { - f.filterGroup(group, metas, synced) + f.filterGroup(group, dupsChan) } }() } @@ -883,12 +899,15 @@ func (f *DefaultDeduplicateFilter) Filter(_ context.Context, metas map[ulid.ULID groupChan <- group } close(groupChan) - wg.Wait() + filterWg.Wait() + + close(dupsChan) + dupWg.Wait() return nil } -func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, metas map[ulid.ULID]*metadata.Meta, synced GaugeVec) { +func (f *DefaultDeduplicateFilter) filterGroup(metaSlice []*metadata.Meta, dupsChan chan ulid.ULID) { sort.Slice(metaSlice, func(i, j int) bool { ilen := len(metaSlice[i].Compaction.Sources) jlen := len(metaSlice[j].Compaction.Sources) @@ -919,19 +938,16 @@ childLoop: coveringSet = append(coveringSet, child) } - f.mu.Lock() for _, duplicate := range duplicates { - if metas[duplicate] != nil { - f.duplicateIDs = append(f.duplicateIDs, duplicate) - } - synced.WithLabelValues(duplicateMeta).Inc() - delete(metas, duplicate) + dupsChan <- duplicate } - f.mu.Unlock() } // DuplicateIDs returns slice of block ids that are filtered out by DefaultDeduplicateFilter. func (f *DefaultDeduplicateFilter) DuplicateIDs() []ulid.ULID { + f.mu.Lock() + defer f.mu.Unlock() + return f.duplicateIDs } diff --git a/pkg/block/indexheader/reader_pool.go b/pkg/block/indexheader/reader_pool.go index ff3975663c4..75d14304ee4 100644 --- a/pkg/block/indexheader/reader_pool.go +++ b/pkg/block/indexheader/reader_pool.go @@ -7,10 +7,14 @@ import ( "context" "sync" "time" + "unsafe" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/oklog/ulid" + + xsync "golang.org/x/sync/singleflight" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" @@ -48,6 +52,7 @@ type ReaderPool struct { // Keep track of all readers managed by the pool. lazyReadersMx sync.Mutex lazyReaders map[*LazyBinaryReader]struct{} + lazyReadersSF xsync.Group lazyDownloadFunc LazyDownloadIndexHeaderFunc } @@ -122,19 +127,22 @@ func NewReaderPool(logger log.Logger, lazyReaderEnabled bool, lazyReaderIdleTime // with lazy reader enabled, this function will return a lazy reader. The returned lazy reader // is tracked by the pool and automatically closed once the idle timeout expires. 
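+// Concurrent calls for the same block are collapsed into a single reader creation, which is then shared.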
func (p *ReaderPool) NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.BucketReader, dir string, id ulid.ULID, postingOffsetsInMemSampling int, meta *metadata.Meta) (Reader, error) {
-	var reader Reader
-	var err error
-
-	if p.lazyReaderEnabled {
-		reader, err = NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta))
-	} else {
-		reader, err = NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
+	if !p.lazyReaderEnabled {
+		return NewBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.binaryReader)
 	}
 
+	// Deduplicate concurrent creations for the same block: the raw ULID bytes serve as the
+	// singleflight key via a zero-copy []byte-to-string conversion.
+	idBytes := id[:]
+	lazyReader, err, _ := p.lazyReadersSF.Do(*(*string)(unsafe.Pointer(&idBytes)), func() (interface{}, error) {
+		return NewLazyBinaryReader(ctx, logger, bkt, dir, id, postingOffsetsInMemSampling, p.metrics.lazyReader, p.metrics.binaryReader, p.onLazyReaderClosed, p.lazyDownloadFunc(meta))
+	})
+
+	if err != nil {
+		level.Error(p.logger).Log("msg", "failed to create lazy index-header reader", "block", id.String(), "err", err)
 		return nil, err
 	}
 
+	reader := lazyReader.(Reader)
+
 	// Keep track of lazy readers only if required.
 	if p.lazyReaderEnabled && p.lazyReaderIdleTimeout > 0 {
 		p.lazyReadersMx.Lock()
diff --git a/pkg/block/indexheader/reader_pool_test.go b/pkg/block/indexheader/reader_pool_test.go
index 331d2187241..8f31b327cea 100644
--- a/pkg/block/indexheader/reader_pool_test.go
+++ b/pkg/block/indexheader/reader_pool_test.go
@@ -5,13 +5,18 @@ package indexheader
 
 import (
 	"context"
+	"path"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/prometheus/client_golang/prometheus"
 	promtestutil "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/model/labels"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/filesystem"
 
 	"github.com/efficientgo/core/testutil"
@@ -132,3 +137,114 @@ func TestReaderPool_ShouldCloseIdleLazyReaders(t *testing.T) {
 	testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.loadCount))
 	testutil.Equals(t, float64(2), promtestutil.ToFloat64(metrics.lazyReader.unloadCount))
 }
+
+func TestReaderPool_MultipleReaders(t *testing.T) {
+	ctx := context.Background()
+
+	blkDir := t.TempDir()
+
+	bkt := objstore.NewInMemBucket()
+	b1, err := e2eutil.CreateBlock(ctx, blkDir, []labels.Labels{
+		labels.New(labels.Label{Name: "a", Value: "1"}),
+		labels.New(labels.Label{Name: "a", Value: "2"}),
+		labels.New(labels.Label{Name: "a", Value: "3"}),
+		labels.New(labels.Label{Name: "a", Value: "4"}),
+		labels.New(labels.Label{Name: "b", Value: "1"}),
+	}, 100, 0, 1000, labels.New(labels.Label{Name: "ext1", Value: "val1"}), 124, metadata.NoneFunc)
+	testutil.Ok(t, err)
+
+	require.NoError(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(blkDir, b1.String()), metadata.NoneFunc))
+
+	readerPool := NewReaderPool(
+		log.NewNopLogger(),
+		true,
+		time.Minute,
+		NewReaderPoolMetrics(prometheus.NewRegistry()),
+		AlwaysEagerDownloadIndexHeader,
+	)
+
+	dlDir := t.TempDir()
+
+	m, err := metadata.ReadFromDir(filepath.Join(blkDir, b1.String()))
+	testutil.Ok(t, err)
+
+	startWg := &sync.WaitGroup{}
+	startWg.Add(1)
+
+	waitWg := &sync.WaitGroup{}
+
+	const readersCount = 10
+	waitWg.Add(readersCount)
+	for range readersCount {
+		go func() {
+			defer waitWg.Done()
+			t.Logf("waiting")
+			startWg.Wait()
+			t.Logf("starting")
+
+			br, err := readerPool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, dlDir, b1, 32, m)
+			testutil.Ok(t, err)
+
+			t.Cleanup(func() {
+				testutil.Ok(t, br.Close())
+			})
+		}()
+	}
+
+	startWg.Done()
+	waitWg.Wait()
+}
+
+func TestReaderPool_NewBinaryReader_ErrDoesNotInsertNilReader(t *testing.T) {
+	ctx := context.Background()
+	tmpDir := t.TempDir()
+
+	bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
+	testutil.Ok(t, err)
+	t.Cleanup(func() { testutil.Ok(t, bkt.Close()) })
+
+	// Create and upload a valid block.
+	blockID, err := e2eutil.CreateBlock(
+		ctx, tmpDir,
+		[]labels.Labels{
+			labels.FromStrings("a", "1"),
+			labels.FromStrings("a", "2"),
+		},
+		100, 0, 1000,
+		labels.FromStrings("ext1", "1"),
+		124, metadata.NoneFunc,
+	)
+	testutil.Ok(t, err)
+	testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, blockID.String()), metadata.NoneFunc))
+
+	// Now remove the index object from the bucket to force the "missing index" path.
+	// This simulates a partial/invalid upload so that NewLazyBinaryReader fails.
+	idxPath := path.Join(blockID.String(), block.IndexFilename)
+	testutil.Ok(t, bkt.Delete(ctx, idxPath))
+
+	meta, err := metadata.ReadFromDir(filepath.Join(tmpDir, blockID.String()))
+	testutil.Ok(t, err)
+
+	// Enable lazy readers and tracking so the pool would normally insert into lazyReaders.
+	metrics := NewReaderPoolMetrics(nil)
+	pool := NewReaderPool(log.NewNopLogger(), true, time.Minute, metrics, AlwaysEagerDownloadIndexHeader)
+	t.Cleanup(pool.Close)
+
+	// Call NewBinaryReader: it should return an error and NOT insert a nil reader into the pool.
+	r, err := pool.NewBinaryReader(ctx, log.NewNopLogger(), bkt, tmpDir, blockID, 3, meta)
+	if err == nil {
+		t.Fatalf("expected error due to missing index, got nil (reader=%v)", r)
+	}
+	if r != nil {
+		t.Fatalf("expected nil reader on error, got %T", r)
+	}
+
+	// The pool should remain empty.
+	pool.lazyReadersMx.Lock()
+	got := len(pool.lazyReaders)
+	pool.lazyReadersMx.Unlock()
+	testutil.Equals(t, 0, got)
+
+	// Exercise the sweeper to ensure it doesn't trip over a bad entry.
+	pool.closeIdleReaders()
+}
diff --git a/pkg/compact/blocks_cleaner.go b/pkg/compact/blocks_cleaner.go
index 3dc1f568ee8..2d8ef991487 100644
--- a/pkg/compact/blocks_cleaner.go
+++ b/pkg/compact/blocks_cleaner.go
@@ -5,17 +5,19 @@ package compact
 
 import (
 	"context"
-	"runtime"
 	"sync"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/oklog/ulid"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/thanos-io/objstore"
 
 	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/errutil"
 )
 
 // BlocksCleaner is a struct that deletes blocks from bucket which are marked for deletion.
@@ -42,30 +44,56 @@ func NewBlocksCleaner(logger log.Logger, bkt objstore.Bucket, ignoreDeletionMark
 // DeleteMarkedBlocks uses ignoreDeletionMarkFilter to gather the blocks that are marked for deletion and deletes those
 // if older than given deleteDelay.
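+// It returns the set of block IDs that were successfully deleted.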
-func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) error {
+func (s *BlocksCleaner) DeleteMarkedBlocks(ctx context.Context) (map[ulid.ULID]struct{}, error) {
+	// conc is the number of concurrent deletion workers.
+	const conc = 32
+
 	level.Info(s.logger).Log("msg", "started cleaning of blocks marked for deletion")
 
-	deletionMarkMap := s.ignoreDeletionMarkFilter.DeletionMarkBlocks()
-	wg := &sync.WaitGroup{}
-	sem := make(chan struct{}, runtime.NumCPU())
-	for _, deletionMark := range deletionMarkMap {
-		if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > s.deleteDelay.Seconds() {
-			sem <- struct{}{} // acquire BEFORE spawning goroutine
-			wg.Add(1)
-			go func(wg *sync.WaitGroup, sem chan struct{}, id ulid.ULID) {
-				defer wg.Done()
-				defer func() { <-sem }() // release
-				if err := block.Delete(ctx, s.logger, s.bkt, id); err != nil {
-					s.blockCleanupFailures.Inc()
-					level.Error(s.logger).Log("msg", "failed to delete block marked for deletion", "block", deletionMark.ID, "err", err)
-					return
-				}
-				s.blocksCleaned.Inc()
-			}(wg, sem, deletionMark.ID)
-			level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID)
-		}
-	}
+	var (
+		merr             errutil.SyncMultiError
+		deletedBlocksMtx sync.Mutex
+		deletedBlocks    = make(map[ulid.ULID]struct{})
+		deletionMarkMap  = s.ignoreDeletionMarkFilter.DeletionMarkBlocks()
+		wg               sync.WaitGroup
+		dm               = make(chan *metadata.DeletionMark, conc)
+	)
+
+	for range conc {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for deletionMark := range dm {
+				if ctx.Err() != nil {
+					return
+				}
+				if time.Since(time.Unix(deletionMark.DeletionTime, 0)).Seconds() > s.deleteDelay.Seconds() {
+					if err := block.Delete(ctx, s.logger, s.bkt, deletionMark.ID); err != nil {
+						s.blockCleanupFailures.Inc()
+						merr.Add(errors.Wrap(err, "delete block"))
+						continue
+					}
+
+					s.blocksCleaned.Inc()
+					level.Info(s.logger).Log("msg", "deleted block marked for deletion", "block", deletionMark.ID)
+
+					deletedBlocksMtx.Lock()
+					deletedBlocks[deletionMark.ID] = struct{}{}
+					deletedBlocksMtx.Unlock()
+				}
+			}
+		}()
+	}
+
+	for _, deletionMark := range deletionMarkMap {
+		dm <- deletionMark
+	}
+	close(dm)
 	wg.Wait()
+
+	if ctx.Err() != nil {
+		return deletedBlocks, ctx.Err()
+	}
+
 	level.Info(s.logger).Log("msg", "cleaning of blocks marked for deletion done")
-	return nil
+	return deletedBlocks, merr.Err()
 }
diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go
index a34555db1ba..9eccf8e6015 100644
--- a/pkg/compact/compact_test.go
+++ b/pkg/compact/compact_test.go
@@ -9,13 +9,16 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
+	"math/rand/v2"
 	"path"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/oklog/run"
-	"github.com/oklog/ulid"
+	"github.com/oklog/ulid/v2"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
@@ -245,7 +248,7 @@ func TestRetentionProgressCalculate(t *testing.T) {
 		keys[ind] = meta.Thanos.GroupKey()
 	}
 
-	ps := NewRetentionProgressCalculator(reg, nil, "test-tenant")
+	ps := NewRetentionProgressCalculator(reg, nil)
 
 	for _, tcase := range []struct {
 		testName string
@@ -367,7 +370,7 @@ func TestCompactProgressCalculate(t *testing.T) {
 		keys[ind] = meta.Thanos.GroupKey()
 	}
 
-	ps := NewCompactionProgressCalculator(reg, planner, "test-tenant")
+	ps := NewCompactionProgressCalculator(reg, planner)
 
 	var bkt objstore.Bucket
 	temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for compact progress tests"})
@@ -466,7 +469,7 @@ func TestDownsampleProgressCalculate(t *testing.T) {
 		keys[ind] = meta.Thanos.GroupKey()
 	}
 
-	ds := NewDownsampleProgressCalculator(reg, "test-tenant")
+	ds := NewDownsampleProgressCalculator(reg)
 
 	var bkt objstore.Bucket
 	temp := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "test_metric_for_group", Help: "this is a test metric for downsample progress tests"})
@@ -566,7 +569,7 @@ func TestNoMarkFilterAtomic(t *testing.T) {
 		Name: "coolcounter",
 	})
 
-	for i := 0; i < blocksNum; i++ {
+	for i := range blocksNum {
 		var meta metadata.Meta
 		meta.Version = 1
 		meta.ULID = ulid.MustNew(uint64(i), nil)
@@ -626,3 +629,112 @@ func TestNoMarkFilterAtomic(t *testing.T) {
 	})
 	testutil.Ok(t, g.Run())
 }
+
+func TestGarbageCollect_FilterRace(t *testing.T) {
+	timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 20*time.Second)
+	t.Cleanup(timeoutCancel)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+	t.Cleanup(cancel)
+
+	bkt := objstore.NewInMemBucket()
+
+	var metaParent metadata.Meta
+	metaParent.Version = 1
+	metaParent.ULID = ulid.MustNew(uint64(0), nil)
+
+	children := []ulid.ULID{
+		ulid.MustNew(uint64(1), nil), ulid.MustNew(uint64(2), nil), ulid.MustNew(uint64(3), nil),
+	}
+	metaParent.Compaction.Sources = children
+
+	var buf bytes.Buffer
+	testutil.Ok(t, json.NewEncoder(&buf).Encode(&metaParent))
+	testutil.Ok(t, bkt.Upload(ctx, path.Join(metaParent.ULID.String(), metadata.MetaFilename), &buf))
+
+	createBlocks := func() {
+		for _, ch := range children {
+			var metaChild metadata.Meta
+			metaChild.Version = 1
+			metaChild.ULID = ch
+
+			var buf bytes.Buffer
+			testutil.Ok(t, json.NewEncoder(&buf).Encode(&metaChild))
+			testutil.Ok(t, bkt.Upload(ctx, path.Join(metaChild.ULID.String(), metadata.MetaFilename), &buf))
+		}
+	}
+
+	reg := prometheus.NewRegistry()
+
+	baseFetcher, err := block.NewBaseFetcher(
+		log.NewNopLogger(),
+		10,
+		objstore.WithNoopInstr(bkt),
+		block.NewConcurrentLister(log.NewNopLogger(), objstore.WithNoopInstr(bkt)),
+		t.TempDir(),
+		reg,
+	)
+	testutil.Ok(t, err)
+
+	df := block.NewIgnoreDeletionMarkFilter(log.NewNopLogger(), objstore.WithNoopInstr(bkt), 0*time.Second, 1)
+
+	duplicateFilter := block.NewDeduplicateFilter(5)
+	mf := baseFetcher.NewMetaFetcher(reg, []block.MetadataFilter{df, duplicateFilter})
+
+	garbageCollection := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "test_gc_counter",
+	})
+
+	syncer, err := NewMetaSyncer(log.NewNopLogger(), reg, bkt, mf, duplicateFilter, df, promauto.With(prometheus.NewRegistry()).NewCounter(prometheus.CounterOpts{
+		Name: "test_meta_syncer_syncs",
+	}), garbageCollection, 5*time.Minute)
+	testutil.Ok(t, err)
+
+	blocksCleanedMetric := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "test_block_cleaned",
+	})
+	blocksCleaner := NewBlocksCleaner(log.NewNopLogger(), objstore.WithNoopInstr(bkt), df, 0*time.Second, blocksCleanedMetric, promauto.With(reg).NewCounter(prometheus.CounterOpts{
+		Name: "test_block_cleaner_errors",
+	}))
+
+	for timeoutCtx.Err() == nil {
+		t.Log("doing iteration")
+
+		testutil.Equals(t, float64(0.0), promtestutil.ToFloat64(garbageCollection))
+
+		createBlocks()
+		for _, ch := range children {
+			testutil.Ok(t, block.MarkForDeletion(context.Background(), log.NewNopLogger(), objstore.WithNoopInstr(bkt), ch, "foo", promauto.With(prometheus.NewRegistry()).NewCounter(
+				prometheus.CounterOpts{Name: "test_block_marked_for_deletion"},
+			)))
+		}
+		testutil.Ok(t, syncer.SyncMetas(context.Background()))
+
+		startWg := &sync.WaitGroup{}
+		startWg.Add(1)
+
+		wg := &sync.WaitGroup{}
+		wg.Add(2)
+
+		go func() {
+			defer wg.Done()
+			startWg.Wait()
+			r := rand.Uint32N(20)
+			time.Sleep(time.Duration(r) * time.Millisecond)
+			deleted, err := blocksCleaner.DeleteMarkedBlocks(context.Background())
+			testutil.Ok(t, err)
+			testutil.Ok(t, syncer.GarbageCollect(context.Background(), deleted))
+		}()
+
+		go func() {
+			defer wg.Done()
+			startWg.Wait()
+			for range 100 {
+				testutil.Ok(t, syncer.SyncMetas(context.Background()))
+			}
+		}()
+
+		startWg.Done()
+		wg.Wait()
+	}
+}
diff --git a/pkg/query/querier.go b/pkg/query/querier.go
index 64bab65a022..354a13e8b50 100644
--- a/pkg/query/querier.go
+++ b/pkg/query/querier.go
@@ -279,6 +279,14 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
 		return nil
 	}
 
+	// Batched responses carry multiple series in a single message; unpack them here.
+	if batch := r.GetBatch(); batch != nil {
+		for _, series := range batch.Series {
+			s.seriesSet = append(s.seriesSet, *series)
+			s.seriesSetStats.Count(storepb.NewSeriesResponse(series))
+		}
+		return nil
+	}
+
 	// Unsupported field, skip.
 	return nil
 }
diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go
index 8c77ee93c50..3a3199cc199 100644
--- a/pkg/query/querier_test.go
+++ b/pkg/query/querier_test.go
@@ -16,6 +16,8 @@ import (
 	"testing"
 	"time"
 
+	"go.uber.org/atomic"
+
 	"github.com/efficientgo/core/testutil"
 	"github.com/go-kit/log"
 	"github.com/google/go-cmp/cmp"
@@ -845,7 +847,7 @@ func newProxyStore(storeAPIs ...storepb.StoreServer) *store.ProxyStore {
 		}
 		cls[i] = &storetestutil.TestClient{
 			Name:                        fmt.Sprintf("%v", i),
-			StoreClient:                 storepb.ServerAsClient(s),
+			StoreClient:                 storepb.ServerAsClient(s, &atomic.Bool{}),
 			MinTime:                     math.MinInt64,
 			MaxTime:                     math.MaxInt64,
 			WithoutReplicaLabelsEnabled: withoutReplicaLabelsEnabled,
 		}
diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go
index cc19f43ab6d..849e40a73ee 100644
--- a/pkg/query/query_test.go
+++ b/pkg/query/query_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/efficientgo/core/testutil"
 	"github.com/go-kit/log"
 	"github.com/prometheus/prometheus/storage"
+	"go.uber.org/atomic"
 
 	"github.com/thanos-io/thanos/pkg/component"
 	"github.com/thanos-io/thanos/pkg/store"
@@ -79,7 +80,7 @@ func TestQuerier_Proxy(t *testing.T) {
 			// TODO(bwplotka): Parse external labels.
 			sc.append(&storetestutil.TestClient{
 				Name:        fmt.Sprintf("store number %v", i),
-				StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt)),
+				StoreClient: storepb.ServerAsClient(selectedStore(store.NewTSDBStore(logger, st.storage.DB, component.Debug, nil), m, st.mint, st.maxt), &atomic.Bool{}),
 				MinTime:     st.mint,
 				MaxTime:     st.maxt,
 			})
diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index b2fdd382178..2563a76d878 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -1125,6 +1125,28 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st
 	span, ctx := tracing.StartSpan(ctx, "receive_grpc")
 	defer span.Finish()
 
+	// Fast path for IngestorOnly mode: write directly to the local TSDB.
+	// This skips distributeTimeseriesToReplicas and sendLocalWrite since
+	// the Router already determined this data belongs to this node.
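+	// Replication already happened on the routing receiver, so a single local write,
+	// mapped onto the matching gRPC status code, is all that is needed here.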
+	if h.receiverMode == IngestorOnly {
+		err := h.writer.Write(ctx, r.Tenant, r.Timeseries)
+		if err != nil {
+			level.Debug(h.logger).Log("msg", "failed to write to local TSDB", "err", err)
+		}
+		switch cause := errors.Cause(err); cause {
+		case nil:
+			return &storepb.WriteResponse{}, nil
+		default:
+			if isNotReady(cause) {
+				return nil, status.Error(codes.Unavailable, err.Error())
+			}
+			if isConflict(cause) {
+				return nil, status.Error(codes.AlreadyExists, err.Error())
+			}
+			return nil, status.Error(codes.Internal, err.Error())
+		}
+	}
+
 	_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
 	if err != nil {
 		level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index 0eaa96bc2d1..59f62e808a7 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -6,6 +6,7 @@ package receive
 import (
 	"context"
 	"fmt"
+	"maps"
 	"os"
 	"path"
 	"path/filepath"
@@ -25,6 +26,7 @@ import (
 	"go.uber.org/atomic"
 	"golang.org/x/exp/slices"
 	"golang.org/x/sync/errgroup"
+	"golang.org/x/sync/singleflight"
 	"google.golang.org/grpc"
 
 	"github.com/thanos-io/thanos/pkg/api/status"
@@ -70,6 +72,11 @@ type MultiTSDB struct {
 	noUploadTenants          []string // Support both exact matches and prefix patterns (e.g., "tenant1", "prod-*")
 	enableTenantPathPrefix   bool
 	pathSegmentsBeforeTenant []string
+
+	headExpandedPostingsCacheSize  uint64
+	blockExpandedPostingsCacheSize uint64
+
+	initSingleFlight singleflight.Group
 }
 
 // MultiTSDBOption is a functional option for MultiTSDB.
@@ -256,10 +263,10 @@ type localClient struct {
 	client storepb.StoreClient
 }
 
-func newLocalClient(store *store.TSDBStore) *localClient {
+func newLocalClient(store *store.TSDBStore, readOnly *atomic.Bool) *localClient {
 	return &localClient{
 		store:  store,
-		client: storepb.ServerAsClient(store),
+		client: storepb.ServerAsClient(store, readOnly),
 	}
 }
@@ -331,12 +338,18 @@ func (l *localClient) SupportsWithoutReplicaLabels() bool {
 	return true
 }
 
+func (t *tenant) setReadOnly(ro bool) {
+	t.readOnly.Store(ro)
+}
+
 type tenant struct {
 	readyS        *ReadyStorage
 	storeTSDB     *store.TSDBStore
 	exemplarsTSDB *exemplars.TSDB
 	ship          *shipper.Shipper
 
+	readOnly atomic.Bool
+
 	mtx  *sync.RWMutex
 	tsdb *tsdb.DB
 
@@ -344,6 +357,19 @@ type tenant struct {
 	blocksToDeleteFn func(db *tsdb.DB) tsdb.BlocksToDeleteFunc
 }
 
+func (m *MultiTSDB) initTSDBIfNeeded(tenantID string, t *tenant) error {
+	_, err, _ := m.initSingleFlight.Do(tenantID, func() (interface{}, error) {
+		if t.readyS.Get() != nil {
+			return nil, nil
+		}
+		logger := log.With(m.logger, "tenant", tenantID)
+
+		return nil, m.startTSDB(logger, tenantID, t)
+	})
+
+	return err
+}
+
 func (t *tenant) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
 	t.mtx.RLock()
 	defer t.mtx.RUnlock()
@@ -387,7 +413,7 @@ func (t *tenant) client() store.Client {
 		return nil
 	}
 
-	return newLocalClient(tsdbStore)
+	return newLocalClient(tsdbStore, &t.readOnly)
 }
 
 func (t *tenant) exemplars() *exemplars.TSDB {
@@ -441,7 +467,7 @@ func (t *MultiTSDB) Open() error {
 		}
 
 		g.Go(func() error {
-			_, err := t.getOrLoadTenant(f.Name(), true)
+			_, err := t.getOrLoadTenant(f.Name())
 			return err
 		})
 	}
@@ -524,14 +550,21 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
 		prunedTenants []string
 		pmtx          sync.Mutex
+
+		tenants = make(map[string]*tenant)
 	)
+
 	t.mtx.RLock()
-	for tenantID, tenantInstance := range t.tenants {
+	maps.Copy(tenants, t.tenants)
+	t.mtx.RUnlock()
+
+	begin := time.Now()
+	for tenantID, tenantInstance := range tenants {
 		wg.Add(1)
 		go func(tenantID string, tenantInstance *tenant) {
 			defer wg.Done()
-			tlog := log.With(t.logger, "tenant", tenantID)
-			pruned, err := t.pruneTSDB(ctx, tlog, tenantInstance)
+
+			pruned, err := t.pruneTSDB(ctx, log.With(t.logger, "tenant", tenantID), tenantInstance, tenantID)
 			if err != nil {
 				merr.Add(err)
 				return
@@ -545,50 +578,35 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
 		}(tenantID, tenantInstance)
 	}
 	wg.Wait()
-	t.mtx.RUnlock()
-
-	t.mtx.Lock()
-	defer t.mtx.Unlock()
-	for _, tenantID := range prunedTenants {
-		// Check that the tenant hasn't been reinitialized in-between locks.
-		if t.tenants[tenantID].readyStorage().get() != nil {
-			continue
-		}
-		level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
-		t.removeTenantUnlocked(tenantID)
-	}
+
+	level.Info(t.logger).Log("msg", "Pruning job completed", "pruned_tenants_count", len(prunedTenants), "pruned_tenants", prunedTenants, "took_seconds", time.Since(begin).Seconds())
 	return merr.Err()
 }
 
 // pruneTSDB removes a TSDB if it's past the retention period.
 // It compacts the TSDB head, sends all remaining blocks to S3 and removes the TSDB from disk.
-func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant) (pruned bool, rerr error) {
+func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInstance *tenant, tenantID string) (pruned bool, rerr error) {
 	tenantTSDB := tenantInstance.readyStorage()
 	if tenantTSDB == nil {
 		return false, nil
 	}
-	tenantTSDB.mtx.RLock()
-	if tenantTSDB.a == nil || tenantTSDB.a.db == nil {
-		tenantTSDB.mtx.RUnlock()
+
+	tdb := tenantTSDB.Get()
+	if tdb == nil {
 		return false, nil
 	}
 
-	tdb := tenantTSDB.a.db
 	head := tdb.Head()
 	if head.MaxTime() < 0 {
-		tenantTSDB.mtx.RUnlock()
 		return false, nil
 	}
 
 	sinceLastAppendMillis := time.Since(time.UnixMilli(head.MaxTime())).Milliseconds()
 	compactThreshold := int64(1.5 * float64(t.tsdbOpts.MaxBlockDuration))
 	if sinceLastAppendMillis <= compactThreshold {
-		tenantTSDB.mtx.RUnlock()
 		return false, nil
 	}
-	tenantTSDB.mtx.RUnlock()
 
 	// Acquire a write lock and check that no writes have occurred in-between locks.
 	tenantTSDB.mtx.Lock()
@@ -637,6 +655,15 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
 		}
 	}
 
+	tenantInstance.setReadOnly(true)
+	defer func() {
+		if pruned {
+			return
+		}
+
+		tenantInstance.setReadOnly(false)
+	}()
+
 	if err := tdb.Close(); err != nil {
 		return false, err
 	}
@@ -650,6 +677,10 @@ func (t *MultiTSDB) pruneTSDB(ctx context.Context, logger log.Logger, tenantInst
 	tenantInstance.setComponents(nil, nil, nil, nil)
 	tenantInstance.mtx.Unlock()
 
+	t.mtx.Lock()
+	t.removeTenantUnlocked(tenantID)
+	t.mtx.Unlock()
+
 	return true, nil
 }
 
@@ -856,13 +887,13 @@ func (t *MultiTSDB) defaultTenantDataDir(tenantID string) string {
 	return path.Join(t.dataDir, tenantID)
 }
 
-func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenant, error) {
+func (t *MultiTSDB) getOrLoadTenant(tenantID string) (*tenant, error) {
 	// Fast path, as creating tenants is a very rare operation.
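+	// Note that an existing tenant may still need its TSDB started (for example after it
+	// was pruned); initTSDBIfNeeded collapses concurrent starts through the singleflight group.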
t.mtx.RLock() tenant, exist := t.tenants[tenantID] t.mtx.RUnlock() if exist { - return tenant, nil + return tenant, t.initTSDBIfNeeded(tenantID, tenant) } // Slow path needs to lock fully and attempt to read again to prevent race @@ -872,27 +903,18 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan tenant, exist = t.tenants[tenantID] if exist { t.mtx.Unlock() - return tenant, nil + return tenant, t.initTSDBIfNeeded(tenantID, tenant) } tenant = newTenant() t.addTenantUnlocked(tenantID, tenant) t.mtx.Unlock() - logger := log.With(t.logger, "tenant", tenantID) - if !blockingStart { - go func() { - if err := t.startTSDB(logger, tenantID, tenant); err != nil { - level.Error(logger).Log("msg", "failed to start tsdb asynchronously", "err", err) - } - }() - return tenant, nil - } - return tenant, t.startTSDB(logger, tenantID, tenant) + return tenant, t.initTSDBIfNeeded(tenantID, tenant) } func (t *MultiTSDB) TenantAppendable(tenantID string) (Appendable, error) { - tenant, err := t.getOrLoadTenant(tenantID, false) + tenant, err := t.getOrLoadTenant(tenantID) if err != nil { return nil, err } diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 65554bd7734..46c467af82f 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -1278,3 +1278,60 @@ func TestMultiTSDBSkipsLostAndFound(t *testing.T) { m.mtx.RUnlock() testutil.Assert(t, !exists, "lost+found should not be loaded as a tenant") } + +func TestMultiTSDBDoesNotReturnPrunedTenants(t *testing.T) { + t.Parallel() + + dir := t.TempDir() + + m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + }, labels.FromStrings("replica", "test"), "tenant_id", objstore.NewInMemBucket(), false, metadata.NoneFunc) + t.Cleanup(func() { + testutil.Ok(t, m.Close()) + }) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + const iterations = 200 + + wg := sync.WaitGroup{} + wg.Go(func() { + for i := range iterations { + tenant := fmt.Sprintf("pruned-tenant-%d", i) + + testutil.Ok(t, appendSample(m, tenant, time.UnixMilli(int64(10)))) + + testutil.Ok(t, m.Prune(ctx)) + } + }) + + wg.Go(func() { + for range iterations { + clients := m.TSDBLocalClients() + req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 10, + Matchers: []storepb.LabelMatcher{{Name: "foo", Value: ".*", Type: storepb.LabelMatcher_RE}}, + } + + for _, c := range clients { + sc, err := c.Series(ctx, req) + testutil.Ok(t, err) + + for { + _, err := sc.Recv() + if err == io.EOF { + break + } + testutil.Ok(t, err) + } + } + } + }) + + wg.Wait() +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index d17fd6453cc..cdd3c0f9ccc 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -124,9 +124,6 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq []prompb.TimeS errorTracker.addSampleError(err, tLogger, lset, s.Timestamp, s.Value) } - b := labels.ScratchBuilder{} - b.Labels() - for _, hp := range t.Histograms { var ( h *histogram.Histogram diff --git a/pkg/store/acceptance_test.go b/pkg/store/acceptance_test.go index 48fc00adfbe..da7bdb13138 100644 --- a/pkg/store/acceptance_test.go +++ b/pkg/store/acceptance_test.go @@ -10,13 +10,14 @@ import ( "net/url" "os" "path/filepath" + "slices" "testing" "time" "github.com/efficientgo/core/testutil" "github.com/go-kit/log" 
"github.com/pkg/errors" - "golang.org/x/exp/slices" + "go.uber.org/atomic" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -881,15 +882,14 @@ func testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t *testing.T, sta ref storage.SeriesRef err error ) - for i := int64(0); i < offset; i++ { + for i := range offset { ref, err = app.Append(ref, labels.FromStrings("a", "b"), baseT+i, 1) testutil.Ok(t, err) } testutil.Ok(t, app.Commit()) } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + ctx := t.Context() client := startStore(t, extLset, appendFn) srv := newStoreSeriesServer(ctx) @@ -1164,7 +1164,7 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) { p1 := startNestedStore(tt, appendFn, extLset1, extLset2, extLset3) clients := []Client{ - storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1, extLset2, extLset3}}, + storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1, atomic.Bool{}), ExtLset: []labels.Labels{extLset1, extLset2, extLset3}}, } relabelCfgs := []*relabel.Config{{ @@ -1227,8 +1227,8 @@ func TestProxyStoreWithReplicas_Acceptance(t *testing.T) { p2 := startNestedStore(tt, extLset2, appendFn) clients := []Client{ - storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1}}, - storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{extLset2}}, + storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1, atomic.Bool{}), ExtLset: []labels.Labels{extLset1}}, + storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2, atomic.Bool{}), ExtLset: []labels.Labels{extLset2}}, } return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval)) @@ -1236,3 +1236,59 @@ func TestProxyStoreWithReplicas_Acceptance(t *testing.T) { testStoreAPIsAcceptance(t, startStore) } + +// TestTSDBSelectorFilteringBehavior tests that TSDBSelector properly filters stores +// based on relabel configuration, ensuring that only matching stores are included +// in TSDBInfos, LabelValues, and other metadata operations. 
+func TestTSDBSelectorFilteringBehavior(t *testing.T) {
+	t.Parallel()
+
+	startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
+		startNestedStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
+			db, err := e2eutil.NewTSDB()
+			testutil.Ok(tt, err)
+			tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
+			appendFn(db.Appender(context.Background()))
+
+			return NewTSDBStore(nil, db, component.Rule, extLset)
+		}
+
+		// This TSDB will be selected.
+		store1 := startNestedStore(tt, extLset, appendFn)
+
+		// This TSDB should be dropped.
+		droppedLset := labels.New(labels.Label{Name: "foo", Value: "bar"})
+		store2 := startNestedStore(tt, droppedLset, appendFn)
+
+		clients := []Client{
+			storetestutil.TestClient{
+				StoreClient: storepb.ServerAsClient(store1, &atomic.Bool{}),
+				ExtLset:     []labels.Labels{extLset},
+			},
+			storetestutil.TestClient{
+				StoreClient: storepb.ServerAsClient(store2, &atomic.Bool{}),
+				ExtLset:     []labels.Labels{droppedLset},
+			},
+		}
+
+		// Create a relabel config that drops every store whose external labels match foo=bar.
+		relabelCfgs := []*relabel.Config{{
+			SourceLabels: []model.LabelName{"foo"},
+			Regex:        relabel.MustNewRegexp("bar"),
+			Action:       relabel.Drop,
+		}}
+
+		return NewProxyStore(
+			nil, nil,
+			func() []Client { return clients },
+			component.Query,
+			labels.EmptyLabels(),
+			0*time.Second,
+			RetrievalStrategy(EagerRetrieval),
+			WithTSDBSelector(NewTSDBSelector(relabelCfgs)),
+		)
+	}
+
+	testStoreAPIsAcceptance(t, startStore)
+}
diff --git a/pkg/store/batchable.go b/pkg/store/batchable.go
new file mode 100644
index 00000000000..9a399a51f83
--- /dev/null
+++ b/pkg/store/batchable.go
@@ -0,0 +1,68 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package store
+
+import (
+	"github.com/thanos-io/thanos/pkg/store/storepb"
+)
+
+func newBatchableServer(
+	upstream storepb.Store_SeriesServer,
+	batchSize int,
+) storepb.Store_SeriesServer {
+	switch batchSize {
+	case 0, 1:
+		// Batching is disabled: forward every Series as its own response.
+		return &passthroughServer{Store_SeriesServer: upstream}
+	default:
+		return &batchableServer{
+			Store_SeriesServer: upstream,
+			batchSize:          batchSize,
+			series:             make([]*storepb.Series, 0, batchSize),
+		}
+	}
+}
+
+// batchableServer is a flushableServer that allows sending a batch of Series per message.
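+// Series are buffered until batchSize is reached; any trailing partial batch is sent by
+// Flush or right before the next non-series response.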
+type batchableServer struct {
+	storepb.Store_SeriesServer
+	batchSize int
+	series    []*storepb.Series
+}
+
+func (b *batchableServer) Flush() error {
+	if len(b.series) != 0 {
+		if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil {
+			return err
+		}
+		b.series = make([]*storepb.Series, 0, b.batchSize)
+	}
+
+	return nil
+}
+
+func (b *batchableServer) Send(response *storepb.SeriesResponse) error {
+	series := response.GetSeries()
+	if series == nil {
+		// Forward non-series responses (warnings, hints) immediately, flushing
+		// any buffered series first so that ordering is preserved.
+		if err := b.Flush(); err != nil {
+			return err
+		}
+		return b.Store_SeriesServer.Send(response)
+	}
+
+	b.series = append(b.series, series)
+
+	if len(b.series) >= b.batchSize {
+		return b.Flush()
+	}
+
+	return nil
+}
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index d8f1b57701e..dbbe1b02d99 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -1496,7 +1496,9 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill
 
 // Series implements the storepb.StoreServer interface.
 func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
-	srv := newFlushableServer(seriesSrv, sortingStrategyNone)
+	srv := newFlushableServer(
+		newBatchableServer(seriesSrv, int(req.ResponseBatchSize)),
+		sortingStrategyNone)
 
 	if s.queryGate != nil {
 		tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go
index 33680c3c896..f92a341f518 100644
--- a/pkg/store/flushable.go
+++ b/pkg/store/flushable.go
@@ -49,7 +49,13 @@ type passthroughServer struct {
 	storepb.Store_SeriesServer
 }
 
-func (p *passthroughServer) Flush() error { return nil }
+func (p *passthroughServer) Flush() error {
+	// If the underlying server is also flushable, flush it as well.
+	if f, ok := p.Store_SeriesServer.(flushableServer); ok {
+		return f.Flush()
+	}
+	return nil
+}
 
 // resortingServer is a flushableServer that resorts all series by their labels.
 // This is required if replica labels are stored internally in a TSDB.
@@ -89,5 +95,11 @@ func (r *resortingServer) Flush() error {
 			return err
 		}
 	}
+
+	// If the underlying server is also flushable, flush it as well.
+	if fs, ok := r.Store_SeriesServer.(flushableServer); ok {
+		return fs.Flush()
+	}
+
 	return nil
 }
diff --git a/pkg/store/labelpb/label.go b/pkg/store/labelpb/label.go
index 90192d0fac8..ceff59da67b 100644
--- a/pkg/store/labelpb/label.go
+++ b/pkg/store/labelpb/label.go
@@ -28,12 +28,12 @@ var (
 	sep = []byte{'\xff'}
 )
 
-func noAllocString(buf []byte) string {
-	return *(*string)(unsafe.Pointer(&buf))
+// safeBytes returns a copy of the string as a byte slice.
+func safeBytes(buf string) []byte {
+	return []byte(buf)
 }
 
-func noAllocBytes(buf string) []byte {
-	return *(*[]byte)(unsafe.Pointer(&buf))
+// safeString returns a copy of the byte slice as a string.
+func safeString(buf []byte) string {
+	return string(buf)
 }
 
 // ZLabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel in type unsafe manner.
@@ -65,8 +65,8 @@ func ReAllocZLabelsStrings(lset *[]ZLabel, intern bool) {
 	}
 
 	for j, l := range *lset {
-		(*lset)[j].Name = string(noAllocBytes(l.Name))
-		(*lset)[j].Value = string(noAllocBytes(l.Value))
+		(*lset)[j].Name = string(safeBytes(l.Name))
+		(*lset)[j].Value = string(safeBytes(l.Value))
 	}
 }
 
@@ -80,7 +80,7 @@ func internLabelString(s string) string {
 // detachAndInternLabelString reallocates the label string to detach it
 // from a bigger memory pool and interns the string.
 func detachAndInternLabelString(s string) string {
-	return internLabelString(string(noAllocBytes(s)))
+	return internLabelString(string(safeBytes(s)))
 }
 
 // ZLabelSetsToPromLabelSets converts slice of labelpb.ZLabelSet to slice of Prometheus labels.
@@ -191,7 +191,7 @@ func (m *ZLabel) Unmarshal(data []byte) error {
 				if postIndex > l {
 					return io.ErrUnexpectedEOF
 				}
-				m.Name = noAllocString(data[iNdEx:postIndex])
+				m.Name = safeString(data[iNdEx:postIndex])
 				iNdEx = postIndex
 			case 2:
 				if wireType != 2 {
@@ -223,7 +223,7 @@ func (m *ZLabel) Unmarshal(data []byte) error {
 				if postIndex > l {
 					return io.ErrUnexpectedEOF
 				}
-				m.Value = noAllocString(data[iNdEx:postIndex])
+				m.Value = safeString(data[iNdEx:postIndex])
 				iNdEx = postIndex
 			default:
 				iNdEx = preIndex
@@ -344,8 +344,8 @@ func (m *ZLabelSet) PromLabels() labels.Labels {
 func DeepCopy(lbls []ZLabel) []ZLabel {
 	ret := make([]ZLabel, len(lbls))
 	for i := range lbls {
-		ret[i].Name = string(noAllocBytes(lbls[i].Name))
-		ret[i].Value = string(noAllocBytes(lbls[i].Value))
+		ret[i].Name = string(safeBytes(lbls[i].Name))
+		ret[i].Value = string(safeBytes(lbls[i].Value))
 	}
 	return ret
 }
diff --git a/pkg/store/limiter_test.go b/pkg/store/limiter_test.go
index b713f0574c3..fb2ab03a683 100644
--- a/pkg/store/limiter_test.go
+++ b/pkg/store/limiter_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	prom_testutil "github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/prometheus/model/labels"
+	"go.uber.org/atomic"
 
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 )
@@ -101,7 +102,7 @@ func TestRateLimitedServer(t *testing.T) {
 			defer cancel()
 
 			store := NewLimitedStoreServer(newStoreServerStub(test.series), prometheus.NewRegistry(), test.limits)
-			client := storepb.ServerAsClient(store)
+			client := storepb.ServerAsClient(store, &atomic.Bool{})
 			seriesClient, err := client.Series(ctx, &storepb.SeriesRequest{})
 			testutil.Ok(t, err)
 			for {
diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go
index 3c30d70e8e8..dab8b96f726 100644
--- a/pkg/store/prometheus.go
+++ b/pkg/store/prometheus.go
@@ -122,7 +122,9 @@ func (p *PrometheusStore) putBuffer(b *[]byte) {
 
 // Series returns all series for a requested time range and label matcher.
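+// When r.ResponseBatchSize is greater than one, series are sent in batched responses.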
 func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
-	s := newFlushableServer(seriesSrv, sortingStrategyStore)
+	s := newFlushableServer(
+		newBatchableServer(seriesSrv, int(r.ResponseBatchSize)),
+		sortingStrategyStore)
 
 	extLset := p.externalLabelsFn()
diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go
index 434cb220e9b..eb79867ae19 100644
--- a/pkg/store/proxy.go
+++ b/pkg/store/proxy.go
@@ -369,7 +369,9 @@ func (s *ProxyStore) TSDBInfos() []infopb.TSDBInfo {
 	return infos
 }
 
-func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
+func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
+	srv := newBatchableServer(seriesSrv, int(originalRequest.ResponseBatchSize))
+
 	// TODO(bwplotka): This should be part of request logger, otherwise it does not make much sense. Also, could be
 	// triggered by tracing span to reduce cognitive load.
 	reqLogger := log.With(s.logger, "component", "proxy")
@@ -502,6 +504,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 		PartialResponseStrategy: originalRequest.PartialResponseStrategy,
 		ShardInfo:               originalRequest.ShardInfo,
 		WithoutReplicaLabels:    originalRequest.WithoutReplicaLabels,
+		ResponseBatchSize:       originalRequest.ResponseBatchSize,
 	}
 	if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA && !s.forwardPartialStrategy {
 		// Do not forward this field as it might cause data loss.
@@ -700,6 +703,11 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
 		}
 	}
 
+	// Flush any remaining buffered series from the batchable server.
+	if f, ok := srv.(flushableServer); ok {
+		return f.Flush()
+	}
+
 	return nil
 }
diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go
index 54a46692c21..480261b1184 100644
--- a/pkg/store/proxy_merge.go
+++ b/pkg/store/proxy_merge.go
@@ -475,7 +475,34 @@ func newLazyRespSet(
 				seriesStats.Count(resp)
 			}
 
 			l.bufferedResponsesMtx.Lock()
+			// Handle batch responses by expanding them into individual series responses.
+			if batch := resp.GetBatch(); batch != nil {
+				for _, series := range batch.Series {
+					seriesStats.Count(storepb.NewSeriesResponse(series))
+					if applySharding && !shardMatcher.MatchesZLabels(series.Labels) {
+						continue
+					}
+					for l.isFull() && !l.closed {
+						l.bufferSlotEvent.Wait()
+					}
+					if l.closed {
+						l.bufferedResponsesMtx.Unlock()
+						return true
+					}
+					l.bufferedResponses[l.ringTail] = storepb.NewSeriesResponse(series)
+					l.ringTail = (l.ringTail + 1) % l.fixedBufferSize
+					l.dataOrFinishEvent.Signal()
+				}
+				l.bufferedResponsesMtx.Unlock()
+				return true
+			}
+
 			for l.isFull() && !l.closed {
 				l.bufferSlotEvent.Wait()
 			}
@@ -748,6 +775,18 @@ func newEagerRespSet(
 				seriesStats.Count(resp)
 			}
 
+			// Handle batch responses by expanding them into individual series responses.
+			if batch := resp.GetBatch(); batch != nil {
+				for _, series := range batch.Series {
+					seriesStats.Count(storepb.NewSeriesResponse(series))
+					if applySharding && !shardMatcher.MatchesZLabels(series.Labels) {
+						continue
+					}
+					l.bufferedResponses = append(l.bufferedResponses, storepb.NewSeriesResponse(series))
+				}
+				return true
+			}
+
 			l.bufferedResponses = append(l.bufferedResponses, resp)
 			return true
 		}
diff --git a/pkg/store/proxy_merge_test.go b/pkg/store/proxy_merge_test.go
index 387d5ff6a87..a603da79891 100644
--- 
a/pkg/store/proxy_merge_test.go +++ b/pkg/store/proxy_merge_test.go @@ -5,10 +5,15 @@ package store import ( "fmt" + "io" "sync" "testing" "github.com/efficientgo/core/testutil" + "github.com/opentracing/opentracing-go" + otlog "github.com/opentracing/opentracing-go/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/thanos/pkg/errors" @@ -241,6 +246,175 @@ type nopClientSendCloser struct { func (c nopClientSendCloser) CloseSend() error { return nil } +// TestProxyResponseTreeSortWithBatchResponses verifies that batch responses are +// properly unpacked into individual series before being merged by the loser tree. +// Without proper unpacking, the loser tree would receive batch responses and fail +// to merge series correctly. +func TestProxyResponseTreeSortWithBatchResponses(t *testing.T) { + t.Parallel() + + // Create series that will be sent in batches from two stores. + // Store 1 sends batch with series a=1, a=3 + // Store 2 sends batch with series a=2, a=4 + // After unpacking and merging, we expect: a=1, a=2, a=3, a=4 + series1 := storeSeriesResponse(t, labelsFromStrings("a", "1")).GetSeries() + series2 := storeSeriesResponse(t, labelsFromStrings("a", "2")).GetSeries() + series3 := storeSeriesResponse(t, labelsFromStrings("a", "3")).GetSeries() + series4 := storeSeriesResponse(t, labelsFromStrings("a", "4")).GetSeries() + + mockClient1 := &mockBatchSeriesClient{ + responses: []*storepb.SeriesResponse{ + storepb.NewBatchResponse([]*storepb.Series{series1, series3}), + }, + } + mockClient2 := &mockBatchSeriesClient{ + responses: []*storepb.SeriesResponse{ + storepb.NewBatchResponse([]*storepb.Series{series2, series4}), + }, + } + + // Create eagerRespSets which should unpack the batches + var shardInfo *storepb.ShardInfo + reg := prometheus.NewRegistry() + factory := promauto.With(reg) + respSet1 := newEagerRespSet( + noopSpan{}, + 0, + "store1", + nil, + func() {}, + mockClient1, + shardInfo.Matcher(nil), + false, + factory.NewCounter(prometheus.CounterOpts{Name: "test_eager_1"}), + nil, + ) + respSet2 := newEagerRespSet( + noopSpan{}, + 0, + "store2", + nil, + func() {}, + mockClient2, + shardInfo.Matcher(nil), + false, + factory.NewCounter(prometheus.CounterOpts{Name: "test_eager_2"}), + nil, + ) + + h := NewProxyResponseLoserTree(respSet1, respSet2) + var got []*storepb.SeriesResponse + for h.Next() { + got = append(got, h.At()) + } + + exp := []*storepb.SeriesResponse{ + storeSeriesResponse(t, labelsFromStrings("a", "1")), + storeSeriesResponse(t, labelsFromStrings("a", "2")), + storeSeriesResponse(t, labelsFromStrings("a", "3")), + storeSeriesResponse(t, labelsFromStrings("a", "4")), + } + testutil.Equals(t, exp, got) +} + +type mockBatchSeriesClient struct { + storepb.Store_SeriesClient + responses []*storepb.SeriesResponse + i int +} + +func (c *mockBatchSeriesClient) Recv() (*storepb.SeriesResponse, error) { + if c.i >= len(c.responses) { + return nil, io.EOF + } + resp := c.responses[c.i] + c.i++ + return resp, nil +} + +func (c *mockBatchSeriesClient) CloseSend() error { return nil } + +// noopSpan implements opentracing.Span for testing. 
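+// It lets the eager/lazy response set constructors be exercised without a real tracer.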
+type noopSpan struct{}
+
+func (noopSpan) Finish()                                  {}
+func (noopSpan) SetTag(string, any) opentracing.Span      { return noopSpan{} }
+func (noopSpan) Context() opentracing.SpanContext         { return nil }
+func (noopSpan) SetOperationName(string) opentracing.Span { return noopSpan{} }
+func (noopSpan) Tracer() opentracing.Tracer               { return nil }
+func (noopSpan) SetBaggageItem(string, string) opentracing.Span {
+	return noopSpan{}
+}
+func (noopSpan) BaggageItem(string) string                    { return "" }
+func (noopSpan) LogKV(...any)                                 {}
+func (noopSpan) LogFields(...otlog.Field)                     {}
+func (noopSpan) Log(opentracing.LogData)                      {}
+func (noopSpan) FinishWithOptions(opentracing.FinishOptions)  {}
+func (noopSpan) LogEvent(string)                              {}
+func (noopSpan) LogEventWithPayload(string, interface{})      {}
+
+// TestLazyRespSetUnpacksBatchResponses verifies that lazyRespSet properly unpacks
+// batch responses into individual series before being merged by the loser tree.
+func TestLazyRespSetUnpacksBatchResponses(t *testing.T) {
+	t.Parallel()
+
+	series1 := storeSeriesResponse(t, labelsFromStrings("a", "1")).GetSeries()
+	series2 := storeSeriesResponse(t, labelsFromStrings("a", "2")).GetSeries()
+	series3 := storeSeriesResponse(t, labelsFromStrings("a", "3")).GetSeries()
+	series4 := storeSeriesResponse(t, labelsFromStrings("a", "4")).GetSeries()
+
+	mockClient1 := &mockBatchSeriesClient{
+		responses: []*storepb.SeriesResponse{
+			storepb.NewBatchResponse([]*storepb.Series{series1, series3}),
+		},
+	}
+	mockClient2 := &mockBatchSeriesClient{
+		responses: []*storepb.SeriesResponse{
+			storepb.NewBatchResponse([]*storepb.Series{series2, series4}),
+		},
+	}
+
+	var shardInfo *storepb.ShardInfo
+	reg := prometheus.NewRegistry()
+	factory := promauto.With(reg)
+	respSet1 := newLazyRespSet(
+		noopSpan{},
+		0,
+		"store1",
+		nil,
+		func() {},
+		mockClient1,
+		shardInfo.Matcher(nil),
+		false,
+		factory.NewCounter(prometheus.CounterOpts{Name: "test_lazy_1"}),
+		10,
+	)
+	respSet2 := newLazyRespSet(
+		noopSpan{},
+		0,
+		"store2",
+		nil,
+		func() {},
+		mockClient2,
+		shardInfo.Matcher(nil),
+		false,
+		factory.NewCounter(prometheus.CounterOpts{Name: "test_lazy_2"}),
+		10,
+	)
+
+	h := NewProxyResponseLoserTree(respSet1, respSet2)
+	var got []*storepb.SeriesResponse
+	for h.Next() {
+		got = append(got, h.At())
+	}
+
+	exp := []*storepb.SeriesResponse{
+		storeSeriesResponse(t, labelsFromStrings("a", "1")),
+		storeSeriesResponse(t, labelsFromStrings("a", "2")),
+		storeSeriesResponse(t, labelsFromStrings("a", "3")),
+		storeSeriesResponse(t, labelsFromStrings("a", "4")),
+	}
+	testutil.Equals(t, exp, got)
+}
+
 func TestSortWithoutLabels(t *testing.T) {
 	t.Parallel()
 
diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go
index 5c30008e219..157346f1dd2 100644
--- a/pkg/store/proxy_test.go
+++ b/pkg/store/proxy_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/prometheus/prometheus/model/timestamp"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
+	"go.uber.org/atomic"
 	"google.golang.org/grpc"
 
 	"github.com/thanos-io/thanos/pkg/block"
@@ -965,7 +966,7 @@ func TestProxyStore_Series(t *testing.T) {
 						},
 					},
 				}
-			}, component.Store, labels.FromStrings("role", "proxy"), 1*time.Minute, EagerRetrieval)),
+			}, component.Store, labels.FromStrings("role", "proxy"), 1*time.Minute, EagerRetrieval), &atomic.Bool{}),
 		},
 		&storetestutil.TestClient{
 			MinTime: 1,
@@ -2913,7 +2914,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
 					storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), }, - }), + }, atomic.Bool{}), MinTime: math.MinInt64, MaxTime: math.MaxInt64, }, diff --git a/pkg/store/recover_test.go b/pkg/store/recover_test.go index c9e01ec7dc4..267bef64d0e 100644 --- a/pkg/store/recover_test.go +++ b/pkg/store/recover_test.go @@ -10,6 +10,7 @@ import ( "github.com/efficientgo/core/testutil" "github.com/go-kit/log" + "go.uber.org/atomic" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -22,7 +23,7 @@ func TestRecoverableServer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - client := storepb.ServerAsClient(store) + client := storepb.ServerAsClient(store, atomic.Bool{}) seriesClient, err := client.Series(ctx, &storepb.SeriesRequest{}) testutil.Ok(t, err) diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index de3b5e7ac71..147fbc11208 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -52,6 +52,16 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { } } +func NewBatchResponse(batch []*Series) *SeriesResponse { + return &SeriesResponse{ + Result: &SeriesResponse_Batch{ + Batch: &SeriesBatch{ + Series: batch, + }, + }, + } +} + func GRPCCodeFromWarn(warn string) codes.Code { if strings.Contains(warn, "rpc error: code = ResourceExhausted") { return codes.ResourceExhausted diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index d76301b42c2..86b926bfc95 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -9,7 +9,9 @@ import ( "iter" "sync" + "go.uber.org/atomic" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) type inProcessServer struct { @@ -79,14 +81,15 @@ func (c *inProcessClient) CloseSend() error { return nil } -func ServerAsClient(srv StoreServer) StoreClient { - return &serverAsClient{srv: srv} +func ServerAsClient(srv StoreServer, readOnly atomic.Bool) StoreClient { + return &serverAsClient{srv: srv, readOnly: readOnly} } // serverAsClient allows to use servers as clients. // NOTE: Passing CallOptions does not work - it would be needed to be implemented in grpc itself (before, after are private). 
 type serverAsClient struct {
-	srv StoreServer
+	srv      StoreServer
+	readOnly *atomic.Bool
 }
 
 func (s serverAsClient) LabelNames(ctx context.Context, in *LabelNamesRequest, _ ...grpc.CallOption) (*LabelNamesResponse, error) {
@@ -97,7 +100,44 @@ func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest,
 	return s.srv.LabelValues(ctx, in)
 }
 
+// readOnlySeriesClient is an empty Series stream: Recv returns io.EOF immediately.
+type readOnlySeriesClient struct {
+	ctx context.Context
+}
+
+var _ Store_SeriesClient = &readOnlySeriesClient{}
+
+func (r *readOnlySeriesClient) Recv() (*SeriesResponse, error) {
+	return nil, io.EOF
+}
+
+func (r *readOnlySeriesClient) Header() (metadata.MD, error) {
+	return nil, nil
+}
+
+func (r *readOnlySeriesClient) Trailer() metadata.MD {
+	return nil
+}
+
+func (r *readOnlySeriesClient) CloseSend() error {
+	return nil
+}
+
+func (r *readOnlySeriesClient) Context() context.Context {
+	return r.ctx
+}
+
+func (r *readOnlySeriesClient) SendMsg(m interface{}) error {
+	return nil
+}
+
+func (r *readOnlySeriesClient) RecvMsg(m interface{}) error {
+	return io.EOF
+}
+
 func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) {
+	if s.readOnly != nil && s.readOnly.Load() {
+		return &readOnlySeriesClient{ctx: ctx}, nil
+	}
 	var srvIter iter.Seq2[*SeriesResponse, error] = func(yield func(*SeriesResponse, error) bool) {
 		srv := newInProcessServer(ctx, yield)
 		err := s.srv.Series(in, srv)
diff --git a/pkg/store/storepb/inprocess_test.go b/pkg/store/storepb/inprocess_test.go
index f033b579b02..39d31d3ae9a 100644
--- a/pkg/store/storepb/inprocess_test.go
+++ b/pkg/store/storepb/inprocess_test.go
@@ -6,9 +6,11 @@ package storepb
 import (
 	"context"
 	"io"
+	"sync"
 	"testing"
 
 	"github.com/thanos-io/thanos/pkg/testutil/custom"
+	"go.uber.org/atomic"
 
 	"github.com/efficientgo/core/testutil"
 	"github.com/pkg/errors"
@@ -73,14 +75,14 @@ func TestServerAsClient(t *testing.T) {
 		}),
 	}}
 	t.Run("ok", func(t *testing.T) {
-		for i := 0; i < 20; i++ {
+		for range 20 {
 			r := &SeriesRequest{
 				MinTime:                 -214,
 				MaxTime:                 213,
 				Matchers:                []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
 				PartialResponseStrategy: PartialResponseStrategy_ABORT,
 			}
-			client, err := ServerAsClient(s).Series(ctx, r)
+			client, err := ServerAsClient(s, &atomic.Bool{}).Series(ctx, r)
 			testutil.Ok(t, err)
 			var resps []*SeriesResponse
 			for {
@@ -98,14 +100,14 @@ func TestServerAsClient(t *testing.T) {
 	})
 	t.Run("ok, close send", func(t *testing.T) {
 		s.err = errors.New("some error")
-		for i := 0; i < 20; i++ {
+		for range 20 {
 			r := &SeriesRequest{
 				MinTime:                 -214,
 				MaxTime:                 213,
 				Matchers:                []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
 				PartialResponseStrategy: PartialResponseStrategy_ABORT,
 			}
-			client, err := ServerAsClient(s).Series(ctx, r)
+			client, err := ServerAsClient(s, &atomic.Bool{}).Series(ctx, r)
 			testutil.Ok(t, err)
 			var resps []*SeriesResponse
 			for {
@@ -126,14 +128,14 @@ func TestServerAsClient(t *testing.T) {
 		}
 	})
 	t.Run("error", func(t *testing.T) {
-		for i := 0; i < 20; i++ {
+		for range 20 {
 			r := &SeriesRequest{
 				MinTime:                 -214,
 				MaxTime:                 213,
 				Matchers:                []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
 				PartialResponseStrategy: PartialResponseStrategy_ABORT,
 			}
-			client, err := ServerAsClient(s).Series(ctx, r)
+			client, err := ServerAsClient(s, &atomic.Bool{}).Series(ctx, r)
 			testutil.Ok(t, err)
 			var resps []*SeriesResponse
 			for {
@@ -152,17 +154,44 @@ func TestServerAsClient(t *testing.T) {
 			s.seriesLastReq = nil
 		}
 	})
+	t.Run("race", func(t *testing.T) {
+		s.err = nil
+		for range 20 {
+			r := &SeriesRequest{
+				MinTime:                 -214,
+				MaxTime:                 213,
+				Matchers:                []LabelMatcher{{Value: "wfsdfs", Name: "__name__", Type: LabelMatcher_EQ}},
+				PartialResponseStrategy: PartialResponseStrategy_ABORT,
+			}
+			client, err := ServerAsClient(s, &atomic.Bool{}).Series(ctx, r)
+			testutil.Ok(t, err)
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				for {
+					_, err := client.Recv()
+					if err != nil {
+						break
+					}
+				}
+			}()
+			testutil.Ok(t, client.CloseSend())
+			wg.Wait()
+			s.seriesLastReq = nil
+		}
+	})
 	})
 	t.Run("LabelNames", func(t *testing.T) {
 		s := &testStoreServer{}
 		t.Run("ok", func(t *testing.T) {
-			for i := 0; i < 20; i++ {
+			for range 20 {
 				r := &LabelNamesRequest{
 					Start:                   -1,
 					End:                     234,
 					PartialResponseStrategy: PartialResponseStrategy_ABORT,
 				}
-				resp, err := ServerAsClient(s).LabelNames(ctx, r)
+				resp, err := ServerAsClient(s, &atomic.Bool{}).LabelNames(ctx, r)
 				testutil.Ok(t, err)
 				testutil.Equals(t, s.labelNames, resp)
 				testutil.Equals(t, r, s.labelNamesLastReq)
@@ -171,13 +200,13 @@ func TestServerAsClient(t *testing.T) {
 		})
 		t.Run("error", func(t *testing.T) {
 			s.err = errors.New("some error")
-			for i := 0; i < 20; i++ {
+			for range 20 {
 				r := &LabelNamesRequest{
 					Start:                   -1,
 					End:                     234,
 					PartialResponseStrategy: PartialResponseStrategy_ABORT,
 				}
-				_, err := ServerAsClient(s).LabelNames(ctx, r)
+				_, err := ServerAsClient(s, &atomic.Bool{}).LabelNames(ctx, r)
 				testutil.NotOk(t, err)
 				testutil.Equals(t, s.err, err)
 			}
@@ -191,14 +220,14 @@ func TestServerAsClient(t *testing.T) {
 			},
 		}
 		t.Run("ok", func(t *testing.T) {
-			for i := 0; i < 20; i++ {
+			for range 20 {
 				r := &LabelValuesRequest{
 					Label:                   "__name__",
 					Start:                   -1,
 					End:                     234,
 					PartialResponseStrategy: PartialResponseStrategy_ABORT,
 				}
-				resp, err := ServerAsClient(s).LabelValues(ctx, r)
+				resp, err := ServerAsClient(s, &atomic.Bool{}).LabelValues(ctx, r)
 				testutil.Ok(t, err)
 				testutil.Equals(t, s.labelValues, resp)
 				testutil.Equals(t, r, s.labelValuesLastReq)
@@ -207,14 +236,14 @@ func TestServerAsClient(t *testing.T) {
 		})
 		t.Run("error", func(t *testing.T) {
 			s.err = errors.New("some error")
-			for i := 0; i < 20; i++ {
+			for range 20 {
 				r := &LabelValuesRequest{
 					Label:                   "__name__",
 					Start:                   -1,
 					End:                     234,
 					PartialResponseStrategy: PartialResponseStrategy_ABORT,
 				}
-				_, err := ServerAsClient(s).LabelValues(ctx, r)
+				_, err := ServerAsClient(s, &atomic.Bool{}).LabelValues(ctx, r)
 				testutil.NotOk(t, err)
 				testutil.Equals(t, s.err, err)
 			}
diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go
index 9d39f27e14a..5bba870412a 100644
--- a/pkg/store/storepb/rpc.pb.go
+++ b/pkg/store/storepb/rpc.pb.go
@@ -182,6 +182,9 @@ type SeriesRequest struct {
 	WithoutReplicaLabels []string `protobuf:"bytes,14,rep,name=without_replica_labels,json=withoutReplicaLabels,proto3" json:"without_replica_labels,omitempty"`
 	// limit is used to limit the number of results returned
 	Limit int64 `protobuf:"varint,15,opt,name=limit,proto3" json:"limit,omitempty"`
+	// response_batch_size is used to batch Series messages into one SeriesResponse.
+	// If set to 0 or 1, each Series is sent as a separate response.
+	ResponseBatchSize int64 `protobuf:"varint,16,opt,name=response_batch_size,json=responseBatchSize,proto3" json:"response_batch_size,omitempty"`
 }
 
 func (m *SeriesRequest) Reset()         { *m = SeriesRequest{} }
@@ -430,6 +433,7 @@ type SeriesResponse struct {
 	//	*SeriesResponse_Series
 	//	*SeriesResponse_Warning
 	//	*SeriesResponse_Hints
+	//	*SeriesResponse_Batch
 	Result isSeriesResponse_Result `protobuf_oneof:"result"`
 }
 
@@ -481,10 +485,14 @@ type SeriesResponse_Warning struct {
 type 
SeriesResponse_Hints struct { Hints *types.Any `protobuf:"bytes,3,opt,name=hints,proto3,oneof" json:"hints,omitempty"` } +type SeriesResponse_Batch struct { + Batch *SeriesBatch `protobuf:"bytes,4,opt,name=batch,proto3,oneof" json:"batch,omitempty"` +} func (*SeriesResponse_Series) isSeriesResponse_Result() {} func (*SeriesResponse_Warning) isSeriesResponse_Result() {} func (*SeriesResponse_Hints) isSeriesResponse_Result() {} +func (*SeriesResponse_Batch) isSeriesResponse_Result() {} func (m *SeriesResponse) GetResult() isSeriesResponse_Result { if m != nil { @@ -514,12 +522,20 @@ func (m *SeriesResponse) GetHints() *types.Any { return nil } +func (m *SeriesResponse) GetBatch() *SeriesBatch { + if x, ok := m.GetResult().(*SeriesResponse_Batch); ok { + return x.Batch + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*SeriesResponse) XXX_OneofWrappers() []interface{} { return []interface{}{ (*SeriesResponse_Series)(nil), (*SeriesResponse_Warning)(nil), (*SeriesResponse_Hints)(nil), + (*SeriesResponse_Batch)(nil), } } @@ -728,79 +744,82 @@ func init() { func init() { proto.RegisterFile("store/storepb/rpc.proto", fileDescriptor_a938d55a388af629) } var fileDescriptor_a938d55a388af629 = []byte{ - // 1149 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4b, 0x6f, 0x23, 0x45, - 0x10, 0xf6, 0x78, 0x3c, 0x7e, 0x94, 0x13, 0xaf, 0xb7, 0xd7, 0xc9, 0x4e, 0xbc, 0x92, 0x63, 0x8c, - 0x90, 0xac, 0x55, 0xe4, 0xac, 0xbc, 0x08, 0x09, 0xc4, 0x25, 0x09, 0x2c, 0x59, 0x89, 0x04, 0xe8, - 0xec, 0x12, 0x04, 0x87, 0x51, 0xdb, 0xee, 0x8c, 0x47, 0x3b, 0xaf, 0x4c, 0xf7, 0x90, 0xf8, 0x0c, - 0x67, 0xc4, 0x9d, 0xdb, 0xfe, 0x9a, 0xdc, 0xd8, 0x23, 0x27, 0x04, 0xc9, 0x1f, 0x41, 0xfd, 0x18, - 0x3f, 0x82, 0xf7, 0xa5, 0xe4, 0x62, 0x75, 0x7d, 0x5f, 0x75, 0x4d, 0x75, 0xf5, 0x57, 0xe5, 0x86, - 0xfb, 0x8c, 0x47, 0x09, 0xdd, 0x96, 0xbf, 0xf1, 0x60, 0x3b, 0x89, 0x87, 0xbd, 0x38, 0x89, 0x78, - 0x84, 0x8a, 0x7c, 0x4c, 0xc2, 0x88, 0x35, 0x37, 0x16, 0x1d, 0xf8, 0x24, 0xa6, 0x4c, 0xb9, 0x34, - 0x1b, 0x6e, 0xe4, 0x46, 0x72, 0xb9, 0x2d, 0x56, 0x1a, 0x6d, 0x2f, 0x6e, 0x88, 0x93, 0x28, 0xb8, - 0xb6, 0x6f, 0xc3, 0x8d, 0x22, 0xd7, 0xa7, 0xdb, 0xd2, 0x1a, 0xa4, 0x27, 0xdb, 0x24, 0x9c, 0x28, - 0xaa, 0x73, 0x07, 0x56, 0x8f, 0x13, 0x8f, 0x53, 0x4c, 0x59, 0x1c, 0x85, 0x8c, 0x76, 0x7e, 0x31, - 0x60, 0x45, 0x23, 0xa7, 0x29, 0x65, 0x1c, 0xed, 0x00, 0x70, 0x2f, 0xa0, 0x8c, 0x26, 0x1e, 0x65, - 0xb6, 0xd1, 0x36, 0xbb, 0xd5, 0xfe, 0x03, 0xb1, 0x3b, 0xa0, 0x7c, 0x4c, 0x53, 0xe6, 0x0c, 0xa3, - 0x78, 0xd2, 0x7b, 0xe6, 0x05, 0xf4, 0x48, 0xba, 0xec, 0x16, 0x2e, 0xfe, 0xde, 0xcc, 0xe1, 0xb9, - 0x4d, 0x68, 0x1d, 0x8a, 0x9c, 0x86, 0x24, 0xe4, 0x76, 0xbe, 0x6d, 0x74, 0x2b, 0x58, 0x5b, 0xc8, - 0x86, 0x52, 0x42, 0x63, 0xdf, 0x1b, 0x12, 0xdb, 0x6c, 0x1b, 0x5d, 0x13, 0x67, 0x66, 0xe7, 0xa5, - 0x05, 0xab, 0x2a, 0x5c, 0x96, 0xc6, 0x06, 0x94, 0x03, 0x2f, 0x74, 0x44, 0x54, 0xdb, 0x50, 0xce, - 0x81, 0x17, 0x8a, 0xcf, 0x4a, 0x8a, 0x9c, 0x2b, 0x2a, 0xaf, 0x29, 0x72, 0x2e, 0xa9, 0x4f, 0x04, - 0xc5, 0x87, 0x63, 0x9a, 0x30, 0xdb, 0x94, 0xa9, 0x37, 0x7a, 0xaa, 0xce, 0xbd, 0xaf, 0xc9, 0x80, - 0xfa, 0x07, 0x8a, 0xd4, 0x39, 0x4f, 0x7d, 0x51, 0x1f, 0xd6, 0x44, 0xc8, 0x84, 0xb2, 0xc8, 0x4f, - 0xb9, 0x17, 0x85, 0xce, 0x99, 0x17, 0x8e, 0xa2, 0x33, 0xbb, 0x20, 0xe3, 0xdf, 0x0b, 0xc8, 0x39, - 0x9e, 0x72, 0xc7, 0x92, 0x42, 0x5b, 0x00, 0xc4, 0x75, 0x13, 0xea, 0x12, 0x4e, 0x99, 0x6d, 0xb5, - 0xcd, 0x6e, 0xad, 0xbf, 0x92, 0x7d, 0x6d, 0xc7, 0x75, 0x13, 0x3c, 0xc7, 0xa3, 0xcf, 0x60, 
0x23, - 0x26, 0x09, 0xf7, 0x88, 0x2f, 0xbe, 0x22, 0x6b, 0xef, 0x8c, 0x3c, 0x46, 0x06, 0x3e, 0x1d, 0xd9, - 0xc5, 0xb6, 0xd1, 0x2d, 0xe3, 0xfb, 0xda, 0x21, 0xbb, 0x9b, 0x2f, 0x34, 0x8d, 0x7e, 0x5a, 0xb2, - 0x97, 0xf1, 0x84, 0x70, 0xea, 0x4e, 0xec, 0x52, 0xdb, 0xe8, 0xd6, 0xfa, 0x9b, 0xd9, 0x87, 0xbf, - 0x5d, 0x8c, 0x71, 0xa4, 0xdd, 0xfe, 0x17, 0x3c, 0x23, 0xd0, 0x26, 0x54, 0xd9, 0x0b, 0x2f, 0x76, - 0x86, 0xe3, 0x34, 0x7c, 0xc1, 0xec, 0xb2, 0x4c, 0x05, 0x04, 0xb4, 0x27, 0x11, 0xf4, 0x10, 0xac, - 0xb1, 0x17, 0x72, 0x66, 0x57, 0xda, 0x86, 0x2c, 0xa8, 0x52, 0x57, 0x2f, 0x53, 0x57, 0x6f, 0x27, - 0x9c, 0x60, 0xe5, 0x82, 0x10, 0x14, 0x18, 0xa7, 0xb1, 0x0d, 0xb2, 0x6c, 0x72, 0x8d, 0x1a, 0x60, - 0x25, 0x24, 0x74, 0xa9, 0x5d, 0x95, 0xa0, 0x32, 0xd0, 0x63, 0xa8, 0x9e, 0xa6, 0x34, 0x99, 0x38, - 0x2a, 0xf6, 0x8a, 0x8c, 0x8d, 0xb2, 0x53, 0x7c, 0x27, 0xa8, 0x7d, 0xc1, 0x60, 0x38, 0x9d, 0xae, - 0xd1, 0x23, 0x00, 0x36, 0x26, 0xc9, 0xc8, 0xf1, 0xc2, 0x93, 0xc8, 0x5e, 0x95, 0x7b, 0xee, 0x66, - 0x7b, 0x8e, 0x04, 0xf3, 0x34, 0x3c, 0x89, 0x70, 0x85, 0x65, 0x4b, 0xf4, 0x31, 0xac, 0x9f, 0x79, - 0x7c, 0x1c, 0xa5, 0xdc, 0xd1, 0x5a, 0x73, 0x7c, 0x21, 0x04, 0x66, 0xd7, 0xda, 0x66, 0xb7, 0x82, - 0x1b, 0x9a, 0xc5, 0x8a, 0x94, 0x22, 0x61, 0x22, 0x65, 0xdf, 0x0b, 0x3c, 0x6e, 0xdf, 0x51, 0x29, - 0x4b, 0xa3, 0xf3, 0xd2, 0x00, 0x98, 0x25, 0x26, 0x0b, 0xc7, 0x69, 0xec, 0x04, 0x9e, 0xef, 0x7b, - 0x4c, 0x8b, 0x14, 0x04, 0x74, 0x20, 0x11, 0xd4, 0x86, 0xc2, 0x49, 0x1a, 0x0e, 0xa5, 0x46, 0xab, - 0x33, 0x69, 0x3c, 0x49, 0xc3, 0x21, 0x96, 0x0c, 0xda, 0x82, 0xb2, 0x9b, 0x44, 0x69, 0xec, 0x85, - 0xae, 0x54, 0x5a, 0xb5, 0x5f, 0xcf, 0xbc, 0xbe, 0xd2, 0x38, 0x9e, 0x7a, 0xa0, 0x0f, 0xb3, 0x42, - 0x5a, 0xd2, 0x75, 0x35, 0x73, 0xc5, 0x02, 0xd4, 0x75, 0xed, 0x9c, 0x41, 0x65, 0x5a, 0x08, 0x99, - 0xa2, 0xae, 0xd7, 0x88, 0x9e, 0x4f, 0x53, 0x54, 0xfc, 0x88, 0x9e, 0xa3, 0x0f, 0x60, 0x85, 0x47, - 0x9c, 0xf8, 0x8e, 0xc4, 0x98, 0x6e, 0xa7, 0xaa, 0xc4, 0x64, 0x18, 0x86, 0x6a, 0x90, 0x1f, 0x4c, - 0x64, 0xbf, 0x96, 0x71, 0x7e, 0x30, 0x11, 0xcd, 0xad, 0x2b, 0x58, 0x90, 0x15, 0xd4, 0x56, 0xa7, - 0x09, 0x05, 0x71, 0x32, 0x21, 0x81, 0x90, 0xe8, 0xa6, 0xad, 0x60, 0xb9, 0xee, 0xf4, 0xa1, 0x9c, - 0x9d, 0x47, 0xc7, 0x33, 0x96, 0xc4, 0x33, 0x17, 0xe2, 0x6d, 0x82, 0x25, 0x0f, 0x26, 0x1c, 0x16, - 0x4a, 0xac, 0xad, 0xce, 0x6f, 0x06, 0xd4, 0xb2, 0x99, 0xa1, 0x34, 0x8d, 0xba, 0x50, 0x9c, 0xce, - 0x2d, 0x51, 0xa2, 0xda, 0x54, 0x1b, 0x12, 0xdd, 0xcf, 0x61, 0xcd, 0xa3, 0x26, 0x94, 0xce, 0x48, - 0x12, 0x8a, 0xc2, 0xcb, 0x19, 0xb5, 0x9f, 0xc3, 0x19, 0x80, 0xb6, 0x32, 0xc1, 0x9b, 0xaf, 0x17, - 0xfc, 0x7e, 0x4e, 0x4b, 0x7e, 0xb7, 0x0c, 0xc5, 0x84, 0xb2, 0xd4, 0xe7, 0x9d, 0x5f, 0x4d, 0xb8, - 0x2b, 0x05, 0x74, 0x48, 0x82, 0xd9, 0x20, 0x7b, 0x63, 0xe3, 0x1b, 0x37, 0x68, 0xfc, 0xfc, 0x0d, - 0x1b, 0xbf, 0x01, 0x16, 0xe3, 0x24, 0xe1, 0x7a, 0x16, 0x2b, 0x03, 0xd5, 0xc1, 0xa4, 0xe1, 0x48, - 0xcf, 0x3d, 0xb1, 0x9c, 0xf5, 0xbf, 0xf5, 0xf6, 0xfe, 0x9f, 0x9f, 0xbf, 0xc5, 0xf7, 0x98, 0xbf, - 0xaf, 0x6f, 0xd3, 0xd2, 0xbb, 0xb4, 0x69, 0x79, 0xbe, 0x4d, 0x13, 0x40, 0xf3, 0xb7, 0xa0, 0xa5, - 0xd1, 0x00, 0x4b, 0x48, 0x51, 0xfd, 0xa3, 0x55, 0xb0, 0x32, 0x50, 0x13, 0xca, 0xfa, 0xd6, 0x85, - 0xf6, 0x05, 0x31, 0xb5, 0x67, 0xe7, 0x36, 0xdf, 0x7a, 0xee, 0xce, 0x1f, 0xa6, 0xfe, 0xe8, 0xf7, - 0xc4, 0x4f, 0x67, 0x77, 0x2f, 0x12, 0x14, 0xa8, 0x6e, 0x06, 0x65, 0xbc, 0x59, 0x11, 0xf9, 0x1b, - 0x28, 0xc2, 0xbc, 0x2d, 0x45, 0x14, 0x96, 0x28, 0xc2, 0x5a, 0xa2, 0x88, 0xe2, 0xfb, 0x29, 0xa2, - 0x74, 0x2b, 0x8a, 0x28, 0xbf, 0x8b, 0x22, 0x2a, 0xf3, 0x8a, 0x48, 0xe1, 0xde, 0xc2, 0xe5, 0x68, - 0x49, 0xac, 0x43, 
0xf1, 0x67, 0x89, 0x68, 0x4d, 0x68, 0xeb, 0xb6, 0x44, 0xf1, 0x70, 0x17, 0x0a, - 0xe2, 0x19, 0x80, 0x4a, 0x60, 0xe2, 0x9d, 0xe3, 0x7a, 0x0e, 0x55, 0xc0, 0xda, 0xfb, 0xe6, 0xf9, - 0xe1, 0xb3, 0xba, 0x21, 0xb0, 0xa3, 0xe7, 0x07, 0xf5, 0xbc, 0x58, 0x1c, 0x3c, 0x3d, 0xac, 0x9b, - 0x72, 0xb1, 0xf3, 0x43, 0xbd, 0x80, 0xaa, 0x50, 0x92, 0x5e, 0x5f, 0xe2, 0xba, 0xd5, 0xff, 0xd3, - 0x00, 0xeb, 0x48, 0xbc, 0xf4, 0xd0, 0xa7, 0x50, 0x54, 0x53, 0x0c, 0xad, 0x2d, 0x4e, 0x35, 0x2d, - 0xb6, 0xe6, 0xfa, 0x75, 0x58, 0x1d, 0xf3, 0x91, 0x81, 0xf6, 0x00, 0x66, 0x1d, 0x81, 0x36, 0x16, - 0xea, 0x3f, 0x3f, 0xab, 0x9a, 0xcd, 0x65, 0x94, 0xae, 0xd6, 0x13, 0xa8, 0xce, 0x15, 0x11, 0x2d, - 0xba, 0x2e, 0xc8, 0xbe, 0xf9, 0x60, 0x29, 0xa7, 0xe2, 0xf4, 0x0f, 0xa1, 0x26, 0xdf, 0x9b, 0x42, - 0xcf, 0xea, 0x64, 0x9f, 0x43, 0x15, 0xd3, 0x20, 0xe2, 0x54, 0xe2, 0x68, 0xaa, 0x8f, 0xf9, 0x67, - 0x69, 0x73, 0xed, 0x1a, 0xaa, 0x9f, 0xaf, 0xb9, 0xdd, 0x8f, 0x2e, 0xfe, 0x6d, 0xe5, 0x2e, 0x2e, - 0x5b, 0xc6, 0xab, 0xcb, 0x96, 0xf1, 0xcf, 0x65, 0xcb, 0xf8, 0xfd, 0xaa, 0x95, 0x7b, 0x75, 0xd5, - 0xca, 0xfd, 0x75, 0xd5, 0xca, 0xfd, 0x58, 0xd2, 0xcf, 0xe4, 0x41, 0x51, 0xde, 0xd0, 0xe3, 0xff, - 0x02, 0x00, 0x00, 0xff, 0xff, 0x84, 0xe1, 0x09, 0x34, 0x90, 0x0b, 0x00, 0x00, + // 1197 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x5d, 0x6f, 0xe3, 0x44, + 0x17, 0x8e, 0xe3, 0x38, 0x1f, 0x27, 0x6d, 0x36, 0x3b, 0x4d, 0xbb, 0x6e, 0x56, 0x4a, 0xf3, 0xe6, + 0x15, 0x52, 0xb4, 0x54, 0xe9, 0x2a, 0x8b, 0x90, 0x40, 0xdc, 0xb4, 0x85, 0xa5, 0x2b, 0xd1, 0x02, + 0xce, 0x2e, 0x45, 0x70, 0x61, 0x39, 0xc9, 0xd4, 0xb1, 0xd6, 0xb1, 0x5d, 0xcf, 0x98, 0x36, 0x7b, + 0x0b, 0x3f, 0x80, 0x7b, 0xee, 0xf8, 0x1b, 0xfc, 0x81, 0xde, 0xb1, 0xe2, 0x8a, 0x2b, 0x04, 0xed, + 0x1f, 0x41, 0x73, 0x66, 0x9c, 0x8f, 0x92, 0xfd, 0x52, 0x7b, 0x13, 0xcd, 0x79, 0x9e, 0x33, 0xc7, + 0x33, 0x67, 0x9e, 0x79, 0x32, 0x70, 0x8f, 0xf1, 0x30, 0xa6, 0x3b, 0xf8, 0x1b, 0xf5, 0x77, 0xe2, + 0x68, 0xd0, 0x89, 0xe2, 0x90, 0x87, 0x24, 0xcf, 0x47, 0x4e, 0x10, 0xb2, 0xfa, 0xe6, 0x62, 0x02, + 0x9f, 0x44, 0x94, 0xc9, 0x94, 0x7a, 0xcd, 0x0d, 0xdd, 0x10, 0x87, 0x3b, 0x62, 0xa4, 0xd0, 0xe6, + 0xe2, 0x84, 0x28, 0x0e, 0xc7, 0xd7, 0xe6, 0x6d, 0xba, 0x61, 0xe8, 0xfa, 0x74, 0x07, 0xa3, 0x7e, + 0x72, 0xb2, 0xe3, 0x04, 0x13, 0x49, 0xb5, 0xee, 0xc0, 0xea, 0x71, 0xec, 0x71, 0x6a, 0x51, 0x16, + 0x85, 0x01, 0xa3, 0xad, 0x1f, 0x35, 0x58, 0x51, 0xc8, 0x69, 0x42, 0x19, 0x27, 0xbb, 0x00, 0xdc, + 0x1b, 0x53, 0x46, 0x63, 0x8f, 0x32, 0x53, 0x6b, 0xea, 0xed, 0x72, 0xf7, 0xbe, 0x98, 0x3d, 0xa6, + 0x7c, 0x44, 0x13, 0x66, 0x0f, 0xc2, 0x68, 0xd2, 0x79, 0xea, 0x8d, 0x69, 0x0f, 0x53, 0xf6, 0x72, + 0x17, 0x7f, 0x6d, 0x65, 0xac, 0xb9, 0x49, 0x64, 0x03, 0xf2, 0x9c, 0x06, 0x4e, 0xc0, 0xcd, 0x6c, + 0x53, 0x6b, 0x97, 0x2c, 0x15, 0x11, 0x13, 0x0a, 0x31, 0x8d, 0x7c, 0x6f, 0xe0, 0x98, 0x7a, 0x53, + 0x6b, 0xeb, 0x56, 0x1a, 0xb6, 0xfe, 0x30, 0x60, 0x55, 0x96, 0x4b, 0x97, 0xb1, 0x09, 0xc5, 0xb1, + 0x17, 0xd8, 0xa2, 0xaa, 0xa9, 0xc9, 0xe4, 0xb1, 0x17, 0x88, 0xcf, 0x22, 0xe5, 0x9c, 0x4b, 0x2a, + 0xab, 0x28, 0xe7, 0x1c, 0xa9, 0x0f, 0x05, 0xc5, 0x07, 0x23, 0x1a, 0x33, 0x53, 0xc7, 0xa5, 0xd7, + 0x3a, 0xb2, 0xcf, 0x9d, 0x2f, 0x9c, 0x3e, 0xf5, 0x0f, 0x25, 0xa9, 0xd6, 0x3c, 0xcd, 0x25, 0x5d, + 0x58, 0x17, 0x25, 0x63, 0xca, 0x42, 0x3f, 0xe1, 0x5e, 0x18, 0xd8, 0x67, 0x5e, 0x30, 0x0c, 0xcf, + 0xcc, 0x1c, 0xd6, 0x5f, 0x1b, 0x3b, 0xe7, 0xd6, 0x94, 0x3b, 0x46, 0x8a, 0x6c, 0x03, 0x38, 0xae, + 0x1b, 0x53, 0xd7, 0xe1, 0x94, 0x99, 0x46, 0x53, 0x6f, 0x57, 0xba, 0x2b, 0xe9, 0xd7, 0x76, 0x5d, + 0x37, 0xb6, 
0xe6, 0x78, 0xf2, 0x31, 0x6c, 0x46, 0x4e, 0xcc, 0x3d, 0xc7, 0x17, 0x5f, 0xc1, 0xde, + 0xdb, 0x43, 0x8f, 0x39, 0x7d, 0x9f, 0x0e, 0xcd, 0x7c, 0x53, 0x6b, 0x17, 0xad, 0x7b, 0x2a, 0x21, + 0x3d, 0x9b, 0x4f, 0x15, 0x4d, 0xbe, 0x5f, 0x32, 0x97, 0xf1, 0xd8, 0xe1, 0xd4, 0x9d, 0x98, 0x85, + 0xa6, 0xd6, 0xae, 0x74, 0xb7, 0xd2, 0x0f, 0x7f, 0xb5, 0x58, 0xa3, 0xa7, 0xd2, 0xfe, 0x53, 0x3c, + 0x25, 0xc8, 0x16, 0x94, 0xd9, 0x73, 0x2f, 0xb2, 0x07, 0xa3, 0x24, 0x78, 0xce, 0xcc, 0x22, 0x2e, + 0x05, 0x04, 0xb4, 0x8f, 0x08, 0x79, 0x00, 0xc6, 0xc8, 0x0b, 0x38, 0x33, 0x4b, 0x4d, 0x0d, 0x1b, + 0x2a, 0xd5, 0xd5, 0x49, 0xd5, 0xd5, 0xd9, 0x0d, 0x26, 0x96, 0x4c, 0x21, 0x04, 0x72, 0x8c, 0xd3, + 0xc8, 0x04, 0x6c, 0x1b, 0x8e, 0x49, 0x0d, 0x8c, 0xd8, 0x09, 0x5c, 0x6a, 0x96, 0x11, 0x94, 0x01, + 0x79, 0x04, 0xe5, 0xd3, 0x84, 0xc6, 0x13, 0x5b, 0xd6, 0x5e, 0xc1, 0xda, 0x24, 0xdd, 0xc5, 0xd7, + 0x82, 0x3a, 0x10, 0x8c, 0x05, 0xa7, 0xd3, 0x31, 0x79, 0x08, 0xc0, 0x46, 0x4e, 0x3c, 0xb4, 0xbd, + 0xe0, 0x24, 0x34, 0x57, 0x71, 0xce, 0xdd, 0x74, 0x4e, 0x4f, 0x30, 0x4f, 0x82, 0x93, 0xd0, 0x2a, + 0xb1, 0x74, 0x48, 0x3e, 0x80, 0x8d, 0x33, 0x8f, 0x8f, 0xc2, 0x84, 0xdb, 0x4a, 0x6b, 0xb6, 0x2f, + 0x84, 0xc0, 0xcc, 0x4a, 0x53, 0x6f, 0x97, 0xac, 0x9a, 0x62, 0x2d, 0x49, 0xa2, 0x48, 0x98, 0x58, + 0xb2, 0xef, 0x8d, 0x3d, 0x6e, 0xde, 0x91, 0x4b, 0xc6, 0x80, 0x74, 0x60, 0x6d, 0xda, 0xfe, 0xbe, + 0x50, 0x8e, 0xcd, 0xbc, 0x17, 0xd4, 0xac, 0x62, 0xce, 0xdd, 0x94, 0xda, 0x13, 0x4c, 0xcf, 0x7b, + 0x41, 0x5b, 0xbf, 0x6a, 0x00, 0xb3, 0x8d, 0x60, 0xa3, 0x39, 0x8d, 0xec, 0xb1, 0xe7, 0xfb, 0x1e, + 0x53, 0xa2, 0x06, 0x01, 0x1d, 0x22, 0x42, 0x9a, 0x90, 0x3b, 0x49, 0x82, 0x01, 0x6a, 0xba, 0x3c, + 0x93, 0xd2, 0xe3, 0x24, 0x18, 0x58, 0xc8, 0x90, 0x6d, 0x28, 0xba, 0x71, 0x98, 0x44, 0x5e, 0xe0, + 0xa2, 0x32, 0xcb, 0xdd, 0x6a, 0x9a, 0xf5, 0xb9, 0xc2, 0xad, 0x69, 0x06, 0xf9, 0x7f, 0xda, 0x78, + 0x03, 0x53, 0x57, 0xd3, 0x54, 0x4b, 0x80, 0xea, 0x1c, 0x5a, 0x67, 0x50, 0x9a, 0x36, 0x0e, 0x97, + 0xa8, 0xfa, 0x3b, 0xa4, 0xe7, 0xd3, 0x25, 0x4a, 0x7e, 0x48, 0xcf, 0xc9, 0xff, 0x60, 0x85, 0x87, + 0xdc, 0xf1, 0x6d, 0xc4, 0x98, 0xba, 0x7e, 0x65, 0xc4, 0xb0, 0x0c, 0x23, 0x15, 0xc8, 0xf6, 0x27, + 0x78, 0xbf, 0x8b, 0x56, 0xb6, 0x3f, 0x11, 0x66, 0xa0, 0x3a, 0x9e, 0xc3, 0x8e, 0xab, 0xa8, 0x55, + 0x87, 0x9c, 0xd8, 0x99, 0x90, 0x4c, 0xe0, 0xa8, 0x4b, 0x5e, 0xb2, 0x70, 0xdc, 0xea, 0x42, 0x31, + 0xdd, 0x8f, 0xaa, 0xa7, 0x2d, 0xa9, 0xa7, 0x2f, 0xd4, 0xdb, 0x02, 0x03, 0x37, 0x26, 0x12, 0x16, + 0x5a, 0xac, 0xa2, 0xd6, 0x6f, 0x1a, 0x54, 0x52, 0x8f, 0x91, 0x47, 0x45, 0xda, 0x90, 0x9f, 0xfa, + 0x9c, 0x68, 0x51, 0x65, 0xaa, 0x25, 0x44, 0x0f, 0x32, 0x96, 0xe2, 0x49, 0x1d, 0x0a, 0x67, 0x4e, + 0x1c, 0x88, 0xc6, 0xa3, 0xa7, 0x1d, 0x64, 0xac, 0x14, 0x20, 0xdb, 0xe9, 0x05, 0xd1, 0x5f, 0x7d, + 0x41, 0x0e, 0x32, 0xe9, 0x15, 0x79, 0x1f, 0x0c, 0x14, 0x8f, 0x3a, 0xc0, 0xb5, 0xc5, 0x4f, 0xa2, + 0x7a, 0x44, 0x32, 0xe6, 0xec, 0x15, 0x21, 0x1f, 0x53, 0x96, 0xf8, 0xbc, 0xf5, 0x93, 0x0e, 0x77, + 0x51, 0x9d, 0x47, 0xce, 0x78, 0xe6, 0x92, 0xaf, 0x75, 0x15, 0xed, 0x06, 0xae, 0x92, 0xbd, 0xa1, + 0xab, 0xd4, 0xc0, 0x60, 0xdc, 0x89, 0xb9, 0x32, 0x7a, 0x19, 0x90, 0x2a, 0xe8, 0x34, 0x18, 0x2a, + 0x53, 0x15, 0xc3, 0x99, 0xb9, 0x18, 0x6f, 0x36, 0x97, 0x79, 0x73, 0xcf, 0xbf, 0x83, 0xb9, 0xbf, + 0xda, 0x03, 0x0a, 0x6f, 0xe3, 0x01, 0xc5, 0x39, 0x0f, 0x68, 0xc5, 0x40, 0xe6, 0x4f, 0x41, 0xe9, + 0xa8, 0x06, 0x86, 0xd0, 0xad, 0xfc, 0xbb, 0x2c, 0x59, 0x32, 0x20, 0x75, 0x28, 0x2a, 0x89, 0x88, + 0x8b, 0x22, 0x88, 0x69, 0x3c, 0xdb, 0xb7, 0xfe, 0xc6, 0x7d, 0xb7, 0x7e, 0xd1, 0xd5, 0x47, 0xbf, + 0x71, 0xfc, 0x64, 0x76, 0xf6, 0x62, 
0x81, 0x02, 0x55, 0x37, 0x47, 0x06, 0xaf, 0x57, 0x44, 0xf6, + 0x06, 0x8a, 0xd0, 0x6f, 0x4b, 0x11, 0xb9, 0x25, 0x8a, 0x30, 0x96, 0x28, 0x22, 0xff, 0x6e, 0x8a, + 0x28, 0xdc, 0x8a, 0x22, 0x8a, 0x6f, 0xa3, 0x88, 0xd2, 0xbc, 0x22, 0x12, 0x58, 0x5b, 0x38, 0x1c, + 0x25, 0x89, 0x0d, 0xc8, 0xff, 0x80, 0x88, 0xd2, 0x84, 0x8a, 0x6e, 0x4b, 0x14, 0x0f, 0xf6, 0x20, + 0x27, 0xde, 0x18, 0xa4, 0x00, 0xba, 0xb5, 0x7b, 0x5c, 0xcd, 0x90, 0x12, 0x18, 0xfb, 0x5f, 0x3e, + 0x3b, 0x7a, 0x5a, 0xd5, 0x04, 0xd6, 0x7b, 0x76, 0x58, 0xcd, 0x8a, 0xc1, 0xe1, 0x93, 0xa3, 0xaa, + 0x8e, 0x83, 0xdd, 0x6f, 0xab, 0x39, 0x52, 0x86, 0x02, 0x66, 0x7d, 0x66, 0x55, 0x8d, 0xee, 0xef, + 0x1a, 0x18, 0x3d, 0xf1, 0x8c, 0x24, 0x1f, 0x41, 0x5e, 0xfa, 0x0f, 0x59, 0x5f, 0xf4, 0x23, 0x25, + 0xb6, 0xfa, 0xc6, 0x75, 0x58, 0x6e, 0xf3, 0xa1, 0x46, 0xf6, 0x01, 0x66, 0x37, 0x82, 0x6c, 0x2e, + 0xf4, 0x7f, 0xde, 0xab, 0xea, 0xf5, 0x65, 0x94, 0xea, 0xd6, 0x63, 0x28, 0xcf, 0x35, 0x91, 0x2c, + 0xa6, 0x2e, 0xc8, 0xbe, 0x7e, 0x7f, 0x29, 0x27, 0xeb, 0x74, 0x8f, 0xa0, 0x82, 0x8f, 0x59, 0xa1, + 0x67, 0xb9, 0xb3, 0x4f, 0xa0, 0x6c, 0xd1, 0x71, 0xc8, 0x29, 0xe2, 0x64, 0xaa, 0x8f, 0xf9, 0x37, + 0x6f, 0x7d, 0xfd, 0x1a, 0xaa, 0xde, 0xc6, 0x99, 0xbd, 0xf7, 0x2e, 0xfe, 0x69, 0x64, 0x2e, 0x2e, + 0x1b, 0xda, 0xcb, 0xcb, 0x86, 0xf6, 0xf7, 0x65, 0x43, 0xfb, 0xf9, 0xaa, 0x91, 0x79, 0x79, 0xd5, + 0xc8, 0xfc, 0x79, 0xd5, 0xc8, 0x7c, 0x57, 0x50, 0x6f, 0xf0, 0x7e, 0x1e, 0x4f, 0xe8, 0xd1, 0xbf, + 0x01, 0x00, 0x00, 0xff, 0xff, 0x89, 0x92, 0xc7, 0xe6, 0xed, 0x0b, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1177,6 +1196,13 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.ResponseBatchSize != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.ResponseBatchSize)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x80 + } if m.Limit != 0 { i = encodeVarintRpc(dAtA, i, uint64(m.Limit)) i-- @@ -1616,6 +1642,27 @@ func (m *SeriesResponse_Hints) MarshalToSizedBuffer(dAtA []byte) (int, error) { } return len(dAtA) - i, nil } +func (m *SeriesResponse_Batch) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeriesResponse_Batch) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Batch != nil { + { + size, err := m.Batch.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + return len(dAtA) - i, nil +} func (m *LabelNamesRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -2011,6 +2058,9 @@ func (m *SeriesRequest) Size() (n int) { if m.Limit != 0 { n += 1 + sovRpc(uint64(m.Limit)) } + if m.ResponseBatchSize != 0 { + n += 2 + sovRpc(uint64(m.ResponseBatchSize)) + } return n } @@ -2151,6 +2201,18 @@ func (m *SeriesResponse_Hints) Size() (n int) { } return n } +func (m *SeriesResponse_Batch) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Batch != nil { + l = m.Batch.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} func (m *LabelNamesRequest) Size() (n int) { if m == nil { return 0 @@ -2921,6 +2983,25 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { break } } + case 16: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ResponseBatchSize", wireType) + } + m.ResponseBatchSize = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return 
io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ResponseBatchSize |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -3643,6 +3724,41 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error { } m.Result = &SeriesResponse_Hints{v} iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Batch", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &SeriesBatch{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Result = &SeriesResponse_Batch{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 2de086bc12b..6cfeb2bb77b 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -111,6 +111,10 @@ message SeriesRequest { // limit is used to limit the number of results returned int64 limit = 15; + + // response_batch_size is used to batch Series messages into one SeriesResponse + // If set to 0 or 1, each Series is sent as a separate response + int64 response_batch_size = 16; } // QueryHints represents hints from PromQL that might help to @@ -187,6 +191,9 @@ message SeriesResponse { /// multiple SeriesResponse frames contain hints for a single Series() request and how should they /// be handled in such case (ie. merged vs keep the first/last one). 
google.protobuf.Any hints = 3; + + /// batch is an array of series so more than 1 series can be included in the response + SeriesBatch batch = 4; } } diff --git a/pkg/store/storepb/types.pb.go b/pkg/store/storepb/types.pb.go index 5d0a1f9867f..d7d55191c5a 100644 --- a/pkg/store/storepb/types.pb.go +++ b/pkg/store/storepb/types.pb.go @@ -123,7 +123,7 @@ func (x LabelMatcher_Type) String() string { } func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{3, 0} + return fileDescriptor_121fba57de02d8e0, []int{4, 0} } type Chunk struct { @@ -203,6 +203,43 @@ func (m *Series) XXX_DiscardUnknown() { var xxx_messageInfo_Series proto.InternalMessageInfo +type SeriesBatch struct { + Series []*Series `protobuf:"bytes,1,rep,name=series,proto3" json:"series,omitempty"` +} + +func (m *SeriesBatch) Reset() { *m = SeriesBatch{} } +func (m *SeriesBatch) String() string { return proto.CompactTextString(m) } +func (*SeriesBatch) ProtoMessage() {} +func (*SeriesBatch) Descriptor() ([]byte, []int) { + return fileDescriptor_121fba57de02d8e0, []int{2} +} +func (m *SeriesBatch) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SeriesBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SeriesBatch.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SeriesBatch) XXX_Merge(src proto.Message) { + xxx_messageInfo_SeriesBatch.Merge(m, src) +} +func (m *SeriesBatch) XXX_Size() int { + return m.Size() +} +func (m *SeriesBatch) XXX_DiscardUnknown() { + xxx_messageInfo_SeriesBatch.DiscardUnknown(m) +} + +var xxx_messageInfo_SeriesBatch proto.InternalMessageInfo + type AggrChunk struct { MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` @@ -218,7 +255,7 @@ func (m *AggrChunk) Reset() { *m = AggrChunk{} } func (m *AggrChunk) String() string { return proto.CompactTextString(m) } func (*AggrChunk) ProtoMessage() {} func (*AggrChunk) Descriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{2} + return fileDescriptor_121fba57de02d8e0, []int{3} } func (m *AggrChunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -258,7 +295,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{3} + return fileDescriptor_121fba57de02d8e0, []int{4} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -293,6 +330,7 @@ func init() { proto.RegisterEnum("thanos.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) proto.RegisterType((*Chunk)(nil), "thanos.Chunk") proto.RegisterType((*Series)(nil), "thanos.Series") + proto.RegisterType((*SeriesBatch)(nil), "thanos.SeriesBatch") proto.RegisterType((*AggrChunk)(nil), "thanos.AggrChunk") proto.RegisterType((*LabelMatcher)(nil), "thanos.LabelMatcher") } @@ -300,44 +338,45 @@ func init() { func init() { proto.RegisterFile("store/storepb/types.proto", fileDescriptor_121fba57de02d8e0) } var fileDescriptor_121fba57de02d8e0 = []byte{ - // 583 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x02, 0xff, 0x6c, 0x53, 0x4d, 0x6f, 0xd3, 0x40, - 0x10, 0xf5, 0x3a, 0x8e, 0x93, 0x0c, 0x2d, 0xb8, 0x4b, 0x05, 0x6e, 0x0f, 0x4e, 0x64, 0x84, 0x88, - 0x2a, 0xd5, 0x96, 0x0a, 0x12, 0x17, 0x2e, 0x4e, 0x65, 0x4a, 0xa5, 0xb6, 0x69, 0xb7, 0x41, 0xa0, - 0x5e, 0xaa, 0x8d, 0xbb, 0xb2, 0xad, 0xc6, 0x1f, 0xb2, 0xd7, 0x90, 0xfe, 0x0b, 0x10, 0x37, 0x0e, - 0xfc, 0x9e, 0x1c, 0x7b, 0x44, 0x1c, 0x2a, 0x68, 0xff, 0x08, 0xf2, 0xda, 0xa1, 0x44, 0xca, 0xc5, - 0x1a, 0xbf, 0xf7, 0x66, 0x66, 0xe7, 0xed, 0x2c, 0x6c, 0xe4, 0x3c, 0xc9, 0x98, 0x2d, 0xbe, 0xe9, - 0xd8, 0xe6, 0x57, 0x29, 0xcb, 0xad, 0x34, 0x4b, 0x78, 0x82, 0x55, 0x1e, 0xd0, 0x38, 0xc9, 0x37, - 0xd7, 0xfd, 0xc4, 0x4f, 0x04, 0x64, 0x97, 0x51, 0xc5, 0x6e, 0xd6, 0x89, 0x13, 0x3a, 0x66, 0x93, - 0xc5, 0x44, 0xf3, 0x3b, 0x82, 0xe6, 0x6e, 0x50, 0xc4, 0x97, 0x78, 0x0b, 0x94, 0x92, 0xd0, 0x51, - 0x0f, 0xf5, 0x1f, 0xee, 0x3c, 0xb1, 0xaa, 0x8a, 0x96, 0x20, 0x2d, 0x37, 0xf6, 0x92, 0x8b, 0x30, - 0xf6, 0x89, 0xd0, 0x60, 0x0c, 0xca, 0x05, 0xe5, 0x54, 0x97, 0x7b, 0xa8, 0xbf, 0x42, 0x44, 0x8c, - 0x75, 0x50, 0x02, 0x9a, 0x07, 0x7a, 0xa3, 0x87, 0xfa, 0xca, 0x40, 0x99, 0xdd, 0x74, 0x11, 0x11, - 0x88, 0xf9, 0x1a, 0xda, 0xf3, 0x7c, 0xdc, 0x82, 0xc6, 0xc7, 0x21, 0xd1, 0x24, 0xbc, 0x0a, 0x9d, - 0x77, 0xfb, 0xa7, 0xa3, 0xe1, 0x1e, 0x71, 0x0e, 0x35, 0x84, 0x1f, 0xc3, 0xa3, 0xb7, 0x07, 0x43, - 0x67, 0x74, 0x7e, 0x0f, 0xca, 0xe6, 0x0f, 0x04, 0xea, 0x29, 0xcb, 0x42, 0x96, 0x63, 0x0f, 0x54, - 0x71, 0xfc, 0x5c, 0x47, 0xbd, 0x46, 0xff, 0xc1, 0xce, 0xea, 0xfc, 0x7c, 0x07, 0x25, 0x3a, 0x78, - 0x33, 0xbb, 0xe9, 0x4a, 0xbf, 0x6e, 0xba, 0xaf, 0xfc, 0x90, 0x07, 0xc5, 0xd8, 0xf2, 0x92, 0xc8, - 0xae, 0x04, 0xdb, 0x61, 0x52, 0x47, 0x76, 0x7a, 0xe9, 0xdb, 0x0b, 0x4e, 0x58, 0x67, 0x22, 0x9b, - 0xd4, 0xa5, 0xb1, 0x0d, 0xaa, 0x57, 0x8e, 0x9b, 0xeb, 0xb2, 0x68, 0xb2, 0x36, 0x6f, 0xe2, 0xf8, - 0x7e, 0x26, 0x8c, 0x10, 0x73, 0x49, 0xa4, 0x96, 0x99, 0xdf, 0x64, 0xe8, 0xfc, 0xe3, 0xf0, 0x06, - 0xb4, 0xa3, 0x30, 0x3e, 0xe7, 0x61, 0x54, 0xb9, 0xd8, 0x20, 0xad, 0x28, 0x8c, 0x47, 0x61, 0xc4, - 0x04, 0x45, 0xa7, 0x15, 0x25, 0xd7, 0x14, 0x9d, 0x0a, 0xaa, 0x0b, 0x8d, 0x8c, 0x7e, 0x16, 0xb6, - 0xfd, 0x37, 0x96, 0xa8, 0x48, 0x4a, 0x06, 0x3f, 0x83, 0xa6, 0x97, 0x14, 0x31, 0xd7, 0x95, 0x65, - 0x92, 0x8a, 0x2b, 0xab, 0xe4, 0x45, 0xa4, 0x37, 0x97, 0x56, 0xc9, 0x8b, 0xa8, 0x14, 0x44, 0x61, - 0xac, 0xab, 0x4b, 0x05, 0x51, 0x18, 0x0b, 0x01, 0x9d, 0xea, 0xad, 0xe5, 0x02, 0x3a, 0xc5, 0x2f, - 0xa0, 0x25, 0x7a, 0xb1, 0x4c, 0x6f, 0x2f, 0x13, 0xcd, 0x59, 0xf3, 0x2b, 0x82, 0x15, 0x61, 0xec, - 0x21, 0xe5, 0x5e, 0xc0, 0x32, 0xbc, 0xbd, 0xb0, 0x5a, 0x1b, 0x0b, 0x57, 0x57, 0x6b, 0xac, 0xd1, - 0x55, 0xca, 0xee, 0xb7, 0x2b, 0xa6, 0xb5, 0x51, 0x1d, 0x22, 0x62, 0xbc, 0x0e, 0xcd, 0x4f, 0x74, - 0x52, 0x30, 0xe1, 0x53, 0x87, 0x54, 0x3f, 0x66, 0x1f, 0x94, 0x32, 0x0f, 0xab, 0x20, 0xbb, 0x27, - 0x9a, 0x54, 0x6e, 0xd7, 0x91, 0x7b, 0xa2, 0xa1, 0x12, 0x20, 0xae, 0x26, 0x0b, 0x80, 0xb8, 0x5a, - 0x63, 0xcb, 0x81, 0xa7, 0xc7, 0x34, 0xe3, 0x21, 0x9d, 0x10, 0x96, 0xa7, 0x49, 0x9c, 0xb3, 0x53, - 0x9e, 0x51, 0xce, 0xfc, 0x2b, 0xdc, 0x06, 0xe5, 0x83, 0x43, 0x8e, 0x34, 0x09, 0x77, 0xa0, 0xe9, - 0x0c, 0x86, 0x64, 0xa4, 0x21, 0xbc, 0x06, 0xab, 0x7b, 0x64, 0xf8, 0xfe, 0xf8, 0x9c, 0xb8, 0xc7, - 0x07, 0xfb, 0xbb, 0x8e, 0x26, 0x0f, 0x9e, 0xcf, 0xfe, 0x18, 0xd2, 0xec, 0xd6, 0x40, 0xd7, 0xb7, - 0x06, 0xfa, 0x7d, 0x6b, 0xa0, 0x2f, 0x77, 0x86, 0x74, 0x7d, 0x67, 0x48, 0x3f, 0xef, 0x0c, 0xe9, - 0xac, 0x55, 0x3f, 0xcb, 0xb1, 0x2a, 0x1e, 0xd6, 0xcb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xee, - 0x6c, 0xcf, 0x99, 0xae, 0x03, 0x00, 0x00, + // 605 bytes of a 
gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x4d, 0x6f, 0xd3, 0x30, + 0x18, 0x8e, 0xd3, 0x34, 0x6d, 0xdf, 0x7d, 0x90, 0x99, 0x09, 0xb2, 0x1d, 0xd2, 0x2a, 0x08, 0xa8, + 0x26, 0x2d, 0x91, 0x06, 0x88, 0x0b, 0x97, 0x74, 0x2a, 0x63, 0xd2, 0xb6, 0x6e, 0x5e, 0x11, 0x68, + 0x97, 0xc9, 0xed, 0xac, 0x34, 0x5a, 0xf3, 0xa1, 0xd8, 0x81, 0xee, 0x5f, 0x80, 0xb8, 0x71, 0xe0, + 0xf7, 0xf4, 0xb8, 0x23, 0xe2, 0x30, 0xc1, 0xf6, 0x47, 0x50, 0x9c, 0x94, 0x51, 0xa9, 0x97, 0xe8, + 0xcd, 0xfb, 0x7c, 0xf8, 0xf5, 0x63, 0x1b, 0x36, 0xb8, 0x88, 0x53, 0xe6, 0xca, 0x6f, 0x32, 0x70, + 0xc5, 0x55, 0xc2, 0xb8, 0x93, 0xa4, 0xb1, 0x88, 0xb1, 0x2e, 0x46, 0x34, 0x8a, 0xf9, 0xe6, 0xba, + 0x1f, 0xfb, 0xb1, 0x6c, 0xb9, 0x79, 0x55, 0xa0, 0x9b, 0xa5, 0x70, 0x4c, 0x07, 0x6c, 0x3c, 0x2f, + 0xb4, 0xbf, 0x23, 0xa8, 0xee, 0x8e, 0xb2, 0xe8, 0x12, 0x6f, 0x81, 0x96, 0x03, 0x26, 0x6a, 0xa1, + 0xf6, 0xea, 0xce, 0x23, 0xa7, 0x70, 0x74, 0x24, 0xe8, 0x74, 0xa3, 0x61, 0x7c, 0x11, 0x44, 0x3e, + 0x91, 0x1c, 0x8c, 0x41, 0xbb, 0xa0, 0x82, 0x9a, 0x6a, 0x0b, 0xb5, 0x97, 0x89, 0xac, 0xb1, 0x09, + 0xda, 0x88, 0xf2, 0x91, 0x59, 0x69, 0xa1, 0xb6, 0xd6, 0xd1, 0xa6, 0x37, 0x4d, 0x44, 0x64, 0xc7, + 0x7e, 0x0d, 0xf5, 0x99, 0x1e, 0xd7, 0xa0, 0xf2, 0xb1, 0x47, 0x0c, 0x05, 0xaf, 0x40, 0xe3, 0xdd, + 0xfe, 0x69, 0xbf, 0xb7, 0x47, 0xbc, 0x43, 0x03, 0xe1, 0x87, 0xf0, 0xe0, 0xed, 0x41, 0xcf, 0xeb, + 0x9f, 0xdf, 0x37, 0x55, 0xfb, 0x07, 0x02, 0xfd, 0x94, 0xa5, 0x01, 0xe3, 0x78, 0x08, 0xba, 0x1c, + 0x9f, 0x9b, 0xa8, 0x55, 0x69, 0x2f, 0xed, 0xac, 0xcc, 0xe6, 0x3b, 0xc8, 0xbb, 0x9d, 0x37, 0xd3, + 0x9b, 0xa6, 0xf2, 0xeb, 0xa6, 0xf9, 0xd2, 0x0f, 0xc4, 0x28, 0x1b, 0x38, 0xc3, 0x38, 0x74, 0x0b, + 0xc2, 0x76, 0x10, 0x97, 0x95, 0x9b, 0x5c, 0xfa, 0xee, 0x5c, 0x12, 0xce, 0x99, 0x54, 0x93, 0xd2, + 0x1a, 0xbb, 0xa0, 0x0f, 0xf3, 0xed, 0x72, 0x53, 0x95, 0x8b, 0xac, 0xcd, 0x16, 0xf1, 0x7c, 0x3f, + 0x95, 0x41, 0xc8, 0x7d, 0x29, 0xa4, 0xa4, 0xd9, 0xaf, 0x60, 0xa9, 0x98, 0xaf, 0x43, 0xc5, 0x70, + 0x84, 0x9f, 0x81, 0xce, 0xe5, 0x6f, 0x39, 0xe4, 0xea, 0x4c, 0x5f, 0x90, 0x48, 0x89, 0xda, 0xdf, + 0x54, 0x68, 0xfc, 0xb3, 0xc4, 0x1b, 0x50, 0x0f, 0x83, 0xe8, 0x5c, 0x04, 0x61, 0x11, 0x7e, 0x85, + 0xd4, 0xc2, 0x20, 0xea, 0x07, 0x21, 0x93, 0x10, 0x9d, 0x14, 0x90, 0x5a, 0x42, 0x74, 0x22, 0xa1, + 0x26, 0x54, 0x52, 0xfa, 0x59, 0xa6, 0xfd, 0x5f, 0x1a, 0xd2, 0x91, 0xe4, 0x08, 0x7e, 0x02, 0xd5, + 0x61, 0x9c, 0x45, 0xc2, 0xd4, 0x16, 0x51, 0x0a, 0x2c, 0x77, 0xe1, 0x59, 0x68, 0x56, 0x17, 0xba, + 0xf0, 0x2c, 0xcc, 0x09, 0x61, 0x10, 0x99, 0xfa, 0x42, 0x42, 0x18, 0x44, 0x92, 0x40, 0x27, 0x66, + 0x6d, 0x31, 0x81, 0x4e, 0xf0, 0x73, 0xa8, 0xc9, 0xb5, 0x58, 0x6a, 0xd6, 0x17, 0x91, 0x66, 0xa8, + 0xfd, 0x15, 0xc1, 0xb2, 0x3c, 0x8f, 0xc3, 0x3c, 0x4c, 0x96, 0xe2, 0xed, 0xb9, 0x1b, 0xb9, 0x31, + 0x77, 0xe2, 0x25, 0xc7, 0xe9, 0x5f, 0x25, 0xec, 0xfe, 0x52, 0x46, 0xb4, 0x0c, 0xaa, 0x41, 0x64, + 0x8d, 0xd7, 0xa1, 0xfa, 0x89, 0x8e, 0x33, 0x26, 0x73, 0x6a, 0x90, 0xe2, 0xc7, 0x6e, 0x83, 0x96, + 0xeb, 0xb0, 0x0e, 0x6a, 0xf7, 0xc4, 0x50, 0xf2, 0x4b, 0x79, 0xd4, 0x3d, 0x31, 0x50, 0xde, 0x20, + 0x5d, 0x43, 0x95, 0x0d, 0xd2, 0x35, 0x2a, 0x5b, 0x1e, 0x3c, 0x3e, 0xa6, 0xa9, 0x08, 0xe8, 0x98, + 0x30, 0x9e, 0xc4, 0x11, 0x67, 0xa7, 0x22, 0xa5, 0x82, 0xf9, 0x57, 0xb8, 0x0e, 0xda, 0x07, 0x8f, + 0x1c, 0x19, 0x0a, 0x6e, 0x40, 0xd5, 0xeb, 0xf4, 0x48, 0xdf, 0x40, 0x78, 0x0d, 0x56, 0xf6, 0x48, + 0xef, 0xfd, 0xf1, 0x39, 0xe9, 0x1e, 0x1f, 0xec, 0xef, 0x7a, 0x86, 0xda, 0x79, 0x3a, 0xfd, 0x63, + 0x29, 0xd3, 0x5b, 0x0b, 0x5d, 0xdf, 0x5a, 0xe8, 0xf7, 0xad, 0x85, 0xbe, 0xdc, 0x59, 0xca, 
0xf5, + 0x9d, 0xa5, 0xfc, 0xbc, 0xb3, 0x94, 0xb3, 0x5a, 0xf9, 0x9a, 0x07, 0xba, 0x7c, 0x8f, 0x2f, 0xfe, + 0x06, 0x00, 0x00, 0xff, 0xff, 0xda, 0x77, 0x43, 0x54, 0xe5, 0x03, 0x00, 0x00, } func (m *Chunk) Marshal() (dAtA []byte, err error) { @@ -431,6 +470,43 @@ func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *SeriesBatch) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SeriesBatch) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeriesBatch) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Series) > 0 { + for iNdEx := len(m.Series) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Series[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func (m *AggrChunk) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -629,6 +705,21 @@ func (m *Series) Size() (n int) { return n } +func (m *SeriesBatch) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Series) > 0 { + for _, e := range m.Series { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } + return n +} + func (m *AggrChunk) Size() (n int) { if m == nil { return 0 @@ -934,6 +1025,90 @@ func (m *Series) Unmarshal(dAtA []byte) error { } return nil } +func (m *SeriesBatch) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SeriesBatch: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SeriesBatch: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Series = append(m.Series, &Series{}) + if err := m.Series[len(m.Series)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *AggrChunk) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/store/storepb/types.proto b/pkg/store/storepb/types.proto 
index 42dfe883c65..32f676a161e 100644 --- a/pkg/store/storepb/types.proto +++ b/pkg/store/storepb/types.proto @@ -36,6 +36,10 @@ message Series { repeated AggrChunk chunks = 2 [(gogoproto.nullable) = false]; } +message SeriesBatch { + repeated Series series = 1; +} + message AggrChunk { int64 min_time = 1; int64 max_time = 2; diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 16b50546986..eead76849fb 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -32,6 +32,7 @@ import ( "github.com/go-kit/log" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + "github.com/google/go-cmp/cmp" "github.com/pkg/errors" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -1392,7 +1393,9 @@ func queryWaitAndAssert(t *testing.T, ctx context.Context, addr string, q func() if reflect.DeepEqual(expected, result) { return nil } - return errors.New("series are different") + + return fmt.Errorf("series are different: %s", + cmp.Diff(expected, result)) })) testutil.Equals(t, expected, result) diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index c938a4f0407..99f0a3fc6e5 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -843,8 +843,7 @@ test_metric{a="2", b="2"} 1`) // We run three avalanches, one tenant which exceeds the limit, one tenant which remains under it, and one for the unlimited tenant. // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. - // One request always fails due to TSDB not being ready for new tenant. - // So without limiting we end up with 40 timeseries and 40 samples. + // So without limiting we end up with 50 timeseries and 50 samples. avalanche1 := e2ethanos.NewAvalanche(e, "avalanche-1", e2ethanos.AvalancheOptions{ MetricCount: "10", @@ -862,8 +861,7 @@ test_metric{a="2", b="2"} 1`) }) // Avalanche in this configuration, would send 5 requests each with 5 of the same timeseries. - // One request always fails due to TSDB not being ready for new tenant. - // So we end up with 5 timeseries, 20 samples. + // So we end up with 5 timeseries, 25 samples. avalanche2 := e2ethanos.NewAvalanche(e, "avalanche-2", e2ethanos.AvalancheOptions{ MetricCount: "5", @@ -881,8 +879,7 @@ test_metric{a="2", b="2"} 1`) }) // Avalanche in this configuration, would send 5 requests each with 10 new timeseries. - // One request always fails due to TSDB not being ready for new tenant. - // So without limiting we end up with 40 timeseries and 40 samples. + // So without limiting we end up with 50 timeseries and 50 samples. avalanche3 := e2ethanos.NewAvalanche(e, "avalanche-3", e2ethanos.AvalancheOptions{ MetricCount: "10", @@ -901,9 +898,9 @@ test_metric{a="2", b="2"} 1`) testutil.Ok(t, e2e.StartAndWaitReady(avalanche1, avalanche2, avalanche3)) - // Here, 3/5 requests are failed due to limiting, as one request fails due to TSDB readiness and we ingest one initial request. - // 3 limited requests belong to the exceed-tenant. - testutil.Ok(t, i1Runnable.WaitSumMetricsWithOptions(e2emon.Equals(3), []string{"thanos_receive_head_series_limited_requests_total"}, e2emon.WithWaitBackoff(&backoff.Config{Min: 1 * time.Second, Max: 10 * time.Minute, MaxRetries: 200}), e2emon.WaitMissingMetrics())) + // Here, 4/5 requests are failed due to limiting as we ingest one initial request. + // 4 limited requests belong to the exceed-tenant. 
+ testutil.Ok(t, i1Runnable.WaitSumMetricsWithOptions(e2emon.Equals(4), []string{"thanos_receive_head_series_limited_requests_total"}, e2emon.WithWaitBackoff(&backoff.Config{Min: 1 * time.Second, Max: 10 * time.Minute, MaxRetries: 200}), e2emon.WaitMissingMetrics())) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) t.Cleanup(cancel) @@ -976,7 +973,7 @@ test_metric{a="2", b="2"} 1`) "job": "receive-i1", "tenant": "exceed-tenant", }, - Value: model.SampleValue(3), + Value: model.SampleValue(4), }, }) })
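
The inprocess.go changes above make ServerAsClient constructible as read-only: when the atomic.Bool flag is set, Series hands back the readOnlySeriesClient stub, whose first Recv returns io.EOF, so the wrapped StoreServer is never invoked. Below is a minimal usage sketch; srv, ctx, and assertReadOnly are illustrative names assumed for this example, not part of the diff. Note that the calls above pass the flag by value (ServerAsClient(s, atomic.Bool{})), so it must be set before construction.

package example

import (
	"context"
	"fmt"
	"io"

	"go.uber.org/atomic"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// assertReadOnly exercises the read-only path: Series returns the
// readOnlySeriesClient stub, so the first Recv yields io.EOF and the
// wrapped server's Series method never runs.
func assertReadOnly(ctx context.Context, srv storepb.StoreServer) error {
	ro := atomic.Bool{}
	ro.Store(true) // Set before construction: the flag is passed by value.

	client, err := storepb.ServerAsClient(srv, ro).Series(ctx, &storepb.SeriesRequest{})
	if err != nil {
		return err
	}
	if _, err := client.Recv(); err != io.EOF {
		return fmt.Errorf("expected io.EOF from the empty read-only stream, got %v", err)
	}
	return nil
}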
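
The new SeriesRequest.response_batch_size field and the SeriesBatch oneof variant define only the wire contract; the intended producer behavior is spelled out in the field comment (0 or 1 means one Series per response). What follows is a hedged server-side sketch of that contract, not the PR's actual implementation: sendBatched is an illustrative name, and the send callback merely stands in for a Store_SeriesServer's Send method.

package example

import "github.com/thanos-io/thanos/pkg/store/storepb"

// sendBatched groups series into SeriesBatch frames of at most batchSize
// entries, or emits one Series per response when batchSize is 0 or 1,
// mirroring the documented meaning of response_batch_size.
func sendBatched(series []*storepb.Series, batchSize int64, send func(*storepb.SeriesResponse) error) error {
	if batchSize <= 1 {
		for _, s := range series {
			if err := send(&storepb.SeriesResponse{
				Result: &storepb.SeriesResponse_Series{Series: s},
			}); err != nil {
				return err
			}
		}
		return nil
	}
	for start := 0; start < len(series); start += int(batchSize) {
		end := start + int(batchSize)
		if end > len(series) {
			end = len(series)
		}
		if err := send(&storepb.SeriesResponse{
			Result: &storepb.SeriesResponse_Batch{
				Batch: &storepb.SeriesBatch{Series: series[start:end]},
			},
		}); err != nil {
			return err
		}
	}
	return nil
}

On the generated encoding itself: field 16 with varint wire type has key (16<<3)|0 = 128, which varint-encodes to 0x80 0x01; since MarshalToSizedBuffer fills the buffer back to front, the generated code above writes 0x1 and then 0x80, and Size() accordingly charges a 2-byte tag (n += 2 + sovRpc(...)). The Batch variant uses field 4 with wire type 2, i.e. key (4<<3)|2 = 0x22, hence its single tag byte.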
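
Consumers that opt into batching must be prepared to see either oneof variant on the same stream. The diff does not show the consuming side, so the following is only a plausible sketch; drainSeries and handle are placeholder names, and Warning/Hints frames are deliberately skipped to keep the example short.

package example

import (
	"io"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// drainSeries consumes a Series stream that may interleave plain Series
// frames with the new Batch frames; handle is a placeholder callback.
func drainSeries(client storepb.Store_SeriesClient, handle func(*storepb.Series)) error {
	for {
		resp, err := client.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		switch r := resp.GetResult().(type) {
		case *storepb.SeriesResponse_Series:
			handle(r.Series)
		case *storepb.SeriesResponse_Batch:
			// One frame now carries up to response_batch_size series.
			for _, s := range r.Batch.Series {
				handle(s)
			}
		case *storepb.SeriesResponse_Warning:
			// A real consumer would surface this according to its
			// partial-response strategy; ignored in this sketch.
		}
	}
}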