@@ -3,6 +3,7 @@ package builder
 import (
 	"context"
 	"crypto/rand"
+	"io"
 	"os"
 	"path/filepath"
 	"sync"
@@ -26,6 +27,8 @@ import (
 	"github.com/cortexproject/cortex/pkg/querier/iterators"
 )

+const unsortedChunksDir = "unsorted_chunks"
+
 // This builder uses TSDB's chunk and index writer directly, without
 // using TSDB Head.
 type tsdbBuilder struct {
@@ -35,8 +38,8 @@ type tsdbBuilder struct {
 	outDir      string
 	tmpBlockDir string

-	chunksWriterMu sync.Mutex
-	chunksWriter   tsdb.ChunkWriter
+	unsortedChunksWriterMu sync.Mutex
+	unsortedChunksWriter   tsdb.ChunkWriter

 	startTime model.Time
 	endTime   model.Time
@@ -69,21 +72,21 @@ func newTsdbBuilder(outDir string, start, end time.Time, seriesBatchLimit int, l
 		return nil, err
 	}

-	chunksWriter, err := chunks.NewWriter(filepath.Join(blockDir, "chunks"))
+	unsortedChunksWriter, err := chunks.NewWriter(filepath.Join(blockDir, unsortedChunksDir))
 	if err != nil {
 		return nil, errors.Wrap(err, "chunks writer")
 	}

 	return &tsdbBuilder{
-		log:          log,
-		ulid:         id,
-		outDir:       outDir,
-		tmpBlockDir:  blockDir,
-		chunksWriter: chunksWriter,
-		startTime:    model.TimeFromUnixNano(start.UnixNano()),
-		endTime:      model.TimeFromUnixNano(end.UnixNano()),
-		series:       newSeriesList(seriesBatchLimit, seriesDir),
-		seriesDir:    seriesDir,
+		log:                  log,
+		ulid:                 id,
+		outDir:               outDir,
+		tmpBlockDir:          blockDir,
+		unsortedChunksWriter: unsortedChunksWriter,
+		startTime:            model.TimeFromUnixNano(start.UnixNano()),
+		endTime:              model.TimeFromUnixNano(end.UnixNano()),
+		series:               newSeriesList(seriesBatchLimit, seriesDir),
+		seriesDir:            seriesDir,

 		processedSeries: processedSeries,
 		writtenSamples:  writtenSamples,
@@ -149,9 +152,9 @@ func (d *tsdbBuilder) buildSingleSeries(metric labels.Labels, cs []chunk.Chunk)
 		ch = nil
 	}

-	d.chunksWriterMu.Lock()
-	err = d.chunksWriter.WriteChunks(chs...)
-	d.chunksWriterMu.Unlock()
+	d.unsortedChunksWriterMu.Lock()
+	err = d.unsortedChunksWriter.WriteChunks(chs...)
+	d.unsortedChunksWriterMu.Unlock()

 	if err != nil {
 		return err
@@ -181,7 +184,7 @@ func (d *tsdbBuilder) buildSingleSeries(metric labels.Labels, cs []chunk.Chunk)
 }

 func (d *tsdbBuilder) finishBlock(source string, labels map[string]string) (ulid.ULID, error) {
-	if err := d.chunksWriter.Close(); err != nil {
+	if err := d.unsortedChunksWriter.Close(); err != nil {
 		return ulid.ULID{}, errors.Wrap(err, "closing chunks writer")
 	}

@@ -211,10 +214,27 @@ func (d *tsdbBuilder) finishBlock(source string, labels map[string]string) (ulid
 		},
 	}

+	toClose := map[string]io.Closer{}
+	defer func() {
+		for k, c := range toClose {
+			err := c.Close()
+			if err != nil {
+				level.Error(d.log).Log("msg", "close failed", "name", k, "err", err)
+			}
+		}
+	}()
+
+	const (
+		indexWriterName          = "index writer"
+		unsortedChunksReaderName = "unsorted chunks reader"
+		chunksWriterName         = "chunks writer"
+	)
+
 	indexWriter, err := index.NewWriter(context.Background(), filepath.Join(d.tmpBlockDir, "index"))
 	if err != nil {
-		return ulid.ULID{}, errors.Wrap(err, "new index writer")
+		return ulid.ULID{}, errors.Wrap(err, indexWriterName)
 	}
+	toClose[indexWriterName] = indexWriter

 	symbols, err := addSymbolsToIndex(indexWriter, d.series)
 	if err != nil {
@@ -223,16 +243,39 @@ func (d *tsdbBuilder) finishBlock(source string, labels map[string]string) (ulid

 	level.Info(d.log).Log("msg", "added symbols to index", "count", symbols)

-	stats, err := addSeriesToIndex(indexWriter, d.series)
+	unsortedChunksReader, err := chunks.NewDirReader(filepath.Join(d.tmpBlockDir, unsortedChunksDir), nil)
+	if err != nil {
+		return ulid.ULID{}, errors.Wrap(err, unsortedChunksReaderName)
+	}
+	toClose[unsortedChunksReaderName] = unsortedChunksReader
+
+	chunksWriter, err := chunks.NewWriter(filepath.Join(d.tmpBlockDir, "chunks"))
+	if err != nil {
+		return ulid.ULID{}, errors.Wrap(err, chunksWriterName)
+	}
+	toClose[chunksWriterName] = chunksWriter
+
+	stats, err := addSeriesToIndex(indexWriter, d.series, unsortedChunksReader, chunksWriter)
 	if err != nil {
 		return ulid.ULID{}, errors.Wrap(err, "adding series")
 	}
 	meta.Stats = stats

 	level.Info(d.log).Log("msg", "added series to index", "series", stats.NumSeries, "chunks", stats.NumChunks, "samples", stats.NumSamples)

-	if err := indexWriter.Close(); err != nil {
-		return ulid.ULID{}, errors.Wrap(err, "closing index writer")
+	// Close index writer, unsorted chunks reader and chunks writer.
+	for k, c := range toClose {
+		delete(toClose, k)
+
+		err := c.Close()
+		if err != nil {
+			return ulid.ULID{}, errors.Wrapf(err, "closing %s", k)
+		}
+	}
+
+	// Delete unsorted chunks, they are no longer needed.
+	if err := os.RemoveAll(filepath.Join(d.tmpBlockDir, unsortedChunksDir)); err != nil {
+		return ulid.ULID{}, errors.Wrap(err, "deleting unsorted chunks")
 	}

 	if err := metadata.Write(d.log, d.tmpBlockDir, meta); err != nil {
@@ -246,7 +289,7 @@ func (d *tsdbBuilder) finishBlock(source string, labels map[string]string) (ulid
 	return d.ulid, nil
 }

-func addSeriesToIndex(indexWriter *index.Writer, sl *seriesList) (tsdb.BlockStats, error) {
+func addSeriesToIndex(indexWriter *index.Writer, sl *seriesList, unsortedChunksReader *chunks.Reader, chunksWriter *chunks.Writer) (tsdb.BlockStats, error) {
 	var stats tsdb.BlockStats

 	it, err := sl.seriesIterator()
@@ -259,6 +302,29 @@ func addSeriesToIndex(indexWriter *index.Writer, sl *seriesList) (tsdb.BlockStat
 		l := s.Metric
 		cs := s.Chunks

+		// Read chunks into memory.
+		for ix := range s.Chunks {
+			cs[ix].Chunk, err = unsortedChunksReader.Chunk(cs[ix].Ref)
+			if err != nil {
+				return stats, errors.Wrap(err, "failed to read chunk")
+			}
+			cs[ix].Ref = 0
+		}
+
+		// Write chunks again. This time they will be written in the same order as series.
+		err = chunksWriter.WriteChunks(cs...)
+		if err != nil {
+			return stats, errors.Wrap(err, "failed to write sorted chunks")
+		}
+
+		// Remove chunks data from memory, but keep reference for writing to index.
+		for ix := range cs {
+			if cs[ix].Ref == 0 {
+				return stats, errors.Errorf("chunk ref not set after writing sorted chunks")
+			}
+			cs[ix].Chunk = nil
+		}
+
 		if err := indexWriter.AddSeries(uint64(ix), l, cs...); err != nil {
 			return stats, errors.Wrapf(err, "adding series %v", l)
 		}