Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ func registerQuery(app *extkingpin.App) {
enableGroupReplicaPartialStrategy := cmd.Flag("query.group-replica-strategy", "Enable group-replica partial response strategy.").
Default("false").Bool()

enableQuorumPartialStrategy := cmd.Flag("query.quorum-strategy", "Enable quorum partial response strategy based on InfoAPI store replica_group/quorum hints. Cannot be combined with query.group-replica-strategy.").
Default("false").Bool()

enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling.").
Hidden().Default("true").Bool()

Expand Down Expand Up @@ -320,6 +323,10 @@ func registerQuery(app *extkingpin.App) {
return err
}

if *enableGroupReplicaPartialStrategy && *enableQuorumPartialStrategy {
return errors.New("only one of --query.group-replica-strategy and --query.quorum-strategy can be enabled")
}

// Parse blocked metric patterns
var blockedMetricPatterns []string
if *blockQueryMetricsWithoutFilter != "" {
Expand Down Expand Up @@ -408,6 +415,7 @@ func registerQuery(app *extkingpin.App) {
*enforceTenancy,
*tenantLabel,
*enableGroupReplicaPartialStrategy,
*enableQuorumPartialStrategy,
*rewriteAggregationLabelStrategy,
*rewriteAggregationLabelTo,
*lazyRetrievalMaxBufferedResponses,
Expand Down Expand Up @@ -499,6 +507,7 @@ func runQuery(
enforceTenancy bool,
tenantLabel string,
groupReplicaPartialResponseStrategy bool,
quorumPartialResponseStrategy bool,
rewriteAggregationLabelStrategy string,
rewriteAggregationLabelTo string,
lazyRetrievalMaxBufferedResponses int,
Expand Down Expand Up @@ -638,7 +647,7 @@ func runQuery(
unhealthyStoreTimeout,
endpointInfoTimeout,
// ignoreErrors when the group_replica or quorum partial response strategy is enabled.
groupReplicaPartialResponseStrategy,
groupReplicaPartialResponseStrategy || quorumPartialResponseStrategy,
queryConnMetricLabels...,
)

Expand All @@ -652,6 +661,7 @@ func runQuery(
)
opts := query.Options{
GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy,
QuorumPartialResponseStrategy: quorumPartialResponseStrategy,
DeduplicationFunc: queryDeduplicationFunc,
RewriteAggregationLabelStrategy: rewriteAggregationLabelStrategy,
RewriteAggregationLabelTo: rewriteAggregationLabelTo,
Expand Down
31 changes: 29 additions & 2 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func registerReceive(app *extkingpin.App) {
conf.registerFlag(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error {
if err := conf.validate(); err != nil {
return err
}
lset, err := parseFlagLabels(conf.labelStrs)
if err != nil {
return errors.Wrap(err, "parse labels")
Expand Down Expand Up @@ -254,6 +257,9 @@ func runReceive(
}
multiTSDBOptions = append(multiTSDBOptions, receive.WithMatchersCache(cache))
}
if conf.replicaGroup != "" {
multiTSDBOptions = append(multiTSDBOptions, receive.WithReplicaGroup(conf.replicaGroup, conf.quorum))
}

dbs := receive.NewMultiTSDB(
conf.dataDir,
Expand Down Expand Up @@ -454,6 +460,8 @@ func runReceive(
SupportsSharding: true,
SupportsWithoutReplicaLabels: true,
TsdbInfos: proxy.TSDBInfos(),
ReplicaGroup: conf.replicaGroup,
Quorum: int32(conf.quorum),
}, nil
}
return nil, errors.New("Not ready")
Expand Down Expand Up @@ -936,8 +944,10 @@ type receiveConfig struct {
rwClientSkipVerify bool
rwServerTlsMinVersion string

dataDir string
labelStrs []string
dataDir string
labelStrs []string
replicaGroup string
quorum int

objStoreConfig *extflag.PathOrContent
retention *model.Duration
Expand Down Expand Up @@ -1009,6 +1019,19 @@ type receiveConfig struct {
compactionDelayInterval *model.Duration
}

// validate checks that the replica-group and quorum settings are mutually
// consistent.
//
// Rules:
//   - With an empty replica group, quorum must be left at 0.
//   - With a replica group set, quorum must be within [1, 2147483647].
//
// The upper bound exists because the quorum is later narrowed with
// int32(conf.quorum); a larger value would wrap silently at that conversion
// (for example 1<<32 becomes 0) and defeat the "> 0" requirement enforced here.
//
// It returns nil when the combination is valid and a descriptive error
// otherwise.
func (rc *receiveConfig) validate() error {
	// Largest quorum that survives the int32 conversion unchanged.
	const maxQuorum = 1<<31 - 1

	if rc.replicaGroup == "" {
		// Any non-zero quorum (including negative) is meaningless without a group.
		if rc.quorum != 0 {
			return errors.New("invalid receive config: --receive.quorum requires --receive.replica-group")
		}
		return nil
	}
	if rc.quorum <= 0 {
		return errors.New("invalid receive config: --receive.quorum must be > 0 when --receive.replica-group is set")
	}
	if rc.quorum > maxQuorum {
		return errors.New("invalid receive config: --receive.quorum must be <= 2147483647")
	}
	return nil
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd)
rc.grpcConfig.registerFlag(cmd)
Expand Down Expand Up @@ -1042,6 +1065,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {

cmd.Flag("label", "External labels to announce. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key=\"value\"").StringsVar(&rc.labelStrs)

cmd.Flag("receive.replica-group", "Replica group identifier used by the QUORUM partial response strategy (--query.quorum-strategy). Stores with the same replica_group value hold replicated data.").Default("").StringVar(&rc.replicaGroup)

cmd.Flag("receive.quorum", "Minimum number of healthy stores required per replica group for the QUORUM partial response strategy. Must be > 0 when receive.replica-group is set.").Default("0").IntVar(&rc.quorum)

rc.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false)

rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables the retention policy (i.e. infinite retention). For more details on how retention is enforced for individual tenants, please refer to the Tenant lifecycle management section in the Receive documentation: https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management").Default("15d"))
Expand Down
68 changes: 68 additions & 0 deletions cmd/thanos/receive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package main

import (
"testing"

"github.com/efficientgo/core/testutil"
)

// TestReceiveConfigValidateReplicaGroupQuorum runs receiveConfig.validate over
// a table of replica-group / quorum combinations and checks whether an error
// is reported for each one.
func TestReceiveConfigValidateReplicaGroupQuorum(t *testing.T) {
	t.Parallel()

	// scenario describes one combination of flag values and the expected outcome.
	type scenario struct {
		name         string
		replicaGroup string
		quorum       int
		expectErr    bool
	}

	scenarios := []scenario{
		{name: "both unset", replicaGroup: "", quorum: 0, expectErr: false},
		{name: "both set", replicaGroup: "rg", quorum: 1, expectErr: false},
		{name: "only quorum set", replicaGroup: "", quorum: 1, expectErr: true},
		{name: "only replica-group set (quorum default)", replicaGroup: "rg", quorum: 0, expectErr: true},
		{name: "replica-group set with negative quorum", replicaGroup: "rg", quorum: -1, expectErr: true},
		{name: "negative quorum without replica-group", replicaGroup: "", quorum: -1, expectErr: true},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			cfg := &receiveConfig{
				replicaGroup: sc.replicaGroup,
				quorum:       sc.quorum,
			}
			got := cfg.validate()
			if sc.expectErr {
				testutil.NotOk(t, got)
			} else {
				testutil.Ok(t, got)
			}
		})
	}
}
2 changes: 1 addition & 1 deletion cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ func queryFuncCreator(
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA:
case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA, storepb.PartialResponseStrategy_QUORUM:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
Expand Down
158 changes: 120 additions & 38 deletions pkg/info/infopb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading