"""Tests for the volumetric tabular layer (parquet / csv / json backends)."""

import numpy as np
import pandas as pd
import pytest

from zetta_utils.geometry import BBox3D, Vec3D
from zetta_utils.layer.volumetric.index import VolumetricIndex
from zetta_utils.layer.volumetric.tabular.backend import TabularBackend, read_info
from zetta_utils.layer.volumetric.tabular.build import build_volumetric_tabular_layer

SAMPLE_SCHEMA = (
    {"name": "segment_id", "dtype": "int64"},
    {"name": "score", "dtype": "float64"},
    {"name": "label", "dtype": "object"},
)

UINT64_SCHEMA = (
    {"name": "seg_id", "dtype": "uint64"},
    {"name": "count", "dtype": "uint64"},
)

BUILDER_SCHEMA = (
    {"name": "x", "dtype": "int64"},
    {"name": "y", "dtype": "float64"},
)

# Every ordered pair of distinct encodings, for the conversion tests.
_ENCODING_PAIRS = [
    ("parquet", "csv"),
    ("parquet", "json"),
    ("csv", "parquet"),
    ("csv", "json"),
    ("json", "parquet"),
    ("json", "csv"),
]

# Shared keyword arguments for the builder tests (standard test geometry).
_BUILD_KWARGS = {
    "resolution": [4, 4, 40],
    "chunk_size": [64, 64, 40],
    "voxel_offset": [0, 0, 0],
    "dataset_size": [256, 256, 100],
}


def _make_idx(start=(0, 0, 0), end=(64, 64, 40), resolution=(4, 4, 40)):
    """Build a VolumetricIndex spanning [start, end) at the given resolution."""
    return VolumetricIndex.from_coords(
        start_coord=start, end_coord=end, resolution=Vec3D(*resolution)
    )


def _make_sample_df():
    """Small mixed-type frame matching SAMPLE_SCHEMA."""
    return pd.DataFrame(
        {
            "segment_id": [1, 2, 3, 4, 5],
            "score": [0.1, 0.2, 0.3, 0.4, 0.5],
            "label": ["a", "b", "c", "d", "e"],
        }
    )


def _make_uint64_df():
    """DataFrame with uint64 values beyond float64 precision (>2^53)."""
    seg_ids = np.array(
        [
            2 ** 53 + 1,  # 9007199254740993 - first non-exact float64 integer
            2 ** 53 + 5,
            2 ** 63,  # 9223372036854775808 - exceeds int64 max
            2 ** 64 - 2,  # near uint64 max
            0,
        ],
        dtype=np.uint64,
    )
    counts = np.array([10, 20, 30, 40, 50], dtype=np.uint64)
    return pd.DataFrame({"seg_id": seg_ids, "count": counts})


def _make_backend(tmp_path, encoding="parquet", column_schema=(), **kwargs):
    """TabularBackend rooted at tmp_path with the standard 256x256x100 geometry."""
    return TabularBackend(
        path=str(tmp_path),
        resolution=Vec3D(4, 4, 40),
        voxel_offset=Vec3D(0, 0, 0),
        size=Vec3D(256, 256, 100),
        chunk_size=Vec3D(64, 64, 40),
        encoding=encoding,
        column_schema=column_schema,
        **kwargs,
    )


def _make_wide_backend(path, chunk_size, encoding, column_schema=()):
    """Backend over a 128x64x40 volume, used by the re-chunking tests."""
    return TabularBackend(
        path=str(path),
        resolution=Vec3D(4, 4, 40),
        voxel_offset=Vec3D(0, 0, 0),
        size=Vec3D(128, 64, 40),
        chunk_size=Vec3D(*chunk_size),
        encoding=encoding,
        column_schema=column_schema,
    )


class TestTabularBackendParquet:
    def test_write_read(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="parquet")
        index = _make_idx()
        frame = _make_sample_df()

        bk.write(index, frame)

        pd.testing.assert_frame_equal(bk.read(index), frame)

    def test_chunk_file_exists(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="parquet")
        bk.write(_make_idx(), _make_sample_df())

        assert (tmp_path / "data" / "0-64_0-64_0-40.parquet").exists()


class TestTabularBackendCSV:
    def test_write_read(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="csv", column_schema=SAMPLE_SCHEMA)
        index = _make_idx()
        frame = _make_sample_df()

        bk.write(index, frame)

        pd.testing.assert_frame_equal(bk.read(index), frame)

    def test_chunk_file_exists(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="csv", column_schema=SAMPLE_SCHEMA)
        bk.write(_make_idx(), _make_sample_df())

        assert (tmp_path / "data" / "0-64_0-64_0-40.csv").exists()


class TestTabularBackendJSON:
    def test_write_read(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="json", column_schema=SAMPLE_SCHEMA)
        index = _make_idx()
        frame = _make_sample_df()

        bk.write(index, frame)

        pd.testing.assert_frame_equal(bk.read(index), frame)

    def test_chunk_file_exists(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="json", column_schema=SAMPLE_SCHEMA)
        bk.write(_make_idx(), _make_sample_df())

        assert (tmp_path / "data" / "0-64_0-64_0-40.json").exists()


class TestTabularBackendReadMissing:
    def test_read_missing_chunk_returns_empty(self, tmp_path):
        loaded = _make_backend(tmp_path).read(_make_idx())
        assert isinstance(loaded, pd.DataFrame)
        assert len(loaded) == 0


class TestTabularBackendDeleteEmpty:
    def test_write_empty_deletes_chunk(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="parquet")
        index = _make_idx()
        chunk_path = tmp_path / "data" / "0-64_0-64_0-40.parquet"

        bk.write(index, _make_sample_df())
        assert chunk_path.exists()

        bk.write(index, pd.DataFrame())
        assert not chunk_path.exists()

    def test_write_empty_keeps_chunk_when_disabled(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="parquet", delete_empty_uploads=False)
        index = _make_idx()
        chunk_path = tmp_path / "data" / "0-64_0-64_0-40.parquet"

        bk.write(index, _make_sample_df())
        assert chunk_path.exists()

        bk.write(index, pd.DataFrame())
        assert chunk_path.exists()


class TestTabularBackendInfo:
    def test_info_roundtrip(self, tmp_path):
        bk = _make_backend(tmp_path, encoding="csv", column_schema=UINT64_SCHEMA)
        bk.write_info()

        info = read_info(str(tmp_path))
        expected = {
            "type": "volumetric_tabular",
            "encoding": "csv",
            "resolution": [4, 4, 40],
            "voxel_offset": [0, 0, 0],
            "size": [256, 256, 100],
            "chunk_size": [64, 64, 40],
            "column_schema": list(UINT64_SCHEMA),
        }
        for key, value in expected.items():
            assert info[key] == value

    def test_from_path(self, tmp_path):
        _make_backend(tmp_path, encoding="json", column_schema=UINT64_SCHEMA).write_info()

        loaded = TabularBackend.from_path(str(tmp_path))
        assert loaded.encoding == "json"
        assert loaded.resolution == Vec3D(4, 4, 40)
        assert loaded.voxel_offset == Vec3D(0, 0, 0)
        assert loaded.size == Vec3D(256, 256, 100)
        assert loaded.chunk_size == Vec3D(64, 64, 40)
        assert loaded.column_schema == UINT64_SCHEMA


class TestTabularBackendDelete:
    def test_delete_all(self, tmp_path):
        bk = _make_backend(tmp_path)
        bk.write_info()
        bk.write(_make_idx(), _make_sample_df())

        assert (tmp_path / "info").exists()
        assert (tmp_path / "data" / "0-64_0-64_0-40.parquet").exists()

        bk.delete()

        assert not (tmp_path / "info").exists()
        assert not (tmp_path / "data").exists()


class TestTabularBackendWithChanges:
    def test_with_changes(self, tmp_path):
        original = _make_backend(tmp_path, encoding="parquet")
        changed = original.with_changes(encoding="csv")
        assert changed.encoding == "csv"
        assert changed.path == original.path


class TestTabularBackendName:
    def test_name_property(self, tmp_path):
        name = _make_backend(tmp_path, encoding="parquet").name
        assert "TabularBackend[" in name
        assert str(tmp_path) in name


class TestTabularBackendValidation:
    def test_invalid_encoding_raises(self, tmp_path):
        with pytest.raises(ValueError, match="encoding must be one of"):
            _make_backend(tmp_path, encoding="xml")


class TestUint64Roundtrip:
    """Verify that uint64 values > 2^53 survive serialization without precision loss."""

    @pytest.mark.parametrize("encoding", ["parquet", "csv", "json"])
    def test_uint64_roundtrip(self, tmp_path, encoding):
        bk = _make_backend(tmp_path, encoding=encoding, column_schema=UINT64_SCHEMA)
        index = _make_idx()
        frame = _make_uint64_df()

        bk.write(index, frame)
        loaded = bk.read(index)

        for column in ("seg_id", "count"):
            np.testing.assert_array_equal(
                loaded[column].to_numpy(), frame[column].to_numpy()
            )
            assert loaded[column].dtype == np.uint64


class TestFormatConversion:
    """Write with one encoding, read back, re-write with another, verify data survives."""

    @pytest.mark.parametrize("src_enc,dst_enc", _ENCODING_PAIRS)
    def test_convert_format(self, tmp_path, src_enc, dst_enc):
        index = _make_idx()
        frame = _make_sample_df()

        src = _make_backend(tmp_path / "src", encoding=src_enc, column_schema=SAMPLE_SCHEMA)
        src.write(index, frame)

        dst = _make_backend(tmp_path / "dst", encoding=dst_enc, column_schema=SAMPLE_SCHEMA)
        dst.write(index, src.read(index))

        pd.testing.assert_frame_equal(dst.read(index), frame)

    @pytest.mark.parametrize("src_enc,dst_enc", _ENCODING_PAIRS)
    def test_convert_format_uint64(self, tmp_path, src_enc, dst_enc):
        index = _make_idx()
        frame = _make_uint64_df()

        src = _make_backend(tmp_path / "src", encoding=src_enc, column_schema=UINT64_SCHEMA)
        src.write(index, frame)

        dst = _make_backend(tmp_path / "dst", encoding=dst_enc, column_schema=UINT64_SCHEMA)
        dst.write(index, src.read(index))

        loaded = dst.read(index)
        np.testing.assert_array_equal(loaded["seg_id"].to_numpy(), frame["seg_id"].to_numpy())
        assert loaded["seg_id"].dtype == np.uint64


class TestChunkSizeConversion:
    """Write data across multiple chunks, then re-chunk into a different chunk size."""

    def test_rechunk(self, tmp_path):
        src = _make_wide_backend(tmp_path / "src", (64, 64, 40), "parquet")
        idx0 = _make_idx(start=(0, 0, 0), end=(64, 64, 40))
        idx1 = _make_idx(start=(64, 0, 0), end=(128, 64, 40))
        src.write(idx0, pd.DataFrame({"x": [1, 2], "val": [10, 20]}))
        src.write(idx1, pd.DataFrame({"x": [3, 4], "val": [30, 40]}))

        # Re-chunk into a single larger chunk
        dst = _make_wide_backend(tmp_path / "dst", (128, 64, 40), "parquet")
        dst_idx = _make_idx(start=(0, 0, 0), end=(128, 64, 40))
        combined = pd.concat([src.read(idx0), src.read(idx1)], ignore_index=True)
        dst.write(dst_idx, combined)

        loaded = dst.read(dst_idx)
        assert len(loaded) == 4
        assert list(loaded["x"]) == [1, 2, 3, 4]
        assert list(loaded["val"]) == [10, 20, 30, 40]

    def test_rechunk_with_format_change_uint64(self, tmp_path):
        """Rechunk AND change format simultaneously, preserving uint64."""
        frame = _make_uint64_df()
        big_idx = _make_idx(start=(0, 0, 0), end=(128, 64, 40))

        src = _make_wide_backend(
            tmp_path / "src", (128, 64, 40), "parquet", column_schema=UINT64_SCHEMA
        )
        src.write(big_idx, frame)

        # Split into two smaller csv chunks
        dst = _make_wide_backend(
            tmp_path / "dst", (64, 64, 40), "csv", column_schema=UINT64_SCHEMA
        )
        full = src.read(big_idx)
        half = len(full) // 2
        idx0 = _make_idx(start=(0, 0, 0), end=(64, 64, 40))
        idx1 = _make_idx(start=(64, 0, 0), end=(128, 64, 40))
        dst.write(idx0, full.iloc[:half].reset_index(drop=True))
        dst.write(idx1, full.iloc[half:].reset_index(drop=True))

        recombined = pd.concat([dst.read(idx0), dst.read(idx1)], ignore_index=True)
        np.testing.assert_array_equal(
            recombined["seg_id"].to_numpy(), frame["seg_id"].to_numpy()
        )
        assert recombined["seg_id"].dtype == np.uint64


class TestTabularBackendDeleteChunkMissing:
    def test_delete_chunk_nonexistent_is_noop(self, tmp_path):
        # Deleting a chunk that was never written must not raise.
        _make_backend(tmp_path, encoding="parquet").delete_chunk(_make_idx())


class TestReadInfoErrors:
    def test_read_info_missing_path(self, tmp_path):
        with pytest.raises(FileNotFoundError):
            read_info(str(tmp_path / "nonexistent"))

    def test_read_info_empty_file(self, tmp_path):
        (tmp_path / "info").write_text("")
        with pytest.raises(FileNotFoundError):
            read_info(str(tmp_path))


class TestBuildVolumetricTabularLayer:
    def test_write_mode(self, tmp_path):
        layer = build_volumetric_tabular_layer(
            path=str(tmp_path),
            encoding="parquet",
            column_schema=list(BUILDER_SCHEMA),
            mode="write",
            **_BUILD_KWARGS,
        )
        assert not layer.readonly
        assert (tmp_path / "info").exists()

    def test_read_mode(self, tmp_path):
        build_volumetric_tabular_layer(
            path=str(tmp_path),
            column_schema=list(BUILDER_SCHEMA),
            mode="write",
            **_BUILD_KWARGS,
        )
        layer = build_volumetric_tabular_layer(path=str(tmp_path), mode="read")
        assert layer.readonly
        assert layer.backend.column_schema == BUILDER_SCHEMA

    def test_read_mode_fails_without_info(self, tmp_path):
        with pytest.raises(FileNotFoundError):
            build_volumetric_tabular_layer(path=str(tmp_path / "nonexistent"), mode="read")

    def test_write_mode_matching_info_succeeds(self, tmp_path):
        """Write mode with matching existing info should succeed (idempotent)."""
        layer = None
        for _ in range(2):  # second identical call must also succeed
            layer = build_volumetric_tabular_layer(
                path=str(tmp_path),
                column_schema=list(BUILDER_SCHEMA),
                mode="write",
                **_BUILD_KWARGS,
            )
        assert not layer.readonly

    def test_write_mode_mismatched_info_raises(self, tmp_path):
        """Write mode with different params should raise."""
        build_volumetric_tabular_layer(
            path=str(tmp_path),
            column_schema=list(BUILDER_SCHEMA),
            mode="write",
            **_BUILD_KWARGS,
        )
        with pytest.raises(RuntimeError, match="do not match"):
            build_volumetric_tabular_layer(
                path=str(tmp_path),
                encoding="csv",  # different encoding
                column_schema=list(BUILDER_SCHEMA),
                mode="write",
                **_BUILD_KWARGS,
            )

    def test_write_mode_mismatched_with_overwrite(self, tmp_path):
        """Write mode with different params + info_overwrite should succeed."""
        build_volumetric_tabular_layer(
            path=str(tmp_path),
            column_schema=list(BUILDER_SCHEMA),
            mode="write",
            **_BUILD_KWARGS,
        )
        layer = build_volumetric_tabular_layer(
            path=str(tmp_path),
            encoding="csv",
            column_schema=list(BUILDER_SCHEMA),
            mode="write",
            info_overwrite=True,
            **_BUILD_KWARGS,
        )
        assert layer.backend.encoding == "csv"

    def test_write_mode_requires_column_schema(self, tmp_path):
        with pytest.raises(ValueError, match="column_schema"):
            build_volumetric_tabular_layer(
                path=str(tmp_path), mode="write", **_BUILD_KWARGS
            )

    def test_replace_mode(self, tmp_path):
        first = build_volumetric_tabular_layer(
            path=str(tmp_path),
            encoding="parquet",
            column_schema=list(BUILDER_SCHEMA),
            mode="write",
            **_BUILD_KWARGS,
        )
        first.backend.write(_make_idx(), _make_sample_df())

        second = build_volumetric_tabular_layer(
            path=str(tmp_path),
            encoding="csv",
            column_schema=list(SAMPLE_SCHEMA),
            mode="replace",
            **_BUILD_KWARGS,
        )
        assert second.backend.encoding == "csv"
        assert not (tmp_path / "data" / "0-64_0-64_0-40.parquet").exists()

    def test_write_mode_requires_resolution(self, tmp_path):
        kwargs = {k: v for k, v in _BUILD_KWARGS.items() if k != "resolution"}
        with pytest.raises(ValueError, match="resolution"):
            build_volumetric_tabular_layer(
                path=str(tmp_path),
                column_schema=list(BUILDER_SCHEMA),
                mode="write",
                **kwargs,
            )

    def test_write_mode_requires_chunk_size(self, tmp_path):
        kwargs = {k: v for k, v in _BUILD_KWARGS.items() if k != "chunk_size"}
        with pytest.raises(ValueError, match="chunk_size"):
            build_volumetric_tabular_layer(
                path=str(tmp_path),
                column_schema=list(BUILDER_SCHEMA),
                mode="write",
                **kwargs,
            )

    def test_bbox_with_voxel_offset_raises(self, tmp_path):
        with pytest.raises(ValueError, match="voxel_offset"):
            build_volumetric_tabular_layer(
                path=str(tmp_path),
                resolution=[4, 4, 40],
                chunk_size=[64, 64, 40],
                bbox=BBox3D(bounds=((0, 1024), (0, 1024), (0, 4000))),
                voxel_offset=[0, 0, 0],
                column_schema=list(BUILDER_SCHEMA),
                mode="write",
            )

    def test_no_bbox_or_voxel_offset_raises(self, tmp_path):
        with pytest.raises(ValueError, match="bbox"):
            build_volumetric_tabular_layer(
                path=str(tmp_path),
                resolution=[4, 4, 40],
                chunk_size=[64, 64, 40],
                column_schema=list(BUILDER_SCHEMA),
                mode="write",
            )

    def test_build_with_bbox(self, tmp_path):
        layer = build_volumetric_tabular_layer(
            path=str(tmp_path),
            resolution=[4, 4, 40],
            chunk_size=[64, 64, 40],
            bbox=BBox3D(bounds=((0, 1024), (0, 1024), (0, 4000))),
            column_schema=list(BUILDER_SCHEMA),
            mode="write",
        )
        assert layer.backend.voxel_offset == Vec3D(0, 0, 0)
        assert layer.backend.size == Vec3D(256, 256, 100)
build_volumetric_tabular_layer +from .layer import VolumetricTabularLayer diff --git a/zetta_utils/layer/volumetric/tabular/backend.py b/zetta_utils/layer/volumetric/tabular/backend.py new file mode 100644 index 000000000..d6ea9419e --- /dev/null +++ b/zetta_utils/layer/volumetric/tabular/backend.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import io +import json +import os +from typing import Any + +import attrs +import pandas as pd +from cloudfiles import CloudFiles + +from zetta_utils.common import abspath, is_local +from zetta_utils.common.path import strip_prefix +from zetta_utils.geometry import Vec3D +from zetta_utils.layer.backend_base import Backend +from zetta_utils.layer.volumetric.index import VolumetricIndex + +VALID_ENCODINGS = ("parquet", "csv", "json") + + +def read_info(path: str) -> dict[str, Any]: + """Read tabular layer info file from path.""" + cf = CloudFiles(abspath(path)) + raw = cf.get("info") + if raw is None or len(raw) == 0: + raise FileNotFoundError(f"Info file not found at {path}/info") + return json.loads(raw.decode("utf-8")) + + +def _validate_encoding( + instance, attribute, value +): # noqa: ARG001 # pylint: disable=unused-argument + if value not in VALID_ENCODINGS: + raise ValueError(f"encoding must be one of {VALID_ENCODINGS}, got {value!r}") + + +def _dtypes_from_column_schema( + column_schema: tuple[dict[str, str], ...], +) -> dict[str, str]: + return {entry["name"]: entry["dtype"] for entry in column_schema if "dtype" in entry} + + +@attrs.frozen +class TabularBackend(Backend[VolumetricIndex, pd.DataFrame, pd.DataFrame]): + path: str + resolution: Vec3D + voxel_offset: Vec3D[int] + size: Vec3D[int] + chunk_size: Vec3D[int] + encoding: str = attrs.field(default="parquet", validator=_validate_encoding) + column_schema: tuple[dict[str, str], ...] 
= () + delete_empty_uploads: bool = True + + @property + def name(self) -> str: + return f"TabularBackend[{self.path}]" + + @property + def _file_extension(self) -> str: + return self.encoding + + def _get_cf(self) -> CloudFiles: + return CloudFiles(abspath(self.path)) + + def _chunk_relative_path(self, idx: VolumetricIndex) -> str: + start = idx.start + end = idx.stop + fname = ( + f"{int(start[0])}-{int(end[0])}" + f"_{int(start[1])}-{int(end[1])}" + f"_{int(start[2])}-{int(end[2])}.{self._file_extension}" + ) + return f"data/{fname}" + + def _serialize(self, data: pd.DataFrame) -> bytes: + buf = io.BytesIO() + if self.encoding == "parquet": + data.to_parquet(buf, index=False) + elif self.encoding == "csv": + data.to_csv(buf, index=False) + elif self.encoding == "json": + data.to_json(buf, orient="records") + buf.seek(0) + return buf.getvalue() + + def _deserialize(self, raw: bytes) -> pd.DataFrame: + buf = io.BytesIO(raw) + if self.encoding == "parquet": + return pd.read_parquet(buf) + elif self.encoding == "csv": + # Read as string to avoid lossy type inference for large integers + df = pd.read_csv(buf, dtype=str) + elif self.encoding == "json": + df = pd.read_json(buf, orient="records", dtype=str) + else: # pragma: no cover + raise ValueError(f"Unknown encoding: {self.encoding!r}") + + # Restore dtypes from column_schema (critical for uint64 in CSV/JSON) + dtypes = _dtypes_from_column_schema(self.column_schema) + for col, dtype_str in dtypes.items(): + if col in df.columns: + df[col] = df[col].astype(dtype_str) + return df + + def write_info(self) -> None: + """Write info file to disk.""" + info: dict[str, Any] = { + "type": "volumetric_tabular", + "encoding": self.encoding, + "resolution": list(self.resolution), + "voxel_offset": list(self.voxel_offset), + "size": list(self.size), + "chunk_size": list(self.chunk_size), + "column_schema": list(self.column_schema), + } + if is_local(self.path): + os.makedirs(strip_prefix(abspath(self.path)), exist_ok=True) + cf 
= self._get_cf() + cf.put( + "info", + json.dumps(info, indent=2).encode("utf-8"), + cache_control="no-cache, no-store, max-age=0, must-revalidate", + ) + + @classmethod + def from_path(cls, path: str, **overrides) -> TabularBackend: + """Load backend from existing info file.""" + info = read_info(path) + return cls( + path=path, + resolution=Vec3D(*info["resolution"]), + voxel_offset=Vec3D(*info["voxel_offset"]), + size=Vec3D(*info["size"]), + chunk_size=Vec3D(*info["chunk_size"]), + encoding=info.get("encoding", "parquet"), + column_schema=tuple( + {"name": e["name"], "dtype": e["dtype"]} + for e in info.get("column_schema", []) + if "name" in e and "dtype" in e + ), + **overrides, + ) + + def read(self, idx: VolumetricIndex) -> pd.DataFrame: + cf = self._get_cf() + rel_path = self._chunk_relative_path(idx) + raw = cf.get(rel_path) + if raw is None or len(raw) == 0: + return pd.DataFrame() + return self._deserialize(raw) + + def write(self, idx: VolumetricIndex, data: pd.DataFrame) -> None: + if len(data) == 0 and self.delete_empty_uploads: + self.delete_chunk(idx) + return + + if is_local(self.path): + data_dir = os.path.join(strip_prefix(abspath(self.path)), "data") + os.makedirs(data_dir, exist_ok=True) + + cf = self._get_cf() + rel_path = self._chunk_relative_path(idx) + cf.put( + rel_path, + self._serialize(data), + cache_control="no-cache, no-store, max-age=0, must-revalidate", + ) + + def delete(self) -> None: + path = abspath(self.path) + cf = CloudFiles(path) + file_list = list(cf.list()) + if file_list: + cf.delete(file_list) + if is_local(self.path): + local_path = strip_prefix(path) + if os.path.isdir(local_path): + for root, dirs, _ in os.walk(local_path, topdown=False): + for directory in dirs: + try: + os.rmdir(os.path.join(root, directory)) + except OSError: # pragma: no cover + pass + + def delete_chunk(self, idx: VolumetricIndex) -> None: + cf = self._get_cf() + rel_path = self._chunk_relative_path(idx) + cf.delete(rel_path) + + def 
with_changes(self, **kwargs) -> TabularBackend: + return attrs.evolve(self, **kwargs) + + def pformat(self) -> str: # pragma: no cover + return self.name diff --git a/zetta_utils/layer/volumetric/tabular/build.py b/zetta_utils/layer/volumetric/tabular/build.py new file mode 100644 index 000000000..2701eff34 --- /dev/null +++ b/zetta_utils/layer/volumetric/tabular/build.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +import math +from typing import Literal, Sequence + +from zetta_utils import builder +from zetta_utils.geometry import BBox3D, IntVec3D, Vec3D +from zetta_utils.layer.volumetric.tabular.backend import TabularBackend, read_info +from zetta_utils.layer.volumetric.tabular.layer import VolumetricTabularLayer + + +def _resolve_bounds( + bbox: BBox3D | None, + voxel_offset: Sequence[int] | None, + dataset_size: Sequence[int] | None, + res: Vec3D, +) -> tuple[IntVec3D, IntVec3D]: + """Convert bbox or voxel_offset+dataset_size into (voxel_offset, size) Vec3Ds.""" + if bbox is not None: + if voxel_offset is not None or dataset_size is not None: + raise ValueError( + "When `bbox` is provided, `voxel_offset` and `dataset_size` should not be." 
+ ) + vo = IntVec3D(*(math.floor(s / r) for s, r in zip(bbox.start, res))) + sz = IntVec3D(*(math.ceil(s / r) for s, r in zip(bbox.shape, res))) + return vo, sz + + if voxel_offset is None or dataset_size is None: + raise ValueError("Either `bbox` or (`voxel_offset` + `dataset_size`) is required") + return IntVec3D(*voxel_offset), IntVec3D(*dataset_size) + + +def _validate_info_compatibility( + existing_info: dict, new_info: dict, info_overwrite: bool +) -> None: + """Validate that new layer params are compatible with existing info file.""" + keys_to_check = [ + "encoding", + "resolution", + "voxel_offset", + "size", + "chunk_size", + "column_schema", + ] + diffs = {} + for key in keys_to_check: + existing_val = existing_info.get(key) + new_val = new_info.get(key) + if existing_val != new_val: + diffs[key] = {"existing": existing_val, "new": new_val} + + if diffs and not info_overwrite: + diff_str = "\n".join(f" {k}: {v['existing']} -> {v['new']}" for k, v in diffs.items()) + raise RuntimeError( + f"New layer parameters do not match existing info file. " + f"Set info_overwrite=True to overwrite.\nDifferences:\n{diff_str}" + ) + + +@builder.register("build_volumetric_tabular_layer") +def build_volumetric_tabular_layer( + path: str, + resolution: Sequence[float] | None = None, + chunk_size: Sequence[int] | None = None, + bbox: BBox3D | None = None, + voxel_offset: Sequence[int] | None = None, + dataset_size: Sequence[int] | None = None, + encoding: str = "parquet", + column_schema: Sequence[dict[str, str]] | None = None, + mode: Literal["read", "write", "replace"] = "write", + delete_empty_uploads: bool = True, + info_overwrite: bool = False, +) -> VolumetricTabularLayer: + """Build a VolumetricTabularLayer for reading/writing chunked tabular data. + + Spatial bounds can be specified via (bbox + resolution) or + (resolution + voxel_offset + dataset_size). + + :param path: Root directory for data files and info file. 
+ :param resolution: Voxel resolution in nm (x, y, z). + :param chunk_size: Chunk size in voxels (x, y, z). + :param bbox: Bounding box for the dataset. + :param voxel_offset: Start of the dataset volume in voxels. + :param dataset_size: Dataset size in voxels. + :param encoding: File format for data files: "parquet", "csv", or "json". + :param column_schema: List of {"name": str, "dtype": str} dicts describing columns. + Required for write/replace mode. Specifies column names and numpy dtypes. + Critical for CSV/JSON formats to preserve types (especially uint64). + :param mode: How the layer should be opened: + - "read": for reading only; info file must exist. + - "write": for writing; writes info file, fails if it already exists. + - "replace": for writing; clears existing data and rewrites info file. + :param delete_empty_uploads: When True, writing an empty DataFrame deletes the chunk file. + :param info_overwrite: When True, allow overwriting an existing info file with + different parameters. When False, raise if new params don't match existing. 
+ """ + existing_info = None + try: + existing_info = read_info(path) + except (FileNotFoundError, KeyError, OSError): + pass + + if mode == "read": + if existing_info is None: + raise FileNotFoundError(f"Tabular layer info not found at {path}") + backend = TabularBackend.from_path(path, delete_empty_uploads=delete_empty_uploads) + return VolumetricTabularLayer(backend=backend, readonly=True) + + # write / replace modes + if resolution is None: + raise ValueError("`resolution` is required for write/replace mode") + if chunk_size is None: + raise ValueError("`chunk_size` is required for write/replace mode") + if column_schema is None: + raise ValueError("`column_schema` is required for write/replace mode") + + res = Vec3D(*resolution) + cs = Vec3D(*chunk_size) + vo, sz = _resolve_bounds(bbox, voxel_offset, dataset_size, res) + + schema = tuple(column_schema) + + if mode == "write" and existing_info is not None: + new_info = { + "encoding": encoding, + "resolution": list(res), + "voxel_offset": list(vo), + "size": list(sz), + "chunk_size": list(cs), + "column_schema": list(schema), + } + _validate_info_compatibility(existing_info, new_info, info_overwrite) + if not info_overwrite: + # Info matches, reuse existing layer without rewriting info + backend = TabularBackend( + path=path, + resolution=res, + voxel_offset=vo, + size=sz, + chunk_size=cs, + encoding=encoding, + column_schema=schema, + delete_empty_uploads=delete_empty_uploads, + ) + return VolumetricTabularLayer(backend=backend) + + if mode == "replace" and existing_info is not None: + old_backend = TabularBackend.from_path(path, delete_empty_uploads=delete_empty_uploads) + old_backend.delete() + + backend = TabularBackend( + path=path, + resolution=res, + voxel_offset=vo, + size=sz, + chunk_size=cs, + encoding=encoding, + column_schema=schema, + delete_empty_uploads=delete_empty_uploads, + ) + backend.write_info() + return VolumetricTabularLayer(backend=backend) diff --git 
from __future__ import annotations

from typing import Union

import attrs
import pandas as pd

from zetta_utils.layer.layer_base import Layer
from zetta_utils.layer.tools_base import (
    DataProcessor,
    IndexProcessor,
    JointIndexDataProcessor,
)
from zetta_utils.layer.volumetric.index import VolumetricIndex
from zetta_utils.layer.volumetric.tabular.backend import TabularBackend

# Processors applicable to tabular reads/writes: either a plain DataFrame
# transform or a joint index+data transform.
TabularDataProcT = Union[
    DataProcessor[pd.DataFrame],
    JointIndexDataProcessor[pd.DataFrame, VolumetricIndex],
]


@attrs.frozen
class VolumetricTabularLayer(Layer[VolumetricIndex, pd.DataFrame, pd.DataFrame]):
    """Layer that stores pandas DataFrames in spatially chunked files.

    I/O is delegated to ``backend``; the processor tuples follow the generic
    ``Layer`` conventions (presumably applied by the ``Layer`` base during
    read/write — confirm against ``layer_base``).
    """

    backend: TabularBackend  # handles chunk (de)serialization and storage I/O
    readonly: bool = False  # set True by the builder's "read" mode

    index_procs: tuple[IndexProcessor[VolumetricIndex], ...] = ()  # index transforms
    read_procs: tuple[TabularDataProcT, ...] = ()  # data transforms after read
    write_procs: tuple[TabularDataProcT, ...] = ()  # data transforms before write

    def pformat(self) -> str:  # pragma: no cover
        """Human-readable description; delegates to the backend."""
        return self.backend.pformat()

    def with_changes(self, **kwargs):
        """Return a copy of this layer with the given attributes replaced."""
        return attrs.evolve(self, **kwargs)  # pragma: no cover