From 84f311ae301e31136966808b4802875678ddb2b5 Mon Sep 17 00:00:00 2001 From: Alexandr Yudin Date: Fri, 31 Oct 2025 13:13:15 +0000 Subject: [PATCH 1/6] Refactor rule evaluation to support concurrent execution --- rules/group.go | 525 +++++++++++++++++++++++++++++++++-------------- rules/manager.go | 18 +- 2 files changed, 385 insertions(+), 158 deletions(-) diff --git a/rules/group.go b/rules/group.go index 201d3a67d7..2495fcab4e 100644 --- a/rules/group.go +++ b/rules/group.go @@ -476,164 +476,21 @@ func (g *Group) CopyState(from *Group) { // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. // Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled. func (g *Group) Eval(ctx context.Context, ts time.Time) { - var ( - samplesTotal atomic.Float64 - wg sync.WaitGroup - ) - - ruleQueryOffset := g.QueryOffset() - - for i, rule := range g.rules { - select { - case <-g.done: - return - default: - } - - eval := func(i int, rule Rule, cleanup func()) { - if cleanup != nil { - defer cleanup() - } - - logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) - ctx, sp := otel.Tracer("").Start(ctx, "rule") - sp.SetAttributes(attribute.String("name", rule.Name())) - defer func(t time.Time) { - sp.End() - - since := time.Since(t) - g.metrics.EvalDuration.Observe(since.Seconds()) - rule.SetEvaluationDuration(since) - rule.SetEvaluationTimestamp(t) - }(time.Now()) - - if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() { - logger = log.WithPrefix(logger, "trace_id", sp.SpanContext().TraceID()) - } - - g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() - - vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) - if err != nil { - rule.SetHealth(HealthBad) - rule.SetLastError(err) - sp.SetStatus(codes.Error, err.Error()) - g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + var samplesTotal atomic.Float64 - // Canceled queries are intentional termination of queries. This normally - // happens on shutdown and thus we skip logging of any errors here. 
- var eqc promql.ErrQueryCanceled - if !errors.As(err, &eqc) { - level.Warn(logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err) - } - return - } - rule.SetHealth(HealthGood) - rule.SetLastError(nil) - samplesTotal.Add(float64(len(vector))) - - if ar, ok := rule.(*AlertingRule); ok { - ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) - } - var ( - numOutOfOrder = 0 - numTooOld = 0 - numDuplicates = 0 - ) - - app := g.opts.Appendable.Appender(ctx) - seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) - defer func() { - if err := app.Commit(); err != nil { - rule.SetHealth(HealthBad) - rule.SetLastError(err) - sp.SetStatus(codes.Error, err.Error()) - g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() - - level.Warn(logger).Log("msg", "Rule sample appending failed", "err", err) - return - } - g.seriesInPreviousEval[i] = seriesReturned - }() - - for _, s := range vector { - if s.H != nil { - _, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H) - } else { - _, err = app.Append(0, s.Metric, s.T, s.F) - } - - if err != nil { - rule.SetHealth(HealthBad) - rule.SetLastError(err) - sp.SetStatus(codes.Error, err.Error()) - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - switch { - case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample): - numOutOfOrder++ - level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) - case errors.Is(unwrappedErr, storage.ErrTooOldSample): - numTooOld++ - level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) - case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): - numDuplicates++ - level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) - default: - level.Warn(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) - } - } else { - buf := [1024]byte{} - seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric - } - } - if numOutOfOrder > 0 { - level.Warn(logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "num_dropped", numOutOfOrder) - } - if numTooOld > 0 { - level.Warn(logger).Log("msg", "Error on ingesting too old result from rule evaluation", "num_dropped", numTooOld) - } - if numDuplicates > 0 { - level.Warn(logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates) - } - - for metric, lset := range g.seriesInPreviousEval[i] { - if _, ok := seriesReturned[metric]; !ok { - // Series no longer exposed, mark it stale. - _, err = app.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN)) - unwrappedErr := errors.Unwrap(err) - if unwrappedErr == nil { - unwrappedErr = err - } - switch { - case unwrappedErr == nil: - case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample), - errors.Is(unwrappedErr, storage.ErrTooOldSample), - errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): - // Do not count these in logging, as this is expected if series - // is exposed from a different rule. 
- default: - level.Warn(logger).Log("msg", "Adding stale sample failed", "sample", lset.String(), "err", err) - } - } - } - } - - if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) { - wg.Add(1) - - go eval(i, rule, func() { - wg.Done() - ctrl.Done(ctx) - }) - } else { - eval(i, rule, nil) - } + sequentiallyRules := g.concurrencyEval(ctx, ts, samplesTotal) + select { + case <-g.done: + return + default: } - wg.Wait() + g.sequentiallyEval(ctx, ts, samplesTotal, sequentiallyRules) + select { + case <-g.done: + return + default: + } g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) g.cleanupStaleSeries(ctx, ts) @@ -1092,3 +949,361 @@ func buildDependencyMap(rules []Rule) dependencyMap { return dependencies } + +func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, samplesTotal atomic.Float64) []Rule { + var ( + wg sync.WaitGroup + mtx sync.Mutex + concurrencyApp = g.opts.Appendable.Appender(ctx) + ) + + ruleQueryOffset := g.QueryOffset() + seriesInPreviousEval := make([]map[string]labels.Labels, len(g.rules)) + concurrencyEval := func(i int, rule Rule, cleanup func()) { + if cleanup != nil { + defer cleanup() + } + + logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) + ctx, sp := otel.Tracer("").Start(ctx, "rule") + sp.SetAttributes(attribute.String("name", rule.Name())) + defer func(t time.Time) { + sp.End() + + since := time.Since(t) + g.metrics.EvalDuration.Observe(since.Seconds()) + rule.SetEvaluationDuration(since) + rule.SetEvaluationTimestamp(t) + }(time.Now()) + + if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() { + logger = log.WithPrefix(logger, "trace_id", sp.SpanContext().TraceID()) + } + + g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) + if err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + sp.SetStatus(codes.Error, err.Error()) + g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + // Canceled queries are intentional termination of queries. This normally + // happens on shutdown and thus we skip logging of any errors here. 
+ var eqc promql.ErrQueryCanceled + if !errors.As(err, &eqc) { + level.Warn(logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err) + } + return + } + rule.SetHealth(HealthGood) + rule.SetLastError(nil) + samplesTotal.Add(float64(len(vector))) + + if ar, ok := rule.(*AlertingRule); ok { + ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) + } + var ( + numOutOfOrder = 0 + numTooOld = 0 + numDuplicates = 0 + ) + + // app := g.opts.Appendable.Appender(ctx) + seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) + defer func() { + seriesInPreviousEval[i] = seriesReturned + }() + + for _, s := range vector { + if s.H != nil { + mtx.Lock() + _, err = concurrencyApp.AppendHistogram(0, s.Metric, s.T, nil, s.H) + mtx.Unlock() + } else { + mtx.Lock() + _, err = concurrencyApp.Append(0, s.Metric, s.T, s.F) + mtx.Unlock() + } + + if err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + sp.SetStatus(codes.Error, err.Error()) + unwrappedErr := errors.Unwrap(err) + if unwrappedErr == nil { + unwrappedErr = err + } + switch { + case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample): + numOutOfOrder++ + level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + case errors.Is(unwrappedErr, storage.ErrTooOldSample): + numTooOld++ + level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): + numDuplicates++ + level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + default: + level.Warn(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + } + } else { + buf := [1024]byte{} + seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric + } + } + if numOutOfOrder > 0 { + level.Warn(logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "num_dropped", numOutOfOrder) + } + if numTooOld > 0 { + level.Warn(logger).Log("msg", "Error on ingesting too old result from rule evaluation", "num_dropped", numTooOld) + } + if numDuplicates > 0 { + level.Warn(logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates) + } + + for metric, lset := range g.seriesInPreviousEval[i] { + if _, ok := seriesReturned[metric]; !ok { + // Series no longer exposed, mark it stale. + mtx.Lock() + _, err = concurrencyApp.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN)) + mtx.Unlock() + unwrappedErr := errors.Unwrap(err) + if unwrappedErr == nil { + unwrappedErr = err + } + switch { + case unwrappedErr == nil: + case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample), + errors.Is(unwrappedErr, storage.ErrTooOldSample), + errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): + // Do not count these in logging, as this is expected if series + // is exposed from a different rule. 
+ default: + level.Warn(logger).Log("msg", "Adding stale sample failed", "sample", lset.String(), "err", err) + } + } + } + } + + sequentiallyRules := make([]Rule, 0, len(g.rules)) + for i, rule := range g.rules { + select { + case <-g.done: + return sequentiallyRules + default: + } + + if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, rule) { + wg.Add(1) + + go concurrencyEval(i, rule, func() { + wg.Done() + ctrl.Done(ctx) + }) + sequentiallyRules = append(sequentiallyRules, nil) // placeholder for the series + } else { + sequentiallyRules = append(sequentiallyRules, rule) + } + } + + wg.Wait() + + if err := concurrencyApp.Commit(); err != nil { + groupKey := GroupKey(g.File(), g.Name()) + for i := range g.rules { + if ctrl := g.concurrencyController; ctrl.Allow(ctx, g, g.rules[i]) { + g.rules[i].SetHealth(HealthBad) + g.rules[i].SetLastError(err) + g.metrics.EvalFailures.WithLabelValues(groupKey).Inc() + ctrl.Done(ctx) + } + } + + level.Warn(g.logger).Log( + "msg", "Rule sample committing failed", + "group_key", groupKey, + "err", err, + ) + + return sequentiallyRules + } + + for i, series := range seriesInPreviousEval { + if series == nil { + continue + } + + g.seriesInPreviousEval[i] = series + } + + return sequentiallyRules +} + +func (g *Group) sequentiallyEval( + ctx context.Context, + ts time.Time, + samplesTotal atomic.Float64, + sequentiallyRules []Rule, +) { + ruleQueryOffset := g.QueryOffset() + eval := func(i int, rule Rule, cleanup func()) { + if cleanup != nil { + defer cleanup() + } + + logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i) + ctx, sp := otel.Tracer("").Start(ctx, "rule") + sp.SetAttributes(attribute.String("name", rule.Name())) + defer func(t time.Time) { + sp.End() + + since := time.Since(t) + g.metrics.EvalDuration.Observe(since.Seconds()) + rule.SetEvaluationDuration(since) + rule.SetEvaluationTimestamp(t) + }(time.Now()) + + if sp.SpanContext().IsSampled() && sp.SpanContext().HasTraceID() { + logger = log.WithPrefix(logger, "trace_id", sp.SpanContext().TraceID()) + } + + g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + vector, err := rule.Eval(ctx, ruleQueryOffset, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit()) + if err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + sp.SetStatus(codes.Error, err.Error()) + g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + // Canceled queries are intentional termination of queries. This normally + // happens on shutdown and thus we skip logging of any errors here. 
+ var eqc promql.ErrQueryCanceled + if !errors.As(err, &eqc) { + level.Warn(logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err) + } + return + } + rule.SetHealth(HealthGood) + rule.SetLastError(nil) + samplesTotal.Add(float64(len(vector))) + + if ar, ok := rule.(*AlertingRule); ok { + ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) + } + var ( + numOutOfOrder = 0 + numTooOld = 0 + numDuplicates = 0 + ) + + app := g.opts.Appendable.Appender(ctx) + seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) + defer func() { + if err := app.Commit(); err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + sp.SetStatus(codes.Error, err.Error()) + g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc() + + level.Warn(logger).Log("msg", "Rule sample appending failed", "err", err) + return + } + g.seriesInPreviousEval[i] = seriesReturned + }() + + for _, s := range vector { + if s.H != nil { + _, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H) + } else { + _, err = app.Append(0, s.Metric, s.T, s.F) + } + + if err != nil { + rule.SetHealth(HealthBad) + rule.SetLastError(err) + sp.SetStatus(codes.Error, err.Error()) + unwrappedErr := errors.Unwrap(err) + if unwrappedErr == nil { + unwrappedErr = err + } + switch { + case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample): + numOutOfOrder++ + level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + case errors.Is(unwrappedErr, storage.ErrTooOldSample): + numTooOld++ + level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + case errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): + numDuplicates++ + level.Debug(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + default: + level.Warn(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) + } + } else { + buf := [1024]byte{} + seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric + } + } + if numOutOfOrder > 0 { + level.Warn(logger).Log( + "msg", "Error on ingesting out-of-order result from rule evaluation", + "num_dropped", numOutOfOrder, + ) + } + if numTooOld > 0 { + level.Warn(logger).Log( + "msg", "Error on ingesting too old result from rule evaluation", + "num_dropped", numTooOld, + ) + } + if numDuplicates > 0 { + level.Warn(logger).Log( + "msg", "Error on ingesting results from rule evaluation with different value but same timestamp", + "num_dropped", numDuplicates, + ) + } + + for metric, lset := range g.seriesInPreviousEval[i] { + if _, ok := seriesReturned[metric]; !ok { + // Series no longer exposed, mark it stale. + _, err = app.Append( + 0, + lset, + timestamp.FromTime(ts.Add(-ruleQueryOffset)), + math.Float64frombits(value.StaleNaN), + ) + unwrappedErr := errors.Unwrap(err) + if unwrappedErr == nil { + unwrappedErr = err + } + switch { + case unwrappedErr == nil: + case errors.Is(unwrappedErr, storage.ErrOutOfOrderSample), + errors.Is(unwrappedErr, storage.ErrTooOldSample), + errors.Is(unwrappedErr, storage.ErrDuplicateSampleForTimestamp): + // Do not count these in logging, as this is expected if series + // is exposed from a different rule. 
+ default: + level.Warn(logger).Log("msg", "Adding stale sample failed", "sample", lset.String(), "err", err) + } + } + } + } + + for i, rule := range sequentiallyRules { + select { + case <-g.done: + return + default: + } + + if rule == nil { + continue + } + + eval(i, rule, nil) + } +} diff --git a/rules/manager.go b/rules/manager.go index efb017854e..830b8c0c98 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -478,13 +478,25 @@ func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyControlle } } -func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool { +// func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool { +// // To allow a rule to be executed concurrently, we need 3 conditions: +// // 1. The rule must not have any rules that depend on it. +// // 2. The rule itself must not depend on any other rules. +// // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot. +// if rule.NoDependentRules() && rule.NoDependencyRules() { +// return c.sema.TryAcquire(1) +// } + +// return false +// } + +func (c *concurrentRuleEvalController) Allow(ctx context.Context, _ *Group, rule Rule) bool { // To allow a rule to be executed concurrently, we need 3 conditions: // 1. The rule must not have any rules that depend on it. // 2. The rule itself must not depend on any other rules. // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot. - if rule.NoDependentRules() && rule.NoDependencyRules() { - return c.sema.TryAcquire(1) + if rule.NoDependencyRules() { + return c.sema.Acquire(ctx, 1) == nil } return false From 31ee1483d4452cde8507a0c8c228885b4ed7709a Mon Sep 17 00:00:00 2001 From: Alexandr Yudin Date: Fri, 31 Oct 2025 15:49:58 +0000 Subject: [PATCH 2/6] some fix --- pp-pkg/storage/appender.go | 4 +- pp/go/model/labelset.go | 7 +++ rules/group.go | 105 ++++++++++++++++++++++--------------- rules/manager.go | 13 +++++ 4 files changed, 84 insertions(+), 45 deletions(-) diff --git a/pp-pkg/storage/appender.go b/pp-pkg/storage/appender.go index 2db4f3b731..69465d40cf 100644 --- a/pp-pkg/storage/appender.go +++ b/pp-pkg/storage/appender.go @@ -45,7 +45,7 @@ func newTimeSeriesAppender( ctx: ctx, adapter: adapter, state: state, - batch: &timeSeriesBatch{}, + batch: &timeSeriesBatch{timeSeries: make([]model.TimeSeries, 0, 10)}, } } @@ -56,7 +56,7 @@ func (a *TimeSeriesAppender) Append( t int64, v float64, ) (storage.SeriesRef, error) { - lsb := model.NewLabelSetBuilder() + lsb := model.NewLabelSetBuilderSize(l.Len()) l.Range(func(label labels.Label) { lsb.Add(label.Name, label.Value) }) diff --git a/pp/go/model/labelset.go b/pp/go/model/labelset.go index 54b73a0db6..1f498f8173 100644 --- a/pp/go/model/labelset.go +++ b/pp/go/model/labelset.go @@ -454,6 +454,13 @@ func NewLabelSetBuilder() *LabelSetBuilder { } } +// NewLabelSetBuilderSize is a constructor with container size. +func NewLabelSetBuilderSize(size int) *LabelSetBuilder { + return &LabelSetBuilder{ + pairs: make(map[string]string, size), + } +} + // Build label set func (builder *LabelSetBuilder) Build() LabelSet { return LabelSetFromMap(builder.pairs) diff --git a/rules/group.go b/rules/group.go index 2495fcab4e..1e7439ca94 100644 --- a/rules/group.go +++ b/rules/group.go @@ -476,23 +476,21 @@ func (g *Group) CopyState(from *Group) { // Eval runs a single evaluation cycle in which all rules are evaluated sequentially. 
// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled. func (g *Group) Eval(ctx context.Context, ts time.Time) { - var samplesTotal atomic.Float64 + var samplesTotal float64 - sequentiallyRules := g.concurrencyEval(ctx, ts, samplesTotal) - select { - case <-g.done: - return - default: + if g.concurrencyController.IsConcurrent() { + samplesTotal = g.concurrencyEval(ctx, ts) + } else { + samplesTotal = g.sequentiallyEval(ctx, ts, g.rules) } - g.sequentiallyEval(ctx, ts, samplesTotal, sequentiallyRules) select { case <-g.done: return default: } - g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load()) + g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal) g.cleanupStaleSeries(ctx, ts) } @@ -950,8 +948,10 @@ func buildDependencyMap(rules []Rule) dependencyMap { return dependencies } -func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, samplesTotal atomic.Float64) []Rule { +// concurrencyEval evaluates the rules concurrently. +func (g *Group) concurrencyEval(ctx context.Context, ts time.Time) float64 { var ( + samplesTotal atomic.Float64 wg sync.WaitGroup mtx sync.Mutex concurrencyApp = g.opts.Appendable.Appender(ctx) @@ -1010,12 +1010,8 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, samplesTotal numDuplicates = 0 ) - // app := g.opts.Appendable.Appender(ctx) - seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) - defer func() { - seriesInPreviousEval[i] = seriesReturned - }() - + seriesInPreviousEval[i] = make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) + buf := [1024]byte{} for _, s := range vector { if s.H != nil { mtx.Lock() @@ -1048,26 +1044,41 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, samplesTotal default: level.Warn(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) } - } else { - buf := [1024]byte{} - seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric + + continue } + + seriesInPreviousEval[i][string(s.Metric.Bytes(buf[:]))] = s.Metric } if numOutOfOrder > 0 { - level.Warn(logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "num_dropped", numOutOfOrder) + level.Warn(logger).Log( + "msg", "Error on ingesting out-of-order result from rule evaluation", + "num_dropped", numOutOfOrder, + ) } if numTooOld > 0 { - level.Warn(logger).Log("msg", "Error on ingesting too old result from rule evaluation", "num_dropped", numTooOld) + level.Warn(logger).Log( + "msg", "Error on ingesting too old result from rule evaluation", + "num_dropped", numTooOld, + ) } if numDuplicates > 0 { - level.Warn(logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "num_dropped", numDuplicates) + level.Warn(logger).Log( + "msg", "Error on ingesting results from rule evaluation with different value but same timestamp", + "num_dropped", numDuplicates, + ) } for metric, lset := range g.seriesInPreviousEval[i] { - if _, ok := seriesReturned[metric]; !ok { + if _, ok := seriesInPreviousEval[i][metric]; !ok { // Series no longer exposed, mark it stale. 
mtx.Lock() - _, err = concurrencyApp.Append(0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN)) + _, err = concurrencyApp.Append( + 0, + lset, + timestamp.FromTime(ts.Add(-ruleQueryOffset)), + math.Float64frombits(value.StaleNaN), + ) mtx.Unlock() unwrappedErr := errors.Unwrap(err) if unwrappedErr == nil { @@ -1087,11 +1098,11 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, samplesTotal } } - sequentiallyRules := make([]Rule, 0, len(g.rules)) + sequentiallyRules := make([]Rule, len(g.rules)) for i, rule := range g.rules { select { case <-g.done: - return sequentiallyRules + return samplesTotal.Load() default: } @@ -1102,9 +1113,9 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, samplesTotal wg.Done() ctrl.Done(ctx) }) - sequentiallyRules = append(sequentiallyRules, nil) // placeholder for the series + sequentiallyRules[i] = nil // placeholder for the series } else { - sequentiallyRules = append(sequentiallyRules, rule) + sequentiallyRules[i] = rule } } @@ -1126,27 +1137,31 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time, samplesTotal "group_key", groupKey, "err", err, ) + } else { + for i, series := range seriesInPreviousEval { + if series == nil { + continue + } - return sequentiallyRules - } - - for i, series := range seriesInPreviousEval { - if series == nil { - continue + g.seriesInPreviousEval[i] = series } - - g.seriesInPreviousEval[i] = series } - return sequentiallyRules + return samplesTotal.Add(g.sequentiallyEval(ctx, ts, sequentiallyRules)) } +// sequentiallyEval evaluates the rules sequentially. func (g *Group) sequentiallyEval( ctx context.Context, ts time.Time, - samplesTotal atomic.Float64, sequentiallyRules []Rule, -) { +) float64 { + if len(sequentiallyRules) == 0 { + return 0 + } + + var samplesTotal float64 + ruleQueryOffset := g.QueryOffset() eval := func(i int, rule Rule, cleanup func()) { if cleanup != nil { @@ -1188,7 +1203,7 @@ func (g *Group) sequentiallyEval( } rule.SetHealth(HealthGood) rule.SetLastError(nil) - samplesTotal.Add(float64(len(vector))) + samplesTotal += float64(len(vector)) if ar, ok := rule.(*AlertingRule); ok { ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc) @@ -1214,6 +1229,7 @@ func (g *Group) sequentiallyEval( g.seriesInPreviousEval[i] = seriesReturned }() + buf := [1024]byte{} for _, s := range vector { if s.H != nil { _, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H) @@ -1242,10 +1258,11 @@ func (g *Group) sequentiallyEval( default: level.Warn(logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s) } - } else { - buf := [1024]byte{} - seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric + + continue } + + seriesReturned[string(s.Metric.Bytes(buf[:]))] = s.Metric } if numOutOfOrder > 0 { level.Warn(logger).Log( @@ -1296,7 +1313,7 @@ func (g *Group) sequentiallyEval( for i, rule := range sequentiallyRules { select { case <-g.done: - return + return samplesTotal default: } @@ -1306,4 +1323,6 @@ func (g *Group) sequentiallyEval( eval(i, rule, nil) } + + return samplesTotal } diff --git a/rules/manager.go b/rules/manager.go index 830b8c0c98..756f538bbd 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -465,6 +465,9 @@ type RuleConcurrencyController interface { // Done releases a concurrent evaluation slot. Done(ctx context.Context) + + // IsConcurrent returns true if the controller is a concurrent controller, false if it is a sequential controller. 
+ IsConcurrent() bool } // concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules. @@ -506,6 +509,11 @@ func (c *concurrentRuleEvalController) Done(_ context.Context) { c.sema.Release(1) } +// IsConcurrent returns true if the controller is a concurrent controller, false if it is a sequential controller. +func (*concurrentRuleEvalController) IsConcurrent() bool { + return true +} + // sequentialRuleEvalController is a RuleConcurrencyController that runs every rule sequentially. type sequentialRuleEvalController struct{} @@ -514,3 +522,8 @@ func (c sequentialRuleEvalController) Allow(_ context.Context, _ *Group, _ Rule) } func (c sequentialRuleEvalController) Done(_ context.Context) {} + +// IsConcurrent returns false if the controller is a sequential controller, true if it is a concurrent controller. +func (c sequentialRuleEvalController) IsConcurrent() bool { + return false +} From 70e4b8943bf403bb9cfc21fb36bee9197100803a Mon Sep 17 00:00:00 2001 From: Alexandr Yudin Date: Fri, 31 Oct 2025 16:18:19 +0000 Subject: [PATCH 3/6] small fix --- pp-pkg/storage/appender.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pp-pkg/storage/appender.go b/pp-pkg/storage/appender.go index 69465d40cf..d46c5bc7b5 100644 --- a/pp-pkg/storage/appender.go +++ b/pp-pkg/storage/appender.go @@ -34,6 +34,7 @@ type TimeSeriesAppender struct { adapter *Adapter state *cppbridge.StateV2 batch *timeSeriesBatch + lsb *model.LabelSetBuilder } func newTimeSeriesAppender( @@ -46,6 +47,7 @@ func newTimeSeriesAppender( adapter: adapter, state: state, batch: &timeSeriesBatch{timeSeries: make([]model.TimeSeries, 0, 10)}, + lsb: model.NewLabelSetBuilderSize(10), } } @@ -56,13 +58,13 @@ func (a *TimeSeriesAppender) Append( t int64, v float64, ) (storage.SeriesRef, error) { - lsb := model.NewLabelSetBuilderSize(l.Len()) + a.lsb.Reset() l.Range(func(label labels.Label) { - lsb.Add(label.Name, label.Value) + a.lsb.Add(label.Name, label.Value) }) a.batch.timeSeries = append(a.batch.timeSeries, model.TimeSeries{ - LabelSet: lsb.Build(), + LabelSet: a.lsb.Build(), Timestamp: uint64(t), // #nosec G115 // no overflow Value: v, }) From bcd157635579ecd0a1d15979e265280449816a2c Mon Sep 17 00:00:00 2001 From: Alexandr Yudin Date: Sat, 1 Nov 2025 11:06:12 +0000 Subject: [PATCH 4/6] fix mutex --- cmd/prometheus/main.go | 3 ++- promql/engine.go | 2 +- rules/group.go | 8 ++------ rules/manager.go | 12 ------------ 4 files changed, 5 insertions(+), 20 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 961f5bd24d..d51face9a3 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -954,6 +954,7 @@ func main() { queryEngine = promql.NewEngine(opts) + ruleQueryOffset := time.Duration(cfgFile.GlobalConfig.RuleQueryOffset) ruleManager = rules.NewManager(&rules.ManagerOptions{ Appendable: adapter, // PP_CHANGES.md: rebuild on cpp Queryable: adapter, // PP_CHANGES.md: rebuild on cpp @@ -969,7 +970,7 @@ func main() { MaxConcurrentEvals: cfg.maxConcurrentEvals, ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval, DefaultRuleQueryOffset: func() time.Duration { - return time.Duration(cfgFile.GlobalConfig.RuleQueryOffset) + return ruleQueryOffset }, }) } diff --git a/promql/engine.go b/promql/engine.go index 6cc4c69cf1..400fd739f2 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -722,7 +722,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *parser.Eval setOffsetForAtModifier(timeMilliseconds(s.Start), s.Expr) 
evalSpanTimer, ctxInnerEval := query.stats.GetSpanTimer(ctx, stats.InnerEvalTime, ng.metrics.queryInnerEval) // Instant evaluation. This is executed as a range evaluation with one step. - if s.Start == s.End && s.Interval == 0 { + if s.Start.Equal(s.End) && s.Interval == 0 { start := timeMilliseconds(s.Start) evaluator := &evaluator{ startTimestamp: start, diff --git a/rules/group.go b/rules/group.go index 1e7439ca94..da522ce2a6 100644 --- a/rules/group.go +++ b/rules/group.go @@ -1012,15 +1012,13 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time) float64 { seriesInPreviousEval[i] = make(map[string]labels.Labels, len(g.seriesInPreviousEval[i])) buf := [1024]byte{} + mtx.Lock() + defer mtx.Unlock() for _, s := range vector { if s.H != nil { - mtx.Lock() _, err = concurrencyApp.AppendHistogram(0, s.Metric, s.T, nil, s.H) - mtx.Unlock() } else { - mtx.Lock() _, err = concurrencyApp.Append(0, s.Metric, s.T, s.F) - mtx.Unlock() } if err != nil { @@ -1072,14 +1070,12 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time) float64 { for metric, lset := range g.seriesInPreviousEval[i] { if _, ok := seriesInPreviousEval[i][metric]; !ok { // Series no longer exposed, mark it stale. - mtx.Lock() _, err = concurrencyApp.Append( 0, lset, timestamp.FromTime(ts.Add(-ruleQueryOffset)), math.Float64frombits(value.StaleNaN), ) - mtx.Unlock() unwrappedErr := errors.Unwrap(err) if unwrappedErr == nil { unwrappedErr = err diff --git a/rules/manager.go b/rules/manager.go index 756f538bbd..161c6ef7e9 100644 --- a/rules/manager.go +++ b/rules/manager.go @@ -481,18 +481,6 @@ func newRuleConcurrencyController(maxConcurrency int64) RuleConcurrencyControlle } } -// func (c *concurrentRuleEvalController) Allow(_ context.Context, _ *Group, rule Rule) bool { -// // To allow a rule to be executed concurrently, we need 3 conditions: -// // 1. The rule must not have any rules that depend on it. -// // 2. The rule itself must not depend on any other rules. -// // 3. If 1 & 2 are true, then and only then we should try to acquire the concurrency slot. -// if rule.NoDependentRules() && rule.NoDependencyRules() { -// return c.sema.TryAcquire(1) -// } - -// return false -// } - func (c *concurrentRuleEvalController) Allow(ctx context.Context, _ *Group, rule Rule) bool { // To allow a rule to be executed concurrently, we need 3 conditions: // 1. The rule must not have any rules that depend on it. 
From 135eced40f48c6528c5b7017ebc9adf3db47c5b5 Mon Sep 17 00:00:00 2001 From: Alexandr Yudin Date: Sat, 1 Nov 2025 12:49:23 +0000 Subject: [PATCH 5/6] merge --- pp/go/cppbridge/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pp/go/cppbridge/metrics.go b/pp/go/cppbridge/metrics.go index d4c42479bf..2bec33ea66 100644 --- a/pp/go/cppbridge/metrics.go +++ b/pp/go/cppbridge/metrics.go @@ -1,8 +1,8 @@ package cppbridge import ( - "github.com/golang/protobuf/proto" dto "github.com/prometheus/client_model/go" + "google.golang.org/protobuf/proto" ) func CppMetrics(f func(metric *dto.Metric) bool) { From 947deafa50f51693364ae76dffdb58ed3d003711 Mon Sep 17 00:00:00 2001 From: Alexandr Yudin Date: Wed, 5 Nov 2025 07:44:26 +0000 Subject: [PATCH 6/6] fix review --- rules/group.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/rules/group.go b/rules/group.go index da522ce2a6..7057f89dbe 100644 --- a/rules/group.go +++ b/rules/group.go @@ -1133,14 +1133,16 @@ func (g *Group) concurrencyEval(ctx context.Context, ts time.Time) float64 { "group_key", groupKey, "err", err, ) - } else { - for i, series := range seriesInPreviousEval { - if series == nil { - continue - } - g.seriesInPreviousEval[i] = series + return samplesTotal.Add(g.sequentiallyEval(ctx, ts, sequentiallyRules)) + } + + for i, series := range seriesInPreviousEval { + if series == nil { + continue } + + g.seriesInPreviousEval[i] = series } return samplesTotal.Add(g.sequentiallyEval(ctx, ts, sequentiallyRules))