Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions frac/processor/search_params.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package processor

import (
"go.uber.org/zap/zapcore"

"github.com/ozontech/seq-db/parser"
"github.com/ozontech/seq-db/seq"
)
Expand All @@ -13,6 +15,23 @@ type AggQuery struct {
Interval int64
}

func (q AggQuery) MarshalLogObject(enc zapcore.ObjectEncoder) error {
if q.Field != nil {
enc.AddString("field", q.Field.Field)
}
if q.GroupBy != nil {
enc.AddString("groupBy", q.GroupBy.Field)
}
enc.AddString("func", q.Func.String())
if len(q.Quantiles) != 0 {
enc.AddInt("quantiles_count", len(q.Quantiles))
}
if q.Interval != 0 {
enc.AddInt64("interval", q.Interval)
}
return nil
}

type SearchParams struct {
AST *parser.ASTNode `json:"-"`

Expand All @@ -39,3 +58,13 @@ func (p *SearchParams) HasAgg() bool {
func (p *SearchParams) IsScanAllRequest() bool {
return p.WithTotal || p.HasAgg() || p.HasHist()
}

func (p *SearchParams) Type() string {
if p.HasAgg() {
return "agg"
} else if p.HasHist() {
return "hist"
}

return "reg"
}
57 changes: 56 additions & 1 deletion fracmanager/searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"go.uber.org/zap"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/frac/processor"
Expand All @@ -17,9 +19,14 @@ import (
"github.com/ozontech/seq-db/util"
)

const (
maxFracsSlowSearchLog = 10
)

type SearcherCfg struct {
MaxFractionHits int // the maximum number of fractions used in the search
FractionsPerIteration int
SlowLogThreshold time.Duration
}

type Searcher struct {
Expand All @@ -38,6 +45,7 @@ func NewSearcher(maxWorkersNum int, cfg SearcherCfg) *Searcher {
}

func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params processor.SearchParams, tr *querytracer.Tracer) (*seq.QPR, error) {
start := time.Now()
remainingFracs, err := s.prepareFracs(fracs, params)
if err != nil {
return nil, err
Expand All @@ -59,13 +67,33 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params

var totalSearchTimeNanos int64
var totalWaitTimeNanos int64
totalFracsFound := 0
totalFracsSkipped := 0
var fracsFound []string
var fracsSkipped []string

for len(remainingFracs) > 0 && (scanAll || params.Limit > 0) {
subQPRs, searchTimeNanos, waitTimeNanos, err := s.searchDocsAsync(ctx, remainingFracs.Shift(fracsChunkSize), params)
chunk := remainingFracs.Shift(fracsChunkSize)

subQPRs, searchTimeNanos, waitTimeNanos, err := s.searchDocsAsync(ctx, chunk, params)
if err != nil {
return nil, err
}

for i, qpr := range subQPRs {
if !qpr.Empty() {
totalFracsFound++
if len(fracsFound) < maxFracsSlowSearchLog {
fracsFound = append(fracsFound, chunk[i].Info().Name())
}
} else {
totalFracsSkipped++
if len(fracsSkipped) < maxFracsSlowSearchLog {
fracsSkipped = append(fracsSkipped, chunk[i].Info().Name())
}
}
}

totalSearchTimeNanos += searchTimeNanos
totalWaitTimeNanos += waitTimeNanos

Expand All @@ -91,6 +119,33 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params
}

searchSubSearches.Observe(float64(subSearchesCnt))

took := time.Since(start)
if s.cfg.SlowLogThreshold != 0 && took >= s.cfg.SlowLogThreshold {
fields := []zap.Field{
zap.Int64("took_ms", took.Milliseconds()),
zap.String("type", params.Type()),
zap.String("request", params.AST.SeqQLString()),
zap.Uint64("hist_interval", params.HistInterval),
zap.String("from", params.From.String()),
zap.String("to", params.To.String()),
zap.Uint64("range", seq.MIDToSeconds(params.To)-seq.MIDToSeconds(params.From)),
zap.String("offset_id", params.OffsetId.String()),
zap.Int("limit", params.Limit),
zap.Bool("with_total", params.WithTotal),
zap.Int("total_fracs_found", totalFracsFound),
zap.Strings("fracs_found", fracsFound),
zap.Int("total_fracs_skipped", totalFracsSkipped),
zap.Strings("fracs_skipped", fracsSkipped),
zap.Uint64("total", total.Total),
}

for i, agg := range params.AggQ {
fields = append(fields, zap.Object(fmt.Sprintf("agg_%d", i), agg))
}

logger.Warn("slow search", fields...)
}
return total, nil

}
Expand Down
27 changes: 27 additions & 0 deletions seq/qpr.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ type QPR struct {
Errors []ErrorSource
}

func (q *QPR) Empty() bool {
return len(q.IDs) == 0 && len(q.Histogram) == 0 && len(q.Aggs) == 0
}

func (q *QPR) Aggregate(args []AggregateArgs) []AggregationResult {
allAggregations := make([]AggregationResult, len(q.Aggs))
for i, agg := range q.Aggs {
Expand Down Expand Up @@ -107,6 +111,29 @@ const (
AggFuncUniqueCount
)

func (f AggFunc) String() string {
switch f {
case AggFuncCount:
return "count"
case AggFuncSum:
return "sum"
case AggFuncMin:
return "min"
case AggFuncMax:
return "max"
case AggFuncAvg:
return "avg"
case AggFuncQuantile:
return "quantile"
case AggFuncUnique:
return "unique"
case AggFuncUniqueCount:
return "unique_count"
default:
return "unknown"
}
}

type AggBin struct {
MID MID
Token string
Expand Down
4 changes: 4 additions & 0 deletions seq/seq.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func MIDToMillis(t MID) uint64 {
return uint64(t) / uint64(time.Millisecond)
}

func MIDToSeconds(t MID) uint64 {
return uint64(t) / uint64(time.Second)
}

func MIDToCeilingMillis(t MID) uint64 {
millis := uint64(t) / uint64(time.Millisecond)
nanosPartOfMilli := uint64(t) % uint64(time.Millisecond)
Expand Down
15 changes: 0 additions & 15 deletions storeapi/grpc_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,21 +225,6 @@ func (g *GrpcV1) doSearch(
}
}

took := time.Since(start)
if g.config.Search.LogThreshold != 0 && took >= g.config.Search.LogThreshold {
logger.Warn("slow search",
zap.Int64("took_ms", took.Milliseconds()),
zap.Object("req", (*searchRequestMarshaler)(req)),
zap.Uint64("found", qpr.Total),
zap.String("from", seq.MillisToMID(uint64(req.From)).String()),
zap.String("to", seq.MillisToMID(uint64(req.To)).String()),
zap.Int64("offset", req.Offset),
zap.String("offset_id", req.OffsetId),
zap.Int64("size", req.Size),
zap.Bool("with_total", req.WithTotal),
)
}

return buildSearchResponse(qpr), nil
}

Expand Down
1 change: 1 addition & 0 deletions storeapi/grpc_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func NewGrpcV1(cfg APIConfig, fracManager *fracmanager.FracManager, mappingProvi
searcher: fracmanager.NewSearcher(cfg.Search.WorkersCount, fracmanager.SearcherCfg{
MaxFractionHits: cfg.Search.MaxFractionHits,
FractionsPerIteration: cfg.Search.FractionsPerIteration,
SlowLogThreshold: cfg.Search.LogThreshold,
}),
},
fetchData: fetchData{
Expand Down
Loading