
Commit c84e4aa

codesome and pstibrany authored

Avoid large blocks on forced compaction of blocks ingester (#3344)

* Avoid large blocks on forced compaction of blocks ingester
  Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
* Add entry in CHANGELOG
  Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
* Refactor code and add unit test
  Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
* Fix builds
  Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
* Fixing nits.
  Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Removed compactBlocksMtx, and use for-loop again in compactHead.
  Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Comment.
  Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
* Review feedback.
  Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

Co-authored-by: Peter Štibraný <peter.stibrany@grafana.com>
1 parent 6c67ce8 commit c84e4aa

File tree

3 files changed (+119, -11 lines)


CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -106,6 +106,7 @@
 * [BUGFIX] Blocks storage: Avoid deletion of blocks in the ingester which are not shipped to the storage yet. #3346
 * [BUGFIX] Fix common prefixes returned by List method of S3 client. #3358
 * [BUGFIX] Honor configured timeout in Azure and GCS object clients. #3285
+* [BUGFIX] Blocks storage: Avoid creating blocks larger than configured block range period on forced compaction and when TSDB is idle. #3344
 * [BUGFIX] Shuffle sharding: fixed max global series per user/metric limit when shuffle sharding and `-distributor.shard-by-all-labels=true` are both enabled in distributor. When using these global limits you should now set `-distributor.sharding-strategy` and `-distributor.zone-awareness-enabled` to ingesters too. #3369
 * [BUGFIX] Slow query logging: when using downstream server request parameters were not logged. #3276
 * [BUGFIX] Fixed tenant detection in the ruler and alertmanager API when running without auth. #3343
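
The fix summarized in the new CHANGELOG line hinges on one piece of arithmetic: two timestamps belong to the same block range exactly when truncating them to the configured block duration yields the same range start. The loop condition of the compactHead function added in the next file is that comparison. A minimal, standalone illustration follows; the helper name and the main program are assumptions for this sketch, not code from the commit.

package main

import "fmt"

// rangeStart truncates a timestamp (in milliseconds) down to the start of the
// block range containing it. Two timestamps fall into the same block exactly
// when their rangeStart values match. Illustration only.
func rangeStart(ts, blockDuration int64) int64 {
    return (ts / blockDuration) * blockDuration
}

func main() {
    const hour = int64(3600 * 1000)
    const blockDuration = 2 * hour // 2h range, matching the boundaries asserted in the new test

    fmt.Println(rangeStart(23*hour, blockDuration) == rangeStart(24*hour-1, blockDuration)) // true: same block, no split needed
    fmt.Println(rangeStart(23*hour, blockDuration) == rangeStart(50*hour, blockDuration))   // false: head spans ranges, must be split
}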

pkg/ingester/ingester_v2.go

Lines changed: 55 additions & 8 deletions

@@ -48,7 +48,7 @@ type Shipper interface {
 }
 
 type userTSDB struct {
-    *tsdb.DB
+    db *tsdb.DB
     userID       string
     refCache     *cortex_tsdb.RefCache
     activeSeries *ActiveSeries
@@ -66,14 +66,61 @@ type userTSDB struct {
     ingestedRuleSamples *ewmaRate
 }
 
+// Explicitly wrapping the tsdb.DB functions that we use.
+
+func (u *userTSDB) Appender(ctx context.Context) storage.Appender {
+    return u.db.Appender(ctx)
+}
+
+func (u *userTSDB) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
+    return u.db.Querier(ctx, mint, maxt)
+}
+
+func (u *userTSDB) Head() *tsdb.Head {
+    return u.db.Head()
+}
+
+func (u *userTSDB) Blocks() []*tsdb.Block {
+    return u.db.Blocks()
+}
+
+func (u *userTSDB) Close() error {
+    return u.db.Close()
+}
+
+func (u *userTSDB) Compact() error {
+    return u.db.Compact()
+}
+
+// compactHead compacts the Head block at specified block durations avoiding a single huge block.
+func (u *userTSDB) compactHead(blockDuration int64) error {
+    h := u.Head()
+
+    minTime, maxTime := h.MinTime(), h.MaxTime()
+
+    for (minTime/blockDuration)*blockDuration != (maxTime/blockDuration)*blockDuration {
+        // Data in Head spans across multiple block ranges, so we break it into blocks here.
+        // Block max time is exclusive, so we do a -1 here.
+        blockMaxTime := ((minTime/blockDuration)+1)*blockDuration - 1
+        if err := u.db.CompactHead(tsdb.NewRangeHead(h, minTime, blockMaxTime)); err != nil {
+            return err
+        }
+
+        // Get current min/max times after compaction.
+        minTime, maxTime = h.MinTime(), h.MaxTime()
+    }
+
+    return u.db.CompactHead(tsdb.NewRangeHead(h, minTime, maxTime))
+}
+
 // PreCreation implements SeriesLifecycleCallback interface.
 func (u *userTSDB) PreCreation(metric labels.Labels) error {
     if u.limiter == nil {
         return nil
     }
 
     // Total series limit.
-    if err := u.limiter.AssertMaxSeriesPerUser(u.userID, int(u.DB.Head().NumSeries())); err != nil {
+    if err := u.limiter.AssertMaxSeriesPerUser(u.userID, int(u.Head().NumSeries())); err != nil {
         return makeLimitError(perUserSeriesLimit, err)
     }
 
@@ -113,15 +160,15 @@ func (u *userTSDB) PostDeletion(metrics ...labels.Labels) {
 
 // blocksToDelete filters the input blocks and returns the blocks which are safe to be deleted from the ingester.
 func (u *userTSDB) blocksToDelete(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
-    if u.DB == nil {
+    if u.db == nil {
         return nil
     }
-    deletable := tsdb.DefaultBlocksToDelete(u.DB)(blocks)
+    deletable := tsdb.DefaultBlocksToDelete(u.db)(blocks)
     if u.shipper == nil {
         return deletable
     }
 
-    shipperMeta, err := shipper.ReadMetaFile(u.Dir())
+    shipperMeta, err := shipper.ReadMetaFile(u.db.Dir())
     if err != nil {
         // If there is any issue with the shipper, we should be conservative and not delete anything.
         level.Error(util.Logger).Log("msg", "failed to read shipper meta during deletion of blocks", "user", u.userID, "err", err)
@@ -1022,7 +1069,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
         return nil, errors.Wrapf(err, "failed to compact TSDB: %s", udir)
     }
 
-    userDB.DB = db
+    userDB.db = db
     // We set the limiter here because we don't want to limit
     // series during WAL replay.
     userDB.limiter = i.limiter
@@ -1313,12 +1360,12 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
         switch {
         case force:
             reason = "forced"
-            err = userDB.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime()))
+            err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())
 
         case i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout):
             reason = "idle"
             level.Info(util.Logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID)
-            err = userDB.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime()))
+            err = userDB.compactHead(i.cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds())
 
         default:
             reason = "regular"
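
The first hunks above replace the embedded *tsdb.DB in userTSDB with an unexported db field plus explicit forwarding methods. One way to read that change: with embedding, every tsdb.DB method, including the raw CompactHead, is promoted and remains callable on userTSDB, so nothing would stop a caller from bypassing the new range-aware compactHead; with an unexported field, only the forwarded methods are visible. Below is a toy sketch of that distinction, with made-up type names standing in for tsdb.DB and userTSDB; it is not code from this repository.

package main

import "fmt"

// inner stands in for tsdb.DB in this illustration.
type inner struct{}

func (inner) CompactHead() {
    fmt.Println("raw CompactHead: whole head becomes one block")
}

func (inner) compactRange(mint, maxt int64) {
    fmt.Printf("compacting [%d, %d)\n", mint, maxt)
}

// embedded mirrors the old userTSDB: the inner type is embedded,
// so CompactHead is promoted and callable from the outside.
type embedded struct {
    inner
}

// wrapped mirrors the new userTSDB: inner is an unexported field,
// so callers only see the methods we choose to forward.
type wrapped struct {
    db inner
}

func (w wrapped) compactHead(blockDuration int64) {
    // Illustration only: the real compactHead added in this commit loops over
    // the head's time range; here we just forward one aligned range.
    w.db.compactRange(0, blockDuration)
}

func main() {
    embedded{}.CompactHead()       // promoted: callers could still bypass any wrapper
    wrapped{}.compactHead(7200000) // only the forwarded entry point is exposed
    // wrapped{}.CompactHead()     // would not compile: method is not promoted
}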

pkg/ingester/ingester_v2_test.go

Lines changed: 63 additions & 3 deletions

@@ -1806,7 +1806,7 @@ func TestIngester_flushing(t *testing.T) {
             // Nothing shipped yet.
             m.AssertNumberOfCalls(t, "Sync", 0)
 
-            i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/shutdown", nil))
+            i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil))
 
             // Flush handler only triggers compactions, but doesn't wait for them to finish. Let's wait for a moment, and then verify.
             time.Sleep(1 * time.Second)
@@ -1815,6 +1815,59 @@ func TestIngester_flushing(t *testing.T) {
             m.AssertNumberOfCalls(t, "Sync", 1)
         },
     },
+
+    "flushMultipleBlocksWithDataSpanning3Days": {
+        setupIngester: func(cfg *Config) {
+            cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false
+        },
+
+        action: func(t *testing.T, i *Ingester, m *shipperMock) {
+            // Pushing 5 samples, spanning over 3 days.
+            // First block
+            pushSingleSampleAtTime(t, i, 23*time.Hour.Milliseconds())
+            pushSingleSampleAtTime(t, i, 24*time.Hour.Milliseconds()-1)
+
+            // Second block
+            pushSingleSampleAtTime(t, i, 24*time.Hour.Milliseconds()+1)
+            pushSingleSampleAtTime(t, i, 25*time.Hour.Milliseconds())
+
+            // Third block, far in the future.
+            pushSingleSampleAtTime(t, i, 50*time.Hour.Milliseconds())
+
+            // Nothing shipped yet.
+            m.AssertNumberOfCalls(t, "Sync", 0)
+
+            i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil))
+
+            // Wait for compaction to finish.
+            test.Poll(t, 5*time.Second, true, func() interface{} {
+                db := i.getTSDB(userID)
+                if db == nil {
+                    return false
+                }
+
+                h := db.Head()
+                return h.NumSeries() == 0
+            })
+
+            m.AssertNumberOfCalls(t, "Sync", 1)
+
+            userDB := i.getTSDB(userID)
+            require.NotNil(t, userDB)
+
+            blocks := userDB.Blocks()
+            require.Equal(t, 3, len(blocks))
+            require.Equal(t, 23*time.Hour.Milliseconds(), blocks[0].Meta().MinTime)
+            require.Equal(t, 24*time.Hour.Milliseconds(), blocks[0].Meta().MaxTime) // Block maxt is exclusive.
+
+            // Even though we added 24*time.Hour.Milliseconds()+1, the Head compaction
+            // will leave Head's mint to 24*time.Hour.Milliseconds(). Hence the block mint.
+            require.Equal(t, 24*time.Hour.Milliseconds(), blocks[1].Meta().MinTime)
+            require.Equal(t, 26*time.Hour.Milliseconds(), blocks[1].Meta().MaxTime)
+
+            require.Equal(t, 50*time.Hour.Milliseconds()+1, blocks[2].Meta().MaxTime) // Block maxt is exclusive.
+        },
+    },
 } {
     t.Run(name, func(t *testing.T) {
         cfg := defaultIngesterTestConfig()
@@ -2109,6 +2162,13 @@ func pushSingleSample(t *testing.T, i *Ingester) {
     require.NoError(t, err)
 }
 
+func pushSingleSampleAtTime(t *testing.T, i *Ingester, ts int64) {
+    ctx := user.InjectOrgID(context.Background(), userID)
+    req, _, _ := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "test"}}, 0, ts)
+    _, err := i.v2Push(ctx, req)
+    require.NoError(t, err)
+}
+
 func TestHeadCompactionOnStartup(t *testing.T) {
     // Create a temporary directory for TSDB
     tempDir, err := ioutil.TempDir("", "tsdb")
@@ -2255,7 +2315,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
     require.Equal(t, 3, len(oldBlocks))
 
     // Saying that we have shipped the second block, so only that should get deleted.
-    require.Nil(t, shipper.WriteMetaFile(nil, db.Dir(), &shipper.Meta{
+    require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
         Version:  shipper.MetaVersion1,
         Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID},
     }))
@@ -2276,7 +2336,7 @@ func TestIngesterNotDeleteUnshippedBlocks(t *testing.T) {
     require.NotEqual(t, oldBlocks[1].Meta().ULID, newBlocks[2].Meta().ULID) // The new block won't match previous 2nd block.
 
     // Shipping 2 more blocks, hence all the blocks from first round.
-    require.Nil(t, shipper.WriteMetaFile(nil, db.Dir(), &shipper.Meta{
+    require.Nil(t, shipper.WriteMetaFile(nil, db.db.Dir(), &shipper.Meta{
         Version:  shipper.MetaVersion1,
         Uploaded: []ulid.ULID{oldBlocks[1].Meta().ULID, newBlocks[0].Meta().ULID, newBlocks[1].Meta().ULID},
     }))
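
The new test case pins exact block boundaries for one scenario. A looser, reusable check in the same spirit would assert that no produced block is wider than the configured block range, e.g. cfg.BlocksStorageConfig.TSDB.BlockRanges[0].Milliseconds(). The helper below is a sketch only; it is not part of this commit, and it assumes the same Prometheus TSDB and testify imports already used by this test file.

package ingester

import (
    "testing"

    "github.com/prometheus/prometheus/tsdb"
    "github.com/stretchr/testify/require"
)

// assertBlocksWithinRange is a hypothetical helper, not part of this commit.
// It checks that every block spans at most one configured block range, which
// is the property the range-aware compactHead is meant to guarantee.
func assertBlocksWithinRange(t *testing.T, blocks []*tsdb.Block, blockRange int64) {
    for _, b := range blocks {
        meta := b.Meta()
        require.LessOrEqual(t, meta.MaxTime-meta.MinTime, blockRange,
            "block %s is wider than the configured block range", meta.ULID)
    }
}

// Example use inside the test above (also hypothetical):
//   assertBlocksWithinRange(t, userDB.Blocks(), 2*time.Hour.Milliseconds())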
