From 916a2ed5095be7906b79186d99615162936bb805 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Sun, 31 Aug 2025 04:48:41 -0700 Subject: [PATCH 1/3] metric name based tenant sharding --- cmd/thanos/receive.go | 21 ++++++++-- pkg/receive/handler.go | 63 ++++++++++++++++++++++++++++-- pkg/receive/hashring.go | 59 ++++++++++++++++++++++++++-- pkg/store/proxy.go | 85 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 218 insertions(+), 10 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index abb1c5cb691..e6c348d38a9 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -148,8 +148,12 @@ func runReceive( var enableGRPCReadinessInterceptor bool for _, feature := range *conf.featureList { if feature == metricNamesFilter { - multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) - level.Info(logger).Log("msg", "metric name filter feature enabled") + if conf.metricNameShards > 0 { + level.Info(logger).Log("msg", "metric name filter feature disabled due to metric-name-shards being enabled") + } else { + multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) + level.Info(logger).Log("msg", "metric name filter feature enabled") + } } if feature == grpcReadinessInterceptor { enableGRPCReadinessInterceptor = true @@ -313,6 +317,7 @@ func runReceive( Limiter: limiter, AsyncForwardWorkerCount: conf.asyncForwardWorkerCount, ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol), + MetricNameShards: conf.metricNameShards, }) { @@ -436,6 +441,11 @@ func runReceive( if matcherConverter != nil { options = append(options, store.WithProxyStoreMatcherConverter(matcherConverter)) } + if conf.metricNameShards > 0 { + options = append(options, store.WithMetricNameShards(conf.metricNameShards)) + } + // Pass tenant label name for metric name sharding + options = append(options, store.WithTenantLabelName(conf.tenantLabelName)) proxy := store.NewProxyStore( logger, @@ -1024,8 +1034,9 @@ type receiveConfig struct { maxPendingGrpcWriteRequests int lazyRetrievalMaxBufferedResponses int - featureList *[]string - noUploadTenants *[]string + featureList *[]string + noUploadTenants *[]string + metricNameShards int } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -1196,6 +1207,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.noUploadTenants = cmd.Flag("receive.no-upload-tenants", "Tenant IDs/patterns that should only store data locally (no object store upload). Supports exact matches (e.g., 'tenant1') and prefix patterns (e.g., 'prod-*'). Repeat this flag to specify multiple patterns.").Strings() cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses) + cmd.Flag("receive.metric-name-shards", "When set and greater than 0 in RouterOnly mode, modifies tenant header to include hashring name and metric name hash shard. Format: {hashring-name}-{hash(metric_name) % shards}. This creates tenant sharding based on metric names."). + Default("0").IntVar(&rc.metricNameShards) } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index b0c9ec55bbd..9b1e98b8cfc 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/cespare/xxhash/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" @@ -117,6 +118,7 @@ type Options struct { Limiter *Limiter AsyncForwardWorkerCount uint ReplicationProtocol ReplicationProtocol + MetricNameShards int } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -143,7 +145,9 @@ type Handler struct { writeTimeseriesError *prometheus.HistogramVec writeE2eLatency *prometheus.HistogramVec - Limiter *Limiter + Limiter *Limiter + metricNameShards int + tenantErrorsTotal *prometheus.CounterVec } func NewHandler(logger log.Logger, o *Options) *Handler { @@ -186,8 +190,9 @@ func NewHandler(logger log.Logger, o *Options) *Handler { workers, o.ReplicationProtocol, o.DialOpts...), - receiverMode: o.ReceiverMode, - Limiter: o.Limiter, + receiverMode: o.ReceiverMode, + Limiter: o.Limiter, + metricNameShards: o.MetricNameShards, forwardRequests: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -248,6 +253,14 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600}, }, []string{"code", "tenant", "rollup"}, ), + tenantErrorsTotal: promauto.With(registerer).NewCounterVec( + prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "tenant_modification_errors_total", + Help: "The number of errors encountered while modifying tenant headers for metric name sharding.", + }, []string{"reason"}, + ), } h.forwardRequests.WithLabelValues(labelSuccess) @@ -521,6 +534,40 @@ func isPreAgged(ts prompb.TimeSeries) bool { return false } +// getMetricName extracts the __name__ label value from a time series. +func getMetricName(ts *prompb.TimeSeries) string { + for _, l := range ts.Labels { + if l.Name == "__name__" { + return l.Value + } + } + return "" +} + +// modifyTenantForMetricSharding modifies the tenant string based on metric name sharding. +// Returns the modified tenant string and any error encountered. +func (h *Handler) modifyTenantForMetricSharding(originalTenant string, ts *prompb.TimeSeries, hashringName string) (string, error) { + if h.metricNameShards <= 0 || h.receiverMode != RouterOnly { + return originalTenant, nil + } + + metricName := getMetricName(ts) + if metricName == "" { + h.tenantErrorsTotal.WithLabelValues("missing_metric_name").Inc() + return "", errors.New("time series missing __name__ label") + } + + // Hash the metric name using xxhash + hasher := xxhash.New() + hasher.WriteString(metricName) + hash := hasher.Sum64() + + shard := hash % uint64(h.metricNameShards) + modifiedTenant := fmt.Sprintf("%s-%d", hashringName, shard) + + return modifiedTenant, nil +} + func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { var err error span, ctx := tracing.StartSpan(r.Context(), "receive_http") @@ -921,6 +968,16 @@ func (h *Handler) distributeTimeseriesToReplicas( } } + // Apply metric name sharding if enabled + if h.metricNameShards > 0 && h.receiverMode == RouterOnly { + hashringName := h.hashring.GetHashringName(tenant) + modifiedTenant, err := h.modifyTenantForMetricSharding(tenant, &ts, hashringName) + if err != nil { + return nil, nil, err + } + tenant = modifiedTenant + } + for _, rn := range replicas { endpoint, err := h.hashring.GetN(tenant, &ts, rn) if err != nil { diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 19fd19262fb..18bbcdf1559 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -62,6 +62,8 @@ type Hashring interface { // Nodes returns a sorted slice of nodes that are in this hashring. Addresses could be duplicated // if, for example, the same address is used for multiple tenants in the multi-hashring. Nodes() []Endpoint + // GetHashringName returns the hashring name for the given tenant. Returns empty string for single hashrings. + GetHashringName(tenant string) string } // SingleNodeHashring always returns the same node. @@ -76,6 +78,10 @@ func (s SingleNodeHashring) Nodes() []Endpoint { return []Endpoint{{Address: string(s), CapNProtoAddress: string(s)}} } +func (s SingleNodeHashring) GetHashringName(tenant string) string { + return "" +} + // GetN implements the Hashring interface. func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (Endpoint, error) { if n > 0 { @@ -107,6 +113,10 @@ func (s simpleHashring) Nodes() []Endpoint { return s } +func (s simpleHashring) GetHashringName(tenant string) string { + return "" +} + // Get returns a target to handle the given tenant and time series. func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (Endpoint, error) { return s.GetN(tenant, ts, 0) @@ -182,6 +192,10 @@ func (k *ketamaHashring) Nodes() []Endpoint { return k.endpoints } +func (k *ketamaHashring) GetHashringName(tenant string) string { + return "" +} + func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 { minValue := int64(math.MaxInt64) for _, value := range azSpread { @@ -278,9 +292,10 @@ func (t tenantSet) match(tenant string) (bool, error) { // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. type multiHashring struct { - cache map[string]Hashring - hashrings []Hashring - tenantSets []tenantSet + cache map[string]Hashring + hashrings []Hashring + tenantSets []tenantSet + hashringNames []string // Store hashring names corresponding to hashrings // We need a mutex to guard concurrent access // to the cache map, as this is both written to @@ -333,6 +348,43 @@ func (m *multiHashring) Nodes() []Endpoint { return m.nodes } +// GetHashringName returns the hashring name for the given tenant. +// Returns empty string if no specific hashring is found (uses default). +func (m *multiHashring) GetHashringName(tenant string) string { + m.mu.RLock() + defer m.mu.RUnlock() + + // Check if tenant is already cached + if _, ok := m.cache[tenant]; ok { + // Find which hashring this tenant maps to + for i, t := range m.tenantSets { + if t == nil { + // Default hashring + return m.hashringNames[i] + } else { + if found, _ := t.match(tenant); found { + return m.hashringNames[i] + } + } + } + } + + // If not cached, check which hashring matches + for i, t := range m.tenantSets { + if t == nil { + // Default hashring matches everything + return m.hashringNames[i] + } else { + if found, _ := t.match(tenant); found { + return m.hashringNames[i] + } + } + } + + // This should never happen if properly configured + return "" +} + // newMultiHashring creates a multi-tenant hashring for a given slice of // groups. // Which hashring to use for a tenant is determined @@ -355,6 +407,7 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg } m.nodes = append(m.nodes, hashring.Nodes()...) m.hashrings = append(m.hashrings, hashring) + m.hashringNames = append(m.hashringNames, h.Hashring) var t map[string]tenantMatcher if len(h.Tenants) != 0 { t = make(map[string]tenantMatcher) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index a23a50b41de..6ba8cc8e306 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -7,11 +7,13 @@ import ( "context" "fmt" "math" + "strconv" "strings" "sync" "time" "github.com/armon/go-radix" + "github.com/cespare/xxhash/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -105,6 +107,8 @@ type ProxyStore struct { lazyRetrievalMaxBufferedResponses int blockedMetricPrefixes *radix.Tree blockedMetricExacts map[string]struct{} + metricNameShards int + tenantLabelName string } type proxyStoreMetrics struct { @@ -186,6 +190,19 @@ func WithProxyStoreMatcherConverter(mc *storepb.MatcherConverter) ProxyStoreOpti } } +// WithMetricNameShards returns a ProxyStoreOption that enables metric name sharding optimization. +func WithMetricNameShards(shards int) ProxyStoreOption { + return func(s *ProxyStore) { + s.metricNameShards = shards + } +} + +func WithTenantLabelName(labelName string) ProxyStoreOption { + return func(s *ProxyStore) { + s.tenantLabelName = labelName + } +} + // WithBlockedMetricPatterns returns a ProxyStoreOption that sets the blocked metric patterns. // It parses input patterns to extract prefixes (like "kube_", "envoy_") by checking suffix characters // and stores them in a radix tree for efficient prefix matching. Exact patterns @@ -851,6 +868,18 @@ func (s *ProxyStore) matchingStores(ctx context.Context, minTime, maxTime int64, storeLabelSets []labels.Labels storeDebugMsgs []string ) + + // Extract exact __name__ matcher for metric name sharding optimization + var exactMetricName string + if s.metricNameShards > 0 { + for _, matcher := range matchers { + if matcher.Name == "__name__" && matcher.Type == labels.MatchEqual { + exactMetricName = matcher.Value + break + } + } + } + for _, st := range s.stores() { // We might be able to skip the store if its meta information indicates it cannot have series matching our query. if ok, reason := storeMatches(ctx, s.debugLogging, st, minTime, maxTime, matchers...); !ok { @@ -859,6 +888,17 @@ func (s *ProxyStore) matchingStores(ctx context.Context, minTime, maxTime int64, } continue } + + // Apply metric name sharding optimization + if exactMetricName != "" && s.metricNameShards > 0 { + if shouldSkipStoreForShard, reason := s.shouldSkipStoreForMetricShard(st, exactMetricName); shouldSkipStoreForShard { + if s.debugLogging { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to metric name sharding: %v", st, reason)) + } + continue + } + } + matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...) if !matches { if s.debugLogging { @@ -874,9 +914,54 @@ func (s *ProxyStore) matchingStores(ctx context.Context, minTime, maxTime int64, } } + // Log warning if metric name sharding is enabled but no matching shard found + if exactMetricName != "" && s.metricNameShards > 0 && len(stores) == 0 { + targetShard := xxhash.Sum64String(exactMetricName) % uint64(s.metricNameShards) + level.Warn(s.logger).Log("msg", "no TSDB found for metric name shard", + "metric", exactMetricName, + "target_shard", targetShard, + "total_shards", s.metricNameShards) + } + return stores, storeLabelSets, storeDebugMsgs } +// shouldSkipStoreForMetricShard determines if a store should be skipped based on metric name sharding. +// It extracts the tenant name from store's external labels and checks if the shard matches. +func (s *ProxyStore) shouldSkipStoreForMetricShard(store Client, metricName string) (bool, string) { + // Hash the metric name using xxhash + hasher := xxhash.New() + hasher.WriteString(metricName) + hash := hasher.Sum64() + targetShard := hash % uint64(s.metricNameShards) + + // Get the tenant name from store's external labels + labelSets := store.LabelSets() + for _, labelSet := range labelSets { + for _, label := range labelSet { + // Check for the configured tenant label name + if label.Name == s.tenantLabelName { + tenantName := label.Value + // Extract shard from tenant name (e.g., "pantheon-db-dp-35" -> 35) + if lastDash := strings.LastIndex(tenantName, "-"); lastDash != -1 { + shardStr := tenantName[lastDash+1:] + if shard, err := strconv.ParseUint(shardStr, 10, 64); err == nil { + if shard != targetShard { + return true, fmt.Sprintf("tenant shard %d does not match target shard %d for metric %s", shard, targetShard, metricName) + } + return false, "" + } + } + // If tenant name doesn't have shard suffix, don't skip (legacy tenant) + return false, "" + } + } + } + + // If no tenant label found, don't skip the store + return false, "" +} + // storeMatches returns boolean if the given store may hold data for the given label matchers, time ranges and debug store matches gathered from context. func storeMatches(ctx context.Context, debugLogging bool, s Client, mint, maxt int64, matchers ...*labels.Matcher) (ok bool, reason string) { var storeDebugMatcher [][]*labels.Matcher From c3f6ade60878046e6a5a8e093c3ed25414000a8a Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Mon, 1 Sep 2025 04:23:28 -0700 Subject: [PATCH 2/3] fix CI --- cmd/thanos/receive.go | 2 +- pkg/receive/hashring.go | 2 +- pkg/store/proxy.go | 28 ++++++++++++++++------------ 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index e6c348d38a9..9517f6759e7 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -1207,7 +1207,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.noUploadTenants = cmd.Flag("receive.no-upload-tenants", "Tenant IDs/patterns that should only store data locally (no object store upload). Supports exact matches (e.g., 'tenant1') and prefix patterns (e.g., 'prod-*'). Repeat this flag to specify multiple patterns.").Strings() cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses) - cmd.Flag("receive.metric-name-shards", "When set and greater than 0 in RouterOnly mode, modifies tenant header to include hashring name and metric name hash shard. Format: {hashring-name}-{hash(metric_name) % shards}. This creates tenant sharding based on metric names."). + cmd.Flag("receive.metric-name-shards", "When set and greater than 0, enables metric name sharding. In RouterOnly mode, modifies tenant header to {hashring-name}-{hash(metric_name) % shards}. In IngestorOnly/RouterIngestor mode, optimizes query fan-out by only querying TSDBs matching the metric's shard. Disables cuckoo filter when enabled."). Default("0").IntVar(&rc.metricNameShards) } diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 18bbcdf1559..1679e4b2e45 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -380,7 +380,7 @@ func (m *multiHashring) GetHashringName(tenant string) string { } } } - + // This should never happen if properly configured return "" } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 6ba8cc8e306..9cb36effe6e 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -938,23 +938,27 @@ func (s *ProxyStore) shouldSkipStoreForMetricShard(store Client, metricName stri // Get the tenant name from store's external labels labelSets := store.LabelSets() for _, labelSet := range labelSets { - for _, label := range labelSet { + var tenantName string + labelSet.Range(func(label labels.Label) { // Check for the configured tenant label name if label.Name == s.tenantLabelName { - tenantName := label.Value - // Extract shard from tenant name (e.g., "pantheon-db-dp-35" -> 35) - if lastDash := strings.LastIndex(tenantName, "-"); lastDash != -1 { - shardStr := tenantName[lastDash+1:] - if shard, err := strconv.ParseUint(shardStr, 10, 64); err == nil { - if shard != targetShard { - return true, fmt.Sprintf("tenant shard %d does not match target shard %d for metric %s", shard, targetShard, metricName) - } - return false, "" + tenantName = label.Value + } + }) + + if tenantName != "" { + // Extract shard from tenant name (e.g., "pantheon-db-dp-35" -> 35) + if lastDash := strings.LastIndex(tenantName, "-"); lastDash != -1 { + shardStr := tenantName[lastDash+1:] + if shard, err := strconv.ParseUint(shardStr, 10, 64); err == nil { + if shard != targetShard { + return true, fmt.Sprintf("tenant shard %d does not match target shard %d for metric %s", shard, targetShard, metricName) } + return false, "" } - // If tenant name doesn't have shard suffix, don't skip (legacy tenant) - return false, "" } + // If tenant name doesn't have shard suffix, don't skip (legacy tenant) + return false, "" } } From 9867993fd7287f46a7ea6591282590d0d4c563be Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Mon, 1 Sep 2025 09:21:25 -0700 Subject: [PATCH 3/3] fix lint --- pkg/receive/handler.go | 2 +- pkg/store/proxy.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 9b1e98b8cfc..e5471280653 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -559,7 +559,7 @@ func (h *Handler) modifyTenantForMetricSharding(originalTenant string, ts *promp // Hash the metric name using xxhash hasher := xxhash.New() - hasher.WriteString(metricName) + _, _ = hasher.WriteString(metricName) hash := hasher.Sum64() shard := hash % uint64(h.metricNameShards) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 9cb36effe6e..1e4d6c6e900 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -931,7 +931,7 @@ func (s *ProxyStore) matchingStores(ctx context.Context, minTime, maxTime int64, func (s *ProxyStore) shouldSkipStoreForMetricShard(store Client, metricName string) (bool, string) { // Hash the metric name using xxhash hasher := xxhash.New() - hasher.WriteString(metricName) + _, _ = hasher.WriteString(metricName) hash := hasher.Sum64() targetShard := hash % uint64(s.metricNameShards)