From 2dafb0a4878a1035b08d5861b0baecbba7a7d02b Mon Sep 17 00:00:00 2001 From: John Westerlund Date: Thu, 5 Mar 2026 16:13:36 +0100 Subject: [PATCH] feat: add sector-aligned mmap flush Add sector_size parameter to flush only aligned mmap regions covering written data instead of the entire mapping. Integrates with both direct writes and wrap marker writes. Falls back to full flush when sector_size=0. Co-Authored-By: Claude Opus 4.6 --- docs/api.md | 3 +- docs/usage.md | 15 ++++++++ src/sdsavior/ring.py | 44 ++++++++++++++++++++---- tests/test_extended.py | 77 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 131 insertions(+), 8 deletions(-) diff --git a/docs/api.md b/docs/api.md index 545b98f..8813f98 100644 --- a/docs/api.md +++ b/docs/api.md @@ -14,13 +14,14 @@ Dataclass storing persisted pointer state: ### Constructor -`SDSavior(data_path, meta_path, capacity_bytes, *, fsync_data=False, fsync_meta=True, json_dumps_kwargs=None, recover_scan_limit_bytes=None, coalesce_max_records=None, coalesce_max_seconds=None)` +`SDSavior(data_path, meta_path, capacity_bytes, *, fsync_data=False, fsync_meta=True, json_dumps_kwargs=None, recover_scan_limit_bytes=None, coalesce_max_records=None, coalesce_max_seconds=None, sector_size=0)` - `capacity_bytes` must be a multiple of 8 and at least 16 KiB. - `json_dumps_kwargs` is copied internally. - `recover_scan_limit_bytes` can cap recovery scanning. - `coalesce_max_records` (optional): flush pending records after this count. - `coalesce_max_seconds` (optional): flush pending records after this many seconds. +- `sector_size` (default `0`): when set to a positive value, flush only the aligned region of the mmap covering the write; the alignment unit is at least `sector_size` and never smaller than the system page size. Set to `0` for normal full-mapping flush. 
### Lifecycle diff --git a/docs/usage.md b/docs/usage.md index ccf4f05..5faeeae 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -94,6 +94,21 @@ print(f"Physical flushes: {stats['physical_flushes']}") print(f"Bytes written: {stats['bytes_written']}") ``` +## Sector-Aligned Flush + +On block devices with specific sector sizes, use `sector_size` to flush only the aligned mmap regions instead of the entire mapping: + +```python +with SDSavior( + "data.ring", "data.meta", 8 * 1024 * 1024, + sector_size=4096, + fsync_data=True, +) as rb: + rb.append({"event": "sample"}) +``` + +When `sector_size` is set, only the sectors containing the written data are flushed. This can improve write performance on devices where full-mapping flushes are expensive. Set `sector_size=0` (default) for normal full-mapping flush behavior. + ### Pending Records in Iteration By default, `iter_records()` only yields durable on-disk records. To include unflushed pending records: diff --git a/src/sdsavior/ring.py b/src/sdsavior/ring.py index e73816b..865e825 100644 --- a/src/sdsavior/ring.py +++ b/src/sdsavior/ring.py @@ -98,6 +98,7 @@ def __init__( recover_scan_limit_bytes: int | None = None, coalesce_max_records: int | None = None, coalesce_max_seconds: float | None = None, + sector_size: int = 0, ): """Configure file paths, durability options, and recovery behavior for a ring instance.""" capacity = int(capacity_bytes) @@ -121,6 +122,7 @@ def __init__( self.recover_scan_limit_bytes = recover_scan_limit_bytes self.coalesce_max_records = coalesce_max_records self.coalesce_max_seconds = coalesce_max_seconds + self.sector_size = sector_size self._pending: list[Any] = [] self._last_flush_time: float = time.monotonic() self._logical_appends: int = 0 @@ -400,6 +402,30 @@ def export_jsonl(self, out_path: str, *, from_seq: int | None = None) -> None: # ---------- internals ---------- + def _sector_flush(self, start: int, length: int) -> None: + """Flush only the sector-aligned region of the mmap 
covering [start, start+length).""" + assert self._data_mm is not None + assert self._data_fd is not None + if self.sector_size <= 0 or length <= 0: + self._data_mm.flush() + if self.fsync_data: + os.fsync(self._data_fd) + return + + # Round sector_size up to a whole multiple of the system page size so + # that the offset passed to mmap.flush() is page-aligned even when + # sector_size is larger than, but not a multiple of, the page size. + page_size = mmap.PAGESIZE + ss = ((self.sector_size + page_size - 1) // page_size) * page_size + aligned_start = (start // ss) * ss + aligned_end = ((start + length + ss - 1) // ss) * ss + # Clamp so the flushed range never extends past self.capacity + aligned_end = min(aligned_end, self.capacity) + + self._data_mm.flush(aligned_start, aligned_end - aligned_start) + if self.fsync_data: + os.fsync(self._data_fd) + def _append_single(self, obj: Any) -> int: """Write a single JSON object to the ring. Returns the sequence number.""" assert self._state is not None @@ -460,8 +486,11 @@ def _append_single(self, obj: Any) -> int: self._bytes_written += total_len if self.fsync_data: - mm.flush() - os.fsync(data_fd) + if self.sector_size > 0: + self._sector_flush(head, total_len) + else: + mm.flush() + os.fsync(data_fd) return seq @@ -510,11 +539,12 @@ def _write_wrap_marker(self, off: int) -> None: mm[off + DATA_START:off + RECORD_HDR_SIZE] = ( b"\x00" * (RECORD_HDR_SIZE - DATA_START) ) - # Some platforms require page-aligned offsets for flush/msync. - # Wrap marker writes are tiny, so flush the whole mapping. 
- mm.flush() - if self.fsync_data: - os.fsync(data_fd) + if self.sector_size > 0: + self._sector_flush(off, RECORD_HDR_SIZE) + else: + mm.flush() + if self.fsync_data: + os.fsync(data_fd) def _read_record(self, off: int) -> ParsedRecord | None: """Parse and validate a record at ``off``; return ``None`` on any corruption.""" diff --git a/tests/test_extended.py b/tests/test_extended.py index 909ccfb..b8fb941 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -645,6 +645,83 @@ def test_coalesce_iter_records_excludes_pending(tmp_path: Path) -> None: assert [r[2]["n"] for r in rows] == [1, 2] +def test_sector_flush_correct_alignment( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, +) -> None: + """With sector_size=4096, append() calls _sector_flush; recomputed aligned bounds are sane.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior( + str(data), str(meta), 256 * 1024, sector_size=4096, fsync_data=True, + ) as rb: + original_sector = rb._sector_flush + calls: list[tuple[int, int]] = [] + + def track_sector(start: int, length: int) -> None: + calls.append((start, length)) + original_sector(start, length) + + monkeypatch.setattr(rb, "_sector_flush", track_sector) + rb.append({"n": 1}) + + assert len(calls) >= 1 + for start, length in calls: + # Recompute 4096-aligned bounds from the recorded (start, length); mirrors, but does not observe, the mmap.flush() range + ss = 4096 + aligned_start = (start // ss) * ss + aligned_end = ((start + length + ss - 1) // ss) * ss + assert aligned_start % ss == 0 + assert aligned_end % ss == 0 or aligned_end == rb.capacity + assert aligned_start <= start + assert aligned_end >= start + length or aligned_end == rb.capacity + + +def test_sector_flush_wraparound(tmp_path: Path) -> None: + """Latest record stays readable after 300 appends into a 16 KiB ring with sector_size=512.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior( + str(data), str(meta), 16 * 1024, sector_size=512, fsync_data=True, + ) as rb: + for i in range(300): + rb.append({"n": i, "payload": "x" * 96}) + + rows 
= list(rb.iter_records()) + assert rows # At least one record must still be readable + assert rows[-1][2]["n"] == 299 + + +def test_sector_size_zero_fallback(tmp_path: Path) -> None: + """With sector_size=0 (full-mapping flush path), appended records read back in order.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior( + str(data), str(meta), 256 * 1024, sector_size=0, fsync_data=True, + ) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + rows = list(rb.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2] + + +def test_sector_various_sizes(tmp_path: Path) -> None: + """Ten records append and read back in order for sector sizes 512, 1024, 4096 and 8192.""" + for sector in [512, 1024, 4096, 8192]: + data = tmp_path / f"ring_{sector}.dat" + meta = tmp_path / f"ring_{sector}.meta" + + with SDSavior( + str(data), str(meta), 256 * 1024, sector_size=sector, fsync_data=True, + ) as rb: + for i in range(10): + rb.append({"n": i, "sector": sector}) + rows = list(rb.iter_records()) + assert [r[2]["n"] for r in rows] == list(range(10)) + + def test_coalesce_records_survive_reopen(tmp_path: Path) -> None: """Flushed coalesced records survive close and reopen.""" data = tmp_path / "ring.dat"