diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index f82ed44963..df07025be0 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -250,6 +250,8 @@ func registerQuery(app *extkingpin.App) { rewriteAggregationLabelStrategy := cmd.Flag("query.aggregation-label-strategy", "The strategy to use when rewriting aggregation labels. Used during aggregator migration only.").Default(string(query.NoopLabelRewriter)).Hidden().Enum(string(query.NoopLabelRewriter), string(query.UpsertLabelRewriter), string(query.InsertOnlyLabelRewriter)) rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for aggregation label. If set to x, all queries on aggregated metrics will have a `__agg_rule_type__=x` matcher. If empty, this behavior is disabled. Default is empty.").Hidden().Default("").String() + seriesResponseBatchSize := cmd.Flag("query.series-response-batch-size", "How many Series can be batched in one gRPC message. A value of 0 or 1 means no batching (one series per message). Higher values reduce gRPC overhead but increase memory usage.").Hidden().Default("1").Int() + lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). Default("20").Int() @@ -411,6 +413,7 @@ func registerQuery(app *extkingpin.App) { *rewriteAggregationLabelStrategy, *rewriteAggregationLabelTo, *lazyRetrievalMaxBufferedResponses, + *seriesResponseBatchSize, time.Duration(*grpcStoreClientKeepAlivePingInterval), blockedMetricPatterns, *forwardPartialStrategy, @@ -502,6 +505,7 @@ func runQuery( rewriteAggregationLabelStrategy string, rewriteAggregationLabelTo string, lazyRetrievalMaxBufferedResponses int, + seriesResponseBatchSize int, grpcStoreClientKeepAlivePingInterval time.Duration, blockedMetricPatterns []string, forwardPartialStrategy bool, @@ -655,6 +659,7 @@ func runQuery( DeduplicationFunc: queryDeduplicationFunc, RewriteAggregationLabelStrategy: rewriteAggregationLabelStrategy, RewriteAggregationLabelTo: rewriteAggregationLabelTo, + SeriesResponseBatchSize: seriesResponseBatchSize, } level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts)) queryableCreator = query.NewQueryableCreatorWithOptions( diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 64bab65a02..36b511ce1a 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -62,6 +62,7 @@ type Options struct { DeduplicationFunc string RewriteAggregationLabelStrategy string RewriteAggregationLabelTo string + SeriesResponseBatchSize int } // NewQueryableCreator creates QueryableCreator. @@ -169,6 +170,7 @@ type querier struct { selectTimeout time.Duration shardInfo *storepb.ShardInfo seriesStatsReporter seriesStatsReporter + seriesResponseBatchSize int aggregationLabelRewriter *AggregationLabelRewriter } @@ -248,6 +250,7 @@ func newQuerierWithOpts( skipChunks: skipChunks, shardInfo: shardInfo, seriesStatsReporter: seriesStatsReporter, + seriesResponseBatchSize: opts.SeriesResponseBatchSize, aggregationLabelRewriter: aggregationLabelRewriter, } @@ -279,6 +282,17 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error { return nil } + if b := r.GetBatch(); b != nil { + for _, series := range b.Series { + if series == nil { + continue + } + s.seriesSet = append(s.seriesSet, *series) + } + s.seriesSetStats.Count(r) + return nil + } + // Unsupported field, skip. return nil } @@ -410,6 +424,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . ShardInfo: q.shardInfo, PartialResponseStrategy: q.partialResponseStrategy, SkipChunks: q.skipChunks, + ResponseBatchSize: int64(q.seriesResponseBatchSize), } if q.isDedupEnabled() { // Soft ask to sort without replica labels and push them at the end of labelset. diff --git a/pkg/store/batchable.go b/pkg/store/batchable.go new file mode 100644 index 0000000000..34e9843f8e --- /dev/null +++ b/pkg/store/batchable.go @@ -0,0 +1,60 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package store + +import ( + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +// batchableServer wraps a storepb.Store_SeriesServer and batches series responses +// to reduce gRPC overhead. When the batch size is reached, it sends all buffered +// series as a single SeriesBatch response. +type batchableServer struct { + storepb.Store_SeriesServer + batchSize int + series []*storepb.Series +} + +func newBatchableServer(upstream storepb.Store_SeriesServer, batchSize int) storepb.Store_SeriesServer { + if batchSize <= 1 { + return &passthroughServer{Store_SeriesServer: upstream} + } + return &batchableServer{ + Store_SeriesServer: upstream, + batchSize: batchSize, + series: make([]*storepb.Series, 0, batchSize), + } +} + +// Send buffers series responses and sends them as batches when the buffer is full. +// Non-series responses (warnings, hints) trigger an immediate flush before being sent. +func (b *batchableServer) Send(response *storepb.SeriesResponse) error { + series := response.GetSeries() + if series == nil { + // Non-series response (warning/hints): flush batch first, then send + if err := b.Flush(); err != nil { + return err + } + return b.Store_SeriesServer.Send(response) + } + + b.series = append(b.series, series) + if len(b.series) >= b.batchSize { + return b.Flush() + } + return nil +} + +// Flush sends any buffered series as a batch response. +// Implements the flushableServer interface. +func (b *batchableServer) Flush() error { + if len(b.series) == 0 { + return nil + } + if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil { + return err + } + b.series = b.series[:0] + return nil +} diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index d8f1b57701..b605fdb172 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1496,7 +1496,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill // Series implements the storepb.StoreServer interface. func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) { - srv := newFlushableServer(seriesSrv, sortingStrategyNone) + srv := newFlushableServer(newBatchableServer(seriesSrv, int(req.ResponseBatchSize)), sortingStrategyNone) if s.queryGate != nil { tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) { diff --git a/pkg/store/flushable.go b/pkg/store/flushable.go index 33680c3c89..d12203d64a 100644 --- a/pkg/store/flushable.go +++ b/pkg/store/flushable.go @@ -49,7 +49,12 @@ type passthroughServer struct { storepb.Store_SeriesServer } -func (p *passthroughServer) Flush() error { return nil } +func (p *passthroughServer) Flush() error { + if f, ok := p.Store_SeriesServer.(flushableServer); ok { + return f.Flush() + } + return nil +} // resortingServer is a flushableServer that resorts all series by their labels. // This is required if replica labels are stored internally in a TSDB. @@ -89,5 +94,9 @@ func (r *resortingServer) Flush() error { return err } } + // Delegate to the wrapped server if it's flushable (e.g., batchableServer) + if f, ok := r.Store_SeriesServer.(flushableServer); ok { + return f.Flush() + } return nil } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 3c30d70e8e..aa4aedc1cb 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -122,7 +122,7 @@ func (p *PrometheusStore) putBuffer(b *[]byte) { // Series returns all series for a requested time range and label matcher. func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { - s := newFlushableServer(seriesSrv, sortingStrategyStore) + s := newFlushableServer(newBatchableServer(seriesSrv, int(r.ResponseBatchSize)), sortingStrategyStore) extLset := p.externalLabelsFn() diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 434cb220e9..103e687c66 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -376,6 +376,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. if s.debugLogging { reqLogger = log.With(reqLogger, "request", originalRequest.String()) } + srv = newBatchableServer(srv, int(originalRequest.ResponseBatchSize)) match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { @@ -502,6 +503,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. PartialResponseStrategy: originalRequest.PartialResponseStrategy, ShardInfo: originalRequest.ShardInfo, WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, + ResponseBatchSize: originalRequest.ResponseBatchSize, } if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA && !s.forwardPartialStrategy { // Do not forward this field as it might cause data loss. @@ -699,7 +701,11 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error()) } } - + if f, ok := srv.(flushableServer); ok { + if err := f.Flush(); err != nil { + return err + } + } return nil } diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go index 54a46692c2..2ff8222519 100644 --- a/pkg/store/proxy_merge.go +++ b/pkg/store/proxy_merge.go @@ -467,6 +467,32 @@ func newLazyRespSet( numResponses++ bytesProcessed += resp.Size() + // Handle batch responses by unpacking them into individual series responses + if batch := resp.GetBatch(); batch != nil { + for _, series := range batch.Series { + if series == nil { + continue + } + if applySharding && !shardMatcher.MatchesZLabels(series.Labels) { + continue + } + seriesResp := storepb.NewSeriesResponse(series) + seriesStats.Count(seriesResp) + + l.bufferedResponsesMtx.Lock() + for l.isFull() && !l.closed { + l.bufferSlotEvent.Wait() + } + if !l.closed { + l.bufferedResponses[l.ringTail] = seriesResp + l.ringTail = (l.ringTail + 1) % l.fixedBufferSize + l.dataOrFinishEvent.Signal() + } + l.bufferedResponsesMtx.Unlock() + } + return true + } + if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { return true } @@ -740,6 +766,22 @@ func newEagerRespSet( numResponses++ bytesProcessed += resp.Size() + // Handle batch responses by unpacking them into individual series responses + if batch := resp.GetBatch(); batch != nil { + for _, series := range batch.Series { + if series == nil { + continue + } + if applySharding && !shardMatcher.MatchesZLabels(series.Labels) { + continue + } + seriesResp := storepb.NewSeriesResponse(series) + seriesStats.Count(seriesResp) + l.bufferedResponses = append(l.bufferedResponses, seriesResp) + } + return true + } + if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { return true } diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index de3b5e7ac7..8db62b2d8f 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -52,6 +52,16 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse { } } +func NewBatchResponse(batch []*Series) *SeriesResponse { + return &SeriesResponse{ + Result: &SeriesResponse_Batch{ + Batch: &SeriesBatch{ + Series: batch, + }, + }, + } +} + func GRPCCodeFromWarn(warn string) codes.Code { if strings.Contains(warn, "rpc error: code = ResourceExhausted") { return codes.ResourceExhausted @@ -535,13 +545,7 @@ func (c *SeriesStatsCounter) CountSeries(seriesLabels []labelpb.ZLabel) { } } -func (c *SeriesStatsCounter) Count(r *SeriesResponse) { - //aggregate # of bytes fetched - c.Bytes += uint64(r.Size()) - if r.GetSeries() == nil { - return - } - series := r.GetSeries() +func (c *SeriesStatsCounter) countSingleSeries(series *Series) { c.CountSeries(series.Labels) for _, chk := range series.Chunks { if chk.Raw != nil { @@ -576,6 +580,27 @@ func (c *SeriesStatsCounter) Count(r *SeriesResponse) { } } +func (c *SeriesStatsCounter) Count(r *SeriesResponse) { + //aggregate # of bytes fetched + c.Bytes += uint64(r.Size()) + + // Handle batch responses + if batch := r.GetBatch(); batch != nil { + for _, series := range batch.Series { + if series == nil { + continue + } + c.countSingleSeries(series) + } + return + } + + if r.GetSeries() == nil { + return + } + c.countSingleSeries(r.GetSeries()) +} + func (m *SeriesRequest) ToPromQL() string { return m.QueryHints.toPromQL(m.Matchers) } diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index 9d39f27e14..0dbda94493 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -182,6 +182,9 @@ type SeriesRequest struct { WithoutReplicaLabels []string `protobuf:"bytes,14,rep,name=without_replica_labels,json=withoutReplicaLabels,proto3" json:"without_replica_labels,omitempty"` // limit is used to limit the number of results returned Limit int64 `protobuf:"varint,15,opt,name=limit,proto3" json:"limit,omitempty"` + // response_batch_size controls how many series should be batched together + // in a single gRPC message. A value of 0 or 1 means no batching (one series per message). + ResponseBatchSize int64 `protobuf:"varint,16,opt,name=response_batch_size,json=responseBatchSize,proto3" json:"response_batch_size,omitempty"` } func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } @@ -430,6 +433,7 @@ type SeriesResponse struct { // *SeriesResponse_Series // *SeriesResponse_Warning // *SeriesResponse_Hints + // *SeriesResponse_Batch Result isSeriesResponse_Result `protobuf_oneof:"result"` } @@ -481,10 +485,14 @@ type SeriesResponse_Warning struct { type SeriesResponse_Hints struct { Hints *types.Any `protobuf:"bytes,3,opt,name=hints,proto3,oneof" json:"hints,omitempty"` } +type SeriesResponse_Batch struct { + Batch *SeriesBatch `protobuf:"bytes,4,opt,name=batch,proto3,oneof" json:"batch,omitempty"` +} func (*SeriesResponse_Series) isSeriesResponse_Result() {} func (*SeriesResponse_Warning) isSeriesResponse_Result() {} func (*SeriesResponse_Hints) isSeriesResponse_Result() {} +func (*SeriesResponse_Batch) isSeriesResponse_Result() {} func (m *SeriesResponse) GetResult() isSeriesResponse_Result { if m != nil { @@ -514,12 +522,20 @@ func (m *SeriesResponse) GetHints() *types.Any { return nil } +func (m *SeriesResponse) GetBatch() *SeriesBatch { + if x, ok := m.GetResult().(*SeriesResponse_Batch); ok { + return x.Batch + } + return nil +} + // XXX_OneofWrappers is for the internal use of the proto package. func (*SeriesResponse) XXX_OneofWrappers() []interface{} { return []interface{}{ (*SeriesResponse_Series)(nil), (*SeriesResponse_Warning)(nil), (*SeriesResponse_Hints)(nil), + (*SeriesResponse_Batch)(nil), } } @@ -728,79 +744,82 @@ func init() { func init() { proto.RegisterFile("store/storepb/rpc.proto", fileDescriptor_a938d55a388af629) } var fileDescriptor_a938d55a388af629 = []byte{ - // 1149 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x4b, 0x6f, 0x23, 0x45, - 0x10, 0xf6, 0x78, 0x3c, 0x7e, 0x94, 0x13, 0xaf, 0xb7, 0xd7, 0xc9, 0x4e, 0xbc, 0x92, 0x63, 0x8c, - 0x90, 0xac, 0x55, 0xe4, 0xac, 0xbc, 0x08, 0x09, 0xc4, 0x25, 0x09, 0x2c, 0x59, 0x89, 0x04, 0xe8, - 0xec, 0x12, 0x04, 0x87, 0x51, 0xdb, 0xee, 0x8c, 0x47, 0x3b, 0xaf, 0x4c, 0xf7, 0x90, 0xf8, 0x0c, - 0x67, 0xc4, 0x9d, 0xdb, 0xfe, 0x9a, 0xdc, 0xd8, 0x23, 0x27, 0x04, 0xc9, 0x1f, 0x41, 0xfd, 0x18, - 0x3f, 0x82, 0xf7, 0xa5, 0xe4, 0x62, 0x75, 0x7d, 0x5f, 0x75, 0x4d, 0x75, 0xf5, 0x57, 0xe5, 0x86, - 0xfb, 0x8c, 0x47, 0x09, 0xdd, 0x96, 0xbf, 0xf1, 0x60, 0x3b, 0x89, 0x87, 0xbd, 0x38, 0x89, 0x78, - 0x84, 0x8a, 0x7c, 0x4c, 0xc2, 0x88, 0x35, 0x37, 0x16, 0x1d, 0xf8, 0x24, 0xa6, 0x4c, 0xb9, 0x34, - 0x1b, 0x6e, 0xe4, 0x46, 0x72, 0xb9, 0x2d, 0x56, 0x1a, 0x6d, 0x2f, 0x6e, 0x88, 0x93, 0x28, 0xb8, - 0xb6, 0x6f, 0xc3, 0x8d, 0x22, 0xd7, 0xa7, 0xdb, 0xd2, 0x1a, 0xa4, 0x27, 0xdb, 0x24, 0x9c, 0x28, - 0xaa, 0x73, 0x07, 0x56, 0x8f, 0x13, 0x8f, 0x53, 0x4c, 0x59, 0x1c, 0x85, 0x8c, 0x76, 0x7e, 0x31, - 0x60, 0x45, 0x23, 0xa7, 0x29, 0x65, 0x1c, 0xed, 0x00, 0x70, 0x2f, 0xa0, 0x8c, 0x26, 0x1e, 0x65, - 0xb6, 0xd1, 0x36, 0xbb, 0xd5, 0xfe, 0x03, 0xb1, 0x3b, 0xa0, 0x7c, 0x4c, 0x53, 0xe6, 0x0c, 0xa3, - 0x78, 0xd2, 0x7b, 0xe6, 0x05, 0xf4, 0x48, 0xba, 0xec, 0x16, 0x2e, 0xfe, 0xde, 0xcc, 0xe1, 0xb9, - 0x4d, 0x68, 0x1d, 0x8a, 0x9c, 0x86, 0x24, 0xe4, 0x76, 0xbe, 0x6d, 0x74, 0x2b, 0x58, 0x5b, 0xc8, - 0x86, 0x52, 0x42, 0x63, 0xdf, 0x1b, 0x12, 0xdb, 0x6c, 0x1b, 0x5d, 0x13, 0x67, 0x66, 0xe7, 0xa5, - 0x05, 0xab, 0x2a, 0x5c, 0x96, 0xc6, 0x06, 0x94, 0x03, 0x2f, 0x74, 0x44, 0x54, 0xdb, 0x50, 0xce, - 0x81, 0x17, 0x8a, 0xcf, 0x4a, 0x8a, 0x9c, 0x2b, 0x2a, 0xaf, 0x29, 0x72, 0x2e, 0xa9, 0x4f, 0x04, - 0xc5, 0x87, 0x63, 0x9a, 0x30, 0xdb, 0x94, 0xa9, 0x37, 0x7a, 0xaa, 0xce, 0xbd, 0xaf, 0xc9, 0x80, - 0xfa, 0x07, 0x8a, 0xd4, 0x39, 0x4f, 0x7d, 0x51, 0x1f, 0xd6, 0x44, 0xc8, 0x84, 0xb2, 0xc8, 0x4f, - 0xb9, 0x17, 0x85, 0xce, 0x99, 0x17, 0x8e, 0xa2, 0x33, 0xbb, 0x20, 0xe3, 0xdf, 0x0b, 0xc8, 0x39, - 0x9e, 0x72, 0xc7, 0x92, 0x42, 0x5b, 0x00, 0xc4, 0x75, 0x13, 0xea, 0x12, 0x4e, 0x99, 0x6d, 0xb5, - 0xcd, 0x6e, 0xad, 0xbf, 0x92, 0x7d, 0x6d, 0xc7, 0x75, 0x13, 0x3c, 0xc7, 0xa3, 0xcf, 0x60, 0x23, - 0x26, 0x09, 0xf7, 0x88, 0x2f, 0xbe, 0x22, 0x6b, 0xef, 0x8c, 0x3c, 0x46, 0x06, 0x3e, 0x1d, 0xd9, - 0xc5, 0xb6, 0xd1, 0x2d, 0xe3, 0xfb, 0xda, 0x21, 0xbb, 0x9b, 0x2f, 0x34, 0x8d, 0x7e, 0x5a, 0xb2, - 0x97, 0xf1, 0x84, 0x70, 0xea, 0x4e, 0xec, 0x52, 0xdb, 0xe8, 0xd6, 0xfa, 0x9b, 0xd9, 0x87, 0xbf, - 0x5d, 0x8c, 0x71, 0xa4, 0xdd, 0xfe, 0x17, 0x3c, 0x23, 0xd0, 0x26, 0x54, 0xd9, 0x0b, 0x2f, 0x76, - 0x86, 0xe3, 0x34, 0x7c, 0xc1, 0xec, 0xb2, 0x4c, 0x05, 0x04, 0xb4, 0x27, 0x11, 0xf4, 0x10, 0xac, - 0xb1, 0x17, 0x72, 0x66, 0x57, 0xda, 0x86, 0x2c, 0xa8, 0x52, 0x57, 0x2f, 0x53, 0x57, 0x6f, 0x27, - 0x9c, 0x60, 0xe5, 0x82, 0x10, 0x14, 0x18, 0xa7, 0xb1, 0x0d, 0xb2, 0x6c, 0x72, 0x8d, 0x1a, 0x60, - 0x25, 0x24, 0x74, 0xa9, 0x5d, 0x95, 0xa0, 0x32, 0xd0, 0x63, 0xa8, 0x9e, 0xa6, 0x34, 0x99, 0x38, - 0x2a, 0xf6, 0x8a, 0x8c, 0x8d, 0xb2, 0x53, 0x7c, 0x27, 0xa8, 0x7d, 0xc1, 0x60, 0x38, 0x9d, 0xae, - 0xd1, 0x23, 0x00, 0x36, 0x26, 0xc9, 0xc8, 0xf1, 0xc2, 0x93, 0xc8, 0x5e, 0x95, 0x7b, 0xee, 0x66, - 0x7b, 0x8e, 0x04, 0xf3, 0x34, 0x3c, 0x89, 0x70, 0x85, 0x65, 0x4b, 0xf4, 0x31, 0xac, 0x9f, 0x79, - 0x7c, 0x1c, 0xa5, 0xdc, 0xd1, 0x5a, 0x73, 0x7c, 0x21, 0x04, 0x66, 0xd7, 0xda, 0x66, 0xb7, 0x82, - 0x1b, 0x9a, 0xc5, 0x8a, 0x94, 0x22, 0x61, 0x22, 0x65, 0xdf, 0x0b, 0x3c, 0x6e, 0xdf, 0x51, 0x29, - 0x4b, 0xa3, 0xf3, 0xd2, 0x00, 0x98, 0x25, 0x26, 0x0b, 0xc7, 0x69, 0xec, 0x04, 0x9e, 0xef, 0x7b, - 0x4c, 0x8b, 0x14, 0x04, 0x74, 0x20, 0x11, 0xd4, 0x86, 0xc2, 0x49, 0x1a, 0x0e, 0xa5, 0x46, 0xab, - 0x33, 0x69, 0x3c, 0x49, 0xc3, 0x21, 0x96, 0x0c, 0xda, 0x82, 0xb2, 0x9b, 0x44, 0x69, 0xec, 0x85, - 0xae, 0x54, 0x5a, 0xb5, 0x5f, 0xcf, 0xbc, 0xbe, 0xd2, 0x38, 0x9e, 0x7a, 0xa0, 0x0f, 0xb3, 0x42, - 0x5a, 0xd2, 0x75, 0x35, 0x73, 0xc5, 0x02, 0xd4, 0x75, 0xed, 0x9c, 0x41, 0x65, 0x5a, 0x08, 0x99, - 0xa2, 0xae, 0xd7, 0x88, 0x9e, 0x4f, 0x53, 0x54, 0xfc, 0x88, 0x9e, 0xa3, 0x0f, 0x60, 0x85, 0x47, - 0x9c, 0xf8, 0x8e, 0xc4, 0x98, 0x6e, 0xa7, 0xaa, 0xc4, 0x64, 0x18, 0x86, 0x6a, 0x90, 0x1f, 0x4c, - 0x64, 0xbf, 0x96, 0x71, 0x7e, 0x30, 0x11, 0xcd, 0xad, 0x2b, 0x58, 0x90, 0x15, 0xd4, 0x56, 0xa7, - 0x09, 0x05, 0x71, 0x32, 0x21, 0x81, 0x90, 0xe8, 0xa6, 0xad, 0x60, 0xb9, 0xee, 0xf4, 0xa1, 0x9c, - 0x9d, 0x47, 0xc7, 0x33, 0x96, 0xc4, 0x33, 0x17, 0xe2, 0x6d, 0x82, 0x25, 0x0f, 0x26, 0x1c, 0x16, - 0x4a, 0xac, 0xad, 0xce, 0x6f, 0x06, 0xd4, 0xb2, 0x99, 0xa1, 0x34, 0x8d, 0xba, 0x50, 0x9c, 0xce, - 0x2d, 0x51, 0xa2, 0xda, 0x54, 0x1b, 0x12, 0xdd, 0xcf, 0x61, 0xcd, 0xa3, 0x26, 0x94, 0xce, 0x48, - 0x12, 0x8a, 0xc2, 0xcb, 0x19, 0xb5, 0x9f, 0xc3, 0x19, 0x80, 0xb6, 0x32, 0xc1, 0x9b, 0xaf, 0x17, - 0xfc, 0x7e, 0x4e, 0x4b, 0x7e, 0xb7, 0x0c, 0xc5, 0x84, 0xb2, 0xd4, 0xe7, 0x9d, 0x5f, 0x4d, 0xb8, - 0x2b, 0x05, 0x74, 0x48, 0x82, 0xd9, 0x20, 0x7b, 0x63, 0xe3, 0x1b, 0x37, 0x68, 0xfc, 0xfc, 0x0d, - 0x1b, 0xbf, 0x01, 0x16, 0xe3, 0x24, 0xe1, 0x7a, 0x16, 0x2b, 0x03, 0xd5, 0xc1, 0xa4, 0xe1, 0x48, - 0xcf, 0x3d, 0xb1, 0x9c, 0xf5, 0xbf, 0xf5, 0xf6, 0xfe, 0x9f, 0x9f, 0xbf, 0xc5, 0xf7, 0x98, 0xbf, - 0xaf, 0x6f, 0xd3, 0xd2, 0xbb, 0xb4, 0x69, 0x79, 0xbe, 0x4d, 0x13, 0x40, 0xf3, 0xb7, 0xa0, 0xa5, - 0xd1, 0x00, 0x4b, 0x48, 0x51, 0xfd, 0xa3, 0x55, 0xb0, 0x32, 0x50, 0x13, 0xca, 0xfa, 0xd6, 0x85, - 0xf6, 0x05, 0x31, 0xb5, 0x67, 0xe7, 0x36, 0xdf, 0x7a, 0xee, 0xce, 0x1f, 0xa6, 0xfe, 0xe8, 0xf7, - 0xc4, 0x4f, 0x67, 0x77, 0x2f, 0x12, 0x14, 0xa8, 0x6e, 0x06, 0x65, 0xbc, 0x59, 0x11, 0xf9, 0x1b, - 0x28, 0xc2, 0xbc, 0x2d, 0x45, 0x14, 0x96, 0x28, 0xc2, 0x5a, 0xa2, 0x88, 0xe2, 0xfb, 0x29, 0xa2, - 0x74, 0x2b, 0x8a, 0x28, 0xbf, 0x8b, 0x22, 0x2a, 0xf3, 0x8a, 0x48, 0xe1, 0xde, 0xc2, 0xe5, 0x68, - 0x49, 0xac, 0x43, 0xf1, 0x67, 0x89, 0x68, 0x4d, 0x68, 0xeb, 0xb6, 0x44, 0xf1, 0x70, 0x17, 0x0a, - 0xe2, 0x19, 0x80, 0x4a, 0x60, 0xe2, 0x9d, 0xe3, 0x7a, 0x0e, 0x55, 0xc0, 0xda, 0xfb, 0xe6, 0xf9, - 0xe1, 0xb3, 0xba, 0x21, 0xb0, 0xa3, 0xe7, 0x07, 0xf5, 0xbc, 0x58, 0x1c, 0x3c, 0x3d, 0xac, 0x9b, - 0x72, 0xb1, 0xf3, 0x43, 0xbd, 0x80, 0xaa, 0x50, 0x92, 0x5e, 0x5f, 0xe2, 0xba, 0xd5, 0xff, 0xd3, - 0x00, 0xeb, 0x48, 0xbc, 0xf4, 0xd0, 0xa7, 0x50, 0x54, 0x53, 0x0c, 0xad, 0x2d, 0x4e, 0x35, 0x2d, - 0xb6, 0xe6, 0xfa, 0x75, 0x58, 0x1d, 0xf3, 0x91, 0x81, 0xf6, 0x00, 0x66, 0x1d, 0x81, 0x36, 0x16, - 0xea, 0x3f, 0x3f, 0xab, 0x9a, 0xcd, 0x65, 0x94, 0xae, 0xd6, 0x13, 0xa8, 0xce, 0x15, 0x11, 0x2d, - 0xba, 0x2e, 0xc8, 0xbe, 0xf9, 0x60, 0x29, 0xa7, 0xe2, 0xf4, 0x0f, 0xa1, 0x26, 0xdf, 0x9b, 0x42, - 0xcf, 0xea, 0x64, 0x9f, 0x43, 0x15, 0xd3, 0x20, 0xe2, 0x54, 0xe2, 0x68, 0xaa, 0x8f, 0xf9, 0x67, - 0x69, 0x73, 0xed, 0x1a, 0xaa, 0x9f, 0xaf, 0xb9, 0xdd, 0x8f, 0x2e, 0xfe, 0x6d, 0xe5, 0x2e, 0x2e, - 0x5b, 0xc6, 0xab, 0xcb, 0x96, 0xf1, 0xcf, 0x65, 0xcb, 0xf8, 0xfd, 0xaa, 0x95, 0x7b, 0x75, 0xd5, - 0xca, 0xfd, 0x75, 0xd5, 0xca, 0xfd, 0x58, 0xd2, 0xcf, 0xe4, 0x41, 0x51, 0xde, 0xd0, 0xe3, 0xff, - 0x02, 0x00, 0x00, 0xff, 0xff, 0x84, 0xe1, 0x09, 0x34, 0x90, 0x0b, 0x00, 0x00, + // 1197 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x5d, 0x6f, 0xe3, 0x44, + 0x17, 0x8e, 0xe3, 0x38, 0x1f, 0x27, 0x6d, 0x36, 0x3b, 0x4d, 0xbb, 0x6e, 0x56, 0x4a, 0xf3, 0xe6, + 0x15, 0x52, 0xb4, 0x54, 0xe9, 0x2a, 0x8b, 0x90, 0x40, 0xdc, 0xb4, 0x85, 0xa5, 0x2b, 0xd1, 0x02, + 0xce, 0x2e, 0x45, 0x70, 0x61, 0x39, 0xc9, 0xd4, 0xb1, 0xd6, 0xb1, 0x5d, 0xcf, 0x98, 0x36, 0x7b, + 0x0b, 0x3f, 0x80, 0x7b, 0xee, 0xf8, 0x1b, 0xfc, 0x81, 0xde, 0xb1, 0xe2, 0x8a, 0x2b, 0x04, 0xed, + 0x1f, 0x41, 0x73, 0x66, 0x9c, 0x8f, 0x92, 0xfd, 0x52, 0x7b, 0x13, 0xcd, 0x79, 0x9e, 0x33, 0xc7, + 0x33, 0x67, 0x9e, 0x79, 0x32, 0x70, 0x8f, 0xf1, 0x30, 0xa6, 0x3b, 0xf8, 0x1b, 0xf5, 0x77, 0xe2, + 0x68, 0xd0, 0x89, 0xe2, 0x90, 0x87, 0x24, 0xcf, 0x47, 0x4e, 0x10, 0xb2, 0xfa, 0xe6, 0x62, 0x02, + 0x9f, 0x44, 0x94, 0xc9, 0x94, 0x7a, 0xcd, 0x0d, 0xdd, 0x10, 0x87, 0x3b, 0x62, 0xa4, 0xd0, 0xe6, + 0xe2, 0x84, 0x28, 0x0e, 0xc7, 0xd7, 0xe6, 0x6d, 0xba, 0x61, 0xe8, 0xfa, 0x74, 0x07, 0xa3, 0x7e, + 0x72, 0xb2, 0xe3, 0x04, 0x13, 0x49, 0xb5, 0xee, 0xc0, 0xea, 0x71, 0xec, 0x71, 0x6a, 0x51, 0x16, + 0x85, 0x01, 0xa3, 0xad, 0x1f, 0x35, 0x58, 0x51, 0xc8, 0x69, 0x42, 0x19, 0x27, 0xbb, 0x00, 0xdc, + 0x1b, 0x53, 0x46, 0x63, 0x8f, 0x32, 0x53, 0x6b, 0xea, 0xed, 0x72, 0xf7, 0xbe, 0x98, 0x3d, 0xa6, + 0x7c, 0x44, 0x13, 0x66, 0x0f, 0xc2, 0x68, 0xd2, 0x79, 0xea, 0x8d, 0x69, 0x0f, 0x53, 0xf6, 0x72, + 0x17, 0x7f, 0x6d, 0x65, 0xac, 0xb9, 0x49, 0x64, 0x03, 0xf2, 0x9c, 0x06, 0x4e, 0xc0, 0xcd, 0x6c, + 0x53, 0x6b, 0x97, 0x2c, 0x15, 0x11, 0x13, 0x0a, 0x31, 0x8d, 0x7c, 0x6f, 0xe0, 0x98, 0x7a, 0x53, + 0x6b, 0xeb, 0x56, 0x1a, 0xb6, 0xfe, 0x30, 0x60, 0x55, 0x96, 0x4b, 0x97, 0xb1, 0x09, 0xc5, 0xb1, + 0x17, 0xd8, 0xa2, 0xaa, 0xa9, 0xc9, 0xe4, 0xb1, 0x17, 0x88, 0xcf, 0x22, 0xe5, 0x9c, 0x4b, 0x2a, + 0xab, 0x28, 0xe7, 0x1c, 0xa9, 0x0f, 0x05, 0xc5, 0x07, 0x23, 0x1a, 0x33, 0x53, 0xc7, 0xa5, 0xd7, + 0x3a, 0xb2, 0xcf, 0x9d, 0x2f, 0x9c, 0x3e, 0xf5, 0x0f, 0x25, 0xa9, 0xd6, 0x3c, 0xcd, 0x25, 0x5d, + 0x58, 0x17, 0x25, 0x63, 0xca, 0x42, 0x3f, 0xe1, 0x5e, 0x18, 0xd8, 0x67, 0x5e, 0x30, 0x0c, 0xcf, + 0xcc, 0x1c, 0xd6, 0x5f, 0x1b, 0x3b, 0xe7, 0xd6, 0x94, 0x3b, 0x46, 0x8a, 0x6c, 0x03, 0x38, 0xae, + 0x1b, 0x53, 0xd7, 0xe1, 0x94, 0x99, 0x46, 0x53, 0x6f, 0x57, 0xba, 0x2b, 0xe9, 0xd7, 0x76, 0x5d, + 0x37, 0xb6, 0xe6, 0x78, 0xf2, 0x31, 0x6c, 0x46, 0x4e, 0xcc, 0x3d, 0xc7, 0x17, 0x5f, 0xc1, 0xde, + 0xdb, 0x43, 0x8f, 0x39, 0x7d, 0x9f, 0x0e, 0xcd, 0x7c, 0x53, 0x6b, 0x17, 0xad, 0x7b, 0x2a, 0x21, + 0x3d, 0x9b, 0x4f, 0x15, 0x4d, 0xbe, 0x5f, 0x32, 0x97, 0xf1, 0xd8, 0xe1, 0xd4, 0x9d, 0x98, 0x85, + 0xa6, 0xd6, 0xae, 0x74, 0xb7, 0xd2, 0x0f, 0x7f, 0xb5, 0x58, 0xa3, 0xa7, 0xd2, 0xfe, 0x53, 0x3c, + 0x25, 0xc8, 0x16, 0x94, 0xd9, 0x73, 0x2f, 0xb2, 0x07, 0xa3, 0x24, 0x78, 0xce, 0xcc, 0x22, 0x2e, + 0x05, 0x04, 0xb4, 0x8f, 0x08, 0x79, 0x00, 0xc6, 0xc8, 0x0b, 0x38, 0x33, 0x4b, 0x4d, 0x0d, 0x1b, + 0x2a, 0xd5, 0xd5, 0x49, 0xd5, 0xd5, 0xd9, 0x0d, 0x26, 0x96, 0x4c, 0x21, 0x04, 0x72, 0x8c, 0xd3, + 0xc8, 0x04, 0x6c, 0x1b, 0x8e, 0x49, 0x0d, 0x8c, 0xd8, 0x09, 0x5c, 0x6a, 0x96, 0x11, 0x94, 0x01, + 0x79, 0x04, 0xe5, 0xd3, 0x84, 0xc6, 0x13, 0x5b, 0xd6, 0x5e, 0xc1, 0xda, 0x24, 0xdd, 0xc5, 0xd7, + 0x82, 0x3a, 0x10, 0x8c, 0x05, 0xa7, 0xd3, 0x31, 0x79, 0x08, 0xc0, 0x46, 0x4e, 0x3c, 0xb4, 0xbd, + 0xe0, 0x24, 0x34, 0x57, 0x71, 0xce, 0xdd, 0x74, 0x4e, 0x4f, 0x30, 0x4f, 0x82, 0x93, 0xd0, 0x2a, + 0xb1, 0x74, 0x48, 0x3e, 0x80, 0x8d, 0x33, 0x8f, 0x8f, 0xc2, 0x84, 0xdb, 0x4a, 0x6b, 0xb6, 0x2f, + 0x84, 0xc0, 0xcc, 0x4a, 0x53, 0x6f, 0x97, 0xac, 0x9a, 0x62, 0x2d, 0x49, 0xa2, 0x48, 0x98, 0x58, + 0xb2, 0xef, 0x8d, 0x3d, 0x6e, 0xde, 0x91, 0x4b, 0xc6, 0x80, 0x74, 0x60, 0x6d, 0xda, 0xfe, 0xbe, + 0x50, 0x8e, 0xcd, 0xbc, 0x17, 0xd4, 0xac, 0x62, 0xce, 0xdd, 0x94, 0xda, 0x13, 0x4c, 0xcf, 0x7b, + 0x41, 0x5b, 0xbf, 0x6a, 0x00, 0xb3, 0x8d, 0x60, 0xa3, 0x39, 0x8d, 0xec, 0xb1, 0xe7, 0xfb, 0x1e, + 0x53, 0xa2, 0x06, 0x01, 0x1d, 0x22, 0x42, 0x9a, 0x90, 0x3b, 0x49, 0x82, 0x01, 0x6a, 0xba, 0x3c, + 0x93, 0xd2, 0xe3, 0x24, 0x18, 0x58, 0xc8, 0x90, 0x6d, 0x28, 0xba, 0x71, 0x98, 0x44, 0x5e, 0xe0, + 0xa2, 0x32, 0xcb, 0xdd, 0x6a, 0x9a, 0xf5, 0xb9, 0xc2, 0xad, 0x69, 0x06, 0xf9, 0x7f, 0xda, 0x78, + 0x03, 0x53, 0x57, 0xd3, 0x54, 0x4b, 0x80, 0xea, 0x1c, 0x5a, 0x67, 0x50, 0x9a, 0x36, 0x0e, 0x97, + 0xa8, 0xfa, 0x3b, 0xa4, 0xe7, 0xd3, 0x25, 0x4a, 0x7e, 0x48, 0xcf, 0xc9, 0xff, 0x60, 0x85, 0x87, + 0xdc, 0xf1, 0x6d, 0xc4, 0x98, 0xba, 0x7e, 0x65, 0xc4, 0xb0, 0x0c, 0x23, 0x15, 0xc8, 0xf6, 0x27, + 0x78, 0xbf, 0x8b, 0x56, 0xb6, 0x3f, 0x11, 0x66, 0xa0, 0x3a, 0x9e, 0xc3, 0x8e, 0xab, 0xa8, 0x55, + 0x87, 0x9c, 0xd8, 0x99, 0x90, 0x4c, 0xe0, 0xa8, 0x4b, 0x5e, 0xb2, 0x70, 0xdc, 0xea, 0x42, 0x31, + 0xdd, 0x8f, 0xaa, 0xa7, 0x2d, 0xa9, 0xa7, 0x2f, 0xd4, 0xdb, 0x02, 0x03, 0x37, 0x26, 0x12, 0x16, + 0x5a, 0xac, 0xa2, 0xd6, 0x6f, 0x1a, 0x54, 0x52, 0x8f, 0x91, 0x47, 0x45, 0xda, 0x90, 0x9f, 0xfa, + 0x9c, 0x68, 0x51, 0x65, 0xaa, 0x25, 0x44, 0x0f, 0x32, 0x96, 0xe2, 0x49, 0x1d, 0x0a, 0x67, 0x4e, + 0x1c, 0x88, 0xc6, 0xa3, 0xa7, 0x1d, 0x64, 0xac, 0x14, 0x20, 0xdb, 0xe9, 0x05, 0xd1, 0x5f, 0x7d, + 0x41, 0x0e, 0x32, 0xe9, 0x15, 0x79, 0x1f, 0x0c, 0x14, 0x8f, 0x3a, 0xc0, 0xb5, 0xc5, 0x4f, 0xa2, + 0x7a, 0x44, 0x32, 0xe6, 0xec, 0x15, 0x21, 0x1f, 0x53, 0x96, 0xf8, 0xbc, 0xf5, 0x93, 0x0e, 0x77, + 0x51, 0x9d, 0x47, 0xce, 0x78, 0xe6, 0x92, 0xaf, 0x75, 0x15, 0xed, 0x06, 0xae, 0x92, 0xbd, 0xa1, + 0xab, 0xd4, 0xc0, 0x60, 0xdc, 0x89, 0xb9, 0x32, 0x7a, 0x19, 0x90, 0x2a, 0xe8, 0x34, 0x18, 0x2a, + 0x53, 0x15, 0xc3, 0x99, 0xb9, 0x18, 0x6f, 0x36, 0x97, 0x79, 0x73, 0xcf, 0xbf, 0x83, 0xb9, 0xbf, + 0xda, 0x03, 0x0a, 0x6f, 0xe3, 0x01, 0xc5, 0x39, 0x0f, 0x68, 0xc5, 0x40, 0xe6, 0x4f, 0x41, 0xe9, + 0xa8, 0x06, 0x86, 0xd0, 0xad, 0xfc, 0xbb, 0x2c, 0x59, 0x32, 0x20, 0x75, 0x28, 0x2a, 0x89, 0x88, + 0x8b, 0x22, 0x88, 0x69, 0x3c, 0xdb, 0xb7, 0xfe, 0xc6, 0x7d, 0xb7, 0x7e, 0xd1, 0xd5, 0x47, 0xbf, + 0x71, 0xfc, 0x64, 0x76, 0xf6, 0x62, 0x81, 0x02, 0x55, 0x37, 0x47, 0x06, 0xaf, 0x57, 0x44, 0xf6, + 0x06, 0x8a, 0xd0, 0x6f, 0x4b, 0x11, 0xb9, 0x25, 0x8a, 0x30, 0x96, 0x28, 0x22, 0xff, 0x6e, 0x8a, + 0x28, 0xdc, 0x8a, 0x22, 0x8a, 0x6f, 0xa3, 0x88, 0xd2, 0xbc, 0x22, 0x12, 0x58, 0x5b, 0x38, 0x1c, + 0x25, 0x89, 0x0d, 0xc8, 0xff, 0x80, 0x88, 0xd2, 0x84, 0x8a, 0x6e, 0x4b, 0x14, 0x0f, 0xf6, 0x20, + 0x27, 0xde, 0x18, 0xa4, 0x00, 0xba, 0xb5, 0x7b, 0x5c, 0xcd, 0x90, 0x12, 0x18, 0xfb, 0x5f, 0x3e, + 0x3b, 0x7a, 0x5a, 0xd5, 0x04, 0xd6, 0x7b, 0x76, 0x58, 0xcd, 0x8a, 0xc1, 0xe1, 0x93, 0xa3, 0xaa, + 0x8e, 0x83, 0xdd, 0x6f, 0xab, 0x39, 0x52, 0x86, 0x02, 0x66, 0x7d, 0x66, 0x55, 0x8d, 0xee, 0xef, + 0x1a, 0x18, 0x3d, 0xf1, 0x8c, 0x24, 0x1f, 0x41, 0x5e, 0xfa, 0x0f, 0x59, 0x5f, 0xf4, 0x23, 0x25, + 0xb6, 0xfa, 0xc6, 0x75, 0x58, 0x6e, 0xf3, 0xa1, 0x46, 0xf6, 0x01, 0x66, 0x37, 0x82, 0x6c, 0x2e, + 0xf4, 0x7f, 0xde, 0xab, 0xea, 0xf5, 0x65, 0x94, 0xea, 0xd6, 0x63, 0x28, 0xcf, 0x35, 0x91, 0x2c, + 0xa6, 0x2e, 0xc8, 0xbe, 0x7e, 0x7f, 0x29, 0x27, 0xeb, 0x74, 0x8f, 0xa0, 0x82, 0x8f, 0x59, 0xa1, + 0x67, 0xb9, 0xb3, 0x4f, 0xa0, 0x6c, 0xd1, 0x71, 0xc8, 0x29, 0xe2, 0x64, 0xaa, 0x8f, 0xf9, 0x37, + 0x6f, 0x7d, 0xfd, 0x1a, 0xaa, 0xde, 0xc6, 0x99, 0xbd, 0xf7, 0x2e, 0xfe, 0x69, 0x64, 0x2e, 0x2e, + 0x1b, 0xda, 0xcb, 0xcb, 0x86, 0xf6, 0xf7, 0x65, 0x43, 0xfb, 0xf9, 0xaa, 0x91, 0x79, 0x79, 0xd5, + 0xc8, 0xfc, 0x79, 0xd5, 0xc8, 0x7c, 0x57, 0x50, 0x6f, 0xf0, 0x7e, 0x1e, 0x4f, 0xe8, 0xd1, 0xbf, + 0x01, 0x00, 0x00, 0xff, 0xff, 0x89, 0x92, 0xc7, 0xe6, 0xed, 0x0b, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1177,6 +1196,13 @@ func (m *SeriesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.ResponseBatchSize != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.ResponseBatchSize)) + i-- + dAtA[i] = 0x1 + i-- + dAtA[i] = 0x80 + } if m.Limit != 0 { i = encodeVarintRpc(dAtA, i, uint64(m.Limit)) i-- @@ -1616,6 +1642,27 @@ func (m *SeriesResponse_Hints) MarshalToSizedBuffer(dAtA []byte) (int, error) { } return len(dAtA) - i, nil } +func (m *SeriesResponse_Batch) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeriesResponse_Batch) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Batch != nil { + { + size, err := m.Batch.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 + } + return len(dAtA) - i, nil +} func (m *LabelNamesRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -2011,6 +2058,9 @@ func (m *SeriesRequest) Size() (n int) { if m.Limit != 0 { n += 1 + sovRpc(uint64(m.Limit)) } + if m.ResponseBatchSize != 0 { + n += 2 + sovRpc(uint64(m.ResponseBatchSize)) + } return n } @@ -2151,6 +2201,18 @@ func (m *SeriesResponse_Hints) Size() (n int) { } return n } +func (m *SeriesResponse_Batch) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Batch != nil { + l = m.Batch.Size() + n += 1 + l + sovRpc(uint64(l)) + } + return n +} func (m *LabelNamesRequest) Size() (n int) { if m == nil { return 0 @@ -2921,6 +2983,25 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { break } } + case 16: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field ResponseBatchSize", wireType) + } + m.ResponseBatchSize = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.ResponseBatchSize |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -3643,6 +3724,41 @@ func (m *SeriesResponse) Unmarshal(dAtA []byte) error { } m.Result = &SeriesResponse_Hints{v} iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Batch", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &SeriesBatch{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Result = &SeriesResponse_Batch{v} + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 2de086bc12..a39bad66a7 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -111,6 +111,10 @@ message SeriesRequest { // limit is used to limit the number of results returned int64 limit = 15; + + // response_batch_size controls how many series should be batched together + // in a single gRPC message. A value of 0 or 1 means no batching (one series per message). + int64 response_batch_size = 16; } // QueryHints represents hints from PromQL that might help to @@ -187,6 +191,10 @@ message SeriesResponse { /// multiple SeriesResponse frames contain hints for a single Series() request and how should they /// be handled in such case (ie. merged vs keep the first/last one). google.protobuf.Any hints = 3; + + /// batch contains multiple series in a single response message. + /// This allows for more efficient transmission of series data by reducing gRPC overhead. + SeriesBatch batch = 4; } } diff --git a/pkg/store/storepb/types.pb.go b/pkg/store/storepb/types.pb.go index 5d0a1f9867..d7d55191c5 100644 --- a/pkg/store/storepb/types.pb.go +++ b/pkg/store/storepb/types.pb.go @@ -123,7 +123,7 @@ func (x LabelMatcher_Type) String() string { } func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{3, 0} + return fileDescriptor_121fba57de02d8e0, []int{4, 0} } type Chunk struct { @@ -203,6 +203,43 @@ func (m *Series) XXX_DiscardUnknown() { var xxx_messageInfo_Series proto.InternalMessageInfo +type SeriesBatch struct { + Series []*Series `protobuf:"bytes,1,rep,name=series,proto3" json:"series,omitempty"` +} + +func (m *SeriesBatch) Reset() { *m = SeriesBatch{} } +func (m *SeriesBatch) String() string { return proto.CompactTextString(m) } +func (*SeriesBatch) ProtoMessage() {} +func (*SeriesBatch) Descriptor() ([]byte, []int) { + return fileDescriptor_121fba57de02d8e0, []int{2} +} +func (m *SeriesBatch) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SeriesBatch) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SeriesBatch.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SeriesBatch) XXX_Merge(src proto.Message) { + xxx_messageInfo_SeriesBatch.Merge(m, src) +} +func (m *SeriesBatch) XXX_Size() int { + return m.Size() +} +func (m *SeriesBatch) XXX_DiscardUnknown() { + xxx_messageInfo_SeriesBatch.DiscardUnknown(m) +} + +var xxx_messageInfo_SeriesBatch proto.InternalMessageInfo + type AggrChunk struct { MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` @@ -218,7 +255,7 @@ func (m *AggrChunk) Reset() { *m = AggrChunk{} } func (m *AggrChunk) String() string { return proto.CompactTextString(m) } func (*AggrChunk) ProtoMessage() {} func (*AggrChunk) Descriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{2} + return fileDescriptor_121fba57de02d8e0, []int{3} } func (m *AggrChunk) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -258,7 +295,7 @@ func (m *LabelMatcher) Reset() { *m = LabelMatcher{} } func (m *LabelMatcher) String() string { return proto.CompactTextString(m) } func (*LabelMatcher) ProtoMessage() {} func (*LabelMatcher) Descriptor() ([]byte, []int) { - return fileDescriptor_121fba57de02d8e0, []int{3} + return fileDescriptor_121fba57de02d8e0, []int{4} } func (m *LabelMatcher) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -293,6 +330,7 @@ func init() { proto.RegisterEnum("thanos.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) proto.RegisterType((*Chunk)(nil), "thanos.Chunk") proto.RegisterType((*Series)(nil), "thanos.Series") + proto.RegisterType((*SeriesBatch)(nil), "thanos.SeriesBatch") proto.RegisterType((*AggrChunk)(nil), "thanos.AggrChunk") proto.RegisterType((*LabelMatcher)(nil), "thanos.LabelMatcher") } @@ -300,44 +338,45 @@ func init() { func init() { proto.RegisterFile("store/storepb/types.proto", fileDescriptor_121fba57de02d8e0) } var fileDescriptor_121fba57de02d8e0 = []byte{ - // 583 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x4d, 0x6f, 0xd3, 0x40, - 0x10, 0xf5, 0x3a, 0x8e, 0x93, 0x0c, 0x2d, 0xb8, 0x4b, 0x05, 0x6e, 0x0f, 0x4e, 0x64, 0x84, 0x88, - 0x2a, 0xd5, 0x96, 0x0a, 0x12, 0x17, 0x2e, 0x4e, 0x65, 0x4a, 0xa5, 0xb6, 0x69, 0xb7, 0x41, 0xa0, - 0x5e, 0xaa, 0x8d, 0xbb, 0xb2, 0xad, 0xc6, 0x1f, 0xb2, 0xd7, 0x90, 0xfe, 0x0b, 0x10, 0x37, 0x0e, - 0xfc, 0x9e, 0x1c, 0x7b, 0x44, 0x1c, 0x2a, 0x68, 0xff, 0x08, 0xf2, 0xda, 0xa1, 0x44, 0xca, 0xc5, - 0x1a, 0xbf, 0xf7, 0x66, 0x66, 0xe7, 0xed, 0x2c, 0x6c, 0xe4, 0x3c, 0xc9, 0x98, 0x2d, 0xbe, 0xe9, - 0xd8, 0xe6, 0x57, 0x29, 0xcb, 0xad, 0x34, 0x4b, 0x78, 0x82, 0x55, 0x1e, 0xd0, 0x38, 0xc9, 0x37, - 0xd7, 0xfd, 0xc4, 0x4f, 0x04, 0x64, 0x97, 0x51, 0xc5, 0x6e, 0xd6, 0x89, 0x13, 0x3a, 0x66, 0x93, - 0xc5, 0x44, 0xf3, 0x3b, 0x82, 0xe6, 0x6e, 0x50, 0xc4, 0x97, 0x78, 0x0b, 0x94, 0x92, 0xd0, 0x51, - 0x0f, 0xf5, 0x1f, 0xee, 0x3c, 0xb1, 0xaa, 0x8a, 0x96, 0x20, 0x2d, 0x37, 0xf6, 0x92, 0x8b, 0x30, - 0xf6, 0x89, 0xd0, 0x60, 0x0c, 0xca, 0x05, 0xe5, 0x54, 0x97, 0x7b, 0xa8, 0xbf, 0x42, 0x44, 0x8c, - 0x75, 0x50, 0x02, 0x9a, 0x07, 0x7a, 0xa3, 0x87, 0xfa, 0xca, 0x40, 0x99, 0xdd, 0x74, 0x11, 0x11, - 0x88, 0xf9, 0x1a, 0xda, 0xf3, 0x7c, 0xdc, 0x82, 0xc6, 0xc7, 0x21, 0xd1, 0x24, 0xbc, 0x0a, 0x9d, - 0x77, 0xfb, 0xa7, 0xa3, 0xe1, 0x1e, 0x71, 0x0e, 0x35, 0x84, 0x1f, 0xc3, 0xa3, 0xb7, 0x07, 0x43, - 0x67, 0x74, 0x7e, 0x0f, 0xca, 0xe6, 0x0f, 0x04, 0xea, 0x29, 0xcb, 0x42, 0x96, 0x63, 0x0f, 0x54, - 0x71, 0xfc, 0x5c, 0x47, 0xbd, 0x46, 0xff, 0xc1, 0xce, 0xea, 0xfc, 0x7c, 0x07, 0x25, 0x3a, 0x78, - 0x33, 0xbb, 0xe9, 0x4a, 0xbf, 0x6e, 0xba, 0xaf, 0xfc, 0x90, 0x07, 0xc5, 0xd8, 0xf2, 0x92, 0xc8, - 0xae, 0x04, 0xdb, 0x61, 0x52, 0x47, 0x76, 0x7a, 0xe9, 0xdb, 0x0b, 0x4e, 0x58, 0x67, 0x22, 0x9b, - 0xd4, 0xa5, 0xb1, 0x0d, 0xaa, 0x57, 0x8e, 0x9b, 0xeb, 0xb2, 0x68, 0xb2, 0x36, 0x6f, 0xe2, 0xf8, - 0x7e, 0x26, 0x8c, 0x10, 0x73, 0x49, 0xa4, 0x96, 0x99, 0xdf, 0x64, 0xe8, 0xfc, 0xe3, 0xf0, 0x06, - 0xb4, 0xa3, 0x30, 0x3e, 0xe7, 0x61, 0x54, 0xb9, 0xd8, 0x20, 0xad, 0x28, 0x8c, 0x47, 0x61, 0xc4, - 0x04, 0x45, 0xa7, 0x15, 0x25, 0xd7, 0x14, 0x9d, 0x0a, 0xaa, 0x0b, 0x8d, 0x8c, 0x7e, 0x16, 0xb6, - 0xfd, 0x37, 0x96, 0xa8, 0x48, 0x4a, 0x06, 0x3f, 0x83, 0xa6, 0x97, 0x14, 0x31, 0xd7, 0x95, 0x65, - 0x92, 0x8a, 0x2b, 0xab, 0xe4, 0x45, 0xa4, 0x37, 0x97, 0x56, 0xc9, 0x8b, 0xa8, 0x14, 0x44, 0x61, - 0xac, 0xab, 0x4b, 0x05, 0x51, 0x18, 0x0b, 0x01, 0x9d, 0xea, 0xad, 0xe5, 0x02, 0x3a, 0xc5, 0x2f, - 0xa0, 0x25, 0x7a, 0xb1, 0x4c, 0x6f, 0x2f, 0x13, 0xcd, 0x59, 0xf3, 0x2b, 0x82, 0x15, 0x61, 0xec, - 0x21, 0xe5, 0x5e, 0xc0, 0x32, 0xbc, 0xbd, 0xb0, 0x5a, 0x1b, 0x0b, 0x57, 0x57, 0x6b, 0xac, 0xd1, - 0x55, 0xca, 0xee, 0xb7, 0x2b, 0xa6, 0xb5, 0x51, 0x1d, 0x22, 0x62, 0xbc, 0x0e, 0xcd, 0x4f, 0x74, - 0x52, 0x30, 0xe1, 0x53, 0x87, 0x54, 0x3f, 0x66, 0x1f, 0x94, 0x32, 0x0f, 0xab, 0x20, 0xbb, 0x27, - 0x9a, 0x54, 0x6e, 0xd7, 0x91, 0x7b, 0xa2, 0xa1, 0x12, 0x20, 0xae, 0x26, 0x0b, 0x80, 0xb8, 0x5a, - 0x63, 0xcb, 0x81, 0xa7, 0xc7, 0x34, 0xe3, 0x21, 0x9d, 0x10, 0x96, 0xa7, 0x49, 0x9c, 0xb3, 0x53, - 0x9e, 0x51, 0xce, 0xfc, 0x2b, 0xdc, 0x06, 0xe5, 0x83, 0x43, 0x8e, 0x34, 0x09, 0x77, 0xa0, 0xe9, - 0x0c, 0x86, 0x64, 0xa4, 0x21, 0xbc, 0x06, 0xab, 0x7b, 0x64, 0xf8, 0xfe, 0xf8, 0x9c, 0xb8, 0xc7, - 0x07, 0xfb, 0xbb, 0x8e, 0x26, 0x0f, 0x9e, 0xcf, 0xfe, 0x18, 0xd2, 0xec, 0xd6, 0x40, 0xd7, 0xb7, - 0x06, 0xfa, 0x7d, 0x6b, 0xa0, 0x2f, 0x77, 0x86, 0x74, 0x7d, 0x67, 0x48, 0x3f, 0xef, 0x0c, 0xe9, - 0xac, 0x55, 0x3f, 0xcb, 0xb1, 0x2a, 0x1e, 0xd6, 0xcb, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xee, - 0x6c, 0xcf, 0x99, 0xae, 0x03, 0x00, 0x00, + // 605 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x53, 0x4d, 0x6f, 0xd3, 0x30, + 0x18, 0x8e, 0xd3, 0x34, 0x6d, 0xdf, 0x7d, 0x90, 0x99, 0x09, 0xb2, 0x1d, 0xd2, 0x2a, 0x08, 0xa8, + 0x26, 0x2d, 0x91, 0x06, 0x88, 0x0b, 0x97, 0x74, 0x2a, 0x63, 0xd2, 0xb6, 0x6e, 0x5e, 0x11, 0x68, + 0x97, 0xc9, 0xed, 0xac, 0x34, 0x5a, 0xf3, 0xa1, 0xd8, 0x81, 0xee, 0x5f, 0x80, 0xb8, 0x71, 0xe0, + 0xf7, 0xf4, 0xb8, 0x23, 0xe2, 0x30, 0xc1, 0xf6, 0x47, 0x50, 0x9c, 0x94, 0x51, 0xa9, 0x97, 0xe8, + 0xcd, 0xfb, 0x7c, 0xf8, 0xf5, 0x63, 0x1b, 0x36, 0xb8, 0x88, 0x53, 0xe6, 0xca, 0x6f, 0x32, 0x70, + 0xc5, 0x55, 0xc2, 0xb8, 0x93, 0xa4, 0xb1, 0x88, 0xb1, 0x2e, 0x46, 0x34, 0x8a, 0xf9, 0xe6, 0xba, + 0x1f, 0xfb, 0xb1, 0x6c, 0xb9, 0x79, 0x55, 0xa0, 0x9b, 0xa5, 0x70, 0x4c, 0x07, 0x6c, 0x3c, 0x2f, + 0xb4, 0xbf, 0x23, 0xa8, 0xee, 0x8e, 0xb2, 0xe8, 0x12, 0x6f, 0x81, 0x96, 0x03, 0x26, 0x6a, 0xa1, + 0xf6, 0xea, 0xce, 0x23, 0xa7, 0x70, 0x74, 0x24, 0xe8, 0x74, 0xa3, 0x61, 0x7c, 0x11, 0x44, 0x3e, + 0x91, 0x1c, 0x8c, 0x41, 0xbb, 0xa0, 0x82, 0x9a, 0x6a, 0x0b, 0xb5, 0x97, 0x89, 0xac, 0xb1, 0x09, + 0xda, 0x88, 0xf2, 0x91, 0x59, 0x69, 0xa1, 0xb6, 0xd6, 0xd1, 0xa6, 0x37, 0x4d, 0x44, 0x64, 0xc7, + 0x7e, 0x0d, 0xf5, 0x99, 0x1e, 0xd7, 0xa0, 0xf2, 0xb1, 0x47, 0x0c, 0x05, 0xaf, 0x40, 0xe3, 0xdd, + 0xfe, 0x69, 0xbf, 0xb7, 0x47, 0xbc, 0x43, 0x03, 0xe1, 0x87, 0xf0, 0xe0, 0xed, 0x41, 0xcf, 0xeb, + 0x9f, 0xdf, 0x37, 0x55, 0xfb, 0x07, 0x02, 0xfd, 0x94, 0xa5, 0x01, 0xe3, 0x78, 0x08, 0xba, 0x1c, + 0x9f, 0x9b, 0xa8, 0x55, 0x69, 0x2f, 0xed, 0xac, 0xcc, 0xe6, 0x3b, 0xc8, 0xbb, 0x9d, 0x37, 0xd3, + 0x9b, 0xa6, 0xf2, 0xeb, 0xa6, 0xf9, 0xd2, 0x0f, 0xc4, 0x28, 0x1b, 0x38, 0xc3, 0x38, 0x74, 0x0b, + 0xc2, 0x76, 0x10, 0x97, 0x95, 0x9b, 0x5c, 0xfa, 0xee, 0x5c, 0x12, 0xce, 0x99, 0x54, 0x93, 0xd2, + 0x1a, 0xbb, 0xa0, 0x0f, 0xf3, 0xed, 0x72, 0x53, 0x95, 0x8b, 0xac, 0xcd, 0x16, 0xf1, 0x7c, 0x3f, + 0x95, 0x41, 0xc8, 0x7d, 0x29, 0xa4, 0xa4, 0xd9, 0xaf, 0x60, 0xa9, 0x98, 0xaf, 0x43, 0xc5, 0x70, + 0x84, 0x9f, 0x81, 0xce, 0xe5, 0x6f, 0x39, 0xe4, 0xea, 0x4c, 0x5f, 0x90, 0x48, 0x89, 0xda, 0xdf, + 0x54, 0x68, 0xfc, 0xb3, 0xc4, 0x1b, 0x50, 0x0f, 0x83, 0xe8, 0x5c, 0x04, 0x61, 0x11, 0x7e, 0x85, + 0xd4, 0xc2, 0x20, 0xea, 0x07, 0x21, 0x93, 0x10, 0x9d, 0x14, 0x90, 0x5a, 0x42, 0x74, 0x22, 0xa1, + 0x26, 0x54, 0x52, 0xfa, 0x59, 0xa6, 0xfd, 0x5f, 0x1a, 0xd2, 0x91, 0xe4, 0x08, 0x7e, 0x02, 0xd5, + 0x61, 0x9c, 0x45, 0xc2, 0xd4, 0x16, 0x51, 0x0a, 0x2c, 0x77, 0xe1, 0x59, 0x68, 0x56, 0x17, 0xba, + 0xf0, 0x2c, 0xcc, 0x09, 0x61, 0x10, 0x99, 0xfa, 0x42, 0x42, 0x18, 0x44, 0x92, 0x40, 0x27, 0x66, + 0x6d, 0x31, 0x81, 0x4e, 0xf0, 0x73, 0xa8, 0xc9, 0xb5, 0x58, 0x6a, 0xd6, 0x17, 0x91, 0x66, 0xa8, + 0xfd, 0x15, 0xc1, 0xb2, 0x3c, 0x8f, 0xc3, 0x3c, 0x4c, 0x96, 0xe2, 0xed, 0xb9, 0x1b, 0xb9, 0x31, + 0x77, 0xe2, 0x25, 0xc7, 0xe9, 0x5f, 0x25, 0xec, 0xfe, 0x52, 0x46, 0xb4, 0x0c, 0xaa, 0x41, 0x64, + 0x8d, 0xd7, 0xa1, 0xfa, 0x89, 0x8e, 0x33, 0x26, 0x73, 0x6a, 0x90, 0xe2, 0xc7, 0x6e, 0x83, 0x96, + 0xeb, 0xb0, 0x0e, 0x6a, 0xf7, 0xc4, 0x50, 0xf2, 0x4b, 0x79, 0xd4, 0x3d, 0x31, 0x50, 0xde, 0x20, + 0x5d, 0x43, 0x95, 0x0d, 0xd2, 0x35, 0x2a, 0x5b, 0x1e, 0x3c, 0x3e, 0xa6, 0xa9, 0x08, 0xe8, 0x98, + 0x30, 0x9e, 0xc4, 0x11, 0x67, 0xa7, 0x22, 0xa5, 0x82, 0xf9, 0x57, 0xb8, 0x0e, 0xda, 0x07, 0x8f, + 0x1c, 0x19, 0x0a, 0x6e, 0x40, 0xd5, 0xeb, 0xf4, 0x48, 0xdf, 0x40, 0x78, 0x0d, 0x56, 0xf6, 0x48, + 0xef, 0xfd, 0xf1, 0x39, 0xe9, 0x1e, 0x1f, 0xec, 0xef, 0x7a, 0x86, 0xda, 0x79, 0x3a, 0xfd, 0x63, + 0x29, 0xd3, 0x5b, 0x0b, 0x5d, 0xdf, 0x5a, 0xe8, 0xf7, 0xad, 0x85, 0xbe, 0xdc, 0x59, 0xca, 0xf5, + 0x9d, 0xa5, 0xfc, 0xbc, 0xb3, 0x94, 0xb3, 0x5a, 0xf9, 0x9a, 0x07, 0xba, 0x7c, 0x8f, 0x2f, 0xfe, + 0x06, 0x00, 0x00, 0xff, 0xff, 0xda, 0x77, 0x43, 0x54, 0xe5, 0x03, 0x00, 0x00, } func (m *Chunk) Marshal() (dAtA []byte, err error) { @@ -431,6 +470,43 @@ func (m *Series) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *SeriesBatch) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SeriesBatch) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *SeriesBatch) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.Series) > 0 { + for iNdEx := len(m.Series) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Series[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTypes(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func (m *AggrChunk) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -629,6 +705,21 @@ func (m *Series) Size() (n int) { return n } +func (m *SeriesBatch) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Series) > 0 { + for _, e := range m.Series { + l = e.Size() + n += 1 + l + sovTypes(uint64(l)) + } + } + return n +} + func (m *AggrChunk) Size() (n int) { if m == nil { return 0 @@ -934,6 +1025,90 @@ func (m *Series) Unmarshal(dAtA []byte) error { } return nil } +func (m *SeriesBatch) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SeriesBatch: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SeriesBatch: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Series", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTypes + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Series = append(m.Series, &Series{}) + if err := m.Series[len(m.Series)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTypes(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTypes + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *AggrChunk) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/pkg/store/storepb/types.proto b/pkg/store/storepb/types.proto index 42dfe883c6..32f676a161 100644 --- a/pkg/store/storepb/types.proto +++ b/pkg/store/storepb/types.proto @@ -36,6 +36,10 @@ message Series { repeated AggrChunk chunks = 2 [(gogoproto.nullable) = false]; } +message SeriesBatch { + repeated Series series = 1; +} + message AggrChunk { int64 min_time = 1; int64 max_time = 2; diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 4d6b8c9ef2..8728f2f847 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -251,7 +251,7 @@ func (s *TSDBStore) SeriesLocal(ctx context.Context, r *storepb.SeriesRequest) ( func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { var srv flushableServer if fs, ok := seriesSrv.(flushableServer); !ok { - srv = newFlushableServer(seriesSrv, sortingStrategyStore) + srv = newFlushableServer(newBatchableServer(seriesSrv, int(r.ResponseBatchSize)), sortingStrategyStore) } else { srv = fs }