diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 1530c5cf5b0..9be48fae50a 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -24,8 +24,10 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + "gopkg.in/yaml.v2" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" @@ -99,6 +101,11 @@ func registerCompact(app *extkingpin.App) { }) } +func extractOrdinalFromHostname(hostname string) (int, error) { + parts := strings.Split(hostname, "-") + return strconv.Atoi(parts[len(parts)-1]) +} + type compactMetrics struct { halted prometheus.Gauge retried prometheus.Counter @@ -179,6 +186,8 @@ func runCompact( progressRegistry := compact.NewProgressRegistry(reg, logger) downsampleMetrics := newDownsampleMetrics(reg) + ctx := context.Background() + httpProbe := prober.NewHTTP() statusProber := prober.Combine( httpProbe, @@ -207,33 +216,290 @@ func runCompact( return err } - bkt, err := client.NewBucket(logger, confContentYaml, component.String(), nil) - if conf.enableFolderDeletion { - bkt, err = block.WrapWithAzDataLakeSdk(logger, confContentYaml, bkt) - level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled", "name", bkt.Name()) + var initialBucketConf client.BucketConfig + if err := yaml.Unmarshal(confContentYaml, &initialBucketConf); err != nil { + return errors.Wrap(err, "failed to parse bucket configuration") } - if err != nil { - return err + + // Setup tenant partitioning if enabled + var tenantPrefixes []string + var isMultiTenant bool + + if conf.enableTenantPathPrefix { + isMultiTenant = true + + hostname := os.Getenv("HOSTNAME") + ordinal, err := extractOrdinalFromHostname(hostname) + if err != nil { + return errors.Wrapf(err, "failed to extract ordinal from hostname %s", hostname) + } + + totalShards := conf.replicas / conf.replicationFactor + if conf.replicas%conf.replicationFactor != 0 || totalShards < 1 { + return errors.Errorf("replicas (%d) must be divisible by replication-factor (%d) and total-shards must be greater than 0", conf.replicas, conf.replicationFactor) + } + + if ordinal >= totalShards { + return errors.Errorf("ordinal (%d) must be less than total-shards (%d)", ordinal, totalShards) + } + + // Read tenant weights file path + tenantWeightsPath := conf.tenantWeightsFile.Path() + if tenantWeightsPath == "" { + return errors.New("tenant weights file required when using tenant partitioning") + } + + // Create a temporary bucket for tenant discovery (without prefix) + discoveryBkt, err := client.NewBucket(logger, confContentYaml, component.String(), nil) + if err != nil { + return errors.Wrap(err, "failed to create discovery bucket") + } + defer runutil.CloseWithLogOnErr(logger, discoveryBkt, "discovery bucket client") + + level.Info(logger).Log("msg", "setting up tenant partitioning", + "ordinal", ordinal, + "total_shards", totalShards, + "common_path_prefix", conf.commonPathPrefix) + + // Get tenant assignments for this shard + tenantAssignments, err := compact.SetupTenantPartitioning(ctx, discoveryBkt, logger, tenantWeightsPath, conf.commonPathPrefix, totalShards) + if err != nil { + return errors.Wrap(err, "failed to setup tenant partitioning") + } + + // Get tenants assigned to this shard + assignedTenants := tenantAssignments[ordinal] + if len(assignedTenants) == 0 { + level.Warn(logger).Log("msg", 
"no tenants assigned to this shard", "ordinal", ordinal) + return nil // No tenants to compact + } + + // Build tenant prefixes from assigned tenants + for _, tenant := range assignedTenants { + tenantPrefixes = append(tenantPrefixes, path.Join(conf.commonPathPrefix, tenant)) + } + + level.Info(logger).Log("msg", "tenant partitioning setup complete", + "ordinal", ordinal, + "assigned_tenants", len(assignedTenants), + "tenants", strings.Join(assignedTenants, ",")) + } else { + // Single-tenant mode + isMultiTenant = false + tenantPrefixes = []string{""} + level.Info(logger).Log("msg", "running in single-tenant mode") } - insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name())) relabelContentYaml, err := conf.selectorRelabelConf.Content() if err != nil { return errors.Wrap(err, "get content of relabel configuration") } - relabelConfig, err := block.ParseRelabelConfig(relabelContentYaml, block.SelectorSupportedRelabelActions) if err != nil { return err } - // Ensure we close up everything properly. + retentionByResolution := map[compact.ResolutionLevel]time.Duration{ + compact.ResolutionLevelRaw: time.Duration(conf.retentionRaw), + compact.ResolutionLevel5m: time.Duration(conf.retentionFiveMin), + compact.ResolutionLevel1h: time.Duration(conf.retentionOneHr), + } + + if retentionByResolution[compact.ResolutionLevelRaw].Milliseconds() != 0 { + if !conf.disableDownsampling && retentionByResolution[compact.ResolutionLevelRaw].Milliseconds() < downsample.ResLevel1DownsampleRange { + return errors.New("raw resolution must be higher than the minimum block size after which 5m resolution downsampling will occur (40 hours)") + } + level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw]) + } + if retentionByResolution[compact.ResolutionLevel5m].Milliseconds() != 0 { + if !conf.disableDownsampling && retentionByResolution[compact.ResolutionLevel5m].Milliseconds() < downsample.ResLevel2DownsampleRange { + return errors.New("5m resolution retention must be higher than the minimum block size after which 1h resolution downsampling will occur (10 days)") + } + level.Info(logger).Log("msg", "retention policy of 5 min aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel5m]) + } + if retentionByResolution[compact.ResolutionLevel1h].Milliseconds() != 0 { + level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h]) + } + + levels, err := compactions.levels(conf.maxCompactionLevel) + if err != nil { + return errors.Wrap(err, "get compaction levels") + } + if conf.maxCompactionLevel < compactions.maxLevel() { + level.Warn(logger).Log("msg", "Max compaction level is lower than should be", "current", conf.maxCompactionLevel, "default", compactions.maxLevel()) + } + + dedupReplicaLabels := strutil.ParseFlagLabels(conf.dedupReplicaLabels) + enableVerticalCompaction := conf.enableVerticalCompaction + if len(dedupReplicaLabels) > 0 { + enableVerticalCompaction = true + level.Info(logger).Log( + "msg", "deduplication.replica-label specified, enabling vertical compaction", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","), + ) + } + if enableVerticalCompaction { + level.Info(logger).Log( + "msg", "vertical compaction is enabled", "compact.enable-vertical-compaction", fmt.Sprintf("%v", conf.enableVerticalCompaction), + ) + } + + 
var mergeFunc storage.VerticalChunkSeriesMergeFunc + switch conf.dedupFunc { + case compact.DedupAlgorithmPenalty: + mergeFunc = dedup.NewChunkSeriesMerger() + if len(dedupReplicaLabels) == 0 { + return errors.New("penalty based deduplication needs at least one replica label specified") + } + case "": + mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) + default: + return errors.Errorf("unsupported deduplication func, got %s", conf.dedupFunc) + } + + compactDir := path.Join(conf.dataDir, "compact") + downsamplingDir := path.Join(conf.dataDir, "downsample") + if err := os.MkdirAll(compactDir, os.ModePerm); err != nil { + return errors.Wrap(err, "create working compact directory") + } + if err := os.MkdirAll(downsamplingDir, os.ModePerm); err != nil { + return errors.Wrap(err, "create working downsample directory") + } + + // Create a global bucket for the web UI (without tenant prefix) + globalBkt, err := client.NewBucket(logger, confContentYaml, component.String(), nil) + if err != nil { + return errors.Wrap(err, "create global bucket") + } + globalInsBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(globalBkt, extprom.WrapRegistererWithPrefix("thanos_", reg), globalBkt.Name())) + defer func() { - if err != nil { - runutil.CloseWithLogOnErr(logger, insBkt, "bucket client") + if rerr != nil { + runutil.CloseWithLogOnErr(logger, globalInsBkt, "global bucket client") } }() + // Create block lister based on strategy + var blockLister block.Lister + switch conf.blockListStrategy { + case string(concurrentDiscovery): + blockLister = block.NewConcurrentLister(logger, globalInsBkt) + case string(recursiveDiscovery): + blockLister = block.NewRecursiveLister(logger, globalInsBkt) + default: + blockLister = block.NewConcurrentLister(logger, globalInsBkt) + } + + // Create base meta fetcher for global view (web UI) + baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, globalInsBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg)) + if err != nil { + return errors.Wrap(err, "create base meta fetcher") + } + + // Create global API for web UI + globalAPI := blocksAPI.NewBlocksAPI(logger, conf.webConf.disableCORS, conf.label, flagsMap, globalInsBkt) + + // Create context with cancel for the entire compactor + ctx, cancel := context.WithCancel(ctx) + ctx = tracing.ContextWithTracer(ctx, tracer) + ctx = objstoretracing.ContextWithTracer(ctx, tracer) + + defer func() { + if rerr != nil { + cancel() + } + }() + + // Start compaction for each tenant + // Each will get its own bucket created via client.NewBucket with the appropriate prefix + for _, tenantPrefix := range tenantPrefixes { + + bucketConf := &client.BucketConfig{ + Type: initialBucketConf.Type, + Config: initialBucketConf.Config, + Prefix: path.Join(initialBucketConf.Prefix, tenantPrefix), + } + level.Info(logger).Log("msg", "starting compaction loop", "prefix", bucketConf.Prefix) + + tenantConfYaml, err := yaml.Marshal(bucketConf) + if err != nil { + return errors.Wrap(err, "marshal tenant bucket config") + } + + bkt, err := client.NewBucket(logger, tenantConfYaml, component.String(), nil) + if conf.enableFolderDeletion { + bkt, err = block.WrapWithAzDataLakeSdk(logger, tenantConfYaml, bkt) + level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled", "prefix", bucketConf.Prefix, "name", bkt.Name()) + } + if err != nil { + return err + } + + var tenantReg prometheus.Registerer + + if isMultiTenant { + tenantReg = 
prometheus.WrapRegistererWith(prometheus.Labels{"tenant": tenantPrefix}, reg) + } else { + tenantReg = reg + } + insBkt := objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", reg), bkt.Name())) + + // Create tenant-specific logger + var tenantLogger log.Logger + if isMultiTenant { + tenantLogger = log.With(logger, "tenant_prefix", tenantPrefix) + } else { + tenantLogger = logger + } + + // Ensure we close up everything properly. + defer func() { + if err != nil { + runutil.CloseWithLogOnErr(logger, insBkt, "bucket client") + } + }() + + err = runCompactForTenant(g, ctx, tenantLogger, tenantReg, reg, conf, flagsMap, deleteDelay, + compactMetrics, progressRegistry, downsampleMetrics, insBkt, relabelConfig, + retentionByResolution, levels, dedupReplicaLabels, enableVerticalCompaction, mergeFunc, + compactDir, downsamplingDir, cancel) + if err != nil { + return errors.Wrap(err, "failed to run compact for tenant") + } + } + + err = postProcess(g, ctx, logger, tracer, srv, conf, reg, component, progressRegistry, baseMetaFetcher, globalAPI, cancel) + if err != nil { + return errors.Wrap(err, "failed to post process after compacting tenants") + } + + level.Info(logger).Log("msg", "starting compact node") + statusProber.Ready() + return nil +} + +func runCompactForTenant( + g *run.Group, + ctx context.Context, + logger log.Logger, + reg prometheus.Registerer, + baseReg *prometheus.Registry, + conf compactConfig, + flagsMap map[string]string, + deleteDelay time.Duration, + compactMetrics *compactMetrics, + progressRegistry *compact.ProgressRegistry, + downsampleMetrics *DownsampleMetrics, + insBkt objstore.InstrumentedBucket, + relabelConfig []*relabel.Config, + retentionByResolution map[compact.ResolutionLevel]time.Duration, + levels []int64, + dedupReplicaLabels []string, + enableVerticalCompaction bool, + mergeFunc storage.VerticalChunkSeriesMergeFunc, + compactDir string, + downsamplingDir string, + cancel context.CancelFunc, +) (rerr error) { // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. // This is to make sure compactor will not accidentally perform compactions with gap instead. 
@@ -244,37 +510,25 @@ func runCompact( labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig) consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)) timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime) - - var blockLister block.Lister - switch syncStrategy(conf.blockListStrategy) { - case concurrentDiscovery: - blockLister = block.NewConcurrentLister(logger, insBkt) - case recursiveDiscovery: - blockLister = block.NewRecursiveLister(logger, insBkt) + // Create tenant-specific block lister and base fetcher + var tenantBlockLister block.Lister + switch conf.blockListStrategy { + case string(concurrentDiscovery): + tenantBlockLister = block.NewConcurrentLister(logger, insBkt) + case string(recursiveDiscovery): + tenantBlockLister = block.NewRecursiveLister(logger, insBkt) default: - return errors.Errorf("unknown sync strategy %s", conf.blockListStrategy) + tenantBlockLister = block.NewConcurrentLister(logger, insBkt) } - baseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, blockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg)) + + tenantBaseMetaFetcher, err := block.NewBaseFetcher(logger, conf.blockMetaFetchConcurrency, insBkt, tenantBlockLister, conf.dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg)) if err != nil { - return errors.Wrap(err, "create meta fetcher") + return errors.Wrap(err, "create tenant base meta fetcher") } - enableVerticalCompaction := conf.enableVerticalCompaction - dedupReplicaLabels := strutil.ParseFlagLabels(conf.dedupReplicaLabels) - if len(dedupReplicaLabels) > 0 { - enableVerticalCompaction = true - level.Info(logger).Log( - "msg", "deduplication.replica-label specified, enabling vertical compaction", "dedupReplicaLabels", strings.Join(dedupReplicaLabels, ","), - ) - } - if enableVerticalCompaction { - level.Info(logger).Log( - "msg", "vertical compaction is enabled", "compact.enable-vertical-compaction", fmt.Sprintf("%v", conf.enableVerticalCompaction), - ) - } var ( - api = blocksAPI.NewBlocksAPI(logger, conf.webConf.disableCORS, conf.label, flagsMap, insBkt) - sy *compact.Syncer + tenantAPI = blocksAPI.NewBlocksAPI(logger, conf.webConf.disableCORS, conf.label, flagsMap, insBkt) + sy *compact.Syncer ) { filters := []block.MetadataFilter{ @@ -290,10 +544,10 @@ func runCompact( filters = append(filters, noDownsampleMarkerFilter) } // Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability. 
- cf := baseMetaFetcher.NewMetaFetcher( + cf := tenantBaseMetaFetcher.NewMetaFetcher( extprom.WrapRegistererWithPrefix("thanos_", reg), filters) cf.UpdateOnChange(func(blocks []metadata.Meta, err error) { - api.SetLoaded(blocks, err) + tenantAPI.SetLoaded(blocks, err) }) // Still use blockViewerSyncBlockTimeout to retain original behavior before this upstream change: @@ -303,7 +557,8 @@ func runCompact( if !conf.wait { syncMetasTimeout = 0 } - sy, err = compact.NewMetaSyncer( + var syErr error + sy, syErr = compact.NewMetaSyncer( logger, reg, insBkt, @@ -314,45 +569,11 @@ func runCompact( compactMetrics.garbageCollectedBlocks, syncMetasTimeout, ) - if err != nil { - return errors.Wrap(err, "create syncer") + if syErr != nil { + return errors.Wrap(syErr, "create syncer") } } - levels, err := compactions.levels(conf.maxCompactionLevel) - if err != nil { - return errors.Wrap(err, "get compaction levels") - } - - if conf.maxCompactionLevel < compactions.maxLevel() { - level.Warn(logger).Log("msg", "Max compaction level is lower than should be", "current", conf.maxCompactionLevel, "default", compactions.maxLevel()) - } - - ctx, cancel := context.WithCancel(context.Background()) - ctx = tracing.ContextWithTracer(ctx, tracer) - ctx = objstoretracing.ContextWithTracer(ctx, tracer) // objstore tracing uses a different tracer key in context. - - defer func() { - if rerr != nil { - cancel() - } - }() - - var mergeFunc storage.VerticalChunkSeriesMergeFunc - switch conf.dedupFunc { - case compact.DedupAlgorithmPenalty: - mergeFunc = dedup.NewChunkSeriesMerger() - - if len(dedupReplicaLabels) == 0 { - return errors.New("penalty based deduplication needs at least one replica label specified") - } - case "": - mergeFunc = storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) - - default: - return errors.Errorf("unsupported deduplication func, got %s", conf.dedupFunc) - } - // Instantiate the compactor with different time slices. Timestamps in TSDB // are in milliseconds. 
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc) @@ -360,19 +581,6 @@ func runCompact( return errors.Wrap(err, "create compactor") } - var ( - compactDir = path.Join(conf.dataDir, "compact") - downsamplingDir = path.Join(conf.dataDir, "downsample") - ) - - if err := os.MkdirAll(compactDir, os.ModePerm); err != nil { - return errors.Wrap(err, "create working compact directory") - } - - if err := os.MkdirAll(downsamplingDir, os.ModePerm); err != nil { - return errors.Wrap(err, "create working downsample directory") - } - grouper := compact.NewDefaultGrouper( logger, insBkt, @@ -408,7 +616,7 @@ func runCompact( planner, comp, compact.DefaultBlockDeletableChecker{}, - compact.NewOverlappingCompactionLifecycleCallback(reg, logger, conf.enableOverlappingRemoval), + compact.NewOverlappingCompactionLifecycleCallback(baseReg, logger, conf.enableOverlappingRemoval), compactDir, insBkt, conf.compactionConcurrency, @@ -418,30 +626,6 @@ func runCompact( return errors.Wrap(err, "create bucket compactor") } - retentionByResolution := map[compact.ResolutionLevel]time.Duration{ - compact.ResolutionLevelRaw: time.Duration(conf.retentionRaw), - compact.ResolutionLevel5m: time.Duration(conf.retentionFiveMin), - compact.ResolutionLevel1h: time.Duration(conf.retentionOneHr), - } - - if retentionByResolution[compact.ResolutionLevelRaw].Milliseconds() != 0 { - // If downsampling is enabled, error if raw retention is not sufficient for downsampling to occur (upper bound 10 days for 1h resolution) - if !conf.disableDownsampling && retentionByResolution[compact.ResolutionLevelRaw].Milliseconds() < downsample.ResLevel1DownsampleRange { - return errors.New("raw resolution must be higher than the minimum block size after which 5m resolution downsampling will occur (40 hours)") - } - level.Info(logger).Log("msg", "retention policy of raw samples is enabled", "duration", retentionByResolution[compact.ResolutionLevelRaw]) - } - if retentionByResolution[compact.ResolutionLevel5m].Milliseconds() != 0 { - // If retention is lower than minimum downsample range, then no downsampling at this resolution will be persisted - if !conf.disableDownsampling && retentionByResolution[compact.ResolutionLevel5m].Milliseconds() < downsample.ResLevel2DownsampleRange { - return errors.New("5m resolution retention must be higher than the minimum block size after which 1h resolution downsampling will occur (10 days)") - } - level.Info(logger).Log("msg", "retention policy of 5 min aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel5m]) - } - if retentionByResolution[compact.ResolutionLevel1h].Milliseconds() != 0 { - level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h]) - } - retentionByTenant, err := compact.ParesRetentionPolicyByTenant(logger, *conf.retentionTenants) if err != nil { level.Error(logger).Log("msg", "failed to parse retention policy by tenant", "err", err) @@ -574,6 +758,9 @@ func runCompact( return cleanPartialMarked(progress) } + // For backwards compatibility, use the existing single-goroutine approach + // The bucketsToCompact[0] is already assigned to insBkt above, and all setup used insBkt + // So the existing compactMainFn will work for the first/only bucket g.Add(func() error { defer runutil.CloseWithLogOnErr(logger, insBkt, "bucket client") @@ -616,6 +803,105 @@ func runCompact( cancel() }) + // Periodically remove partial blocks and blocks 
marked for deletion + // since one iteration potentially could take a long time. + if conf.wait && conf.cleanupBlocksInterval > 0 { + g.Add(func() error { + return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), func() error { + err := cleanPartialMarked(progressRegistry.Get(compact.Cleanup)) + if err != nil && compact.IsRetryError(err) { + level.Error(logger).Log("msg", "retriable error", "err", err) + compactMetrics.retried.Inc() + return nil + } + return err + }) + }, func(error) { + cancel() + }) + } + + // Periodically calculate the progress of compaction, downsampling and retention. + if conf.wait && conf.progressCalculateInterval > 0 { + g.Add(func() error { + ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner) + rs := compact.NewRetentionProgressCalculator(reg, retentionByResolution) + var ds *compact.DownsampleProgressCalculator + if !conf.disableDownsampling { + ds = compact.NewDownsampleProgressCalculator(reg) + } + + return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error { + progress := progressRegistry.Get(compact.Calculate) + defer progress.Idle() + progress.Set(compact.SyncMeta) + if err := sy.SyncMetas(ctx); err != nil { + if compact.IsRetryError(err) { + level.Error(logger).Log("msg", "retriable error", "err", err) + compactMetrics.retried.Inc() + return nil + } + return errors.Wrapf(err, "could not sync metas") + } + + metas := sy.Metas() + progress.Set(compact.Grouping) + groups, err := grouper.Groups(metas) + if err != nil { + return errors.Wrapf(err, "could not group metadata for compaction") + } + progress.Set(compact.CalculateProgress) + if err = ps.ProgressCalculate(ctx, groups); err != nil { + return errors.Wrapf(err, "could not calculate compaction progress") + } + + progress.Set(compact.Grouping) + retGroups, err := grouper.Groups(metas) + if err != nil { + return errors.Wrapf(err, "could not group metadata for retention") + } + + progress.Set(compact.CalculateProgress) + if err = rs.ProgressCalculate(ctx, retGroups); err != nil { + return errors.Wrapf(err, "could not calculate retention progress") + } + + if !conf.disableDownsampling { + progress.Set(compact.Grouping) + groups, err = grouper.Groups(metas) + if err != nil { + return errors.Wrapf(err, "could not group metadata into downsample groups") + } + progress.Set(compact.CalculateProgress) + if err := ds.ProgressCalculate(ctx, groups); err != nil { + return errors.Wrapf(err, "could not calculate downsampling progress") + } + } + + return nil + }) + }, func(error) { + cancel() + }) + } + + return nil +} + +func postProcess( + g *run.Group, + ctx context.Context, + logger log.Logger, + tracer opentracing.Tracer, + srv *httpserver.Server, + conf compactConfig, + reg *prometheus.Registry, + component component.Component, + progressRegistry *compact.ProgressRegistry, + baseMetaFetcher *block.BaseFetcher, + globalAPI *blocksAPI.BlocksAPI, + cancel context.CancelFunc, +) error { if conf.wait { if !conf.disableWeb { r := route.New() @@ -630,13 +916,13 @@ func runCompact( return logging.NoLogCall })} logMiddleware := logging.NewHTTPServerMiddleware(logger, opts...) - api.Register(r.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) + globalAPI.Register(r.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) // Separate fetcher for global view. // TODO(bwplotka): Allow Bucket UI to visualize the state of the block as well. 
f := baseMetaFetcher.NewMetaFetcher(extprom.WrapRegistererWithPrefix("thanos_bucket_ui", reg), nil, "component", "globalBucketUI") f.UpdateOnChange(func(blocks []metadata.Meta, err error) { - api.SetGlobal(blocks, err) + globalAPI.SetGlobal(blocks, err) }) srv.Handle("/", r) @@ -663,99 +949,9 @@ func runCompact( }) } - // Periodically remove partial blocks and blocks marked for deletion - // since one iteration potentially could take a long time. - if conf.cleanupBlocksInterval > 0 { - g.Add(func() error { - return runutil.Repeat(conf.cleanupBlocksInterval, ctx.Done(), func() error { - err := cleanPartialMarked(progressRegistry.Get(compact.Cleanup)) - if err != nil && compact.IsRetryError(err) { - // The RetryError signals that we hit an retriable error (transient error, no connection). - // You should alert on this being triggered too frequently. - level.Error(logger).Log("msg", "retriable error", "err", err) - compactMetrics.retried.Inc() - - return nil - } - - return err - }) - }, func(error) { - cancel() - }) - } - - // Periodically calculate the progress of compaction, downsampling and retention. - if conf.progressCalculateInterval > 0 { - g.Add(func() error { - ps := compact.NewCompactionProgressCalculator(reg, tsdbPlanner) - rs := compact.NewRetentionProgressCalculator(reg, retentionByResolution) - var ds *compact.DownsampleProgressCalculator - if !conf.disableDownsampling { - ds = compact.NewDownsampleProgressCalculator(reg) - } - - return runutil.Repeat(conf.progressCalculateInterval, ctx.Done(), func() error { - progress := progressRegistry.Get(compact.Calculate) - defer progress.Idle() - progress.Set(compact.SyncMeta) - if err := sy.SyncMetas(ctx); err != nil { - // The RetryError signals that we hit an retriable error (transient error, no connection). - // You should alert on this being triggered too frequently. - if compact.IsRetryError(err) { - level.Error(logger).Log("msg", "retriable error", "err", err) - compactMetrics.retried.Inc() - - return nil - } - - return errors.Wrapf(err, "could not sync metas") - } - - metas := sy.Metas() - progress.Set(compact.Grouping) - groups, err := grouper.Groups(metas) - if err != nil { - return errors.Wrapf(err, "could not group metadata for compaction") - } - progress.Set(compact.CalculateProgress) - if err = ps.ProgressCalculate(ctx, groups); err != nil { - return errors.Wrapf(err, "could not calculate compaction progress") - } - - progress.Set(compact.Grouping) - retGroups, err := grouper.Groups(metas) - if err != nil { - return errors.Wrapf(err, "could not group metadata for retention") - } - - progress.Set(compact.CalculateProgress) - if err = rs.ProgressCalculate(ctx, retGroups); err != nil { - return errors.Wrapf(err, "could not calculate retention progress") - } - - if !conf.disableDownsampling { - progress.Set(compact.Grouping) - groups, err = grouper.Groups(metas) - if err != nil { - return errors.Wrapf(err, "could not group metadata into downsample groups") - } - progress.Set(compact.CalculateProgress) - if err := ds.ProgressCalculate(ctx, groups); err != nil { - return errors.Wrapf(err, "could not calculate downsampling progress") - } - } - - return nil - }) - }, func(err error) { - cancel() - }) - } + // Note: Cleanup interval and progress calculation are handled per-tenant + // in runCompactForTenant() since they require per-tenant components (syncer, grouper, planner). 
} - - level.Info(logger).Log("msg", "starting compact node") - statusProber.Ready() return nil } @@ -797,6 +993,11 @@ type compactConfig struct { progressCalculateInterval time.Duration filterConf *store.FilterConfig disableAdminOperations bool + tenantWeightsFile extflag.PathOrContent + replicas int + replicationFactor int + commonPathPrefix string + enableTenantPathPrefix bool } func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -913,6 +1114,20 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { cc.selectorRelabelConf = *extkingpin.RegisterSelectorRelabelFlags(cmd) + cc.tenantWeightsFile = *extflag.RegisterPathOrContent(cmd, "compact.tenant-weights-file", "YAML file that contains the tenant weights for tenant partitioning.", extflag.WithEnvSubstitution()) + + cmd.Flag("compact.replicas", "Total replicas of the stateful set."). + Default("1").IntVar(&cc.replicas) + + cmd.Flag("compact.replication-factor", "Replication factor of the stateful set."). + Default("1").IntVar(&cc.replicationFactor) + + cmd.Flag("compact.common-path-prefix", "Common path prefix for tenant discovery when using tenant partitioning. This is the prefix before the tenant name in the object storage path."). + Default("v1/raw/").StringVar(&cc.commonPathPrefix) + + cmd.Flag("compact.enable-tenant-path-prefix", "Enable tenant path prefix mode for backward compatibility. When disabled, compactor runs in single-tenant mode."). + Default("false").BoolVar(&cc.enableTenantPathPrefix) + cc.webConf.registerFlag(cmd) cmd.Flag("bucket-web-label", "External block label to use as group title in the bucket web UI").StringVar(&cc.label) diff --git a/cmd/thanos/compact_test.go b/cmd/thanos/compact_test.go new file mode 100644 index 00000000000..633009b4b4e --- /dev/null +++ b/cmd/thanos/compact_test.go @@ -0,0 +1,302 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package main + +import ( + "path" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/thanos-io/objstore/client" + "gopkg.in/yaml.v2" +) + +func TestExtractOrdinalFromHostname(t *testing.T) { + tests := []struct { + name string + hostname string + expectedOrdinal int + expectError bool + }{ + { + name: "statefulset hostname with single digit", + hostname: "pantheon-compactor-0", + expectedOrdinal: 0, + expectError: false, + }, + { + name: "statefulset hostname with double digit", + hostname: "pantheon-compactor-15", + expectedOrdinal: 15, + expectError: false, + }, + { + name: "statefulset hostname with triple digit", + hostname: "pantheon-compactor-999", + expectedOrdinal: 999, + expectError: false, + }, + { + name: "kubernetes statefulset with namespace", + hostname: "pantheon-compactor-2", + expectedOrdinal: 2, + expectError: false, + }, + { + name: "complex statefulset name", + hostname: "pantheon-compactor-7", + expectedOrdinal: 7, + expectError: false, + }, + { + name: "hostname without number", + hostname: "pantheoncompactor", + expectError: true, + }, + { + name: "hostname with invalid suffix", + hostname: "pantheon-compactor-abc", + expectError: true, + }, + { + name: "empty hostname", + hostname: "", + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ordinal, err := extractOrdinalFromHostname(tt.hostname) + if tt.expectError { + testutil.NotOk(t, err) + } else { + testutil.Ok(t, err) + testutil.Equals(t, tt.expectedOrdinal, ordinal) + } + }) + } +} + +func TestTenantPrefixBucketCreation(t *testing.T) { + tests := []struct { + name string + bucketConfig client.BucketConfig + tenantPrefixes []string + expectedEffectivePrefixes []string + }{ + { + name: "single-tenant mode (no prefixes)", + bucketConfig: client.BucketConfig{ + Type: client.FILESYSTEM, + Config: map[string]interface{}{ + "directory": "/tmp/test", + }, + Prefix: "", + }, + tenantPrefixes: []string{""}, + expectedEffectivePrefixes: []string{""}, + }, + { + name: "tenant partitioning with one tenant", + bucketConfig: client.BucketConfig{ + Type: client.FILESYSTEM, + Config: map[string]interface{}{ + "directory": "/tmp/test", + }, + Prefix: "", + }, + tenantPrefixes: []string{"v1/raw/tenant1"}, + expectedEffectivePrefixes: []string{"v1/raw/tenant1"}, + }, + { + name: "tenant partitioning with multiple tenants", + bucketConfig: client.BucketConfig{ + Type: client.FILESYSTEM, + Config: map[string]interface{}{ + "directory": "/tmp/test", + }, + Prefix: "", + }, + tenantPrefixes: []string{"v1/raw/tenant1", "v1/raw/tenant2", "v1/raw/tenant3"}, + expectedEffectivePrefixes: []string{"v1/raw/tenant1", "v1/raw/tenant2", "v1/raw/tenant3"}, + }, + { + name: "tenant partitioning with base prefix and tenant paths", + bucketConfig: client.BucketConfig{ + Type: client.FILESYSTEM, + Config: map[string]interface{}{ + "directory": "/tmp/test", + }, + Prefix: "base/", + }, + tenantPrefixes: []string{"v1/raw/tenant1", "v1/raw/tenant2"}, + expectedEffectivePrefixes: []string{"base/v1/raw/tenant1", "base/v1/raw/tenant2"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Simulate bucket creation for each tenant prefix + var actualEffectivePrefixes []string + for _, tenantPrefix := range tt.tenantPrefixes { + bucketConf := &client.BucketConfig{ + Type: tt.bucketConfig.Type, + Config: tt.bucketConfig.Config, + Prefix: path.Join(tt.bucketConfig.Prefix, tenantPrefix), + } + actualEffectivePrefixes = 
append(actualEffectivePrefixes, bucketConf.Prefix) + + // Verify the bucket can be created + // Use a real temp directory for filesystem buckets + if bucketConf.Type == client.FILESYSTEM { + configMap := make(map[string]interface{}) + for k, v := range bucketConf.Config.(map[string]interface{}) { + configMap[k] = v + } + configMap["directory"] = t.TempDir() + bucketConf.Config = configMap + } + + tenantYaml, err := yaml.Marshal(bucketConf) + testutil.Ok(t, err) + + bkt, err := client.NewBucket(log.NewNopLogger(), tenantYaml, "test", nil) + testutil.Ok(t, err) + testutil.Ok(t, bkt.Close()) + } + + testutil.Equals(t, tt.expectedEffectivePrefixes, actualEffectivePrefixes) + }) + } +} + +func TestBucketConfigPrefixPreservation(t *testing.T) { + tests := []struct { + name string + prefix string + }{ + { + name: "empty prefix", + prefix: "", + }, + { + name: "simple prefix", + prefix: "tenant1", + }, + { + name: "v1/raw prefix", + prefix: "v1/raw/tenant1", + }, + { + name: "hierarchical prefix", + prefix: "org/team/tenant1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a bucket config with prefix + originalConf := client.BucketConfig{ + Type: client.FILESYSTEM, + Config: map[string]interface{}{ + "directory": "/tmp/test", + }, + Prefix: tt.prefix, + } + + // Marshal to YAML + yamlBytes, err := yaml.Marshal(originalConf) + testutil.Ok(t, err) + + // Unmarshal back + var unmarshaledConf client.BucketConfig + err = yaml.Unmarshal(yamlBytes, &unmarshaledConf) + testutil.Ok(t, err) + + // Verify prefix is preserved + testutil.Equals(t, tt.prefix, unmarshaledConf.Prefix) + }) + } +} + +func TestBucketConfigFromYAML(t *testing.T) { + tests := []struct { + name string + bucketYamlConfig string + expectedPrefix string + expectedType client.ObjProvider + }{ + { + name: "filesystem without prefix", + bucketYamlConfig: ` +type: FILESYSTEM +config: + directory: /tmp/test +prefix: "" +`, + expectedPrefix: "", + expectedType: client.FILESYSTEM, + }, + { + name: "filesystem with v1/raw prefix", + bucketYamlConfig: ` +type: FILESYSTEM +config: + directory: /tmp/test +prefix: "v1/raw/" +`, + expectedPrefix: "v1/raw/", + expectedType: client.FILESYSTEM, + }, + { + name: "S3 with data prefix", + bucketYamlConfig: ` +type: S3 +config: + bucket: test-bucket + endpoint: s3.amazonaws.com +prefix: "data/" +`, + expectedPrefix: "data/", + expectedType: client.S3, + }, + { + name: "filesystem with tenant path", + bucketYamlConfig: ` +type: FILESYSTEM +config: + directory: /tmp/test +prefix: "v1/raw/tenant-alpha" +`, + expectedPrefix: "v1/raw/tenant-alpha", + expectedType: client.FILESYSTEM, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var bucketConfig client.BucketConfig + err := yaml.Unmarshal([]byte(tt.bucketYamlConfig), &bucketConfig) + testutil.Ok(t, err) + + // Verify all fields are correctly parsed + testutil.Equals(t, tt.expectedPrefix, bucketConfig.Prefix) + testutil.Equals(t, tt.expectedType, bucketConfig.Type) + + // Verify we can marshal back + bucketYamlBytes, err := yaml.Marshal(bucketConfig) + testutil.Ok(t, err) + + // Verify we can unmarshal again and get the same result + var roundtripBucketConfig client.BucketConfig + err = yaml.Unmarshal(bucketYamlBytes, &roundtripBucketConfig) + testutil.Ok(t, err) + + testutil.Equals(t, bucketConfig.Prefix, roundtripBucketConfig.Prefix) + testutil.Equals(t, bucketConfig.Type, roundtripBucketConfig.Type) + }) + } +}
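
Illustrative sketch (not part of the patch): the shard arithmetic and prefix construction that the new flags drive, condensed into a standalone program. It mirrors runCompact above — the ordinal parsed from the StatefulSet HOSTNAME, totalShards = --compact.replicas / --compact.replication-factor, and one bucket prefix per assigned tenant built with path.Join(--compact.common-path-prefix, tenant). The assignTenants helper and the hard-coded tenant list are assumptions for the example only; in the compactor the assignment comes from compact.SetupTenantPartitioning and the tenant weights file.

// Sketch only: shows the ordinal/shard math and prefix joining used by the
// tenant-partitioning changes above. assignTenants is a hypothetical
// round-robin stand-in for compact.SetupTenantPartitioning.
package main

import (
	"fmt"
	"os"
	"path"
	"strconv"
	"strings"
)

// Same logic as extractOrdinalFromHostname in compact.go: the ordinal is the
// numeric suffix of a StatefulSet pod name such as "pantheon-compactor-3".
func extractOrdinal(hostname string) (int, error) {
	parts := strings.Split(hostname, "-")
	return strconv.Atoi(parts[len(parts)-1])
}

// assignTenants distributes tenants across shards round-robin. This is an
// assumption for the sketch; the real assignment is weight-based.
func assignTenants(tenants []string, totalShards int) map[int][]string {
	out := make(map[int][]string, totalShards)
	for i, t := range tenants {
		out[i%totalShards] = append(out[i%totalShards], t)
	}
	return out
}

func main() {
	// Example inputs; in the compactor these come from flags and the weights file.
	const (
		replicas          = 6          // --compact.replicas
		replicationFactor = 2          // --compact.replication-factor
		commonPathPrefix  = "v1/raw/"  // --compact.common-path-prefix
	)
	tenants := []string{"tenant-a", "tenant-b", "tenant-c"} // normally discovered under commonPathPrefix

	ordinal, err := extractOrdinal(os.Getenv("HOSTNAME"))
	if err != nil {
		fmt.Fprintln(os.Stderr, "extract ordinal:", err)
		os.Exit(1)
	}

	// Same validation as runCompact: replicas must divide evenly into shards,
	// and this pod's ordinal must address one of them.
	totalShards := replicas / replicationFactor
	if replicas%replicationFactor != 0 || totalShards < 1 || ordinal >= totalShards {
		fmt.Fprintln(os.Stderr, "invalid shard configuration")
		os.Exit(1)
	}

	// Each assigned tenant becomes one bucket prefix, exactly as the per-tenant
	// compaction loop builds them with path.Join(commonPathPrefix, tenant).
	for _, tenant := range assignTenants(tenants, totalShards)[ordinal] {
		fmt.Println(path.Join(commonPathPrefix, tenant))
	}
}

With the hypothetical round-robin assignment and inputs above, a pod named pantheon-compactor-1 would print v1/raw/tenant-b, the prefix that the per-tenant loop would then hand to client.NewBucket for that shard.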