Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ Dataclass storing persisted pointer state:

### Constructor

`SDSavior(data_path, meta_path, capacity_bytes, *, fsync_data=False, fsync_meta=True, json_dumps_kwargs=None, recover_scan_limit_bytes=None, coalesce_max_records=None, coalesce_max_seconds=None)`
`SDSavior(data_path, meta_path, capacity_bytes, *, fsync_data=False, fsync_meta=True, json_dumps_kwargs=None, recover_scan_limit_bytes=None, coalesce_max_records=None, coalesce_max_seconds=None, sector_size=0)`

- `capacity_bytes` must be a multiple of 8 and at least 16 KiB.
- `json_dumps_kwargs` is copied internally.
- `recover_scan_limit_bytes` can cap recovery scanning.
- `coalesce_max_records` (optional): flush pending records after this count.
- `coalesce_max_seconds` (optional): flush pending records after this many seconds.
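- `sector_size` (default `0`): when set to a positive value, flush only the aligned region of the mmap that covers each write instead of the whole mapping. The flushed region is never smaller than one OS page, so values below the page size are effectively rounded up. Set to `0` for the normal full-mapping flush.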

### Lifecycle

Expand Down
15 changes: 15 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,21 @@ print(f"Physical flushes: {stats['physical_flushes']}")
print(f"Bytes written: {stats['bytes_written']}")
```

## Sector-Aligned Flush

On block devices with a known sector size, set `sector_size` so that only the aligned mmap region covering each write is flushed, rather than the entire mapping:

```python
with SDSavior(
"data.ring", "data.meta", 8 * 1024 * 1024,
sector_size=4096,
fsync_data=True,
) as rb:
rb.append({"event": "sample"})
```

When `sector_size` is set to a positive value, only the aligned region containing the written data is flushed (never less than one OS page). This can improve write performance on devices where full-mapping flushes are expensive. Leave `sector_size=0` (the default) for normal full-mapping flush behavior.

### Pending Records in Iteration

By default, `iter_records()` only yields durable on-disk records. To include unflushed pending records:
Expand Down
44 changes: 37 additions & 7 deletions src/sdsavior/ring.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def __init__(
recover_scan_limit_bytes: int | None = None,
coalesce_max_records: int | None = None,
coalesce_max_seconds: float | None = None,
sector_size: int = 0,
):
"""Configure file paths, durability options, and recovery behavior for a ring instance."""
capacity = int(capacity_bytes)
Expand All @@ -121,6 +122,7 @@ def __init__(
self.recover_scan_limit_bytes = recover_scan_limit_bytes
self.coalesce_max_records = coalesce_max_records
self.coalesce_max_seconds = coalesce_max_seconds
self.sector_size = sector_size
self._pending: list[Any] = []
self._last_flush_time: float = time.monotonic()
self._logical_appends: int = 0
Expand Down Expand Up @@ -400,6 +402,30 @@ def export_jsonl(self, out_path: str, *, from_seq: int | None = None) -> None:

# ---------- internals ----------

def _sector_flush(self, start: int, length: int) -> None:
    """Flush the aligned region of the data mmap covering ``[start, start + length)``.

    The region is rounded outward to a unit that is a multiple of both
    ``self.sector_size`` and the system page size, then clamped to
    ``self.capacity``. The whole mapping is flushed instead when
    sector-aligned flushing is disabled (``sector_size <= 0``), when
    ``length`` is not positive, or when the clamped region is empty.
    ``os.fsync`` is issued afterwards if ``self.fsync_data`` is set.

    Args:
        start: Byte offset into the data mmap where the write began.
        length: Number of bytes written starting at ``start``.
    """
    import math  # local import: only this method needs gcd

    assert self._data_mm is not None
    assert self._data_fd is not None

    region: tuple[int, int] | None = None
    if self.sector_size > 0 and length > 0:
        page_size = mmap.PAGESIZE
        # msync requires a page-aligned offset. max(sector, page) is only
        # page-aligned when one divides the other, so align to their least
        # common multiple; for the usual power-of-two sizes this is the
        # same value as the larger of the two.
        unit = self.sector_size // math.gcd(self.sector_size, page_size) * page_size
        aligned_start = (start // unit) * unit
        aligned_end = ((start + length + unit - 1) // unit) * unit
        # Clamp to capacity so the flush never extends past the ring.
        aligned_end = min(aligned_end, self.capacity)
        # A start at/after capacity (or a negative one) leaves nothing
        # valid to flush partially; fall back to the full mapping.
        if 0 <= aligned_start < aligned_end:
            region = (aligned_start, aligned_end - aligned_start)

    if region is None:
        self._data_mm.flush()
    else:
        self._data_mm.flush(*region)
    if self.fsync_data:
        os.fsync(self._data_fd)

def _append_single(self, obj: Any) -> int:
"""Write a single JSON object to the ring. Returns the sequence number."""
assert self._state is not None
Expand Down Expand Up @@ -460,8 +486,11 @@ def _append_single(self, obj: Any) -> int:
self._bytes_written += total_len

if self.fsync_data:
mm.flush()
os.fsync(data_fd)
if self.sector_size > 0:
self._sector_flush(head, total_len)
else:
mm.flush()
os.fsync(data_fd)

return seq

Expand Down Expand Up @@ -510,11 +539,12 @@ def _write_wrap_marker(self, off: int) -> None:
mm[off + DATA_START:off + RECORD_HDR_SIZE] = (
b"\x00" * (RECORD_HDR_SIZE - DATA_START)
)
# Some platforms require page-aligned offsets for flush/msync.
# Wrap marker writes are tiny, so flush the whole mapping.
mm.flush()
if self.fsync_data:
os.fsync(data_fd)
if self.sector_size > 0:
self._sector_flush(off, RECORD_HDR_SIZE)
else:
mm.flush()
if self.fsync_data:
os.fsync(data_fd)

def _read_record(self, off: int) -> ParsedRecord | None:
"""Parse and validate a record at ``off``; return ``None`` on any corruption."""
Expand Down
77 changes: 77 additions & 0 deletions tests/test_extended.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,83 @@ def test_coalesce_iter_records_excludes_pending(tmp_path: Path) -> None:
assert [r[2]["n"] for r in rows] == [1, 2]


def test_sector_flush_correct_alignment(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Appending with ``sector_size`` set routes the write through ``_sector_flush``.

    A spy wraps ``_sector_flush`` and records the ``(start, length)`` it is
    called with. The checks are made against those recorded arguments and
    against the stored record, rather than re-deriving the alignment
    arithmetic in the test (which would hold for any input and so could
    never fail).
    """
    data = tmp_path / "ring.dat"
    meta = tmp_path / "ring.meta"
    calls: list[tuple[int, int]] = []

    with SDSavior(
        str(data), str(meta), 256 * 1024, sector_size=4096, fsync_data=True,
    ) as rb:
        original_sector = rb._sector_flush

        def track_sector(start: int, length: int) -> None:
            # Record the request, then delegate so the real flush still runs.
            calls.append((start, length))
            original_sector(start, length)

        monkeypatch.setattr(rb, "_sector_flush", track_sector)
        rb.append({"n": 1})

        # Read these while the ring is still open.
        capacity = rb.capacity
        rows = list(rb.iter_records())

    assert len(calls) >= 1
    for start, length in calls:
        # Every requested range must be non-empty and lie inside the ring.
        assert length > 0
        assert start >= 0
        assert start + length <= capacity
    # The sector-flushed record must be readable back.
    assert [r[2]["n"] for r in rows] == [1]


def test_sector_flush_wraparound(tmp_path: Path) -> None:
    """Newest record stays readable after many appends into a small sector-flushed ring."""
    data_file = str(tmp_path / "ring.dat")
    meta_file = str(tmp_path / "ring.meta")
    total = 300

    with SDSavior(
        data_file, meta_file, 16 * 1024, sector_size=512, fsync_data=True,
    ) as rb:
        # Far more payload than a 16 KiB ring can hold at once.
        for n in range(total):
            rb.append({"n": n, "payload": "x" * 96})

        rows = list(rb.iter_records())
        assert rows  # Should have readable records
        newest = rows[-1]
        assert newest[2]["n"] == total - 1


def test_sector_size_zero_fallback(tmp_path: Path) -> None:
    """With ``sector_size=0`` appended records are stored and read back in order."""
    data_file = str(tmp_path / "ring.dat")
    meta_file = str(tmp_path / "ring.meta")

    with SDSavior(
        data_file, meta_file, 256 * 1024, sector_size=0, fsync_data=True,
    ) as rb:
        for n in (1, 2):
            rb.append({"n": n})
        seen = [record[2]["n"] for record in list(rb.iter_records())]
        assert seen == [1, 2]


def test_sector_various_sizes(tmp_path: Path) -> None:
    """Ten records round-trip in order for each of several sector sizes."""
    expected = list(range(10))

    for sector in (512, 1024, 4096, 8192):
        # Separate files per sector size so the runs do not share state.
        data_file = str(tmp_path / f"ring_{sector}.dat")
        meta_file = str(tmp_path / f"ring_{sector}.meta")

        with SDSavior(
            data_file, meta_file, 256 * 1024, sector_size=sector, fsync_data=True,
        ) as rb:
            for n in expected:
                rb.append({"n": n, "sector": sector})
            rows = list(rb.iter_records())
            got = [row[2]["n"] for row in rows]
            assert got == expected


def test_coalesce_records_survive_reopen(tmp_path: Path) -> None:
"""Flushed coalesced records survive close and reopen."""
data = tmp_path / "ring.dat"
Expand Down