From 999a045eb428adfde44f863e2d56c5d97a4209c3 Mon Sep 17 00:00:00 2001
From: ankur-anand
Date: Fri, 7 Feb 2025 18:38:16 +0530
Subject: [PATCH 1/2] fix: support concurrent read/write from same active segment.

---
 segment.go | 93 ++++++++++++++++++++++++++++--------------------------
 1 file changed, 48 insertions(+), 45 deletions(-)

diff --git a/segment.go b/segment.go
index c3fde85..847aa01 100644
--- a/segment.go
+++ b/segment.go
@@ -8,6 +8,7 @@ import (
 	"io"
 	"os"
 	"sync"
+	"sync/atomic"
 
 	"github.com/valyala/bytebufferpool"
 )
@@ -49,9 +50,9 @@ const (
 type segment struct {
 	id                 SegmentID
 	fd                 *os.File
-	currentBlockNumber uint32
-	currentBlockSize   uint32
-	closed             bool
+	currentBlockNumber atomic.Uint32
+	currentBlockSize   atomic.Uint32
+	closed             atomic.Bool
 	header             []byte
 	startupBlock       *startupBlock
 	isStartupTraversal bool
@@ -118,18 +119,20 @@ func openSegmentFile(dirPath, extName string, id uint32) (*segment, error) {
 		return nil, fmt.Errorf("seek to the end of segment file %d%s failed: %v", id, extName, err)
 	}
 
-	return &segment{
-		id:                 id,
-		fd:                 fd,
-		header:             make([]byte, chunkHeaderSize),
-		currentBlockNumber: uint32(offset / blockSize),
-		currentBlockSize:   uint32(offset % blockSize),
+	s := &segment{
+		id:     id,
+		fd:     fd,
+		header: make([]byte, chunkHeaderSize),
 		startupBlock: &startupBlock{
 			block:       make([]byte, blockSize),
 			blockNumber: -1,
 		},
 		isStartupTraversal: false,
-	}, nil
+	}
+
+	s.currentBlockNumber.Store(uint32(offset / blockSize))
+	s.currentBlockSize.Store(uint32(offset % blockSize))
+	return s, nil
 }
 
 // NewReader creates a new segment reader.
@@ -145,7 +148,7 @@ func (seg *segment) NewReader() *segmentReader {
 
 // Sync flushes the segment file to disk.
 func (seg *segment) Sync() error {
-	if seg.closed {
+	if seg.closed.Load() {
 		return nil
 	}
 	return seg.fd.Sync()
@@ -153,8 +156,8 @@
 
 // Remove removes the segment file.
 func (seg *segment) Remove() error {
-	if !seg.closed {
-		seg.closed = true
+	if !seg.closed.Load() {
+		seg.closed.CompareAndSwap(false, true)
 		if err := seg.fd.Close(); err != nil {
 			return err
 		}
@@ -165,18 +168,18 @@
 
 // Close closes the segment file.
 func (seg *segment) Close() error {
-	if seg.closed {
+	if seg.closed.Load() {
 		return nil
 	}
 
-	seg.closed = true
+	seg.closed.CompareAndSwap(false, true)
 	return seg.fd.Close()
 }
 
 // Size returns the size of the segment file.
 func (seg *segment) Size() int64 {
-	size := int64(seg.currentBlockNumber) * int64(blockSize)
-	return size + int64(seg.currentBlockSize)
+	size := int64(seg.currentBlockNumber.Load()) * int64(blockSize)
+	return size + int64(seg.currentBlockSize.Load())
 }
 
 // writeToBuffer calculate chunkPosition for data, write data to bytebufferpool, update segment status
@@ -189,34 +192,34 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
 	startBufferLen := chunkBuffer.Len()
 	padding := uint32(0)
 
-	if seg.closed {
+	if seg.closed.Load() {
 		return nil, ErrClosed
 	}
 
 	// if the left block size can not hold the chunk header, padding the block
-	if seg.currentBlockSize+chunkHeaderSize >= blockSize {
+	if seg.currentBlockSize.Load()+chunkHeaderSize >= blockSize {
 		// padding if necessary
-		if seg.currentBlockSize < blockSize {
-			p := make([]byte, blockSize-seg.currentBlockSize)
+		if seg.currentBlockSize.Load() < blockSize {
+			p := make([]byte, blockSize-seg.currentBlockSize.Load())
 			chunkBuffer.B = append(chunkBuffer.B, p...)
-			padding += blockSize - seg.currentBlockSize
+			padding += blockSize - seg.currentBlockSize.Load()
 
 			// a new block
-			seg.currentBlockNumber += 1
-			seg.currentBlockSize = 0
+			seg.currentBlockNumber.Add(1)
+			seg.currentBlockSize.Store(0)
 		}
 	}
 
 	// return the start position of the chunk, then the user can use it to read the data.
 	position := &ChunkPosition{
 		SegmentId:   seg.id,
-		BlockNumber: seg.currentBlockNumber,
-		ChunkOffset: int64(seg.currentBlockSize),
+		BlockNumber: seg.currentBlockNumber.Load(),
+		ChunkOffset: int64(seg.currentBlockSize.Load()),
 	}
 
 	dataSize := uint32(len(data))
 	// The entire chunk can fit into the block.
-	if seg.currentBlockSize+dataSize+chunkHeaderSize <= blockSize {
+	if seg.currentBlockSize.Load()+dataSize+chunkHeaderSize <= blockSize {
 		seg.appendChunkBuffer(chunkBuffer, data, ChunkTypeFull)
 		position.ChunkSize = dataSize + chunkHeaderSize
 	} else {
@@ -225,7 +228,7 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
 		var (
 			leftSize             = dataSize
 			blockCount    uint32 = 0
-			currBlockSize        = seg.currentBlockSize
+			currBlockSize        = seg.currentBlockSize.Load()
 		)
 
 		for leftSize > 0 {
@@ -266,10 +269,10 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
 	}
 
 	// update segment status
-	seg.currentBlockSize += position.ChunkSize
-	if seg.currentBlockSize >= blockSize {
-		seg.currentBlockNumber += seg.currentBlockSize / blockSize
-		seg.currentBlockSize = seg.currentBlockSize % blockSize
+	seg.currentBlockSize.Add(position.ChunkSize)
+	if seg.currentBlockSize.Load() >= blockSize {
+		seg.currentBlockNumber.Add(seg.currentBlockSize.Load() / blockSize)
+		seg.currentBlockSize.Store(seg.currentBlockSize.Load() % blockSize)
 	}
 
 	return position, nil
@@ -277,21 +280,21 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
 
 // writeAll write batch data to the segment file.
 func (seg *segment) writeAll(data [][]byte) (positions []*ChunkPosition, err error) {
-	if seg.closed {
+	if seg.closed.Load() {
 		return nil, ErrClosed
 	}
 
 	// if any error occurs, restore the segment status
-	originBlockNumber := seg.currentBlockNumber
-	originBlockSize := seg.currentBlockSize
+	originBlockNumber := seg.currentBlockNumber.Load()
+	originBlockSize := seg.currentBlockSize.Load()
 
 	// init chunk buffer
 	chunkBuffer := bytebufferpool.Get()
 	chunkBuffer.Reset()
 	defer func() {
 		if err != nil {
-			seg.currentBlockNumber = originBlockNumber
-			seg.currentBlockSize = originBlockSize
+			seg.currentBlockNumber.Store(originBlockNumber)
+			seg.currentBlockSize.Store(originBlockSize)
 		}
 		bytebufferpool.Put(chunkBuffer)
 	}()
@@ -315,20 +318,20 @@ func (seg *segment) writeAll(data [][]byte) (positions []*ChunkPosition, err err
 
 // Write writes the data to the segment file.
 func (seg *segment) Write(data []byte) (pos *ChunkPosition, err error) {
-	if seg.closed {
+	if seg.closed.Load() {
 		return nil, ErrClosed
 	}
 
-	originBlockNumber := seg.currentBlockNumber
-	originBlockSize := seg.currentBlockSize
+	originBlockNumber := seg.currentBlockNumber.Load()
+	originBlockSize := seg.currentBlockSize.Load()
 
 	// init chunk buffer
 	chunkBuffer := bytebufferpool.Get()
 	chunkBuffer.Reset()
 	defer func() {
 		if err != nil {
-			seg.currentBlockNumber = originBlockNumber
-			seg.currentBlockSize = originBlockSize
+			seg.currentBlockNumber.Store(originBlockNumber)
+			seg.currentBlockSize.Store(originBlockSize)
 		}
 		bytebufferpool.Put(chunkBuffer)
 	}()
@@ -363,7 +366,7 @@ func (seg *segment) appendChunkBuffer(buf *bytebufferpool.ByteBuffer, data []byt
 
 // write the pending chunk buffer to the segment file
 func (seg *segment) writeChunkBuffer(buf *bytebufferpool.ByteBuffer) error {
-	if seg.currentBlockSize > blockSize {
+	if seg.currentBlockSize.Load() > blockSize {
 		return errors.New("the current block size exceeds the maximum block size")
 	}
 
@@ -384,7 +387,7 @@ func (seg *segment) Read(blockNumber uint32, chunkOffset int64) ([]byte, error)
 }
 
 func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte, *ChunkPosition, error) {
-	if seg.closed {
+	if seg.closed.Load() {
 		return nil, nil, ErrClosed
 	}
 
@@ -478,7 +481,7 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte,
 // You can call it repeatedly until io.EOF is returned.
 func (segReader *segmentReader) Next() ([]byte, *ChunkPosition, error) {
 	// The segment file is closed
-	if segReader.segment.closed {
+	if segReader.segment.closed.Load() {
 		return nil, nil, ErrClosed
 	}
 

From c1e388fb827196c91dbf0a17115e7b28e4c9d745 Mon Sep 17 00:00:00 2001
From: ankur-anand
Date: Thu, 20 Feb 2025 19:54:10 +0530
Subject: [PATCH 2/2] fix: concurrent read and close

---
 segment.go | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/segment.go b/segment.go
index 847aa01..1b27337 100644
--- a/segment.go
+++ b/segment.go
@@ -7,6 +7,7 @@ import (
 	"hash/crc32"
 	"io"
 	"os"
+	"runtime"
 	"sync"
 	"sync/atomic"
 
@@ -52,6 +53,7 @@ type segment struct {
 	fd                 *os.File
 	currentBlockNumber atomic.Uint32
 	currentBlockSize   atomic.Uint32
+	activeReader       atomic.Int32
 	closed             atomic.Bool
 	header             []byte
 	startupBlock       *startupBlock
@@ -173,6 +175,11 @@ func (seg *segment) Close() error {
 	}
 
 	seg.closed.CompareAndSwap(false, true)
+
+	// retry for the close.
+	for seg.activeReader.Load() > 0 {
+		runtime.Gosched()
+	}
 	return seg.fd.Close()
 }
 
@@ -391,6 +398,9 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte,
 		return nil, nil, ErrClosed
 	}
 
+	seg.activeReader.Add(1)
+	defer seg.activeReader.Add(-1)
+
 	var (
 		result []byte
 		block  []byte
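
Usage note (not part of the patches above): the sketch below shows the access pattern these two fixes target, namely one goroutine appending to the active segment while another reads back previously returned positions, followed by a Close that waits for in-flight readers. It is a minimal sketch only, using the package-internal identifiers visible in the diff (openSegmentFile, Write, Read, Close, ChunkPosition); the package name wal, the ".SEG" extension, and the test file name are assumptions for illustration, and the sketch assumes it lives in the same package as segment.go since those identifiers are unexported.

// segment_concurrent_sketch_test.go (hypothetical file name)
package wal

import (
	"sync"
	"testing"
)

func TestConcurrentReadWriteAndCloseSketch(t *testing.T) {
	// ".SEG" is an assumed extension; openSegmentFile's signature is taken from the diff.
	seg, err := openSegmentFile(t.TempDir(), ".SEG", 1)
	if err != nil {
		t.Fatal(err)
	}

	var (
		mu        sync.Mutex
		positions []*ChunkPosition
		wg        sync.WaitGroup
	)

	// Writer goroutine: appends chunks to the active segment and publishes the
	// returned positions. Write now updates currentBlockNumber/currentBlockSize
	// through the atomic fields introduced in PATCH 1/2.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 100; i++ {
			pos, werr := seg.Write([]byte("hello wal"))
			if werr != nil {
				t.Error(werr)
				return
			}
			mu.Lock()
			positions = append(positions, pos)
			mu.Unlock()
		}
	}()

	// Reader goroutine: concurrently reads the latest published position while
	// the writer keeps appending. readInternal now registers itself in
	// activeReader (PATCH 2/2) so Close can wait instead of closing the fd
	// under an in-flight read.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 100; i++ {
			mu.Lock()
			var pos *ChunkPosition
			if n := len(positions); n > 0 {
				pos = positions[n-1]
			}
			mu.Unlock()
			if pos == nil {
				continue
			}
			if _, rerr := seg.Read(pos.BlockNumber, pos.ChunkOffset); rerr != nil {
				t.Error(rerr)
				return
			}
		}
	}()

	wg.Wait()

	// Close marks the segment closed, spins (runtime.Gosched) until
	// activeReader drops to zero, then closes the file descriptor.
	if err := seg.Close(); err != nil {
		t.Fatal(err)
	}
}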