From 1d9247bafd0d268d280cbb1900e1acd7d6e83693 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Thu, 20 Nov 2025 10:08:11 +0300 Subject: [PATCH 1/6] feat(fracmanager) add handling for upload queue overflow and disk space exhaustion --- cmd/seq-db/seq-db.go | 7 ++- config/config.go | 12 +++- fracmanager/config.go | 9 ++- fracmanager/fraction_registry.go | 72 +++++++++++++++++++++++ fracmanager/lifecycle_manager.go | 82 ++++++++++++++++++++++----- fracmanager/lifecycle_manager_test.go | 2 +- fracmanager/proxy_frac.go | 19 +++++++ fracmanager/tasks.go | 67 ++++++++++++++++++++++ 8 files changed, 251 insertions(+), 19 deletions(-) create mode 100644 fracmanager/tasks.go diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index 3d7bd370..423c5e47 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -253,6 +253,7 @@ func startStore( DataDir: cfg.Storage.DataDir, FracSize: uint64(cfg.Storage.FracSize), TotalSize: uint64(cfg.Storage.TotalSize), + SealingQueueLen: uint64(cfg.Storage.SealingQueueLen), CacheSize: uint64(cfg.Resources.CacheSize), SortCacheSize: uint64(cfg.Resources.SortDocsCacheSize), ReplayWorkers: cfg.Resources.ReplayWorkers, @@ -281,8 +282,10 @@ func startStore( SkipSortDocs: !cfg.DocsSorting.Enabled, KeepMetaFile: false, }, - OffloadingEnabled: cfg.Offloading.Enabled, - OffloadingRetention: cfg.Offloading.Retention, + OffloadingEnabled: cfg.Offloading.Enabled, + OffloadingRetention: cfg.Offloading.Retention, + OffloadingRetryDelay: cfg.Offloading.RetryDelay, + OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100), }, API: storeapi.APIConfig{ StoreMode: configMode, diff --git a/config/config.go b/config/config.go index a63a450d..6180119f 100644 --- a/config/config.go +++ b/config/config.go @@ -63,6 +63,11 @@ type Config struct { // TotalSize specifies upper bound of how much disk space can be occupied // by sealed fractions before they get deleted (or offloaded). 
TotalSize Bytes `config:"total_size" default:"1GiB"` + // SealingQueueLen defines the maximum length of the sealing queue. + // If the queue size exceeds this limit, writing to the store will be paused, + // and bulk requests will start returning errors. + // A value of zero disables this limit, allowing writes to proceed unconditionally. + SealingQueueLen int `config:"sealing_queue_len" default:"10"` } `config:"storage"` Cluster struct { @@ -242,8 +247,13 @@ type Config struct { // You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html). SecretKey string `config:"secret_key"` // RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls. - // Be aware that fraction is suicided when offloading attempts exceeds [RetryCount]. RetryCount int `config:"retry_count" default:"5"` + // Specifies the percentage of total local dataset size allocated to the offloading queue. + // Note: When the queue overflows, the oldest fraction of data is automatically removed. + // This automatic removal is disabled when set to zero. 
+ QueueSizePercent float64 `config:"queue_size_percent" default:"5"` + // Delay duration between consecutive offloading retries + RetryDelay time.Duration `config:"retry_delay" default:"2s"` } `config:"offloading"` AsyncSearch struct { diff --git a/fracmanager/config.go b/fracmanager/config.go index de96c957..5f3ae386 100644 --- a/fracmanager/config.go +++ b/fracmanager/config.go @@ -19,6 +19,9 @@ type Config struct { TotalSize uint64 CacheSize uint64 + SuspendThreshold uint64 + SealingQueueLen uint64 + ReplayWorkers int MaintenanceDelay time.Duration CacheCleanupDelay time.Duration @@ -28,8 +31,10 @@ type Config struct { Fraction frac.Config MinSealFracSize uint64 - OffloadingEnabled bool - OffloadingRetention time.Duration + OffloadingEnabled bool + OffloadingQueueSize uint64 + OffloadingRetention time.Duration + OffloadingRetryDelay time.Duration } func FillConfigWithDefault(config *Config) *Config { diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index 2aeca819..22a79d5a 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -131,6 +131,8 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active curInfo := old.instance.Info() r.stats.sealing.Add(curInfo) + r.active.Suspend(old.Suspended()) + wg := sync.WaitGroup{} wg.Add(1) // since old.WaitWriteIdle() can take some time, we don't want to do it under the lock @@ -153,6 +155,31 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active return old, wg.Wait, nil } +func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) bool { + r.mu.Lock() + defer r.mu.Unlock() + + if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) { + r.active.Suspend(true) + return true + } + + if maxSize > 0 && r.diskUsage() > maxSize { + r.active.Suspend(true) + return true + } + + r.active.Suspend(false) + return false +} + +func (r *fractionRegistry) diskUsage() uint64 { + return 
r.active.instance.Info().FullSize() + + r.stats.sealed.totalSizeOnDisk + + r.stats.sealing.totalSizeOnDisk + + r.stats.offloading.totalSizeOnDisk +} + // addActive sets a new active fraction and updates the complete fractions list. func (r *fractionRegistry) addActive(a *activeProxy) { r.muAll.Lock() @@ -229,6 +256,10 @@ func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]* // Fractions older than retention period are permanently deleted. // Returns removed fractions or empty slice if nothing to remove. func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { + if retention == 0 { + return nil + } + r.mu.Lock() defer r.mu.Unlock() @@ -250,6 +281,42 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { return evicted } +// EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit. +// Selects fractions that haven't finished offloading yet to minimize data loss. +// Used when offloading queue grows too large due to slow remote storage performance. +func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy { + if sizeLimit == 0 { + return nil + } + + r.mu.Lock() + defer r.mu.Unlock() + + // Fast path: skip processing if within size limits + if r.stats.offloading.totalSizeOnDisk <= sizeLimit { + return nil + } + + count := 0 + evicted := []*sealedProxy{} + // filter fractions + for _, item := range r.offloading { + // keep items that are within limits or already offloaded + if r.stats.offloading.totalSizeOnDisk <= sizeLimit || item.remote != nil { + r.offloading[count] = item + count++ + continue + } + evicted = append(evicted, item) + r.stats.offloading.Sub(item.instance.Info()) + } + + r.offloading = r.offloading[:count] + r.rebuildAllFractions() + + return evicted +} + // PromoteToSealed moves fractions from sealing to local queue when sealing completes. // Maintains strict ordering - younger fractions wait for older ones to seal first. 
func (r *fractionRegistry) PromoteToSealed(active *activeProxy, sealed *frac.Sealed) { @@ -324,6 +391,11 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) { count++ } } + + if count == len(r.offloading) { // not found to remove (can be removed earlier in EvictOverflowed) + return + } + r.offloading = r.offloading[:count] r.stats.offloading.Sub(sealed.instance.Info()) diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index 63934868..0c0e298e 100644 --- a/fracmanager/lifecycle_manager.go +++ b/fracmanager/lifecycle_manager.go @@ -21,6 +21,7 @@ type lifecycleManager struct { provider *fractionProvider // provider for fraction operations flags *StateManager // storage state flags registry *fractionRegistry // fraction state registry + tasks *TaskManager // Background offloading tasks sealingWg sync.WaitGroup } @@ -36,18 +37,26 @@ func newLifecycleManager( provider: provider, flags: flags, registry: registry, + tasks: NewTaskManager(), } } // Maintain performs periodic lifecycle management tasks. // It coordinates rotation, offloading, cleanup based on configuration. 
-func (lc *lifecycleManager) Maintain(ctx context.Context, config *Config, wg *sync.WaitGroup) { - lc.rotate(config.FracSize, wg) - if config.OffloadingEnabled { - lc.offloadLocal(ctx, config.TotalSize, wg) - lc.cleanRemote(config.OffloadingRetention, wg) +func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) { + + suspendThreshold := cfg.TotalSize + cfg.TotalSize/100 + cfg.OffloadingQueueSize + lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, suspendThreshold) + + lc.rotate(cfg.FracSize, wg) + if cfg.OffloadingEnabled { + lc.offloadLocal(ctx, cfg.TotalSize, cfg.OffloadingRetryDelay, wg) + if cfg.OffloadingQueueSize > 0 { + lc.removeOverflowed(cfg.OffloadingQueueSize, wg) + } + lc.cleanRemote(cfg.OffloadingRetention, wg) } else { - lc.cleanLocal(config.TotalSize, wg) + lc.cleanLocal(cfg.TotalSize, wg) } lc.updateOldestMetric() lc.SyncInfoCache() @@ -113,17 +122,18 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { // offloadLocal starts offloading of local fractions to remote storage. // Selects fractions based on disk space usage and retention policy. 
-func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, wg *sync.WaitGroup) { +func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) { toOffload, err := lc.registry.EvictLocal(true, sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } for _, sealed := range toOffload { wg.Add(1) - go func() { + lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) { defer wg.Done() - remote, _ := lc.tryOffload(ctx, sealed.instance) + remote := lc.offloadWithRetry(ctx, sealed.instance, retryDelay) + lc.registry.PromoteToRemote(sealed, remote) if remote == nil { @@ -136,7 +146,41 @@ func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, // free up local resources sealed.instance.Suicide() maintenanceTruncateTotal.Add(1) - }() + }) + } +} + +// offloadWithRetry attempts to offload a fraction with retries until success or cancellation. +// Returns the remote fraction instance, or nil if offloading was canceled. 
+func (lc *lifecycleManager) offloadWithRetry(ctx context.Context, sealed *frac.Sealed, retryDelay time.Duration) *frac.Remote { + start := time.Now() + for i := 0; ; i++ { + remote, err := lc.tryOffload(ctx, sealed) + if err == nil { + return remote + } + + logger.Warn( + "fail to offload fraction", + zap.String("name", sealed.BaseFileName), + zap.Duration("offloading_time", time.Since(start)), + zap.Int("attempts", i), + zap.Error(err), + ) + + select { + case <-ctx.Done(): + logger.Info( + "fraction offloading was stopped", + zap.String("name", sealed.BaseFileName), + zap.Duration("offloading_time", time.Since(start)), + zap.Int("attempts", i), + zap.Error(ctx.Err()), + ) + return nil + case <-time.After(retryDelay): + // Wait before next retry attempt + } } } @@ -163,9 +207,6 @@ func (lc *lifecycleManager) tryOffload(ctx context.Context, sealed *frac.Sealed) // cleanRemote deletes outdated remote fractions based on retention policy. func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGroup) { - if retention == 0 { - return - } toDelete := lc.registry.EvictRemote(retention) wg.Add(1) go func() { @@ -207,3 +248,18 @@ func (lc *lifecycleManager) updateOldestMetric() { oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.OldestTotal()) * time.Millisecond).Seconds()) oldestFracTime.WithLabelValues("local").Set((time.Duration(lc.registry.OldestLocal()) * time.Millisecond).Seconds()) } + +// removeOverflowed removes fractions from offloading queue that exceed size limit +// Stops ongoing offloading tasks and cleans up both local and remote resources. 
+func (lc *lifecycleManager) removeOverflowed(sizeLimit uint64, wg *sync.WaitGroup) { + evicted := lc.registry.EvictOverflowed(sizeLimit) + for _, item := range evicted { + wg.Add(1) + go func() { + defer wg.Done() + // Cancel the offloading task - this operation may take significant time + // hence executed in a separate goroutine to avoid blocking + lc.tasks.Cancel(item.instance.BaseFileName) + }() + } +} diff --git a/fracmanager/lifecycle_manager_test.go b/fracmanager/lifecycle_manager_test.go index b7afb715..abd180e2 100644 --- a/fracmanager/lifecycle_manager_test.go +++ b/fracmanager/lifecycle_manager_test.go @@ -150,7 +150,7 @@ func TestOldestMetrics(t *testing.T) { } wg := sync.WaitGroup{} - lc.offloadLocal(t.Context(), total-halfSize, &wg) + lc.offloadLocal(t.Context(), total-halfSize, 0, &wg) wg.Wait() // Check state after offloading diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go index 949e2412..ffc31854 100644 --- a/fracmanager/proxy_frac.go +++ b/fracmanager/proxy_frac.go @@ -23,6 +23,7 @@ var ( _ frac.Fraction = (*emptyFraction)(nil) ErrFractionNotWritable = errors.New("fraction is not writable") + ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded") ) // fractionProxy provides thread-safe access to a fraction with atomic replacement @@ -81,6 +82,7 @@ type activeProxy struct { wg sync.WaitGroup // Tracks pending write operations finalized bool // Whether fraction is frozen for writes + suspended bool // Temporarily suspended for writes } func newActiveProxy(active *frac.Active) *activeProxy { @@ -97,6 +99,10 @@ func (p *activeProxy) Append(docs, meta []byte) error { p.mu.RUnlock() return ErrFractionNotWritable } + if p.suspended { + p.mu.RUnlock() + return ErrFractionSuspended + } p.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle() p.mu.RUnlock() @@ -115,6 +121,19 @@ func (p *activeProxy) WaitWriteIdle() { zap.Float64("time_wait_s", waitTime)) } +func 
(p *activeProxy) Suspended() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.suspended +} + +func (p *activeProxy) Suspend(value bool) { + p.mu.Lock() + p.suspended = value + p.mu.Unlock() +} + // Finalize marks the fraction as read-only and prevents new writes from starting after finalize. func (p *activeProxy) Finalize() error { p.mu.Lock() diff --git a/fracmanager/tasks.go b/fracmanager/tasks.go new file mode 100644 index 00000000..ac11ea8d --- /dev/null +++ b/fracmanager/tasks.go @@ -0,0 +1,67 @@ +package fracmanager + +import ( + "context" + "sync" +) + +// task represents a cancellable background task with synchronization +// Used for managing long-running operations like offloading fractions. +// Lifecycle: Created via Tasks.Run(), cancelled via Tasks.Cancel(), cleaned up on completion. +type task struct { + wg sync.WaitGroup // Synchronizes task completion + ctx context.Context // Context for cancellation + cancel context.CancelFunc // Function to cancel the task +} + +// TaskManager manages a collection of running background tasks +// Provides safe concurrent access to task tracking and cancellation. +type TaskManager struct { + mu sync.Mutex + running map[string]*task // Map of task ID to task instance +} + +func NewTaskManager() *TaskManager { + return &TaskManager{ + running: make(map[string]*task), + } +} + +// Run starts a new background task with the given ID and context. +// The task will be automatically removed when completed. +func (t *TaskManager) Run(id string, ctx context.Context, action func(ctx context.Context)) *task { + task := &task{} + task.ctx, task.cancel = context.WithCancel(ctx) + + t.mu.Lock() + t.running[id] = task + t.mu.Unlock() + + task.wg.Add(1) + go func() { + defer func() { + t.mu.Lock() + delete(t.running, id) + t.mu.Unlock() + + task.wg.Done() + }() + + action(task.ctx) + }() + + return task +} + +// Cancel cancels and waits for completion of a task by ID +// Returns immediately if task with given ID doesn't exist. 
+func (t *TaskManager) Cancel(id string) { + t.mu.Lock() + task, ok := t.running[id] + t.mu.Unlock() + + if ok { + task.cancel() + task.wg.Wait() + } +} From c38f529d453988c2171c9cacb798c0360c3c632b Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Thu, 5 Mar 2026 21:42:07 +0300 Subject: [PATCH 2/6] review fixes --- cmd/seq-db/seq-db.go | 1 - config/config.go | 2 - config/validation.go | 2 + config/validation_test.go | 117 ++++++++++++++++++++++++++ frac/fraction_test.go | 1 - fracmanager/fracmanager.go | 25 ++++-- fracmanager/fraction_provider_test.go | 2 +- fracmanager/fraction_registry.go | 7 +- fracmanager/lifecycle_manager.go | 6 +- fracmanager/tasks.go | 9 +- storage/s3/client.go | 6 +- storage/s3/reader_test.go | 2 +- tests/setup/env.go | 2 +- 13 files changed, 154 insertions(+), 28 deletions(-) create mode 100644 config/validation_test.go diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index 423c5e47..b73b9b27 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -345,7 +345,6 @@ func initS3Client(cfg config.Config) *s3.Client { cfg.Offloading.SecretKey, cfg.Offloading.Region, cfg.Offloading.Bucket, - cfg.Offloading.RetryCount, ) if err != nil { diff --git a/config/config.go b/config/config.go index 6180119f..0e9d4a03 100644 --- a/config/config.go +++ b/config/config.go @@ -246,8 +246,6 @@ type Config struct { // SecretKey configures S3 Secret Key for S3 client. // You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html). SecretKey string `config:"secret_key"` - // RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls. - RetryCount int `config:"retry_count" default:"5"` // Specifies the percentage of total local dataset size allocated to the offloading queue. // Note: When the queue overflows, the oldest fraction of data is automatically removed. // This automatic removal is disabled when set to zero. 
diff --git a/config/validation.go b/config/validation.go index 3cceb86b..15d63c9b 100644 --- a/config/validation.go +++ b/config/validation.go @@ -64,9 +64,11 @@ func (c *Config) storeValidations() []validateFn { greaterThan("resources.search_workers", 0, c.Resources.SearchWorkers), greaterThan("resources.replay_workers", 0, c.Resources.ReplayWorkers), greaterThan("resources.cache_size", 0, c.Resources.CacheSize), + greaterThan("storage.sealing_queue_len", -1, c.Storage.SealingQueueLen), inRange("compression.sealed_zstd_compression_level", -7, 22, c.Compression.SealedZstdCompressionLevel), inRange("compression.doc_block_zstd_compression_level", -7, 22, c.Compression.DocBlockZstdCompressionLevel), + inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent), greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck), } diff --git a/config/validation_test.go b/config/validation_test.go new file mode 100644 index 00000000..90d355f2 --- /dev/null +++ b/config/validation_test.go @@ -0,0 +1,117 @@ +package config + +import ( + "os" + "path" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidation(t *testing.T) { + base := `storage: + data_dir: /seq-db-data + frac_size: 16MiB + total_size: 10GiB + +mapping: + path: /configs/mapping.yaml + +resources: + cache_size: 2GiB + +limits: + query_rate: 1024 + search_requests: 1024 + bulk_requests: 128 + inflight_bulks: 128 + doc_size: 1MiB +` + + tests := []struct { + name string + content string + env map[string]string + expectErr bool + }{ + { + name: "Invalid storage.sealing_queue_len 1", + content: base, + env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "-1"}, + expectErr: true, + }, + { + name: "Valid storage.sealing_queue_len 2", + content: base, + env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "0"}, + expectErr: false, + }, + { + name: "Valid storage.sealing_queue_len 3", + content: base, + env: 
map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "100"}, + expectErr: false, + }, + + { + name: "Invalid offloading.queue_size_percent 1", + content: base, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "-1"}, + expectErr: true, + }, + { + name: "Invalid offloading.queue_size_percent 2", + content: base, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100.1"}, + expectErr: true, + }, + { + name: "Valid offloading.queue_size_percent 3", + content: base, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "0"}, + expectErr: false, + }, + { + name: "Valid offloading.queue_size_percent 4", + content: base, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100"}, + expectErr: false, + }, + { + name: "Valid offloading.queue_size_percent 5", + content: base, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "50"}, + expectErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for k, v := range tt.env { + t.Setenv(k, v) + } + + c, err := Parse(createCfgFile(t, base)) + assert.NoError(t, err) + + res := c.Validate("store") + if tt.expectErr { + assert.Error(t, res) + } else { + assert.NoError(t, res) + } + }) + } + +} + +func createCfgFile(t *testing.T, data string) string { + f := path.Join(t.TempDir(), "config.yaml") + err := os.WriteFile(f, []byte(data), 0666) + assert.NoError(t, err) + + abs, err := filepath.Abs(f) + assert.NoError(t, err) + return abs +} diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8d7a782f..161e1270 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -2134,7 +2134,6 @@ func (s *RemoteFractionTestSuite) SetupTest() { "SECRET_KEY", "eu-west-3", bucketName, - 3, ) s.Require().NoError(err, "s3 client setup failed") diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 50d7e1f3..c81e2fa6 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -128,6 +128,18 @@ func (fm 
*FracManager) Append(ctx context.Context, docs, metas storage.DocBlock) } } +// Perform fraction maintenance (rotation, truncating, offloading, etc.) +func (fm *FracManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) { + n := time.Now() + logger.Debug("maintenance iteration started") + + fm.mu.Lock() + defer fm.mu.Unlock() + + fm.lc.Maintain(ctx, cfg, wg) + logger.Debug("maintenance iteration finished", zap.Int64("took_ms", time.Since(n).Milliseconds())) +} + // startCacheWorker starts background cache garbage collection func startCacheWorker(ctx context.Context, cfg *Config, cache *CacheMaintainer, wg *sync.WaitGroup) { wg.Add(1) @@ -161,21 +173,18 @@ func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitG // startMaintWorker starts periodic fraction maintenance operations func startMaintWorker(ctx context.Context, cfg *Config, fm *FracManager, wg *sync.WaitGroup) { wg.Add(1) + maintWg := sync.WaitGroup{} + go func() { defer wg.Done() logger.Info("maintenance loop is started") // Run maintenance at configured interval util.RunEvery(ctx.Done(), cfg.MaintenanceDelay, func() { - n := time.Now() - logger.Debug("maintenance iteration started") - fm.mu.Lock() - // Perform fraction maintenance (rotation, truncating, offloading, etc.) 
- fm.lc.Maintain(ctx, cfg, wg) - fm.mu.Unlock() - logger.Debug("maintenance iteration finished", zap.Int64("took_ms", time.Since(n).Milliseconds())) + fm.Maintain(ctx, cfg, &maintWg) }) - logger.Info("waiting maintenance complete background tasks") + logger.Info("waiting maintenance complete background tasks...") + maintWg.Wait() logger.Info("maintenance loop is stopped") }() } diff --git a/fracmanager/fraction_provider_test.go b/fracmanager/fraction_provider_test.go index f315b615..5dffeeee 100644 --- a/fracmanager/fraction_provider_test.go +++ b/fracmanager/fraction_provider_test.go @@ -26,7 +26,7 @@ func setupS3Client(t testing.TB) (*s3.Client, func()) { err := s3Backend.CreateBucket(bucketName) require.NoError(t, err, "create bucket failed") - s3cli, err := s3.NewClient(s3server.URL, "ACCESS_KEY", "SECRET_KEY", "eu-west-3", bucketName, 3) + s3cli, err := s3.NewClient(s3server.URL, "ACCESS_KEY", "SECRET_KEY", "eu-west-3", bucketName) require.NoError(t, err, "s3 client setup failed") return s3cli, s3server.Close diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index 22a79d5a..42c3cfaf 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -155,22 +155,21 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active return old, wg.Wait, nil } -func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) bool { +func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { r.mu.Lock() defer r.mu.Unlock() if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) { r.active.Suspend(true) - return true + return } if maxSize > 0 && r.diskUsage() > maxSize { r.active.Suspend(true) - return true + return } r.active.Suspend(false) - return false } func (r *fractionRegistry) diskUsage() uint64 { diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index 0c0e298e..d918a283 100644 --- a/fracmanager/lifecycle_manager.go +++ 
b/fracmanager/lifecycle_manager.go @@ -44,7 +44,6 @@ func newLifecycleManager( // Maintain performs periodic lifecycle management tasks. // It coordinates rotation, offloading, cleanup based on configuration. func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) { - suspendThreshold := cfg.TotalSize + cfg.TotalSize/100 + cfg.OffloadingQueueSize lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, suspendThreshold) @@ -129,7 +128,7 @@ func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, } for _, sealed := range toOffload { wg.Add(1) - lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) { + _, err := lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) { defer wg.Done() remote := lc.offloadWithRetry(ctx, sealed.instance, retryDelay) @@ -147,6 +146,9 @@ func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, sealed.instance.Suicide() maintenanceTruncateTotal.Add(1) }) + if err != nil { + panic(err) // we do not expect error here + } } } diff --git a/fracmanager/tasks.go b/fracmanager/tasks.go index ac11ea8d..349ebb61 100644 --- a/fracmanager/tasks.go +++ b/fracmanager/tasks.go @@ -2,6 +2,7 @@ package fracmanager import ( "context" + "fmt" "sync" ) @@ -29,11 +30,15 @@ func NewTaskManager() *TaskManager { // Run starts a new background task with the given ID and context. // The task will be automatically removed when completed. 
-func (t *TaskManager) Run(id string, ctx context.Context, action func(ctx context.Context)) *task { +func (t *TaskManager) Run(id string, ctx context.Context, action func(ctx context.Context)) (*task, error) { task := &task{} task.ctx, task.cancel = context.WithCancel(ctx) t.mu.Lock() + if _, ok := t.running[id]; ok { + t.mu.Unlock() + return nil, fmt.Errorf("task with id = %s already exists", id) + } t.running[id] = task t.mu.Unlock() @@ -50,7 +55,7 @@ func (t *TaskManager) Run(id string, ctx context.Context, action func(ctx contex action(task.ctx) }() - return task + return task, nil } // Cancel cancels and waits for completion of a task by ID diff --git a/storage/s3/client.go b/storage/s3/client.go index 228f16a3..91cb9445 100644 --- a/storage/s3/client.go +++ b/storage/s3/client.go @@ -25,10 +25,7 @@ type Client struct { // - MaxIdleConnsPerHost; // // And maybe we should add tracing support as well. -func NewClient( - endpoint, accessKey, secretKey, region, bucket string, - maxRetryAttempts int, -) (*Client, error) { +func NewClient(endpoint, accessKey, secretKey, region, bucket string) (*Client, error) { credp := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "") cfg, err := config.LoadDefaultConfig( @@ -46,7 +43,6 @@ func NewClient( s3cli := s3.NewFromConfig(cfg, func(o *s3.Options) { o.UsePathStyle = true o.DisableLogOutputChecksumValidationSkipped = true - o.RetryMaxAttempts = maxRetryAttempts }) return &Client{s3cli, bucket}, nil diff --git a/storage/s3/reader_test.go b/storage/s3/reader_test.go index d0409473..7e6deba1 100644 --- a/storage/s3/reader_test.go +++ b/storage/s3/reader_test.go @@ -26,7 +26,7 @@ func TestReader(t *testing.T) { s3cli, err := NewClient( "http://localhost:9000/", "minioadmin", "minioadmin", - "us-east-1", bucket, 0, + "us-east-1", bucket, ) require.NoError(t, err) diff --git a/tests/setup/env.go b/tests/setup/env.go index cafbbc02..fbc66018 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -184,7 +184,7 
@@ func NewTestingEnv(cfg *TestingEnvConfig) *TestingEnv { cli, err := seqs3.NewClient( "http://localhost:9000/", "minioadmin", "minioadmin", - "us-east-1", createBucket(), 0, + "us-east-1", createBucket(), ) if err != nil { panic(err) From 66dfbda0b535511ecb6bc105443a90f8bc6bb9ff Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Fri, 6 Mar 2026 12:03:07 +0300 Subject: [PATCH 3/6] review fixes 2 --- config/validation_test.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/config/validation_test.go b/config/validation_test.go index 90d355f2..037baf91 100644 --- a/config/validation_test.go +++ b/config/validation_test.go @@ -29,58 +29,60 @@ limits: doc_size: 1MiB ` + baseCfg := createCfgFile(t, base) + tests := []struct { name string - content string + cfg string env map[string]string expectErr bool }{ { name: "Invalid storage.sealing_queue_len 1", - content: base, + cfg: baseCfg, env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "-1"}, expectErr: true, }, { name: "Valid storage.sealing_queue_len 2", - content: base, + cfg: baseCfg, env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "0"}, expectErr: false, }, { name: "Valid storage.sealing_queue_len 3", - content: base, + cfg: baseCfg, env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "100"}, expectErr: false, }, { name: "Invalid offloading.queue_size_percent 1", - content: base, + cfg: baseCfg, env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "-1"}, expectErr: true, }, { name: "Invalid offloading.queue_size_percent 2", - content: base, + cfg: baseCfg, env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100.1"}, expectErr: true, }, { name: "Valid offloading.queue_size_percent 3", - content: base, + cfg: baseCfg, env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "0"}, expectErr: false, }, { name: "Valid offloading.queue_size_percent 4", - content: base, + cfg: baseCfg, env: 
map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100"}, expectErr: false, }, { name: "Valid offloading.queue_size_percent 5", - content: base, + cfg: baseCfg, env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "50"}, expectErr: false, }, @@ -92,7 +94,7 @@ limits: t.Setenv(k, v) } - c, err := Parse(createCfgFile(t, base)) + c, err := Parse(tt.cfg) assert.NoError(t, err) res := c.Validate("store") From 398fdd79fc3304a00fdad2def9f31532664cc3ad Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Fri, 6 Mar 2026 12:22:40 +0300 Subject: [PATCH 4/6] review fixes 3 --- config/validation_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/validation_test.go b/config/validation_test.go index 037baf91..0a29f990 100644 --- a/config/validation_test.go +++ b/config/validation_test.go @@ -110,7 +110,7 @@ limits: func createCfgFile(t *testing.T, data string) string { f := path.Join(t.TempDir(), "config.yaml") - err := os.WriteFile(f, []byte(data), 0666) + err := os.WriteFile(f, []byte(data), 0o666) assert.NoError(t, err) abs, err := filepath.Abs(f) From 2d0726d58e690ad041d65833377b09ab4422f517 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Fri, 6 Mar 2026 16:00:58 +0300 Subject: [PATCH 5/6] add logging --- fracmanager/config.go | 13 +++++++++++- fracmanager/fraction_registry.go | 34 ++++++++++++++++++++++++++++---- fracmanager/lifecycle_manager.go | 3 +-- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/fracmanager/config.go b/fracmanager/config.go index 5f3ae386..5a9790ac 100644 --- a/fracmanager/config.go +++ b/fracmanager/config.go @@ -19,7 +19,7 @@ type Config struct { TotalSize uint64 CacheSize uint64 - SuspendThreshold uint64 + suspendThreshold uint64 SealingQueueLen uint64 ReplayWorkers int @@ -87,3 +87,14 @@ func FillConfigWithDefault(config *Config) *Config { return config } + +func (cfg *Config) SuspendThreshold() uint64 { + if cfg.suspendThreshold == 0 { + cfg.suspendThreshold = cfg.TotalSize + 
cfg.suspendThreshold += cfg.TotalSize / 100 // small buffer + if cfg.OffloadingEnabled { + cfg.suspendThreshold += cfg.OffloadingQueueSize // offloading queue size + } + } + return cfg.suspendThreshold +} diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index 42c3cfaf..e4694959 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -7,6 +7,9 @@ import ( "time" "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/util" + "go.uber.org/zap" ) // fractionRegistry manages fraction queues at different lifecycle stages. @@ -159,17 +162,40 @@ func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { r.mu.Lock() defer r.mu.Unlock() + suspended := r.active.Suspended() + if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) { - r.active.Suspend(true) + if !suspended { + logger.Warn("switching to read-only mode", + zap.String("reason", "sealing queue size exceeded"), + zap.Uint64("limit", maxQueue), + zap.Int("queue_size", r.stats.sealing.count)) + r.active.Suspend(true) + } return } - if maxSize > 0 && r.diskUsage() > maxSize { - r.active.Suspend(true) + du := r.diskUsage() + + if maxSize > 0 && du > maxSize { + if !suspended { + logger.Warn("switching to read-only mode", + zap.String("reason", "occupied space limit exceeded"), + zap.Float64("queue_size_limit_gb", util.Float64ToPrec(util.SizeToUnit(maxSize, "gb"), 2)), + zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2))) + r.active.Suspend(true) + } return } - r.active.Suspend(false) + if suspended { + logger.Warn("switching to write mode", + zap.Float64("queue_size_limit_gb", util.Float64ToPrec(util.SizeToUnit(maxSize, "gb"), 2)), + zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2)), + zap.Uint64("sealing_queue_size_limit", maxQueue), + zap.Int("queue_size", r.stats.sealing.count)) + r.active.Suspend(false) + } } func (r 
*fractionRegistry) diskUsage() uint64 { diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index d918a283..cd1c4bd3 100644 --- a/fracmanager/lifecycle_manager.go +++ b/fracmanager/lifecycle_manager.go @@ -44,8 +44,7 @@ func newLifecycleManager( // Maintain performs periodic lifecycle management tasks. // It coordinates rotation, offloading, cleanup based on configuration. func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) { - suspendThreshold := cfg.TotalSize + cfg.TotalSize/100 + cfg.OffloadingQueueSize - lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, suspendThreshold) + lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, cfg.SuspendThreshold()) lc.rotate(cfg.FracSize, wg) if cfg.OffloadingEnabled { From 48666443d754060fa6375aa43a51520ffd866bd4 Mon Sep 17 00:00:00 2001 From: Evgenii Guguchkin Date: Fri, 6 Mar 2026 16:03:19 +0300 Subject: [PATCH 6/6] linting --- fracmanager/fraction_registry.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index e4694959..4f804930 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -6,10 +6,11 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/ozontech/seq-db/frac" "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/util" - "go.uber.org/zap" ) // fractionRegistry manages fraction queues at different lifecycle stages.