diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index abb1c5cb691..9517f6759e7 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -148,8 +148,12 @@ func runReceive( var enableGRPCReadinessInterceptor bool for _, feature := range *conf.featureList { if feature == metricNamesFilter { - multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) - level.Info(logger).Log("msg", "metric name filter feature enabled") + if conf.metricNameShards > 0 { + level.Info(logger).Log("msg", "metric name filter feature disabled due to metric-name-shards being enabled") + } else { + multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled()) + level.Info(logger).Log("msg", "metric name filter feature enabled") + } } if feature == grpcReadinessInterceptor { enableGRPCReadinessInterceptor = true @@ -313,6 +317,7 @@ func runReceive( Limiter: limiter, AsyncForwardWorkerCount: conf.asyncForwardWorkerCount, ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol), + MetricNameShards: conf.metricNameShards, }) { @@ -436,6 +441,11 @@ func runReceive( if matcherConverter != nil { options = append(options, store.WithProxyStoreMatcherConverter(matcherConverter)) } + if conf.metricNameShards > 0 { + options = append(options, store.WithMetricNameShards(conf.metricNameShards)) + } + // Pass tenant label name for metric name sharding + options = append(options, store.WithTenantLabelName(conf.tenantLabelName)) proxy := store.NewProxyStore( logger, @@ -1024,8 +1034,9 @@ type receiveConfig struct { maxPendingGrpcWriteRequests int lazyRetrievalMaxBufferedResponses int - featureList *[]string - noUploadTenants *[]string + featureList *[]string + noUploadTenants *[]string + metricNameShards int } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -1196,6 +1207,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { rc.noUploadTenants = cmd.Flag("receive.no-upload-tenants", "Tenant IDs/patterns that should only store data locally (no object store upload). Supports exact matches (e.g., 'tenant1') and prefix patterns (e.g., 'prod-*'). Repeat this flag to specify multiple patterns.").Strings() cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled."). Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses) + cmd.Flag("receive.metric-name-shards", "When set and greater than 0, enables metric name sharding. In RouterOnly mode, modifies tenant header to {hashring-name}-{hash(metric_name) % shards}. In IngestorOnly/RouterIngestor mode, optimizes query fan-out by only querying TSDBs matching the metric's shard. Disables cuckoo filter when enabled."). + Default("0").IntVar(&rc.metricNameShards) } // determineMode returns the ReceiverMode that this receiver is configured to run in. diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index b0c9ec55bbd..e5471280653 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -19,6 +19,7 @@ import ( "sync" "time" + "github.com/cespare/xxhash/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/protobuf/proto" @@ -117,6 +118,7 @@ type Options struct { Limiter *Limiter AsyncForwardWorkerCount uint ReplicationProtocol ReplicationProtocol + MetricNameShards int } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -143,7 +145,9 @@ type Handler struct { writeTimeseriesError *prometheus.HistogramVec writeE2eLatency *prometheus.HistogramVec - Limiter *Limiter + Limiter *Limiter + metricNameShards int + tenantErrorsTotal *prometheus.CounterVec } func NewHandler(logger log.Logger, o *Options) *Handler { @@ -186,8 +190,9 @@ func NewHandler(logger log.Logger, o *Options) *Handler { workers, o.ReplicationProtocol, o.DialOpts...), - receiverMode: o.ReceiverMode, - Limiter: o.Limiter, + receiverMode: o.ReceiverMode, + Limiter: o.Limiter, + metricNameShards: o.MetricNameShards, forwardRequests: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -248,6 +253,14 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600}, }, []string{"code", "tenant", "rollup"}, ), + tenantErrorsTotal: promauto.With(registerer).NewCounterVec( + prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "tenant_modification_errors_total", + Help: "The number of errors encountered while modifying tenant headers for metric name sharding.", + }, []string{"reason"}, + ), } h.forwardRequests.WithLabelValues(labelSuccess) @@ -521,6 +534,40 @@ func isPreAgged(ts prompb.TimeSeries) bool { return false } +// getMetricName extracts the __name__ label value from a time series. +func getMetricName(ts *prompb.TimeSeries) string { + for _, l := range ts.Labels { + if l.Name == "__name__" { + return l.Value + } + } + return "" +} + +// modifyTenantForMetricSharding modifies the tenant string based on metric name sharding. +// Returns the modified tenant string and any error encountered. +func (h *Handler) modifyTenantForMetricSharding(originalTenant string, ts *prompb.TimeSeries, hashringName string) (string, error) { + if h.metricNameShards <= 0 || h.receiverMode != RouterOnly { + return originalTenant, nil + } + + metricName := getMetricName(ts) + if metricName == "" { + h.tenantErrorsTotal.WithLabelValues("missing_metric_name").Inc() + return "", errors.New("time series missing __name__ label") + } + + // Hash the metric name using xxhash + hasher := xxhash.New() + _, _ = hasher.WriteString(metricName) + hash := hasher.Sum64() + + shard := hash % uint64(h.metricNameShards) + modifiedTenant := fmt.Sprintf("%s-%d", hashringName, shard) + + return modifiedTenant, nil +} + func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { var err error span, ctx := tracing.StartSpan(r.Context(), "receive_http") @@ -921,6 +968,16 @@ func (h *Handler) distributeTimeseriesToReplicas( } } + // Apply metric name sharding if enabled + if h.metricNameShards > 0 && h.receiverMode == RouterOnly { + hashringName := h.hashring.GetHashringName(tenant) + modifiedTenant, err := h.modifyTenantForMetricSharding(tenant, &ts, hashringName) + if err != nil { + return nil, nil, err + } + tenant = modifiedTenant + } + for _, rn := range replicas { endpoint, err := h.hashring.GetN(tenant, &ts, rn) if err != nil { diff --git a/pkg/receive/hashring.go b/pkg/receive/hashring.go index 19fd19262fb..1679e4b2e45 100644 --- a/pkg/receive/hashring.go +++ b/pkg/receive/hashring.go @@ -62,6 +62,8 @@ type Hashring interface { // Nodes returns a sorted slice of nodes that are in this hashring. Addresses could be duplicated // if, for example, the same address is used for multiple tenants in the multi-hashring. Nodes() []Endpoint + // GetHashringName returns the hashring name for the given tenant. Returns empty string for single hashrings. + GetHashringName(tenant string) string } // SingleNodeHashring always returns the same node. @@ -76,6 +78,10 @@ func (s SingleNodeHashring) Nodes() []Endpoint { return []Endpoint{{Address: string(s), CapNProtoAddress: string(s)}} } +func (s SingleNodeHashring) GetHashringName(tenant string) string { + return "" +} + // GetN implements the Hashring interface. func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (Endpoint, error) { if n > 0 { @@ -107,6 +113,10 @@ func (s simpleHashring) Nodes() []Endpoint { return s } +func (s simpleHashring) GetHashringName(tenant string) string { + return "" +} + // Get returns a target to handle the given tenant and time series. func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (Endpoint, error) { return s.GetN(tenant, ts, 0) @@ -182,6 +192,10 @@ func (k *ketamaHashring) Nodes() []Endpoint { return k.endpoints } +func (k *ketamaHashring) GetHashringName(tenant string) string { + return "" +} + func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 { minValue := int64(math.MaxInt64) for _, value := range azSpread { @@ -278,9 +292,10 @@ func (t tenantSet) match(tenant string) (bool, error) { // Which hashring to use for a tenant is determined // by the tenants field of the hashring configuration. type multiHashring struct { - cache map[string]Hashring - hashrings []Hashring - tenantSets []tenantSet + cache map[string]Hashring + hashrings []Hashring + tenantSets []tenantSet + hashringNames []string // Store hashring names corresponding to hashrings // We need a mutex to guard concurrent access // to the cache map, as this is both written to @@ -333,6 +348,43 @@ func (m *multiHashring) Nodes() []Endpoint { return m.nodes } +// GetHashringName returns the hashring name for the given tenant. +// Returns empty string if no specific hashring is found (uses default). +func (m *multiHashring) GetHashringName(tenant string) string { + m.mu.RLock() + defer m.mu.RUnlock() + + // Check if tenant is already cached + if _, ok := m.cache[tenant]; ok { + // Find which hashring this tenant maps to + for i, t := range m.tenantSets { + if t == nil { + // Default hashring + return m.hashringNames[i] + } else { + if found, _ := t.match(tenant); found { + return m.hashringNames[i] + } + } + } + } + + // If not cached, check which hashring matches + for i, t := range m.tenantSets { + if t == nil { + // Default hashring matches everything + return m.hashringNames[i] + } else { + if found, _ := t.match(tenant); found { + return m.hashringNames[i] + } + } + } + + // This should never happen if properly configured + return "" +} + // newMultiHashring creates a multi-tenant hashring for a given slice of // groups. // Which hashring to use for a tenant is determined @@ -355,6 +407,7 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg } m.nodes = append(m.nodes, hashring.Nodes()...) m.hashrings = append(m.hashrings, hashring) + m.hashringNames = append(m.hashringNames, h.Hashring) var t map[string]tenantMatcher if len(h.Tenants) != 0 { t = make(map[string]tenantMatcher) diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index a23a50b41de..1e4d6c6e900 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -7,11 +7,13 @@ import ( "context" "fmt" "math" + "strconv" "strings" "sync" "time" "github.com/armon/go-radix" + "github.com/cespare/xxhash/v2" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -105,6 +107,8 @@ type ProxyStore struct { lazyRetrievalMaxBufferedResponses int blockedMetricPrefixes *radix.Tree blockedMetricExacts map[string]struct{} + metricNameShards int + tenantLabelName string } type proxyStoreMetrics struct { @@ -186,6 +190,19 @@ func WithProxyStoreMatcherConverter(mc *storepb.MatcherConverter) ProxyStoreOpti } } +// WithMetricNameShards returns a ProxyStoreOption that enables metric name sharding optimization. +func WithMetricNameShards(shards int) ProxyStoreOption { + return func(s *ProxyStore) { + s.metricNameShards = shards + } +} + +func WithTenantLabelName(labelName string) ProxyStoreOption { + return func(s *ProxyStore) { + s.tenantLabelName = labelName + } +} + // WithBlockedMetricPatterns returns a ProxyStoreOption that sets the blocked metric patterns. // It parses input patterns to extract prefixes (like "kube_", "envoy_") by checking suffix characters // and stores them in a radix tree for efficient prefix matching. Exact patterns @@ -851,6 +868,18 @@ func (s *ProxyStore) matchingStores(ctx context.Context, minTime, maxTime int64, storeLabelSets []labels.Labels storeDebugMsgs []string ) + + // Extract exact __name__ matcher for metric name sharding optimization + var exactMetricName string + if s.metricNameShards > 0 { + for _, matcher := range matchers { + if matcher.Name == "__name__" && matcher.Type == labels.MatchEqual { + exactMetricName = matcher.Value + break + } + } + } + for _, st := range s.stores() { // We might be able to skip the store if its meta information indicates it cannot have series matching our query. if ok, reason := storeMatches(ctx, s.debugLogging, st, minTime, maxTime, matchers...); !ok { @@ -859,6 +888,17 @@ func (s *ProxyStore) matchingStores(ctx context.Context, minTime, maxTime int64, } continue } + + // Apply metric name sharding optimization + if exactMetricName != "" && s.metricNameShards > 0 { + if shouldSkipStoreForShard, reason := s.shouldSkipStoreForMetricShard(st, exactMetricName); shouldSkipStoreForShard { + if s.debugLogging { + storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to metric name sharding: %v", st, reason)) + } + continue + } + } + matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...) if !matches { if s.debugLogging { @@ -874,9 +914,58 @@ func (s *ProxyStore) matchingStores(ctx context.Context, minTime, maxTime int64, } } + // Log warning if metric name sharding is enabled but no matching shard found + if exactMetricName != "" && s.metricNameShards > 0 && len(stores) == 0 { + targetShard := xxhash.Sum64String(exactMetricName) % uint64(s.metricNameShards) + level.Warn(s.logger).Log("msg", "no TSDB found for metric name shard", + "metric", exactMetricName, + "target_shard", targetShard, + "total_shards", s.metricNameShards) + } + return stores, storeLabelSets, storeDebugMsgs } +// shouldSkipStoreForMetricShard determines if a store should be skipped based on metric name sharding. +// It extracts the tenant name from store's external labels and checks if the shard matches. +func (s *ProxyStore) shouldSkipStoreForMetricShard(store Client, metricName string) (bool, string) { + // Hash the metric name using xxhash + hasher := xxhash.New() + _, _ = hasher.WriteString(metricName) + hash := hasher.Sum64() + targetShard := hash % uint64(s.metricNameShards) + + // Get the tenant name from store's external labels + labelSets := store.LabelSets() + for _, labelSet := range labelSets { + var tenantName string + labelSet.Range(func(label labels.Label) { + // Check for the configured tenant label name + if label.Name == s.tenantLabelName { + tenantName = label.Value + } + }) + + if tenantName != "" { + // Extract shard from tenant name (e.g., "pantheon-db-dp-35" -> 35) + if lastDash := strings.LastIndex(tenantName, "-"); lastDash != -1 { + shardStr := tenantName[lastDash+1:] + if shard, err := strconv.ParseUint(shardStr, 10, 64); err == nil { + if shard != targetShard { + return true, fmt.Sprintf("tenant shard %d does not match target shard %d for metric %s", shard, targetShard, metricName) + } + return false, "" + } + } + // If tenant name doesn't have shard suffix, don't skip (legacy tenant) + return false, "" + } + } + + // If no tenant label found, don't skip the store + return false, "" +} + // storeMatches returns boolean if the given store may hold data for the given label matchers, time ranges and debug store matches gathered from context. func storeMatches(ctx context.Context, debugLogging bool, s Client, mint, maxt int64, matchers ...*labels.Matcher) (ok bool, reason string) { var storeDebugMatcher [][]*labels.Matcher