diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index 3d7bd370..b73b9b27 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -253,6 +253,7 @@ func startStore( DataDir: cfg.Storage.DataDir, FracSize: uint64(cfg.Storage.FracSize), TotalSize: uint64(cfg.Storage.TotalSize), + SealingQueueLen: uint64(cfg.Storage.SealingQueueLen), CacheSize: uint64(cfg.Resources.CacheSize), SortCacheSize: uint64(cfg.Resources.SortDocsCacheSize), ReplayWorkers: cfg.Resources.ReplayWorkers, @@ -281,8 +282,10 @@ func startStore( SkipSortDocs: !cfg.DocsSorting.Enabled, KeepMetaFile: false, }, - OffloadingEnabled: cfg.Offloading.Enabled, - OffloadingRetention: cfg.Offloading.Retention, + OffloadingEnabled: cfg.Offloading.Enabled, + OffloadingRetention: cfg.Offloading.Retention, + OffloadingRetryDelay: cfg.Offloading.RetryDelay, + OffloadingQueueSize: uint64(float64(cfg.Storage.TotalSize) * cfg.Offloading.QueueSizePercent / 100), }, API: storeapi.APIConfig{ StoreMode: configMode, @@ -342,7 +345,6 @@ func initS3Client(cfg config.Config) *s3.Client { cfg.Offloading.SecretKey, cfg.Offloading.Region, cfg.Offloading.Bucket, - cfg.Offloading.RetryCount, ) if err != nil { diff --git a/config/config.go b/config/config.go index a63a450d..0e9d4a03 100644 --- a/config/config.go +++ b/config/config.go @@ -63,6 +63,11 @@ type Config struct { // TotalSize specifies upper bound of how much disk space can be occupied // by sealed fractions before they get deleted (or offloaded). TotalSize Bytes `config:"total_size" default:"1GiB"` + // SealingQueueLen defines the maximum length of the sealing queue. + // If the queue size exceeds this limit, writing to the store will be paused, + // and bulk requests will start returning errors. + // A value of zero disables this limit, allowing writes to proceed unconditionally. 
+	SealingQueueLen int `config:"sealing_queue_len" default:"10"` } `config:"storage"` Cluster struct { @@ -241,9 +246,12 @@ type Config struct { // SecretKey configures S3 Secret Key for S3 client. // You can learn more about secret keys [here](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html). SecretKey string `config:"secret_key"` - // RetryCount sets [RetryMaxAttempts] for S3 client which is applied for all API calls. - // Be aware that fraction is suicided when offloading attempts exceeds [RetryCount]. - RetryCount int `config:"retry_count" default:"5"` + // QueueSizePercent specifies the percentage of total local dataset size allocated to the offloading queue. + // Note: When the queue overflows, the oldest fraction of data is automatically removed. + // This automatic removal is disabled when set to zero. + QueueSizePercent float64 `config:"queue_size_percent" default:"5"` + // RetryDelay sets the delay between consecutive offloading retries. + RetryDelay time.Duration `config:"retry_delay" default:"2s"` } `config:"offloading"` AsyncSearch struct { diff --git a/config/validation.go index 3cceb86b..15d63c9b 100644 --- a/config/validation.go +++ b/config/validation.go @@ -64,9 +64,11 @@ func (c *Config) storeValidations() []validateFn { greaterThan("resources.search_workers", 0, c.Resources.SearchWorkers), greaterThan("resources.replay_workers", 0, c.Resources.ReplayWorkers), greaterThan("resources.cache_size", 0, c.Resources.CacheSize), + greaterThan("storage.sealing_queue_len", -1, c.Storage.SealingQueueLen), inRange("compression.sealed_zstd_compression_level", -7, 22, c.Compression.SealedZstdCompressionLevel), inRange("compression.doc_block_zstd_compression_level", -7, 22, c.Compression.DocBlockZstdCompressionLevel), + inRange("offloading.queue_size_percent", 0, 100, c.Offloading.QueueSizePercent), greaterThan("experimental.max_regex_tokens_check", -1, c.Experimental.MaxRegexTokensCheck), } diff --git a/config/validation_test.go
b/config/validation_test.go new file mode 100644 index 00000000..0a29f990 --- /dev/null +++ b/config/validation_test.go @@ -0,0 +1,119 @@ +package config + +import ( + "os" + "path" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestValidation(t *testing.T) { + base := `storage: + data_dir: /seq-db-data + frac_size: 16MiB + total_size: 10GiB + +mapping: + path: /configs/mapping.yaml + +resources: + cache_size: 2GiB + +limits: + query_rate: 1024 + search_requests: 1024 + bulk_requests: 128 + inflight_bulks: 128 + doc_size: 1MiB +` + + baseCfg := createCfgFile(t, base) + + tests := []struct { + name string + cfg string + env map[string]string + expectErr bool + }{ + { + name: "Invalid storage.sealing_queue_len 1", + cfg: baseCfg, + env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "-1"}, + expectErr: true, + }, + { + name: "Valid storage.sealing_queue_len 2", + cfg: baseCfg, + env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "0"}, + expectErr: false, + }, + { + name: "Valid storage.sealing_queue_len 3", + cfg: baseCfg, + env: map[string]string{"SEQDB_STORAGE_SEALING_QUEUE_LEN": "100"}, + expectErr: false, + }, + + { + name: "Invalid offloading.queue_size_percent 1", + cfg: baseCfg, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "-1"}, + expectErr: true, + }, + { + name: "Invalid offloading.queue_size_percent 2", + cfg: baseCfg, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100.1"}, + expectErr: true, + }, + { + name: "Valid offloading.queue_size_percent 3", + cfg: baseCfg, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "0"}, + expectErr: false, + }, + { + name: "Valid offloading.queue_size_percent 4", + cfg: baseCfg, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "100"}, + expectErr: false, + }, + { + name: "Valid offloading.queue_size_percent 5", + cfg: baseCfg, + env: map[string]string{"SEQDB_OFFLOADING_QUEUE_SIZE_PERCENT": "50"}, + expectErr: 
false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for k, v := range tt.env { + t.Setenv(k, v) + } + + c, err := Parse(tt.cfg) + assert.NoError(t, err) + + res := c.Validate("store") + if tt.expectErr { + assert.Error(t, res) + } else { + assert.NoError(t, res) + } + }) + } + +} + +func createCfgFile(t *testing.T, data string) string { + f := path.Join(t.TempDir(), "config.yaml") + err := os.WriteFile(f, []byte(data), 0o666) + assert.NoError(t, err) + + abs, err := filepath.Abs(f) + assert.NoError(t, err) + return abs +} diff --git a/frac/fraction_test.go b/frac/fraction_test.go index 8d7a782f..161e1270 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -2134,7 +2134,6 @@ func (s *RemoteFractionTestSuite) SetupTest() { "SECRET_KEY", "eu-west-3", bucketName, - 3, ) s.Require().NoError(err, "s3 client setup failed") diff --git a/fracmanager/config.go b/fracmanager/config.go index de96c957..5a9790ac 100644 --- a/fracmanager/config.go +++ b/fracmanager/config.go @@ -19,6 +19,9 @@ type Config struct { TotalSize uint64 CacheSize uint64 + suspendThreshold uint64 + SealingQueueLen uint64 + ReplayWorkers int MaintenanceDelay time.Duration CacheCleanupDelay time.Duration @@ -28,8 +31,10 @@ type Config struct { Fraction frac.Config MinSealFracSize uint64 - OffloadingEnabled bool - OffloadingRetention time.Duration + OffloadingEnabled bool + OffloadingQueueSize uint64 + OffloadingRetention time.Duration + OffloadingRetryDelay time.Duration } func FillConfigWithDefault(config *Config) *Config { @@ -82,3 +87,14 @@ func FillConfigWithDefault(config *Config) *Config { return config } + +func (cfg *Config) SuspendThreshold() uint64 { + if cfg.suspendThreshold == 0 { + cfg.suspendThreshold = cfg.TotalSize + cfg.suspendThreshold += cfg.TotalSize / 100 // small buffer + if cfg.OffloadingEnabled { + cfg.suspendThreshold += cfg.OffloadingQueueSize // offloading queue size + } + } + return cfg.suspendThreshold +} diff --git 
a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 50d7e1f3..c81e2fa6 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -128,6 +128,18 @@ func (fm *FracManager) Append(ctx context.Context, docs, metas storage.DocBlock) } } +// Perform fraction maintenance (rotation, truncating, offloading, etc.) +func (fm *FracManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) { + n := time.Now() + logger.Debug("maintenance iteration started") + + fm.mu.Lock() + defer fm.mu.Unlock() + + fm.lc.Maintain(ctx, cfg, wg) + logger.Debug("maintenance iteration finished", zap.Int64("took_ms", time.Since(n).Milliseconds())) +} + // startCacheWorker starts background cache garbage collection func startCacheWorker(ctx context.Context, cfg *Config, cache *CacheMaintainer, wg *sync.WaitGroup) { wg.Add(1) @@ -161,21 +173,18 @@ func startStatsWorker(ctx context.Context, reg *fractionRegistry, wg *sync.WaitG // startMaintWorker starts periodic fraction maintenance operations func startMaintWorker(ctx context.Context, cfg *Config, fm *FracManager, wg *sync.WaitGroup) { wg.Add(1) + maintWg := sync.WaitGroup{} + go func() { defer wg.Done() logger.Info("maintenance loop is started") // Run maintenance at configured interval util.RunEvery(ctx.Done(), cfg.MaintenanceDelay, func() { - n := time.Now() - logger.Debug("maintenance iteration started") - fm.mu.Lock() - // Perform fraction maintenance (rotation, truncating, offloading, etc.) 
- fm.lc.Maintain(ctx, cfg, wg) - fm.mu.Unlock() - logger.Debug("maintenance iteration finished", zap.Int64("took_ms", time.Since(n).Milliseconds())) + fm.Maintain(ctx, cfg, &maintWg) }) - logger.Info("waiting maintenance complete background tasks") + logger.Info("waiting maintenance complete background tasks...") + maintWg.Wait() logger.Info("maintenance loop is stopped") }() } diff --git a/fracmanager/fraction_provider_test.go b/fracmanager/fraction_provider_test.go index f315b615..5dffeeee 100644 --- a/fracmanager/fraction_provider_test.go +++ b/fracmanager/fraction_provider_test.go @@ -26,7 +26,7 @@ func setupS3Client(t testing.TB) (*s3.Client, func()) { err := s3Backend.CreateBucket(bucketName) require.NoError(t, err, "create bucket failed") - s3cli, err := s3.NewClient(s3server.URL, "ACCESS_KEY", "SECRET_KEY", "eu-west-3", bucketName, 3) + s3cli, err := s3.NewClient(s3server.URL, "ACCESS_KEY", "SECRET_KEY", "eu-west-3", bucketName) require.NoError(t, err, "s3 client setup failed") return s3cli, s3server.Close diff --git a/fracmanager/fraction_registry.go b/fracmanager/fraction_registry.go index 2aeca819..4f804930 100644 --- a/fracmanager/fraction_registry.go +++ b/fracmanager/fraction_registry.go @@ -6,7 +6,11 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/ozontech/seq-db/frac" + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/util" ) // fractionRegistry manages fraction queues at different lifecycle stages. 
@@ -131,6 +135,8 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active curInfo := old.instance.Info() r.stats.sealing.Add(curInfo) + r.active.Suspend(old.Suspended()) + wg := sync.WaitGroup{} wg.Add(1) // since old.WaitWriteIdle() can take some time, we don't want to do it under the lock @@ -153,6 +159,53 @@ func (r *fractionRegistry) RotateIfFull(maxSize uint64, newActive func() *active return old, wg.Wait, nil } +func (r *fractionRegistry) SuspendIfOverCapacity(maxQueue, maxSize uint64) { + r.mu.Lock() + defer r.mu.Unlock() + + suspended := r.active.Suspended() + + if maxQueue > 0 && r.stats.sealing.count >= int(maxQueue) { + if !suspended { + logger.Warn("switching to read-only mode", + zap.String("reason", "sealing queue size exceeded"), + zap.Uint64("limit", maxQueue), + zap.Int("queue_size", r.stats.sealing.count)) + r.active.Suspend(true) + } + return + } + + du := r.diskUsage() + + if maxSize > 0 && du > maxSize { + if !suspended { + logger.Warn("switching to read-only mode", + zap.String("reason", "occupied space limit exceeded"), + zap.Float64("queue_size_limit_gb", util.Float64ToPrec(util.SizeToUnit(maxSize, "gb"), 2)), + zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2))) + r.active.Suspend(true) + } + return + } + + if suspended { + logger.Warn("switching to write mode", + zap.Float64("queue_size_limit_gb", util.Float64ToPrec(util.SizeToUnit(maxSize, "gb"), 2)), + zap.Float64("occupied_space_gb", util.Float64ToPrec(util.SizeToUnit(du, "gb"), 2)), + zap.Uint64("sealing_queue_size_limit", maxQueue), + zap.Int("queue_size", r.stats.sealing.count)) + r.active.Suspend(false) + } +} + +func (r *fractionRegistry) diskUsage() uint64 { + return r.active.instance.Info().FullSize() + + r.stats.sealed.totalSizeOnDisk + + r.stats.sealing.totalSizeOnDisk + + r.stats.offloading.totalSizeOnDisk +} + // addActive sets a new active fraction and updates the complete fractions list. 
func (r *fractionRegistry) addActive(a *activeProxy) { r.muAll.Lock() @@ -229,6 +282,10 @@ func (r *fractionRegistry) EvictLocal(shouldOffload bool, sizeLimit uint64) ([]* // Fractions older than retention period are permanently deleted. // Returns removed fractions or empty slice if nothing to remove. func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { + if retention == 0 { + return nil + } + r.mu.Lock() defer r.mu.Unlock() @@ -250,6 +307,42 @@ func (r *fractionRegistry) EvictRemote(retention time.Duration) []*remoteProxy { return evicted } +// EvictOverflowed removes oldest fractions from offloading queue when it exceeds size limit. +// Selects fractions that haven't finished offloading yet to minimize data loss. +// Used when offloading queue grows too large due to slow remote storage performance. +func (r *fractionRegistry) EvictOverflowed(sizeLimit uint64) []*sealedProxy { + if sizeLimit == 0 { + return nil + } + + r.mu.Lock() + defer r.mu.Unlock() + + // Fast path: skip processing if within size limits + if r.stats.offloading.totalSizeOnDisk <= sizeLimit { + return nil + } + + count := 0 + evicted := []*sealedProxy{} + // filter fractions + for _, item := range r.offloading { + // keep items that are within limits or already offloaded + if r.stats.offloading.totalSizeOnDisk <= sizeLimit || item.remote != nil { + r.offloading[count] = item + count++ + continue + } + evicted = append(evicted, item) + r.stats.offloading.Sub(item.instance.Info()) + } + + r.offloading = r.offloading[:count] + r.rebuildAllFractions() + + return evicted +} + // PromoteToSealed moves fractions from sealing to local queue when sealing completes. // Maintains strict ordering - younger fractions wait for older ones to seal first. 
func (r *fractionRegistry) PromoteToSealed(active *activeProxy, sealed *frac.Sealed) { @@ -324,6 +417,11 @@ func (r *fractionRegistry) removeFromOffloading(sealed *sealedProxy) { count++ } } + + if count == len(r.offloading) { // not found to remove (can be removed earlier in EvictOverflowed) + return + } + r.offloading = r.offloading[:count] r.stats.offloading.Sub(sealed.instance.Info()) diff --git a/fracmanager/lifecycle_manager.go b/fracmanager/lifecycle_manager.go index 63934868..cd1c4bd3 100644 --- a/fracmanager/lifecycle_manager.go +++ b/fracmanager/lifecycle_manager.go @@ -21,6 +21,7 @@ type lifecycleManager struct { provider *fractionProvider // provider for fraction operations flags *StateManager // storage state flags registry *fractionRegistry // fraction state registry + tasks *TaskManager // Background offloading tasks sealingWg sync.WaitGroup } @@ -36,18 +37,24 @@ func newLifecycleManager( provider: provider, flags: flags, registry: registry, + tasks: NewTaskManager(), } } // Maintain performs periodic lifecycle management tasks. // It coordinates rotation, offloading, cleanup based on configuration. 
-func (lc *lifecycleManager) Maintain(ctx context.Context, config *Config, wg *sync.WaitGroup) { - lc.rotate(config.FracSize, wg) - if config.OffloadingEnabled { - lc.offloadLocal(ctx, config.TotalSize, wg) - lc.cleanRemote(config.OffloadingRetention, wg) +func (lc *lifecycleManager) Maintain(ctx context.Context, cfg *Config, wg *sync.WaitGroup) { + lc.registry.SuspendIfOverCapacity(cfg.SealingQueueLen, cfg.SuspendThreshold()) + + lc.rotate(cfg.FracSize, wg) + if cfg.OffloadingEnabled { + lc.offloadLocal(ctx, cfg.TotalSize, cfg.OffloadingRetryDelay, wg) + if cfg.OffloadingQueueSize > 0 { + lc.removeOverflowed(cfg.OffloadingQueueSize, wg) + } + lc.cleanRemote(cfg.OffloadingRetention, wg) } else { - lc.cleanLocal(config.TotalSize, wg) + lc.cleanLocal(cfg.TotalSize, wg) } lc.updateOldestMetric() lc.SyncInfoCache() @@ -113,17 +120,18 @@ func (lc *lifecycleManager) rotate(maxSize uint64, wg *sync.WaitGroup) { // offloadLocal starts offloading of local fractions to remote storage. // Selects fractions based on disk space usage and retention policy. 
-func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, wg *sync.WaitGroup) { +func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, retryDelay time.Duration, wg *sync.WaitGroup) { toOffload, err := lc.registry.EvictLocal(true, sizeLimit) if err != nil { logger.Fatal("error releasing old fractions:", zap.Error(err)) } for _, sealed := range toOffload { wg.Add(1) - go func() { + _, err := lc.tasks.Run(sealed.instance.BaseFileName, ctx, func(ctx context.Context) { defer wg.Done() - remote, _ := lc.tryOffload(ctx, sealed.instance) + remote := lc.offloadWithRetry(ctx, sealed.instance, retryDelay) + lc.registry.PromoteToRemote(sealed, remote) if remote == nil { @@ -136,7 +144,44 @@ func (lc *lifecycleManager) offloadLocal(ctx context.Context, sizeLimit uint64, // free up local resources sealed.instance.Suicide() maintenanceTruncateTotal.Add(1) - }() + }) + if err != nil { + panic(err) // we do not expect error here + } + } +} + +// offloadWithRetry attempts to offload a fraction with retries until success or cancellation. +// Returns the remote fraction instance, or nil if offloading was canceled.
+func (lc *lifecycleManager) offloadWithRetry(ctx context.Context, sealed *frac.Sealed, retryDelay time.Duration) *frac.Remote { + start := time.Now() + for i := 0; ; i++ { + remote, err := lc.tryOffload(ctx, sealed) + if err == nil { + return remote + } + + logger.Warn( + "fail to offload fraction", + zap.String("name", sealed.BaseFileName), + zap.Duration("offloading_time", time.Since(start)), + zap.Int("attempts", i), + zap.Error(err), + ) + + select { + case <-ctx.Done(): + logger.Info( + "fraction offloading was stopped", + zap.String("name", sealed.BaseFileName), + zap.Duration("offloading_time", time.Since(start)), + zap.Int("attempts", i), + zap.Error(ctx.Err()), + ) + return nil + case <-time.After(retryDelay): + // Wait before next retry attempt + } } } @@ -163,9 +208,6 @@ func (lc *lifecycleManager) tryOffload(ctx context.Context, sealed *frac.Sealed) // cleanRemote deletes outdated remote fractions based on retention policy. func (lc *lifecycleManager) cleanRemote(retention time.Duration, wg *sync.WaitGroup) { - if retention == 0 { - return - } toDelete := lc.registry.EvictRemote(retention) wg.Add(1) go func() { @@ -207,3 +249,18 @@ func (lc *lifecycleManager) updateOldestMetric() { oldestFracTime.WithLabelValues("remote").Set((time.Duration(lc.registry.OldestTotal()) * time.Millisecond).Seconds()) oldestFracTime.WithLabelValues("local").Set((time.Duration(lc.registry.OldestLocal()) * time.Millisecond).Seconds()) } + +// removeOverflowed removes fractions from the offloading queue when it exceeds the size limit. +// Stops ongoing offloading tasks and cleans up both local and remote resources.
+func (lc *lifecycleManager) removeOverflowed(sizeLimit uint64, wg *sync.WaitGroup) { + evicted := lc.registry.EvictOverflowed(sizeLimit) + for _, item := range evicted { + wg.Add(1) + go func() { + defer wg.Done() + // Cancel the offloading task - this operation may take significant time + // hence executed in a separate goroutine to avoid blocking + lc.tasks.Cancel(item.instance.BaseFileName) + }() + } +} diff --git a/fracmanager/lifecycle_manager_test.go b/fracmanager/lifecycle_manager_test.go index b7afb715..abd180e2 100644 --- a/fracmanager/lifecycle_manager_test.go +++ b/fracmanager/lifecycle_manager_test.go @@ -150,7 +150,7 @@ func TestOldestMetrics(t *testing.T) { } wg := sync.WaitGroup{} - lc.offloadLocal(t.Context(), total-halfSize, &wg) + lc.offloadLocal(t.Context(), total-halfSize, 0, &wg) wg.Wait() // Check state after offloading diff --git a/fracmanager/proxy_frac.go b/fracmanager/proxy_frac.go index 949e2412..ffc31854 100644 --- a/fracmanager/proxy_frac.go +++ b/fracmanager/proxy_frac.go @@ -23,6 +23,7 @@ var ( _ frac.Fraction = (*emptyFraction)(nil) ErrFractionNotWritable = errors.New("fraction is not writable") + ErrFractionSuspended = errors.New("write operations temporarily suspended - database capacity exceeded") ) // fractionProxy provides thread-safe access to a fraction with atomic replacement @@ -81,6 +82,7 @@ type activeProxy struct { wg sync.WaitGroup // Tracks pending write operations finalized bool // Whether fraction is frozen for writes + suspended bool // Temporarily suspended for writes } func newActiveProxy(active *frac.Active) *activeProxy { @@ -97,6 +99,10 @@ func (p *activeProxy) Append(docs, meta []byte) error { p.mu.RUnlock() return ErrFractionNotWritable } + if p.suspended { + p.mu.RUnlock() + return ErrFractionSuspended + } p.wg.Add(1) // Important: wg.Add() inside lock to prevent race with WaitWriteIdle() p.mu.RUnlock() @@ -115,6 +121,19 @@ func (p *activeProxy) WaitWriteIdle() { zap.Float64("time_wait_s", waitTime)) } +func 
(p *activeProxy) Suspended() bool { + p.mu.Lock() + defer p.mu.Unlock() + + return p.suspended +} + +func (p *activeProxy) Suspend(value bool) { + p.mu.Lock() + p.suspended = value + p.mu.Unlock() +} + // Finalize marks the fraction as read-only and prevents new writes from starting after finalize. func (p *activeProxy) Finalize() error { p.mu.Lock() diff --git a/fracmanager/tasks.go b/fracmanager/tasks.go new file mode 100644 index 00000000..349ebb61 --- /dev/null +++ b/fracmanager/tasks.go @@ -0,0 +1,72 @@ +package fracmanager + +import ( + "context" + "fmt" + "sync" +) + +// task represents a cancellable background task with synchronization +// Used for managing long-running operations like offloading fractions. +// Lifecycle: Created via Tasks.Run(), cancelled via Tasks.Cancel(), cleaned up on completion. +type task struct { + wg sync.WaitGroup // Synchronizes task completion + ctx context.Context // Context for cancellation + cancel context.CancelFunc // Function to cancel the task +} + +// TaskManager manages a collection of running background tasks +// Provides safe concurrent access to task tracking and cancellation. +type TaskManager struct { + mu sync.Mutex + running map[string]*task // Map of task ID to task instance +} + +func NewTaskManager() *TaskManager { + return &TaskManager{ + running: make(map[string]*task), + } +} + +// Run starts a new background task with the given ID and context. +// The task will be automatically removed when completed. 
+func (t *TaskManager) Run(id string, ctx context.Context, action func(ctx context.Context)) (*task, error) { + task := &task{} + task.ctx, task.cancel = context.WithCancel(ctx) + + t.mu.Lock() + if _, ok := t.running[id]; ok { + t.mu.Unlock() + return nil, fmt.Errorf("task with id = %s already exists", id) + } + t.running[id] = task + t.mu.Unlock() + + task.wg.Add(1) + go func() { + defer func() { + t.mu.Lock() + delete(t.running, id) + t.mu.Unlock() + + task.wg.Done() + }() + + action(task.ctx) + }() + + return task, nil +} + +// Cancel cancels and waits for completion of a task by ID +// Returns immediately if task with given ID doesn't exist. +func (t *TaskManager) Cancel(id string) { + t.mu.Lock() + task, ok := t.running[id] + t.mu.Unlock() + + if ok { + task.cancel() + task.wg.Wait() + } +} diff --git a/storage/s3/client.go b/storage/s3/client.go index 228f16a3..91cb9445 100644 --- a/storage/s3/client.go +++ b/storage/s3/client.go @@ -25,10 +25,7 @@ type Client struct { // - MaxIdleConnsPerHost; // // And maybe we should add tracing support as well. 
-func NewClient( - endpoint, accessKey, secretKey, region, bucket string, - maxRetryAttempts int, -) (*Client, error) { +func NewClient(endpoint, accessKey, secretKey, region, bucket string) (*Client, error) { credp := credentials.NewStaticCredentialsProvider(accessKey, secretKey, "") cfg, err := config.LoadDefaultConfig( @@ -46,7 +43,6 @@ func NewClient( s3cli := s3.NewFromConfig(cfg, func(o *s3.Options) { o.UsePathStyle = true o.DisableLogOutputChecksumValidationSkipped = true - o.RetryMaxAttempts = maxRetryAttempts }) return &Client{s3cli, bucket}, nil diff --git a/storage/s3/reader_test.go b/storage/s3/reader_test.go index d0409473..7e6deba1 100644 --- a/storage/s3/reader_test.go +++ b/storage/s3/reader_test.go @@ -26,7 +26,7 @@ func TestReader(t *testing.T) { s3cli, err := NewClient( "http://localhost:9000/", "minioadmin", "minioadmin", - "us-east-1", bucket, 0, + "us-east-1", bucket, ) require.NoError(t, err) diff --git a/tests/setup/env.go b/tests/setup/env.go index cafbbc02..fbc66018 100644 --- a/tests/setup/env.go +++ b/tests/setup/env.go @@ -184,7 +184,7 @@ func NewTestingEnv(cfg *TestingEnvConfig) *TestingEnv { cli, err := seqs3.NewClient( "http://localhost:9000/", "minioadmin", "minioadmin", - "us-east-1", createBucket(), 0, + "us-east-1", createBucket(), ) if err != nil { panic(err)