Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,9 +973,14 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
db.metrics.maxBytes.Set(float64(maxBytes))
db.metrics.retentionDuration.Set((time.Duration(opts.RetentionDuration) * time.Millisecond).Seconds())

// Calling db.reload() calls db.reloadBlocks() which requires cmtx to be locked.
db.cmtx.Lock()
if err := db.reload(); err != nil {
db.cmtx.Unlock()
return nil, err
}
db.cmtx.Unlock()

// Set the min valid time for the ingested samples
// to be no lower than the maxt of the last block.
minValidTime := int64(math.MinInt64)
Expand Down Expand Up @@ -1334,6 +1339,7 @@ func (db *DB) CompactOOOHead(ctx context.Context) error {
// Callback for testing.
var compactOOOHeadTestingCallback func()

// The db.cmtx mutex should be held before calling this method.
func (db *DB) compactOOOHead(ctx context.Context) error {
if !db.oooWasEnabled.Load() {
return nil
Expand Down Expand Up @@ -1388,6 +1394,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error {

// compactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given.
// Each ULID in the result corresponds to a block in a unique time range.
// The db.cmtx mutex should be held before calling this method.
func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID, err error) {
start := time.Now()

Expand Down Expand Up @@ -1432,7 +1439,7 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID
}

// compactHead compacts the given RangeHead.
// The compaction mutex should be held before calling this method.
// The db.cmtx mutex should be held before calling this method.
func (db *DB) compactHead(head *RangeHead) error {
uids, err := db.compactor.Write(db.dir, head, head.MinTime(), head.BlockMaxTime(), nil)
if err != nil {
Expand All @@ -1458,7 +1465,7 @@ func (db *DB) compactHead(head *RangeHead) error {
}

// compactBlocks compacts all the eligible on-disk blocks.
// The compaction mutex should be held before calling this method.
// The db.cmtx mutex should be held before calling this method.
func (db *DB) compactBlocks() (err error) {
// Check for compactions of multiple blocks.
for {
Expand Down Expand Up @@ -1515,6 +1522,7 @@ func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) {
}

// reload reloads blocks and truncates the head and its WAL.
// The db.cmtx mutex should be held before calling this method.
func (db *DB) reload() error {
if err := db.reloadBlocks(); err != nil {
return fmt.Errorf("reloadBlocks: %w", err)
Expand All @@ -1531,6 +1539,7 @@ func (db *DB) reload() error {

// reloadBlocks reloads blocks without touching head.
// Blocks that are obsolete due to replacement or retention will be deleted.
// The db.cmtx mutex should be held before calling this method.
func (db *DB) reloadBlocks() (err error) {
defer func() {
if err != nil {
Expand All @@ -1539,13 +1548,9 @@ func (db *DB) reloadBlocks() (err error) {
db.metrics.reloads.Inc()
}()

// Now that we reload TSDB every minute, there is a high chance for a race condition with a reload
// triggered by CleanTombstones(). We need to lock the reload to avoid the situation where
// a normal reload and CleanTombstones try to delete the same block.
db.mtx.Lock()
defer db.mtx.Unlock()

db.mtx.RLock()
loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
db.mtx.RUnlock()
if err != nil {
return err
}
Expand All @@ -1571,11 +1576,13 @@ func (db *DB) reloadBlocks() (err error) {
if len(corrupted) > 0 {
// Corrupted but no child loaded for it.
// Close all new blocks to release the lock for windows.
db.mtx.RLock()
for _, block := range loadable {
if _, open := getBlock(db.blocks, block.Meta().ULID); !open {
block.Close()
}
}
db.mtx.RUnlock()
errs := tsdb_errors.NewMulti()
for ulid, err := range corrupted {
if err != nil {
Expand Down Expand Up @@ -1614,8 +1621,10 @@ func (db *DB) reloadBlocks() (err error) {
})

// Swap new blocks first for subsequently created readers to be seen.
db.mtx.Lock()
oldBlocks := db.blocks
db.blocks = toLoad
db.mtx.Unlock()

// Only check overlapping blocks when overlapping compaction is enabled.
if db.opts.EnableOverlappingCompaction {
Expand Down
55 changes: 0 additions & 55 deletions tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,61 +1353,6 @@ func TestTombstoneCleanFail(t *testing.T) {
require.Len(t, intersection(oldBlockDirs, actualBlockDirs), len(actualBlockDirs)-1)
}

// TestTombstoneCleanRetentionLimitsRace tests that a CleanTombstones operation
// and retention limit policies, when triggered at the same time,
// won't race against each other.
func TestTombstoneCleanRetentionLimitsRace(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}

opts := DefaultOptions()
var wg sync.WaitGroup

// We want to make sure that a race doesn't happen when a normal reload and a CleanTombstones()
// reload try to delete the same block. Without the correct lock placement, it can happen if a
// block is marked for deletion due to retention limits and also has tombstones to be cleaned at
// the same time.
//
// That is something tricky to trigger, so let's try several times just to make sure.
for i := 0; i < 20; i++ {
t.Run(fmt.Sprintf("iteration%d", i), func(t *testing.T) {
db := openTestDB(t, opts, nil)
totalBlocks := 20
dbDir := db.Dir()
// Generate some blocks with old mint (near epoch).
for j := 0; j < totalBlocks; j++ {
blockDir := createBlock(t, dbDir, genSeries(10, 1, int64(j), int64(j)+1))
block, err := OpenBlock(nil, blockDir, nil)
require.NoError(t, err)
// Cover block with tombstones so it can be deleted with CleanTombstones() as well.
tomb := tombstones.NewMemTombstones()
tomb.AddInterval(0, tombstones.Interval{Mint: int64(j), Maxt: int64(j) + 1})
block.tombstones = tomb

db.blocks = append(db.blocks, block)
}

wg.Add(2)
// Run reload and CleanTombstones together, with a small time window randomization
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
require.NoError(t, db.reloadBlocks())
}()
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Float64() * 100 * float64(time.Millisecond)))
require.NoError(t, db.CleanTombstones())
}()

wg.Wait()

require.NoError(t, db.Close())
})
}
}

func intersection(oldBlocks, actualBlocks []string) (intersection []string) {
hash := make(map[string]bool)
for _, e := range oldBlocks {
Expand Down
Loading
Loading