5 changes: 5 additions & 0 deletions cmd/thanos/query.go
@@ -250,6 +250,8 @@ func registerQuery(app *extkingpin.App) {
rewriteAggregationLabelStrategy := cmd.Flag("query.aggregation-label-strategy", "The strategy to use when rewriting aggregation labels. Used during aggregator migration only.").Default(string(query.NoopLabelRewriter)).Hidden().Enum(string(query.NoopLabelRewriter), string(query.UpsertLabelRewriter), string(query.InsertOnlyLabelRewriter))
rewriteAggregationLabelTo := cmd.Flag("query.aggregation-label-value-override", "The value override for aggregation label. If set to x, all queries on aggregated metrics will have a `__agg_rule_type__=x` matcher. If empty, this behavior is disabled. Default is empty.").Hidden().Default("").String()

seriesResponseBatchSize := cmd.Flag("query.series-response-batch-size", "How many Series can be batched in one gRPC message. A value of 0 or 1 means no batching (one series per message). Higher values reduce gRPC overhead but increase memory usage.").Hidden().Default("1").Int()

lazyRetrievalMaxBufferedResponses := cmd.Flag("query.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
Default("20").Int()

@@ -411,6 +413,7 @@ func registerQuery(app *extkingpin.App) {
*rewriteAggregationLabelStrategy,
*rewriteAggregationLabelTo,
*lazyRetrievalMaxBufferedResponses,
*seriesResponseBatchSize,
time.Duration(*grpcStoreClientKeepAlivePingInterval),
blockedMetricPatterns,
*forwardPartialStrategy,
@@ -502,6 +505,7 @@ func runQuery(
rewriteAggregationLabelStrategy string,
rewriteAggregationLabelTo string,
lazyRetrievalMaxBufferedResponses int,
seriesResponseBatchSize int,
grpcStoreClientKeepAlivePingInterval time.Duration,
blockedMetricPatterns []string,
forwardPartialStrategy bool,
@@ -655,6 +659,7 @@ func runQuery(
DeduplicationFunc: queryDeduplicationFunc,
RewriteAggregationLabelStrategy: rewriteAggregationLabelStrategy,
RewriteAggregationLabelTo: rewriteAggregationLabelTo,
SeriesResponseBatchSize: seriesResponseBatchSize,
}
level.Info(logger).Log("msg", "databricks querier features", "opts", fmt.Sprintf("%+v", opts))
queryableCreator = query.NewQueryableCreatorWithOptions(
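The new flag is plumbed straight through to query.Options. A minimal, hedged sketch of what ends up there (the value 100 is made up; 0 or 1 keeps the current one-series-per-message behavior):

package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/query"
)

func main() {
	// Hypothetical value for --query.series-response-batch-size; the querier
	// copies it onto every outgoing SeriesRequest as ResponseBatchSize.
	opts := query.Options{SeriesResponseBatchSize: 100}
	fmt.Printf("%+v\n", opts)
}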
15 changes: 15 additions & 0 deletions pkg/query/querier.go
@@ -62,6 +62,7 @@ type Options struct {
DeduplicationFunc string
RewriteAggregationLabelStrategy string
RewriteAggregationLabelTo string
SeriesResponseBatchSize int
}

// NewQueryableCreator creates QueryableCreator.
@@ -169,6 +170,7 @@ type querier struct {
selectTimeout time.Duration
shardInfo *storepb.ShardInfo
seriesStatsReporter seriesStatsReporter
seriesResponseBatchSize int

aggregationLabelRewriter *AggregationLabelRewriter
}
@@ -248,6 +250,7 @@ func newQuerierWithOpts(
skipChunks: skipChunks,
shardInfo: shardInfo,
seriesStatsReporter: seriesStatsReporter,
seriesResponseBatchSize: opts.SeriesResponseBatchSize,

aggregationLabelRewriter: aggregationLabelRewriter,
}
@@ -279,6 +282,17 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
return nil
}

if b := r.GetBatch(); b != nil {
for _, series := range b.Series {
if series == nil {
continue
}
s.seriesSet = append(s.seriesSet, *series)
}
s.seriesSetStats.Count(r)
return nil
}

// Unsupported field, skip.
return nil
}
@@ -410,6 +424,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
ShardInfo: q.shardInfo,
PartialResponseStrategy: q.partialResponseStrategy,
SkipChunks: q.skipChunks,
ResponseBatchSize: int64(q.seriesResponseBatchSize),
}
if q.isDedupEnabled() {
// Soft ask to sort without replica labels and push them at the end of labelset.
60 changes: 60 additions & 0 deletions pkg/store/batchable.go
@@ -0,0 +1,60 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package store

import (
"github.com/thanos-io/thanos/pkg/store/storepb"
)

// batchableServer wraps a storepb.Store_SeriesServer and batches series responses
// to reduce gRPC overhead. When the batch size is reached, it sends all buffered
// series as a single SeriesBatch response.
type batchableServer struct {
storepb.Store_SeriesServer
batchSize int
series []*storepb.Series
}

func newBatchableServer(upstream storepb.Store_SeriesServer, batchSize int) storepb.Store_SeriesServer {
if batchSize <= 1 {
return &passthroughServer{Store_SeriesServer: upstream}
}
return &batchableServer{
Store_SeriesServer: upstream,
batchSize: batchSize,
series: make([]*storepb.Series, 0, batchSize),
}
}

// Send buffers series responses and sends them as batches when the buffer is full.
// Non-series responses (warnings, hints) trigger an immediate flush before being sent.
func (b *batchableServer) Send(response *storepb.SeriesResponse) error {
series := response.GetSeries()
if series == nil {
// Non-series response (warning/hints): flush batch first, then send
if err := b.Flush(); err != nil {
return err
}
return b.Store_SeriesServer.Send(response)
}

b.series = append(b.series, series)
if len(b.series) >= b.batchSize {
return b.Flush()
}
return nil
}

// Flush sends any buffered series as a batch response.
// Implements the flushableServer interface.
func (b *batchableServer) Flush() error {
if len(b.series) == 0 {
return nil
}
if err := b.Store_SeriesServer.Send(storepb.NewBatchResponse(b.series)); err != nil {
return err
}
b.series = b.series[:0]
return nil
}
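The Send/Flush contract above is easiest to see end to end. Below is a minimal sketch, not part of this change, using a hypothetical in-package stub (captureSeriesServer) that records whatever reaches the upstream; with batchSize=3, five series should arrive as one full batch of three plus a final partial batch of two after an explicit Flush.

package store

import (
	"testing"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// captureSeriesServer is a hypothetical test stub: it records every response
// passed to Send and leaves the rest of the storepb.Store_SeriesServer surface
// to the embedded (nil) interface, which this sketch never exercises.
type captureSeriesServer struct {
	storepb.Store_SeriesServer
	got []*storepb.SeriesResponse
}

func (c *captureSeriesServer) Send(r *storepb.SeriesResponse) error {
	c.got = append(c.got, r)
	return nil
}

// TestBatchableServerSketch: five series with batchSize=3 should reach the
// upstream as exactly two responses, one full batch (flushed when the buffer
// fills) and one partial batch (flushed explicitly at the end).
func TestBatchableServerSketch(t *testing.T) {
	upstream := &captureSeriesServer{}
	srv := newBatchableServer(upstream, 3)

	for i := 0; i < 5; i++ {
		if err := srv.Send(storepb.NewSeriesResponse(&storepb.Series{})); err != nil {
			t.Fatal(err)
		}
	}
	if err := srv.(flushableServer).Flush(); err != nil {
		t.Fatal(err)
	}
	if len(upstream.got) != 2 {
		t.Fatalf("expected 2 batched responses, got %d", len(upstream.got))
	}
}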
2 changes: 1 addition & 1 deletion pkg/store/bucket.go
@@ -1496,7 +1496,7 @@ func debugFoundBlockSetOverview(logger log.Logger, mint, maxt, maxResolutionMill

// Series implements the storepb.StoreServer interface.
func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) (err error) {
srv := newFlushableServer(seriesSrv, sortingStrategyNone)
srv := newFlushableServer(newBatchableServer(seriesSrv, int(req.ResponseBatchSize)), sortingStrategyNone)

if s.queryGate != nil {
tracing.DoInSpan(srv.Context(), "store_query_gate_ismyturn", func(ctx context.Context) {
11 changes: 10 additions & 1 deletion pkg/store/flushable.go
@@ -49,7 +49,12 @@ type passthroughServer struct {
storepb.Store_SeriesServer
}

func (p *passthroughServer) Flush() error { return nil }
func (p *passthroughServer) Flush() error {
if f, ok := p.Store_SeriesServer.(flushableServer); ok {
return f.Flush()
}
return nil
}

// resortingServer is a flushableServer that resorts all series by their labels.
// This is required if replica labels are stored internally in a TSDB.
@@ -89,5 +94,9 @@ func (r *resortingServer) Flush() error {
return err
}
}
// Delegate to the wrapped server if it's flushable (e.g., batchableServer)
if f, ok := r.Store_SeriesServer.(flushableServer); ok {
return f.Flush()
}
return nil
}
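The delegation added above matters because flushing only the outer wrapper used to leave a wrapped batchableServer's last partial batch stranded in its buffer. A minimal sketch of the behavior being guarded, again with a hypothetical recording stub:

package store

import (
	"testing"

	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// recordingUpstream is a hypothetical stub that records what ultimately gets
// sent over the wire.
type recordingUpstream struct {
	storepb.Store_SeriesServer
	got []*storepb.SeriesResponse
}

func (r *recordingUpstream) Send(resp *storepb.SeriesResponse) error {
	r.got = append(r.got, resp)
	return nil
}

// TestPassthroughFlushDelegationSketch: with a batch size that never fills on
// its own, the single buffered series must still reach the upstream once the
// outer wrapper is flushed, because passthroughServer.Flush now delegates to
// the wrapped batchableServer.
func TestPassthroughFlushDelegationSketch(t *testing.T) {
	upstream := &recordingUpstream{}
	outer := &passthroughServer{Store_SeriesServer: newBatchableServer(upstream, 10)}

	if err := outer.Send(storepb.NewSeriesResponse(&storepb.Series{})); err != nil {
		t.Fatal(err)
	}
	if err := outer.Flush(); err != nil {
		t.Fatal(err)
	}
	if len(upstream.got) != 1 {
		t.Fatalf("expected the pending batch to be flushed, got %d responses", len(upstream.got))
	}
}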
2 changes: 1 addition & 1 deletion pkg/store/prometheus.go
@@ -122,7 +122,7 @@ func (p *PrometheusStore) putBuffer(b *[]byte) {

// Series returns all series for a requested time range and label matcher.
func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
s := newFlushableServer(seriesSrv, sortingStrategyStore)
s := newFlushableServer(newBatchableServer(seriesSrv, int(r.ResponseBatchSize)), sortingStrategyStore)

extLset := p.externalLabelsFn()

8 changes: 7 additions & 1 deletion pkg/store/proxy.go
@@ -376,6 +376,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
if s.debugLogging {
reqLogger = log.With(reqLogger, "request", originalRequest.String())
}
srv = newBatchableServer(srv, int(originalRequest.ResponseBatchSize))

match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache)
if err != nil {
@@ -502,6 +503,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
PartialResponseStrategy: originalRequest.PartialResponseStrategy,
ShardInfo: originalRequest.ShardInfo,
WithoutReplicaLabels: originalRequest.WithoutReplicaLabels,
ResponseBatchSize: originalRequest.ResponseBatchSize,
}
if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA && !s.forwardPartialStrategy {
// Do not forward this field as it might cause data loss.
@@ -699,7 +701,11 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
}

if f, ok := srv.(flushableServer); ok {
if err := f.Flush(); err != nil {
return err
}
}
return nil
}

42 changes: 42 additions & 0 deletions pkg/store/proxy_merge.go
@@ -467,6 +467,32 @@ func newLazyRespSet(
numResponses++
bytesProcessed += resp.Size()

// Handle batch responses by unpacking them into individual series responses
if batch := resp.GetBatch(); batch != nil {
for _, series := range batch.Series {
if series == nil {
continue
}
if applySharding && !shardMatcher.MatchesZLabels(series.Labels) {
continue
}
seriesResp := storepb.NewSeriesResponse(series)
seriesStats.Count(seriesResp)

l.bufferedResponsesMtx.Lock()
for l.isFull() && !l.closed {
l.bufferSlotEvent.Wait()
}
if !l.closed {
l.bufferedResponses[l.ringTail] = seriesResp
l.ringTail = (l.ringTail + 1) % l.fixedBufferSize
l.dataOrFinishEvent.Signal()
}
l.bufferedResponsesMtx.Unlock()
}
return true
}

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}
@@ -740,6 +766,22 @@ func newEagerRespSet(
numResponses++
bytesProcessed += resp.Size()

// Handle batch responses by unpacking them into individual series responses
if batch := resp.GetBatch(); batch != nil {
for _, series := range batch.Series {
if series == nil {
continue
}
if applySharding && !shardMatcher.MatchesZLabels(series.Labels) {
continue
}
seriesResp := storepb.NewSeriesResponse(series)
seriesStats.Count(seriesResp)
l.bufferedResponses = append(l.bufferedResponses, seriesResp)
}
return true
}

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}
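Both loops above apply the same unpacking rule. A distilled, hedged sketch of that rule in isolation (unpackBatch is an invented helper, not part of this change; sharding and buffering are omitted):

package store

import "github.com/thanos-io/thanos/pkg/store/storepb"

// unpackBatch mirrors the loops above: it turns a batch response into the
// equivalent slice of single-series responses and leaves every other response
// kind (series, warning, hints) untouched.
func unpackBatch(resp *storepb.SeriesResponse) []*storepb.SeriesResponse {
	batch := resp.GetBatch()
	if batch == nil {
		return []*storepb.SeriesResponse{resp}
	}
	out := make([]*storepb.SeriesResponse, 0, len(batch.Series))
	for _, series := range batch.Series {
		if series == nil {
			continue
		}
		out = append(out, storepb.NewSeriesResponse(series))
	}
	return out
}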
39 changes: 32 additions & 7 deletions pkg/store/storepb/custom.go
@@ -52,6 +52,16 @@ func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
}
}

func NewBatchResponse(batch []*Series) *SeriesResponse {
return &SeriesResponse{
Result: &SeriesResponse_Batch{
Batch: &SeriesBatch{
Series: batch,
},
},
}
}

func GRPCCodeFromWarn(warn string) codes.Code {
if strings.Contains(warn, "rpc error: code = ResourceExhausted") {
return codes.ResourceExhausted
@@ -535,13 +545,7 @@ func (c *SeriesStatsCounter) CountSeries(seriesLabels []labelpb.ZLabel) {
}
}

func (c *SeriesStatsCounter) Count(r *SeriesResponse) {
//aggregate # of bytes fetched
c.Bytes += uint64(r.Size())
if r.GetSeries() == nil {
return
}
series := r.GetSeries()
func (c *SeriesStatsCounter) countSingleSeries(series *Series) {
c.CountSeries(series.Labels)
for _, chk := range series.Chunks {
if chk.Raw != nil {
@@ -576,6 +580,27 @@
}
}

func (c *SeriesStatsCounter) Count(r *SeriesResponse) {
//aggregate # of bytes fetched
c.Bytes += uint64(r.Size())

// Handle batch responses
if batch := r.GetBatch(); batch != nil {
for _, series := range batch.Series {
if series == nil {
continue
}
c.countSingleSeries(series)
}
return
}

if r.GetSeries() == nil {
return
}
c.countSingleSeries(r.GetSeries())
}

func (m *SeriesRequest) ToPromQL() string {
return m.QueryHints.toPromQL(m.Matchers)
}
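A short hedged sketch of the new counting path (the metric names below are invented): counting one batch response should attribute every contained series, just as counting the equivalent stream of single-series responses would.

package main

import (
	"fmt"

	"github.com/thanos-io/thanos/pkg/store/labelpb"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
	// Two distinct series wrapped into a single batch response.
	batch := storepb.NewBatchResponse([]*storepb.Series{
		{Labels: []labelpb.ZLabel{{Name: "__name__", Value: "up"}}},
		{Labels: []labelpb.ZLabel{{Name: "__name__", Value: "go_goroutines"}}},
	})

	var stats storepb.SeriesStatsCounter
	stats.Count(batch)

	// Count unpacks the batch, so both series are reflected in the stats.
	fmt.Println(stats.Series) // expected: 2
}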