diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index f82ed44963b..3297c85e23d 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -200,6 +200,9 @@ func registerQuery(app *extkingpin.App) { enableGroupReplicaPartialStrategy := cmd.Flag("query.group-replica-strategy", "Enable group-replica partial response strategy."). Default("false").Bool() + enableQuorumPartialStrategy := cmd.Flag("query.quorum-strategy", "Enable quorum partial response strategy based on InfoAPI store replica_group/quorum hints. Cannot be combined with query.group-replica-strategy."). + Default("false").Bool() + enableRulePartialResponse := cmd.Flag("rule.partial-response", "Enable partial response for rules endpoint. --no-rule.partial-response for disabling."). Hidden().Default("true").Bool() @@ -320,6 +323,10 @@ func registerQuery(app *extkingpin.App) { return err } + if *enableGroupReplicaPartialStrategy && *enableQuorumPartialStrategy { + return errors.New("only one of --query.group-replica-strategy and --query.quorum-strategy can be enabled") + } + // Parse blocked metric patterns var blockedMetricPatterns []string if *blockQueryMetricsWithoutFilter != "" { @@ -408,6 +415,7 @@ func registerQuery(app *extkingpin.App) { *enforceTenancy, *tenantLabel, *enableGroupReplicaPartialStrategy, + *enableQuorumPartialStrategy, *rewriteAggregationLabelStrategy, *rewriteAggregationLabelTo, *lazyRetrievalMaxBufferedResponses, @@ -499,6 +507,7 @@ func runQuery( enforceTenancy bool, tenantLabel string, groupReplicaPartialResponseStrategy bool, + quorumPartialResponseStrategy bool, rewriteAggregationLabelStrategy string, rewriteAggregationLabelTo string, lazyRetrievalMaxBufferedResponses int, @@ -638,7 +647,7 @@ func runQuery( unhealthyStoreTimeout, endpointInfoTimeout, // ignoreErrors when group_replica partial response strategy is enabled. 
- groupReplicaPartialResponseStrategy, + groupReplicaPartialResponseStrategy || quorumPartialResponseStrategy, queryConnMetricLabels..., ) @@ -652,6 +661,7 @@ func runQuery( ) opts := query.Options{ GroupReplicaPartialResponseStrategy: groupReplicaPartialResponseStrategy, + QuorumPartialResponseStrategy: quorumPartialResponseStrategy, DeduplicationFunc: queryDeduplicationFunc, RewriteAggregationLabelStrategy: rewriteAggregationLabelStrategy, RewriteAggregationLabelTo: rewriteAggregationLabelTo, diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 3e92649c85a..b254b6c8d4f 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -68,6 +68,9 @@ func registerReceive(app *extkingpin.App) { conf.registerFlag(cmd) cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, debugLogging bool) error { + if err := conf.validate(); err != nil { + return err + } lset, err := parseFlagLabels(conf.labelStrs) if err != nil { return errors.Wrap(err, "parse labels") @@ -254,6 +257,9 @@ func runReceive( } multiTSDBOptions = append(multiTSDBOptions, receive.WithMatchersCache(cache)) } + if conf.replicaGroup != "" { + multiTSDBOptions = append(multiTSDBOptions, receive.WithReplicaGroup(conf.replicaGroup, conf.quorum)) + } dbs := receive.NewMultiTSDB( conf.dataDir, @@ -454,6 +460,8 @@ func runReceive( SupportsSharding: true, SupportsWithoutReplicaLabels: true, TsdbInfos: proxy.TSDBInfos(), + ReplicaGroup: conf.replicaGroup, + Quorum: int32(conf.quorum), }, nil } return nil, errors.New("Not ready") @@ -936,8 +944,10 @@ type receiveConfig struct { rwClientSkipVerify bool rwServerTlsMinVersion string - dataDir string - labelStrs []string + dataDir string + labelStrs []string + replicaGroup string + quorum int objStoreConfig *extflag.PathOrContent retention *model.Duration @@ -1009,6 +1019,19 @@ type receiveConfig struct { compactionDelayInterval *model.Duration } +func (rc *receiveConfig) validate() error { + 
if rc.replicaGroup == "" { + if rc.quorum != 0 { + return errors.New("invalid receive config: --receive.quorum requires --receive.replica-group") + } + return nil + } + if rc.quorum <= 0 { + return errors.New("invalid receive config: --receive.quorum must be > 0 when --receive.replica-group is set") + } + return nil +} + func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.httpBindAddr, rc.httpGracePeriod, rc.httpTLSConfig = extkingpin.RegisterHTTPFlags(cmd) rc.grpcConfig.registerFlag(cmd) @@ -1042,6 +1065,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("label", "External labels to announce. This flag will be removed in the future when handling multiple tsdb instances is added.").PlaceHolder("key=\"value\"").StringsVar(&rc.labelStrs) + cmd.Flag("receive.replica-group", "Replica group identifier used by the QUORUM partial response strategy (--query.quorum-strategy). Stores with the same replica_group value hold replicated data.").Default("").StringVar(&rc.replicaGroup) + + cmd.Flag("receive.quorum", "Minimum number of healthy stores required per replica group for the QUORUM partial response strategy. Must be > 0 when receive.replica-group is set.").Default("0").IntVar(&rc.quorum) + rc.objStoreConfig = extkingpin.RegisterCommonObjStoreFlags(cmd, "", false) rc.retention = extkingpin.ModelDuration(cmd.Flag("tsdb.retention", "How long to retain raw samples on local storage. 0d - disables the retention policy (i.e. infinite retention). 
For more details on how retention is enforced for individual tenants, please refer to the Tenant lifecycle management section in the Receive documentation: https://thanos.io/tip/components/receive.md/#tenant-lifecycle-management").Default("15d")) diff --git a/cmd/thanos/receive_test.go b/cmd/thanos/receive_test.go new file mode 100644 index 00000000000..d78339734df --- /dev/null +++ b/cmd/thanos/receive_test.go @@ -0,0 +1,68 @@ +package main + +import ( + "testing" + + "github.com/efficientgo/core/testutil" +) + +func TestReceiveConfigValidateReplicaGroupQuorum(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + replicaGroup string + quorum int + expectErr bool + }{ + { + name: "both unset", + replicaGroup: "", + quorum: 0, + expectErr: false, + }, + { + name: "both set", + replicaGroup: "rg", + quorum: 1, + expectErr: false, + }, + { + name: "only quorum set", + replicaGroup: "", + quorum: 1, + expectErr: true, + }, + { + name: "only replica-group set (quorum default)", + replicaGroup: "rg", + quorum: 0, + expectErr: true, + }, + { + name: "replica-group set with negative quorum", + replicaGroup: "rg", + quorum: -1, + expectErr: true, + }, + { + name: "negative quorum without replica-group", + replicaGroup: "", + quorum: -1, + expectErr: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + conf := &receiveConfig{ + replicaGroup: tc.replicaGroup, + quorum: tc.quorum, + } + err := conf.validate() + if tc.expectErr { + testutil.NotOk(t, err) + return + } + testutil.Ok(t, err) + }) + } +} diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 41d996e0200..3f14c542fa9 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -916,7 +916,7 @@ func queryFuncCreator( var spanID string switch partialResponseStrategy { - case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA: + case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA, 
storepb.PartialResponseStrategy_QUORUM: spanID = "/rule_instant_query HTTP[client]" case storepb.PartialResponseStrategy_ABORT: spanID = "/rule_instant_query_part_resp_abort HTTP[client]" diff --git a/pkg/info/infopb/rpc.pb.go b/pkg/info/infopb/rpc.pb.go index c88be63cfc1..ec1179f7f13 100644 --- a/pkg/info/infopb/rpc.pb.go +++ b/pkg/info/infopb/rpc.pb.go @@ -124,6 +124,16 @@ type StoreInfo struct { SupportsWithoutReplicaLabels bool `protobuf:"varint,5,opt,name=supports_without_replica_labels,json=supportsWithoutReplicaLabels,proto3" json:"supports_without_replica_labels,omitempty"` // TSDBInfos holds metadata for all TSDBs exposed by the store. TsdbInfos []TSDBInfo `protobuf:"bytes,6,rep,name=tsdb_infos,json=tsdbInfos,proto3" json:"tsdb_infos"` + // Replica topology hints for the QUORUM partial response strategy. + // Stores with the same replica_group value hold replicated data. + // When set, the query layer can tolerate failures as long as enough + // replicas in the group are healthy (determined by quorum). + // If empty, the store is treated as a singleton store. + ReplicaGroup string `protobuf:"bytes,7,opt,name=replica_group,json=replicaGroup,proto3" json:"replica_group,omitempty"` + // Minimum number of healthy stores required per replica group. + // Only meaningful when replica_group is set. + // If 0 or not set, the store is treated as a singleton store. 
+ Quorum int32 `protobuf:"varint,8,opt,name=quorum,proto3" json:"quorum,omitempty"` } func (m *StoreInfo) Reset() { *m = StoreInfo{} } @@ -400,44 +410,46 @@ func init() { func init() { proto.RegisterFile("info/infopb/rpc.proto", fileDescriptor_a1214ec45d2bf952) } var fileDescriptor_a1214ec45d2bf952 = []byte{ - // 589 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xdf, 0x8a, 0xda, 0x40, - 0x14, 0xc6, 0x8d, 0x7f, 0xe3, 0x71, 0xdd, 0xee, 0x0e, 0xbb, 0x25, 0x4a, 0x89, 0x12, 0xf6, 0x42, - 0x68, 0x31, 0x60, 0xa1, 0x94, 0xf6, 0xaa, 0x6e, 0x85, 0x6e, 0xe9, 0x42, 0x1b, 0x85, 0xc2, 0xde, - 0x84, 0xa8, 0xb3, 0x1a, 0x48, 0x32, 0x63, 0x66, 0xa4, 0xfa, 0x16, 0x7d, 0x95, 0xbe, 0x85, 0x97, - 0x7b, 0xd9, 0xab, 0xd2, 0xea, 0x43, 0xf4, 0xb6, 0xcc, 0x4c, 0x62, 0x0d, 0xdd, 0xbd, 0xe9, 0x8d, - 0x66, 0xe6, 0xfb, 0x9d, 0xc9, 0x39, 0xdf, 0x39, 0x13, 0x38, 0xf7, 0xa3, 0x5b, 0x62, 0x8b, 0x1f, - 0x3a, 0xb6, 0x63, 0x3a, 0xe9, 0xd2, 0x98, 0x70, 0x82, 0x6a, 0x7c, 0xee, 0x45, 0x84, 0x75, 0x85, - 0xd0, 0x6c, 0x30, 0x4e, 0x62, 0x6c, 0x07, 0xde, 0x18, 0x07, 0x74, 0x6c, 0xf3, 0x35, 0xc5, 0x4c, - 0x71, 0xcd, 0xb3, 0x19, 0x99, 0x11, 0xf9, 0x68, 0x8b, 0x27, 0xb5, 0x6b, 0xd5, 0xa1, 0x76, 0x15, - 0xdd, 0x12, 0x07, 0x2f, 0x96, 0x98, 0x71, 0xeb, 0x5b, 0x01, 0x8e, 0xd4, 0x9a, 0x51, 0x12, 0x31, - 0x8c, 0x5e, 0x00, 0xc8, 0xc3, 0x5c, 0x86, 0x39, 0x33, 0xb4, 0x76, 0xa1, 0x53, 0xeb, 0x9d, 0x76, - 0x93, 0x57, 0xde, 0x7c, 0x10, 0xd2, 0x10, 0xf3, 0x7e, 0x71, 0xf3, 0xa3, 0x95, 0x73, 0xaa, 0x41, - 0xb2, 0x66, 0xe8, 0x02, 0xea, 0x97, 0x24, 0xa4, 0x24, 0xc2, 0x11, 0x1f, 0xad, 0x29, 0x36, 0xf2, - 0x6d, 0xad, 0x53, 0x75, 0xb2, 0x9b, 0xe8, 0x19, 0x94, 0x64, 0xc2, 0x46, 0xa1, 0xad, 0x75, 0x6a, - 0xbd, 0xc7, 0xdd, 0x83, 0x5a, 0xba, 0x43, 0xa1, 0xc8, 0x64, 0x14, 0x24, 0xe8, 0x78, 0x19, 0x60, - 0x66, 0x14, 0xef, 0xa1, 0x1d, 0xa1, 0x28, 0x5a, 0x42, 0xe8, 0x1d, 0x3c, 0x0a, 0x31, 0x8f, 0xfd, - 0x89, 0x1b, 0x62, 0xee, 0x4d, 0x3d, 0xee, 0x19, 0x25, 0x19, 0xd7, 0xca, 
0xc4, 0x5d, 0x4b, 0xe6, - 0x3a, 0x41, 0xe4, 0x01, 0xc7, 0x61, 0x66, 0x0f, 0xf5, 0xa0, 0xc2, 0xbd, 0x78, 0x26, 0x0c, 0x28, - 0xcb, 0x13, 0x8c, 0xcc, 0x09, 0x23, 0xa5, 0xc9, 0xd0, 0x14, 0x44, 0x2f, 0xa1, 0x8a, 0x57, 0x38, - 0xa4, 0x81, 0x17, 0x33, 0xa3, 0x22, 0xa3, 0x9a, 0x99, 0xa8, 0x41, 0xaa, 0xca, 0xb8, 0xbf, 0x30, - 0xb2, 0xa1, 0xb4, 0x58, 0xe2, 0x78, 0x6d, 0xe8, 0x32, 0xaa, 0x91, 0x89, 0xfa, 0x24, 0x94, 0x37, - 0x1f, 0xaf, 0x54, 0xa1, 0x92, 0xb3, 0x7e, 0x6b, 0x50, 0xdd, 0x7b, 0x85, 0x1a, 0xa0, 0x87, 0x7e, - 0xe4, 0x72, 0x3f, 0xc4, 0x86, 0xd6, 0xd6, 0x3a, 0x05, 0xa7, 0x12, 0xfa, 0xd1, 0xc8, 0x0f, 0xb1, - 0x94, 0xbc, 0x95, 0x92, 0xf2, 0x89, 0xe4, 0xad, 0xa4, 0xf4, 0x14, 0x4e, 0xd9, 0x92, 0x52, 0x12, - 0x73, 0xe6, 0xb2, 0xb9, 0x17, 0x4f, 0xfd, 0x68, 0x26, 0x9b, 0xa2, 0x3b, 0x27, 0xa9, 0x30, 0x4c, - 0xf6, 0xd1, 0x00, 0x5a, 0x7b, 0xf8, 0x8b, 0xcf, 0xe7, 0x64, 0xc9, 0xdd, 0x18, 0xd3, 0xc0, 0x9f, - 0x78, 0xae, 0x9c, 0x00, 0x26, 0x9d, 0xd6, 0x9d, 0x27, 0x29, 0xf6, 0x59, 0x51, 0x8e, 0x82, 0xe4, - 0xd4, 0x30, 0xf4, 0x0a, 0x80, 0xb3, 0xe9, 0xd8, 0x15, 0x85, 0x09, 0x67, 0xc5, 0x68, 0x9d, 0x67, - 0x9d, 0x1d, 0xbe, 0xed, 0x8b, 0xa2, 0xd2, 0xf1, 0x12, 0xb8, 0x58, 0xb3, 0xf7, 0x45, 0xbd, 0x78, - 0x52, 0xb2, 0x6a, 0x50, 0xdd, 0xb7, 0xdd, 0x3a, 0x03, 0xf4, 0x6f, 0x2f, 0xc5, 0x7c, 0x1f, 0xf4, - 0xc7, 0x1a, 0x40, 0x3d, 0x63, 0xfc, 0xff, 0xd9, 0x65, 0x1d, 0xc3, 0xd1, 0x61, 0x27, 0xac, 0x05, - 0xe8, 0x69, 0xae, 0xc8, 0x86, 0x72, 0x62, 0x82, 0x26, 0x1b, 0xf8, 0xe0, 0x6d, 0x49, 0xb0, 0x4c, - 0x0a, 0xf9, 0x87, 0x53, 0x28, 0x64, 0x52, 0xe8, 0x5d, 0x42, 0x51, 0xbe, 0xee, 0x75, 0xf2, 0x9f, - 0x9d, 0xc9, 0x83, 0x3b, 0xdd, 0x6c, 0xdc, 0xa3, 0xa8, 0xdb, 0xdd, 0xbf, 0xd8, 0xfc, 0x32, 0x73, - 0x9b, 0xad, 0xa9, 0xdd, 0x6d, 0x4d, 0xed, 0xe7, 0xd6, 0xd4, 0xbe, 0xee, 0xcc, 0xdc, 0xdd, 0xce, - 0xcc, 0x7d, 0xdf, 0x99, 0xb9, 0x9b, 0xb2, 0xfa, 0xd6, 0x8c, 0xcb, 0xf2, 0x53, 0xf1, 0xfc, 0x4f, - 0x00, 0x00, 0x00, 0xff, 0xff, 0xf1, 0x5f, 0x0a, 0x2f, 0x81, 0x04, 0x00, 0x00, + // 621 bytes of a gzipped 
FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0xdf, 0x6a, 0xdb, 0x3e, + 0x14, 0xc7, 0xe3, 0xfc, 0x6b, 0x72, 0xd2, 0xf4, 0xd7, 0x8a, 0xb6, 0x38, 0xe1, 0x87, 0x1b, 0xbc, + 0x5e, 0x04, 0x36, 0x62, 0xc8, 0x60, 0x8c, 0xed, 0x6a, 0xed, 0xca, 0xd6, 0xb1, 0xc2, 0xe6, 0x14, + 0x06, 0xbd, 0x31, 0x4e, 0xab, 0xa6, 0x06, 0xdb, 0x52, 0x24, 0x99, 0xb5, 0x6f, 0xb1, 0x57, 0x19, + 0xec, 0x21, 0x7a, 0xd9, 0xcb, 0x5d, 0x8d, 0xad, 0x7d, 0x91, 0xa1, 0x23, 0x3b, 0x8b, 0x59, 0x7b, + 0xb3, 0x9b, 0xc4, 0x3a, 0xdf, 0xcf, 0x91, 0xce, 0x3f, 0x09, 0xb6, 0xa2, 0xf4, 0x9c, 0x79, 0xfa, + 0x87, 0x4f, 0x3d, 0xc1, 0x4f, 0x47, 0x5c, 0x30, 0xc5, 0x48, 0x47, 0x5d, 0x84, 0x29, 0x93, 0x23, + 0x2d, 0xf4, 0x7b, 0x52, 0x31, 0x41, 0xbd, 0x38, 0x9c, 0xd2, 0x98, 0x4f, 0x3d, 0x75, 0xc5, 0xa9, + 0x34, 0x5c, 0x7f, 0x73, 0xc6, 0x66, 0x0c, 0x3f, 0x3d, 0xfd, 0x65, 0xac, 0x6e, 0x17, 0x3a, 0x87, + 0xe9, 0x39, 0xf3, 0xe9, 0x3c, 0xa3, 0x52, 0xb9, 0x5f, 0x6b, 0xb0, 0x6a, 0xd6, 0x92, 0xb3, 0x54, + 0x52, 0xf2, 0x0c, 0x00, 0x37, 0x0b, 0x24, 0x55, 0xd2, 0xb6, 0x06, 0xb5, 0x61, 0x67, 0xbc, 0x31, + 0xca, 0x8f, 0x3c, 0x79, 0xaf, 0xa5, 0x09, 0x55, 0x7b, 0xf5, 0xeb, 0x1f, 0x3b, 0x15, 0xbf, 0x1d, + 0xe7, 0x6b, 0x49, 0x76, 0xa1, 0xbb, 0xcf, 0x12, 0xce, 0x52, 0x9a, 0xaa, 0xe3, 0x2b, 0x4e, 0xed, + 0xea, 0xc0, 0x1a, 0xb6, 0xfd, 0xb2, 0x91, 0x3c, 0x81, 0x06, 0x06, 0x6c, 0xd7, 0x06, 0xd6, 0xb0, + 0x33, 0xde, 0x1e, 0x2d, 0xe5, 0x32, 0x9a, 0x68, 0x05, 0x83, 0x31, 0x90, 0xa6, 0x45, 0x16, 0x53, + 0x69, 0xd7, 0xef, 0xa1, 0x7d, 0xad, 0x18, 0x1a, 0x21, 0xf2, 0x16, 0xfe, 0x4b, 0xa8, 0x12, 0xd1, + 0x69, 0x90, 0x50, 0x15, 0x9e, 0x85, 0x2a, 0xb4, 0x1b, 0xe8, 0xb7, 0x53, 0xf2, 0x3b, 0x42, 0xe6, + 0x28, 0x47, 0x70, 0x83, 0xb5, 0xa4, 0x64, 0x23, 0x63, 0x58, 0x51, 0xa1, 0x98, 0xe9, 0x02, 0x34, + 0x71, 0x07, 0xbb, 0xb4, 0xc3, 0xb1, 0xd1, 0xd0, 0xb5, 0x00, 0xc9, 0x73, 0x68, 0xd3, 0x4b, 0x9a, + 0xf0, 0x38, 0x14, 0xd2, 0x5e, 0x41, 0xaf, 0x7e, 0xc9, 0xeb, 0xa0, 0x50, 0xd1, 0xef, 0x0f, 0x4c, + 0x3c, 0x68, 0xcc, 
0x33, 0x2a, 0xae, 0xec, 0x16, 0x7a, 0xf5, 0x4a, 0x5e, 0x1f, 0xb5, 0xf2, 0xea, + 0xc3, 0xa1, 0x49, 0x14, 0x39, 0xf7, 0x5b, 0x15, 0xda, 0x8b, 0x5a, 0x91, 0x1e, 0xb4, 0x92, 0x28, + 0x0d, 0x54, 0x94, 0x50, 0xdb, 0x1a, 0x58, 0xc3, 0x9a, 0xbf, 0x92, 0x44, 0xe9, 0x71, 0x94, 0x50, + 0x94, 0xc2, 0x4b, 0x23, 0x55, 0x73, 0x29, 0xbc, 0x44, 0xe9, 0x31, 0x6c, 0xc8, 0x8c, 0x73, 0x26, + 0x94, 0x0c, 0xe4, 0x45, 0x28, 0xce, 0xa2, 0x74, 0x86, 0x4d, 0x69, 0xf9, 0xeb, 0x85, 0x30, 0xc9, + 0xed, 0xe4, 0x00, 0x76, 0x16, 0xf0, 0xe7, 0x48, 0x5d, 0xb0, 0x4c, 0x05, 0x82, 0xf2, 0x38, 0x3a, + 0x0d, 0x03, 0x9c, 0x00, 0x89, 0x95, 0x6e, 0xf9, 0xff, 0x17, 0xd8, 0x27, 0x43, 0xf9, 0x06, 0xc2, + 0xa9, 0x91, 0xe4, 0x05, 0x80, 0x92, 0x67, 0xd3, 0x40, 0x27, 0xa6, 0x2b, 0xab, 0x47, 0x6b, 0xab, + 0x5c, 0xd9, 0xc9, 0xeb, 0x3d, 0x9d, 0x54, 0x31, 0x5e, 0x1a, 0xd7, 0x6b, 0x49, 0x1e, 0x41, 0xb7, + 0x38, 0x71, 0x26, 0x58, 0xc6, 0xb1, 0xc4, 0x6d, 0x7f, 0x35, 0x37, 0xbe, 0xd1, 0x36, 0xb2, 0x0d, + 0xcd, 0x79, 0xc6, 0x44, 0x96, 0x60, 0x29, 0x1b, 0x7e, 0xbe, 0x7a, 0x57, 0x6f, 0xd5, 0xd7, 0x1b, + 0x6e, 0x07, 0xda, 0x8b, 0x99, 0x71, 0x37, 0x81, 0xfc, 0x3d, 0x08, 0xfa, 0x72, 0x2c, 0x35, 0xd7, + 0x3d, 0x80, 0x6e, 0xa9, 0x6b, 0xff, 0x56, 0x6b, 0x77, 0x0d, 0x56, 0x97, 0xdb, 0xe8, 0xce, 0xa1, + 0x55, 0x24, 0x4a, 0x3c, 0x68, 0xe6, 0x15, 0xb4, 0xb0, 0xfb, 0x0f, 0x5e, 0xb5, 0x1c, 0x2b, 0x85, + 0x50, 0x7d, 0x38, 0x84, 0x5a, 0x29, 0x84, 0xf1, 0x3e, 0xd4, 0xf1, 0xb8, 0x97, 0xf9, 0x7f, 0x79, + 0xa0, 0x97, 0x1e, 0x84, 0x7e, 0xef, 0x1e, 0xc5, 0x3c, 0x0d, 0x7b, 0xbb, 0xd7, 0xbf, 0x9c, 0xca, + 0xf5, 0xad, 0x63, 0xdd, 0xdc, 0x3a, 0xd6, 0xcf, 0x5b, 0xc7, 0xfa, 0x72, 0xe7, 0x54, 0x6e, 0xee, + 0x9c, 0xca, 0xf7, 0x3b, 0xa7, 0x72, 0xd2, 0x34, 0x0f, 0xd5, 0xb4, 0x89, 0xef, 0xcc, 0xd3, 0xdf, + 0x01, 0x00, 0x00, 0xff, 0xff, 0xaf, 0x38, 0x92, 0xb8, 0xbe, 0x04, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. 
@@ -681,6 +693,18 @@ func (m *StoreInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Quorum != 0 { + i = encodeVarintRpc(dAtA, i, uint64(m.Quorum)) + i-- + dAtA[i] = 0x40 + } + if len(m.ReplicaGroup) > 0 { + i -= len(m.ReplicaGroup) + copy(dAtA[i:], m.ReplicaGroup) + i = encodeVarintRpc(dAtA, i, uint64(len(m.ReplicaGroup))) + i-- + dAtA[i] = 0x3a + } if len(m.TsdbInfos) > 0 { for iNdEx := len(m.TsdbInfos) - 1; iNdEx >= 0; iNdEx-- { { @@ -983,6 +1007,13 @@ func (m *StoreInfo) Size() (n int) { n += 1 + l + sovRpc(uint64(l)) } } + l = len(m.ReplicaGroup) + if l > 0 { + n += 1 + l + sovRpc(uint64(l)) + } + if m.Quorum != 0 { + n += 1 + sovRpc(uint64(m.Quorum)) + } return n } @@ -1583,6 +1614,57 @@ func (m *StoreInfo) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ReplicaGroup", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ReplicaGroup = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Quorum", wireType) + } + m.Quorum = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Quorum |= int32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) diff --git a/pkg/info/infopb/rpc.proto b/pkg/info/infopb/rpc.proto 
index 9f0db3709da..14561efc80e 100644 --- a/pkg/info/infopb/rpc.proto +++ b/pkg/info/infopb/rpc.proto @@ -64,6 +64,18 @@ message StoreInfo { // TSDBInfos holds metadata for all TSDBs exposed by the store. repeated TSDBInfo tsdb_infos = 6 [(gogoproto.nullable) = false]; + + // Replica topology hints for the QUORUM partial response strategy. + // Stores with the same replica_group value hold replicated data. + // When set, the query layer can tolerate failures as long as enough + // replicas in the group are healthy (determined by quorum). + // If empty, the store is treated as a singleton store. + string replica_group = 7; + + // Minimum number of healthy stores required per replica group. + // Only meaningful when replica_group is set. + // If 0 or not set, the store is treated as a singleton store. + int32 quorum = 8; } // RulesInfo holds the metadata related to Rules API exposed by the component. diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index dcb4d80b3bd..b1bc9df7389 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -400,7 +400,7 @@ func (p *QueryOptions) AddTo(values url.Values) error { var partialResponseValue string switch p.PartialResponseStrategy { - case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA: + case storepb.PartialResponseStrategy_WARN, storepb.PartialResponseStrategy_GROUP_REPLICA, storepb.PartialResponseStrategy_QUORUM: partialResponseValue = strconv.FormatBool(true) case storepb.PartialResponseStrategy_ABORT: partialResponseValue = strconv.FormatBool(false) diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 02c9f096b4d..50e4ee8706d 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -10,7 +10,6 @@ import ( "math" "regexp" "sort" - "strings" "sync" "time" "unicode/utf8" @@ -155,15 +154,13 @@ type endpointSetNodeCollector struct { mtx sync.Mutex storeNodes map[string]map[string]int storePerExtLset 
map[string]int - storeNodesAddr map[string]map[string]int - storeNodesKeys map[string]map[string]int + storesByQuorum map[string]int logger log.Logger connectionsDesc *prometheus.Desc labels []string - connectionsWithAddr *prometheus.Desc - connectionsWithKeys *prometheus.Desc + endpointGroupsDesc *prometheus.Desc } func newEndpointSetNodeCollector(logger log.Logger, labels ...string) *endpointSetNodeCollector { @@ -172,23 +169,19 @@ func newEndpointSetNodeCollector(logger log.Logger, labels ...string) *endpointS } desc := "Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier." return &endpointSetNodeCollector{ - logger: logger, - storeNodes: map[string]map[string]int{}, + logger: logger, + storeNodes: map[string]map[string]int{}, + storesByQuorum: map[string]int{}, connectionsDesc: prometheus.NewDesc( "thanos_store_nodes_grpc_connections", desc, labels, nil, ), labels: labels, - connectionsWithAddr: prometheus.NewDesc( - "thanos_store_nodes_grpc_connections_addr", - desc, - []string{string(ReplicaKey), "addr"}, nil, - ), - connectionsWithKeys: prometheus.NewDesc( - "thanos_store_nodes_grpc_connections_keys", - desc, - []string{string(GroupKey), string(ReplicaKey)}, nil, + endpointGroupsDesc: prometheus.NewDesc( + "thanos_query_endpoint_groups", + "Number of discovered store API endpoints by quorum label value. 
Endpoints without quorum label have quorum=0.", + []string{"quorum"}, nil, ), } } @@ -208,8 +201,7 @@ func truncateExtLabels(s string, threshold int) string { } func (c *endpointSetNodeCollector) Update( nodes map[string]map[string]int, - nodesAddr map[string]map[string]int, - nodesKeys map[string]map[string]int, + storesByQuorum map[string]int, ) { storeNodes := make(map[string]map[string]int, len(nodes)) storePerExtLset := map[string]int{} @@ -227,14 +219,12 @@ func (c *endpointSetNodeCollector) Update( defer c.mtx.Unlock() c.storeNodes = storeNodes c.storePerExtLset = storePerExtLset - c.storeNodesAddr = nodesAddr - c.storeNodesKeys = nodesKeys + c.storesByQuorum = storesByQuorum } func (c *endpointSetNodeCollector) Describe(ch chan<- *prometheus.Desc) { ch <- c.connectionsDesc - ch <- c.connectionsWithAddr - ch <- c.connectionsWithKeys + ch <- c.endpointGroupsDesc } func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) { @@ -261,21 +251,11 @@ func (c *endpointSetNodeCollector) Collect(ch chan<- prometheus.Metric) { } } } - for replicaKey, occurrencesPerAddr := range c.storeNodesAddr { - for addr, occurrences := range occurrencesPerAddr { - ch <- prometheus.MustNewConstMetric( - c.connectionsWithAddr, prometheus.GaugeValue, - float64(occurrences), - replicaKey, addr) - } - } - for groupKey, occurrencesPerReplicaKey := range c.storeNodesKeys { - for replicaKeys, occurrences := range occurrencesPerReplicaKey { - ch <- prometheus.MustNewConstMetric( - c.connectionsWithKeys, prometheus.GaugeValue, - float64(occurrences), - groupKey, replicaKeys) - } + for quorum, count := range c.storesByQuorum { + ch <- prometheus.MustNewConstMetric( + c.endpointGroupsDesc, prometheus.GaugeValue, + float64(count), + quorum) } } @@ -443,14 +423,7 @@ func (e *EndpointSet) Update(ctx context.Context) { // Update stats. 
stats := newEndpointAPIStats() - statsAddr := make(map[string]map[string]int) - statsKeys := make(map[string]map[string]int) - bumpCounter := func(key1, key2 string, mp map[string]map[string]int) { - if _, ok := mp[key1]; !ok { - mp[key1] = make(map[string]int) - } - mp[key1][key2]++ - } + storesByQuorum := make(map[string]int) for addr, er := range e.endpoints { if !er.isQueryable() { continue @@ -466,11 +439,15 @@ func (e *EndpointSet) Update(ctx context.Context) { "address", addr, "extLset", extLset, "duplicates", fmt.Sprintf("%v", stats[component.Sidecar.String()][extLset]+stats[component.Rule.String()][extLset]+1)) } stats[er.ComponentType().String()][extLset]++ - bumpCounter(er.replicaKey, strings.Split(addr, ":")[0], statsAddr) - bumpCounter(er.groupKey, er.replicaKey, statsKeys) + + // Track endpoints by quorum value for thanos_query_endpoint_groups metric. + // Quorum is now a first-class field from StoreInfo. + ri := er.ReplicaInfo() + quorumValue := fmt.Sprintf("%d", ri.Quorum) + storesByQuorum[quorumValue]++ } - e.endpointsMetric.Update(stats, statsAddr, statsKeys) + e.endpointsMetric.Update(stats, storesByQuorum) } func (e *EndpointSet) updateEndpoint(ctx context.Context, spec *GRPCEndpointSpec, er *endpointRef) { @@ -545,8 +522,8 @@ func (e *EndpointSet) GetStoreClients() []store.Client { StoreClient: storepb.NewStoreClient(er.cc), addr: er.addr, metadata: er.metadata, - groupKey: er.GroupKey(), - replicaKey: er.ReplicaKey(), + groupKey: er.groupKey, + replicaKey: er.replicaKey, status: er.status, }) er.mtx.RUnlock() @@ -675,13 +652,44 @@ type endpointRef struct { } func (er *endpointRef) GroupKey() string { + er.mtx.RLock() + defer er.mtx.RUnlock() return er.groupKey } func (er *endpointRef) ReplicaKey() string { + er.mtx.RLock() + defer er.mtx.RUnlock() return er.replicaKey } +// ReplicaInfo returns replica topology hints used by the QUORUM partial response strategy. 
+// It unifies DNS-based grouping (legacy) with first-class StoreInfo fields, +// preferring StoreInfo fields when available. +func (er *endpointRef) ReplicaInfo() store.ReplicaInfo { + er.mtx.RLock() + defer er.mtx.RUnlock() + + // Prefer first-class fields from StoreInfo if available + if er.metadata != nil && er.metadata.Store != nil { + if rg := er.metadata.Store.ReplicaGroup; rg != "" { + return store.ReplicaInfo{ + Group: rg, + Replica: er.replicaKey, // Use DNS-based replica key for identification + Quorum: int(er.metadata.Store.Quorum), + } + } + } + + // Fall back to DNS-based grouping (legacy) + // Quorum=0 means singleton store semantics. + return store.ReplicaInfo{ + Group: er.groupKey, + Replica: er.replicaKey, + Quorum: 0, + } +} + // newEndpointRef creates a new endpointRef with a gRPC channel to the given the IP address. // The call to newEndpointRef will return an error if establishing the channel fails. func (e *EndpointSet) newEndpointRef(spec *GRPCEndpointSpec) (*endpointRef, error) { @@ -757,7 +765,7 @@ func (er *endpointRef) isQueryable() bool { er.mtx.RLock() defer er.mtx.RUnlock() - return er.isStrict || er.status.LastError == nil + return er.isStrict || er.ignoreError || er.status.LastError == nil } func (er *endpointRef) ComponentType() component.Component { diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 04a4033a0d2..9f297f9f519 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -243,18 +243,14 @@ func TestTruncateExtLabels(t *testing.T) { func TestEndpointSetUpdate(t *testing.T) { t.Parallel() + const metricsMetaEndpointGroups = ` + # HELP thanos_query_endpoint_groups Number of discovered store API endpoints by quorum label value. Endpoints without quorum label have quorum=0. + # TYPE thanos_query_endpoint_groups gauge + ` const metricsMeta = ` # HELP thanos_store_nodes_grpc_connections Number of gRPC connection to Store APIs. 
Opened connection means healthy store APIs available for Querier. # TYPE thanos_store_nodes_grpc_connections gauge ` - const metricsMetaAddr = ` - # HELP thanos_store_nodes_grpc_connections_addr Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier. - # TYPE thanos_store_nodes_grpc_connections_addr gauge - ` - const metricsMetaKeys = ` - # HELP thanos_store_nodes_grpc_connections_keys Number of gRPC connection to Store APIs. Opened connection means healthy store APIs available for Querier. - # TYPE thanos_store_nodes_grpc_connections_keys gauge - ` testCases := []struct { name string endpoints []testEndpointMeta @@ -279,15 +275,12 @@ func TestEndpointSetUpdate(t *testing.T) { connLabels: []string{"store_type"}, expectedEndpoints: 1, - expectedConnMetrics: metricsMeta + - ` - thanos_store_nodes_grpc_connections{store_type="sidecar"} 1 - ` + metricsMetaAddr + + expectedConnMetrics: metricsMetaEndpointGroups + ` - thanos_store_nodes_grpc_connections_addr{addr="127.0.0.1",replica_key=""} 1 - ` + metricsMetaKeys + + thanos_query_endpoint_groups{quorum="0"} 1 + ` + metricsMeta + ` - thanos_store_nodes_grpc_connections_keys{group_key="",replica_key=""} 1 + thanos_store_nodes_grpc_connections{store_type="sidecar"} 1 `, }, { @@ -339,15 +332,12 @@ func TestEndpointSetUpdate(t *testing.T) { strict: true, connLabels: []string{"store_type"}, expectedEndpoints: 1, - expectedConnMetrics: metricsMeta + - ` - thanos_store_nodes_grpc_connections{store_type="sidecar"} 1 - ` + metricsMetaAddr + + expectedConnMetrics: metricsMetaEndpointGroups + ` - thanos_store_nodes_grpc_connections_addr{addr="127.0.0.1",replica_key=""} 1 - ` + metricsMetaKeys + + thanos_query_endpoint_groups{quorum="0"} 1 + ` + metricsMeta + ` - thanos_store_nodes_grpc_connections_keys{group_key="",replica_key=""} 1 + thanos_store_nodes_grpc_connections{store_type="sidecar"} 1 `, }, { @@ -369,14 +359,10 @@ func TestEndpointSetUpdate(t *testing.T) { }, }, 
expectedEndpoints: 1, - expectedConnMetrics: metricsMeta + ` + expectedConnMetrics: metricsMetaEndpointGroups + ` + thanos_query_endpoint_groups{quorum="0"} 1 + ` + metricsMeta + ` thanos_store_nodes_grpc_connections{external_labels="{lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val\", lbl=\"val}",store_type="sidecar"} 1 - ` + metricsMetaAddr + - ` - thanos_store_nodes_grpc_connections_addr{addr="127.0.0.1",replica_key=""} 1 - ` + metricsMetaKeys + - ` - thanos_store_nodes_grpc_connections_keys{group_key="",replica_key=""} 1 `, }, } @@ -401,6 +387,31 @@ func TestEndpointSetUpdate(t *testing.T) { } } +func TestEndpointSet_IgnoreErrorMakesEndpointQueryable(t *testing.T) { + t.Parallel() + + endpoints, err := startTestEndpoints([]testEndpointMeta{ + { + err: fmt.Errorf("endpoint unavailable"), + InfoResponse: sidecarInfo, + extlsetFn: func(addr string) []labelpb.ZLabelSet 
{ + return labelpb.ZLabelSetsFromPromLabels(labels.FromStrings("addr", addr, "a", "b")) + }, + }, + }) + testutil.Ok(t, err) + t.Cleanup(endpoints.Close) + + spec := NewGRPCEndpointSpecWithReplicaKey("rep0", endpoints.orderAddrs[0], false, true, testGRPCOpts...) + endpointSet := NewEndpointSet(time.Now, nil, nil, func() (specs []*GRPCEndpointSpec) { + return []*GRPCEndpointSpec{spec} + }, testGRPCOpts, time.Minute, time.Second) + t.Cleanup(endpointSet.Close) + endpointSet.Update(context.Background()) + + testutil.Equals(t, 1, len(endpointSet.GetStoreClients())) +} + func TestEndpointSetUpdate_DuplicateSpecs(t *testing.T) { t.Parallel() diff --git a/pkg/query/querier.go b/pkg/query/querier.go index 64bab65a022..c2f5fab748c 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -59,6 +59,7 @@ type QueryableCreator func( type Options struct { GroupReplicaPartialResponseStrategy bool + QuorumPartialResponseStrategy bool DeduplicationFunc string RewriteAggregationLabelStrategy string RewriteAggregationLabelTo string @@ -226,7 +227,9 @@ func newQuerierWithOpts( } partialResponseStrategy := storepb.PartialResponseStrategy_ABORT - if opts.GroupReplicaPartialResponseStrategy { + if opts.QuorumPartialResponseStrategy { + partialResponseStrategy = storepb.PartialResponseStrategy_QUORUM + } else if opts.GroupReplicaPartialResponseStrategy { partialResponseStrategy = storepb.PartialResponseStrategy_GROUP_REPLICA } else if partialResponse { partialResponseStrategy = storepb.PartialResponseStrategy_WARN @@ -257,6 +260,15 @@ func (q *querier) isDedupEnabled() bool { return q.deduplicate && len(q.replicaLabels) > 0 } +// getWithoutReplicaLabels returns labels to exclude from series/label APIs. +// This is used for deduplication (replica labels) only. 
+func (q *querier) getWithoutReplicaLabels() []string { + if q.isDedupEnabled() { + return q.replicaLabels + } + return nil +} + type seriesServer struct { // This field just exist to pseudo-implement the unused methods of the interface. storepb.Store_SeriesServer @@ -411,9 +423,10 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . PartialResponseStrategy: q.partialResponseStrategy, SkipChunks: q.skipChunks, } - if q.isDedupEnabled() { - // Soft ask to sort without replica labels and push them at the end of labelset. - req.WithoutReplicaLabels = q.replicaLabels + // Soft ask to sort without replica labels and push them at the end of labelset. + // This is used for deduplication (replica labels) only. + if labels := q.getWithoutReplicaLabels(); len(labels) > 0 { + req.WithoutReplicaLabels = labels } if err := q.proxy.Series(&req, resp); err != nil { @@ -471,8 +484,8 @@ func (q *querier) LabelValues(ctx context.Context, name string, hints *storage.L Limit: int64(hints.Limit), } - if q.isDedupEnabled() { - req.WithoutReplicaLabels = q.replicaLabels + if labels := q.getWithoutReplicaLabels(); len(labels) > 0 { + req.WithoutReplicaLabels = labels } resp, err := q.proxy.LabelValues(ctx, req) @@ -514,8 +527,8 @@ func (q *querier) LabelNames(ctx context.Context, hints *storage.LabelHints, mat Limit: int64(hints.Limit), } - if q.isDedupEnabled() { - req.WithoutReplicaLabels = q.replicaLabels + if labels := q.getWithoutReplicaLabels(); len(labels) > 0 { + req.WithoutReplicaLabels = labels } resp, err := q.proxy.LabelNames(ctx, req) diff --git a/pkg/query/querier_test.go b/pkg/query/querier_test.go index 8c77ee93c50..d1a6530588d 100644 --- a/pkg/query/querier_test.go +++ b/pkg/query/querier_test.go @@ -1279,3 +1279,51 @@ func storeSeriesResponse(t testing.TB, lset labels.Labels, smplChunks ...[]sampl } return storepb.NewSeriesResponse(&s) } + +func TestQuerier_GetWithoutReplicaLabels(t *testing.T) { + t.Parallel() + + // Note: group/quorum 
labels for GROUP_REPLICA strategy are no longer included in + // WithoutReplicaLabels because they are configured as info-only labels on receivers, + // meaning they are only exposed via InfoAPI and NOT merged into series responses. + for _, tc := range []struct { + name string + deduplicate bool + replicaLabels []string + expectedLabels []string + }{ + { + name: "no dedup - empty result", + deduplicate: false, + replicaLabels: []string{"replica"}, + expectedLabels: nil, + }, + { + name: "dedup enabled - only replica labels", + deduplicate: true, + replicaLabels: []string{"replica", "prometheus_replica"}, + expectedLabels: []string{"replica", "prometheus_replica"}, + }, + { + name: "dedup enabled but no replica labels - empty result", + deduplicate: true, + replicaLabels: []string{}, + expectedLabels: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + q := &querier{ + deduplicate: tc.deduplicate, + replicaLabels: tc.replicaLabels, + } + + got := q.getWithoutReplicaLabels() + + if tc.expectedLabels == nil { + testutil.Equals(t, 0, len(got)) + } else { + testutil.Equals(t, tc.expectedLabels, got) + } + }) + } +} diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index a8749441f58..262fcb7e405 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -54,6 +54,8 @@ type MultiTSDB struct { tsdbOpts *tsdb.Options tenantLabelName string labels labels.Labels + replicaGroup string // Replica group identifier used by QUORUM partial response strategy + quorum int32 // Minimum healthy stores required per group bucket objstore.Bucket mtx *sync.RWMutex @@ -115,6 +117,15 @@ func WithMatchersCache(cache storecache.MatchersCache) MultiTSDBOption { } } +// WithReplicaGroup sets the replica group identifier and quorum for GROUP_REPLICA +// partial response strategy. Stores with the same replica_group value hold replicated data. 
+func WithReplicaGroup(replicaGroup string, quorum int) MultiTSDBOption { + return func(s *MultiTSDB) { + s.replicaGroup = replicaGroup + s.quorum = int32(quorum) + } +} + // WithCompactionDelayInterval sets the interval for staggering head compaction across tenants. // Tenant N gets delay of N*interval (mod block duration). func WithCompactionDelayInterval(interval time.Duration) MultiTSDBOption { @@ -306,6 +317,12 @@ func (l *localClient) ReplicaKey() string { return "" } +// ReplicaInfo returns replica topology hints used by the QUORUM partial response strategy. +// It delegates to the underlying TSDBStore which has the first-class replica_group and quorum fields. +func (l *localClient) ReplicaInfo() store.ReplicaInfo { + return l.store.ReplicaInfo() +} + func (l *localClient) Matches(matchers []*labels.Matcher) bool { return l.store.Matches(matchers) } @@ -881,7 +898,11 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant if t.matcherCache != nil { options = append(options, store.WithMatcherCacheInstance(t.matcherCache)) } - tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, options...), s, ship, exemplars.NewTSDB(s, lset)) + tsdbStore := store.NewTSDBStore(logger, s, component.Receive, lset, options...) + if t.replicaGroup != "" { + tsdbStore.SetReplicaInfo(t.replicaGroup, t.quorum) + } + tenant.set(tsdbStore, s, ship, exemplars.NewTSDB(s, lset)) t.addTenantLocked(tenantID, tenant) // need to update the client list once store is ready & client != nil level.Info(logger).Log("msg", "TSDB is now ready") return nil diff --git a/pkg/rules/rulespb/custom_test.go b/pkg/rules/rulespb/custom_test.go index 9c60e5173f6..101b5ef5e45 100644 --- a/pkg/rules/rulespb/custom_test.go +++ b/pkg/rules/rulespb/custom_test.go @@ -162,7 +162,7 @@ func TestJSONUnmarshalMarshal(t *testing.T) { }, }, }, - expectedErr: errors.New("failed to unmarshal \"asdfsdfsdfsd\" as 'partial_response_strategy'. 
Possible values are ABORT,GROUP_REPLICA,WARN"), + expectedErr: errors.New("failed to unmarshal \"asdfsdfsdfsd\" as 'partial_response_strategy'. Possible values are ABORT,GROUP_REPLICA,QUORUM,WARN"), }, { name: "one valid group with 1 alerting rule containing no alerts.", diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 434cb220e9b..5433362c02c 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -49,6 +49,53 @@ const metricNameLabel = "__name__" // This can happen with Query servers trees and external labels. var ErrorNoStoresMatched = errors.New("No StoreAPIs matched for this query") +// ReplicaInfo is an alias to storepb.ReplicaInfo for convenience. +// It contains replica topology hints used by the QUORUM partial response strategy. +type ReplicaInfo = storepb.ReplicaInfo + +type trackedRespSet struct { + respSet + store Client + group string + singleton bool + groupSucceeded map[string]int + storeFailed map[Client]bool + closed bool +} + +func (t *trackedRespSet) Close() { + if t.closed { + return + } + t.closed = true + if !t.singleton && !t.storeFailed[t.store] { + t.groupSucceeded[t.group]++ + } + t.respSet.Close() +} + +type warnAttributingStream struct { + h interface{ Next() bool; At() *storepb.SeriesResponse; Winner() respSet } + warnSrc map[*storepb.SeriesResponse]respSet +} + +func newWarnAttributingStream(h interface{ Next() bool; At() *storepb.SeriesResponse; Winner() respSet }) *warnAttributingStream { + return &warnAttributingStream{h: h, warnSrc: map[*storepb.SeriesResponse]respSet{}} +} + +func (w *warnAttributingStream) Next() bool { + if !w.h.Next() { + return false + } + resp := w.h.At() + if resp != nil && resp.GetWarning() != "" { + w.warnSrc[resp] = w.h.Winner() + } + return true +} + +func (w *warnAttributingStream) At() *storepb.SeriesResponse { return w.h.At() } + // Client holds meta information about a store. type Client interface { // StoreClient to access the store. 
@@ -77,7 +124,7 @@ type Client interface { // represents a local client (server-as-client) and has no remote address. Addr() (addr string, isLocalClient bool) - // ReplicaKey returns replica name of the store client. A replica consists of a set of endpoints belong to the + // ReplicaKey returns replica name of the store client. A replica consists of a set of endpoints belonging to the // same replica. E.g, "pantheon-db-rep0", "pantheon-db-rep1", "long-range-store". ReplicaKey() string @@ -86,6 +133,11 @@ type Client interface { // "long-range-store" has only one replica, "long-range-store". GroupKey() string + // ReplicaInfo returns replica topology hints used by the QUORUM partial response strategy. + // Implementations should unify DNS-based grouping (legacy) with first-class StoreInfo fields, + // preferring StoreInfo fields when available. + ReplicaInfo() ReplicaInfo + // Matches returns true if provided label matchers are allowed in the store. Matches(matches []*labels.Matcher) bool } @@ -461,11 +513,30 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. stores []Client storeLabelSets []labels.Labels ) - // groupReplicaStores[groupKey][replicaKey] = number of stores with the groupKey and replicaKey + strategy := originalRequest.PartialResponseStrategy + + // QUORUM strategy (first-class StoreInfo replica_group/quorum): + // - groupStores[group] = total stores in group (quorum>0 only) + // - groupQuorum[group] = quorum requirement for group + // - groupFailed[group] = failed stores in group + // - groupSucceeded[group] = stores that completed without errors + // - singletonStores = stores with quorum=0 (or empty group) must succeed + // Note: quorum is evaluated over the stores returned by matching/discovery (i.e. only "attempted" stores). + // A missing store is only problematic for correctness if it is expected to be a singleton store. 
+ groupStores := make(map[string]int) + groupQuorum := make(map[string]int) + groupFailed := make(map[string]int) + groupSucceeded := make(map[string]int) + singletonStores := make(map[Client]bool) + storeGroup := make(map[Client]string) + storeFailed := make(map[Client]bool) + totalFailedStores := 0 + + // GROUP_REPLICA strategy (legacy DNS-based grouping): + // groupReplicaStores[groupKey][replicaKey] = number of stores + // failedStores[groupKey][replicaKey] = number of failures groupReplicaStores := make(map[string]map[string]int) - // failedStores[groupKey][replicaKey] = number of store failures failedStores := make(map[string]map[string]int) - totalFailedStores := 0 bumpCounter := func(key1, key2 string, mp map[string]map[string]int) { if _, ok := mp[key1]; !ok { mp[key1] = make(map[string]int) @@ -481,7 +552,27 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. s.metrics.storesPerQueryAfterEELFiltering.Set(float64(len(stores))) for _, st := range stores { - bumpCounter(st.GroupKey(), st.ReplicaKey(), groupReplicaStores) + switch strategy { + case storepb.PartialResponseStrategy_GROUP_REPLICA: + bumpCounter(st.GroupKey(), st.ReplicaKey(), groupReplicaStores) + case storepb.PartialResponseStrategy_QUORUM: + ri := st.ReplicaInfo() + if ri.Quorum <= 0 || ri.Group == "" { + singletonStores[st] = true + continue + } + storeGroup[st] = ri.Group + groupStores[ri.Group]++ + if prev, ok := groupQuorum[ri.Group]; ok && prev != ri.Quorum { + // Be conservative if misconfigured: require the highest quorum we see. 
+ if ri.Quorum > prev { + groupQuorum[ri.Group] = ri.Quorum + } + level.Warn(reqLogger).Log("msg", "inconsistent quorum value for replica group", "group", ri.Group, "seen", ri.Quorum, "kept", groupQuorum[ri.Group]) + } else if !ok { + groupQuorum[ri.Group] = ri.Quorum + } + } } if len(stores) == 0 { level.Debug(reqLogger).Log("err", ErrorNoStoresMatched, "stores", strings.Join(storeDebugMsgs, ";")) @@ -503,7 +594,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. ShardInfo: originalRequest.ShardInfo, WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, } - if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA && !s.forwardPartialStrategy { + if (strategy == storepb.PartialResponseStrategy_GROUP_REPLICA || strategy == storepb.PartialResponseStrategy_QUORUM) && !s.forwardPartialStrategy { // Do not forward this field as it might cause data loss. r.PartialResponseDisabled = true r.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT @@ -514,35 +605,66 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
storeResponses := make([]respSet, 0, len(stores)) - checkGroupReplicaErrors := func(st Client, err error) error { - if len(failedStores[st.GroupKey()]) > 1 { + checkGroupReplicaErrors := func(group, replica string, err error) error { + if len(failedStores[group]) > 1 { msg := "Multiple replicas have failures for the same group" - group := st.GroupKey() replicas := fmt.Sprintf("%+v", failedStores[group]) - level.Error(reqLogger).Log( - "msg", msg, - "group", group, - "replicas", replicas, - ) + level.Error(reqLogger).Log("msg", msg, "group", group, "replicas", replicas) return fmt.Errorf("%s group=%s replicas=%s: %w", msg, group, replicas, err) } - if len(groupReplicaStores[st.GroupKey()]) == 1 && failedStores[st.GroupKey()][st.ReplicaKey()] > 1 { + if len(groupReplicaStores[group]) == 1 && failedStores[group][replica] > 1 { msg := "A group with single replica has multiple failures" - group := st.GroupKey() replicas := fmt.Sprintf("%+v", failedStores[group]) - level.Error(reqLogger).Log( - "msg", msg, - "group", group, - "replicas", replicas, - ) + level.Error(reqLogger).Log("msg", msg, "group", group, "replicas", replicas) return fmt.Errorf("%s group=%s replicas=%s: %w", msg, group, replicas, err) } return nil } + markStoreFailed := func(st Client) bool { + if storeFailed[st] { + return false + } + storeFailed[st] = true + return true + } + + checkQuorumErrors := func(st Client, err error) error { + if singletonStores[st] { + addr, _ := st.Addr() + msg := "Singleton store failed" + level.Error(reqLogger).Log("msg", msg, "store", addr) + return fmt.Errorf("%s store=%s: %w", msg, addr, err) + } + group := storeGroup[st] + if group == "" { + group = st.ReplicaInfo().Group + } + if markStoreFailed(st) { + groupFailed[group]++ + } + healthy := groupStores[group] - groupFailed[group] + requiredQuorum := groupQuorum[group] + if healthy < requiredQuorum { + msg := "Replica group does not meet quorum requirement" + level.Error(reqLogger).Log("msg", msg, "group", group, 
"healthy", healthy, "quorum", requiredQuorum, "total", groupStores[group], "failed", groupFailed[group], "successful", groupSucceeded[group]) + return fmt.Errorf("%s group=%s healthy=%d quorum=%d: %w", msg, group, healthy, requiredQuorum, err) + } + return nil + } + logGroupReplicaErrors := func() { - if len(failedStores) > 0 { + if strategy == storepb.PartialResponseStrategy_QUORUM && len(groupFailed) > 0 { + level.Warn(s.logger).Log("msg", "Group/quorum errors", + "mode", "quorum-based", + "group_failures", fmt.Sprintf("%+v", groupFailed), + "total_failed_stores", totalFailedStores, + ) + s.metrics.failedStoresPerQuery.Set(float64(totalFailedStores)) + } + if strategy == storepb.PartialResponseStrategy_GROUP_REPLICA && len(failedStores) > 0 { level.Warn(s.logger).Log("msg", "Group/replica errors", + "mode", "dns-based", "errors", fmt.Sprintf("%+v", failedStores), "total_failed_stores", totalFailedStores, ) @@ -609,15 +731,32 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
grpcErrorCode = extractGRPCCode(err) } - level.Warn(s.logger).Log("msg", "Store failure", "group", st.GroupKey(), "replica", st.ReplicaKey(), "err", err) - s.metrics.storeFailureCount.WithLabelValues(st.GroupKey(), st.ReplicaKey()).Inc() - bumpCounter(st.GroupKey(), st.ReplicaKey(), failedStores) totalFailedStores++ - if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { - if checkGroupReplicaErrors(st, err) != nil { + switch strategy { + case storepb.PartialResponseStrategy_GROUP_REPLICA: + group, replica := st.GroupKey(), st.ReplicaKey() + level.Warn(s.logger).Log("msg", "Store failure", "group", group, "replica", replica, "err", err) + s.metrics.storeFailureCount.WithLabelValues(group, replica).Inc() + bumpCounter(group, replica, failedStores) + if checkGroupReplicaErrors(group, replica, err) != nil { return err } continue + case storepb.PartialResponseStrategy_QUORUM: + ri := st.ReplicaInfo() + if singletonStores[st] || ri.Quorum <= 0 || ri.Group == "" { + group, replica := st.GroupKey(), st.ReplicaKey() + level.Warn(s.logger).Log("msg", "Singleton store failure", "group", group, "replica", replica, "err", err) + s.metrics.storeFailureCount.WithLabelValues(group, replica).Inc() + return err + } + addr, _ := st.Addr() + level.Warn(s.logger).Log("msg", "Store failure", "group", storeGroup[st], "store", addr, "err", err) + // Note: storeFailureCount uses (groupKey, replicaKey). In quorum mode group is InfoAPI-based. + if qerr := checkQuorumErrors(st, err); qerr != nil { + return qerr + } + continue } level.Error(reqLogger).Log("err", err) if !originalRequest.PartialResponseDisabled || originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_WARN { @@ -630,23 +769,41 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
} } + if strategy == storepb.PartialResponseStrategy_QUORUM { + respSet = &trackedRespSet{ + respSet: respSet, + store: st, + group: storeGroup[st], + singleton: singletonStores[st], + groupSucceeded: groupSucceeded, + storeFailed: storeFailed, + } + } storeResponses = append(storeResponses, respSet) defer respSet.Close() } level.Debug(reqLogger).Log("msg", "Series: started fanout streams", "num_stores", len(stores), "status", strings.Join(storeDebugMsgs, " | ")) - var respHeap seriesStream = NewProxyResponseLoserTree(storeResponses...) + rawHeap := NewProxyResponseLoserTree(storeResponses...) + var warnStream *warnAttributingStream + var respHeap seriesStream = rawHeap + if strategy == storepb.PartialResponseStrategy_QUORUM { + warnStream = newWarnAttributingStream(rawHeap) + respHeap = warnStream + } if s.enableDedup { respHeap = NewResponseDeduplicatorInternal(respHeap, s.quorumChunkDedup) } i := 0 var firstWarning *string + limited := false for respHeap.Next() { i++ seriesCount = i // Update our tracking variable if r.Limit > 0 && i > int(r.Limit) { + limited = true break } resp := respHeap.At() @@ -666,9 +823,28 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. } level.Error(s.logger).Log("msg", "Store failure with warning", "warning", warning) - // Don't have group/replica keys here, so we can't attribute the warning to a specific store. 
- s.metrics.storeFailureCount.WithLabelValues("", "").Inc() - if originalRequest.PartialResponseStrategy == storepb.PartialResponseStrategy_GROUP_REPLICA { + if strategy == storepb.PartialResponseStrategy_QUORUM && warnStream != nil { + if strings.Contains(resp.GetWarning(), "The specified key does not exist") || strings.Contains(resp.GetWarning(), "The specified blob does not exist") { + level.Warn(s.logger).Log("msg", "Ignore 'the specified key/blob does not exist' error from Store") + s.metrics.missingBlockFileErrorCount.Inc() + } else { + if src, ok := warnStream.warnSrc[resp]; ok { + if tr, ok := src.(*trackedRespSet); ok { + if markStoreFailed(tr.store) { + groupFailed[tr.group]++ + totalFailedStores++ + } + if qerr := checkQuorumErrors(tr.store, errors.New(warning)); qerr != nil { + return qerr + } + } + } + } + } else { + // Don't have group/replica keys here, so we can't attribute the warning to a specific store. + s.metrics.storeFailureCount.WithLabelValues("", "").Inc() + } + if strategy == storepb.PartialResponseStrategy_GROUP_REPLICA { // The first error message is from AWS S3 and the second one is from Azure Blob Storage. if strings.Contains(resp.GetWarning(), "The specified key does not exist") || strings.Contains(resp.GetWarning(), "The specified blob does not exist") { level.Warn(s.logger).Log("msg", "Ignore 'the specified key/blob does not exist' error from Store") @@ -700,6 +876,13 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
} } + if strategy == storepb.PartialResponseStrategy_QUORUM && !limited { + for group, quorum := range groupQuorum { + if groupSucceeded[group] < quorum { + return fmt.Errorf("Replica group does not meet quorum requirement group=%s successful=%d quorum=%d", group, groupSucceeded[group], quorum) + } + } + } return nil } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index 5c30008e219..1b6b921ffa7 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -643,6 +643,256 @@ func TestProxyStore_Series(t *testing.T) { }, expectedErr: errors.New("fetch series for {ext=\"1\"} : error!"), }, + { + title: "quorum strategy; quorum met", + storeAPIs: []Client{ + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica1", + ReplicaGroupStr: "rg", + QuorumValue: 2, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica2", + ReplicaGroupStr: "rg", + QuorumValue: 2, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica3", + ReplicaGroupStr: "rg", + QuorumValue: 2, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseStrategy: 
storepb.PartialResponseStrategy_QUORUM, + }, + expectedSeries: []rawSeries{ + { + lset: labels.FromStrings("a", "b"), + chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 0, + }, + { + title: "quorum strategy; multiple warnings tolerated when quorum met", + storeAPIs: []Client{ + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + Name: "store1", + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica1", + ReplicaGroupStr: "rg", + QuorumValue: 1, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + injectedErrorIndex: 0, + injectedError: errors.New("error!"), + }, + Name: "store2", + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica2", + ReplicaGroupStr: "rg", + QuorumValue: 1, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + injectedErrorIndex: 0, + injectedError: errors.New("error!"), + }, + Name: "store3", + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica3", + ReplicaGroupStr: "rg", + QuorumValue: 1, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseStrategy: storepb.PartialResponseStrategy_QUORUM, + }, + expectedSeries: []rawSeries{ + { + lset: labels.FromStrings("a", "b"), + 
chunks: [][]sample{{{1, 1}, {2, 2}, {3, 3}}}, + }, + }, + expectedWarningsLen: 2, + }, + { + title: "quorum strategy; quorum not met", + storeAPIs: []Client{ + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica1", + ReplicaGroupStr: "rg", + QuorumValue: 2, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica2", + ReplicaGroupStr: "rg", + QuorumValue: 2, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica3", + ReplicaGroupStr: "rg", + QuorumValue: 2, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseStrategy: storepb.PartialResponseStrategy_QUORUM, + }, + expectedErr: errors.New("Replica group does not meet quorum requirement group=rg healthy=1 quorum=2: fetch series for {ext=\"1\"} : error!"), + }, + { + title: "quorum strategy; quorum not met when one store is missing from discovery", + // Equivalent to a 3-replica group with quorum=2 where only 2 stores are discovered/attempted. 
+ storeAPIs: []Client{ + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica1", + ReplicaGroupStr: "rg", + QuorumValue: 2, + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica2", + ReplicaGroupStr: "rg", + QuorumValue: 2, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: storepb.LabelMatcher_EQ}}, + PartialResponseStrategy: storepb.PartialResponseStrategy_QUORUM, + }, + expectedErr: errors.New("Replica group does not meet quorum requirement group=rg healthy=1 quorum=2: fetch series for {ext=\"1\"} : error!"), + }, + { + title: "quorum strategy; singleton store failure aborts", + storeAPIs: []Client{ + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespError: errors.New("error!"), + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "singleton-group", + ReplicaKeyStr: "singleton-replica", + }, + &storetestutil.TestClient{ + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{1, 1}, {2, 2}, {3, 3}}), + }, + }, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + MinTime: 1, + MaxTime: 300, + GroupKeyStr: "legacy-group", + ReplicaKeyStr: "replica1", + ReplicaGroupStr: "rg", + QuorumValue: 1, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{{Name: "ext", Value: "1", Type: 
storepb.LabelMatcher_EQ}}, + PartialResponseStrategy: storepb.PartialResponseStrategy_QUORUM, + }, + expectedErr: errors.New("fetch series for {ext=\"1\"} : error!"), + }, { title: "group replica strategy; a group with single replica has one failure", storeAPIs: []Client{ @@ -3556,3 +3806,99 @@ func TestDedupRespHeap_QuorumChunkDedup(t *testing.T) { } } + +func TestProxyStore_GetGroupKeyQuorum(t *testing.T) { + t.Parallel() + + // Test that getGroupKey and getQuorum use first-class fields from StoreInfo + // when available, otherwise fall back to DNS-based grouping. + for _, tc := range []struct { + name string + client *storetestutil.TestClient + expectedGroup string + expectedReplica string + expectedQuorum int + expectMustSuccess bool + }{ + { + name: "no ReplicaGroup set - fallback to DNS-based GroupKey", + client: &storetestutil.TestClient{ + Name: "receive-rep0-0.receive.svc.cluster.local:10901", + GroupKeyStr: "receive-rep0", + ReplicaKeyStr: "receive-rep0-0", + ReplicaGroupStr: "", // Not set - triggers DNS fallback + QuorumValue: 0, // Ignored when ReplicaGroupStr is empty + }, + expectedGroup: "receive-rep0", // Falls back to GroupKeyStr + expectedReplica: "receive-rep0-0", + expectedQuorum: 0, // DNS fallback always has quorum=0 + expectMustSuccess: true, + }, + { + name: "ReplicaGroup and Quorum set - use first-class fields", + client: &storetestutil.TestClient{ + Name: "receive-rep0-0.receive.svc.cluster.local:10901", + GroupKeyStr: "receive-rep0", // Ignored when ReplicaGroupStr is set + ReplicaKeyStr: "receive-rep0-0", + ReplicaGroupStr: "receive-0", + QuorumValue: 2, + }, + expectedGroup: "receive-0", // Uses ReplicaGroupStr + expectedReplica: "receive-rep0-0", + expectedQuorum: 2, // Uses QuorumValue + expectMustSuccess: false, + }, + { + name: "store without ReplicaGroup/Quorum - singleton store via DNS fallback", + client: &storetestutil.TestClient{ + Name: "bucket-store.svc.cluster.local:10901", + GroupKeyStr: "bucket-store", + ReplicaKeyStr: 
"bucket-store", + ReplicaGroupStr: "", + QuorumValue: 0, + }, + expectedGroup: "bucket-store", // Falls back to GroupKeyStr + expectedReplica: "bucket-store", + expectedQuorum: 0, // Singleton store + expectMustSuccess: true, + }, + { + name: "ReplicaGroup set but Quorum is 0 - use ReplicaGroup, singleton store", + client: &storetestutil.TestClient{ + Name: "partial-store.svc.cluster.local:10901", + GroupKeyStr: "partial-store", + ReplicaKeyStr: "partial-store", + ReplicaGroupStr: "receive-0", + QuorumValue: 0, // Not set, singleton store + }, + expectedGroup: "receive-0", // Uses ReplicaGroupStr + expectedReplica: "partial-store", + expectedQuorum: 0, // Singleton store + expectMustSuccess: true, + }, + { + name: "high quorum value", + client: &storetestutil.TestClient{ + Name: "receive-0.receive.svc.cluster.local:10901", + GroupKeyStr: "receive-0", + ReplicaKeyStr: "receive-0", + ReplicaGroupStr: "group-a", + QuorumValue: 5, + }, + expectedGroup: "group-a", + expectedReplica: "receive-0", + expectedQuorum: 5, + expectMustSuccess: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Test the unified ReplicaInfo() method directly + ri := tc.client.ReplicaInfo() + + testutil.Equals(t, tc.expectedGroup, ri.Group) + testutil.Equals(t, tc.expectedReplica, ri.Replica) + testutil.Equals(t, tc.expectedQuorum, ri.Quorum) + testutil.Equals(t, tc.expectMustSuccess, ri.IsSingletonStore()) + }) + } +} diff --git a/pkg/store/storepb/replica_info.go b/pkg/store/storepb/replica_info.go new file mode 100644 index 00000000000..d742f8bd33c --- /dev/null +++ b/pkg/store/storepb/replica_info.go @@ -0,0 +1,26 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storepb + +// ReplicaInfo contains replica topology hints for partial response strategies. +// It unifies DNS-based grouping (legacy) with first-class StoreInfo fields. +type ReplicaInfo struct { + // Group identifies stores that hold replicated data. 
Stores with the same Group + // value are considered replicas of each other. + // Examples: "pantheon-db", "long-range-store" + Group string + + // Replica identifies this specific store within a group. + // Examples: "pantheon-db-rep0", "pantheon-db-rep1" + Replica string + + // Quorum is the minimum number of healthy stores required per group. + // A value of 0 means "singleton store" - the store must respond successfully. + Quorum int +} + +// IsSingletonStore returns true if this store must respond successfully (quorum=0). +func (ri ReplicaInfo) IsSingletonStore() bool { + return ri.Quorum == 0 +} diff --git a/pkg/store/storepb/testutil/client.go b/pkg/store/storepb/testutil/client.go index 4587c42f375..c88f1e8c138 100644 --- a/pkg/store/storepb/testutil/client.go +++ b/pkg/store/storepb/testutil/client.go @@ -22,8 +22,16 @@ type TestClient struct { IsLocalStore bool StoreTSDBInfos []infopb.TSDBInfo StoreFilterNotMatches bool - GroupKeyStr string - ReplicaKeyStr string + + // ReplicaInfo fields for QUORUM strategy testing. + // GroupKeyStr is used as Group when ReplicaGroupStr is empty (DNS-based fallback). + // ReplicaGroupStr is the first-class StoreInfo field - if set, it takes precedence. + // ReplicaKeyStr is used as Replica in ReplicaInfo. + // QuorumValue is used as Quorum when ReplicaGroupStr is set (0 = singleton store). + GroupKeyStr string + ReplicaKeyStr string + ReplicaGroupStr string // First-class StoreInfo field (takes precedence over GroupKeyStr) + QuorumValue int } func (c TestClient) LabelSets() []labels.Labels { return c.ExtLset } @@ -38,3 +46,24 @@ func (c TestClient) Matches(matches []*labels.Matcher) bool { return !c.StoreFil func (c TestClient) GroupKey() string { return c.GroupKeyStr } func (c TestClient) ReplicaKey() string { return c.ReplicaKeyStr } + +// ReplicaInfo returns replica topology hints used by the QUORUM partial response strategy. 
+// It mimics the unification logic of endpointRef.ReplicaInfo(): +// - If ReplicaGroupStr is set (first-class StoreInfo field), use it as Group with QuorumValue. +// - Otherwise fall back to GroupKeyStr (DNS-based) with Quorum=0 (singleton store). +func (c TestClient) ReplicaInfo() storepb.ReplicaInfo { + // Prefer first-class ReplicaGroupStr if set + if c.ReplicaGroupStr != "" { + return storepb.ReplicaInfo{ + Group: c.ReplicaGroupStr, + Replica: c.ReplicaKeyStr, + Quorum: c.QuorumValue, + } + } + // Fall back to DNS-based grouping + return storepb.ReplicaInfo{ + Group: c.GroupKeyStr, + Replica: c.ReplicaKeyStr, + Quorum: 0, // Singleton store for DNS-based stores + } +} diff --git a/pkg/store/storepb/types.pb.go b/pkg/store/storepb/types.pb.go index 5d0a1f9867f..66fed83556d 100644 --- a/pkg/store/storepb/types.pb.go +++ b/pkg/store/storepb/types.pb.go @@ -45,18 +45,23 @@ const ( /// If a group has more than one replicas, the group can tolerate any number of endpoint failures within one replica. It doesn't /// tolerate endpoint failures across replicas. PartialResponseStrategy_GROUP_REPLICA PartialResponseStrategy = 2 + /// QUORUM strategy uses first-class store topology hints (replica_group, quorum) exposed via InfoAPI. + /// Stores with quorum=0 are treated as singleton stores (any failure aborts the query). 
+ PartialResponseStrategy_QUORUM PartialResponseStrategy = 3 ) var PartialResponseStrategy_name = map[int32]string{ 0: "WARN", 1: "ABORT", 2: "GROUP_REPLICA", + 3: "QUORUM", } var PartialResponseStrategy_value = map[string]int32{ "WARN": 0, "ABORT": 1, "GROUP_REPLICA": 2, + "QUORUM": 3, } func (x PartialResponseStrategy) String() string { diff --git a/pkg/store/storepb/types.proto b/pkg/store/storepb/types.proto index 42dfe883c65..baffd4110e8 100644 --- a/pkg/store/storepb/types.proto +++ b/pkg/store/storepb/types.proto @@ -79,4 +79,8 @@ enum PartialResponseStrategy { /// If a group has more than one replicas, the group can tolerate any number of endpoint failures within one replica. It doesn't /// tolerate endpoint failures across replicas. GROUP_REPLICA = 2; + + /// QUORUM strategy uses first-class store topology hints (replica_group, quorum) exposed via InfoAPI. + /// Stores with quorum=0 are treated as singleton stores (any failure aborts the query). + QUORUM = 3; } diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 4d6b8c9ef2d..6f3e9e06785 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -72,6 +72,8 @@ type TSDBStore struct { matcherCache storecache.MatchersCache extLset labels.Labels + replicaGroup string // Replica group identifier for GROUP_REPLICA strategy + quorum int32 // Minimum healthy stores required per group startStoreFilterUpdate bool storeFilter filter.StoreFilter mtx sync.RWMutex @@ -169,6 +171,29 @@ func (s *TSDBStore) SetExtLset(extLset labels.Labels) { s.extLset = extLset } +// SetReplicaInfo sets the replica group and quorum used by the QUORUM partial response strategy. +func (s *TSDBStore) SetReplicaInfo(replicaGroup string, quorum int32) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.replicaGroup = replicaGroup + s.quorum = quorum +} + +// ReplicaInfo returns replica topology hints used by the QUORUM partial response strategy. +// Since TSDBStore is used internally by receivers, it returns first-class fields directly. 
+// Quorum=0 means singleton store semantics. +func (s *TSDBStore) ReplicaInfo() ReplicaInfo { + s.mtx.RLock() + defer s.mtx.RUnlock() + + return ReplicaInfo{ + Group: s.replicaGroup, + Replica: "", // TSDBStore doesn't have a replica identifier + Quorum: int(s.quorum), + } +} + func (s *TSDBStore) getExtLset() labels.Labels { s.mtx.RLock() defer s.mtx.RUnlock()