Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,6 @@ cython_debug/

# PyPI configuration file
.pypirc

# WFDB test input data (ignored for now to keep review diffs small; re-add after the PR is approved)
tests/data/input/mitdb_wfdb/
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies = [
"mlcroissant >= 1.0.0",
"pandas >= 1.3.0",
"rich >= 13.0.0",
"wfdb >= 4.0.0",
]

# Optional dependencies, installed via `pip install '.[test]'`
Expand Down
2 changes: 2 additions & 0 deletions src/croissant_maker/handlers/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ def register_all_handlers() -> None:
"""
# Import and register all handlers here
from croissant_maker.handlers.csv_handler import CSVHandler
from croissant_maker.handlers.wfdb_handler import WFDBHandler

register_handler(CSVHandler())
register_handler(WFDBHandler())

# Future handlers go here. Example:
# from croissant_maker.handlers.parquet_handler import ParquetHandler
Expand Down
104 changes: 104 additions & 0 deletions src/croissant_maker/handlers/wfdb_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""WFDB file handler for physiological waveform data."""

from pathlib import Path
import wfdb

from croissant_maker.handlers.base_handler import FileTypeHandler
from croissant_maker.handlers.utils import compute_file_hash


class WFDBHandler(FileTypeHandler):
    """
    Handler for WFDB format files (PhysioNet waveform databases).

    WFDB records consist of multiple related files:
    - .hea: Header file (text, contains metadata)
    - .dat: Data file(s) (binary, contains signals)
    - .atr: Annotation file (optional, contains beat/event annotations)

    The 'related_files' pattern in extract_metadata() allows handlers to indicate
    that multiple physical files form a single logical record. This is a general
    capability that other multi-file formats (DICOM series, HDF5 with external
    links, Parquet partitions) can also use. Each related file becomes a separate
    cr:FileObject in the Croissant metadata, but they all describe one RecordSet.
    """

    def can_handle(self, file_path: Path) -> bool:
        """Return True for WFDB header files (``.hea``, case-insensitive)."""
        return file_path.suffix.lower() == ".hea"

    def extract_metadata(self, file_path: Path) -> dict:
        """
        Extract metadata for the WFDB record described by a ``.hea`` header.

        Args:
            file_path: Path to a WFDB ``.hea`` header file.

        Returns:
            A dict describing the header file itself plus a ``related_files``
            list covering the record's data file(s) and, when present, its
            ``.atr`` annotation file. Signal-level metadata (names, units,
            sampling frequency, sample count, duration) is included so the
            caller can build a RecordSet.

        Raises:
            FileNotFoundError: If the header file does not exist.
            ValueError: If no data file for the record can be found on disk.
        """
        if not file_path.exists():
            raise FileNotFoundError(f"WFDB header file not found: {file_path}")

        record_path = file_path.with_suffix("")
        record = wfdb.rdheader(str(record_path))

        # FIX: don't assume the data file shares the header's base name.
        # A WFDB header declares the actual data file name per signal via
        # the record's ``file_name`` list (several signals typically share
        # one .dat file, but the name may differ from the header stem and
        # a record may span multiple files). Honor the declared names,
        # de-duplicated in order, and fall back to ``<record>.dat`` only
        # when the header carries no file list.
        declared_names = getattr(record, "file_name", None) or []
        data_files: list[Path] = []
        seen: set[str] = set()
        for fname in declared_names:
            if fname and fname not in seen:
                seen.add(fname)
                # Data file paths in the header are relative to the header's
                # directory.
                data_files.append(file_path.parent / fname)
        if not data_files:
            data_files = [file_path.with_suffix(".dat")]

        if any(not df.exists() for df in data_files):
            raise ValueError(f"WFDB data file missing for header: {file_path}")

        # Build list of related files that form this logical WFDB record.
        # Each becomes a separate cr:FileObject in Croissant metadata.
        related_files = [
            {
                "path": str(data_file),
                "name": data_file.name,
                "type": "data",
                "encoding": "application/x-wfdb-data",
                "size": data_file.stat().st_size,
                "sha256": compute_file_hash(data_file),
            }
            for data_file in data_files
        ]

        atr_file = file_path.with_suffix(".atr")
        if atr_file.exists():
            related_files.append(
                {
                    "path": str(atr_file),
                    "name": atr_file.name,
                    "type": "annotation",
                    "encoding": "application/x-wfdb-annotation",
                    "size": atr_file.stat().st_size,
                    "sha256": compute_file_hash(atr_file),
                }
            )

        # FIX: guard against sig_name being None (TypeError in the original)
        # and against fs being None (the original's ``fs > 0`` comparison
        # raises TypeError on None).
        signal_names = record.sig_name or []
        signal_types = {sig: "sc:Float" for sig in signal_names}
        duration_seconds = record.sig_len / record.fs if record.fs else 0

        metadata = {
            "file_path": str(file_path),
            "file_name": file_path.name,
            "file_size": file_path.stat().st_size,
            "sha256": compute_file_hash(file_path),
            "encoding_format": "application/x-wfdb-header",
            "record_name": record.record_name,
            "related_files": related_files,
            "signal_names": signal_names,
            "signal_types": signal_types,
            "units": record.units,
            "sampling_frequency": record.fs,
            "num_samples": record.sig_len,
            "num_signals": record.n_sig,
            "duration_seconds": duration_seconds,
            "comments": record.comments if record.comments else [],
        }

        # Optional header attributes: include only when present and non-empty.
        # (Attribute availability varies across wfdb versions/records, hence
        # the hasattr guards.)
        if hasattr(record, "base_datetime") and record.base_datetime:
            metadata["base_datetime"] = record.base_datetime.isoformat()
        if hasattr(record, "base_date") and record.base_date:
            metadata["base_date"] = str(record.base_date)
        if hasattr(record, "base_time") and record.base_time:
            metadata["base_time"] = str(record.base_time)
        if hasattr(record, "adc_gain") and record.adc_gain:
            metadata["adc_gain"] = record.adc_gain
        if hasattr(record, "baseline") and record.baseline:
            metadata["baseline"] = record.baseline
        if hasattr(record, "init_value") and record.init_value:
            metadata["init_value"] = record.init_value
        if hasattr(record, "checksum") and record.checksum:
            metadata["checksum"] = record.checksum
        if hasattr(record, "fmt") and record.fmt:
            metadata["fmt"] = record.fmt

        return metadata
59 changes: 58 additions & 1 deletion src/croissant_maker/metadata_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,15 @@ def generate_metadata(self) -> dict:

file_objects = []
record_sets = []
# Use a counter instead of enumerate(i) to ensure unique FileObject IDs.
# Some formats (e.g., WFDB) create multiple FileObjects per iteration via
# related_files, so enumerate(i) would cause ID conflicts. The counter
# increments for every FileObject created, not just per iteration.
file_counter = 0

for i, file_meta in enumerate(file_metadata):
file_id = f"file_{i}"
file_id = f"file_{file_counter}"
file_counter += 1

# Create FileObject for each file
# What this section does well:
Expand All @@ -228,6 +234,28 @@ def generate_metadata(self) -> dict:
)
file_objects.append(file_obj)

# Handle multi-file records (e.g., WFDB: .hea + .dat + .atr)
# Some formats like WFDB have multiple physical files per logical record
related_file_ids = []
if "related_files" in file_meta:
for related in file_meta["related_files"]:
related_id = f"file_{file_counter}"
file_counter += 1
related_file_ids.append(related_id)

rel_path = Path(related["path"])
relative_path = str(rel_path.relative_to(self.dataset_path))

related_obj = mlc.FileObject(
id=related_id,
name=related["name"],
content_url=relative_path,
encoding_formats=[related["encoding"]],
content_size=str(related["size"]),
sha256=related["sha256"],
)
file_objects.append(related_obj)

# Create RecordSet and Fields for files with structured data that have column types
# What this section does well:
# - Creates a cr:RecordSet for each file with structured data.
Expand Down Expand Up @@ -263,6 +291,35 @@ def generate_metadata(self) -> dict:
)
record_sets.append(record_set)

# Create RecordSet for signal data (e.g., WFDB physiological waveforms)
# Signal data has continuous time-series rather than discrete columns
elif "signal_types" in file_meta:
fields = []
for signal_name, signal_type in file_meta["signal_types"].items():
field = mlc.Field(
id=f"{file_id}_{signal_name}",
name=signal_name,
description=f"Signal '{signal_name}' from {file_meta['record_name']}",
data_types=[signal_type],
source=mlc.Source(
id=f"{file_id}_source_{signal_name}",
file_object=file_id,
),
)
fields.append(field)

duration = file_meta.get("duration_seconds", 0)
num_samples = file_meta.get("num_samples", 0)
sampling_freq = file_meta.get("sampling_frequency", 0)

record_set = mlc.RecordSet(
id=f"recordset_{i}",
name=file_meta["record_name"],
description=f"WFDB record {file_meta['record_name']}: {file_meta.get('num_signals', 0)} signals at {sampling_freq} Hz, {num_samples} samples ({duration:.2f} seconds)",
fields=fields,
)
record_sets.append(record_set)

metadata.distribution = file_objects
metadata.record_sets = record_sets

Expand Down
Loading