From 07f3673cf2dc8e2afcf7913950fd3f97bd4d649e Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Wed, 26 Nov 2025 16:10:21 -0800 Subject: [PATCH 1/4] delete corrupted blocks if chunks are missing --- pkg/block/block.go | 50 ++++++++++++++++++++++++++++++++++++++++++ pkg/compact/compact.go | 49 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) diff --git a/pkg/block/block.go b/pkg/block/block.go index 09ebb94d0f6..62e547396ee 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -92,6 +92,56 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id return nil } +// ValidateBlockChunkFilesExist checks that all chunk files referenced in the block's +// metadata actually exist in the bucket. This detects corrupted blocks where the +// meta.json references chunk files that were never uploaded or were deleted. +// Returns nil if all files exist, or an error describing which files are missing. +func ValidateBlockChunkFilesExist(ctx context.Context, logger log.Logger, bkt objstore.Bucket, meta *metadata.Meta) error { + if meta == nil { + return errors.New("meta is nil") + } + + // Check if there are any chunk files referenced in metadata + var expectedChunkFiles []string + for _, f := range meta.Thanos.Files { + if strings.HasPrefix(f.RelPath, ChunksDirname+"/") { + expectedChunkFiles = append(expectedChunkFiles, f.RelPath) + } + } + + // Also check deprecated SegmentFiles field + for _, sf := range meta.Thanos.SegmentFiles { + expectedChunkFiles = append(expectedChunkFiles, path.Join(ChunksDirname, sf)) + } + + if len(expectedChunkFiles) == 0 { + // No chunk files expected (empty block), nothing to validate + return nil + } + + // Verify each expected chunk file exists in the bucket + var missingFiles []string + for _, relPath := range expectedChunkFiles { + fullPath := path.Join(meta.ULID.String(), relPath) + exists, err := bkt.Exists(ctx, fullPath) + if err != nil { + level.Warn(logger).Log("msg", "failed to check chunk file existence", "block", meta.ULID, "file", relPath, "err", err) + // Treat check failure as potentially missing to be safe + missingFiles = append(missingFiles, relPath) + continue + } + if !exists { + missingFiles = append(missingFiles, relPath) + } + } + + if len(missingFiles) > 0 { + return errors.Errorf("block %s has missing chunk files: %v", meta.ULID, missingFiles) + } + + return nil +} + // Upload uploads a TSDB block to the object storage. It verifies basic // features of Thanos block. func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 820ce1dff4c..24549e81e85 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -989,6 +989,28 @@ func IsOutOfOrderChunkError(err error) bool { return ok } +// MissingChunkFilesError is a type wrapper for errors when block metadata references +// chunk files that don't exist in the bucket. This typically happens due to incomplete +// or failed block uploads. +type MissingChunkFilesError struct { + err error + id ulid.ULID +} + +func (e MissingChunkFilesError) Error() string { + return e.err.Error() +} + +func missingChunkFilesError(err error, brokenBlock ulid.ULID) MissingChunkFilesError { + return MissingChunkFilesError{err: err, id: brokenBlock} +} + +// IsMissingChunkFilesError returns true if the base error is a MissingChunkFilesError. +func IsMissingChunkFilesError(err error) bool { + _, ok := errors.Cause(err).(MissingChunkFilesError) + return ok +} + // HaltError is a type wrapper for errors that should halt any further progress on compactions. type HaltError struct { err error @@ -1199,6 +1221,13 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp bdir := filepath.Join(dir, m.ULID.String()) func(ctx context.Context, meta *metadata.Meta) { g.Go(func() error { + // Validate chunk files exist in bucket before downloading. + // This catches corrupted blocks where meta.json references chunk files + // that were never uploaded or were deleted. + if err := block.ValidateBlockChunkFilesExist(ctx, cg.logger, cg.bkt, meta); err != nil { + return missingChunkFilesError(errors.Wrapf(err, "block %s has missing chunk files in bucket", meta.ULID), meta.ULID) + } + start := time.Now() if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency)) @@ -1536,6 +1565,26 @@ func (c *BucketCompactor) Compact(ctx context.Context, progress *Progress) (rerr continue } } + // If block has missing chunk files (corrupted upload), mark it for deletion + // instead of halting the compactor. + if IsMissingChunkFilesError(err) { + blockID := err.(MissingChunkFilesError).id + level.Warn(c.logger).Log("msg", "block has missing chunk files, marking for deletion", "block", blockID, "err", err) + if markErr := block.MarkForDeletion( + ctx, + c.logger, + c.bkt, + blockID, + "MissingChunkFiles: block metadata references chunk files that don't exist in bucket", + c.sy.metrics.BlocksMarkedForDeletion); markErr == nil { + mtx.Lock() + finishedAllGroups = false + mtx.Unlock() + continue + } else { + level.Error(c.logger).Log("msg", "failed to mark corrupted block for deletion", "block", blockID, "err", markErr) + } + } errChan <- errors.Wrapf(err, "group %s", g.Key()) return } From 67a49e1b3ea6a327278505406dc9d7e95e596e3f Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Wed, 26 Nov 2025 16:21:49 -0800 Subject: [PATCH 2/4] low overhead --- pkg/block/block.go | 50 ------------------------------- pkg/compact/compact.go | 60 ++++++++++++++++++++++++++++++++----- pkg/compact/compact_test.go | 45 ++++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 57 deletions(-) diff --git a/pkg/block/block.go b/pkg/block/block.go index 62e547396ee..09ebb94d0f6 100644 --- a/pkg/block/block.go +++ b/pkg/block/block.go @@ -92,56 +92,6 @@ func Download(ctx context.Context, logger log.Logger, bucket objstore.Bucket, id return nil } -// ValidateBlockChunkFilesExist checks that all chunk files referenced in the block's -// metadata actually exist in the bucket. This detects corrupted blocks where the -// meta.json references chunk files that were never uploaded or were deleted. -// Returns nil if all files exist, or an error describing which files are missing. -func ValidateBlockChunkFilesExist(ctx context.Context, logger log.Logger, bkt objstore.Bucket, meta *metadata.Meta) error { - if meta == nil { - return errors.New("meta is nil") - } - - // Check if there are any chunk files referenced in metadata - var expectedChunkFiles []string - for _, f := range meta.Thanos.Files { - if strings.HasPrefix(f.RelPath, ChunksDirname+"/") { - expectedChunkFiles = append(expectedChunkFiles, f.RelPath) - } - } - - // Also check deprecated SegmentFiles field - for _, sf := range meta.Thanos.SegmentFiles { - expectedChunkFiles = append(expectedChunkFiles, path.Join(ChunksDirname, sf)) - } - - if len(expectedChunkFiles) == 0 { - // No chunk files expected (empty block), nothing to validate - return nil - } - - // Verify each expected chunk file exists in the bucket - var missingFiles []string - for _, relPath := range expectedChunkFiles { - fullPath := path.Join(meta.ULID.String(), relPath) - exists, err := bkt.Exists(ctx, fullPath) - if err != nil { - level.Warn(logger).Log("msg", "failed to check chunk file existence", "block", meta.ULID, "file", relPath, "err", err) - // Treat check failure as potentially missing to be safe - missingFiles = append(missingFiles, relPath) - continue - } - if !exists { - missingFiles = append(missingFiles, relPath) - } - } - - if len(missingFiles) > 0 { - return errors.Errorf("block %s has missing chunk files: %v", meta.ULID, missingFiles) - } - - return nil -} - // Upload uploads a TSDB block to the object storage. It verifies basic // features of Thanos block. func Upload(ctx context.Context, logger log.Logger, bkt objstore.Bucket, bdir string, hf metadata.HashFunc, options ...objstore.UploadOption) error { diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 24549e81e85..b01cab4a452 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -1011,6 +1011,49 @@ func IsMissingChunkFilesError(err error) bool { return ok } +// detectCorruptedBlockFromError checks if the error indicates a corrupted block +// with missing chunk files (e.g., "segment index X out of range" error). +// It attempts to identify which block caused the error by parsing the error message +// for block IDs mentioned in the "from block {ULID}" pattern. +// Returns the block ID and true if a corrupted block is detected, otherwise returns empty and false. +func detectCorruptedBlockFromError(err error, toCompact []*metadata.Meta) (ulid.ULID, bool) { + if err == nil { + return ulid.ULID{}, false + } + + errStr := err.Error() + + // Check for the specific error pattern that indicates missing chunk files + if !strings.Contains(errStr, "out of range") { + return ulid.ULID{}, false + } + + // Try to find block ID in error message - look for "from block {ULID}" pattern + // The error typically looks like: "cannot populate chunk X from block {ULID}: segment index Y out of range" + if idx := strings.Index(errStr, "from block "); idx != -1 { + // Extract the ULID after "from block " + start := idx + len("from block ") + if start+26 <= len(errStr) { // ULID is 26 characters + if blockID, parseErr := ulid.Parse(errStr[start : start+26]); parseErr == nil { + // Verify this block is in our compaction set + for _, meta := range toCompact { + if meta.ULID == blockID { + return blockID, true + } + } + } + } + } + + // If we couldn't parse a specific block ID but the error pattern matches, + // and we only have one block being compacted, assume it's that one + if len(toCompact) == 1 && strings.Contains(errStr, "segment index") { + return toCompact[0].ULID, true + } + + return ulid.ULID{}, false +} + // HaltError is a type wrapper for errors that should halt any further progress on compactions. type HaltError struct { err error @@ -1221,13 +1264,6 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp bdir := filepath.Join(dir, m.ULID.String()) func(ctx context.Context, meta *metadata.Meta) { g.Go(func() error { - // Validate chunk files exist in bucket before downloading. - // This catches corrupted blocks where meta.json references chunk files - // that were never uploaded or were deleted. - if err := block.ValidateBlockChunkFilesExist(ctx, cg.logger, cg.bkt, meta); err != nil { - return missingChunkFilesError(errors.Wrapf(err, "block %s has missing chunk files in bucket", meta.ULID), meta.ULID) - } - start := time.Now() if err := tracing.DoInSpanWithErr(ctx, "compaction_block_download", func(ctx context.Context) error { return block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir, objstore.WithFetchConcurrency(cg.blockFilesConcurrency)) @@ -1287,6 +1323,16 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp compIDs, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc) return e }); err != nil { + // Check if this is a "segment index out of range" error, which indicates + // a corrupted block with missing chunk files. Try to identify the block + // and return a MissingChunkFilesError so it can be deleted instead of halting. + if corruptedBlockID, ok := detectCorruptedBlockFromError(err, toCompact); ok { + level.Warn(cg.logger).Log("msg", "detected corrupted block with missing chunk files during compaction", + "block", corruptedBlockID, "err", err) + return false, nil, missingChunkFilesError( + errors.Wrapf(err, "block %s appears corrupted (missing chunk files)", corruptedBlockID), + corruptedBlockID) + } handledErrs := compactionLifecycleCallback.HandleError(ctx, cg.logger, cg, toCompact, err) return false, nil, halt(errors.Wrapf(err, "compact blocks %v, handled %d errors", toCompactDirs, handledErrs)) } diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 45057746517..d396e7a42e3 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -99,6 +99,51 @@ func TestRetryError(t *testing.T) { testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error") } +func TestDetectCorruptedBlockFromError(t *testing.T) { + t.Parallel() + + blockID1 := ulid.MustNew(1, nil) + blockID2 := ulid.MustNew(2, nil) + + toCompact := []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{ULID: blockID1}}, + {BlockMeta: tsdb.BlockMeta{ULID: blockID2}}, + } + + // Test: nil error returns false + id, ok := detectCorruptedBlockFromError(nil, toCompact) + testutil.Assert(t, !ok, "nil error should return false") + testutil.Equals(t, ulid.ULID{}, id) + + // Test: unrelated error returns false + err := errors.New("some random error") + id, ok = detectCorruptedBlockFromError(err, toCompact) + testutil.Assert(t, !ok, "unrelated error should return false") + + // Test: error with "out of range" but no block ID returns false (multiple blocks) + err = errors.New("segment index 0 out of range") + id, ok = detectCorruptedBlockFromError(err, toCompact) + testutil.Assert(t, !ok, "error without block ID should return false when multiple blocks") + + // Test: error with "out of range" and single block returns that block + singleBlock := []*metadata.Meta{{BlockMeta: tsdb.BlockMeta{ULID: blockID1}}} + id, ok = detectCorruptedBlockFromError(err, singleBlock) + testutil.Assert(t, ok, "error with single block should return true") + testutil.Equals(t, blockID1, id) + + // Test: error with "from block {ULID}" pattern + err = errors.Errorf("cannot populate chunk 8 from block %s: segment index 0 out of range", blockID1.String()) + id, ok = detectCorruptedBlockFromError(err, toCompact) + testutil.Assert(t, ok, "error with block ID should return true") + testutil.Equals(t, blockID1, id) + + // Test: error with block ID not in toCompact returns false + unknownBlock := ulid.MustNew(999, nil) + err = errors.Errorf("cannot populate chunk 8 from block %s: segment index 0 out of range", unknownBlock.String()) + id, ok = detectCorruptedBlockFromError(err, toCompact) + testutil.Assert(t, !ok, "error with unknown block ID should return false") +} + func TestGroupKey(t *testing.T) { t.Parallel() From 860e8cd508ee8e1d33f49a34f71bf48e3a946a2e Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Wed, 26 Nov 2025 16:23:42 -0800 Subject: [PATCH 3/4] add ut --- pkg/compact/compact_test.go | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index d396e7a42e3..d163925803e 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -99,6 +99,24 @@ func TestRetryError(t *testing.T) { testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error") } +func TestMissingChunkFilesError(t *testing.T) { + t.Parallel() + + err := errors.New("test") + testutil.Assert(t, !IsMissingChunkFilesError(err), "should not be a missing chunk files error") + + blockID := ulid.MustNew(1, nil) + err = missingChunkFilesError(errors.New("test"), blockID) + testutil.Assert(t, IsMissingChunkFilesError(err), "should be a missing chunk files error") + testutil.Equals(t, blockID, err.(MissingChunkFilesError).id) + + err = errors.Wrap(missingChunkFilesError(errors.New("test"), blockID), "wrapped") + testutil.Assert(t, IsMissingChunkFilesError(err), "wrapped error should still be detected") + + err = errors.Wrap(errors.Wrap(missingChunkFilesError(errors.New("test"), blockID), "inner"), "outer") + testutil.Assert(t, IsMissingChunkFilesError(err), "double wrapped error should still be detected") +} + func TestDetectCorruptedBlockFromError(t *testing.T) { t.Parallel() @@ -142,6 +160,24 @@ func TestDetectCorruptedBlockFromError(t *testing.T) { err = errors.Errorf("cannot populate chunk 8 from block %s: segment index 0 out of range", unknownBlock.String()) id, ok = detectCorruptedBlockFromError(err, toCompact) testutil.Assert(t, !ok, "error with unknown block ID should return false") + + // Test: wrapped error with "from block {ULID}" pattern + err = errors.Wrap( + errors.Errorf("cannot populate chunk 8 from block %s: segment index 0 out of range", blockID2.String()), + "compaction failed") + id, ok = detectCorruptedBlockFromError(err, toCompact) + testutil.Assert(t, ok, "wrapped error with block ID should return true") + testutil.Equals(t, blockID2, id) + + // Test: reference sequence out of range error (another variant) + err = errors.Errorf("cannot populate chunk 8 from block %s: reference sequence 0 out of range", blockID1.String()) + id, ok = detectCorruptedBlockFromError(err, toCompact) + testutil.Assert(t, ok, "reference sequence out of range should also be detected") + testutil.Equals(t, blockID1, id) + + // Test: empty toCompact slice + id, ok = detectCorruptedBlockFromError(errors.New("segment index 0 out of range"), []*metadata.Meta{}) + testutil.Assert(t, !ok, "empty toCompact should return false") } func TestGroupKey(t *testing.T) { From db4b02cfb03fd8bf52f9f5550f9707659de5b4cf Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Thu, 27 Nov 2025 14:40:36 -0800 Subject: [PATCH 4/4] lint --- pkg/compact/compact_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index d163925803e..6cbf14c67fb 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -135,12 +135,12 @@ func TestDetectCorruptedBlockFromError(t *testing.T) { // Test: unrelated error returns false err := errors.New("some random error") - id, ok = detectCorruptedBlockFromError(err, toCompact) + _, ok = detectCorruptedBlockFromError(err, toCompact) testutil.Assert(t, !ok, "unrelated error should return false") // Test: error with "out of range" but no block ID returns false (multiple blocks) err = errors.New("segment index 0 out of range") - id, ok = detectCorruptedBlockFromError(err, toCompact) + _, ok = detectCorruptedBlockFromError(err, toCompact) testutil.Assert(t, !ok, "error without block ID should return false when multiple blocks") // Test: error with "out of range" and single block returns that block @@ -158,7 +158,7 @@ func TestDetectCorruptedBlockFromError(t *testing.T) { // Test: error with block ID not in toCompact returns false unknownBlock := ulid.MustNew(999, nil) err = errors.Errorf("cannot populate chunk 8 from block %s: segment index 0 out of range", unknownBlock.String()) - id, ok = detectCorruptedBlockFromError(err, toCompact) + _, ok = detectCorruptedBlockFromError(err, toCompact) testutil.Assert(t, !ok, "error with unknown block ID should return false") // Test: wrapped error with "from block {ULID}" pattern @@ -176,7 +176,7 @@ func TestDetectCorruptedBlockFromError(t *testing.T) { testutil.Equals(t, blockID1, id) // Test: empty toCompact slice - id, ok = detectCorruptedBlockFromError(errors.New("segment index 0 out of range"), []*metadata.Meta{}) + _, ok = detectCorruptedBlockFromError(errors.New("segment index 0 out of range"), []*metadata.Meta{}) testutil.Assert(t, !ok, "empty toCompact should return false") }