103 changes: 58 additions & 45 deletions segment.go
@@ -7,7 +7,9 @@ import (
"hash/crc32"
"io"
"os"
"runtime"
"sync"
"sync/atomic"

"github.com/valyala/bytebufferpool"
)
@@ -49,9 +51,10 @@ const (
type segment struct {
id SegmentID
fd *os.File
currentBlockNumber uint32
currentBlockSize uint32
closed bool
currentBlockNumber atomic.Uint32
currentBlockSize atomic.Uint32
activeReader atomic.Int32
closed atomic.Bool
header []byte
startupBlock *startupBlock
isStartupTraversal bool
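This hunk swaps the plain counters and the `closed` flag for typed atomics, so the hot-path fields can be read and updated without holding a lock. A minimal runnable sketch of the pattern, using the typed values from `sync/atomic` (Go 1.19+); the program is illustrative, not code from this package:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var currentBlockSize atomic.Uint32
	var closed atomic.Bool

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if !closed.Load() { // lock-free read of the flag
				currentBlockSize.Add(128) // lock-free update of the counter
			}
		}()
	}
	wg.Wait()
	fmt.Println(currentBlockSize.Load()) // 512
}
```

Besides avoiding a mutex, the typed atomics remove the unsynchronized read/write races that the old plain `uint32` and `bool` fields allowed under concurrent readers.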
@@ -118,18 +121,20 @@ func openSegmentFile(dirPath, extName string, id uint32) (*segment, error) {
return nil, fmt.Errorf("seek to the end of segment file %d%s failed: %v", id, extName, err)
}

return &segment{
id: id,
fd: fd,
header: make([]byte, chunkHeaderSize),
currentBlockNumber: uint32(offset / blockSize),
currentBlockSize: uint32(offset % blockSize),
s := &segment{
id: id,
fd: fd,
header: make([]byte, chunkHeaderSize),
startupBlock: &startupBlock{
block: make([]byte, blockSize),
blockNumber: -1,
},
isStartupTraversal: false,
}, nil
}

s.currentBlockNumber.Store(uint32(offset / blockSize))
s.currentBlockSize.Store(uint32(offset % blockSize))
return s, nil
}
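The constructor now seeds both atomics from the end-of-file offset. The arithmetic is plain block decomposition; a quick worked example, assuming a 32 KiB block size (the real constant lives elsewhere in the package):

```go
package main

import "fmt"

const blockSize = 32 * 1024 // assumed value; defined elsewhere in the package

func main() {
	// An offset of 70000 bytes sits 4464 bytes into block 2 (the third block).
	offset := int64(70000)
	fmt.Println(uint32(offset/blockSize), uint32(offset%blockSize)) // 2 4464
}
```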

// NewReader creates a new segment reader.
@@ -145,16 +150,16 @@ func (seg *segment) NewReader() *segmentReader {

// Sync flushes the segment file to disk.
func (seg *segment) Sync() error {
if seg.closed {
if seg.closed.Load() {
return nil
}
return seg.fd.Sync()
}

// Remove removes the segment file.
func (seg *segment) Remove() error {
if !seg.closed {
seg.closed = true
if seg.closed.CompareAndSwap(false, true) {
if err := seg.fd.Close(); err != nil {
return err
}
@@ -165,18 +170,23 @@ func (seg *segment) Remove() error {

// Close closes the segment file.
func (seg *segment) Close() error {
if seg.closed {
if !seg.closed.CompareAndSwap(false, true) {
return nil
}

seg.closed = true

// wait for all in-flight readers to finish before closing the fd.
for seg.activeReader.Load() > 0 {
runtime.Gosched()
}
return seg.fd.Close()
}
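Close combines three ingredients: an idempotency gate on the `closed` flag, a drain loop over `activeReader`, and `runtime.Gosched` to yield while spinning. A condensed, self-contained sketch of that handshake (the `gate` type and its names are illustrative, not the package's API):

```go
package main

import (
	"os"
	"runtime"
	"sync/atomic"
)

type gate struct {
	closed       atomic.Bool
	activeReader atomic.Int32
	fd           *os.File
}

// close lets at most one goroutine close the fd, and only after all
// in-flight readers have drained.
func (g *gate) close() error {
	if !g.closed.CompareAndSwap(false, true) {
		return nil // another goroutine is closing (or has closed) already
	}
	for g.activeReader.Load() > 0 {
		runtime.Gosched() // yield instead of burning the whole time slice
	}
	return g.fd.Close()
}

func main() {
	f, err := os.CreateTemp("", "segment-*")
	if err != nil {
		panic(err)
	}
	g := &gate{fd: f}
	_ = g.close()
	_ = g.close() // second call is a no-op, not a double close
}
```

Using the CompareAndSwap result itself as the gate, rather than a separate Load followed by a CAS, is what makes concurrent Close calls race-free: only the goroutine whose CAS succeeds proceeds to drain and close.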

// Size returns the size of the segment file.
func (seg *segment) Size() int64 {
size := int64(seg.currentBlockNumber) * int64(blockSize)
return size + int64(seg.currentBlockSize)
size := int64(seg.currentBlockNumber.Load()) * int64(blockSize)
return size + int64(seg.currentBlockSize.Load())
}

// writeToBuffer calculates the chunk position for the data, writes the data to the chunk buffer, and updates the segment status
@@ -189,34 +199,34 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
startBufferLen := chunkBuffer.Len()
padding := uint32(0)

if seg.closed {
if seg.closed.Load() {
return nil, ErrClosed
}

// if the remaining space of the block cannot hold the chunk header, pad the block
if seg.currentBlockSize+chunkHeaderSize >= blockSize {
if seg.currentBlockSize.Load()+chunkHeaderSize >= blockSize {
// padding if necessary
if seg.currentBlockSize < blockSize {
p := make([]byte, blockSize-seg.currentBlockSize)
if seg.currentBlockSize.Load() < blockSize {
p := make([]byte, blockSize-seg.currentBlockSize.Load())
chunkBuffer.B = append(chunkBuffer.B, p...)
padding += blockSize - seg.currentBlockSize
padding += blockSize - seg.currentBlockSize.Load()

// a new block
seg.currentBlockNumber += 1
seg.currentBlockSize = 0
seg.currentBlockNumber.Add(1)
seg.currentBlockSize.Store(0)
}
}

// return the start position of the chunk so the caller can use it to read the data later.
position := &ChunkPosition{
SegmentId: seg.id,
BlockNumber: seg.currentBlockNumber,
ChunkOffset: int64(seg.currentBlockSize),
BlockNumber: seg.currentBlockNumber.Load(),
ChunkOffset: int64(seg.currentBlockSize.Load()),
}

dataSize := uint32(len(data))
// The entire chunk can fit into the block.
if seg.currentBlockSize+dataSize+chunkHeaderSize <= blockSize {
if seg.currentBlockSize.Load()+dataSize+chunkHeaderSize <= blockSize {
seg.appendChunkBuffer(chunkBuffer, data, ChunkTypeFull)
position.ChunkSize = dataSize + chunkHeaderSize
} else {
@@ -225,7 +235,7 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
var (
leftSize = dataSize
blockCount uint32 = 0
currBlockSize = seg.currentBlockSize
currBlockSize = seg.currentBlockSize.Load()
)

for leftSize > 0 {
@@ -266,32 +276,32 @@ func (seg *segment) writeToBuffer(data []byte, chunkBuffer *bytebufferpool.ByteB
}

// update segment status
seg.currentBlockSize += position.ChunkSize
if seg.currentBlockSize >= blockSize {
seg.currentBlockNumber += seg.currentBlockSize / blockSize
seg.currentBlockSize = seg.currentBlockSize % blockSize
newBlockSize := seg.currentBlockSize.Add(position.ChunkSize)
if newBlockSize >= blockSize {
seg.currentBlockNumber.Add(newBlockSize / blockSize)
seg.currentBlockSize.Store(newBlockSize % blockSize)
}

return position, nil
}
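The padding branch above guarantees a chunk header never straddles a block boundary: when fewer than `chunkHeaderSize` bytes remain in the current block, the tail is zero-filled and the chunk starts on the next block. A standalone sketch of that computation, with assumed constants (the real ones are defined elsewhere in the package):

```go
package main

import "fmt"

const (
	blockSize       = 32 * 1024 // assumed block size
	chunkHeaderSize = 7         // assumed: checksum(4) + length(2) + type(1)
)

// padIfNeeded zero-fills the rest of the current block when it cannot
// hold another chunk header, returning the grown buffer and the number
// of padding bytes appended.
func padIfNeeded(buf []byte, currentBlockSize uint32) ([]byte, uint32) {
	if currentBlockSize+chunkHeaderSize >= blockSize && currentBlockSize < blockSize {
		padding := blockSize - currentBlockSize
		return append(buf, make([]byte, padding)...), padding
	}
	return buf, 0
}

func main() {
	buf, padding := padIfNeeded(nil, blockSize-3) // 3 bytes left: too small for a header
	fmt.Println(len(buf), padding)                // 3 3
}
```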

// writeAll writes a batch of data to the segment file.
func (seg *segment) writeAll(data [][]byte) (positions []*ChunkPosition, err error) {
if seg.closed {
if seg.closed.Load() {
return nil, ErrClosed
}

// if any error occurs, restore the segment status
originBlockNumber := seg.currentBlockNumber
originBlockSize := seg.currentBlockSize
originBlockNumber := seg.currentBlockNumber.Load()
originBlockSize := seg.currentBlockSize.Load()

// init chunk buffer
chunkBuffer := bytebufferpool.Get()
chunkBuffer.Reset()
defer func() {
if err != nil {
seg.currentBlockNumber = originBlockNumber
seg.currentBlockSize = originBlockSize
seg.currentBlockNumber.Store(originBlockNumber)
seg.currentBlockSize.Store(originBlockSize)
}
bytebufferpool.Put(chunkBuffer)
}()
@@ -315,20 +325,20 @@ func (seg *segment) writeAll(data [][]byte) (positions []*ChunkPosition, err err

// Write writes the data to the segment file.
func (seg *segment) Write(data []byte) (pos *ChunkPosition, err error) {
if seg.closed {
if seg.closed.Load() {
return nil, ErrClosed
}

originBlockNumber := seg.currentBlockNumber
originBlockSize := seg.currentBlockSize
originBlockNumber := seg.currentBlockNumber.Load()
originBlockSize := seg.currentBlockSize.Load()

// init chunk buffer
chunkBuffer := bytebufferpool.Get()
chunkBuffer.Reset()
defer func() {
if err != nil {
seg.currentBlockNumber = originBlockNumber
seg.currentBlockSize = originBlockSize
seg.currentBlockNumber.Store(originBlockNumber)
seg.currentBlockSize.Store(originBlockSize)
}
bytebufferpool.Put(chunkBuffer)
}()
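Both writeAll and Write snapshot the block cursor up front and roll it back in a deferred closure if any step fails. A sketch of that idiom, assuming writers are externally serialized (the two atomics are restored one at a time, so concurrent writers would still need a lock); `withRollback` is illustrative, not part of the package:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// withRollback snapshots the block cursor, runs do, and restores the
// snapshot if do returns an error.
func withRollback(blockNumber, blockSize *atomic.Uint32, do func() error) (err error) {
	origNumber, origSize := blockNumber.Load(), blockSize.Load()
	defer func() {
		if err != nil {
			blockNumber.Store(origNumber)
			blockSize.Store(origSize)
		}
	}()
	return do()
}

func main() {
	var num, size atomic.Uint32
	size.Store(100)
	_ = withRollback(&num, &size, func() error {
		size.Store(9999) // partial progress...
		return errors.New("disk full")
	})
	fmt.Println(size.Load()) // 100: the failed write left no trace in the cursor
}
```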
@@ -363,7 +373,7 @@ func (seg *segment) appendChunkBuffer(buf *bytebufferpool.ByteBuffer, data []byt

// write the pending chunk buffer to the segment file
func (seg *segment) writeChunkBuffer(buf *bytebufferpool.ByteBuffer) error {
if seg.currentBlockSize > blockSize {
if seg.currentBlockSize.Load() > blockSize {
return errors.New("the current block size exceeds the maximum block size")
}

@@ -384,10 +394,13 @@ func (seg *segment) Read(blockNumber uint32, chunkOffset int64) ([]byte, error)
}

func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte, *ChunkPosition, error) {
if seg.closed {
// register as a reader before checking the closed flag: either Close
// sees the incremented count and waits, or the reader sees the flag
// and returns before touching the fd.
seg.activeReader.Add(1)
defer seg.activeReader.Add(-1)

if seg.closed.Load() {
return nil, nil, ErrClosed
}

var (
result []byte
block []byte
@@ -478,7 +491,7 @@ func (seg *segment) readInternal(blockNumber uint32, chunkOffset int64) ([]byte,
// You can call it repeatedly until io.EOF is returned.
func (segReader *segmentReader) Next() ([]byte, *ChunkPosition, error) {
// The segment file is closed
if segReader.segment.closed {
if segReader.segment.closed.Load() {
return nil, nil, ErrClosed
}
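On the read side, the guard is the mirror image of Close: bump `activeReader`, defer the decrement, then check the `closed` flag, in that order, so the drain loop in Close cannot miss a reader. A compact sketch with illustrative names (`withReaderGuard` is not the package's API; `ErrClosed` stands in for the package's sentinel):

```go
package main

import (
	"errors"
	"sync/atomic"
)

var ErrClosed = errors.New("the segment file is closed") // stand-in sentinel

// withReaderGuard registers the caller as a reader for the duration of
// read and rejects closed segments.
func withReaderGuard(closed *atomic.Bool, activeReader *atomic.Int32, read func() error) error {
	activeReader.Add(1)
	defer activeReader.Add(-1)
	// Checked after the increment: either Close sees our count and waits,
	// or we see the flag and return before touching the file.
	if closed.Load() {
		return ErrClosed
	}
	return read()
}

func main() {
	var closed atomic.Bool
	var readers atomic.Int32
	_ = withReaderGuard(&closed, &readers, func() error { return nil })
}
```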
