diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8795bfd..20e6bec 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,15 +1,21 @@ -name: CI +name: SD Savior Pipeline on: - push: - branches: ["**"] pull_request: + push: + branches: + - main permissions: contents: read +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + jobs: validate: + name: Validate (lint, types, tests) runs-on: ubuntu-latest strategy: fail-fast: false @@ -24,6 +30,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} + cache: pip - name: Install dependencies run: | @@ -36,14 +43,14 @@ jobs: - name: Type check run: mypy src - - name: Tests with coverage gate + - name: Tests run: pytest -q semver: - name: Release + name: Release (semantic-release) runs-on: ubuntu-latest needs: [validate] - if: github.ref == 'refs/heads/main' + if: github.event_name == 'push' && github.ref == 'refs/heads/main' permissions: contents: write env: @@ -52,6 +59,7 @@ jobs: released: ${{ steps.release.outputs.released }} tag: ${{ steps.release.outputs.tag }} commit_sha: ${{ steps.release.outputs.commit_sha }} + steps: - name: Checkout (full history for tags) uses: actions/checkout@v4 @@ -68,12 +76,14 @@ jobs: git_committer_email: "41898282+github-actions[bot]@users.noreply.github.com" docs-build: + name: Build docs + needs: [validate] if: github.event_name == 'push' && github.ref == 'refs/heads/main' - needs: validate runs-on: ubuntu-latest permissions: pages: write id-token: write + steps: - name: Checkout uses: actions/checkout@v4 @@ -82,6 +92,7 @@ jobs: uses: actions/setup-python@v5 with: python-version: "3.11" + cache: pip - name: Install dependencies run: | @@ -100,8 +111,9 @@ jobs: path: site docs-deploy: + name: Deploy docs + needs: [docs-build] if: github.event_name == 'push' && github.ref == 'refs/heads/main' - needs: docs-build runs-on: ubuntu-latest permissions: pages: write @@ -109,20 
+121,41 @@ jobs: environment: name: github-pages url: ${{ steps.deployment.outputs.page_url }} + steps: - name: Deploy to GitHub Pages id: deployment uses: actions/deploy-pages@v4 pypi-publish: - needs: semver name: Upload release to PyPI + needs: [semver] + if: github.event_name == 'push' && github.ref == 'refs/heads/main' && needs.semver.outputs.released == 'true' runs-on: ubuntu-latest - environment: - name: pypi - url: https://pypi.org/p/sdsavior permissions: id-token: write + steps: - - name: Publish package distributions to PyPI - uses: pypa/gh-action-pypi-publish@release/v1 \ No newline at end of file + - name: Checkout release tag + uses: actions/checkout@v4 + with: + fetch-depth: 0 + ref: ${{ needs.semver.outputs.tag }} + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + cache: pip + + - name: Build tooling + run: python -m pip install -U build twine + + - name: Build package + run: python -m build + + - name: Upload package to PyPI + run: python -m twine upload dist/* + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.PYPI_TOKEN }} \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 1acba1f..76af8ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,32 @@ +## v1.0.2 (2026-03-04) + +### Bug Fixes + +- **ci**: PyPI with twine + ([`f4bd2c2`](https://github.com/well-it-wasnt-me/SDSavior/commit/f4bd2c2833b42478fb15be6ac2b63a7a17ee7250)) + +### Chores + +- Line too long + ([`fd9e506`](https://github.com/well-it-wasnt-me/SDSavior/commit/fd9e5069ff4e5c79ca96b8e2ab320de1d3c36bd9)) + +### Testing + +- **coverage**: Extended test coverage + ([`3f0b76a`](https://github.com/well-it-wasnt-me/SDSavior/commit/3f0b76a922c79593ce877fbf91df6e27dd98bff5)) + + +## v1.0.1 (2026-03-04) + +### Bug Fixes + +- Harden open/recovery edge cases + ([`f55a6f2`](https://github.com/well-it-wasnt-me/SDSavior/commit/f55a6f2a742f43790f0f8ad866a2d12b88963cc8)) + + ## v1.0.0 (2026-02-28) - Initial Release diff
--git a/docs/api.md b/docs/api.md index 020d3f7..afd6322 100644 --- a/docs/api.md +++ b/docs/api.md @@ -29,7 +29,8 @@ Dataclass storing persisted pointer state: ### Data Operations - `append(obj) -> int`: append JSON object and return assigned sequence. -- `iter_records(from_seq=None)`: iterate `(seq, ts_ns, obj)` from tail to head. +- `iter_records(from_seq=None, skip_corrupt=False)`: iterate `(seq, ts_ns, obj)` from tail to head. When `skip_corrupt=True`, skips corrupt records and continues scanning for valid records instead of stopping. The scan searches up to the full ring capacity, handling corruption of any size. +- `_last_iter_skipped`: after `iter_records(skip_corrupt=True)`, contains the count of corrupt regions that were skipped. - `export_jsonl(out_path, from_seq=None)`: write records to JSONL file. ### Internal Mechanics (for whomever wish to contribute) diff --git a/docs/recovery.md b/docs/recovery.md index 05f796f..3d55e69 100644 --- a/docs/recovery.md +++ b/docs/recovery.md @@ -43,3 +43,16 @@ On `open()`: 5. Adjust `seq_next` if needed. This keeps unreadable partial writes out of normal iteration. + +## CRC Skip Recovery + +In addition to recovery on `open()`, you can use `skip_corrupt=True` during iteration to skip over corrupt records: + +```python +with SDSavior("data.ring", "data.meta", 8 * 1024 * 1024) as rb: + for seq, ts_ns, obj in rb.iter_records(skip_corrupt=True): + print(seq, obj) + print(f"Skipped {rb._last_iter_skipped} corrupt region(s)") +``` + +This scans the full ring capacity to find valid records beyond corruption, regardless of corruption size. diff --git a/docs/usage.md b/docs/usage.md index d3ab36f..04184e4 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -35,6 +35,20 @@ for seq, ts_ns, obj in rb.iter_records(from_seq=200): ... ``` +## Skip Corrupt Records + +By default, iteration stops at the first corrupt record. 
Use `skip_corrupt=True` to skip over corrupt regions and yield all valid records: + +```python +for seq, ts_ns, obj in rb.iter_records(skip_corrupt=True): + print(seq, obj) + +# Check how many corrupt regions were skipped +print(f"Skipped {rb._last_iter_skipped} corrupt region(s)") +``` + +This is useful for data recovery when partial corruption has occurred. + ## Export JSONL ```python diff --git a/pyproject.toml b/pyproject.toml index 8edbfd8..fe6a581 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "sdsavior" -version = "0.1.0" +version = "1.0.1" description = "Crash-recoverable memory-mapped ring buffer for JSON records (SD-card friendly-ish)" readme = "README.md" requires-python = ">=3.11" @@ -51,3 +51,4 @@ select = ["E", "F", "I", "UP", "B"] [tool.pytest.ini_options] addopts = "--cov=src/sdsavior --cov-report=term-missing --cov-fail-under=90" testpaths = ["tests"] +pythonpath = ["src"] diff --git a/src/sdsavior/ring.py b/src/sdsavior/ring.py index eb30a3b..9857405 100644 --- a/src/sdsavior/ring.py +++ b/src/sdsavior/ring.py @@ -123,6 +123,7 @@ def __init__( self._data_mm: mmap.mmap | None = None self._meta_mm: mmap.mmap | None = None self._state: MetaState | None = None + self._last_iter_skipped: int = 0 def __enter__(self) -> SDSavior: """Open the ring buffer when entering a ``with`` block.""" @@ -149,14 +150,28 @@ def open(self) -> None: meta_mm: mmap.mmap | None = None try: - data_exists = os.path.exists(self.data_path) - existing_data_size = os.path.getsize(self.data_path) if data_exists else 0 - data_fd = os.open(self.data_path, os.O_RDWR | os.O_CREAT) meta_fd = os.open(self.meta_path, os.O_RDWR | os.O_CREAT) - self._ensure_file_size(data_fd, self.capacity) - self._ensure_file_size(meta_fd, META_FILE_SIZE) + data_size = os.fstat(data_fd).st_size + meta_size = os.fstat(meta_fd).st_size + is_new_data_file = data_size == 0 + + if data_size not in (0, self.capacity): + raise ValueError( + 
f"Data file size ({data_size}) != requested capacity ({self.capacity}). " + "Use the same capacity or create new files." + ) + if meta_size not in (0, META_FILE_SIZE): + raise ValueError( + f"Meta file size ({meta_size}) is invalid. " + f"Expected 0 or {META_FILE_SIZE} bytes." + ) + + if is_new_data_file: + os.ftruncate(data_fd, self.capacity) + if meta_size == 0: + os.ftruncate(meta_fd, META_FILE_SIZE) data_mm = mmap.mmap(data_fd, self.capacity, access=mmap.ACCESS_WRITE) meta_mm = mmap.mmap(meta_fd, META_FILE_SIZE, access=mmap.ACCESS_WRITE) @@ -170,12 +185,12 @@ def open(self) -> None: data_mm = None meta_mm = None - if existing_data_size > 0 and self._data_mm[:DATA_START] != DATA_MAGIC: + if not is_new_data_file and self._data_mm[:DATA_START] != DATA_MAGIC: raise ValueError( f"Data file {self.data_path!r} has invalid magic; " "refusing to overwrite existing data." ) - if existing_data_size == 0 and self._data_mm[:DATA_START] != DATA_MAGIC: + if is_new_data_file: self._data_mm[:DATA_START] = DATA_MAGIC self._data_mm.flush() if self.fsync_data: @@ -215,28 +230,29 @@ def open(self) -> None: def close(self) -> None: """Persist metadata and release mmap/file descriptors.""" - if self._state is not None: - self._write_meta(self._state) - - if self._data_mm is not None: - self._data_mm.flush() - self._data_mm.close() - self._data_mm = None + try: + if self._state is not None: + self._write_meta(self._state) + finally: + if self._data_mm is not None: + self._data_mm.flush() + self._data_mm.close() + self._data_mm = None - if self._meta_mm is not None: - self._meta_mm.flush() - self._meta_mm.close() - self._meta_mm = None + if self._meta_mm is not None: + self._meta_mm.flush() + self._meta_mm.close() + self._meta_mm = None - if self._data_fd is not None: - os.close(self._data_fd) - self._data_fd = None + if self._data_fd is not None: + os.close(self._data_fd) + self._data_fd = None - if self._meta_fd is not None: - os.close(self._meta_fd) - self._meta_fd = None + if 
self._meta_fd is not None: + os.close(self._meta_fd) + self._meta_fd = None - self._state = None + self._state = None def _cleanup_open_handles(self) -> None: """Best-effort cleanup of mapped files and fds without persisting metadata.""" @@ -320,26 +336,57 @@ def append(self, obj: Any) -> int: return seq - def iter_records(self, *, from_seq: int | None = None) -> Iterator[tuple[int, int, Any]]: + def iter_records( + self, *, from_seq: int | None = None, skip_corrupt: bool = False, + ) -> Iterator[tuple[int, int, Any]]: """ Iterate records from tail -> head, yielding (seq, ts_ns, obj). If from_seq is provided, skips older sequences. + If skip_corrupt is True, skips over corrupt records instead of stopping. """ self._require_open() assert self._state is not None s = self._state + self._last_iter_skipped = 0 off = s.tail scanned = 0 - limit = self.recover_scan_limit_bytes or self.capacity + limit = self.capacity while off != s.head and scanned < limit: rec = self._read_record(off) if rec is None: - break + if not skip_corrupt: + break + # Scan forward to find next valid record + self._last_iter_skipped += 1 + found = False + scan_off = off + ALIGN + while scan_off != s.head: + if scan_off >= self.capacity: + scan_off = DATA_START + if scan_off == s.head: + break + probe = self._read_record(scan_off) + if probe is not None: + step = self._distance(off, scan_off) + scanned += step + off = scan_off + found = True + break + scan_off += ALIGN + if scan_off >= self.capacity: + scan_off = DATA_START + if not found: + break + continue + kind, next_off, seq, ts_ns, obj = rec - scanned += self._distance(off, next_off) + step = self._distance(off, next_off) + if step <= 0: + break + scanned += step if kind == "wrap": off = next_off @@ -405,6 +452,8 @@ def _read_record(self, off: int) -> ParsedRecord | None: (total_len,) = struct.unpack_from(" self.capacity: @@ -579,14 +628,22 @@ def _recover(self) -> None: break kind, next_off, seq, _ts_ns, _obj = rec + step = 
self._distance(off, next_off) + if step <= 0: + s.head = last_good_off + s.commit += 1 + self._write_meta(s) + truncated = True + break + if kind == "wrap": - scanned += self._distance(off, next_off) + scanned += step off = next_off last_good_off = off continue last_seq = seq - scanned += self._distance(off, next_off) + scanned += step off = next_off last_good_off = off diff --git a/tests/test_basic.py b/tests/test_basic.py index 41e36ea..6e9da1d 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -52,6 +52,15 @@ def test_capacity_must_be_aligned(tmp_path: Path) -> None: SDSavior(str(data), str(meta), (16 * 1024) + 1) +def test_capacity_too_small_raises(tmp_path: Path) -> None: + """Reject capacities that are too small to be useful.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with pytest.raises(ValueError, match="too small"): + SDSavior(str(data), str(meta), 8 * 1024) + + def test_json_kwargs_are_copied(tmp_path: Path) -> None: """Ensure caller-owned JSON kwargs are copied, not referenced.""" data = tmp_path / "ring.dat" @@ -72,10 +81,12 @@ def test_open_rejects_existing_invalid_magic(tmp_path: Path) -> None: data.write_bytes(b"BAD!" 
+ b"\x00" * (capacity - 4)) meta.write_bytes(b"") + size_before = data.stat().st_size rb = SDSavior(str(data), str(meta), capacity) with pytest.raises(ValueError, match="invalid magic"): rb.open() + assert data.stat().st_size == size_before def test_recover_truncates_when_scan_limit_is_reached(tmp_path: Path) -> None: @@ -94,6 +105,42 @@ def test_recover_truncates_when_scan_limit_is_reached(tmp_path: Path) -> None: assert [r[2]["n"] for r in rows] == [1] +def test_iter_records_ignores_recover_scan_limit_after_open(tmp_path: Path) -> None: + """Ensure iteration is not implicitly capped by recovery-only scan limits.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + rb.append({"n": 3}) + + with SDSavior(str(data), str(meta), capacity) as rb2: + rb2.recover_scan_limit_bytes = 1 + rows = list(rb2.iter_records()) + assert [r[2]["n"] for r in rows] == [1, 2, 3] + + +def test_append_fsyncs_data(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> None: + """Verify append calls fsync when data syncing is enabled.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + fsync_calls: list[int] = [] + + def fake_fsync(fd: int) -> None: + fsync_calls.append(fd) + + monkeypatch.setattr("sdsavior.ring.os.fsync", fake_fsync) + + with SDSavior(str(data), str(meta), 16 * 1024, fsync_data=True, fsync_meta=False) as rb: + assert rb._data_fd is not None + data_fd = rb._data_fd + fsync_calls.clear() + rb.append({"n": 1}) + assert fsync_calls == [data_fd] + + def test_write_wrap_marker_fsyncs_data_when_enabled( tmp_path: Path, monkeypatch: pytest.MonkeyPatch, diff --git a/tests/test_extended.py b/tests/test_extended.py index 3d8910e..449e379 100644 --- a/tests/test_extended.py +++ b/tests/test_extended.py @@ -1,14 +1,24 @@ from __future__ import annotations import json +import os import struct import sys from pathlib import Path +from 
types import SimpleNamespace import pytest from sdsavior import SDSavior, cli -from sdsavior.ring import RECORD_HDR, RECORD_HDR_SIZE, _crc32_bytes +from sdsavior.ring import ( + DATA_START, + META_FILE_SIZE, + RECORD_HDR, + RECORD_HDR_SIZE, + WRAP_MARKER, + MetaState, + _crc32_bytes, +) def _record_offsets(rb: SDSavior) -> list[int]: @@ -161,13 +171,16 @@ def test_open_rejects_capacity_mismatch(tmp_path: Path) -> None: """Reject opening existing files when requested capacity does not match meta.""" data = tmp_path / "ring.dat" meta = tmp_path / "ring.meta" + original_capacity = 256 * 1024 - with SDSavior(str(data), str(meta), 256 * 1024) as rb: + with SDSavior(str(data), str(meta), original_capacity) as rb: rb.append({"n": 1}) - with pytest.raises(ValueError, match="Meta capacity"): + size_before = data.stat().st_size + with pytest.raises(ValueError, match="Data file size"): with SDSavior(str(data), str(meta), 16 * 1024) as rb2: rb2.iter_records() + assert data.stat().st_size == size_before def test_open_twice_requires_close(tmp_path: Path) -> None: @@ -265,3 +278,382 @@ def raise_on_load_meta(self: SDSavior) -> None: assert rb._data_mm is None assert rb._meta_mm is None assert rb._state is None + + +def test_open_rejects_invalid_meta_size(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 16 * 1024 + + with open(data, "wb") as f: + f.truncate(capacity) + with open(meta, "wb") as f: + f.truncate(1) + + rb = SDSavior(str(data), str(meta), capacity) + with pytest.raises(ValueError, match="Meta file size"): + rb.open() + + +def test_open_rejects_meta_capacity_mismatch(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 32 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + + other_capacity = 16 * 1024 + state = MetaState( + capacity=other_capacity, + head=DATA_START, + tail=DATA_START, + seq_next=1, + commit=1, + ) + raw = 
rb._pack_meta(state) + meta.write_bytes(raw + b"\x00" * (META_FILE_SIZE - len(raw))) + + rb2 = SDSavior(str(data), str(meta), capacity) + with pytest.raises(ValueError, match="Meta capacity"): + rb2.open() + + +def test_iter_records_breaks_on_corrupt_record(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._data_mm is not None + struct.pack_into(" None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._state is not None + off = DATA_START + 64 + rb._write_wrap_marker(off) + rb._state.tail = off + rb._state.head = DATA_START + assert list(rb.iter_records()) == [] + + +def test_read_record_invalid_cases(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._data_mm is not None + mm = rb._data_mm + + assert rb._read_record(0) is None + + struct.pack_into(" None: + path = tmp_path / "size.dat" + fd = os.open(path, os.O_RDWR | os.O_CREAT) + try: + SDSavior._ensure_file_size(fd, 128) + assert os.fstat(fd).st_size == 128 + finally: + os.close(fd) + + +def test_unpack_meta_rejects_invalid_buffers(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + rb = SDSavior(str(data), str(meta), 16 * 1024) + + assert rb._unpack_meta(b"short") is None + + state = MetaState( + capacity=rb.capacity, + head=DATA_START, + tail=DATA_START, + seq_next=1, + commit=1, + ) + raw = rb._pack_meta(state) + corrupt_crc = bytearray(raw) + corrupt_crc[-8] ^= 0xFF + assert rb._unpack_meta(bytes(corrupt_crc)) is None + + bad_state = MetaState( + capacity=rb.capacity, + head=0, + tail=0, + seq_next=1, + commit=1, + ) + bad_raw = rb._pack_meta(bad_state) + assert rb._unpack_meta(bad_raw) is None + + +def 
test_make_space_handles_corrupt_tail_record(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._state is not None + assert rb._data_mm is not None + struct.pack_into(" None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + assert rb._state is not None + start_commit = rb._state.commit + + def stuck(_off: int): + return ("rec", _off, 1, 0, {}) + + monkeypatch.setattr(rb, "_read_record", stuck) + rb._recover() + assert rb._state.commit == start_commit + 1 + assert rb._state.head == rb._state.tail + + +def test_recover_wrap_marker_advances(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + assert rb._state is not None + off = DATA_START + 64 + rb._write_wrap_marker(off) + rb._state.tail = off + rb._state.head = DATA_START + rb._recover() + assert rb._state.tail == off + assert rb._state.head == DATA_START + + +def test_recover_updates_seq_next(tmp_path: Path) -> None: + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + + with SDSavior(str(data), str(meta), 16 * 1024) as rb: + rb.append({"n": 1}) + last_seq = rb.append({"n": 2}) + assert rb._state is not None + rb._state.seq_next = 1 + rb._recover() + assert rb._state.seq_next == last_seq + 1 + + +def test_cli_returns_nonzero_for_unknown_command(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + cli.argparse.ArgumentParser, + "parse_args", + lambda _self: SimpleNamespace(cmd="noop"), + ) + assert cli.main() == 2 + + +def test_recover_treats_wrap_marker_at_data_start_as_corruption(tmp_path: Path) -> None: + """Ensure wrap markers at DATA_START do not cause recovery/iteration hangs.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + 
with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + + with SDSavior(str(data), str(meta), capacity) as rb: + assert rb._data_mm is not None + struct.pack_into(" None: + """Ensure close releases resources even if metadata persistence raises.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + rb = SDSavior(str(data), str(meta), 256 * 1024) + rb.open() + + def raise_on_write_meta(_st) -> None: + """Force close() to execute cleanup via the finally branch.""" + raise RuntimeError("forced write_meta failure") + + monkeypatch.setattr(rb, "_write_meta", raise_on_write_meta) + + with pytest.raises(RuntimeError, match="forced write_meta failure"): + rb.close() + + assert rb._data_fd is None + assert rb._meta_fd is None + assert rb._data_mm is None + assert rb._meta_mm is None + assert rb._state is None + + +def test_skip_corrupt_single_record(tmp_path: Path) -> None: + """Skip a single corrupt record in the middle and yield surrounding records.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + rb.append({"n": 3}) + offsets = _record_offsets(rb) + assert len(offsets) >= 3 + + # Corrupt record 2 (index 1) + off2 = offsets[1] + assert rb._data_mm is not None + payload_off = off2 + RECORD_HDR_SIZE + rb._data_mm[payload_off] = rb._data_mm[payload_off] ^ 0x01 + rb._data_mm.flush() + + # With skip_corrupt=True, should get records 1 and 3 + rows = list(rb.iter_records(skip_corrupt=True)) + assert [r[2]["n"] for r in rows] == [1, 3] + assert rb._last_iter_skipped == 1 + + +def test_skip_corrupt_multiple_regions(tmp_path: Path) -> None: + """Skip multiple corrupt records and yield the remaining valid ones.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + for i in range(5): + rb.append({"n": i}) + offsets 
= _record_offsets(rb) + assert len(offsets) == 5 + + assert rb._data_mm is not None + # Corrupt records at index 1 and 3 + for idx in [1, 3]: + off = offsets[idx] + payload_off = off + RECORD_HDR_SIZE + rb._data_mm[payload_off] = rb._data_mm[payload_off] ^ 0x01 + rb._data_mm.flush() + + rows = list(rb.iter_records(skip_corrupt=True)) + assert [r[2]["n"] for r in rows] == [0, 2, 4] + assert rb._last_iter_skipped == 2 + + +def test_skip_corrupt_false_default_stops(tmp_path: Path) -> None: + """Default skip_corrupt=False still stops at first corruption.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + rb.append({"n": 1}) + rb.append({"n": 2}) + rb.append({"n": 3}) + offsets = _record_offsets(rb) + + assert rb._data_mm is not None + off2 = offsets[1] + payload_off = off2 + RECORD_HDR_SIZE + rb._data_mm[payload_off] = rb._data_mm[payload_off] ^ 0x01 + rb._data_mm.flush() + + # Default behavior stops at corruption + rows = list(rb.iter_records()) + assert [r[2]["n"] for r in rows] == [1] + assert rb._last_iter_skipped == 0 + + +def test_skip_corrupt_count_accurate(tmp_path: Path) -> None: + """Verify _last_iter_skipped count matches actual skipped regions.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + for i in range(6): + rb.append({"n": i}) + offsets = _record_offsets(rb) + + assert rb._data_mm is not None + # Corrupt records 1, 2, 4 (three separate corruptions) + for idx in [1, 2, 4]: + off = offsets[idx] + payload_off = off + RECORD_HDR_SIZE + rb._data_mm[payload_off] = rb._data_mm[payload_off] ^ 0x01 + rb._data_mm.flush() + + rows = list(rb.iter_records(skip_corrupt=True)) + assert [r[2]["n"] for r in rows] == [0, 3, 5] + assert rb._last_iter_skipped >= 2 # At least 2 skip events (records 1-2 may merge) + + +def test_skip_corrupt_large_corruption_span(tmp_path: Path) 
-> None: + """Handle corruption spanning a large region beyond 4096 bytes.""" + data = tmp_path / "ring.dat" + meta = tmp_path / "ring.meta" + capacity = 256 * 1024 + + with SDSavior(str(data), str(meta), capacity) as rb: + # Write records with large payloads so corruption span > 4096 + rb.append({"n": 0}) + for i in range(1, 8): + rb.append({"n": i, "pad": "x" * 800}) + rb.append({"n": 8}) + offsets = _record_offsets(rb) + + assert rb._data_mm is not None + # Corrupt several consecutive large records (spanning well over 4096 bytes) + for idx in range(1, 8): + off = offsets[idx] + payload_off = off + RECORD_HDR_SIZE + rb._data_mm[payload_off] = rb._data_mm[payload_off] ^ 0x01 + rb._data_mm.flush() + + rows = list(rb.iter_records(skip_corrupt=True)) + nums = [r[2]["n"] for r in rows] + assert 0 in nums + assert 8 in nums + assert rb._last_iter_skipped > 0