diff --git a/docs/api.md b/docs/api.md index 545b98f..8813f98 100644 --- a/docs/api.md +++ b/docs/api.md @@ -14,13 +14,14 @@ Dataclass storing persisted pointer state: ### Constructor -`SDSavior(data_path, meta_path, capacity_bytes, *, fsync_data=False, fsync_meta=True, json_dumps_kwargs=None, recover_scan_limit_bytes=None, coalesce_max_records=None, coalesce_max_seconds=None)` +`SDSavior(data_path, meta_path, capacity_bytes, *, fsync_data=False, fsync_meta=True, json_dumps_kwargs=None, recover_scan_limit_bytes=None, coalesce_max_records=None, coalesce_max_seconds=None, sector_size=0)` - `capacity_bytes` must be a multiple of 8 and at least 16 KiB. - `json_dumps_kwargs` is copied internally. - `recover_scan_limit_bytes` can cap recovery scanning. - `coalesce_max_records` (optional): flush pending records after this count. - `coalesce_max_seconds` (optional): flush pending records after this many seconds. +- `sector_size` (default `0`): when set to a positive value, flush only the aligned region of the mmap covering each write; the region is aligned to at least the larger of `sector_size` and the OS page size. Set to `0` for normal full-mapping flush. ### Lifecycle diff --git a/docs/usage.md b/docs/usage.md index ccf4f05..5faeeae 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -94,6 +94,21 @@ print(f"Physical flushes: {stats['physical_flushes']}") print(f"Bytes written: {stats['bytes_written']}") ``` +## Sector-Aligned Flush + +On block devices with specific sector sizes, use `sector_size` to flush only the aligned mmap regions instead of the entire mapping: + +```python +with SDSavior( + "data.ring", "data.meta", 8 * 1024 * 1024, + sector_size=4096, + fsync_data=True, +) as rb: + rb.append({"event": "sample"}) +``` + +When `sector_size` is set, only the aligned region containing the written data is flushed (the alignment is at least the larger of `sector_size` and the OS page size). This can improve write performance on devices where full-mapping flushes are expensive. Set `sector_size=0` (default) for normal full-mapping flush behavior. 
def _sector_flush(self, start: int, length: int) -> None:
    """Flush the aligned slice of the data mmap covering ``[start, start + length)``.

    The whole mapping is flushed instead when ``sector_size`` is not
    positive, when ``length`` is not positive, or when the aligned region
    is empty after clamping to ``capacity``. After either kind of flush,
    ``os.fsync`` is called on the data file descriptor if ``fsync_data``
    is set.

    Args:
        start: Byte offset into the data mmap where the write began.
        length: Number of bytes written starting at ``start``.
    """
    assert self._data_mm is not None
    assert self._data_fd is not None
    mm = self._data_mm

    region: tuple[int, int] | None = None
    if self.sector_size > 0 and length > 0:
        # mmap.flush(offset, size) needs a page-aligned offset. Taking
        # max(sector_size, PAGESIZE) alone is not enough: a sector size
        # such as 6144 on 4 KiB pages would yield unaligned offsets, so
        # round the alignment unit up to a whole number of pages.
        page = mmap.PAGESIZE
        unit = (max(self.sector_size, page) + page - 1) // page * page
        region_start = start // unit * unit
        region_end = (start + length + unit - 1) // unit * unit
        # Never flush past the end of the ring.
        region_end = min(region_end, self.capacity)
        if region_end > region_start:
            region = (region_start, region_end - region_start)

    if region is None:
        mm.flush()
    else:
        mm.flush(*region)
    if self.fsync_data:
        os.fsync(self._data_fd)
def test_sector_flush_correct_alignment(
    tmp_path: Path, monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Sector-aligned flush hands page-aligned, in-bounds ranges to ``mmap.flush``."""
    import mmap  # local import: only this test needs the module

    data = tmp_path / "ring.dat"
    meta = tmp_path / "ring.meta"
    ranged_flushes: list[tuple[int, int]] = []
    requests: list[tuple[int, int]] = []

    class RecordingMmap(mmap.mmap):
        """mmap subclass that records every ranged ``flush(offset, size)`` call."""

        def flush(self, *args: int) -> None:
            if len(args) == 2:
                ranged_flushes.append((args[0], args[1]))
            return super().flush(*args)

    # NOTE(review): assumes the ring builds its mapping via ``mmap.mmap(...)``
    # looked up when the ring is opened — confirm against ring.py.
    monkeypatch.setattr(mmap, "mmap", RecordingMmap)

    with SDSavior(
        str(data), str(meta), 256 * 1024, sector_size=4096, fsync_data=True,
    ) as rb:
        capacity = rb.capacity
        original_sector = rb._sector_flush

        def track_sector(start: int, length: int) -> None:
            requests.append((start, length))
            original_sector(start, length)

        monkeypatch.setattr(rb, "_sector_flush", track_sector)
        rb.append({"n": 1})

    assert requests
    assert ranged_flushes

    # Check what actually reached mmap.flush rather than re-deriving the
    # alignment arithmetic inside the test.
    for offset, size in ranged_flushes:
        assert offset % mmap.PAGESIZE == 0
        assert size > 0
        assert offset + size <= capacity

    # Every requested write range must be covered by some flushed range.
    for start, length in requests:
        assert any(
            offset <= start and start + length <= offset + size
            for offset, size in ranged_flushes
        )


def test_sector_flush_wraparound(tmp_path: Path) -> None:
    """Sector-aligned flush keeps records readable after the ring wraps."""
    data = tmp_path / "ring.dat"
    meta = tmp_path / "ring.meta"

    with SDSavior(
        str(data), str(meta), 16 * 1024, sector_size=512, fsync_data=True,
    ) as rb:
        # 300 records of ~100 bytes overflow the 16 KiB ring several times.
        for i in range(300):
            rb.append({"n": i, "payload": "x" * 96})

        rows = list(rb.iter_records())
        assert rows  # Should have readable records
        assert rows[-1][2]["n"] == 299


def test_sector_size_zero_fallback(tmp_path: Path) -> None:
    """sector_size=0 falls back to normal flush behavior."""
    data = tmp_path / "ring.dat"
    meta = tmp_path / "ring.meta"

    with SDSavior(
        str(data), str(meta), 256 * 1024, sector_size=0, fsync_data=True,
    ) as rb:
        rb.append({"n": 1})
        rb.append({"n": 2})
        rows = list(rb.iter_records())
        assert [r[2]["n"] for r in rows] == [1, 2]


@pytest.mark.parametrize("sector", [512, 1024, 4096, 8192])
def test_sector_various_sizes(tmp_path: Path, sector: int) -> None:
    """Appends round-trip correctly for each supported sector size."""
    data = tmp_path / f"ring_{sector}.dat"
    meta = tmp_path / f"ring_{sector}.meta"

    with SDSavior(
        str(data), str(meta), 256 * 1024, sector_size=sector, fsync_data=True,
    ) as rb:
        for i in range(10):
            rb.append({"n": i, "sector": sector})
        rows = list(rb.iter_records())
        assert [r[2]["n"] for r in rows] == list(range(10))