Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (

// known extensions
MetaFileSuffix = ".meta"
WalFileSuffix = ".wal"

DocsFileSuffix = ".docs"
DocsDelFileSuffix = ".docs.del"
Expand Down
124 changes: 117 additions & 7 deletions frac/active.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package frac

import (
"context"
"fmt"
"io"
"math"
"os"
Expand Down Expand Up @@ -54,7 +55,8 @@ type Active struct {
sortCache *cache.Cache[[]byte]

metaFile *os.File
metaReader storage.DocBlocksReader
metaReader *storage.DocBlocksReader
walReader *storage.WalReader

writer *ActiveWriter
indexer *ActiveIndexer
Expand All @@ -79,7 +81,8 @@ func NewActive(
cfg *Config,
) *Active {
docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync)
metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync)

metaFile, writer, metaReader, walReader, metaSize := mustOpenMetaWriter(baseFileName, readLimiter, docsFile, docsStats)

f := &Active{
TokenList: NewActiveTokenList(config.IndexWorkers),
Expand All @@ -95,13 +98,14 @@ func NewActive(
sortReader: storage.NewDocsReader(readLimiter, docsFile, sortCache),

metaFile: metaFile,
metaReader: storage.NewDocBlocksReader(readLimiter, metaFile),
metaReader: metaReader,
walReader: walReader,

indexer: activeIndexer,
writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync),
writer: writer,

BaseFileName: baseFileName,
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())),
info: common.NewInfo(baseFileName, uint64(docsStats.Size()), metaSize),
Config: cfg,
}

Expand All @@ -114,6 +118,35 @@ func NewActive(
return f
}

// mustOpenMetaWriter opens the metadata side of an active fraction and picks
// the on-disk format: if a legacy *.meta file already exists it is reused
// (read via DocBlocksReader, written via the legacy writer); otherwise a new
// *.wal file is created/opened. Exactly one of the returned readers is
// non-nil: *storage.DocBlocksReader for the legacy format, *storage.WalReader
// for the WAL format. The final return value is the current on-disk size of
// the chosen meta/wal file. Any unrecoverable error terminates the process.
func mustOpenMetaWriter(
	baseFileName string,
	readLimiter *storage.ReadLimiter,
	docsFile *os.File,
	docsStats os.FileInfo) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) {
	legacyMetaFileName := baseFileName + consts.MetaFileSuffix

	_, statErr := os.Stat(legacyMetaFileName)
	if statErr != nil && !os.IsNotExist(statErr) {
		// A stat failure other than "does not exist" (e.g. a permission
		// error) must not silently switch the fraction to the WAL format:
		// that would ignore an existing *.meta file and split the fraction's
		// metadata across two files.
		logger.Fatal("failed to stat legacy meta file", zap.String("file", legacyMetaFileName), zap.Error(statErr))
	}

	if statErr == nil {
		// .meta file exists: keep using the legacy format for this fraction.
		metaFile, metaStats := mustOpenFile(legacyMetaFileName, config.SkipFsync)
		metaSize := uint64(metaStats.Size())
		metaReader := storage.NewDocBlocksReader(readLimiter, metaFile)
		writer := NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync)
		logger.Info("using legacy meta file format", zap.String("fraction", baseFileName))
		return metaFile, writer, &metaReader, nil, metaSize
	}

	// No legacy file found: create/open a *.wal file (new format).
	logger.Info("using new WAL format", zap.String("fraction", baseFileName))
	walFileName := baseFileName + consts.WalFileSuffix
	metaFile, metaStats := mustOpenFile(walFileName, config.SkipFsync)
	metaSize := uint64(metaStats.Size())
	writer := NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync)
	walReader, err := storage.NewWalReader(readLimiter, metaFile, baseFileName)
	if err != nil {
		logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err))
	}
	return metaFile, writer, nil, walReader, metaSize
}

func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) {
file, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o776)
if err != nil {
Expand All @@ -133,6 +166,81 @@ func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) {
}

// Replay restores the fraction's in-memory index from whichever metadata
// file is present on disk. The new *.wal format takes precedence over the
// legacy *.meta format; when neither file exists there is nothing to replay.
func (f *Active) Replay(ctx context.Context) error {
	if _, err := os.Stat(f.BaseFileName + consts.WalFileSuffix); err == nil {
		return f.replayWalFile(ctx)
	}

	if _, err := os.Stat(f.BaseFileName + consts.MetaFileSuffix); err == nil {
		return f.replayMetaFileLegacy(ctx)
	}

	logger.Info("neither wal nor legacy meta file was found, skipping replay", zap.String("fraction", f.BaseFileName))
	return nil
}

// replayWalFile re-indexes the fraction by streaming every entry of its
// *.wal file through the active indexer. Progress is logged roughly every
// 10% of the WAL size. It returns the first read error encountered, or
// ctx.Err() when the context is cancelled mid-replay.
func (f *Active) replayWalFile(ctx context.Context) error {
	if f.walReader == nil {
		return fmt.Errorf("WAL reader not initialized")
	}

	logger.Info("start replaying WAL file...", zap.String("name", f.info.Name()))

	t := time.Now()

	// Log progress every ~10% of the WAL. For tiny files step is zero; in
	// that case the original `offset > next` check would fire on every
	// entry (next never advances), so we skip progress logging entirely.
	step := f.info.MetaOnDisk / 10
	next := step

	sw := stopwatch.New()
	wg := sync.WaitGroup{}

	for entry := range f.walReader.Iter() {
		// Stop promptly if the caller cancels the replay.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if entry.Err != nil {
			return entry.Err
		}

		if step > 0 && uint64(entry.Offset) > next {
			next += step
			progress := float64(uint64(entry.Offset)) / float64(f.info.MetaOnDisk) * 100
			logger.Info("replaying batch, meta",
				zap.String("name", f.info.Name()),
				zap.Int64("from", entry.Offset),
				zap.Int64("to", entry.Offset+entry.Size),
				zap.Uint64("target", f.info.MetaOnDisk),
				util.ZapFloat64WithPrec("progress_percentage", progress, 2),
			)
		}

		wg.Add(1)
		f.indexer.Index(f, entry.Data, &wg, sw)
	}

	// Wait for all dispatched index tasks to finish before reporting stats.
	wg.Wait()

	tookSeconds := util.DurationToUnit(time.Since(t), "s")
	throughputRaw := util.SizeToUnit(f.info.DocsRaw, "mb") / tookSeconds
	throughputMeta := util.SizeToUnit(f.info.MetaOnDisk, "mb") / tookSeconds
	logger.Info("active fraction replayed",
		zap.String("name", f.info.Name()),
		zap.Uint32("docs_total", f.info.DocsTotal),
		util.ZapUint64AsSizeStr("docs_size", f.info.DocsOnDisk),
		util.ZapFloat64WithPrec("took_s", tookSeconds, 1),
		util.ZapFloat64WithPrec("throughput_raw_mb_sec", throughputRaw, 1),
		util.ZapFloat64WithPrec("throughput_meta_mb_sec", throughputMeta, 1),
	)
	return nil
}

// replayMetaFileLegacy replays legacy *.meta files. Only basic corruption detection support is implemented
func (f *Active) replayMetaFileLegacy(ctx context.Context) error {
logger.Info("start replaying...", zap.String("name", f.info.Name()))

t := time.Now()
Expand Down Expand Up @@ -175,7 +283,9 @@ out:
offset += metaSize

wg.Add(1)
f.indexer.Index(f, meta, &wg, sw)

walBlock := storage.PackDocBlockToWalBlock(meta)
f.indexer.Index(f, walBlock, &wg, sw)
}
}

Expand Down Expand Up @@ -204,7 +314,7 @@ var bulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
}, []string{"stage"})

// Append causes data to be written on disk and sends metas to index workers
func (f *Active) Append(docs, metas []byte, wg *sync.WaitGroup) (err error) {
func (f *Active) Append(docs storage.DocBlock, metas storage.WalBlock, wg *sync.WaitGroup) (err error) {
sw := stopwatch.New()
m := sw.Start("append")
if err = f.writer.Write(docs, metas, sw); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions frac/active_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ActiveIndexer struct {

type indexTask struct {
Frac *Active
Metas storage.DocBlock
Metas storage.WalBlock
Pos uint64
Wg *sync.WaitGroup
}
Expand All @@ -45,10 +45,10 @@ func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) {
return &idx, stopIdx
}

func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) {
func (ai *ActiveIndexer) Index(frac *Active, metas storage.WalBlock, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) {
m := sw.Start("send_index_chan")
ai.ch <- &indexTask{
Pos: storage.DocBlock(metas).GetExt2(),
Pos: metas.DocsOffset(),
Metas: metas,
Frac: frac,
Wg: wg,
Expand Down
2 changes: 1 addition & 1 deletion frac/active_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func BenchmarkIndexer(b *testing.B) {
bulks := make([][]byte, 0, len(readers))
for _, readNext := range readers {
_, _, meta, _ := processor.ProcessBulk(time.Now(), nil, nil, readNext)
bulks = append(bulks, storage.CompressDocBlock(meta, nil, 3))
bulks = append(bulks, storage.CompressWalBlock(meta, nil, 3))
}
b.StartTimer()

Expand Down
29 changes: 21 additions & 8 deletions frac/active_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,32 @@ import (
)

type ActiveWriter struct {
docs *FileWriter
meta *FileWriter
docs *storage.FileWriter
meta MetaWriter
}

func NewActiveWriter(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter {
// MetaWriter abstracts the metadata side of an ActiveWriter so that the same
// write path can target either the new *.wal format or the legacy *.meta
// format.
type MetaWriter interface {
	// Write persists a single WAL block and returns the offset at which it
	// was written; sw collects per-stage timings.
	Write(data storage.WalBlock, sw *stopwatch.Stopwatch) (int64, error)
	// Stop shuts the writer down.
	Stop()
}

// NewActiveWriter creates a writer for the new *.wal metadata format:
// documents go to docsFile through a FileWriter and metadata goes to walFile
// through a WalWriter, each starting at the given offset. skipFsync is
// forwarded to both underlying writers.
func NewActiveWriter(docsFile, walFile *os.File, docsOffset, walOffset int64, skipFsync bool) *ActiveWriter {
	return &ActiveWriter{
		docs: storage.NewFileWriter(docsFile, docsOffset, skipFsync),
		meta: storage.NewWalWriter(walFile, walOffset, skipFsync),
	}
}

// NewActiveWriterLegacy creates a writer for the legacy *.meta metadata
// format: documents go to docsFile through a FileWriter, and metadata is
// routed through a LegacyMetaWriter wrapping a plain FileWriter on metaFile.
// Each writer starts at the given offset; skipFsync is forwarded to both.
func NewActiveWriterLegacy(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter {
	return &ActiveWriter{
		docs: storage.NewFileWriter(docsFile, docsOffset, skipFsync),
		meta: NewLegacyMetaWriter(storage.NewFileWriter(metaFile, metaOffset, skipFsync)),
	}
}

func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error {
func (a *ActiveWriter) Write(docs storage.DocBlock, meta storage.WalBlock, sw *stopwatch.Stopwatch) error {
m := sw.Start("write_docs")
offset, err := a.docs.Write(docs, sw)
m.Stop()
Expand All @@ -28,8 +42,7 @@ func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error {
return err
}

storage.DocBlock(meta).SetExt1(uint64(len(docs)))
storage.DocBlock(meta).SetExt2(uint64(offset))
meta.SetDocsOffset(uint64(offset))

m = sw.Start("write_meta")
_, err = a.meta.Write(meta, sw)
Expand Down
47 changes: 43 additions & 4 deletions frac/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/metric/stopwatch"
"github.com/ozontech/seq-db/storage"
)

type testWriterSyncer struct {
Expand Down Expand Up @@ -62,7 +64,7 @@ func (ws *testWriterSyncer) Check(val []byte) bool {

func TestFileWriter(t *testing.T) {
ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond}
fw := NewFileWriter(ws, 0, false)
fw := storage.NewFileWriter(ws, 0, false)

wg := sync.WaitGroup{}
for range 100 {
Expand All @@ -85,7 +87,7 @@ func TestFileWriter(t *testing.T) {

func TestFileWriterNoSync(t *testing.T) {
ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond}
fw := NewFileWriter(ws, 0, true)
fw := storage.NewFileWriter(ws, 0, true)

wg := sync.WaitGroup{}
for range 100 {
Expand All @@ -108,7 +110,7 @@ func TestFileWriterNoSync(t *testing.T) {

func TestFileWriterError(t *testing.T) {
ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond, err: true}
fw := NewFileWriter(ws, 0, false)
fw := storage.NewFileWriter(ws, 0, false)

wg := sync.WaitGroup{}
for range 100 {
Expand Down Expand Up @@ -149,7 +151,7 @@ func TestConcurrentFileWriting(t *testing.T) {

defer f.Close()

fw := NewFileWriter(&testRandPauseWriterAt{f: f}, 0, true)
fw := storage.NewFileWriter(&testRandPauseWriterAt{f: f}, 0, true)

const (
writersCount = 100
Expand Down Expand Up @@ -262,3 +264,40 @@ func TestSparseWrite(t *testing.T) {
e = os.Remove(rf.Name())
assert.NoError(t, e)
}

// TestLegacyMetaWriterConvertsWalBlockToDocBlock checks that the legacy meta
// writer stores a WalBlock on disk as a DocBlock: the compressed payload
// round-trips intact, the codec is preserved, and the docs offset lands in
// the DocBlock's Ext2 field.
func TestLegacyMetaWriterConvertsWalBlockToDocBlock(t *testing.T) {
	file, err := os.Create(t.TempDir() + "/test_wal_block.txt")
	require.NoError(t, err)
	defer file.Close()

	writer := NewLegacyMetaWriter(storage.NewFileWriter(file, 0, false))

	payload := []byte("test payload for WalBlock to DocBlock conversion")
	wal := storage.CompressWalBlock(payload, nil, 3)
	wal.SetDocsOffset(12345)
	wal.SetVersion(1)

	offset, err := writer.Write(wal, stopwatch.New())
	require.NoError(t, err)

	writer.Stop()

	// The legacy on-disk layout is a DocBlock header followed by the block body.
	wantSize := storage.DocBlockHeaderLen + wal.Len()

	buf := make([]byte, wantSize)
	n, err := file.ReadAt(buf, offset)
	require.NoError(t, err)
	require.Equal(t, int(wantSize), n)

	block := storage.DocBlock(buf[:n])

	assert.Equal(t, storage.CodecZSTD, block.Codec())
	assert.Equal(t, uint64(len(payload)), block.RawLen())
	assert.Equal(t, uint64(12345), block.GetExt2())

	decoded, err := block.DecompressTo(nil)
	require.NoError(t, err)
	assert.Equal(t, payload, decoded)
}
4 changes: 2 additions & 2 deletions frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,15 +1347,15 @@ func (s *FractionTestSuite) TestFractionInfo() {
// but if compression/marshalling has changed, expected values can be updated accordingly
s.Require().Equal(uint32(5), info.DocsTotal, "doc total doesn't match")
// it varies depending on params and docs shuffled
s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(300),
s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(350),
"doc on disk doesn't match. actual value: %d", info.DocsOnDisk)
s.Require().Equal(uint64(583), info.DocsRaw, "doc raw doesn't match")
s.Require().Equal(seq.MID(946731625000000000), info.From, "from doesn't match")
s.Require().Equal(seq.MID(946731654000000000), info.To, "to doesn't match")

switch s.fraction.(type) {
case *Active:
s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(350),
s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(400),
"meta on disk doesn't match. actual value: %d", info.MetaOnDisk)
s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match")
case *Sealed:
Expand Down
Loading
Loading