From 834e2ed1d21cc8968a65ab71447bb020b0fa006d Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Fri, 16 Jan 2026 14:31:44 +0400 Subject: [PATCH 1/7] WAL truncation test --- consts/consts.go | 1 + frac/active.go | 115 +++++- frac/active_indexer.go | 6 +- frac/active_writer.go | 21 +- frac/file_writer.go | 10 + frac/file_writer_test.go | 39 ++ frac/fraction_test.go | 4 +- fracmanager/frac_manifest.go | 11 +- fracmanager/fracmanager.go | 2 +- indexer/compress.go | 4 +- indexer/test_doc_provider.go | 2 +- storage/meta_block.go | 215 +++++++++++ storage/meta_block_test.go | 76 ++++ storage/wal_reader.go | 135 +++++++ storage/wal_writer.go | 165 ++++++++ storage/wal_writer_test.go | 714 +++++++++++++++++++++++++++++++++++ storeapi/grpc_bulk.go | 6 + 17 files changed, 1504 insertions(+), 22 deletions(-) create mode 100644 storage/meta_block.go create mode 100644 storage/meta_block_test.go create mode 100644 storage/wal_reader.go create mode 100644 storage/wal_writer.go create mode 100644 storage/wal_writer_test.go diff --git a/consts/consts.go b/consts/consts.go index 46a81d55..1055d5d1 100644 --- a/consts/consts.go +++ b/consts/consts.go @@ -50,6 +50,7 @@ const ( // known extensions MetaFileSuffix = ".meta" + WalFileSuffix = ".wal" DocsFileSuffix = ".docs" DocsDelFileSuffix = ".docs.del" diff --git a/frac/active.go b/frac/active.go index e16b48a1..53f34f81 100644 --- a/frac/active.go +++ b/frac/active.go @@ -2,6 +2,7 @@ package frac import ( "context" + "fmt" "io" "math" "os" @@ -55,6 +56,7 @@ type Active struct { metaFile *os.File metaReader storage.DocBlocksReader + walReader *storage.WalReader writer *ActiveWriter indexer *ActiveIndexer @@ -79,7 +81,30 @@ func NewActive( cfg *Config, ) *Active { docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync) - metaFile, metaStats := mustOpenFile(baseFileName+consts.MetaFileSuffix, config.SkipFsync) + + var metaFile *os.File + var metaStats 
os.FileInfo + var writer *ActiveWriter + var metaReader storage.DocBlocksReader + var walReader *storage.WalReader + var metaSize uint64 + + legacyMetaFileName := baseFileName + consts.MetaFileSuffix + if _, err := os.Stat(legacyMetaFileName); err == nil { + // .meta file exists + metaFile, metaStats = mustOpenFile(legacyMetaFileName, config.SkipFsync) + metaSize = uint64(metaStats.Size()) + metaReader = storage.NewDocBlocksReader(readLimiter, metaFile) + writer = NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) + logger.Info("using legacy meta file format", zap.String("fraction", baseFileName)) + } else { + walFileName := baseFileName + consts.WalFileSuffix + metaFile, metaStats = mustOpenFile(walFileName, config.SkipFsync) + metaSize = uint64(metaStats.Size()) + walReader = storage.NewWalReader(readLimiter, metaFile, baseFileName) + writer = NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) + logger.Info("using new WAL format", zap.String("fraction", baseFileName)) + } f := &Active{ TokenList: NewActiveTokenList(config.IndexWorkers), @@ -95,13 +120,14 @@ func NewActive( sortReader: storage.NewDocsReader(readLimiter, docsFile, sortCache), metaFile: metaFile, - metaReader: storage.NewDocBlocksReader(readLimiter, metaFile), + metaReader: metaReader, + walReader: walReader, indexer: activeIndexer, - writer: NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync), + writer: writer, BaseFileName: baseFileName, - info: common.NewInfo(baseFileName, uint64(docsStats.Size()), uint64(metaStats.Size())), + info: common.NewInfo(baseFileName, uint64(docsStats.Size()), metaSize), Config: cfg, } @@ -133,6 +159,81 @@ func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) { } func (f *Active) Replay(ctx context.Context) error { + walFileName := f.BaseFileName + consts.WalFileSuffix + if _, err := os.Stat(walFileName); err == nil { + return 
f.replayWalFile(ctx) + } + + metaFileName := f.BaseFileName + consts.MetaFileSuffix + if _, err := os.Stat(metaFileName); err == nil { + return f.replayMetaFileLegacy(ctx) + } + + logger.Info("neither wal nor legacy meta file was found, skipping replay", zap.String("fraction", f.BaseFileName)) + return nil +} + +func (f *Active) replayWalFile(ctx context.Context) error { + if f.walReader == nil { + return fmt.Errorf("WAL reader not initialized") + } + + logger.Info("start replaying WAL file...", zap.String("name", f.info.Name())) + + t := time.Now() + + step := f.info.MetaOnDisk / 10 + next := step + + sw := stopwatch.New() + wg := sync.WaitGroup{} + + for entry := range f.walReader.Iter() { + // Check for context cancellation + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if entry.Err != nil { + return entry.Err + } + + if uint64(entry.Offset) > next { + next += step + progress := float64(uint64(entry.Offset)) / float64(f.info.MetaOnDisk) * 100 + logger.Info("replaying batch, meta", + zap.String("name", f.info.Name()), + zap.Int64("from", entry.Offset), + zap.Int64("to", entry.Offset+entry.Size), + zap.Uint64("target", f.info.MetaOnDisk), + util.ZapFloat64WithPrec("progress_percentage", progress, 2), + ) + } + + wg.Add(1) + f.indexer.Index(f, entry.Data, &wg, sw) + } + + wg.Wait() + + tookSeconds := util.DurationToUnit(time.Since(t), "s") + throughputRaw := util.SizeToUnit(f.info.DocsRaw, "mb") / tookSeconds + throughputMeta := util.SizeToUnit(f.info.MetaOnDisk, "mb") / tookSeconds + logger.Info("active fraction replayed", + zap.String("name", f.info.Name()), + zap.Uint32("docs_total", f.info.DocsTotal), + util.ZapUint64AsSizeStr("docs_size", f.info.DocsOnDisk), + util.ZapFloat64WithPrec("took_s", tookSeconds, 1), + util.ZapFloat64WithPrec("throughput_raw_mb_sec", throughputRaw, 1), + util.ZapFloat64WithPrec("throughput_meta_mb_sec", throughputMeta, 1), + ) + return nil +} + +// replayMetaFileLegacy replays legacy *.meta files. 
Only basic corruption detection support is implemented +func (f *Active) replayMetaFileLegacy(ctx context.Context) error { logger.Info("start replaying...", zap.String("name", f.info.Name())) t := time.Now() @@ -175,7 +276,9 @@ out: offset += metaSize wg.Add(1) - f.indexer.Index(f, meta, &wg, sw) + + metaBlock := storage.PackDocBlockToMetaBlock(meta) + f.indexer.Index(f, metaBlock, &wg, sw) } } @@ -204,7 +307,7 @@ var bulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ }, []string{"stage"}) // Append causes data to be written on disk and sends metas to index workers -func (f *Active) Append(docs, metas []byte, wg *sync.WaitGroup) (err error) { +func (f *Active) Append(docs storage.DocBlock, metas storage.MetaBlock, wg *sync.WaitGroup) (err error) { sw := stopwatch.New() m := sw.Start("append") if err = f.writer.Write(docs, metas, sw); err != nil { diff --git a/frac/active_indexer.go b/frac/active_indexer.go index 5cda8f9f..deeb1619 100644 --- a/frac/active_indexer.go +++ b/frac/active_indexer.go @@ -24,7 +24,7 @@ type ActiveIndexer struct { type indexTask struct { Frac *Active - Metas storage.DocBlock + Metas storage.MetaBlock Pos uint64 Wg *sync.WaitGroup } @@ -45,10 +45,10 @@ func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) { return &idx, stopIdx } -func (ai *ActiveIndexer) Index(frac *Active, metas []byte, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) { +func (ai *ActiveIndexer) Index(frac *Active, metas storage.MetaBlock, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) { m := sw.Start("send_index_chan") ai.ch <- &indexTask{ - Pos: storage.DocBlock(metas).GetExt2(), + Pos: metas.DocsOffset(), Metas: metas, Frac: frac, Wg: wg, diff --git a/frac/active_writer.go b/frac/active_writer.go index 95cf2fdd..52a11428 100644 --- a/frac/active_writer.go +++ b/frac/active_writer.go @@ -9,17 +9,29 @@ import ( type ActiveWriter struct { docs *FileWriter - meta *FileWriter + meta MetaWriter } -func NewActiveWriter(docsFile, metaFile 
*os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter { +type MetaWriter interface { + Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) + Stop() +} + +func NewActiveWriter(docsFile, walFile *os.File, docsOffset, walOffset int64, skipFsync bool) *ActiveWriter { + return &ActiveWriter{ + docs: NewFileWriter(docsFile, docsOffset, skipFsync), + meta: storage.NewWalWriter(walFile, walOffset, skipFsync), + } +} + +func NewActiveWriterLegacy(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter { return &ActiveWriter{ docs: NewFileWriter(docsFile, docsOffset, skipFsync), meta: NewFileWriter(metaFile, metaOffset, skipFsync), } } -func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error { +func (a *ActiveWriter) Write(docs storage.DocBlock, meta storage.MetaBlock, sw *stopwatch.Stopwatch) error { m := sw.Start("write_docs") offset, err := a.docs.Write(docs, sw) m.Stop() @@ -28,8 +40,7 @@ func (a *ActiveWriter) Write(docs, meta []byte, sw *stopwatch.Stopwatch) error { return err } - storage.DocBlock(meta).SetExt1(uint64(len(docs))) - storage.DocBlock(meta).SetExt2(uint64(offset)) + meta.SetDocsOffset(uint64(offset)) m = sw.Start("write_meta") _, err = a.meta.Write(meta, sw) diff --git a/frac/file_writer.go b/frac/file_writer.go index 9ac5ab9c..683401b7 100644 --- a/frac/file_writer.go +++ b/frac/file_writer.go @@ -6,6 +6,7 @@ import ( "sync/atomic" "github.com/ozontech/seq-db/metric/stopwatch" + "github.com/ozontech/seq-db/storage" ) type writeSyncer interface { @@ -21,6 +22,9 @@ type writeSyncer interface { // is performed, after which all requests receive a response about the successful (or unsuccessful) fsync. // // This results in one fsync system call for several writers performing a write at approximately the same time. +// +// FileWriter always stores data in DocBlock format. If MetaBlock is passed to Write, then it's converted to +// DocBlock. 
type FileWriter struct { ws writeSyncer offset atomic.Int64 @@ -69,6 +73,12 @@ func (fs *FileWriter) syncLoop() { func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { m := sw.Start("write_duration") + if storage.IsMetaBlock(data) { + // MetaBlock must be converted to DocBlock if it is written to a legacy WAL meta file (with *.meta suffix) + // This may happen if a new version of store has been deployed while a legacy active fraction with *.meta file exists. + data = storage.PackMetaBlockToDocBlock(data, nil) + } + dataLen := int64(len(data)) offset := fs.offset.Add(dataLen) - dataLen _, err := fs.ws.WriteAt(data, offset) diff --git a/frac/file_writer_test.go b/frac/file_writer_test.go index b72c011b..17680408 100644 --- a/frac/file_writer_test.go +++ b/frac/file_writer_test.go @@ -13,8 +13,10 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/ozontech/seq-db/metric/stopwatch" + "github.com/ozontech/seq-db/storage" ) type testWriterSyncer struct { @@ -262,3 +264,40 @@ func TestSparseWrite(t *testing.T) { e = os.Remove(rf.Name()) assert.NoError(t, e) } + +func TestFileWriterConvertsMetaBlockToDocBlock(t *testing.T) { + f, err := os.Create(t.TempDir() + "/test_metablock.txt") + require.NoError(t, err) + defer f.Close() + + fw := NewFileWriter(f, 0, false) + + originalPayload := []byte("test payload for MetaBlock to DocBlock conversion") + metaBlock := storage.CompressMetaBlock(originalPayload, nil, 3) + metaBlock.SetDocsOffset(12345) + metaBlock.SetVersion(1) + + sw := stopwatch.New() + offset, err := fw.Write(metaBlock, sw) + require.NoError(t, err) + + fw.Stop() + + docBlockSize := storage.DocBlockHeaderLen + metaBlock.Len() + + readBuf := make([]byte, docBlockSize) + bytesRead, err := f.ReadAt(readBuf, offset) + require.NoError(t, err) + require.Equal(t, int(docBlockSize), bytesRead) + readBuf = readBuf[:bytesRead] + + docBlock := storage.DocBlock(readBuf) + + assert.Equal(t, 
storage.CodecZSTD, docBlock.Codec()) + assert.Equal(t, uint64(len(originalPayload)), docBlock.RawLen()) + assert.Equal(t, uint64(12345), docBlock.GetExt2()) + + decompressed, err := docBlock.DecompressTo(nil) + require.NoError(t, err) + assert.Equal(t, originalPayload, decompressed) +} diff --git a/frac/fraction_test.go b/frac/fraction_test.go index d43b30ae..e8d3b0f7 100644 --- a/frac/fraction_test.go +++ b/frac/fraction_test.go @@ -1347,7 +1347,7 @@ func (s *FractionTestSuite) TestFractionInfo() { // but if compression/marshalling has changed, expected values can be updated accordingly s.Require().Equal(uint32(5), info.DocsTotal, "doc total doesn't match") // it varies depending on params and docs shuffled - s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(300), + s.Require().True(info.DocsOnDisk > uint64(200) && info.DocsOnDisk < uint64(350), "doc on disk doesn't match. actual value: %d", info.DocsOnDisk) s.Require().Equal(uint64(583), info.DocsRaw, "doc raw doesn't match") s.Require().Equal(seq.MID(946731625000000000), info.From, "from doesn't match") @@ -1355,7 +1355,7 @@ func (s *FractionTestSuite) TestFractionInfo() { switch s.fraction.(type) { case *Active: - s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(350), + s.Require().True(info.MetaOnDisk >= uint64(250) && info.MetaOnDisk <= uint64(400), "meta on disk doesn't match. 
actual value: %d", info.MetaOnDisk) s.Require().Equal(uint64(0), info.IndexOnDisk, "index on disk doesn't match") case *Sealed: diff --git a/fracmanager/frac_manifest.go b/fracmanager/frac_manifest.go index eb0eae4a..2a258bda 100644 --- a/fracmanager/frac_manifest.go +++ b/fracmanager/frac_manifest.go @@ -20,7 +20,8 @@ type fracManifest struct { basePath string // base path to fraction files (without extension) hasDocs bool // presence of main documents file hasIndex bool // presence of index file - hasMeta bool // presence of meta-information + hasMeta bool // presence of meta-information (legacy WAL format) + hasWal bool // presence of WAL with meta (new WAL format) hasSdocs bool // presence of sorted documents hasRemote bool // presence of remote fraction @@ -42,6 +43,8 @@ func (m *fracManifest) AddExtension(ext string) error { m.hasDocs = true case consts.MetaFileSuffix: m.hasMeta = true + case consts.WalFileSuffix: + m.hasWal = true case consts.SdocsFileSuffix: m.hasSdocs = true case consts.IndexFileSuffix: @@ -88,7 +91,7 @@ func (m *fracManifest) Stage() fracStage { if m.hasIndex && (m.hasSdocs || m.hasDocs) { return fracStageSealed } - if m.hasMeta && m.hasDocs { + if (m.hasMeta || m.hasWal) && m.hasDocs { return fracStageActive } if m.hasDocsDel || m.hasIndexDel || m.hasSdocsDel { @@ -116,6 +119,10 @@ func removeMeta(m *fracManifest) { util.RemoveFile(m.basePath + consts.MetaFileSuffix) m.hasMeta = false } + if m.hasWal { + util.RemoveFile(m.basePath + consts.WalFileSuffix) + m.hasWal = false + } } func removeIndex(m *fracManifest) { diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 50d7e1f3..92b2981e 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -109,7 +109,7 @@ func (fm *FracManager) Flags() *StateManager { // Append writes documents and metadata to the active fraction // Implements retry logic in case of fraction sealing during write -func (fm *FracManager) Append(ctx context.Context, docs, metas 
storage.DocBlock) error { +func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas storage.MetaBlock) error { for { select { case <-ctx.Done(): diff --git a/indexer/compress.go b/indexer/compress.go index 9ce4f88a..eff18712 100644 --- a/indexer/compress.go +++ b/indexer/compress.go @@ -14,7 +14,7 @@ type DocsMetasCompressor struct { metaCompressLevel int docsBuf storage.DocBlock - metaBuf storage.DocBlock + metaBuf storage.MetaBlock } var compressorPool = sync.Pool{ @@ -42,7 +42,7 @@ func (c *DocsMetasCompressor) CompressDocsAndMetas(docs, meta []byte) { // Compress docs block. c.docsBuf = storage.CompressDocBlock(docs, c.docsBuf, c.docsCompressLevel) // Compress metas block. - c.metaBuf = storage.CompressDocBlock(meta, c.metaBuf, c.metaCompressLevel) + c.metaBuf = storage.CompressMetaBlock(meta, c.metaBuf, c.metaCompressLevel) bulkSizeAfterCompression.Observe(float64(len(c.docsBuf) + len(c.metaBuf))) } diff --git a/indexer/test_doc_provider.go b/indexer/test_doc_provider.go index 0af90dde..257ed115 100644 --- a/indexer/test_doc_provider.go +++ b/indexer/test_doc_provider.go @@ -53,7 +53,7 @@ func (dp *TestDocProvider) TryReset() { } -func (dp *TestDocProvider) Provide() (storage.DocBlock, storage.DocBlock) { +func (dp *TestDocProvider) Provide() (storage.DocBlock, storage.MetaBlock) { c := GetDocsMetasCompressor(-1, -1) c.CompressDocsAndMetas(dp.Docs, dp.Metas) return c.DocsMetas() diff --git a/storage/meta_block.go b/storage/meta_block.go new file mode 100644 index 00000000..eba57353 --- /dev/null +++ b/storage/meta_block.go @@ -0,0 +1,215 @@ +package storage + +import ( + "encoding/binary" + "hash/crc32" + + "github.com/ozontech/seq-db/util" + "github.com/ozontech/seq-db/zstd" +) + +const ( + MetaBlockMagic byte = 101 + + offsetMetaBlockMagic = 0 // 1 byte (M) Magic byte (always 101) + offsetMetaBlockVersion = 1 // 1 byte (V) Version + offsetMetaBlockCodec = 2 // 1 byte (C) Codec + offsetMetaBlockLength = 3 // 4 bytes (L) Length of 
payload + offsetMetaBlockRawLength = 7 // 4 bytes (U) Raw length (after decompression) + offsetMetaBlockPayloadChecksum = 11 // 4 bytes (P) Payload checksum - covers payload only + offsetMetaBlockDocsOffset = 15 // 8 bytes (D) Docs offset + offsetMetaBlockHeaderChecksum = 23 // 4 bytes (H) Header checksum - covers bytes 0-22 + + MetaBlockHeaderLen = 27 + MetaBlockCurrentVersion = uint8(1) +) + +// MetaBlock format: M : V : C : LLLL : UUUU : PPPP : DDDD-DDDD : HHHH +// M = Magic (101), V = Version, C = Codec, L = Length, U = Raw Length, P = Payload Checksum, D = Docs Offset, H = Header Checksum + +type MetaBlock []byte + +func (b MetaBlock) Magic() byte { + return b[offsetMetaBlockMagic] +} + +func (b MetaBlock) Version() uint8 { + return b[offsetMetaBlockVersion] +} + +func (b MetaBlock) SetVersion(version uint8) { + b[offsetMetaBlockVersion] = version +} + +func (b MetaBlock) Codec() Codec { + return Codec(b[offsetMetaBlockCodec]) +} + +func (b MetaBlock) SetCodec(codecVal Codec) { + b[offsetMetaBlockCodec] = byte(codecVal) +} + +func (b MetaBlock) Len() uint32 { + return binary.LittleEndian.Uint32(b[offsetMetaBlockLength:]) +} + +func (b MetaBlock) SetLen(val uint32) { + binary.LittleEndian.PutUint32(b[offsetMetaBlockLength:], val) +} + +func (b MetaBlock) FullLen() uint32 { + return b.Len() + MetaBlockHeaderLen +} + +func (b MetaBlock) CalcLen() { + b.SetLen(uint32(len(b) - MetaBlockHeaderLen)) +} + +func (b MetaBlock) RawLen() uint32 { + return binary.LittleEndian.Uint32(b[offsetMetaBlockRawLength:]) +} + +func (b MetaBlock) SetRawLen(x uint32) { + binary.LittleEndian.PutUint32(b[offsetMetaBlockRawLength:], x) +} + +func (b MetaBlock) PayloadChecksum() uint32 { + return binary.LittleEndian.Uint32(b[offsetMetaBlockPayloadChecksum:]) +} + +func (b MetaBlock) SetPayloadChecksum(x uint32) { + binary.LittleEndian.PutUint32(b[offsetMetaBlockPayloadChecksum:], x) +} + +func (b MetaBlock) CalcPayloadChecksum() { + b.SetPayloadChecksum(crc32.ChecksumIEEE(b.Payload())) 
+} + +func (b MetaBlock) HeaderChecksum() uint32 { + return binary.LittleEndian.Uint32(b[offsetMetaBlockHeaderChecksum:]) +} + +func (b MetaBlock) SetHeaderChecksum(x uint32) { + binary.LittleEndian.PutUint32(b[offsetMetaBlockHeaderChecksum:], x) +} + +func (b MetaBlock) CalcHeaderChecksum() { + b.SetHeaderChecksum(crc32.ChecksumIEEE(b[:offsetMetaBlockHeaderChecksum])) +} + +func (b MetaBlock) DocsOffset() uint64 { + return binary.LittleEndian.Uint64(b[offsetMetaBlockDocsOffset:]) +} + +// SetDocsOffset updates docs offset. It will also recalc header checksum (cheap). +func (b MetaBlock) SetDocsOffset(x uint64) { + binary.LittleEndian.PutUint64(b[offsetMetaBlockDocsOffset:], x) + b.CalcHeaderChecksum() +} + +func (b MetaBlock) Payload() []byte { + return b[MetaBlockHeaderLen:] +} + +// IsCorrect checks if this is a correct meta block by checking header and payload checksums +func (b MetaBlock) IsCorrect() bool { + return b.IsHeaderCorrect() && b.IsPayloadCorrect() +} + +// IsHeaderCorrect checks if header checksum is correct +func (b MetaBlock) IsHeaderCorrect() bool { + return crc32.ChecksumIEEE(b[:offsetMetaBlockHeaderChecksum]) == b.HeaderChecksum() +} + +// IsPayloadCorrect checks if payload checksum is valid +func (b MetaBlock) IsPayloadCorrect() bool { + return crc32.ChecksumIEEE(b.Payload()) == b.PayloadChecksum() +} + +// IsMetaBlock checks if this data is possibly a meta block. +// Returns true if the data has at least MetaBlockHeaderLen bytes and starts with magic byte. +// This doesn't check for corruption, use IsCorrect() for checksum validation. +func IsMetaBlock(data []byte) bool { + return len(data) >= MetaBlockHeaderLen && data[0] == MetaBlockMagic +} + +func CompressMetaBlock(src []byte, dst MetaBlock, zstdLevel int) MetaBlock { + dst = append(dst[:0], make([]byte, MetaBlockHeaderLen)...) 
// fill header with zeros for cleanup + dst = zstd.CompressLevel(src, dst, zstdLevel) + + dst[offsetMetaBlockMagic] = MetaBlockMagic + dst.SetVersion(MetaBlockCurrentVersion) + dst.CalcLen() + dst.SetRawLen(uint32(len(src))) + dst.SetCodec(CodecZSTD) + dst.CalcPayloadChecksum() + dst.CalcHeaderChecksum() + + return dst +} + +func PackMetaBlock(payload []byte, dst MetaBlock) MetaBlock { + dst = append(dst[:0], make([]byte, MetaBlockHeaderLen)...) // fill header with zeros for cleanup + dst = append(dst, payload...) + + dst[offsetMetaBlockMagic] = MetaBlockMagic + dst.SetVersion(MetaBlockCurrentVersion) + dst.CalcLen() + dst.SetRawLen(uint32(len(payload))) + dst.SetCodec(CodecNo) + dst.CalcPayloadChecksum() + dst.CalcHeaderChecksum() + + return dst +} + +// PackMetaBlockToDocBlock converts MetaBlock to legacy DocBlock. +func PackMetaBlockToDocBlock(metaBlock MetaBlock, dst DocBlock) DocBlock { + dst = append(dst[:0], make([]byte, DocBlockHeaderLen)...) + dst = append(dst, metaBlock.Payload()...) + + dst.CalcLen() + dst.SetRawLen(uint64(metaBlock.RawLen())) + dst.SetCodec(metaBlock.Codec()) + dst.SetExt2(metaBlock.DocsOffset()) + + return dst +} + +// PackDocBlockToMetaBlock converts DocBlock to MetaBlock in place without copying payload. 
+// docBlock will be invalid after packing +func PackDocBlockToMetaBlock(docBlock DocBlock) MetaBlock { + rawLen := uint32(docBlock.RawLen()) + codec := docBlock.Codec() + docsOffset := docBlock.GetExt2() + payloadLen := uint32(len(docBlock) - DocBlockHeaderLen) + + const headerDiff = DocBlockHeaderLen - MetaBlockHeaderLen + mb := MetaBlock(docBlock[headerDiff:]) + + mb[offsetMetaBlockMagic] = MetaBlockMagic + mb.SetVersion(MetaBlockCurrentVersion) + mb.SetLen(payloadLen) + mb.SetRawLen(rawLen) + mb.SetCodec(codec) + // write docs offset directly since SetDocsOffset recalculates header checksum + binary.LittleEndian.PutUint64(mb[offsetMetaBlockDocsOffset:], docsOffset) + mb.CalcPayloadChecksum() + mb.CalcHeaderChecksum() + + return mb +} + +// DecompressTo always put the result in `dst` regardless of whether unpacking is required +// or part of the MetaBlock can be enough. +// +// So MetaBlock does not share the same data with `dst` and can be used safely +func (b MetaBlock) DecompressTo(dst []byte) ([]byte, error) { + payload := b.Payload() + if b.Codec() == CodecNo { + dst = util.EnsureSliceSize(dst, int(b.RawLen())) + copy(dst, payload) + return dst, nil + } + return b.Codec().decompressBlock(int(b.RawLen()), payload, dst) +} diff --git a/storage/meta_block_test.go b/storage/meta_block_test.go new file mode 100644 index 00000000..6bdd492e --- /dev/null +++ b/storage/meta_block_test.go @@ -0,0 +1,76 @@ +package storage + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCorruptionDetection(t *testing.T) { + payload := []byte("test payload for test of checking corrupted data") + block := CompressMetaBlock(payload, nil, 1) + + assert.True(t, block.IsCorrect()) + + // change 1 byte of data + prev := block[35] + block[35] = 3 + assert.False(t, block.IsCorrect()) + block[35] = prev + assert.True(t, block.IsCorrect()) + + // change type (the first byte) + block[0] = 7 + assert.False(t, 
block.IsCorrect()) + block[0] = MetaBlockMagic + + // change version (second byte) + block[1] = 233 + assert.False(t, block.IsCorrect()) + block[1] = 1 // TODO use meta block version const + + // change length of block + truncated := block[0 : len(block)-3] + assert.False(t, truncated.IsCorrect()) + + appended := append(block, 1, 2, 3) + assert.False(t, appended.IsCorrect()) + + assert.True(t, block.IsCorrect()) +} + +func TestConvertDocToMetaBlock(t *testing.T) { + payload := []byte("test test payload") + + docBlock := CompressDocBlock(payload, nil, 1) + docBlock.SetExt2(11111) + + metaBlock := PackDocBlockToMetaBlock(docBlock) + + assert.Equal(t, CodecZSTD, metaBlock.Codec()) + assert.Equal(t, uint32(len(payload)), metaBlock.RawLen()) + assert.Equal(t, uint64(11111), metaBlock.DocsOffset()) + assert.True(t, metaBlock.IsCorrect()) + + decompressed, err := metaBlock.DecompressTo(nil) + require.NoError(t, err) + assert.Equal(t, payload, decompressed) +} + +func TestConvertMetaToDocBlock(t *testing.T) { + payload := []byte("test payload data") + + metaBlock := CompressMetaBlock(payload, nil, 1) + metaBlock.SetDocsOffset(22222) + + docBlock := PackMetaBlockToDocBlock(metaBlock, nil) + + assert.Equal(t, CodecZSTD, docBlock.Codec()) + assert.Equal(t, uint64(len(payload)), docBlock.RawLen()) + assert.Equal(t, uint64(22222), docBlock.GetExt2()) + + decompressed, err := docBlock.DecompressTo(nil) + require.NoError(t, err) + assert.Equal(t, payload, decompressed) +} diff --git a/storage/wal_reader.go b/storage/wal_reader.go new file mode 100644 index 00000000..4b0f9f64 --- /dev/null +++ b/storage/wal_reader.go @@ -0,0 +1,135 @@ +package storage + +import ( + "errors" + "io" + "iter" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" +) + +type WalRecord struct { + Data MetaBlock + Offset int64 + Size int64 + Err error +} + +type WalReader struct { + limiter *ReadLimiter + reader io.ReaderAt + headerOffset int64 // offset where actual data starts (WALHeaderSize 
for new format) + baseFileName string +} + +func NewWalReader(limiter *ReadLimiter, reader io.ReaderAt, baseFileName string) *WalReader { + return &WalReader{ + limiter: limiter, + reader: reader, + headerOffset: WALHeaderSize, + baseFileName: baseFileName, + } +} + +// Iter iterates through WAL file. Corrupted entries are skipped and never propagated to a client. +// Corruption ranges are logged with "from" and "to" offsets. +func (r *WalReader) Iter() iter.Seq[WalRecord] { + return func(yield func(WalRecord) bool) { + offset := nextBlockOffset(r.headerOffset) + + var corruptionStart int64 = -1 + logCorruptionEnd := func(offset int64) { + if corruptionStart >= 0 { + logger.Error("WAL file corrupted", + zap.String("fraction", r.baseFileName), + zap.Int64("from", corruptionStart), + zap.Int64("to", offset)) + corruptionStart = -1 + } + } + startCorruptionTracking := func(offset int64) { + if corruptionStart < 0 { + corruptionStart = offset + } + } + + for { + headerBuf := make([]byte, MetaBlockHeaderLen) + n, err := r.limiter.ReadAt(r.reader, headerBuf, offset) + + if err != nil && !errors.Is(err, io.EOF) { + logCorruptionEnd(offset) + yield(WalRecord{Offset: offset, Err: err}) + return + } + + if errors.Is(err, io.EOF) || n < MetaBlockHeaderLen { + logCorruptionEnd(offset) + return + } + + if !IsMetaBlock(headerBuf) { + startCorruptionTracking(offset) + offset += BlockAlignment + continue + } + + mb := MetaBlock(headerBuf) + + if !mb.IsHeaderCorrect() { + startCorruptionTracking(offset) + offset += BlockAlignment + continue + } + + // header is correct, try to read the payload + blockLen := int64(mb.FullLen()) + blockBuf := make([]byte, blockLen) + n, err = r.limiter.ReadAt(r.reader, blockBuf, offset) + + if err != nil && !errors.Is(err, io.EOF) { + // this is the last WAL record + // start corruption tracking if not started already and print + startCorruptionTracking(offset) + logCorruptionEnd(offset) + yield(WalRecord{Offset: offset, Err: err}) + return + } + + 
if errors.Is(err, io.EOF) || int64(n) < blockLen { + startCorruptionTracking(offset) + logCorruptionEnd(offset) + return + } + + mb = blockBuf + + if !mb.IsPayloadCorrect() { + startCorruptionTracking(offset) + offset = nextBlockOffset(offset + blockLen) + continue + } + + logCorruptionEnd(offset) + + entry := WalRecord{ + Data: mb, + Offset: offset, + Size: blockLen, + } + + if !yield(entry) { + return + } + + offset = nextBlockOffset(offset + blockLen) + } + } +} + +// nextBlockOffset aligns provided offset to BlockAlignment +func nextBlockOffset(offset int64) int64 { + return (offset + BlockAlignment - 1) &^ (BlockAlignment - 1) +} diff --git a/storage/wal_writer.go b/storage/wal_writer.go new file mode 100644 index 00000000..f9f0940b --- /dev/null +++ b/storage/wal_writer.go @@ -0,0 +1,165 @@ +package storage + +import ( + "encoding/binary" + "io" + "sync" + "sync/atomic" + + "go.uber.org/zap" + + "github.com/ozontech/seq-db/logger" + "github.com/ozontech/seq-db/metric/stopwatch" +) + +const ( + // WALMagic is the magic number at the start of WAL files + WALMagic uint32 = 0xFFFFFFFF + // WALVersion1 is the first version of WAL file with CRC32 checksums and 64 byte alignment for blocks. + WALVersion1 uint8 = 1 + // WALCurrentVersion is the current WAL format version. + WALCurrentVersion = WALVersion1 + // WALHeaderSize is the size of the WAL header in bytes (4 bytes magic + 1 byte version). 59 bytes are also reserved + // due to alignment + WALHeaderSize = 5 + // BlockAlignment is the alignment boundary for blocks in the new WAL format. Must be greater than + // MetaBlock header (27 bytes) to prevent header torn writes and allow faster navigation during replay + // of corrupted WAL file + BlockAlignment int64 = 64 +) + +type WriteSyncer interface { + io.ReaderAt + io.WriterAt + Sync() error +} + +// WalWriter writes MetaBlocks to a WAL file with header and 64-byte alignment. +// Format: [Header 5B] [... -> align to 64] [MetaBlock] [... 
-> align to 64] [MetaBlock] ... +type WalWriter struct { + ws WriteSyncer + offset atomic.Int64 + skipSync bool + + mu sync.Mutex + queue []chan error + notify chan struct{} + + wg sync.WaitGroup +} + +func NewWalWriter(ws WriteSyncer, offset int64, skipSync bool) *WalWriter { + w := &WalWriter{ + ws: ws, + skipSync: skipSync, + notify: make(chan struct{}, 1), + } + + // write a header at the beginning if it's a new file + if offset == 0 { + if err := writeWALHeader(ws); err != nil { + logger.Panic("failed to write WAL header", zap.Error(err)) + } + + if !skipSync { + _ = ws.Sync() + } + w.offset.Store(WALHeaderSize) + } else { + w.offset.Store(nextBlockOffset(offset)) + } + + w.wg.Add(1) + go func() { + w.syncLoop() + w.wg.Done() + }() + + return w +} + +func (w *WalWriter) syncLoop() { + for range w.notify { + w.mu.Lock() + queue := w.queue + w.queue = make([]chan error, 0, len(queue)) + w.mu.Unlock() + + err := w.ws.Sync() + + for _, syncRes := range queue { + syncRes <- err + } + } +} + +// Write writes a MetaBlock to the WAL file. The data must already be a MetaBlock. +// Returns the offset where the MetaBlock starts. +func (w *WalWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { + m := sw.Start("write_duration") + + offset := w.reserveSpace(int64(len(data))) + + if _, err := w.ws.WriteAt(data, offset); err != nil { + m.Stop() + return 0, err + } + m.Stop() + + err := w.sync(m, sw) + + return offset, err +} + +// reserveSpace atomically reserves a necessary space and returns the next position where block may be written. 
The position +// is aligned to BlockAlignment +func (w *WalWriter) reserveSpace(blockSize int64) int64 { + var result int64 + for { + curr := w.offset.Load() + nextSlotOffset := nextBlockOffset(curr) + + if w.offset.CompareAndSwap(curr, nextSlotOffset+blockSize) { + result = nextSlotOffset + break + } + } + return result +} + +func (w *WalWriter) sync(m stopwatch.Metric, sw *stopwatch.Stopwatch) error { + if w.skipSync { + return nil + } + + m = sw.Start("fsync") + + syncRes := make(chan error) + + w.mu.Lock() + w.queue = append(w.queue, syncRes) + size := len(w.queue) + w.mu.Unlock() + + if size == 1 { + w.notify <- struct{}{} + } + + err := <-syncRes + + m.Stop() + return err +} + +func (w *WalWriter) Stop() { + close(w.notify) + w.wg.Wait() +} + +func writeWALHeader(w io.WriterAt) error { + header := make([]byte, WALHeaderSize) + binary.LittleEndian.PutUint32(header[0:4], WALMagic) + header[4] = WALCurrentVersion + _, err := w.WriteAt(header, 0) + return err +} diff --git a/storage/wal_writer_test.go b/storage/wal_writer_test.go new file mode 100644 index 00000000..bce84ea3 --- /dev/null +++ b/storage/wal_writer_test.go @@ -0,0 +1,714 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "math/rand/v2" + "os" + "slices" + "strconv" + "sync" + "testing" + "time" + + "github.com/alecthomas/units" + "github.com/stretchr/testify/assert" + + "github.com/ozontech/seq-db/metric/stopwatch" +) + +type testWriterSyncer struct { + mu sync.RWMutex + in [][]byte + out map[string]struct{} + pause time.Duration + err bool + bytes []byte +} + +func TestFileWriter(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} + fw := NewWalWriter(ws, 0, false) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.NoError(t, err) + assert.True(t, ws.Check(k)) + } + wg.Done() + }() + } + + 
wg.Wait() + fw.Stop() +} + +func TestFileWriterNoSync(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} + fw := NewWalWriter(ws, 0, true) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.NoError(t, err) + assert.False(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func TestFileWriterError(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond, err: true} + fw := NewWalWriter(ws, 0, false) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.Error(t, err) + assert.False(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func (ws *testWriterSyncer) WriteAt(p []byte, off int64) (n int, err error) { + ws.mu.Lock() + defer ws.mu.Unlock() + + ws.in = append(ws.in, p) + + // Extend storage if needed and write data + end := int(off) + len(p) + if end > len(ws.bytes) { + newStorage := make([]byte, end) + copy(newStorage, ws.bytes) + ws.bytes = newStorage + } + copy(ws.bytes[off:], p) + + return len(p), nil +} + +func (ws *testWriterSyncer) ReadAt(p []byte, off int64) (n int, err error) { + ws.mu.RLock() + defer ws.mu.RUnlock() + + if int(off) >= len(ws.bytes) { + return 0, io.EOF + } + + n = copy(p, ws.bytes[off:]) + if n < len(p) { + return n, io.EOF + } + return n, nil +} + +func (ws *testWriterSyncer) Sync() error { + time.Sleep(ws.pause) + + ws.mu.Lock() + defer ws.mu.Unlock() + + if ws.err { + ws.in = nil + return errors.New("test") + } + + for _, val := range ws.in { + ws.out[string(val)] = struct{}{} + } + ws.in = nil + + return nil +} + +func (ws *testWriterSyncer) Check(val []byte) bool { + ws.mu.RLock() + defer ws.mu.RUnlock() + _, ok 
:= ws.out[string(val)] + return ok +} + +type testRandPauseWriterAt struct { + f *os.File +} + +func (w *testRandPauseWriterAt) WriteAt(p []byte, off int64) (n int, err error) { + // random pause + time.Sleep(time.Microsecond * time.Duration(rand.IntN(20))) + return w.f.WriteAt(p, off) +} + +func (w *testRandPauseWriterAt) ReadAt(p []byte, off int64) (n int, err error) { + return w.f.ReadAt(p, off) +} + +func (w *testRandPauseWriterAt) Sync() error { + return w.f.Sync() +} + +func TestConcurrentFileWriting(t *testing.T) { + f, e := os.Create(t.TempDir() + "/test.txt") + assert.NoError(t, e) + + defer f.Close() + + fw := NewWalWriter(&testRandPauseWriterAt{f: f}, 0, true) + + const ( + writersCount = 100 + writesCount = 100 + ) + + type writeSample struct { + offset int64 + payload []byte + } + + wg := sync.WaitGroup{} + samplesQueues := [writersCount][]writeSample{} + + // run writers - write MetaBlocks + for i := range writersCount { + wg.Add(1) + go func() { + defer wg.Done() + + sw := stopwatch.New() + workerName := strconv.Itoa(i) + + for j := range writesCount { + payload := []byte("<" + workerName + "-" + strconv.Itoa(j) + ">") + metaBlock := PackMetaBlock(payload, nil) + offset, e := fw.Write(metaBlock, sw) + assert.NoError(t, e) + + samplesQueues[i] = append(samplesQueues[i], writeSample{payload: payload, offset: offset}) + } + }() + } + + wg.Wait() + + // join and sort all samples by offset + all := make([]writeSample, 0, writersCount*writesCount) + for _, c := range samplesQueues { + all = append(all, c...) 
+ } + slices.SortFunc(all, func(a, b writeSample) int { + if a.offset < b.offset { + return -1 + } + if a.offset > b.offset { + return 1 + } + return 0 + }) + + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + idx := 0 + for entry := range reader.Iter() { + assert.Equal(t, all[idx].offset, entry.Offset, "block %d offset mismatch", idx) + assert.Equal(t, all[idx].payload, entry.Data.Payload(), "block %d payload mismatch", idx) + idx++ + } + assert.Equal(t, len(all), idx, "should read all blocks") + + s, e := f.Stat() + assert.NoError(t, e) + fmt.Printf("File size: %d bytes, %d blocks written\n", s.Size(), len(all)) + + e = os.Remove(f.Name()) + assert.NoError(t, e) +} + +func TestSparseWrite(t *testing.T) { + wf, e := os.Create(t.TempDir() + "/test.txt") + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("333"), 30) + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("222"), 20) + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("111"), 10) + assert.NoError(t, e) + + e = wf.Close() + assert.NoError(t, e) + + rf, e := os.Open(wf.Name()) + buf := make([]byte, 33) + assert.NoError(t, e) + + n, e := rf.Read(buf) + assert.NoError(t, e) + assert.Equal(t, len(buf), n) + + expected := []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00111\x00\x00\x00\x00\x00\x00\x00222\x00\x00\x00\x00\x00\x00\x00333") + assert.Equal(t, expected, buf) + + n, e = rf.Read(buf) + assert.Error(t, e) + assert.Equal(t, 0, n) + assert.ErrorIs(t, e, io.EOF) + + e = rf.Close() + assert.NoError(t, e) + + e = os.Remove(rf.Name()) + assert.NoError(t, e) +} + +func TestWalWriterWriteAndRead(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-test-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + + payloads := [][]byte{ + []byte("block one"), + []byte("block two with more data"), + []byte("block three"), + []byte("fourth block here"), + []byte("and the fifth block"), + } + + offsets := make([]int64, len(payloads)) + sw := stopwatch.New() + + for i, 
payload := range payloads { + metaBlock := PackMetaBlock(payload, nil) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + offsets[i] = offset + } + + fw.Stop() + + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + count := 0 + for entry := range reader.Iter() { + assert.Equal(t, offsets[count], entry.Offset, "block %d offset mismatch", count) + assert.Equal(t, payloads[count], entry.Data.Payload(), "block %d payload mismatch", count) + assert.Equal(t, MetaBlockMagic, entry.Data.Magic(), "block %d should have MetaBlock magic", count) + count++ + } + assert.Equal(t, len(payloads), count, "should read all blocks") +} + +func TestWalReaderIteratorEmptyFile(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-iter-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + fw.Stop() + + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + + count := 0 + for range reader.Iter() { + count++ + } + assert.Equal(t, 0, count) +} + +func TestWalReaderIterator(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-iter-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + + payloads := [][]byte{ + []byte("first payload data"), + []byte("second payload with more content here"), + []byte("third"), + []byte("fourth payload block"), + []byte("fifth and final payload"), + } + + sw := stopwatch.New() + var expectedOffsets []int64 + + for _, payload := range payloads { + metaBlock := PackMetaBlock(payload, nil) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + expectedOffsets = append(expectedOffsets, offset) + } + fw.Stop() + + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + + var readPayloads [][]byte + var readOffsets []int64 + idx := 0 + for entry := range reader.Iter() { + readPayloads = append(readPayloads, entry.Data.Payload()) + readOffsets = append(readOffsets, entry.Offset) + idx++ + } + + assert.Equal(t, len(payloads), len(readPayloads)) + for i, 
expected := range payloads { + assert.Equal(t, expected, readPayloads[i], "block %d payload doesn't match", i) + assert.Equal(t, expectedOffsets[i], readOffsets[i], "block %d offset doesn't match", i) + } +} + +// TestWalReaderSkipsCorruptedBlocks tests very simple single byte corruption in block header +func TestWalReaderSkipsCorruptedBlocks(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-corrupted-*.bin") + assert.NoError(t, err) + defer f.Close() + + fw := NewWalWriter(f, 0, false) + + payloads := [][]byte{ + []byte("block one"), + []byte("block two - will be corrupted"), + []byte("block three"), + []byte("block four"), + } + + sw := stopwatch.New() + var offsets []int64 + + for _, payload := range payloads { + metaBlock := PackMetaBlock(payload, nil) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + offsets = append(offsets, offset) + } + fw.Stop() + + // corrupt block 2 (index 1) by flipping a byte in the header checksum + corruptOffset := offsets[1] + offsetMetaBlockHeaderChecksum + _, err = f.WriteAt([]byte{0xFF}, corruptOffset) + assert.NoError(t, err) + t.Logf("corrupted header checksum at offset %d", corruptOffset) + + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + + var readPayloads [][]byte + for entry := range reader.Iter() { + readPayloads = append(readPayloads, entry.Data.Payload()) + t.Logf("read block at offset %d: %q", entry.Offset, entry.Data.Payload()) + } + + assert.Equal(t, 3, len(readPayloads), "should recover 3 out of 4 blocks") + assert.Equal(t, payloads[0], readPayloads[0], "first block should match") + assert.Equal(t, payloads[2], readPayloads[1], "third block should match") + assert.Equal(t, payloads[3], readPayloads[2], "fourth block should match") +} + +// TestWalReaderSkipsCorruptedPayload tests very simple single byte corruption +func TestWalReaderSkipsCorruptedPayload(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-payload-corrupt-*.bin") + assert.NoError(t, err) + defer f.Close() + 
+ fw := NewWalWriter(f, 0, false) + + payloads := [][]byte{ + []byte("first block"), + []byte("second block - payload will be corrupted"), + []byte("third block"), + } + + sw := stopwatch.New() + var offsets []int64 + + for _, payload := range payloads { + metaBlock := PackMetaBlock(payload, nil) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + offsets = append(offsets, offset) + } + fw.Stop() + + payloadOffset := offsets[1] + MetaBlockHeaderLen + 5 // corrupt somewhere in payload + _, err = f.WriteAt([]byte{0xFF, 0xFF}, payloadOffset) + assert.NoError(t, err) + + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + + var readPayloads [][]byte + for entry := range reader.Iter() { + readPayloads = append(readPayloads, entry.Data.Payload()) + } + + assert.Equal(t, 2, len(readPayloads), "should recover 2 out of 3 blocks") + assert.Equal(t, payloads[0], readPayloads[0], "first block should match") + assert.Equal(t, payloads[2], readPayloads[1], "third block should match") +} + +// TestWalReaderSingleByteCorruption tests flipping a random byte in WAL file and verifying that we never +// lose more than a single block on replay +func TestWalReaderSingleByteCorruption(t *testing.T) { + const ( + numRuns = 100 + numBlocks = 100 + minPayloadLen = 10 + maxPayloadLen = int(10 * units.KiB) + ) + + totalLostBlocks := 0 + + for run := range numRuns { + f, err := os.CreateTemp(t.TempDir(), fmt.Sprintf("wal-corruption-%d-*.bin", run)) + assert.NoError(t, err) + + fw := NewWalWriter(f, 0, false) + sw := stopwatch.New() + + blocks := make([]MetaBlock, 0) + + // write blocks to WAL + for i := range numBlocks { + payloadLen := minPayloadLen + rand.IntN(maxPayloadLen-minPayloadLen+1) + payload := make([]byte, payloadLen) + + for j := range payload { + payload[j] = byte(rand.IntN(256)) + } + // store the first byte as index of block + payload[0] = byte(i) + + metaBlock := PackMetaBlock(payload, nil) + blocks = append(blocks, metaBlock) + _, err = fw.Write(metaBlock, 
sw) + assert.NoError(t, err) + } + fw.Stop() + + stat, err := f.Stat() + assert.NoError(t, err) + fileSize := stat.Size() + + // flip a random byte at random offset + // we do not corrupt the first 5 bytes - WAL header, other bytes might be corrupted including block headers + corruptOffset := int64(WALHeaderSize) + rand.Int64N(fileSize-WALHeaderSize) + + originalByte := make([]byte, 1) + _, err = f.ReadAt(originalByte, corruptOffset) + assert.NoError(t, err) + corruptedByte := originalByte[0] ^ 0xFF + _, err = f.WriteAt([]byte{corruptedByte}, corruptOffset) + assert.NoError(t, err) + + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + + readBlocks := 0 + + for entry := range reader.Iter() { + assert.NoError(t, entry.Err) + assert.True(t, entry.Data.IsCorrect()) + + expected := blocks[int(entry.Data.Payload()[0])] + assert.Equal(t, expected, entry.Data, "meta block content doesn't match") + readBlocks++ + } + + lostCount := numBlocks - readBlocks + totalLostBlocks += lostCount + + if lostCount > 1 { + assert.Fail(t, "lost %d blocks", lostCount) + } + + err = f.Close() + assert.NoError(t, err) + } + + assert.NotZero(t, totalLostBlocks, "have not missed a single block across 100 runs") +} + +// TestWalReaderTruncation tests that we can iterate through truncated WAL file. 
+func TestWalReaderTruncation(t *testing.T) { + const ( + numRuns = 100 + numBlocks = 100 + minPayloadLen = 512 + maxPayloadLen = int(10 * units.KiB) + ) + + for run := range numRuns { + f, err := os.CreateTemp(t.TempDir(), fmt.Sprintf("wal-truncate-%d-*.bin", run)) + assert.NoError(t, err) + + fw := NewWalWriter(f, 0, false) + sw := stopwatch.New() + + blocks := make([]MetaBlock, 0) + offsets := make([]int64, 0) + + // write blocks to WAL + for i := range numBlocks { + payloadLen := minPayloadLen + rand.IntN(maxPayloadLen-minPayloadLen+1) + payload := make([]byte, payloadLen) + + for j := range payload { + payload[j] = byte(rand.IntN(256)) + } + // store the first byte as index of block + payload[0] = byte(i) + + metaBlock := PackMetaBlock(payload, nil) + blocks = append(blocks, metaBlock) + offset, err := fw.Write(metaBlock, sw) + assert.NoError(t, err) + offsets = append(offsets, offset) + } + fw.Stop() + + // choose random block index at which truncation happens + truncateIndex := rand.IntN(numBlocks) + + // this ensures truncation happens somewhere within the chosen block either at the header or the payload + truncateOffset := offsets[truncateIndex] + rand.Int64N(50) + + err = f.Truncate(truncateOffset) + assert.NoError(t, err) + + // validate we can read all blocks from 0 to truncateIndex (exclusive) + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + + readBlocks := 0 + + for entry := range reader.Iter() { + assert.NoError(t, entry.Err) + assert.True(t, entry.Data.IsCorrect()) + + // verify block index matches expected sequence + blockIndex := int(entry.Data.Payload()[0]) + assert.Equal(t, readBlocks, blockIndex) + + expected := blocks[blockIndex] + assert.Equal(t, expected, entry.Data) + readBlocks++ + } + + // we should have read exactly truncateIndex blocks + assert.Equal(t, truncateIndex, readBlocks) + + err = f.Close() + assert.NoError(t, err) + } +} + +// TestWalReaderSectorLoss tests losing whole disk sectors (512-byte each) +func 
TestWalReaderSectorLoss(t *testing.T) { + const ( + numRuns = 100 + numBlocks = 100 + minPayloadLen = 128 + maxPayloadLen = int(4 * units.KiB) + sectorSize = 512 + sectorsToCorrupt = 10 + // each disk sector spans 4 blocks at most (each block is at least 128 bytes long) + maxLostBlocks = sectorsToCorrupt * 4 + ) + + for run := range numRuns { + f, err := os.CreateTemp(t.TempDir(), fmt.Sprintf("wal-sector-loss-%d-*.bin", run)) + assert.NoError(t, err) + + fw := NewWalWriter(f, 0, false) + sw := stopwatch.New() + + blocks := make([]MetaBlock, 0) + + // write blocks to WAL + for i := range numBlocks { + payloadLen := minPayloadLen + rand.IntN(maxPayloadLen-minPayloadLen+1) + payload := make([]byte, payloadLen) + + for j := range payload { + payload[j] = byte(rand.IntN(256)) + } + // store the first byte as index of block + payload[0] = byte(i) + + metaBlock := PackMetaBlock(payload, nil) + blocks = append(blocks, metaBlock) + _, err = fw.Write(metaBlock, sw) + assert.NoError(t, err) + } + fw.Stop() + + stat, err := f.Stat() + assert.NoError(t, err) + fileSize := stat.Size() + + numSectors := int(fileSize / sectorSize) + zeroes := make([]byte, sectorSize) + corruptedSectors := make(map[int]bool) + + // zero out 10 random sectors (not the first one which contains WAL header) + for len(corruptedSectors) < sectorsToCorrupt && len(corruptedSectors) < numSectors-1 { + // choose sector index from 1 to numSectors-1 (skip first sector) + idx := 1 + rand.IntN(numSectors-1) + if corruptedSectors[idx] { + continue + } + corruptedSectors[idx] = true + + sectorOffset := int64(idx * sectorSize) + _, err = f.WriteAt(zeroes, sectorOffset) + assert.NoError(t, err) + } + + reader := NewWalReader(NewReadLimiter(1, nil), f, "") + readBlocks := 0 + + for entry := range reader.Iter() { + assert.NoError(t, entry.Err) + assert.True(t, entry.Data.IsCorrect()) + + // validate payload content + blockIndex := int(entry.Data.Payload()[0]) + expected := blocks[blockIndex] + assert.Equal(t, 
expected, entry.Data) + readBlocks++ + } + + lostCount := numBlocks - readBlocks + if lostCount > maxLostBlocks { + assert.Fail(t, "lost too much blocks") + } + + err = f.Close() + assert.NoError(t, err) + } +} diff --git a/storeapi/grpc_bulk.go b/storeapi/grpc_bulk.go index 67239edd..1f6ecce9 100644 --- a/storeapi/grpc_bulk.go +++ b/storeapi/grpc_bulk.go @@ -14,6 +14,7 @@ import ( "github.com/ozontech/seq-db/logger" "github.com/ozontech/seq-db/metric" "github.com/ozontech/seq-db/pkg/storeapi" + "github.com/ozontech/seq-db/storage" "github.com/ozontech/seq-db/tracing" ) @@ -64,6 +65,11 @@ func (g *GrpcV1) doBulk(ctx context.Context, req *storeapi.BulkRequest) error { start := time.Now() + if !storage.IsMetaBlock(req.Metas) { + // We use MetaBlock now for metas everywhere, DocBlock might be sent by legacy proxy service. + req.Metas = storage.PackDocBlockToMetaBlock(req.Metas) + } + err := g.fracManager.Append(ctx, req.Docs, req.Metas) if err != nil { From 14d3d01d381af9d7e22afb1d1ef45ff95f5a93ed Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Wed, 28 Jan 2026 15:23:36 +0400 Subject: [PATCH 2/7] read WAL header --- frac/active.go | 8 +++- storage/wal_reader.go | 23 ++++++++++- storage/wal_writer_test.go | 81 +++++++++++++++++++++++++++++++++----- 3 files changed, 99 insertions(+), 13 deletions(-) diff --git a/frac/active.go b/frac/active.go index 53f34f81..3cd418e5 100644 --- a/frac/active.go +++ b/frac/active.go @@ -98,12 +98,16 @@ func NewActive( writer = NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) logger.Info("using legacy meta file format", zap.String("fraction", baseFileName)) } else { + logger.Info("using new WAL format", zap.String("fraction", baseFileName)) walFileName := baseFileName + consts.WalFileSuffix metaFile, metaStats = mustOpenFile(walFileName, config.SkipFsync) metaSize = uint64(metaStats.Size()) - walReader = storage.NewWalReader(readLimiter, 
metaFile, baseFileName) writer = NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) - logger.Info("using new WAL format", zap.String("fraction", baseFileName)) + var err error + walReader, err = storage.NewWalReader(readLimiter, metaFile, baseFileName) + if err != nil { + logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err)) + } } f := &Active{ diff --git a/storage/wal_reader.go b/storage/wal_reader.go index 4b0f9f64..63cf93e8 100644 --- a/storage/wal_reader.go +++ b/storage/wal_reader.go @@ -1,7 +1,9 @@ package storage import ( + "encoding/binary" "errors" + "fmt" "io" "iter" @@ -24,13 +26,30 @@ type WalReader struct { baseFileName string } -func NewWalReader(limiter *ReadLimiter, reader io.ReaderAt, baseFileName string) *WalReader { +func NewWalReader(limiter *ReadLimiter, reader io.ReaderAt, baseFileName string) (*WalReader, error) { + header := make([]byte, WALHeaderSize) + n, err := limiter.ReadAt(reader, header, 0) + if err != nil && !errors.Is(err, io.EOF) { + return nil, fmt.Errorf("failed to read WAL header: %w", err) + } + if n < WALHeaderSize { + return nil, fmt.Errorf("WAL file too short: expected at least %d bytes, got %d", WALHeaderSize, n) + } + magic := binary.LittleEndian.Uint32(header[0:4]) + if magic != WALMagic { + return nil, fmt.Errorf("invalid WAL magic: expected 0x%X, got 0x%X", WALMagic, magic) + } + version := header[4] + if version != WALVersion1 { + return nil, fmt.Errorf("unknown WAL version: %d (supported: %d)", version, WALVersion1) + } + return &WalReader{ limiter: limiter, reader: reader, headerOffset: WALHeaderSize, baseFileName: baseFileName, - } + }, nil } // Iter iterates through WAL file. Corrupted entries are skipped and never propagated to a client. 
diff --git a/storage/wal_writer_test.go b/storage/wal_writer_test.go index bce84ea3..ea67a3b2 100644 --- a/storage/wal_writer_test.go +++ b/storage/wal_writer_test.go @@ -231,7 +231,8 @@ func TestConcurrentFileWriting(t *testing.T) { return 0 }) - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) idx := 0 for entry := range reader.Iter() { assert.Equal(t, all[idx].offset, entry.Offset, "block %d offset mismatch", idx) @@ -314,7 +315,8 @@ func TestWalWriterWriteAndRead(t *testing.T) { fw.Stop() - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) count := 0 for entry := range reader.Iter() { assert.Equal(t, offsets[count], entry.Offset, "block %d offset mismatch", count) @@ -333,7 +335,8 @@ func TestWalReaderIteratorEmptyFile(t *testing.T) { fw := NewWalWriter(f, 0, false) fw.Stop() - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) count := 0 for range reader.Iter() { @@ -342,6 +345,60 @@ func TestWalReaderIteratorEmptyFile(t *testing.T) { assert.Equal(t, 0, count) } +func TestWalReaderInvalidMagic(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-invalid-magic-*.bin") + assert.NoError(t, err) + defer f.Close() + + // write invalid magic (not 0xFFFFFFFF) + header := make([]byte, WALHeaderSize) + header[0] = 0x00 + header[1] = 0x00 + header[2] = 0x00 + header[3] = 0x00 + header[4] = WALVersion1 + _, err = f.WriteAt(header, 0) + assert.NoError(t, err) + + _, err = NewWalReader(NewReadLimiter(1, nil), f, "") + assert.Error(t, err) + assert.Contains(t, err.Error(), "invalid WAL magic") +} + +func TestWalReaderUnknownVersion(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-unknown-version-*.bin") + assert.NoError(t, err) + defer f.Close() + + // write correct magic but unknown version 
+ header := make([]byte, WALHeaderSize) + header[0] = 0xFF + header[1] = 0xFF + header[2] = 0xFF + header[3] = 0xFF + header[4] = 99 // unknown version + _, err = f.WriteAt(header, 0) + assert.NoError(t, err) + + _, err = NewWalReader(NewReadLimiter(1, nil), f, "") + assert.Error(t, err) + assert.Contains(t, err.Error(), "unknown WAL version") +} + +func TestWalReaderFileTooShort(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "wal-too-short-*.bin") + assert.NoError(t, err) + defer f.Close() + + // write only 3 bytes (less than WALHeaderSize which is 5) + _, err = f.WriteAt([]byte{0xFF, 0xFF, 0xFF}, 0) + assert.NoError(t, err) + + _, err = NewWalReader(NewReadLimiter(1, nil), f, "") + assert.Error(t, err) + assert.Contains(t, err.Error(), "WAL file too short") +} + func TestWalReaderIterator(t *testing.T) { f, err := os.CreateTemp(t.TempDir(), "wal-iter-*.bin") assert.NoError(t, err) @@ -368,7 +425,8 @@ func TestWalReaderIterator(t *testing.T) { } fw.Stop() - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) var readPayloads [][]byte var readOffsets []int64 @@ -418,7 +476,8 @@ func TestWalReaderSkipsCorruptedBlocks(t *testing.T) { assert.NoError(t, err) t.Logf("corrupted header checksum at offset %d", corruptOffset) - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) var readPayloads [][]byte for entry := range reader.Iter() { @@ -461,7 +520,8 @@ func TestWalReaderSkipsCorruptedPayload(t *testing.T) { _, err = f.WriteAt([]byte{0xFF, 0xFF}, payloadOffset) assert.NoError(t, err) - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) var readPayloads [][]byte for entry := range reader.Iter() { @@ -527,7 +587,8 @@ func TestWalReaderSingleByteCorruption(t *testing.T) { _, err = 
f.WriteAt([]byte{corruptedByte}, corruptOffset) assert.NoError(t, err) - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) readBlocks := 0 @@ -602,7 +663,8 @@ func TestWalReaderTruncation(t *testing.T) { assert.NoError(t, err) // validate we can read all blocks from 0 to truncateIndex (exclusive) - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) readBlocks := 0 @@ -689,7 +751,8 @@ func TestWalReaderSectorLoss(t *testing.T) { assert.NoError(t, err) } - reader := NewWalReader(NewReadLimiter(1, nil), f, "") + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") + assert.NoError(t, err) readBlocks := 0 for entry := range reader.Iter() { From 67064355019946d939840e47d7e4f9cf4e82f01d Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Fri, 30 Jan 2026 11:36:46 +0400 Subject: [PATCH 3/7] linter issues, fix test --- proxy/bulk/ingestor_test.go | 2 +- storage/meta_block_test.go | 6 ++---- storage/wal_writer_test.go | 2 -- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/proxy/bulk/ingestor_test.go b/proxy/bulk/ingestor_test.go index e38a9180..bdd1d8db 100644 --- a/proxy/bulk/ingestor_test.go +++ b/proxy/bulk/ingestor_test.go @@ -430,7 +430,7 @@ func TestProcessDocuments(t *testing.T) { gotDocs = append(gotDocs, docsUnpacker.GetBinary()) } - binaryMetas, err := storage.DocBlock(c.metas).DecompressTo(nil) + binaryMetas, err := storage.MetaBlock(c.metas).DecompressTo(nil) require.NoError(t, err) metasUnpacker := packer.NewBytesUnpacker(binaryMetas) var gotMetas []indexer.MetaData diff --git a/storage/meta_block_test.go b/storage/meta_block_test.go index 6bdd492e..06bf2a7d 100644 --- a/storage/meta_block_test.go +++ b/storage/meta_block_test.go @@ -34,10 +34,8 @@ func TestCorruptionDetection(t *testing.T) { truncated := block[0 : 
len(block)-3] assert.False(t, truncated.IsCorrect()) - appended := append(block, 1, 2, 3) - assert.False(t, appended.IsCorrect()) - - assert.True(t, block.IsCorrect()) + block = append(block, 1, 2, 3) + assert.False(t, block.IsCorrect()) } func TestConvertDocToMetaBlock(t *testing.T) { diff --git a/storage/wal_writer_test.go b/storage/wal_writer_test.go index ea67a3b2..7a3cb80e 100644 --- a/storage/wal_writer_test.go +++ b/storage/wal_writer_test.go @@ -430,11 +430,9 @@ func TestWalReaderIterator(t *testing.T) { var readPayloads [][]byte var readOffsets []int64 - idx := 0 for entry := range reader.Iter() { readPayloads = append(readPayloads, entry.Data.Payload()) readOffsets = append(readOffsets, entry.Offset) - idx++ } assert.Equal(t, len(payloads), len(readPayloads)) From 792a7fa85e1cc4bc45205199895adb9e42161577 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Fri, 30 Jan 2026 11:47:57 +0400 Subject: [PATCH 4/7] fix bench --- frac/active_indexer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index fc2585c6..8a6862d3 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -99,7 +99,7 @@ func BenchmarkIndexer(b *testing.B) { bulks := make([][]byte, 0, len(readers)) for _, readNext := range readers { _, _, meta, _ := processor.ProcessBulk(time.Now(), nil, nil, readNext) - bulks = append(bulks, storage.CompressDocBlock(meta, nil, 3)) + bulks = append(bulks, storage.CompressMetaBlock(meta, nil, 3)) } b.StartTimer() From 9779ff6f18af22633135f2995bddb19d3271fcf4 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Fri, 20 Feb 2026 12:16:43 +0400 Subject: [PATCH 5/7] rename MetaBlock to WalBlock --- frac/active.go | 6 +- frac/active_indexer.go | 4 +- frac/active_indexer_test.go | 2 +- frac/active_writer.go | 2 +- frac/file_writer.go | 8 +- frac/file_writer_test.go | 16 +- 
fracmanager/fracmanager.go | 2 +- indexer/compress.go | 4 +- indexer/test_doc_provider.go | 2 +- proxy/bulk/ingestor_test.go | 2 +- storage/meta_block.go | 215 ------------------ storage/wal_block.go | 215 ++++++++++++++++++ .../{meta_block_test.go => wal_block_test.go} | 24 +- storage/wal_reader.go | 47 ++-- storage/wal_writer.go | 60 +++-- storage/wal_writer_test.go | 62 ++--- storeapi/grpc_bulk.go | 6 +- 17 files changed, 337 insertions(+), 340 deletions(-) delete mode 100644 storage/meta_block.go create mode 100644 storage/wal_block.go rename storage/{meta_block_test.go => wal_block_test.go} (71%) diff --git a/frac/active.go b/frac/active.go index 3cd418e5..af2e2750 100644 --- a/frac/active.go +++ b/frac/active.go @@ -281,8 +281,8 @@ out: wg.Add(1) - metaBlock := storage.PackDocBlockToMetaBlock(meta) - f.indexer.Index(f, metaBlock, &wg, sw) + walBlock := storage.PackDocBlockToWalBlock(meta) + f.indexer.Index(f, walBlock, &wg, sw) } } @@ -311,7 +311,7 @@ var bulkStagesSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ }, []string{"stage"}) // Append causes data to be written on disk and sends metas to index workers -func (f *Active) Append(docs storage.DocBlock, metas storage.MetaBlock, wg *sync.WaitGroup) (err error) { +func (f *Active) Append(docs storage.DocBlock, metas storage.WalBlock, wg *sync.WaitGroup) (err error) { sw := stopwatch.New() m := sw.Start("append") if err = f.writer.Write(docs, metas, sw); err != nil { diff --git a/frac/active_indexer.go b/frac/active_indexer.go index deeb1619..9f3cc6ae 100644 --- a/frac/active_indexer.go +++ b/frac/active_indexer.go @@ -24,7 +24,7 @@ type ActiveIndexer struct { type indexTask struct { Frac *Active - Metas storage.MetaBlock + Metas storage.WalBlock Pos uint64 Wg *sync.WaitGroup } @@ -45,7 +45,7 @@ func NewActiveIndexer(workerCount, chLen int) (*ActiveIndexer, func()) { return &idx, stopIdx } -func (ai *ActiveIndexer) Index(frac *Active, metas storage.MetaBlock, wg *sync.WaitGroup, sw 
*stopwatch.Stopwatch) { +func (ai *ActiveIndexer) Index(frac *Active, metas storage.WalBlock, wg *sync.WaitGroup, sw *stopwatch.Stopwatch) { m := sw.Start("send_index_chan") ai.ch <- &indexTask{ Pos: metas.DocsOffset(), diff --git a/frac/active_indexer_test.go b/frac/active_indexer_test.go index 8a6862d3..db798d22 100644 --- a/frac/active_indexer_test.go +++ b/frac/active_indexer_test.go @@ -99,7 +99,7 @@ func BenchmarkIndexer(b *testing.B) { bulks := make([][]byte, 0, len(readers)) for _, readNext := range readers { _, _, meta, _ := processor.ProcessBulk(time.Now(), nil, nil, readNext) - bulks = append(bulks, storage.CompressMetaBlock(meta, nil, 3)) + bulks = append(bulks, storage.CompressWalBlock(meta, nil, 3)) } b.StartTimer() diff --git a/frac/active_writer.go b/frac/active_writer.go index 52a11428..a6a690c5 100644 --- a/frac/active_writer.go +++ b/frac/active_writer.go @@ -31,7 +31,7 @@ func NewActiveWriterLegacy(docsFile, metaFile *os.File, docsOffset, metaOffset i } } -func (a *ActiveWriter) Write(docs storage.DocBlock, meta storage.MetaBlock, sw *stopwatch.Stopwatch) error { +func (a *ActiveWriter) Write(docs storage.DocBlock, meta storage.WalBlock, sw *stopwatch.Stopwatch) error { m := sw.Start("write_docs") offset, err := a.docs.Write(docs, sw) m.Stop() diff --git a/frac/file_writer.go b/frac/file_writer.go index 683401b7..ffaa48a0 100644 --- a/frac/file_writer.go +++ b/frac/file_writer.go @@ -23,7 +23,7 @@ type writeSyncer interface { // // This results in one fsync system call for several writers performing a write at approximately the same time. // -// FileWriter always stores data in DocBlock format. If MetaBlock is passed to Write, then it's converted to +// FileWriter always stores data in DocBlock format. If WalBlock is passed to Write, then it's converted to // DocBlock. 
type FileWriter struct { ws writeSyncer @@ -73,10 +73,10 @@ func (fs *FileWriter) syncLoop() { func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { m := sw.Start("write_duration") - if storage.IsMetaBlock(data) { - // MetaBlock must be converted to DocBock if is written to a legacy WAL meta file (with *.meta suffix) + if storage.IsWalBlock(data) { + // WalBlock must be converted to DocBlock if it is written to a legacy WAL meta file (with *.meta suffix) // This may happen if a new version of store has been deployed while a legacy active fraction with *.meta file exists. - data = storage.PackMetaBlockToDocBlock(data, nil) + data = storage.PackWalBlockToDocBlock(data, nil) } dataLen := int64(len(data)) diff --git a/frac/file_writer_test.go b/frac/file_writer_test.go index 17680408..dcfce50a 100644 --- a/frac/file_writer_test.go +++ b/frac/file_writer_test.go @@ -265,25 +265,25 @@ func TestSparseWrite(t *testing.T) { assert.NoError(t, e) } -func TestFileWriterConvertsMetaBlockToDocBlock(t *testing.T) { - f, err := os.Create(t.TempDir() + "/test_metablock.txt") +func TestFileWriterConvertsWalBlockToDocBlock(t *testing.T) { + f, err := os.Create(t.TempDir() + "/test_wal_block.txt") require.NoError(t, err) defer f.Close() fw := NewFileWriter(f, 0, false) - originalPayload := []byte("test payload for MetaBlock to DocBlock conversion") - metaBlock := storage.CompressMetaBlock(originalPayload, nil, 3) - metaBlock.SetDocsOffset(12345) - metaBlock.SetVersion(1) + originalPayload := []byte("test payload for WalBlock to DocBlock conversion") + walBlock := storage.CompressWalBlock(originalPayload, nil, 3) + walBlock.SetDocsOffset(12345) + walBlock.SetVersion(1) sw := stopwatch.New() - offset, err := fw.Write(metaBlock, sw) + offset, err := fw.Write(walBlock, sw) require.NoError(t, err) fw.Stop() - docBlockSize := storage.DocBlockHeaderLen + metaBlock.Len() + docBlockSize := storage.DocBlockHeaderLen + walBlock.Len() readBuf := make([]byte, docBlockSize)
bytesRead, err := f.ReadAt(readBuf, offset) diff --git a/fracmanager/fracmanager.go b/fracmanager/fracmanager.go index 92b2981e..65e62165 100644 --- a/fracmanager/fracmanager.go +++ b/fracmanager/fracmanager.go @@ -109,7 +109,7 @@ func (fm *FracManager) Flags() *StateManager { // Append writes documents and metadata to the active fraction // Implements retry logic in case of fraction sealing during write -func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas storage.MetaBlock) error { +func (fm *FracManager) Append(ctx context.Context, docs storage.DocBlock, metas storage.WalBlock) error { for { select { case <-ctx.Done(): diff --git a/indexer/compress.go b/indexer/compress.go index eff18712..ee706e4c 100644 --- a/indexer/compress.go +++ b/indexer/compress.go @@ -14,7 +14,7 @@ type DocsMetasCompressor struct { metaCompressLevel int docsBuf storage.DocBlock - metaBuf storage.MetaBlock + metaBuf storage.WalBlock } var compressorPool = sync.Pool{ @@ -42,7 +42,7 @@ func (c *DocsMetasCompressor) CompressDocsAndMetas(docs, meta []byte) { // Compress docs block. c.docsBuf = storage.CompressDocBlock(docs, c.docsBuf, c.docsCompressLevel) // Compress metas block. 
- c.metaBuf = storage.CompressMetaBlock(meta, c.metaBuf, c.metaCompressLevel) + c.metaBuf = storage.CompressWalBlock(meta, c.metaBuf, c.metaCompressLevel) bulkSizeAfterCompression.Observe(float64(len(c.docsBuf) + len(c.metaBuf))) } diff --git a/indexer/test_doc_provider.go b/indexer/test_doc_provider.go index 257ed115..443df9f0 100644 --- a/indexer/test_doc_provider.go +++ b/indexer/test_doc_provider.go @@ -53,7 +53,7 @@ func (dp *TestDocProvider) TryReset() { } -func (dp *TestDocProvider) Provide() (storage.DocBlock, storage.MetaBlock) { +func (dp *TestDocProvider) Provide() (storage.DocBlock, storage.WalBlock) { c := GetDocsMetasCompressor(-1, -1) c.CompressDocsAndMetas(dp.Docs, dp.Metas) return c.DocsMetas() diff --git a/proxy/bulk/ingestor_test.go b/proxy/bulk/ingestor_test.go index bdd1d8db..0088b0e7 100644 --- a/proxy/bulk/ingestor_test.go +++ b/proxy/bulk/ingestor_test.go @@ -430,7 +430,7 @@ func TestProcessDocuments(t *testing.T) { gotDocs = append(gotDocs, docsUnpacker.GetBinary()) } - binaryMetas, err := storage.MetaBlock(c.metas).DecompressTo(nil) + binaryMetas, err := storage.WalBlock(c.metas).DecompressTo(nil) require.NoError(t, err) metasUnpacker := packer.NewBytesUnpacker(binaryMetas) var gotMetas []indexer.MetaData diff --git a/storage/meta_block.go b/storage/meta_block.go deleted file mode 100644 index eba57353..00000000 --- a/storage/meta_block.go +++ /dev/null @@ -1,215 +0,0 @@ -package storage - -import ( - "encoding/binary" - "hash/crc32" - - "github.com/ozontech/seq-db/util" - "github.com/ozontech/seq-db/zstd" -) - -const ( - MetaBlockMagic byte = 101 - - offsetMetaBlockMagic = 0 // 1 byte (M) Magic byte (always 101) - offsetMetaBlockVersion = 1 // 1 byte (V) Version - offsetMetaBlockCodec = 2 // 1 byte (C) Codec - offsetMetaBlockLength = 3 // 4 bytes (L) Length of payload - offsetMetaBlockRawLength = 7 // 4 bytes (U) Raw length (after decompression) - offsetMetaBlockPayloadChecksum = 11 // 4 bytes (P) Payload checksum - covers payload only - 
offsetMetaBlockDocsOffset = 15 // 8 bytes (D) Docs offset - offsetMetaBlockHeaderChecksum = 23 // 4 bytes (H) Header checksum - covers bytes 0-22 - - MetaBlockHeaderLen = 27 - MetaBlockCurrentVersion = uint8(1) -) - -// MetaBlock format: M : V : C : LLLL : UUUU : PPPP : DDDD-DDDD : HHHH -// M = Magic (101), V = Version, C = Codec, L = Length, U = Raw Length, P = Payload Checksum, D = Docs Offset, H = Header Checksum - -type MetaBlock []byte - -func (b MetaBlock) Magic() byte { - return b[offsetMetaBlockMagic] -} - -func (b MetaBlock) Version() uint8 { - return b[offsetMetaBlockVersion] -} - -func (b MetaBlock) SetVersion(version uint8) { - b[offsetMetaBlockVersion] = version -} - -func (b MetaBlock) Codec() Codec { - return Codec(b[offsetMetaBlockCodec]) -} - -func (b MetaBlock) SetCodec(codecVal Codec) { - b[offsetMetaBlockCodec] = byte(codecVal) -} - -func (b MetaBlock) Len() uint32 { - return binary.LittleEndian.Uint32(b[offsetMetaBlockLength:]) -} - -func (b MetaBlock) SetLen(val uint32) { - binary.LittleEndian.PutUint32(b[offsetMetaBlockLength:], val) -} - -func (b MetaBlock) FullLen() uint32 { - return b.Len() + MetaBlockHeaderLen -} - -func (b MetaBlock) CalcLen() { - b.SetLen(uint32(len(b) - MetaBlockHeaderLen)) -} - -func (b MetaBlock) RawLen() uint32 { - return binary.LittleEndian.Uint32(b[offsetMetaBlockRawLength:]) -} - -func (b MetaBlock) SetRawLen(x uint32) { - binary.LittleEndian.PutUint32(b[offsetMetaBlockRawLength:], x) -} - -func (b MetaBlock) PayloadChecksum() uint32 { - return binary.LittleEndian.Uint32(b[offsetMetaBlockPayloadChecksum:]) -} - -func (b MetaBlock) SetPayloadChecksum(x uint32) { - binary.LittleEndian.PutUint32(b[offsetMetaBlockPayloadChecksum:], x) -} - -func (b MetaBlock) CalcPayloadChecksum() { - b.SetPayloadChecksum(crc32.ChecksumIEEE(b.Payload())) -} - -func (b MetaBlock) HeaderChecksum() uint32 { - return binary.LittleEndian.Uint32(b[offsetMetaBlockHeaderChecksum:]) -} - -func (b MetaBlock) SetHeaderChecksum(x uint32) { - 
binary.LittleEndian.PutUint32(b[offsetMetaBlockHeaderChecksum:], x) -} - -func (b MetaBlock) CalcHeaderChecksum() { - b.SetHeaderChecksum(crc32.ChecksumIEEE(b[:offsetMetaBlockHeaderChecksum])) -} - -func (b MetaBlock) DocsOffset() uint64 { - return binary.LittleEndian.Uint64(b[offsetMetaBlockDocsOffset:]) -} - -// SetDocsOffset updates docs offset. It will also recalc header checksum (cheap). -func (b MetaBlock) SetDocsOffset(x uint64) { - binary.LittleEndian.PutUint64(b[offsetMetaBlockDocsOffset:], x) - b.CalcHeaderChecksum() -} - -func (b MetaBlock) Payload() []byte { - return b[MetaBlockHeaderLen:] -} - -// IsCorrect checks if this is a correct meta block by checking header and payload checksums -func (b MetaBlock) IsCorrect() bool { - return b.IsHeaderCorrect() && b.IsPayloadCorrect() -} - -// IsHeaderCorrect checks if header checksum is correct -func (b MetaBlock) IsHeaderCorrect() bool { - return crc32.ChecksumIEEE(b[:offsetMetaBlockHeaderChecksum]) == b.HeaderChecksum() -} - -// IsPayloadCorrect checks if payload checksum is valid -func (b MetaBlock) IsPayloadCorrect() bool { - return crc32.ChecksumIEEE(b.Payload()) == b.PayloadChecksum() -} - -// IsMetaBlock checks if this data is possibly a meta block. -// Returns true if the data has at least MetaBlockHeaderLen bytes and starts with magic byte. -// This doesn't check for corruption, use IsCorrect() for checksum validation. -func IsMetaBlock(data []byte) bool { - return len(data) >= MetaBlockHeaderLen && data[0] == MetaBlockMagic -} - -func CompressMetaBlock(src []byte, dst MetaBlock, zstdLevel int) MetaBlock { - dst = append(dst[:0], make([]byte, MetaBlockHeaderLen)...) 
// fill header with zeros for cleanup - dst = zstd.CompressLevel(src, dst, zstdLevel) - - dst[offsetMetaBlockMagic] = MetaBlockMagic - dst.SetVersion(MetaBlockCurrentVersion) - dst.CalcLen() - dst.SetRawLen(uint32(len(src))) - dst.SetCodec(CodecZSTD) - dst.CalcPayloadChecksum() - dst.CalcHeaderChecksum() - - return dst -} - -func PackMetaBlock(payload []byte, dst MetaBlock) MetaBlock { - dst = append(dst[:0], make([]byte, MetaBlockHeaderLen)...) // fill header with zeros for cleanup - dst = append(dst, payload...) - - dst[offsetMetaBlockMagic] = MetaBlockMagic - dst.SetVersion(MetaBlockCurrentVersion) - dst.CalcLen() - dst.SetRawLen(uint32(len(payload))) - dst.SetCodec(CodecNo) - dst.CalcPayloadChecksum() - dst.CalcHeaderChecksum() - - return dst -} - -// PackMetaBlockToDocBlock converts MetaBlock to legacy DocBlock. -func PackMetaBlockToDocBlock(metaBlock MetaBlock, dst DocBlock) DocBlock { - dst = append(dst[:0], make([]byte, DocBlockHeaderLen)...) - dst = append(dst, metaBlock.Payload()...) - - dst.CalcLen() - dst.SetRawLen(uint64(metaBlock.RawLen())) - dst.SetCodec(metaBlock.Codec()) - dst.SetExt2(metaBlock.DocsOffset()) - - return dst -} - -// PackDocBlockToMetaBlock converts DocBlock to MetaBlock in place without copying payload. 
-// docBlock will be invalid after packing -func PackDocBlockToMetaBlock(docBlock DocBlock) MetaBlock { - rawLen := uint32(docBlock.RawLen()) - codec := docBlock.Codec() - docsOffset := docBlock.GetExt2() - payloadLen := uint32(len(docBlock) - DocBlockHeaderLen) - - const headerDiff = DocBlockHeaderLen - MetaBlockHeaderLen - mb := MetaBlock(docBlock[headerDiff:]) - - mb[offsetMetaBlockMagic] = MetaBlockMagic - mb.SetVersion(MetaBlockCurrentVersion) - mb.SetLen(payloadLen) - mb.SetRawLen(rawLen) - mb.SetCodec(codec) - // write docs offset directly since SetDocsOffset recalculates header checksum - binary.LittleEndian.PutUint64(mb[offsetMetaBlockDocsOffset:], docsOffset) - mb.CalcPayloadChecksum() - mb.CalcHeaderChecksum() - - return mb -} - -// DecompressTo always put the result in `dst` regardless of whether unpacking is required -// or part of the MetaBlock can be enough. -// -// So MetaBlock does not share the same data with `dst` and can be used safely -func (b MetaBlock) DecompressTo(dst []byte) ([]byte, error) { - payload := b.Payload() - if b.Codec() == CodecNo { - dst = util.EnsureSliceSize(dst, int(b.RawLen())) - copy(dst, payload) - return dst, nil - } - return b.Codec().decompressBlock(int(b.RawLen()), payload, dst) -} diff --git a/storage/wal_block.go b/storage/wal_block.go new file mode 100644 index 00000000..d84b7587 --- /dev/null +++ b/storage/wal_block.go @@ -0,0 +1,215 @@ +package storage + +import ( + "encoding/binary" + "hash/crc32" + + "github.com/ozontech/seq-db/util" + "github.com/ozontech/seq-db/zstd" +) + +const ( + WalBlockMagic byte = 101 + + offsetWalBlockMagic = 0 // 1 byte (M) Magic byte (always 101) + offsetWalBlockVersion = 1 // 1 byte (V) Version + offsetWalBlockCodec = 2 // 1 byte (C) Codec + offsetWalBlockLength = 3 // 4 bytes (L) Length of payload + offsetWalBlockRawLength = 7 // 4 bytes (U) Raw length (after decompression) + offsetWalBlockPayloadChecksum = 11 // 4 bytes (P) Payload checksum - covers payload only + 
offsetWalBlockDocsOffset = 15 // 8 bytes (D) Docs offset + offsetWalBlockHeaderChecksum = 23 // 4 bytes (H) Header checksum - covers bytes 0-22 + + WalBlockHeaderLen = 27 + WalBlockCurrentVersion = uint8(1) +) + +// WalBlock format: M : V : C : LLLL : UUUU : PPPP : DDDD-DDDD : HHHH +// M = Magic (101), V = Version, C = Codec, L = Length, U = Raw Length, P = Payload Checksum, D = Docs Offset, H = Header Checksum + +type WalBlock []byte + +func (b WalBlock) Magic() byte { + return b[offsetWalBlockMagic] +} + +func (b WalBlock) Version() uint8 { + return b[offsetWalBlockVersion] +} + +func (b WalBlock) SetVersion(version uint8) { + b[offsetWalBlockVersion] = version +} + +func (b WalBlock) Codec() Codec { + return Codec(b[offsetWalBlockCodec]) +} + +func (b WalBlock) SetCodec(codecVal Codec) { + b[offsetWalBlockCodec] = byte(codecVal) +} + +func (b WalBlock) Len() uint32 { + return binary.LittleEndian.Uint32(b[offsetWalBlockLength:]) +} + +func (b WalBlock) SetLen(val uint32) { + binary.LittleEndian.PutUint32(b[offsetWalBlockLength:], val) +} + +func (b WalBlock) FullLen() uint32 { + return b.Len() + WalBlockHeaderLen +} + +func (b WalBlock) CalcLen() { + b.SetLen(uint32(len(b) - WalBlockHeaderLen)) +} + +func (b WalBlock) RawLen() uint32 { + return binary.LittleEndian.Uint32(b[offsetWalBlockRawLength:]) +} + +func (b WalBlock) SetRawLen(x uint32) { + binary.LittleEndian.PutUint32(b[offsetWalBlockRawLength:], x) +} + +func (b WalBlock) PayloadChecksum() uint32 { + return binary.LittleEndian.Uint32(b[offsetWalBlockPayloadChecksum:]) +} + +func (b WalBlock) SetPayloadChecksum(x uint32) { + binary.LittleEndian.PutUint32(b[offsetWalBlockPayloadChecksum:], x) +} + +func (b WalBlock) CalcPayloadChecksum() { + b.SetPayloadChecksum(crc32.ChecksumIEEE(b.Payload())) +} + +func (b WalBlock) HeaderChecksum() uint32 { + return binary.LittleEndian.Uint32(b[offsetWalBlockHeaderChecksum:]) +} + +func (b WalBlock) SetHeaderChecksum(x uint32) { + 
binary.LittleEndian.PutUint32(b[offsetWalBlockHeaderChecksum:], x) +} + +func (b WalBlock) CalcHeaderChecksum() { + b.SetHeaderChecksum(crc32.ChecksumIEEE(b[:offsetWalBlockHeaderChecksum])) +} + +func (b WalBlock) DocsOffset() uint64 { + return binary.LittleEndian.Uint64(b[offsetWalBlockDocsOffset:]) +} + +// SetDocsOffset updates docs offset. It will also recalc header checksum (cheap). +func (b WalBlock) SetDocsOffset(x uint64) { + binary.LittleEndian.PutUint64(b[offsetWalBlockDocsOffset:], x) + b.CalcHeaderChecksum() +} + +func (b WalBlock) Payload() []byte { + return b[WalBlockHeaderLen:] +} + +// IsCorrect checks if this is a correct WAL block by checking header and payload checksums +func (b WalBlock) IsCorrect() bool { + return b.IsHeaderCorrect() && b.IsPayloadCorrect() +} + +// IsHeaderCorrect checks if header checksum is correct +func (b WalBlock) IsHeaderCorrect() bool { + return crc32.ChecksumIEEE(b[:offsetWalBlockHeaderChecksum]) == b.HeaderChecksum() +} + +// IsPayloadCorrect checks if payload checksum is valid +func (b WalBlock) IsPayloadCorrect() bool { + return crc32.ChecksumIEEE(b.Payload()) == b.PayloadChecksum() +} + +// IsWalBlock checks if this data is possibly a WAL block. +// Returns true if the data has at least WalBlockHeaderLen bytes and starts with magic byte. +// This doesn't check for corruption, use IsCorrect() for checksum validation. +func IsWalBlock(data []byte) bool { + return len(data) >= WalBlockHeaderLen && data[0] == WalBlockMagic +} + +func CompressWalBlock(src []byte, dst WalBlock, zstdLevel int) WalBlock { + dst = append(dst[:0], make([]byte, WalBlockHeaderLen)...)
// fill header with zeros for cleanup + dst = zstd.CompressLevel(src, dst, zstdLevel) + + dst[offsetWalBlockMagic] = WalBlockMagic + dst.SetVersion(WalBlockCurrentVersion) + dst.CalcLen() + dst.SetRawLen(uint32(len(src))) + dst.SetCodec(CodecZSTD) + dst.CalcPayloadChecksum() + dst.CalcHeaderChecksum() + + return dst +} + +func PackWalBlock(payload []byte, dst WalBlock) WalBlock { + dst = append(dst[:0], make([]byte, WalBlockHeaderLen)...) // fill header with zeros for cleanup + dst = append(dst, payload...) + + dst[offsetWalBlockMagic] = WalBlockMagic + dst.SetVersion(WalBlockCurrentVersion) + dst.CalcLen() + dst.SetRawLen(uint32(len(payload))) + dst.SetCodec(CodecNo) + dst.CalcPayloadChecksum() + dst.CalcHeaderChecksum() + + return dst +} + +// PackWalBlockToDocBlock converts WalBlock to legacy DocBlock. +func PackWalBlockToDocBlock(walBlock WalBlock, dst DocBlock) DocBlock { + dst = append(dst[:0], make([]byte, DocBlockHeaderLen)...) + dst = append(dst, walBlock.Payload()...) + + dst.CalcLen() + dst.SetRawLen(uint64(walBlock.RawLen())) + dst.SetCodec(walBlock.Codec()) + dst.SetExt2(walBlock.DocsOffset()) + + return dst +} + +// PackDocBlockToWalBlock converts DocBlock to WalBlock in place without copying payload. 
+// docBlock will be invalid after packing +func PackDocBlockToWalBlock(docBlock DocBlock) WalBlock { + rawLen := uint32(docBlock.RawLen()) + codec := docBlock.Codec() + docsOffset := docBlock.GetExt2() + payloadLen := uint32(len(docBlock) - DocBlockHeaderLen) + + const headerDiff = DocBlockHeaderLen - WalBlockHeaderLen + mb := WalBlock(docBlock[headerDiff:]) + + mb[offsetWalBlockMagic] = WalBlockMagic + mb.SetVersion(WalBlockCurrentVersion) + mb.SetLen(payloadLen) + mb.SetRawLen(rawLen) + mb.SetCodec(codec) + // write docs offset directly since SetDocsOffset recalculates header checksum + binary.LittleEndian.PutUint64(mb[offsetWalBlockDocsOffset:], docsOffset) + mb.CalcPayloadChecksum() + mb.CalcHeaderChecksum() + + return mb +} + +// DecompressTo always put the result in `dst` regardless of whether unpacking is required +// or part of the WalBlock can be enough. +// +// So WalBlock does not share the same data with `dst` and can be used safely +func (b WalBlock) DecompressTo(dst []byte) ([]byte, error) { + payload := b.Payload() + if b.Codec() == CodecNo { + dst = util.EnsureSliceSize(dst, int(b.RawLen())) + copy(dst, payload) + return dst, nil + } + return b.Codec().decompressBlock(int(b.RawLen()), payload, dst) +} diff --git a/storage/meta_block_test.go b/storage/wal_block_test.go similarity index 71% rename from storage/meta_block_test.go rename to storage/wal_block_test.go index 06bf2a7d..c18b2a7e 100644 --- a/storage/meta_block_test.go +++ b/storage/wal_block_test.go @@ -9,7 +9,7 @@ import ( func TestCorruptionDetection(t *testing.T) { payload := []byte("test payload for test of checking corrupted data") - block := CompressMetaBlock(payload, nil, 1) + block := CompressWalBlock(payload, nil, 1) assert.True(t, block.IsCorrect()) @@ -23,7 +23,7 @@ func TestCorruptionDetection(t *testing.T) { // change type (the first byte) block[0] = 7 assert.False(t, block.IsCorrect()) - block[0] = MetaBlockMagic + block[0] = WalBlockMagic // change version (second byte) 
block[1] = 233 @@ -38,20 +38,20 @@ func TestCorruptionDetection(t *testing.T) { assert.False(t, block.IsCorrect()) } -func TestConvertDocToMetaBlock(t *testing.T) { +func TestConvertDocToWalBlock(t *testing.T) { payload := []byte("test test payload") docBlock := CompressDocBlock(payload, nil, 1) docBlock.SetExt2(11111) - metaBlock := PackDocBlockToMetaBlock(docBlock) + walBlock := PackDocBlockToWalBlock(docBlock) - assert.Equal(t, CodecZSTD, metaBlock.Codec()) - assert.Equal(t, uint32(len(payload)), metaBlock.RawLen()) - assert.Equal(t, uint64(11111), metaBlock.DocsOffset()) - assert.True(t, metaBlock.IsCorrect()) + assert.Equal(t, CodecZSTD, walBlock.Codec()) + assert.Equal(t, uint32(len(payload)), walBlock.RawLen()) + assert.Equal(t, uint64(11111), walBlock.DocsOffset()) + assert.True(t, walBlock.IsCorrect()) - decompressed, err := metaBlock.DecompressTo(nil) + decompressed, err := walBlock.DecompressTo(nil) require.NoError(t, err) assert.Equal(t, payload, decompressed) } @@ -59,10 +59,10 @@ func TestConvertDocToMetaBlock(t *testing.T) { func TestConvertMetaToDocBlock(t *testing.T) { payload := []byte("test payload data") - metaBlock := CompressMetaBlock(payload, nil, 1) - metaBlock.SetDocsOffset(22222) + walBlock := CompressWalBlock(payload, nil, 1) + walBlock.SetDocsOffset(22222) - docBlock := PackMetaBlockToDocBlock(metaBlock, nil) + docBlock := PackWalBlockToDocBlock(walBlock, nil) assert.Equal(t, CodecZSTD, docBlock.Codec()) assert.Equal(t, uint64(len(payload)), docBlock.RawLen()) diff --git a/storage/wal_reader.go b/storage/wal_reader.go index 63cf93e8..811acd75 100644 --- a/storage/wal_reader.go +++ b/storage/wal_reader.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "iter" + "unsafe" "go.uber.org/zap" @@ -13,7 +14,7 @@ import ( ) type WalRecord struct { - Data MetaBlock + Data WalBlock Offset int64 Size int64 Err error @@ -22,32 +23,32 @@ type WalRecord struct { type WalReader struct { limiter *ReadLimiter reader io.ReaderAt - headerOffset int64 // offset where 
actual data starts (WALHeaderSize for new format) + headerOffset int64 // offset where actual data starts (WalHeaderSize for new format) baseFileName string } func NewWalReader(limiter *ReadLimiter, reader io.ReaderAt, baseFileName string) (*WalReader, error) { - header := make([]byte, WALHeaderSize) + header := make([]byte, WalHeaderSize) n, err := limiter.ReadAt(reader, header, 0) if err != nil && !errors.Is(err, io.EOF) { return nil, fmt.Errorf("failed to read WAL header: %w", err) } - if n < WALHeaderSize { - return nil, fmt.Errorf("WAL file too short: expected at least %d bytes, got %d", WALHeaderSize, n) + if n < WalHeaderSize { + return nil, fmt.Errorf("WAL file too short: expected at least %d bytes, got %d", WalHeaderSize, n) } - magic := binary.LittleEndian.Uint32(header[0:4]) - if magic != WALMagic { - return nil, fmt.Errorf("invalid WAL magic: expected 0x%X, got 0x%X", WALMagic, magic) + magic := binary.LittleEndian.Uint32(header[0:unsafe.Sizeof(WalMagic)]) + if magic != WalMagic { + return nil, fmt.Errorf("invalid WAL magic: expected 0x%X, got 0x%X", WalMagic, magic) } version := header[4] - if version != WALVersion1 { - return nil, fmt.Errorf("unknown WAL version: %d (supported: %d)", version, WALVersion1) + if version != WalVersion1 { + return nil, fmt.Errorf("unknown WAL version: %d (supported: %d)", version, WalVersion1) } return &WalReader{ limiter: limiter, reader: reader, - headerOffset: WALHeaderSize, + headerOffset: WalHeaderSize, baseFileName: baseFileName, }, nil } @@ -56,7 +57,7 @@ func NewWalReader(limiter *ReadLimiter, reader io.ReaderAt, baseFileName string) // Corruption ranges are logged with "from" and "to" offsets. 
func (r *WalReader) Iter() iter.Seq[WalRecord] { return func(yield func(WalRecord) bool) { - offset := nextBlockOffset(r.headerOffset) + offset := alignSize(r.headerOffset) var corruptionStart int64 = -1 logCorruptionEnd := func(offset int64) { @@ -74,8 +75,8 @@ func (r *WalReader) Iter() iter.Seq[WalRecord] { } } + headerBuf := make([]byte, WalBlockHeaderLen) for { - headerBuf := make([]byte, MetaBlockHeaderLen) n, err := r.limiter.ReadAt(r.reader, headerBuf, offset) if err != nil && !errors.Is(err, io.EOF) { @@ -84,22 +85,22 @@ func (r *WalReader) Iter() iter.Seq[WalRecord] { return } - if errors.Is(err, io.EOF) || n < MetaBlockHeaderLen { + if errors.Is(err, io.EOF) || n < WalBlockHeaderLen { logCorruptionEnd(offset) return } - if !IsMetaBlock(headerBuf) { + if !IsWalBlock(headerBuf) { startCorruptionTracking(offset) - offset += BlockAlignment + offset += WalBlockAlignment continue } - mb := MetaBlock(headerBuf) + mb := WalBlock(headerBuf) if !mb.IsHeaderCorrect() { startCorruptionTracking(offset) - offset += BlockAlignment + offset += WalBlockAlignment continue } @@ -127,7 +128,7 @@ func (r *WalReader) Iter() iter.Seq[WalRecord] { if !mb.IsPayloadCorrect() { startCorruptionTracking(offset) - offset = nextBlockOffset(offset + blockLen) + offset = alignSize(offset + blockLen) continue } @@ -143,12 +144,12 @@ func (r *WalReader) Iter() iter.Seq[WalRecord] { return } - offset = nextBlockOffset(offset + blockLen) + offset = alignSize(offset + blockLen) } } } -// nextBlockOffset aligns provided offset to BlockAlignment -func nextBlockOffset(offset int64) int64 { - return (offset + BlockAlignment - 1) &^ (BlockAlignment - 1) +// alignSize aligns provided offset to WalBlockAlignment +func alignSize(offset int64) int64 { + return (offset + WalBlockAlignment - 1) &^ (WalBlockAlignment - 1) } diff --git a/storage/wal_writer.go b/storage/wal_writer.go index f9f0940b..6f68e3d7 100644 --- a/storage/wal_writer.go +++ b/storage/wal_writer.go @@ -13,19 +13,19 @@ import ( ) 
const ( - // WALMagic is the magic number at the start of WAL files - WALMagic uint32 = 0xFFFFFFFF - // WALVersion1 is the first version of WAL file with CRC32 checksums and 64 byte alignment for blocks. - WALVersion1 uint8 = 1 + // WalMagic is the magic number at the start of WAL files + WalMagic uint32 = 0xFFFFFFFF + // WalVersion1 is the first version of WAL file with CRC32 checksums and 64 byte alignment for blocks. + WalVersion1 uint8 = 1 // WALCurrentVersion is the current WAL format version. - WALCurrentVersion = WALVersion1 - // WALHeaderSize is the size of the WAL header in bytes (4 bytes magic + 1 byte version). 59 bytes are also reserved + WALCurrentVersion = WalVersion1 + // WalHeaderSize is the size of the WAL header in bytes (4 bytes magic + 1 byte version). 59 bytes are also reserved // due to alignment - WALHeaderSize = 5 - // BlockAlignment is the alignment boundary for blocks in the new WAL format. Must be greater than - // MetaBlock header (27 bytes) to prevent header torn writes and allow faster navigation during replay + WalHeaderSize = 5 + // WalBlockAlignment is the alignment boundary for blocks in the new WAL format. Must be greater than + // WalBlock header (27 bytes) to prevent header torn writes and allow faster navigation during replay // of corrupted WAL file - BlockAlignment int64 = 64 + WalBlockAlignment int64 = 64 ) type WriteSyncer interface { @@ -34,8 +34,8 @@ type WriteSyncer interface { Sync() error } -// WalWriter writes MetaBlocks to a WAL file with header and 64-byte alignment. -// Format: [Header 5B] [... -> align to 64] [MetaBlock] [... -> align to 64] [MetaBlock] ... +// WalWriter writes WalBlocks to a WAL file with header and 64-byte alignment. +// Format: [Header 5B] [... -> align to 64] [WalBlock] [... -> align to 64] [WalBlock] ... 
type WalWriter struct { ws WriteSyncer offset atomic.Int64 @@ -55,7 +55,7 @@ func NewWalWriter(ws WriteSyncer, offset int64, skipSync bool) *WalWriter { notify: make(chan struct{}, 1), } - // write a header at the beggining if it's a new file + // write a header at the beginning if it's a new file if offset == 0 { if err := writeWALHeader(ws); err != nil { logger.Panic("failed to write WAL header", zap.Error(err)) @@ -64,9 +64,8 @@ func NewWalWriter(ws WriteSyncer, offset int64, skipSync bool) *WalWriter { if !skipSync { _ = ws.Sync() } - w.offset.Store(WALHeaderSize) - } else { - w.offset.Store(nextBlockOffset(offset)) + + w.offset.Store(alignSize(WalHeaderSize)) } w.wg.Add(1) @@ -93,8 +92,8 @@ func (w *WalWriter) syncLoop() { } } -// Write writes a MetaBlock to the WAL file. The data must already be a MetaBlock. -// Returns the offset where the MetaBlock starts. +// Write writes a WalBlock to the WAL file. The data must already be a WalBlock. +// Returns the offset where the WalBlock starts. func (w *WalWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { m := sw.Start("write_duration") @@ -112,19 +111,16 @@ func (w *WalWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { } // reserveSpace atomically reserves a necessary space and returns the next position where block may be written. The position -// is aligned to BlockAlignment +// is aligned to WalBlockAlignment func (w *WalWriter) reserveSpace(blockSize int64) int64 { - var result int64 - for { - curr := w.offset.Load() - nextSlotOffset := nextBlockOffset(curr) - - if w.offset.CompareAndSwap(curr, nextSlotOffset+blockSize) { - result = nextSlotOffset - break - } - } - return result + aligned := alignSize(blockSize) + + // w.offset is already aligned. + // So when we add aligned block we still have aligned offset. 
+ end := w.offset.Add(aligned) + start := end - aligned + + return start } func (w *WalWriter) sync(m stopwatch.Metric, sw *stopwatch.Stopwatch) error { @@ -157,8 +153,8 @@ func (w *WalWriter) Stop() { } func writeWALHeader(w io.WriterAt) error { - header := make([]byte, WALHeaderSize) - binary.LittleEndian.PutUint32(header[0:4], WALMagic) + header := make([]byte, WalHeaderSize) + binary.LittleEndian.PutUint32(header[0:4], WalMagic) header[4] = WALCurrentVersion _, err := w.WriteAt(header, 0) return err diff --git a/storage/wal_writer_test.go b/storage/wal_writer_test.go index 7a3cb80e..1d75c70e 100644 --- a/storage/wal_writer_test.go +++ b/storage/wal_writer_test.go @@ -194,7 +194,7 @@ func TestConcurrentFileWriting(t *testing.T) { wg := sync.WaitGroup{} samplesQueues := [writersCount][]writeSample{} - // run writers - write MetaBlocks + // run writers - write WalBlocks for i := range writersCount { wg.Add(1) go func() { @@ -205,8 +205,8 @@ func TestConcurrentFileWriting(t *testing.T) { for j := range writesCount { payload := []byte("<" + workerName + "-" + strconv.Itoa(j) + ">") - metaBlock := PackMetaBlock(payload, nil) - offset, e := fw.Write(metaBlock, sw) + walBlock := PackWalBlock(payload, nil) + offset, e := fw.Write(walBlock, sw) assert.NoError(t, e) samplesQueues[i] = append(samplesQueues[i], writeSample{payload: payload, offset: offset}) @@ -307,8 +307,8 @@ func TestWalWriterWriteAndRead(t *testing.T) { sw := stopwatch.New() for i, payload := range payloads { - metaBlock := PackMetaBlock(payload, nil) - offset, err := fw.Write(metaBlock, sw) + walBlock := PackWalBlock(payload, nil) + offset, err := fw.Write(walBlock, sw) assert.NoError(t, err) offsets[i] = offset } @@ -321,7 +321,7 @@ func TestWalWriterWriteAndRead(t *testing.T) { for entry := range reader.Iter() { assert.Equal(t, offsets[count], entry.Offset, "block %d offset mismatch", count) assert.Equal(t, payloads[count], entry.Data.Payload(), "block %d payload mismatch", count) - assert.Equal(t, 
MetaBlockMagic, entry.Data.Magic(), "block %d should have MetaBlock magic", count) + assert.Equal(t, WalBlockMagic, entry.Data.Magic(), "block %d should have WalBlock magic", count) count++ } assert.Equal(t, len(payloads), count, "should read all blocks") @@ -351,12 +351,12 @@ func TestWalReaderInvalidMagic(t *testing.T) { defer f.Close() // write invalid magic (not 0xFFFFFFFF) - header := make([]byte, WALHeaderSize) + header := make([]byte, WalHeaderSize) header[0] = 0x00 header[1] = 0x00 header[2] = 0x00 header[3] = 0x00 - header[4] = WALVersion1 + header[4] = WalVersion1 _, err = f.WriteAt(header, 0) assert.NoError(t, err) @@ -371,7 +371,7 @@ func TestWalReaderUnknownVersion(t *testing.T) { defer f.Close() // write correct magic but unknown version - header := make([]byte, WALHeaderSize) + header := make([]byte, WalHeaderSize) header[0] = 0xFF header[1] = 0xFF header[2] = 0xFF @@ -390,7 +390,7 @@ func TestWalReaderFileTooShort(t *testing.T) { assert.NoError(t, err) defer f.Close() - // write only 3 bytes (less than WALHeaderSize which is 5) + // write only 3 bytes (less than WalHeaderSize which is 5) _, err = f.WriteAt([]byte{0xFF, 0xFF, 0xFF}, 0) assert.NoError(t, err) @@ -418,8 +418,8 @@ func TestWalReaderIterator(t *testing.T) { var expectedOffsets []int64 for _, payload := range payloads { - metaBlock := PackMetaBlock(payload, nil) - offset, err := fw.Write(metaBlock, sw) + walBlock := PackWalBlock(payload, nil) + offset, err := fw.Write(walBlock, sw) assert.NoError(t, err) expectedOffsets = append(expectedOffsets, offset) } @@ -461,15 +461,15 @@ func TestWalReaderSkipsCorruptedBlocks(t *testing.T) { var offsets []int64 for _, payload := range payloads { - metaBlock := PackMetaBlock(payload, nil) - offset, err := fw.Write(metaBlock, sw) + walBlock := PackWalBlock(payload, nil) + offset, err := fw.Write(walBlock, sw) assert.NoError(t, err) offsets = append(offsets, offset) } fw.Stop() // corrupt block 2 (index 1) by flipping a byte in the header checksum - 
corruptOffset := offsets[1] + offsetMetaBlockHeaderChecksum + corruptOffset := offsets[1] + offsetWalBlockHeaderChecksum _, err = f.WriteAt([]byte{0xFF}, corruptOffset) assert.NoError(t, err) t.Logf("corrupted header checksum at offset %d", corruptOffset) @@ -507,14 +507,14 @@ func TestWalReaderSkipsCorruptedPayload(t *testing.T) { var offsets []int64 for _, payload := range payloads { - metaBlock := PackMetaBlock(payload, nil) - offset, err := fw.Write(metaBlock, sw) + walBlock := PackWalBlock(payload, nil) + offset, err := fw.Write(walBlock, sw) assert.NoError(t, err) offsets = append(offsets, offset) } fw.Stop() - payloadOffset := offsets[1] + MetaBlockHeaderLen + 5 // corrupt somewhere in payload + payloadOffset := offsets[1] + WalBlockHeaderLen + 5 // corrupt somewhere in payload _, err = f.WriteAt([]byte{0xFF, 0xFF}, payloadOffset) assert.NoError(t, err) @@ -550,7 +550,7 @@ func TestWalReaderSingleByteCorruption(t *testing.T) { fw := NewWalWriter(f, 0, false) sw := stopwatch.New() - blocks := make([]MetaBlock, 0) + blocks := make([]WalBlock, 0) // write blocks to WAL for i := range numBlocks { @@ -563,9 +563,9 @@ func TestWalReaderSingleByteCorruption(t *testing.T) { // store the first byte as index of block payload[0] = byte(i) - metaBlock := PackMetaBlock(payload, nil) - blocks = append(blocks, metaBlock) - _, err = fw.Write(metaBlock, sw) + walBlock := PackWalBlock(payload, nil) + blocks = append(blocks, walBlock) + _, err = fw.Write(walBlock, sw) assert.NoError(t, err) } fw.Stop() @@ -576,7 +576,7 @@ func TestWalReaderSingleByteCorruption(t *testing.T) { // flip a random byte at random offset // we do not corrupt the first 5 bytes - WAL header, other bytes might be corrupted including block headers - corruptOffset := int64(WALHeaderSize) + rand.Int64N(fileSize-WALHeaderSize) + corruptOffset := int64(WalHeaderSize) + rand.Int64N(fileSize-WalHeaderSize) originalByte := make([]byte, 1) _, err = f.ReadAt(originalByte, corruptOffset) @@ -629,7 +629,7 @@ func 
TestWalReaderTruncation(t *testing.T) { fw := NewWalWriter(f, 0, false) sw := stopwatch.New() - blocks := make([]MetaBlock, 0) + blocks := make([]WalBlock, 0) offsets := make([]int64, 0) // write blocks to WAL @@ -643,9 +643,9 @@ func TestWalReaderTruncation(t *testing.T) { // store the first byte as index of block payload[0] = byte(i) - metaBlock := PackMetaBlock(payload, nil) - blocks = append(blocks, metaBlock) - offset, err := fw.Write(metaBlock, sw) + walBlock := PackWalBlock(payload, nil) + blocks = append(blocks, walBlock) + offset, err := fw.Write(walBlock, sw) assert.NoError(t, err) offsets = append(offsets, offset) } @@ -707,7 +707,7 @@ func TestWalReaderSectorLoss(t *testing.T) { fw := NewWalWriter(f, 0, false) sw := stopwatch.New() - blocks := make([]MetaBlock, 0) + blocks := make([]WalBlock, 0) // write blocks to WAL for i := range numBlocks { @@ -720,9 +720,9 @@ func TestWalReaderSectorLoss(t *testing.T) { // store the first byte as index of block payload[0] = byte(i) - metaBlock := PackMetaBlock(payload, nil) - blocks = append(blocks, metaBlock) - _, err = fw.Write(metaBlock, sw) + walBlock := PackWalBlock(payload, nil) + blocks = append(blocks, walBlock) + _, err = fw.Write(walBlock, sw) assert.NoError(t, err) } fw.Stop() diff --git a/storeapi/grpc_bulk.go b/storeapi/grpc_bulk.go index 1f6ecce9..9616a090 100644 --- a/storeapi/grpc_bulk.go +++ b/storeapi/grpc_bulk.go @@ -65,9 +65,9 @@ func (g *GrpcV1) doBulk(ctx context.Context, req *storeapi.BulkRequest) error { start := time.Now() - if !storage.IsMetaBlock(req.Metas) { - // We use MetaBlock now for metas everywhere, DocBlock might be sent by legacy proxy service. - req.Metas = storage.PackDocBlockToMetaBlock(req.Metas) + if !storage.IsWalBlock(req.Metas) { + // We use WalBlock now for metas everywhere, DocBlock might be sent by legacy proxy service. 
+ req.Metas = storage.PackDocBlockToWalBlock(req.Metas) } err := g.fracManager.Append(ctx, req.Docs, req.Metas) From 65ef2b97aa63a47bd28729c7f596ea45f34cb04a Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:16:35 +0400 Subject: [PATCH 6/7] use FileWriter in WalWriter --- frac/active_writer.go | 12 +- frac/file_writer_test.go | 16 +- frac/legacy_meta_writer.go | 25 +++ {frac => storage}/file_writer.go | 35 ++-- storage/file_writer_test.go | 264 +++++++++++++++++++++++++++++++ storage/wal_reader.go | 10 +- storage/wal_writer.go | 105 ++---------- storage/wal_writer_test.go | 79 +++------ 8 files changed, 364 insertions(+), 182 deletions(-) create mode 100644 frac/legacy_meta_writer.go rename {frac => storage}/file_writer.go (70%) create mode 100644 storage/file_writer_test.go diff --git a/frac/active_writer.go b/frac/active_writer.go index a6a690c5..279746f0 100644 --- a/frac/active_writer.go +++ b/frac/active_writer.go @@ -8,26 +8,28 @@ import ( ) type ActiveWriter struct { - docs *FileWriter + docs *storage.FileWriter meta MetaWriter } type MetaWriter interface { - Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) + Write(data storage.WalBlock, sw *stopwatch.Stopwatch) (int64, error) Stop() } +// NewActiveWriter creates a writer for *.wal files func NewActiveWriter(docsFile, walFile *os.File, docsOffset, walOffset int64, skipFsync bool) *ActiveWriter { return &ActiveWriter{ - docs: NewFileWriter(docsFile, docsOffset, skipFsync), + docs: storage.NewFileWriter(docsFile, docsOffset, skipFsync), meta: storage.NewWalWriter(walFile, walOffset, skipFsync), } } +// NewActiveWriterLegacy creates a writer for *.meta files func NewActiveWriterLegacy(docsFile, metaFile *os.File, docsOffset, metaOffset int64, skipFsync bool) *ActiveWriter { return &ActiveWriter{ - docs: NewFileWriter(docsFile, docsOffset, skipFsync), - meta: NewFileWriter(metaFile, metaOffset, skipFsync), + docs: 
storage.NewFileWriter(docsFile, docsOffset, skipFsync), + meta: NewLegacyMetaWriter(storage.NewFileWriter(metaFile, metaOffset, skipFsync)), } } diff --git a/frac/file_writer_test.go b/frac/file_writer_test.go index dcfce50a..a09f65e5 100644 --- a/frac/file_writer_test.go +++ b/frac/file_writer_test.go @@ -64,7 +64,7 @@ func (ws *testWriterSyncer) Check(val []byte) bool { func TestFileWriter(t *testing.T) { ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} - fw := NewFileWriter(ws, 0, false) + fw := storage.NewFileWriter(ws, 0, false) wg := sync.WaitGroup{} for range 100 { @@ -87,7 +87,7 @@ func TestFileWriter(t *testing.T) { func TestFileWriterNoSync(t *testing.T) { ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} - fw := NewFileWriter(ws, 0, true) + fw := storage.NewFileWriter(ws, 0, true) wg := sync.WaitGroup{} for range 100 { @@ -110,7 +110,7 @@ func TestFileWriterNoSync(t *testing.T) { func TestFileWriterError(t *testing.T) { ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond, err: true} - fw := NewFileWriter(ws, 0, false) + fw := storage.NewFileWriter(ws, 0, false) wg := sync.WaitGroup{} for range 100 { @@ -151,7 +151,7 @@ func TestConcurrentFileWriting(t *testing.T) { defer f.Close() - fw := NewFileWriter(&testRandPauseWriterAt{f: f}, 0, true) + fw := storage.NewFileWriter(&testRandPauseWriterAt{f: f}, 0, true) const ( writersCount = 100 @@ -265,12 +265,12 @@ func TestSparseWrite(t *testing.T) { assert.NoError(t, e) } -func TestFileWriterConvertsWalBlockToDocBlock(t *testing.T) { +func TestLegacyMetaWriterConvertsWalBlockToDocBlock(t *testing.T) { f, err := os.Create(t.TempDir() + "/test_wal_block.txt") require.NoError(t, err) defer f.Close() - fw := NewFileWriter(f, 0, false) + meta := NewLegacyMetaWriter(storage.NewFileWriter(f, 0, false)) originalPayload := []byte("test payload for WalBlock to DocBlock conversion") walBlock := storage.CompressWalBlock(originalPayload, nil, 3) 
@@ -278,10 +278,10 @@ func TestFileWriterConvertsWalBlockToDocBlock(t *testing.T) { walBlock.SetVersion(1) sw := stopwatch.New() - offset, err := fw.Write(walBlock, sw) + offset, err := meta.Write(walBlock, sw) require.NoError(t, err) - fw.Stop() + meta.Stop() docBlockSize := storage.DocBlockHeaderLen + walBlock.Len() diff --git a/frac/legacy_meta_writer.go b/frac/legacy_meta_writer.go new file mode 100644 index 00000000..b4c97e6f --- /dev/null +++ b/frac/legacy_meta_writer.go @@ -0,0 +1,25 @@ +package frac + +import ( + "github.com/ozontech/seq-db/metric/stopwatch" + "github.com/ozontech/seq-db/storage" +) + +// LegacyMetaWriter is MetaWriter for the legacy *.meta files. Converts new storage.WalBlock block type to +// storage.DocBlock +type LegacyMetaWriter struct { + fw *storage.FileWriter +} + +func NewLegacyMetaWriter(fw *storage.FileWriter) *LegacyMetaWriter { + return &LegacyMetaWriter{fw: fw} +} + +func (l *LegacyMetaWriter) Write(data storage.WalBlock, sw *stopwatch.Stopwatch) (int64, error) { + docBlock := storage.PackWalBlockToDocBlock(data, nil) + return l.fw.Write(docBlock, sw) +} + +func (l *LegacyMetaWriter) Stop() { + l.fw.Stop() +} diff --git a/frac/file_writer.go b/storage/file_writer.go similarity index 70% rename from frac/file_writer.go rename to storage/file_writer.go index ffaa48a0..94ebac37 100644 --- a/frac/file_writer.go +++ b/storage/file_writer.go @@ -1,4 +1,4 @@ -package frac +package storage import ( "io" @@ -6,10 +6,9 @@ import ( "sync/atomic" "github.com/ozontech/seq-db/metric/stopwatch" - "github.com/ozontech/seq-db/storage" ) -type writeSyncer interface { +type fileWriterSyncer interface { io.WriterAt Sync() error } @@ -23,10 +22,9 @@ type writeSyncer interface { // // This results in one fsync system call for several writers performing a write at approximately the same time. // -// FileWriter always stores data in DocBlock format. If WalBlock is passed to Write, then it's converted to -// DocBlock. 
+// FileWriter does not interpret block format; it only writes bytes and triggers fsync. type FileWriter struct { - ws writeSyncer + ws fileWriterSyncer offset atomic.Int64 skipSync bool @@ -37,7 +35,8 @@ type FileWriter struct { wg sync.WaitGroup } -func NewFileWriter(ws writeSyncer, offset int64, skipSync bool) *FileWriter { +// NewFileWriter creates a new FileWriter. offset is the initial write position for sequential Write. +func NewFileWriter(ws fileWriterSyncer, offset int64, skipSync bool) *FileWriter { fs := &FileWriter{ ws: ws, skipSync: skipSync, @@ -71,16 +70,22 @@ func (fs *FileWriter) syncLoop() { } func (fs *FileWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { - m := sw.Start("write_duration") + offset := fs.ReserveSpace(int64(len(data))) + return fs.writeAt(offset, data, sw) +} - if storage.IsWalBlock(data) { - // WalBlock must be converted to DocBock if is written to a legacy WAL meta file (with *.meta suffix) - // This may happen if a new version of store has been deployed while a legacy active fraction with *.meta file exists. 
- data = storage.PackWalBlockToDocBlock(data, nil) - } +func (fs *FileWriter) WriteAt(offset int64, data []byte, sw *stopwatch.Stopwatch) (int64, error) { + return fs.writeAt(offset, data, sw) +} +func (fs *FileWriter) ReserveSpace(size int64) int64 { + end := fs.offset.Add(size) + start := end - size + return start +} + +func (fs *FileWriter) writeAt(offset int64, data []byte, sw *stopwatch.Stopwatch) (int64, error) { + m := sw.Start("write_duration") - dataLen := int64(len(data)) - offset := fs.offset.Add(dataLen) - dataLen _, err := fs.ws.WriteAt(data, offset) m.Stop() diff --git a/storage/file_writer_test.go b/storage/file_writer_test.go new file mode 100644 index 00000000..77acc49f --- /dev/null +++ b/storage/file_writer_test.go @@ -0,0 +1,264 @@ +package storage + +import ( + "errors" + "fmt" + "io" + "math/rand/v2" + "os" + "slices" + "strconv" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/ozontech/seq-db/metric/stopwatch" +) + +type testWriterSyncer struct { + mu sync.RWMutex + in [][]byte + out map[string]struct{} + pause time.Duration + err bool +} + +func (ws *testWriterSyncer) WriteAt(p []byte, _ int64) (n int, err error) { + ws.mu.Lock() + defer ws.mu.Unlock() + + ws.in = append(ws.in, p) + + return len(p), nil +} + +func (ws *testWriterSyncer) Sync() error { + time.Sleep(ws.pause) + + ws.mu.Lock() + defer ws.mu.Unlock() + + if ws.err { + ws.in = nil + return errors.New("test") + } + + for _, val := range ws.in { + ws.out[string(val)] = struct{}{} + } + ws.in = nil + + return nil +} + +func (ws *testWriterSyncer) Check(val []byte) bool { + ws.mu.RLock() + defer ws.mu.RUnlock() + _, ok := ws.out[string(val)] + return ok +} + +func TestFileWriter(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} + fw := NewFileWriter(ws, 0, false) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := 
[]byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.NoError(t, err) + assert.True(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func TestFileWriterNoSync(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} + fw := NewFileWriter(ws, 0, true) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.NoError(t, err) + assert.False(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +func TestFileWriterError(t *testing.T) { + ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond, err: true} + fw := NewFileWriter(ws, 0, false) + + wg := sync.WaitGroup{} + for range 100 { + wg.Add(1) + go func() { + for range 100 { + sw := stopwatch.New() + k := []byte(strconv.FormatUint(rand.Uint64(), 16)) + _, err := fw.Write(k, sw) + assert.Error(t, err) + assert.False(t, ws.Check(k)) + } + wg.Done() + }() + } + + wg.Wait() + fw.Stop() +} + +type testRandPauseWriterAt struct { + f *os.File +} + +func (w *testRandPauseWriterAt) WriteAt(p []byte, off int64) (n int, err error) { + // random pause + time.Sleep(time.Microsecond * time.Duration(rand.IntN(20))) + return w.f.WriteAt(p, off) +} + +func (w *testRandPauseWriterAt) Sync() error { + return w.f.Sync() +} + +func TestConcurrentFileWriting(t *testing.T) { + f, e := os.Create(t.TempDir() + "/test.txt") + assert.NoError(t, e) + + defer f.Close() + + fw := NewFileWriter(&testRandPauseWriterAt{f: f}, 0, true) + + const ( + writersCount = 100 + writesCount = 1000 + ) + + type writeSample struct { + offset int64 + data []byte + } + + wg := sync.WaitGroup{} + samplesQueues := [writersCount][]writeSample{} + + // run writers + for i := range writersCount { + wg.Add(1) + go func() { + defer wg.Done() + + sw := stopwatch.New() + workerName := strconv.Itoa(i) 
+ + for j := range writesCount { + + data := []byte("<" + workerName + "-" + strconv.Itoa(j) + ">") + offset, e := fw.Write(data, sw) + assert.NoError(t, e) + + samplesQueues[i] = append(samplesQueues[i], writeSample{data: data, offset: offset}) + } + }() + } + + wg.Wait() + + // join and sort all samples by offset + all := make([]writeSample, 0, writersCount*writersCount) + for _, c := range samplesQueues { + all = append(all, c...) + } + slices.SortFunc(all, func(a, b writeSample) int { + if a.offset < b.offset { + return -1 + } + if a.offset > b.offset { + return 1 + } + return 0 + }) + + // check all samples and file content + offset := int64(0) + buf := make([]byte, 1000) + for _, w := range all { + s := len(w.data) + buf = buf[:s] + _, e = f.ReadAt(buf, int64(offset)) + assert.NoError(t, e) + assert.Equal(t, w.data, buf) + assert.Equal(t, w.offset, offset) + offset += int64(s) + } + + s, e := f.Stat() + assert.NoError(t, e) + fmt.Println(s.Size()) + + assert.Equal(t, offset, s.Size()) + + e = os.Remove(f.Name()) + assert.NoError(t, e) +} + +func TestSparseWrite(t *testing.T) { + wf, e := os.Create(t.TempDir() + "/test.txt") + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("333"), 30) + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("222"), 20) + assert.NoError(t, e) + + _, e = wf.WriteAt([]byte("111"), 10) + assert.NoError(t, e) + + e = wf.Close() + assert.NoError(t, e) + + rf, e := os.Open(wf.Name()) + buf := make([]byte, 33) + assert.NoError(t, e) + + n, e := rf.Read(buf) + assert.NoError(t, e) + assert.Equal(t, len(buf), n) + + expected := []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00111\x00\x00\x00\x00\x00\x00\x00222\x00\x00\x00\x00\x00\x00\x00333") + assert.Equal(t, expected, buf) + + n, e = rf.Read(buf) + assert.Error(t, e) + assert.Equal(t, 0, n) + assert.ErrorIs(t, e, io.EOF) + + e = rf.Close() + assert.NoError(t, e) + + e = os.Remove(rf.Name()) + assert.NoError(t, e) +} diff --git a/storage/wal_reader.go b/storage/wal_reader.go index 
811acd75..0c926132 100644 --- a/storage/wal_reader.go +++ b/storage/wal_reader.go @@ -57,7 +57,7 @@ func NewWalReader(limiter *ReadLimiter, reader io.ReaderAt, baseFileName string) // Corruption ranges are logged with "from" and "to" offsets. func (r *WalReader) Iter() iter.Seq[WalRecord] { return func(yield func(WalRecord) bool) { - offset := alignSize(r.headerOffset) + offset := align(r.headerOffset) var corruptionStart int64 = -1 logCorruptionEnd := func(offset int64) { @@ -128,7 +128,7 @@ func (r *WalReader) Iter() iter.Seq[WalRecord] { if !mb.IsPayloadCorrect() { startCorruptionTracking(offset) - offset = alignSize(offset + blockLen) + offset = align(offset + blockLen) continue } @@ -144,12 +144,12 @@ func (r *WalReader) Iter() iter.Seq[WalRecord] { return } - offset = alignSize(offset + blockLen) + offset = align(offset + blockLen) } } } -// alignSize aligns provided offset to WalBlockAlignment -func alignSize(offset int64) int64 { +// align aligns provided offset to WalBlockAlignment +func align(offset int64) int64 { return (offset + WalBlockAlignment - 1) &^ (WalBlockAlignment - 1) } diff --git a/storage/wal_writer.go b/storage/wal_writer.go index 6f68e3d7..d16d181d 100644 --- a/storage/wal_writer.go +++ b/storage/wal_writer.go @@ -3,8 +3,6 @@ package storage import ( "encoding/binary" "io" - "sync" - "sync/atomic" "go.uber.org/zap" @@ -34,28 +32,18 @@ type WriteSyncer interface { Sync() error } -// WalWriter writes WalBlocks to a WAL file with header and 64-byte alignment. +// WalWriter writes WalBlock to a WAL file with header and 64-byte alignment. +// It works on top of FileWriter, but it also maintains header at the beginning of the file and align block +// offsets to WalBlockAlignment. // Format: [Header 5B] [... -> align to 64] [WalBlock] [... -> align to 64] [WalBlock] ... 
type WalWriter struct { - ws WriteSyncer - offset atomic.Int64 - skipSync bool - - mu sync.Mutex - queue []chan error - notify chan struct{} - - wg sync.WaitGroup + fw *FileWriter } func NewWalWriter(ws WriteSyncer, offset int64, skipSync bool) *WalWriter { - w := &WalWriter{ - ws: ws, - skipSync: skipSync, - notify: make(chan struct{}, 1), - } + w := &WalWriter{} - // write a header at the beginning if it's a new file + offset = align(offset) if offset == 0 { if err := writeWALHeader(ws); err != nil { logger.Panic("failed to write WAL header", zap.Error(err)) @@ -65,91 +53,22 @@ func NewWalWriter(ws WriteSyncer, offset int64, skipSync bool) *WalWriter { _ = ws.Sync() } - w.offset.Store(alignSize(WalHeaderSize)) + offset = align(WalHeaderSize) } - w.wg.Add(1) - go func() { - w.syncLoop() - w.wg.Done() - }() - + w.fw = NewFileWriter(ws, offset, skipSync) return w } -func (w *WalWriter) syncLoop() { - for range w.notify { - w.mu.Lock() - queue := w.queue - w.queue = make([]chan error, 0, len(queue)) - w.mu.Unlock() - - err := w.ws.Sync() - - for _, syncRes := range queue { - syncRes <- err - } - } -} - // Write writes a WalBlock to the WAL file. The data must already be a WalBlock. // Returns the offset where the WalBlock starts. -func (w *WalWriter) Write(data []byte, sw *stopwatch.Stopwatch) (int64, error) { - m := sw.Start("write_duration") - - offset := w.reserveSpace(int64(len(data))) - - if _, err := w.ws.WriteAt(data, offset); err != nil { - m.Stop() - return 0, err - } - m.Stop() - - err := w.sync(m, sw) - - return offset, err -} - -// reserveSpace atomically reserves a necessary space and returns the next position where block may be written. The position -// is aligned to WalBlockAlignment -func (w *WalWriter) reserveSpace(blockSize int64) int64 { - aligned := alignSize(blockSize) - - // w.offset is already aligned. - // So when we add aligned block we still have aligned offset. 
- end := w.offset.Add(aligned) - start := end - aligned - - return start -} - -func (w *WalWriter) sync(m stopwatch.Metric, sw *stopwatch.Stopwatch) error { - if w.skipSync { - return nil - } - - m = sw.Start("fsync") - - syncRes := make(chan error) - - w.mu.Lock() - w.queue = append(w.queue, syncRes) - size := len(w.queue) - w.mu.Unlock() - - if size == 1 { - w.notify <- struct{}{} - } - - err := <-syncRes - - m.Stop() - return err +func (w *WalWriter) Write(data WalBlock, sw *stopwatch.Stopwatch) (int64, error) { + offset := w.fw.ReserveSpace(align(int64(len(data)))) + return w.fw.WriteAt(offset, data, sw) } func (w *WalWriter) Stop() { - close(w.notify) - w.wg.Wait() + w.fw.Stop() } func writeWALHeader(w io.WriterAt) error { diff --git a/storage/wal_writer_test.go b/storage/wal_writer_test.go index 1d75c70e..1d10dbba 100644 --- a/storage/wal_writer_test.go +++ b/storage/wal_writer_test.go @@ -18,7 +18,7 @@ import ( "github.com/ozontech/seq-db/metric/stopwatch" ) -type testWriterSyncer struct { +type walTestWriterSyncer struct { mu sync.RWMutex in [][]byte out map[string]struct{} @@ -27,8 +27,8 @@ type testWriterSyncer struct { bytes []byte } -func TestFileWriter(t *testing.T) { - ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} +func TestWalWriter(t *testing.T) { + ws := &walTestWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} fw := NewWalWriter(ws, 0, false) wg := sync.WaitGroup{} @@ -50,8 +50,8 @@ func TestFileWriter(t *testing.T) { fw.Stop() } -func TestFileWriterNoSync(t *testing.T) { - ws := &testWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} +func TestWalWriterNoSync(t *testing.T) { + ws := &walTestWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond} fw := NewWalWriter(ws, 0, true) wg := sync.WaitGroup{} @@ -73,8 +73,8 @@ func TestFileWriterNoSync(t *testing.T) { fw.Stop() } -func TestFileWriterError(t *testing.T) { - ws := &testWriterSyncer{out: map[string]struct{}{}, pause: 
time.Millisecond, err: true} +func TestWalWriterError(t *testing.T) { + ws := &walTestWriterSyncer{out: map[string]struct{}{}, pause: time.Millisecond, err: true} fw := NewWalWriter(ws, 0, false) wg := sync.WaitGroup{} @@ -96,7 +96,7 @@ func TestFileWriterError(t *testing.T) { fw.Stop() } -func (ws *testWriterSyncer) WriteAt(p []byte, off int64) (n int, err error) { +func (ws *walTestWriterSyncer) WriteAt(p []byte, off int64) (n int, err error) { ws.mu.Lock() defer ws.mu.Unlock() @@ -114,7 +114,7 @@ func (ws *testWriterSyncer) WriteAt(p []byte, off int64) (n int, err error) { return len(p), nil } -func (ws *testWriterSyncer) ReadAt(p []byte, off int64) (n int, err error) { +func (ws *walTestWriterSyncer) ReadAt(p []byte, off int64) (n int, err error) { ws.mu.RLock() defer ws.mu.RUnlock() @@ -129,7 +129,7 @@ func (ws *testWriterSyncer) ReadAt(p []byte, off int64) (n int, err error) { return n, nil } -func (ws *testWriterSyncer) Sync() error { +func (ws *walTestWriterSyncer) Sync() error { time.Sleep(ws.pause) ws.mu.Lock() @@ -148,38 +148,38 @@ func (ws *testWriterSyncer) Sync() error { return nil } -func (ws *testWriterSyncer) Check(val []byte) bool { +func (ws *walTestWriterSyncer) Check(val []byte) bool { ws.mu.RLock() defer ws.mu.RUnlock() _, ok := ws.out[string(val)] return ok } -type testRandPauseWriterAt struct { +type walTestRandPauseWriterAt struct { f *os.File } -func (w *testRandPauseWriterAt) WriteAt(p []byte, off int64) (n int, err error) { +func (w *walTestRandPauseWriterAt) WriteAt(p []byte, off int64) (n int, err error) { // random pause time.Sleep(time.Microsecond * time.Duration(rand.IntN(20))) return w.f.WriteAt(p, off) } -func (w *testRandPauseWriterAt) ReadAt(p []byte, off int64) (n int, err error) { +func (w *walTestRandPauseWriterAt) ReadAt(p []byte, off int64) (n int, err error) { return w.f.ReadAt(p, off) } -func (w *testRandPauseWriterAt) Sync() error { +func (w *walTestRandPauseWriterAt) Sync() error { return w.f.Sync() } -func 
TestConcurrentFileWriting(t *testing.T) { +func TestWalWriter_ConcurrentWrites(t *testing.T) { f, e := os.Create(t.TempDir() + "/test.txt") assert.NoError(t, e) defer f.Close() - fw := NewWalWriter(&testRandPauseWriterAt{f: f}, 0, true) + fw := NewWalWriter(&walTestRandPauseWriterAt{f: f}, 0, true) const ( writersCount = 100 @@ -231,13 +231,19 @@ func TestConcurrentFileWriting(t *testing.T) { return 0 }) + // validate all blocks are aligned and there is minimum of wasted space + reader, err := NewWalReader(NewReadLimiter(1, nil), f, "") assert.NoError(t, err) + offset := WalBlockAlignment idx := 0 for entry := range reader.Iter() { + assert.Equal(t, offset, entry.Offset, "block %d offset mismatch", idx) assert.Equal(t, all[idx].offset, entry.Offset, "block %d offset mismatch", idx) assert.Equal(t, all[idx].payload, entry.Data.Payload(), "block %d payload mismatch", idx) idx++ + // messages are small, so the next slot is exactly the next value aligned to 64 + offset += WalBlockAlignment } assert.Equal(t, len(all), idx, "should read all blocks") @@ -249,45 +255,6 @@ func TestConcurrentFileWriting(t *testing.T) { assert.NoError(t, e) } -func TestSparseWrite(t *testing.T) { - wf, e := os.Create(t.TempDir() + "/test.txt") - assert.NoError(t, e) - - _, e = wf.WriteAt([]byte("333"), 30) - assert.NoError(t, e) - - _, e = wf.WriteAt([]byte("222"), 20) - assert.NoError(t, e) - - _, e = wf.WriteAt([]byte("111"), 10) - assert.NoError(t, e) - - e = wf.Close() - assert.NoError(t, e) - - rf, e := os.Open(wf.Name()) - buf := make([]byte, 33) - assert.NoError(t, e) - - n, e := rf.Read(buf) - assert.NoError(t, e) - assert.Equal(t, len(buf), n) - - expected := []byte("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00111\x00\x00\x00\x00\x00\x00\x00222\x00\x00\x00\x00\x00\x00\x00333") - assert.Equal(t, expected, buf) - - n, e = rf.Read(buf) - assert.Error(t, e) - assert.Equal(t, 0, n) - assert.ErrorIs(t, e, io.EOF) - - e = rf.Close() - assert.NoError(t, e) - - e = os.Remove(rf.Name()) - 
assert.NoError(t, e) -} - func TestWalWriterWriteAndRead(t *testing.T) { f, err := os.CreateTemp(t.TempDir(), "wal-test-*.bin") assert.NoError(t, err) From 4375be9555a1329687094af6c70e6d0bce3b2da2 Mon Sep 17 00:00:00 2001 From: Andrei Cheboksarov <37665782+cheb0@users.noreply.github.com> Date: Fri, 20 Feb 2026 17:02:54 +0400 Subject: [PATCH 7/7] move meta reader/writer initialization to a separate method --- frac/active.go | 59 ++++++++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/frac/active.go b/frac/active.go index af2e2750..cebc7220 100644 --- a/frac/active.go +++ b/frac/active.go @@ -55,7 +55,7 @@ type Active struct { sortCache *cache.Cache[[]byte] metaFile *os.File - metaReader storage.DocBlocksReader + metaReader *storage.DocBlocksReader walReader *storage.WalReader writer *ActiveWriter @@ -82,33 +82,7 @@ func NewActive( ) *Active { docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync) - var metaFile *os.File - var metaStats os.FileInfo - var writer *ActiveWriter - var metaReader storage.DocBlocksReader - var walReader *storage.WalReader - var metaSize uint64 - - legacyMetaFileName := baseFileName + consts.MetaFileSuffix - if _, err := os.Stat(legacyMetaFileName); err == nil { - // .meta file exists - metaFile, metaStats = mustOpenFile(legacyMetaFileName, config.SkipFsync) - metaSize = uint64(metaStats.Size()) - metaReader = storage.NewDocBlocksReader(readLimiter, metaFile) - writer = NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) - logger.Info("using legacy meta file format", zap.String("fraction", baseFileName)) - } else { - logger.Info("using new WAL format", zap.String("fraction", baseFileName)) - walFileName := baseFileName + consts.WalFileSuffix - metaFile, metaStats = mustOpenFile(walFileName, config.SkipFsync) - metaSize = uint64(metaStats.Size()) - writer = NewActiveWriter(docsFile, metaFile, 
docsStats.Size(), metaStats.Size(), config.SkipFsync) - var err error - walReader, err = storage.NewWalReader(readLimiter, metaFile, baseFileName) - if err != nil { - logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err)) - } - } + metaFile, writer, metaReader, walReader, metaSize := mustOpenMetaWriter(baseFileName, readLimiter, docsFile, docsStats) f := &Active{ TokenList: NewActiveTokenList(config.IndexWorkers), @@ -144,6 +118,35 @@ func NewActive( return f } +func mustOpenMetaWriter( + baseFileName string, + readLimiter *storage.ReadLimiter, + docsFile *os.File, + docsStats os.FileInfo) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) { + legacyMetaFileName := baseFileName + consts.MetaFileSuffix + + if _, err := os.Stat(legacyMetaFileName); err == nil { + // .meta file exists + metaFile, metaStats := mustOpenFile(legacyMetaFileName, config.SkipFsync) + metaSize := uint64(metaStats.Size()) + metaReader := storage.NewDocBlocksReader(readLimiter, metaFile) + writer := NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) + logger.Info("using legacy meta file format", zap.String("fraction", baseFileName)) + return metaFile, writer, &metaReader, nil, metaSize + } else { + logger.Info("using new WAL format", zap.String("fraction", baseFileName)) + walFileName := baseFileName + consts.WalFileSuffix + metaFile, metaStats := mustOpenFile(walFileName, config.SkipFsync) + metaSize := uint64(metaStats.Size()) + writer := NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) + walReader, err := storage.NewWalReader(readLimiter, metaFile, baseFileName) + if err != nil { + logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err)) + } + return metaFile, writer, nil, walReader, metaSize + } +} + func mustOpenFile(name string, skipFsync bool) (*os.File, os.FileInfo) { file, err := 
os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0o776) if err != nil {