Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions pkg/compact/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,71 @@ func IsOutOfOrderChunkError(err error) bool {
return ok
}

// MissingChunkFilesError is a type wrapper for errors when block metadata references
// chunk files that don't exist in the bucket. This typically happens due to incomplete
// or failed block uploads.
//
// Besides the wrapped error it carries the ULID of the offending block, so that
// code handling the error can act on that specific block (for example by marking
// it for deletion) instead of halting.
type MissingChunkFilesError struct {
// err is the underlying error; its message is what Error() reports.
err error
// id is the ULID of the block considered broken.
id ulid.ULID
}

// Error implements the error interface. The message is taken verbatim from the
// wrapped error; the block ID is not appended.
func (e MissingChunkFilesError) Error() string {
	msg := e.err.Error()
	return msg
}

// missingChunkFilesError wraps err in a MissingChunkFilesError that remembers
// brokenBlock as the ID of the block the error is attributed to.
func missingChunkFilesError(err error, brokenBlock ulid.ULID) MissingChunkFilesError {
	var wrapped MissingChunkFilesError
	wrapped.err = err
	wrapped.id = brokenBlock
	return wrapped
}

// IsMissingChunkFilesError reports whether the root cause of err (as resolved by
// errors.Cause) is a MissingChunkFilesError. Errors wrapped any number of times
// with errors.Wrap are therefore still recognised.
func IsMissingChunkFilesError(err error) bool {
	switch errors.Cause(err).(type) {
	case MissingChunkFilesError:
		return true
	default:
		return false
	}
}

// detectCorruptedBlockFromError inspects a compaction error and tries to identify
// the block whose chunk files are missing.
//
// The error is only considered when its message contains "out of range" (for
// example "segment index X out of range"). The block is then resolved as follows:
//
//  1. If the message contains "from block {ULID}" and the ULID parses, the block
//     is reported only if it is one of toCompact. A block that is named explicitly
//     but is not part of toCompact never produces a match; in particular it does
//     not fall through to the single-block guess below, so an unrelated block is
//     never blamed for an error that names a different one.
//  2. If no block ID could be parsed, exactly one block is being compacted and
//     the message mentions "segment index", that single block is assumed to be
//     the culprit.
//
// It returns the block ID and true on a match, otherwise the zero ULID and false.
// A nil err always yields (zero ULID, false).
func detectCorruptedBlockFromError(err error, toCompact []*metadata.Meta) (ulid.ULID, bool) {
	if err == nil {
		return ulid.ULID{}, false
	}

	// Marker that precedes the block ID in messages such as
	// "cannot populate chunk X from block {ULID}: segment index Y out of range".
	const blockMarker = "from block "

	errStr := err.Error()

	// Cheap gate: every message variant of interest contains this phrase.
	if !strings.Contains(errStr, "out of range") {
		return ulid.ULID{}, false
	}

	if idx := strings.Index(errStr, blockMarker); idx != -1 {
		start := idx + len(blockMarker)
		// Only slice when a full textual ULID fits after the marker.
		if end := start + ulid.EncodedSize; end <= len(errStr) {
			if blockID, parseErr := ulid.Parse(errStr[start:end]); parseErr == nil {
				// Only trust the ID if it belongs to the current compaction set.
				for _, meta := range toCompact {
					if meta.ULID == blockID {
						return blockID, true
					}
				}
				// The error names a block outside this compaction set. Guessing
				// another block here could flag a healthy block as corrupted.
				return ulid.ULID{}, false
			}
		}
	}

	// No block ID could be parsed. With a single input block and a matching
	// error pattern, that block is the only candidate.
	if len(toCompact) == 1 && strings.Contains(errStr, "segment index") {
		return toCompact[0].ULID, true
	}

	return ulid.ULID{}, false
}

// HaltError is a type wrapper for errors that should halt any further progress on compactions.
type HaltError struct {
err error
Expand Down Expand Up @@ -1258,6 +1323,16 @@ func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp
compIDs, e = comp.CompactWithBlockPopulator(dir, toCompactDirs, nil, populateBlockFunc)
return e
}); err != nil {
// Check if this is a "segment index out of range" error, which indicates
// a corrupted block with missing chunk files. Try to identify the block
// and return a MissingChunkFilesError so it can be deleted instead of halting.
if corruptedBlockID, ok := detectCorruptedBlockFromError(err, toCompact); ok {
level.Warn(cg.logger).Log("msg", "detected corrupted block with missing chunk files during compaction",
"block", corruptedBlockID, "err", err)
return false, nil, missingChunkFilesError(
errors.Wrapf(err, "block %s appears corrupted (missing chunk files)", corruptedBlockID),
corruptedBlockID)
}
handledErrs := compactionLifecycleCallback.HandleError(ctx, cg.logger, cg, toCompact, err)
return false, nil, halt(errors.Wrapf(err, "compact blocks %v, handled %d errors", toCompactDirs, handledErrs))
}
Expand Down Expand Up @@ -1536,6 +1611,26 @@ func (c *BucketCompactor) Compact(ctx context.Context, progress *Progress) (rerr
continue
}
}
// If block has missing chunk files (corrupted upload), mark it for deletion
// instead of halting the compactor.
if IsMissingChunkFilesError(err) {
blockID := err.(MissingChunkFilesError).id
level.Warn(c.logger).Log("msg", "block has missing chunk files, marking for deletion", "block", blockID, "err", err)
if markErr := block.MarkForDeletion(
ctx,
c.logger,
c.bkt,
blockID,
"MissingChunkFiles: block metadata references chunk files that don't exist in bucket",
c.sy.metrics.BlocksMarkedForDeletion); markErr == nil {
mtx.Lock()
finishedAllGroups = false
mtx.Unlock()
continue
} else {
level.Error(c.logger).Log("msg", "failed to mark corrupted block for deletion", "block", blockID, "err", markErr)
}
}
errChan <- errors.Wrapf(err, "group %s", g.Key())
return
}
Expand Down
81 changes: 81 additions & 0 deletions pkg/compact/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,87 @@ func TestRetryError(t *testing.T) {
testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error")
}

// TestMissingChunkFilesError verifies that IsMissingChunkFilesError rejects plain
// errors and recognises a MissingChunkFilesError directly as well as through one
// or two layers of errors.Wrap, and that the block ID is retained.
func TestMissingChunkFilesError(t *testing.T) {
	t.Parallel()

	// A plain error must not be classified as a missing chunk files error.
	plain := errors.New("test")
	testutil.Assert(t, !IsMissingChunkFilesError(plain), "should not be a missing chunk files error")

	id := ulid.MustNew(1, nil)
	newTyped := func() error {
		return missingChunkFilesError(errors.New("test"), id)
	}

	// Unwrapped: detected, and the block ID is carried along.
	bare := newTyped()
	testutil.Assert(t, IsMissingChunkFilesError(bare), "should be a missing chunk files error")
	testutil.Equals(t, id, bare.(MissingChunkFilesError).id)

	// Wrapped once.
	once := errors.Wrap(newTyped(), "wrapped")
	testutil.Assert(t, IsMissingChunkFilesError(once), "wrapped error should still be detected")

	// Wrapped twice.
	twice := errors.Wrap(errors.Wrap(newTyped(), "inner"), "outer")
	testutil.Assert(t, IsMissingChunkFilesError(twice), "double wrapped error should still be detected")
}

// TestDetectCorruptedBlockFromError drives detectCorruptedBlockFromError through a
// table of error messages and compaction sets, checking whether a block is
// detected and, where relevant, which one.
func TestDetectCorruptedBlockFromError(t *testing.T) {
	t.Parallel()

	first := ulid.MustNew(1, nil)
	second := ulid.MustNew(2, nil)
	outsider := ulid.MustNew(999, nil)

	pair := []*metadata.Meta{
		{BlockMeta: tsdb.BlockMeta{ULID: first}},
		{BlockMeta: tsdb.BlockMeta{ULID: second}},
	}
	solo := []*metadata.Meta{{BlockMeta: tsdb.BlockMeta{ULID: first}}}

	const segmentFmt = "cannot populate chunk 8 from block %s: segment index 0 out of range"

	cases := []struct {
		err     error
		blocks  []*metadata.Meta
		wantOK  bool
		checkID bool // Whether the returned ID is compared against wantID.
		wantID  ulid.ULID
		msg     string
	}{
		{
			// nil error returns false and the zero ULID.
			err:     nil,
			blocks:  pair,
			wantOK:  false,
			checkID: true,
			wantID:  ulid.ULID{},
			msg:     "nil error should return false",
		},
		{
			// Unrelated error returns false.
			err:    errors.New("some random error"),
			blocks: pair,
			wantOK: false,
			msg:    "unrelated error should return false",
		},
		{
			// "out of range" without a block ID and several candidate blocks.
			err:    errors.New("segment index 0 out of range"),
			blocks: pair,
			wantOK: false,
			msg:    "error without block ID should return false when multiple blocks",
		},
		{
			// "out of range" without a block ID but only one candidate block.
			err:     errors.New("segment index 0 out of range"),
			blocks:  solo,
			wantOK:  true,
			checkID: true,
			wantID:  first,
			msg:     "error with single block should return true",
		},
		{
			// "from block {ULID}" pattern.
			err:     errors.Errorf(segmentFmt, first.String()),
			blocks:  pair,
			wantOK:  true,
			checkID: true,
			wantID:  first,
			msg:     "error with block ID should return true",
		},
		{
			// Block ID that is not part of the compaction set.
			err:    errors.Errorf(segmentFmt, outsider.String()),
			blocks: pair,
			wantOK: false,
			msg:    "error with unknown block ID should return false",
		},
		{
			// Wrapped error carrying the "from block {ULID}" pattern.
			err:     errors.Wrap(errors.Errorf(segmentFmt, second.String()), "compaction failed"),
			blocks:  pair,
			wantOK:  true,
			checkID: true,
			wantID:  second,
			msg:     "wrapped error with block ID should return true",
		},
		{
			// "reference sequence ... out of range" variant.
			err:     errors.Errorf("cannot populate chunk 8 from block %s: reference sequence 0 out of range", first.String()),
			blocks:  pair,
			wantOK:  true,
			checkID: true,
			wantID:  first,
			msg:     "reference sequence out of range should also be detected",
		},
		{
			// Empty compaction set.
			err:    errors.New("segment index 0 out of range"),
			blocks: []*metadata.Meta{},
			wantOK: false,
			msg:    "empty toCompact should return false",
		},
	}

	for _, tc := range cases {
		gotID, gotOK := detectCorruptedBlockFromError(tc.err, tc.blocks)
		testutil.Assert(t, gotOK == tc.wantOK, tc.msg)
		if tc.checkID {
			testutil.Equals(t, tc.wantID, gotID)
		}
	}
}

func TestGroupKey(t *testing.T) {
t.Parallel()

Expand Down
Loading