Support for tenant bucket prefixing in compactor#239
Support for tenant bucket prefixing in compactor#239willh-db wants to merge 36 commits intodatabricks:db_mainfrom
Conversation
cmd/thanos/compact.go
Outdated
| type TenantBucketConfig struct { | ||
| TenantPrefixes []string `yaml:"tenant_prefixes"` | ||
| // Other config fields can be added here if needed | ||
| } |
There was a problem hiding this comment.
I've just realized the client.BucketConfig struct is in another repo as a vendor repo to Thanos repo. We can't easily extend that struct. We have to copy that struct to here.
- Copy and extend the struct.
type MultiTenancyBucketConfig struct {
Type objstore.ObjProvider `yaml:"type"`
Config interface{} `yaml:"config"`
Prefix string `yaml:"prefix" default:""`
// Example value: "v1/raw/tenant_a,v1/raw/tenant_b,v1/raw/tenant_c"
TenantPrefixes []string `yaml:"tenant_prefixes"`
}
- Generate N
client.BucketConfiginstances from the MultileTenancyBucketConfig instance. Each of the instances has a differentclient.BucketConfig.Prefixwhich isjoin(BucketConfig.Prefix, MultileTenancyBucketConfig.TenantPrefixes[i]) - Later on (not on this PR), compactor's init container will generate an
MultiTenancyBucketConfiginstance for its compactor container.
There was a problem hiding this comment.
I've just realized the client.BucketConfig struct is in another repo as a vendor repo to Thanos repo. We can't easily extend that struct. We have to copy that struct to here.
Sounds good. I wrote it this way thinking the init container could pass the list of tenants via the client.BucketConfig.Config, but this is cleaner and more type safe.
cmd/thanos/compact.go
Outdated
| type TenantBucketConfig struct { | ||
| TenantPrefixes []string `yaml:"tenant_prefixes"` | ||
| // Other config fields can be added here if needed | ||
| } |
cmd/thanos/compact.go
Outdated
| // Determine tenant prefixes to use (if provided) | ||
| var multiTenancyBucketConfig MultiTenancyBucketConfig | ||
| if err := yaml.Unmarshal(confContentYaml, &multiTenancyBucketConfig); err != nil { | ||
| return errors.Wrap(err, "parse bucket config") |
There was a problem hiding this comment.
"failed to parse MultiTenancyBucketConfig"
| Config: multiTenancyBucketConfig.Config, | ||
| Prefix: multiTenancyBucketConfig.Prefix + tenantPrefix, | ||
| } | ||
|
|
There was a problem hiding this comment.
Log something like "starting compaction loop for prefix: " + "bucketConf.prefix"
cmd/thanos/compact.go
Outdated
| // Create bucket for this tenant | ||
| if tenantPrefix != "" { | ||
| level.Info(logger).Log("msg", "creating compactor bucket with tenant prefix", "prefix", tenantPrefix) | ||
| } |
There was a problem hiding this comment.
After this point, the tenant prefix is abstracted out and just becomes prefix. We don't need anything special after this point.
cmd/thanos/compact.go
Outdated
| if tenantPrefix != "" { | ||
| level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled for tenant", "prefix", tenantPrefix, "name", bkt.Name()) | ||
| } else { | ||
| level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled", "name", bkt.Name()) | ||
| } | ||
| } |
There was a problem hiding this comment.
Remove the if and just log something agnostic to the tenant prefix.
cmd/thanos/compact.go
Outdated
| // Create tenant-specific logger | ||
| tenantLogger := logger | ||
| if tenantPrefix != "" { | ||
| tenantLogger = log.With(logger, "tenant_prefix", tenantPrefix) | ||
| } | ||
| sy, err = compact.NewMetaSyncer( | ||
| logger, | ||
| reg, | ||
| insBkt, | ||
| cf, | ||
| duplicateBlocksFilter, | ||
| ignoreDeletionMarkFilter, | ||
| compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), | ||
| compactMetrics.garbageCollectedBlocks, | ||
| syncMetasTimeout, | ||
| ) | ||
|
|
This reverts commit fd8b9ee.
| bkt, err := client.NewBucket(logger, confContentYaml, component.String(), nil) | ||
| if conf.enableFolderDeletion { | ||
| bkt, err = block.WrapWithAzDataLakeSdk(logger, confContentYaml, bkt) | ||
| level.Info(logger).Log("msg", "azdatalake sdk wrapper enabled", "name", bkt.Name()) | ||
| } | ||
| if err != nil { | ||
| return err |
There was a problem hiding this comment.
I see a lot of code got shuffled rather than deleted which makes reviewing this pr hard. Wonder if it is possible to keep some of the unchanged code path the same? Might need to pay some efforts to find the right place to insert as well as introduce helper functions but may worth it (admit this function is ready long)
jnyi
left a comment
There was a problem hiding this comment.
really cool, comments most about styling, also thanks for adding unit tests!
cmd/thanos/compact.go
Outdated
| insBkt, | ||
| conf.acceptMalformedIndex, | ||
| enableVerticalCompaction, | ||
| tenantReg, |
There was a problem hiding this comment.
same for this variable, is it possible to override instead
cmd/thanos/compact.go
Outdated
| planner, | ||
| comp, | ||
| compact.DefaultBlockDeletableChecker{}, | ||
| overlappingCallback, |
There was a problem hiding this comment.
if we don't change reg variable name, this line won't be changed.
cmd/thanos/compact.go
Outdated
| if err := os.MkdirAll(compactDir, os.ModePerm); err != nil { | ||
| return errors.Wrap(err, "create working compact directory") | ||
| } | ||
| overlappingCallback := compact.NewOverlappingCompactionLifecycleCallback(reg, logger, conf.enableOverlappingRemoval) |
There was a problem hiding this comment.
no need to create temp variable which is only used once later in the old place
There was a problem hiding this comment.
It is run once per tenant as it is within the loop but it shouldn't be an issue
cmd/thanos/compact.go
Outdated
| if tenantPrefix != "" { | ||
| // For multi-tenant mode, we pass a nil registerer to avoid metric collisions | ||
| // TODO (willh-db): revisit metrics structure for multi-tenant mode | ||
| tenantReg = nil | ||
| insBkt = objstoretracing.WrapWithTraces(bkt) | ||
| } else { | ||
| tenantReg = reg | ||
| insBkt = objstoretracing.WrapWithTraces(objstore.WrapWithMetrics(bkt, extprom.WrapRegistererWithPrefix("thanos_", tenantReg), bkt.Name())) | ||
| } | ||
|
|
||
| progress.Set(compact.CleanBlocks) | ||
| compact.BestEffortCleanAbortedPartialUploads(ctx, logger, sy.Partial(), insBkt, compactMetrics.partialUploadDeleteAttempts, compactMetrics.blocksCleaned, compactMetrics.blockCleanupFailures) | ||
| if err := blocksCleaner.DeleteMarkedBlocks(ctx); err != nil { | ||
| return errors.Wrap(err, "cleaning marked blocks") | ||
| // Create tenant-specific logger | ||
| if tenantPrefix != "" { | ||
| logger = log.With(logger, "tenant_prefix", tenantPrefix) | ||
| } |
There was a problem hiding this comment.
nit: both use the same if condition, could merge it to simply
cmd/thanos/compact.go
Outdated
| if tenantPrefix != "" { | ||
| f = baseMetaFetcher.NewMetaFetcher(nil, nil, "component", "globalBucketUI") // TODO (willh-db): revisit metrics here |
There was a problem hiding this comment.
some of the metrics might not be useful, so I don't think we need to alter all of them via this if-condition tenantPrefix != ""
| bucketConf := &client.BucketConfig{ | ||
| Type: initialBucketConf.Type, | ||
| Config: initialBucketConf.Config, | ||
| Prefix: path.Join(initialBucketConf.Prefix, tenantPrefix), |
There was a problem hiding this comment.
just curious if tenantPrefix == "", it won't generate an empty folder in the middle?
There was a problem hiding this comment.
Empty elements should be ignored: https://pkg.go.dev/path#Join
cmd/thanos/compact.go
Outdated
| if tenantPrefix != "" { | ||
| // For multi-tenant mode, we pass a nil registerer to avoid metric collisions | ||
| // TODO (willh-db): revisit metrics structure for multi-tenant mode | ||
| tenantReg = nil |
There was a problem hiding this comment.
I'd suggest to use this function to augment tenantReg instead.
prometheus.Labels{"tenant": tenantPrefix},
reg,
)
cmd/thanos/compact.go
Outdated
| progress.Set(compact.SyncMeta) | ||
| if err := sy.SyncMetas(ctx); err != nil { | ||
| return errors.Wrap(err, "syncing metas") | ||
| if tenantPrefix != "" { |
There was a problem hiding this comment.
I see a lot of if conditions based on this, I'd suggest to create a bool variable instead:
isMultiTenantMode = tenantPrefix != ""
if isMultiTenantMode {
... // do sth
There was a problem hiding this comment.
hi @willh-db , I actually had a better idea, WDYT?
The original runCompact function can be broken into 3 parts:
- initial and parse config
- setup bucket and use it in middle
- post bucket setup after conf.wait
instead of injecting a for loop per tenant and change a umber of code inline, I am wondering if we could break the original function into 3 functions so most code lines retain its original position
func runCompact(args) {
// part1
+ for tenants {
+ set up bucket per tenant
+ perBucketSetup(bucket_per_tenant, reg_per_tenant, logger_per_tenant)
+ }
+ postProcess()
+ }
+
+ func perBucketSetup() {. <-- add new func in the middle and call it from orignal with variable name unchanged.
// part2 which we want to for-loop
// most code don't need to change since we wrap it in a helper function
+ }
+
+ func postProcess() {
+ // part3
}

Changes
tenant_prefixeslist in bucket config (passed by the init container)v1/raw/tenant) and proceed as normal per tenant, using the existing goroutine systemVerification
Note