Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,12 @@ func runReceive(
var enableGRPCReadinessInterceptor bool
for _, feature := range *conf.featureList {
if feature == metricNamesFilter {
multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled())
level.Info(logger).Log("msg", "metric name filter feature enabled")
if conf.metricNameShards > 0 {
level.Info(logger).Log("msg", "metric name filter feature disabled due to metric-name-shards being enabled")
} else {
multiTSDBOptions = append(multiTSDBOptions, receive.WithMetricNameFilterEnabled())
level.Info(logger).Log("msg", "metric name filter feature enabled")
}
}
if feature == grpcReadinessInterceptor {
enableGRPCReadinessInterceptor = true
Expand Down Expand Up @@ -313,6 +317,7 @@ func runReceive(
Limiter: limiter,
AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
ReplicationProtocol: receive.ReplicationProtocol(conf.replicationProtocol),
MetricNameShards: conf.metricNameShards,
})

{
Expand Down Expand Up @@ -436,6 +441,11 @@ func runReceive(
if matcherConverter != nil {
options = append(options, store.WithProxyStoreMatcherConverter(matcherConverter))
}
if conf.metricNameShards > 0 {
options = append(options, store.WithMetricNameShards(conf.metricNameShards))
}
// Pass tenant label name for metric name sharding
options = append(options, store.WithTenantLabelName(conf.tenantLabelName))

proxy := store.NewProxyStore(
logger,
Expand Down Expand Up @@ -1024,8 +1034,9 @@ type receiveConfig struct {
maxPendingGrpcWriteRequests int
lazyRetrievalMaxBufferedResponses int

featureList *[]string
noUploadTenants *[]string
featureList *[]string
noUploadTenants *[]string
metricNameShards int
}

func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -1196,6 +1207,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
rc.noUploadTenants = cmd.Flag("receive.no-upload-tenants", "Tenant IDs/patterns that should only store data locally (no object store upload). Supports exact matches (e.g., 'tenant1') and prefix patterns (e.g., 'prod-*'). Repeat this flag to specify multiple patterns.").Strings()
cmd.Flag("receive.lazy-retrieval-max-buffered-responses", "The lazy retrieval strategy can buffer up to this number of responses. This is to limit the memory usage. This flag takes effect only when the lazy retrieval strategy is enabled.").
Default("20").IntVar(&rc.lazyRetrievalMaxBufferedResponses)
cmd.Flag("receive.metric-name-shards", "When set and greater than 0, enables metric name sharding. In RouterOnly mode, modifies tenant header to {hashring-name}-{hash(metric_name) % shards}. In IngestorOnly/RouterIngestor mode, optimizes query fan-out by only querying TSDBs matching the metric's shard. Disables cuckoo filter when enabled.").
Default("0").IntVar(&rc.metricNameShards)
Comment on lines +1210 to +1211
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a configmap for the sharding schema instead of a command flag. Thanos Router needs to support the schema defined in the V2 design doc. It has special metric groups besides the number of shards for each metric scope.

  scopeName: "az-eastus2",
  shards: 20,
  // The metrics that are heavily used by alert rules or have super high cardinality
  // can be in special groups to avoid skewed data partitions.
  specialMetricGroups: [
    {
      name: "kube-metrics",
      metrics: [
        "container_cpu_usage_seconds_total",
        "container_memory_working_set_bytes"
      ]
    },
    {
      name: "rpc-metrics",
      metrics: ["rpc_client_requests_total"]
    }
  ]
}

=== shard name calculation ===

if metric_name in any special metric group then
  shard_name = the special metric group name
else
  shard_name = hash(metric_name) % shards

}

// determineMode returns the ReceiverMode that this receiver is configured to run in.
Expand Down
63 changes: 60 additions & 3 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync"
"time"

"github.com/cespare/xxhash/v2"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -117,6 +118,7 @@ type Options struct {
Limiter *Limiter
AsyncForwardWorkerCount uint
ReplicationProtocol ReplicationProtocol
MetricNameShards int
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
Expand All @@ -143,7 +145,9 @@ type Handler struct {
writeTimeseriesError *prometheus.HistogramVec
writeE2eLatency *prometheus.HistogramVec

Limiter *Limiter
Limiter *Limiter
metricNameShards int
tenantErrorsTotal *prometheus.CounterVec
}

func NewHandler(logger log.Logger, o *Options) *Handler {
Expand Down Expand Up @@ -186,8 +190,9 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
workers,
o.ReplicationProtocol,
o.DialOpts...),
receiverMode: o.ReceiverMode,
Limiter: o.Limiter,
receiverMode: o.ReceiverMode,
Limiter: o.Limiter,
metricNameShards: o.MetricNameShards,
forwardRequests: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_receive_forward_requests_total",
Expand Down Expand Up @@ -248,6 +253,14 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
}, []string{"code", "tenant", "rollup"},
),
tenantErrorsTotal: promauto.With(registerer).NewCounterVec(
prometheus.CounterOpts{
Namespace: "thanos",
Subsystem: "receive",
Name: "tenant_modification_errors_total",
Help: "The number of errors encountered while modifying tenant headers for metric name sharding.",
}, []string{"reason"},
),
}

h.forwardRequests.WithLabelValues(labelSuccess)
Expand Down Expand Up @@ -521,6 +534,40 @@ func isPreAgged(ts prompb.TimeSeries) bool {
return false
}

// getMetricName extracts the __name__ label value from a time series.
// getMetricName returns the value of the __name__ label of a time series,
// or the empty string when the series carries no __name__ label.
func getMetricName(ts *prompb.TimeSeries) string {
	for i := range ts.Labels {
		if ts.Labels[i].Name == "__name__" {
			return ts.Labels[i].Value
		}
	}
	return ""
}

// modifyTenantForMetricSharding modifies the tenant string based on metric name sharding.
// Returns the modified tenant string and any error encountered.
func (h *Handler) modifyTenantForMetricSharding(originalTenant string, ts *prompb.TimeSeries, hashringName string) (string, error) {
if h.metricNameShards <= 0 || h.receiverMode != RouterOnly {
return originalTenant, nil
}

metricName := getMetricName(ts)
if metricName == "" {
h.tenantErrorsTotal.WithLabelValues("missing_metric_name").Inc()
return "", errors.New("time series missing __name__ label")
}

// Hash the metric name using xxhash
hasher := xxhash.New()
_, _ = hasher.WriteString(metricName)
hash := hasher.Sum64()

shard := hash % uint64(h.metricNameShards)
modifiedTenant := fmt.Sprintf("%s-%d", hashringName, shard)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What semantics will hashringName have in our context?


return modifiedTenant, nil
}

func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
var err error
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
Expand Down Expand Up @@ -921,6 +968,16 @@ func (h *Handler) distributeTimeseriesToReplicas(
}
}

// Apply metric name sharding if enabled
if h.metricNameShards > 0 && h.receiverMode == RouterOnly {
hashringName := h.hashring.GetHashringName(tenant)
modifiedTenant, err := h.modifyTenantForMetricSharding(tenant, &ts, hashringName)
if err != nil {
return nil, nil, err
}
tenant = modifiedTenant
}

for _, rn := range replicas {
endpoint, err := h.hashring.GetN(tenant, &ts, rn)
if err != nil {
Expand Down
59 changes: 56 additions & 3 deletions pkg/receive/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ type Hashring interface {
// Nodes returns a sorted slice of nodes that are in this hashring. Addresses could be duplicated
// if, for example, the same address is used for multiple tenants in the multi-hashring.
Nodes() []Endpoint
// GetHashringName returns the hashring name for the given tenant. Returns empty string for single hashrings.
GetHashringName(tenant string) string
}

// SingleNodeHashring always returns the same node.
Expand All @@ -76,6 +78,10 @@ func (s SingleNodeHashring) Nodes() []Endpoint {
return []Endpoint{{Address: string(s), CapNProtoAddress: string(s)}}
}

// GetHashringName implements the Hashring interface. A single-node hashring
// is unnamed, so the empty string is always returned.
func (s SingleNodeHashring) GetHashringName(_ string) string {
	return ""
}

// GetN implements the Hashring interface.
func (s SingleNodeHashring) GetN(_ string, _ *prompb.TimeSeries, n uint64) (Endpoint, error) {
if n > 0 {
Expand Down Expand Up @@ -107,6 +113,10 @@ func (s simpleHashring) Nodes() []Endpoint {
return s
}

// GetHashringName implements the Hashring interface. A simple hashring is
// unnamed, so the empty string is always returned.
func (s simpleHashring) GetHashringName(_ string) string {
	return ""
}

// Get returns a target to handle the given tenant and time series.
func (s simpleHashring) Get(tenant string, ts *prompb.TimeSeries) (Endpoint, error) {
return s.GetN(tenant, ts, 0)
Expand Down Expand Up @@ -182,6 +192,10 @@ func (k *ketamaHashring) Nodes() []Endpoint {
return k.endpoints
}

// GetHashringName implements the Hashring interface. A ketama hashring is
// unnamed, so the empty string is always returned.
func (k *ketamaHashring) GetHashringName(_ string) string {
	return ""
}

func sizeOfLeastOccupiedAZ(azSpread map[string]int64) int64 {
minValue := int64(math.MaxInt64)
for _, value := range azSpread {
Expand Down Expand Up @@ -278,9 +292,10 @@ func (t tenantSet) match(tenant string) (bool, error) {
// Which hashring to use for a tenant is determined
// by the tenants field of the hashring configuration.
type multiHashring struct {
cache map[string]Hashring
hashrings []Hashring
tenantSets []tenantSet
cache map[string]Hashring
hashrings []Hashring
tenantSets []tenantSet
hashringNames []string // Store hashring names corresponding to hashrings

// We need a mutex to guard concurrent access
// to the cache map, as this is both written to
Expand Down Expand Up @@ -333,6 +348,43 @@ func (m *multiHashring) Nodes() []Endpoint {
return m.nodes
}

// GetHashringName returns the hashring name for the given tenant.
// Returns empty string if no specific hashring is found (uses default).
// GetHashringName returns the name of the hashring that serves the given
// tenant. Returns the empty string when no hashring matches, which should
// not happen when a default (tenant-less) hashring is configured.
//
// NOTE: the previous implementation ran this exact matching loop twice —
// once behind a cache-membership check whose result was otherwise unused —
// so a single deterministic pass over the tenant sets is equivalent.
func (m *multiHashring) GetHashringName(tenant string) string {
	m.mu.RLock()
	defer m.mu.RUnlock()

	for i, t := range m.tenantSets {
		// A nil tenant set denotes the default hashring, which matches
		// every tenant.
		if t == nil {
			return m.hashringNames[i]
		}
		// A match error is treated as "no match", mirroring how the
		// original code discarded it.
		if found, _ := t.match(tenant); found {
			return m.hashringNames[i]
		}
	}

	return ""
}

// newMultiHashring creates a multi-tenant hashring for a given slice of
// groups.
// Which hashring to use for a tenant is determined
Expand All @@ -355,6 +407,7 @@ func NewMultiHashring(algorithm HashringAlgorithm, replicationFactor uint64, cfg
}
m.nodes = append(m.nodes, hashring.Nodes()...)
m.hashrings = append(m.hashrings, hashring)
m.hashringNames = append(m.hashringNames, h.Hashring)
var t map[string]tenantMatcher
if len(h.Tenants) != 0 {
t = make(map[string]tenantMatcher)
Expand Down
Loading
Loading