diff --git a/frac/processor/search_params.go b/frac/processor/search_params.go index e383c9e2..3d5d0e93 100644 --- a/frac/processor/search_params.go +++ b/frac/processor/search_params.go @@ -1,6 +1,8 @@ package processor import ( + "go.uber.org/zap/zapcore" + "github.com/ozontech/seq-db/parser" "github.com/ozontech/seq-db/seq" ) @@ -13,6 +15,23 @@ type AggQuery struct { Interval int64 } +func (q AggQuery) MarshalLogObject(enc zapcore.ObjectEncoder) error { + if q.Field != nil { + enc.AddString("field", q.Field.Field) + } + if q.GroupBy != nil { + enc.AddString("groupBy", q.GroupBy.Field) + } + enc.AddString("func", q.Func.String()) + if len(q.Quantiles) != 0 { + enc.AddInt("quantiles_count", len(q.Quantiles)) + } + if q.Interval != 0 { + enc.AddInt64("interval", q.Interval) + } + return nil +} + type SearchParams struct { AST *parser.ASTNode `json:"-"` @@ -39,3 +58,13 @@ func (p *SearchParams) HasAgg() bool { func (p *SearchParams) IsScanAllRequest() bool { return p.WithTotal || p.HasAgg() || p.HasHist() } + +func (p *SearchParams) Type() string { + if p.HasAgg() { + return "agg" + } else if p.HasHist() { + return "hist" + } + + return "reg" +} diff --git a/fracmanager/searcher.go b/fracmanager/searcher.go index d69277fb..6cc688cb 100644 --- a/fracmanager/searcher.go +++ b/fracmanager/searcher.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/ozontech/seq-db/consts" "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/frac/processor" @@ -17,9 +19,14 @@ import ( "github.com/ozontech/seq-db/util" ) +const ( + maxFracsSlowSearchLog = 10 +) + type SearcherCfg struct { MaxFractionHits int // the maximum number of fractions used in the search FractionsPerIteration int + SlowLogThreshold time.Duration } type Searcher struct { @@ -38,6 +45,7 @@ func NewSearcher(maxWorkersNum int, cfg SearcherCfg) *Searcher { } func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params processor.SearchParams, tr *querytracer.Tracer) (*seq.QPR, error) { + start := time.Now() remainingFracs, err := s.prepareFracs(fracs, params) if err != nil { return nil, err @@ -59,13 +67,33 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params var totalSearchTimeNanos int64 var totalWaitTimeNanos int64 + totalFracsFound := 0 + totalFracsSkipped := 0 + var fracsFound []string + var fracsSkipped []string for len(remainingFracs) > 0 && (scanAll || params.Limit > 0) { - subQPRs, searchTimeNanos, waitTimeNanos, err := s.searchDocsAsync(ctx, remainingFracs.Shift(fracsChunkSize), params) + chunk := remainingFracs.Shift(fracsChunkSize) + + subQPRs, searchTimeNanos, waitTimeNanos, err := s.searchDocsAsync(ctx, chunk, params) if err != nil { return nil, err } + for i, qpr := range subQPRs { + if !qpr.Empty() { + totalFracsFound++ + if len(fracsFound) < maxFracsSlowSearchLog { + fracsFound = append(fracsFound, chunk[i].Info().Name()) + } + } else { + totalFracsSkipped++ + if len(fracsSkipped) < maxFracsSlowSearchLog { + fracsSkipped = append(fracsSkipped, chunk[i].Info().Name()) + } + } + } + totalSearchTimeNanos += searchTimeNanos totalWaitTimeNanos += waitTimeNanos @@ -91,6 +119,33 @@ func (s *Searcher) SearchDocs(ctx context.Context, fracs []frac.Fraction, params } searchSubSearches.Observe(float64(subSearchesCnt)) + + took := time.Since(start) + if s.cfg.SlowLogThreshold != 0 && took >= s.cfg.SlowLogThreshold { + fields := []zap.Field{ + zap.Int64("took_ms", took.Milliseconds()), + zap.String("type", params.Type()), + zap.String("request", params.AST.SeqQLString()), + zap.Uint64("hist_interval", params.HistInterval), + zap.String("from", params.From.String()), + zap.String("to", params.To.String()), + zap.Uint64("range", seq.MIDToSeconds(params.To)-seq.MIDToSeconds(params.From)), + zap.String("offset_id", params.OffsetId.String()), + zap.Int("limit", params.Limit), + zap.Bool("with_total", params.WithTotal), + zap.Int("total_fracs_found", totalFracsFound), + zap.Strings("fracs_found", fracsFound), + zap.Int("total_fracs_skipped", totalFracsSkipped), + zap.Strings("fracs_skipped", fracsSkipped), + zap.Uint64("total", total.Total), + } + + for i, agg := range params.AggQ { + fields = append(fields, zap.Object(fmt.Sprintf("agg_%d", i), agg)) + } + + logger.Warn("slow search", fields...) + } return total, nil } diff --git a/seq/qpr.go b/seq/qpr.go index a7246233..30b78937 100644 --- a/seq/qpr.go +++ b/seq/qpr.go @@ -74,6 +74,10 @@ type QPR struct { Errors []ErrorSource } +func (q *QPR) Empty() bool { + return len(q.IDs) == 0 && len(q.Histogram) == 0 && len(q.Aggs) == 0 +} + func (q *QPR) Aggregate(args []AggregateArgs) []AggregationResult { allAggregations := make([]AggregationResult, len(q.Aggs)) for i, agg := range q.Aggs { @@ -107,6 +111,29 @@ const ( AggFuncUniqueCount ) +func (f AggFunc) String() string { + switch f { + case AggFuncCount: + return "count" + case AggFuncSum: + return "sum" + case AggFuncMin: + return "min" + case AggFuncMax: + return "max" + case AggFuncAvg: + return "avg" + case AggFuncQuantile: + return "quantile" + case AggFuncUnique: + return "unique" + case AggFuncUniqueCount: + return "unique_count" + default: + return "unknown" + } +} + type AggBin struct { MID MID Token string diff --git a/seq/seq.go b/seq/seq.go index 87f82087..6a5a0039 100644 --- a/seq/seq.go +++ b/seq/seq.go @@ -152,6 +152,10 @@ func MIDToMillis(t MID) uint64 { return uint64(t) / uint64(time.Millisecond) } +func MIDToSeconds(t MID) uint64 { + return uint64(t) / uint64(time.Second) +} + func MIDToCeilingMillis(t MID) uint64 { millis := uint64(t) / uint64(time.Millisecond) nanosPartOfMilli := uint64(t) % uint64(time.Millisecond) diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index 4acd3bb7..2057ea39 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -225,21 +225,6 @@ func (g *GrpcV1) doSearch( } } - took := time.Since(start) - if g.config.Search.LogThreshold != 0 && took >= g.config.Search.LogThreshold { - logger.Warn("slow search", - zap.Int64("took_ms", took.Milliseconds()), - zap.Object("req", (*searchRequestMarshaler)(req)), - zap.Uint64("found", qpr.Total), - zap.String("from", seq.MillisToMID(uint64(req.From)).String()), - zap.String("to", seq.MillisToMID(uint64(req.To)).String()), - zap.Int64("offset", req.Offset), - zap.String("offset_id", req.OffsetId), - zap.Int64("size", req.Size), - zap.Bool("with_total", req.WithTotal), - ) - } - return buildSearchResponse(qpr), nil } diff --git a/storeapi/grpc_v1.go b/storeapi/grpc_v1.go index 2ed55ea5..a242b172 100644 --- a/storeapi/grpc_v1.go +++ b/storeapi/grpc_v1.go @@ -106,6 +106,7 @@ func NewGrpcV1(cfg APIConfig, fracManager *fracmanager.FracManager, mappingProvi searcher: fracmanager.NewSearcher(cfg.Search.WorkersCount, fracmanager.SearcherCfg{ MaxFractionHits: cfg.Search.MaxFractionHits, FractionsPerIteration: cfg.Search.FractionsPerIteration, + SlowLogThreshold: cfg.Search.LogThreshold, }), }, fetchData: fetchData{