Skip to content

Commit a62f938

Browse files
authored
Refactored TSDB opening concurrency at startup (#3393)
* Refactored TSDB opening concurrency at startup Signed-off-by: Marco Pracucci <marco@pracucci.com> * Fixed metric tracking Signed-off-by: Marco Pracucci <marco@pracucci.com> * Fixed and simplified BlocksStorageConfig.Validate() test Signed-off-by: Marco Pracucci <marco@pracucci.com> * Addressed code review comments Signed-off-by: Marco Pracucci <marco@pracucci.com> * Addressed code review comments Signed-off-by: Marco Pracucci <marco@pracucci.com>
1 parent c84e4aa commit a62f938

File tree

4 files changed

+176
-287
lines changed

4 files changed

+176
-287
lines changed

pkg/ingester/ingester_v2.go

Lines changed: 75 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,16 @@ import (
1616
"github.com/prometheus/client_golang/prometheus"
1717
"github.com/prometheus/client_golang/prometheus/promauto"
1818
"github.com/prometheus/common/model"
19-
"github.com/prometheus/prometheus/pkg/gate"
2019
"github.com/prometheus/prometheus/pkg/labels"
2120
"github.com/prometheus/prometheus/storage"
2221
"github.com/prometheus/prometheus/tsdb"
23-
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
2422
"github.com/thanos-io/thanos/pkg/block/metadata"
2523
"github.com/thanos-io/thanos/pkg/objstore"
2624
"github.com/thanos-io/thanos/pkg/shipper"
2725
"github.com/weaveworks/common/httpgrpc"
2826
"github.com/weaveworks/common/user"
2927
"go.uber.org/atomic"
28+
"golang.org/x/sync/errgroup"
3029

3130
"github.com/cortexproject/cortex/pkg/ingester/client"
3231
"github.com/cortexproject/cortex/pkg/ring"
@@ -1143,98 +1142,100 @@ func (i *Ingester) closeAllTSDB() {
11431142
// concurrently opening TSDB.
11441143
func (i *Ingester) openExistingTSDB(ctx context.Context) error {
11451144
level.Info(util.Logger).Log("msg", "opening existing TSDBs")
1146-
wg := &sync.WaitGroup{}
1147-
openGate := gate.New(i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup)
11481145

1149-
// Keep track of all errors that could occur.
1150-
errs := tsdb_errors.MultiError{}
1151-
errsMx := sync.Mutex{}
1146+
queue := make(chan string)
1147+
group, groupCtx := errgroup.WithContext(ctx)
11521148

1153-
walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
1154-
if err != nil {
1155-
// If the root directory doesn't exist, we're OK (not needed to be created upfront).
1156-
if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir {
1157-
return filepath.SkipDir
1158-
}
1149+
// Create a pool of workers which will open existing TSDBs.
1150+
for n := 0; n < i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup; n++ {
1151+
group.Go(func() error {
1152+
for userID := range queue {
1153+
startTime := time.Now()
11591154

1160-
level.Error(util.Logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err)
1161-
return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path)
1162-
}
1155+
db, err := i.createTSDB(userID)
1156+
if err != nil {
1157+
level.Error(util.Logger).Log("msg", "unable to open TSDB", "err", err, "user", userID)
1158+
return errors.Wrapf(err, "unable to open TSDB for user %s", userID)
1159+
}
1160+
1161+
// Add the database to the map of user databases
1162+
i.userStatesMtx.Lock()
1163+
i.TSDBState.dbs[userID] = db
1164+
i.userStatesMtx.Unlock()
1165+
i.metrics.memUsers.Inc()
1166+
1167+
i.TSDBState.walReplayTime.Observe(time.Since(startTime).Seconds())
1168+
}
11631169

1164-
// Skip root dir and all other files
1165-
if path == i.cfg.BlocksStorageConfig.TSDB.Dir || !info.IsDir() {
11661170
return nil
1167-
}
1171+
})
1172+
}
11681173

1169-
// Top level directories are assumed to be user TSDBs
1170-
userID := info.Name()
1171-
f, err := os.Open(path)
1172-
if err != nil {
1173-
level.Error(util.Logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path)
1174-
return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID)
1175-
}
1176-
defer f.Close()
1174+
// Spawn a goroutine to find all users with a TSDB on the filesystem.
1175+
group.Go(func() error {
1176+
// Close the queue once filesystem walking is done.
1177+
defer close(queue)
1178+
1179+
walkErr := filepath.Walk(i.cfg.BlocksStorageConfig.TSDB.Dir, func(path string, info os.FileInfo, err error) error {
1180+
if err != nil {
1181+
// If the root directory doesn't exist, we're OK (not needed to be created upfront).
1182+
if os.IsNotExist(err) && path == i.cfg.BlocksStorageConfig.TSDB.Dir {
1183+
return filepath.SkipDir
1184+
}
11771185

1178-
// If the dir is empty skip it
1179-
if _, err := f.Readdirnames(1); err != nil {
1180-
if err == io.EOF {
1181-
return filepath.SkipDir
1186+
level.Error(util.Logger).Log("msg", "an error occurred while iterating the filesystem storing TSDBs", "path", path, "err", err)
1187+
return errors.Wrapf(err, "an error occurred while iterating the filesystem storing TSDBs at %s", path)
11821188
}
11831189

1184-
level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
1185-
return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID)
1186-
}
1190+
// Skip root dir and all other files
1191+
if path == i.cfg.BlocksStorageConfig.TSDB.Dir || !info.IsDir() {
1192+
return nil
1193+
}
11871194

1188-
// Limit the number of TSDB's opening concurrently. Start blocks until there's a free spot available or the context is cancelled.
1189-
if err := openGate.Start(ctx); err != nil {
1190-
return err
1191-
}
1195+
// Top level directories are assumed to be user TSDBs
1196+
userID := info.Name()
1197+
f, err := os.Open(path)
1198+
if err != nil {
1199+
level.Error(util.Logger).Log("msg", "unable to open TSDB dir", "err", err, "user", userID, "path", path)
1200+
return errors.Wrapf(err, "unable to open TSDB dir %s for user %s", path, userID)
1201+
}
1202+
defer f.Close()
11921203

1193-
wg.Add(1)
1194-
go func(userID string) {
1195-
defer wg.Done()
1196-
defer openGate.Done()
1197-
defer func(ts time.Time) {
1198-
i.TSDBState.walReplayTime.Observe(time.Since(ts).Seconds())
1199-
}(time.Now())
1204+
// If the dir is empty skip it
1205+
if _, err := f.Readdirnames(1); err != nil {
1206+
if err == io.EOF {
1207+
return filepath.SkipDir
1208+
}
12001209

1201-
db, err := i.createTSDB(userID)
1202-
if err != nil {
1203-
errsMx.Lock()
1204-
errs.Add(errors.Wrapf(err, "unable to open TSDB for user %s", userID))
1205-
errsMx.Unlock()
1210+
level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
1211+
return errors.Wrapf(err, "unable to read TSDB dir %s for user %s", path, userID)
1212+
}
12061213

1207-
level.Error(util.Logger).Log("msg", "unable to open TSDB", "err", err, "user", userID)
1208-
return
1214+
// Enqueue the user to be processed.
1215+
select {
1216+
case queue <- userID:
1217+
// Nothing to do.
1218+
case <-groupCtx.Done():
1219+
// Interrupt in case a failure occurred in another goroutine.
1220+
return nil
12091221
}
12101222

1211-
// Add the database to the map of user databases
1212-
i.userStatesMtx.Lock()
1213-
i.TSDBState.dbs[userID] = db
1214-
i.userStatesMtx.Unlock()
1215-
i.metrics.memUsers.Inc()
1216-
}(userID)
1223+
// Don't descend into subdirectories.
1224+
return filepath.SkipDir
1225+
})
12171226

1218-
return filepath.SkipDir // Don't descend into directories
1227+
return errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir)
12191228
})
12201229

1221-
if walkErr != nil {
1222-
errsMx.Lock()
1223-
errs.Add(errors.Wrapf(walkErr, "unable to walk directory %s containing existing TSDBs", i.cfg.BlocksStorageConfig.TSDB.Dir))
1224-
errsMx.Unlock()
1225-
}
1226-
1227-
// Wait for all opening routines to finish
1228-
wg.Wait()
1229-
1230-
// Ensure no error occurred.
1231-
if errs.Err() == nil {
1232-
level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs")
1233-
return nil
1230+
// Wait for all workers to complete.
1231+
err := group.Wait()
1232+
if err != nil {
1233+
level.Error(util.Logger).Log("msg", "error while opening existing TSDBs", "err", err)
1234+
return err
12341235
}
12351236

1236-
level.Error(util.Logger).Log("msg", "error while opening existing TSDBs", "err", errs.Error())
1237-
return errs.Err()
1237+
level.Info(util.Logger).Log("msg", "successfully opened existing TSDBs")
1238+
return nil
12381239
}
12391240

12401241
// numSeriesInTSDB returns the total number of in-memory series across all open TSDBs.

pkg/ingester/ingester_v2_test.go

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1602,11 +1602,13 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
16021602
t.Parallel()
16031603

16041604
tests := map[string]struct {
1605+
concurrency int
16051606
setup func(*testing.T, string)
16061607
check func(*testing.T, *Ingester)
16071608
expectedErr string
16081609
}{
16091610
"should not load TSDB if the user directory is empty": {
1611+
concurrency: 10,
16101612
setup: func(t *testing.T, dir string) {
16111613
require.NoError(t, os.Mkdir(filepath.Join(dir, "user0"), 0700))
16121614
},
@@ -1615,12 +1617,14 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
16151617
},
16161618
},
16171619
"should not load any TSDB if the root directory is empty": {
1618-
setup: func(t *testing.T, dir string) {},
1620+
concurrency: 10,
1621+
setup: func(t *testing.T, dir string) {},
16191622
check: func(t *testing.T, i *Ingester) {
16201623
require.Zero(t, len(i.TSDBState.dbs))
16211624
},
16221625
},
16231626
"should not load any TSDB is the root directory is missing": {
1627+
concurrency: 10,
16241628
setup: func(t *testing.T, dir string) {
16251629
require.NoError(t, os.Remove(dir))
16261630
},
@@ -1629,6 +1633,7 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
16291633
},
16301634
},
16311635
"should load TSDB for any non-empty user directory": {
1636+
concurrency: 10,
16321637
setup: func(t *testing.T, dir string) {
16331638
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700))
16341639
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
@@ -1641,7 +1646,26 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
16411646
require.Nil(t, i.getTSDB("user2"))
16421647
},
16431648
},
1644-
"should fail and rollback if an error occur while loading any user's TSDB": {
1649+
"should load all TSDBs on concurrency < number of TSDBs": {
1650+
concurrency: 2,
1651+
setup: func(t *testing.T, dir string) {
1652+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700))
1653+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
1654+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "dummy"), 0700))
1655+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user3", "dummy"), 0700))
1656+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user4", "dummy"), 0700))
1657+
},
1658+
check: func(t *testing.T, i *Ingester) {
1659+
require.Equal(t, 5, len(i.TSDBState.dbs))
1660+
require.NotNil(t, i.getTSDB("user0"))
1661+
require.NotNil(t, i.getTSDB("user1"))
1662+
require.NotNil(t, i.getTSDB("user2"))
1663+
require.NotNil(t, i.getTSDB("user3"))
1664+
require.NotNil(t, i.getTSDB("user4"))
1665+
},
1666+
},
1667+
"should fail and rollback if an error occur while loading a TSDB on concurrency > number of TSDBs": {
1668+
concurrency: 10,
16451669
setup: func(t *testing.T, dir string) {
16461670
// Create a fake TSDB on disk with an empty chunks head segment file (it's invalid and
16471671
// opening TSDB should fail).
@@ -1658,6 +1682,30 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
16581682
},
16591683
expectedErr: "unable to open TSDB for user user0",
16601684
},
1685+
"should fail and rollback if an error occur while loading a TSDB on concurrency < number of TSDBs": {
1686+
concurrency: 2,
1687+
setup: func(t *testing.T, dir string) {
1688+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user0", "dummy"), 0700))
1689+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user1", "dummy"), 0700))
1690+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user3", "dummy"), 0700))
1691+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user4", "dummy"), 0700))
1692+
1693+
// Create a fake TSDB on disk with an empty chunks head segment file (it's invalid and
1694+
// opening TSDB should fail).
1695+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "wal", ""), 0700))
1696+
require.NoError(t, os.MkdirAll(filepath.Join(dir, "user2", "chunks_head", ""), 0700))
1697+
require.NoError(t, ioutil.WriteFile(filepath.Join(dir, "user2", "chunks_head", "00000001"), nil, 0700))
1698+
},
1699+
check: func(t *testing.T, i *Ingester) {
1700+
require.Equal(t, 0, len(i.TSDBState.dbs))
1701+
require.Nil(t, i.getTSDB("user0"))
1702+
require.Nil(t, i.getTSDB("user1"))
1703+
require.Nil(t, i.getTSDB("user2"))
1704+
require.Nil(t, i.getTSDB("user3"))
1705+
require.Nil(t, i.getTSDB("user4"))
1706+
},
1707+
expectedErr: "unable to open TSDB for user user2",
1708+
},
16611709
}
16621710

16631711
for name, test := range tests {
@@ -1678,6 +1726,7 @@ func TestIngester_v2OpenExistingTSDBOnStartup(t *testing.T) {
16781726
ingesterCfg := defaultIngesterTestConfig()
16791727
ingesterCfg.BlocksStorageEnabled = true
16801728
ingesterCfg.BlocksStorageConfig.TSDB.Dir = tempDir
1729+
ingesterCfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup = testData.concurrency
16811730
ingesterCfg.BlocksStorageConfig.Bucket.Backend = "s3"
16821731
ingesterCfg.BlocksStorageConfig.Bucket.S3.Endpoint = "localhost"
16831732

pkg/storage/tsdb/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ var (
5555

5656
errUnsupportedStorageBackend = errors.New("unsupported TSDB storage backend")
5757
errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
58+
errInvalidOpeningConcurrency = errors.New("invalid TSDB opening concurrency")
5859
errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval")
5960
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
6061
errInvalidStripeSize = errors.New("invalid TSDB stripe size")
@@ -209,6 +210,10 @@ func (cfg *TSDBConfig) Validate() error {
209210
return errInvalidShipConcurrency
210211
}
211212

213+
if cfg.MaxTSDBOpeningConcurrencyOnStartup <= 0 {
214+
return errInvalidOpeningConcurrency
215+
}
216+
212217
if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 5*time.Minute {
213218
return errInvalidCompactionInterval
214219
}

0 commit comments

Comments
 (0)