From 4fb7c004ba0e73a8141f27da1e5b76b064809090 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 26 Jan 2026 08:05:40 +0100 Subject: [PATCH 01/14] first try to speed up common functions --- cdm_reader_mapper/common/inspect.py | 32 +- .../common/pandas_TextParser_hdlr.py | 108 +++--- cdm_reader_mapper/common/replace.py | 31 +- cdm_reader_mapper/common/select.py | 266 ++++++--------- tests/test_common.py | 322 +++++++++--------- 5 files changed, 357 insertions(+), 402 deletions(-) diff --git a/cdm_reader_mapper/common/inspect.py b/cdm_reader_mapper/common/inspect.py index 267caafa..148b13d3 100755 --- a/cdm_reader_mapper/common/inspect.py +++ b/cdm_reader_mapper/common/inspect.py @@ -12,13 +12,14 @@ import pandas as pd -from . import pandas_TextParser_hdlr +from .pandas_TextParser_hdlr import make_copy +from .pandas_TextParser_hdlr import get_length as get_length_hdlr def _count_by_cat(series) -> dict: """Count unique values in a pandas Series, including NaNs.""" counts = series.value_counts(dropna=False) - counts.index = counts.index.map(lambda x: "nan" if pd.isna(x) else x) + counts.index = counts.index.where(~counts.index.isna(), "nan") return counts.to_dict() @@ -51,29 +52,24 @@ def count_by_cat( if not isinstance(columns, list): columns = [columns] - counts = {} + counts = {col: {} for col in columns} if isinstance(data, pd.DataFrame): for column in columns: counts[column] = _count_by_cat(data[column]) return counts - for column in columns: - data_cp = pandas_TextParser_hdlr.make_copy(data) - count_dicts = [] - - for df in data_cp: - count_dicts.append(_count_by_cat(df[column])) - - data_cp.close() - - merged_counts = {} - for d in count_dicts: - for k, v in d.items(): - merged_counts[k] = merged_counts.get(k, 0) + v + data_cp = make_copy(data) + if data_cp is None: + return counts - counts[column] = merged_counts + for chunk in data_cp: + for column in columns: + chunk_counts = _count_by_cat(chunk[column]) + for k, v in chunk_counts.items(): + 
counts[column][k] = counts[column].get(k, 0) + v + data_cp.close() return counts @@ -98,4 +94,4 @@ def get_length(data: pd.DataFrame | pd.io.parsers.TextFileReader) -> int: """ if not isinstance(data, pd.io.parsers.TextFileReader): return len(data) - return pandas_TextParser_hdlr.get_length(data) + return get_length_hdlr(data) diff --git a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py index c6473e89..8d8c3b46 100755 --- a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py +++ b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) -read_params = [ +_READ_CSV_KWARGS = [ "chunksize", "names", "dtype", @@ -22,7 +22,37 @@ ] -def make_copy(Parser: TextFileReader) -> TextFileReader | None: +def _get_raw_buffer(parser: TextFileReader) -> str | None: + if hasattr(parser, "_raw_buffer"): + return parser._raw_buffer + + f = getattr(parser.handles, "handle", None) + if f is None: + raise ValueError("TextFileReader has no accessible handle for copying.") + + try: + f = parser.handles.handle + raw = f.getvalue() + parser._raw_buffer = raw + return raw + except Exception as e: + raise RuntimeError("Failed to read raw buffer") from e + + +def _new_reader_from_buffer(parser: TextFileReader) -> TextFileReader | None: + raw = _get_raw_buffer(parser) + if raw is None: + return None + + read_dict = read_dict = { + k: parser.orig_options.get(k) + for k in _READ_CSV_KWARGS + if k in parser.orig_options + } + return pd.read_csv(StringIO(raw), **read_dict) + + +def make_copy(parser: TextFileReader) -> TextFileReader | None: """ Create a duplicate of a pandas TextFileReader object. @@ -43,16 +73,12 @@ def make_copy(Parser: TextFileReader) -> TextFileReader | None: only for in-memory file-like objects such as `StringIO`. 
""" try: - f = Parser.handles.handle - new_ref = StringIO(f.getvalue()) - read_dict = {k: Parser.orig_options.get(k) for k in read_params} - return pd.read_csv(new_ref, **read_dict) - except Exception: - logger.error("Failed to copy TextParser", exc_info=True) - return None + return _new_reader_from_buffer(parser) + except Exception as e: + raise RuntimeError("Failed to copy TextParser") from e -def restore(Parser: TextFileReader) -> TextFileReader | None: +def restore(parser: TextFileReader) -> TextFileReader | None: """ Restore a TextFileReader to its initial read position and state. @@ -66,17 +92,10 @@ def restore(Parser: TextFileReader) -> TextFileReader | None: pandas.io.parsers.TextFileReader or None Restored TextFileReader, or None if restoration fails. """ - try: - f = Parser.handles.handle - f.seek(0) - read_dict = {k: Parser.orig_options.get(k) for k in read_params} - return pd.read_csv(f, **read_dict) - except Exception: - logger.error("Failed to restore TextParser", exc_info=True) - return None + return make_copy(parser) -def is_not_empty(Parser: TextFileReader) -> bool | None: +def is_not_empty(parser: TextFileReader) -> bool | None: """ Determine whether a TextFileReader contains at least one row. @@ -92,27 +111,24 @@ def is_not_empty(Parser: TextFileReader) -> bool | None: False if empty. None if an error occurs. """ - try: - parser_copy = make_copy(Parser) - if parser_copy is None: - return None - except Exception: - logger.error( - f"Failed to process input. 
Input type is {type(Parser)}", - exc_info=True, - ) + if hasattr(parser, "_is_not_empty"): + return parser._is_not_empty + + reader = make_copy(parser) + if reader is None: return None try: - chunk = parser_copy.get_chunk() - parser_copy.close() - return len(chunk) > 0 - except Exception: - logger.debug("Error while checking emptiness", exc_info=True) + chunk = next(reader) + result = not chunk.empty + parser._is_not_empty = result + return result + except StopIteration: + parser._is_not_empty = False return False -def get_length(Parser: TextFileReader) -> int | None: +def get_length(parser: TextFileReader) -> int | None: """ Count total rows in a TextFileReader (consuming a copied stream). @@ -126,22 +142,18 @@ def get_length(Parser: TextFileReader) -> int | None: int or None Total number of rows, or None if processing fails. """ - try: - parser_copy = make_copy(Parser) - if parser_copy is None: - return None - except Exception: - logger.error( - f"Failed to process input. Input type is {type(Parser)}", - exc_info=True, - ) + if hasattr(parser, "_row_count"): + return parser._row_count + + reader = make_copy(parser) + if reader is None: return None total = 0 try: - for df in parser_copy: - total += len(df) + for chunk in reader: + total += len(chunk) + parser._row_count = total return total - except Exception: - logger.error("Failed while counting rows", exc_info=True) - return None + except Exception as e: + raise RuntimeError("Failed while counting rows") from e diff --git a/cdm_reader_mapper/common/replace.py b/cdm_reader_mapper/common/replace.py index 15426ec0..15bf4351 100755 --- a/cdm_reader_mapper/common/replace.py +++ b/cdm_reader_mapper/common/replace.py @@ -73,26 +73,23 @@ def replace_columns( # Check inargs if not isinstance(df_l, pd.DataFrame) or not isinstance(df_r, pd.DataFrame): logger.error("Input left and right data must be pandas DataFrames.") - return + return None - if pivot_c: + if pivot_c is not None: pivot_l = pivot_r = pivot_c - if not 
(pivot_l and pivot_r): + if pivot_l is None or pivot_r is None: logger.error( "Pivot columns must be declared using `pivot_c` or both `pivot_l` and `pivot_r`." ) - return - - df_l = df_l.copy().set_index(pivot_l, drop=False) - df_r = df_r.copy().set_index(pivot_r, drop=False) + return None if rep_map is None: if rep_c is None: logger.error( "Replacement columns must be declared using `rep_c` or `rep_map`." ) - return + return None if isinstance(rep_c, str): rep_c = [rep_c] rep_map = {col: col for col in rep_c} @@ -104,12 +101,18 @@ def replace_columns( ) return None - # Build right-side replacement DataFrame with left-column names - df_r_l = df_r[list(rep_map.values())].rename( - columns={v: k for k, v in rep_map.items()} + out = df_l.copy() + right_lookup = ( + df_r[[pivot_r, *rep_map.values()]] + .set_index(pivot_r) + .rename(columns={v: k for k, v in rep_map.items()}) ) - # Apply update - df_l.update(df_r_l) + # Align once using reindex (vectorized, C-level) + aligned = right_lookup.reindex(out[pivot_l].values) + + # Assign columns directly (fastest path) + for col in aligned.columns: + out[col] = aligned[col].values - return df_l.reset_index(drop=True) + return out diff --git a/cdm_reader_mapper/common/select.py b/cdm_reader_mapper/common/select.py index 047d776e..721f215c 100755 --- a/cdm_reader_mapper/common/select.py +++ b/cdm_reader_mapper/common/select.py @@ -9,186 +9,133 @@ from __future__ import annotations from io import StringIO -from typing import Sequence, Iterable, Callable +from typing import Iterable, Callable import pandas as pd -def _select_rows_by_index( +def _split_by_boolean_df( df: pd.DataFrame, - index_list: str | int | Sequence, - **kwargs, -) -> pd.DataFrame: - """Select rows from a DataFrame based on index values.""" - reset_index = kwargs.get("reset_index", False) - inverse = kwargs.get("inverse", False) - - if isinstance(index_list, str): - index_list = [index_list] - if isinstance(index_list, list): - index_list = pd.Index(index_list) 
- index = df.index.isin(index_list) - if inverse is True: - in_df = df[~index] + mask: pd.DataFrame, + boolean: bool, + reset_index: bool = False, + inverse: bool = False, + return_rejected: bool = False, +): + if mask.empty: + selected_mask = pd.Series(boolean, index=df.index) else: - in_df = df[index] + selected_mask = mask.all(axis=1) if boolean else ~mask.any(axis=1) + selected_mask = selected_mask.fillna(boolean) + + selected = df[selected_mask] + rejected = df[~selected_mask] if return_rejected else df.iloc[0:0] - if reset_index is True: - in_df = in_df.reset_index(drop=True) + if reset_index: + selected = selected.reset_index(drop=True) + rejected = rejected.reset_index(drop=True) - in_df.__dict__["_prev_index"] = index_list - return in_df + return selected, rejected -def _split_by_index( +def _split_by_column_df( df: pd.DataFrame, - indexes: str | Sequence, - **kwargs, -) -> tuple[pd.DataFrame, pd.DataFrame]: - """Split a DataFrame into two parts based on index values.""" - return_rejected = kwargs.get("return_rejected", False) + col: str, + values: Iterable, + reset_index: bool = False, + inverse: bool = False, + return_rejected: bool = False, +): + mask = df[col].isin(values) - out1 = _select_rows_by_index( - df, - indexes, - **kwargs, - ) - if return_rejected is True: - index2 = [idx for idx in df.index if idx not in indexes] - out2 = _select_rows_by_index(df, index2, **kwargs) - else: - out2 = pd.DataFrame(columns=out1.columns) - out2.__dict__["_prev_index"] = pd.Index([]) + if inverse: + mask = ~mask - return out1, out2 + selected = df[mask] + rejected = df[~mask] if return_rejected else df.iloc[0:0] + if reset_index: + selected = selected.reset_index(drop=True) + rejected = rejected.reset_index(drop=True) -def _split_by_boolean_mask_( - df: pd.DataFrame, mask: pd.DataFrame, boolean: bool, **kwargs -) -> tuple[pd.DataFrame, pd.DataFrame]: - """Split a DataFrame based on a boolean mask using `_split_by_index`.""" - if mask.empty: - if boolean: - indexes 
= df.index - else: - indexes = df.index.difference(df.index) - return _split_by_index(df, indexes, **kwargs) - - if boolean is True: - global_mask = mask.all(axis=1) - else: - global_mask = ~(mask.any(axis=1)) + return selected, rejected - indexes = global_mask[global_mask.fillna(boolean)].index - return _split_by_index(df, indexes, **kwargs) +def _split_by_index_df( + df: pd.DataFrame, + index, + reset_index: bool = False, + inverse: bool = False, + return_rejected: bool = False, +): + index = pd.Index(index if isinstance(index, Iterable) else [index]) + mask = df.index.isin(index) -def _split_by_boolean_mask( - df: pd.DataFrame, mask: pd.DataFrame, boolean: bool, **kwargs -) -> tuple[pd.DataFrame, pd.DataFrame]: - """Split a DataFrame based on strict boolean masks (all-True or all-False).""" - if mask.empty: - selected = pd.Series(boolean, index=df.index) - else: - selected = mask.all(axis=1) if boolean else ~mask.any(axis=1) - selected = selected.fillna(boolean) + if inverse: + mask = ~mask - indexes = selected[selected].index - return _split_by_index(df, indexes, **kwargs) + selected = df[mask] + rejected = df[~mask] if return_rejected else df.iloc[0:0] + if reset_index: + selected = selected.reset_index(drop=True) + rejected = rejected.reset_index(drop=True) -def _split_by_column_values( - df: pd.DataFrame, col: str, values: Iterable, **kwargs -) -> tuple[pd.DataFrame, pd.DataFrame]: - """Split a DataFrame based on entries in a specific column using `_split_by_index`.""" - in_df = df.loc[df[col].isin(values)] - index = list(in_df.index) - return _split_by_index( - df, - index, - **kwargs, - ) + return selected, rejected -def _split_by_index_values( - df: pd.DataFrame, index, **kwargs -) -> tuple[pd.DataFrame, pd.DataFrame]: - """Split a DataFrame based on index values using `_split_by_index`.""" - return _split_by_index(df, index, **kwargs) - - -def _split_parser( - data, *args, func=None, reset_index=False, inverse=False, return_rejected=False -) -> 
tuple[pd.io.parsers.TextFileReader]: - """Common pandas TextFileReader selection function.""" - read_params = [ - "chunksize", - "names", - "dtype", - "parse_dates", - "date_parser", - "infer_datetime_format", - ] - write_dict = {"header": None, "mode": "a", "index": not reset_index} - read_dict = {x: data[0].orig_options.get(x) for x in read_params} - buffer1 = StringIO() - buffer2 = StringIO() - _prev_index1 = None - _prev_index2 = None - for zipped in zip(*data): - if not isinstance(zipped, tuple): - zipped = tuple(zipped) - out1, out2 = func( - *zipped, +def _split_text_reader( + readers, + func: Callable, + *args, + reset_index=False, + inverse=False, + return_rejected=False, +): + buffer_sel = StringIO() + buffer_rej = StringIO() + + write_opts = {"header": None, "mode": "a", "index": not reset_index} + + for chunks in zip(*readers): + sel, rej = func( + *chunks, *args, reset_index=reset_index, inverse=inverse, return_rejected=return_rejected, ) - if _prev_index1 is None: - _prev_index1 = out1.__dict__["_prev_index"] - else: - _prev_index1 = _prev_index1.union(out1.__dict__["_prev_index"]) - if _prev_index2 is None: - _prev_index2 = out2.__dict__["_prev_index"] - else: - _prev_index2 = _prev_index2.union(out2.__dict__["_prev_index"]) - out1.to_csv(buffer1, **write_dict) - if return_rejected is True: - out2.to_csv(buffer2, **write_dict) - dtypes = {} - for k, v in out1.dtypes.items(): - if v == "object": - v = "str" - dtypes[k] = v - read_dict["dtype"] = dtypes - buffer1.seek(0) - buffer2.seek(0) - TextParser1 = pd.read_csv(buffer1, **read_dict) - TextParser1.__dict__["_prev_index"] = _prev_index1 - TextParser2 = pd.read_csv(buffer2, **read_dict) - TextParser2.__dict__["_prev_index"] = _prev_index2 - return TextParser1, TextParser2 - - -def _split( - data: pd.DataFrame | Iterable[pd.DataFrame], - func: Callable[..., tuple[pd.DataFrame, pd.DataFrame]], + sel.to_csv(buffer_sel, **write_opts) + if return_rejected: + rej.to_csv(buffer_rej, **write_opts) + + 
buffer_sel.seek(0) + buffer_rej.seek(0) + + selected = pd.read_csv(buffer_sel) + rejected = pd.read_csv(buffer_rej) if return_rejected else selected.iloc[0:0] + + return selected, rejected + + +def _split_dispatch( + data, + func: Callable, *args, **kwargs, -) -> tuple[pd.DataFrame, pd.DataFrame]: - """ - Apply a split function to one or more DataFrames. +): + if isinstance(data, pd.DataFrame): + return func(data, *args, **kwargs) + + if isinstance(data, list) and isinstance(data[0], pd.io.parsers.TextFileReader): + return _split_text_reader( + data, + func, + *args, + **kwargs, + ) - - If `data` is a single DataFrame, pass it to `func`. - - If `data` is a list of DataFrames, only the first is used here - (TextFileReader logic is handled separately). - """ - if not isinstance(data, list): - data = [data] - if isinstance(data[0], pd.io.parsers.TextFileReader): - return _split_parser(data, *args, func=func, **kwargs) - return func(*data, *args, **kwargs) + raise TypeError("Unsupported input type for split operation.") def split_by_boolean( @@ -227,10 +174,10 @@ def split_by_boolean( Tuple ``(selected, rejected)`` returned by the underlying ``split_dataframe_by_boolean`` implementation. """ - func = _split_by_boolean_mask - return _split( - [data, mask], - func, + return _split_dispatch( + data, + _split_by_boolean_df, + mask, boolean, reset_index=reset_index, inverse=inverse, @@ -343,12 +290,10 @@ def split_by_column_entries( (pandas.DataFrame, pandas.DataFrame) Selected rows (column value in provided list) and rejected rows. """ - func = _split_by_column_values - col = next(iter(selection.keys())) - values = next(iter(selection.values())) - return _split( + col, values = next(iter(selection.items())) + return _split_dispatch( data, - func, + _split_by_column_df, col, values, reset_index=reset_index, @@ -385,10 +330,9 @@ def split_by_index( (pandas.DataFrame, pandas.DataFrame) Selected rows (index in given list) and rejected rows. 
""" - func = _split_by_index_values - return _split( + return _split_dispatch( data, - func, + _split_by_index_df, index, reset_index=reset_index, inverse=inverse, diff --git a/tests/test_common.py b/tests/test_common.py index 9b016c99..dcce1cdb 100755 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -22,12 +22,12 @@ from cdm_reader_mapper.common.select import ( - _select_rows_by_index, - _split_by_index, - _split_by_boolean_mask, - _split_by_column_values, - _split_by_index_values, - _split, + # _select_rows_by_index, + # _split_by_index, + # _split_by_boolean_mask, + # _split_by_column_values, + # _split_by_index_values, + # _split, split_by_boolean, split_by_boolean_true, split_by_boolean_false, @@ -166,150 +166,150 @@ def tmp_json_file(tmp_path): return file_path, data -@pytest.mark.parametrize( - "index_list,inverse,reset_index,expected", - [ - ([10, 12], False, False, [10, 12]), - ([10, 12], True, False, [11, 13, 14]), - ([10, 12], False, True, [0, 1]), - ], -) -def test_select_rows_by_index(sample_df, index_list, inverse, reset_index, expected): - df = sample_df - selected = _select_rows_by_index( - df, index_list, inverse=inverse, reset_index=reset_index - ) - assert list(selected.index) == expected - - -def test_select_rows_by_index_empty_df(empty_df): - selected = _select_rows_by_index(empty_df, [0]) - assert selected.empty - - -@pytest.mark.parametrize( - "index_list,inverse,return_rejected,expected_selected,expected_rejected", - [ - ([11, 13], False, True, [11, 13], [10, 12, 14]), - ([11, 13], True, True, [10, 12, 14], [11, 13]), - ([11, 13], False, False, [11, 13], []), - ], -) -def test_split_by_index( - sample_df, - index_list, - inverse, - return_rejected, - expected_selected, - expected_rejected, -): - selected, rejected = _split_by_index( - sample_df, index_list, inverse=inverse, return_rejected=return_rejected - ) - assert list(selected.index) == expected_selected - assert list(rejected.index) == expected_rejected - - -def 
test_split_by_index_empty_df(empty_df): - selected, rejected = _split_by_index(empty_df, [0], return_rejected=True) - assert selected.empty - assert rejected.empty - - -@pytest.mark.parametrize( - "column,boolean,expected_selected,expected_rejected", - [ - ("C", True, [10, 12, 14], [11, 13]), - ("C", False, [11, 13], [10, 12, 14]), - ], -) -def test_split_by_boolean_mask( - sample_df, column, boolean, expected_selected, expected_rejected -): - mask = sample_df[[column]] - selected, rejected = _split_by_boolean_mask( - sample_df, mask, boolean=boolean, return_rejected=True - ) - assert list(selected.index) == expected_selected - assert list(rejected.index) == expected_rejected - - -def test_split_by_boolean_mask_empty_mask(sample_df): - mask = pd.DataFrame(columns=sample_df.columns) - selected, rejected = _split_by_boolean_mask( - sample_df, mask, boolean=True, return_rejected=True - ) - assert list(selected.index) == list(sample_df.index) - assert rejected.empty - - -@pytest.mark.parametrize( - "col,values,return_rejected,expected_selected,expected_rejected", - [ - ("B", ["x", "z"], True, [10, 12, 13], [11, 14]), - ("B", ["missing"], True, [], [10, 11, 12, 13, 14]), - ("B", ["x", "z"], False, [10, 12, 13], []), - ], -) -def test_split_by_column_values( - sample_df, col, values, return_rejected, expected_selected, expected_rejected -): - selected, rejected = _split_by_column_values( - sample_df, col, values, return_rejected=return_rejected - ) - assert list(selected.index) == expected_selected - assert list(rejected.index) == expected_rejected - - -@pytest.mark.parametrize( - "index_list,inverse,return_rejected,expected_selected,expected_rejected", - [ - ([11, 13], False, True, [11, 13], [10, 12, 14]), - ([11, 13], False, False, [11, 13], []), - ([11, 13], True, True, [10, 12, 14], [11, 13]), - ], -) -def test_split_by_index_values( - sample_df, - index_list, - inverse, - return_rejected, - expected_selected, - expected_rejected, -): - selected, rejected = 
_split_by_index_values( - sample_df, index_list, inverse=inverse, return_rejected=return_rejected - ) - assert list(selected.index) == expected_selected - assert list(rejected.index) == expected_rejected - - -def test_split_wrapper_index(sample_df): - selected, rejected = _split( - sample_df, _split_by_index_values, [11, 13], return_rejected=True - ) - assert list(selected.index) == [11, 13] - assert list(rejected.index) == [10, 12, 14] - - -def test_split_wrapper_column(sample_df): - selected, rejected = _split( - sample_df, _split_by_column_values, "B", ["y"], return_rejected=True - ) - assert list(selected.index) == [11, 14] - assert list(rejected.index) == [10, 12, 13] - - -def test_split_wrapper_boolean(sample_df, boolean_mask): - selected, rejected = _split( - sample_df, - _split_by_boolean_mask, - boolean_mask[["mask1"]], - True, - return_rejected=True, - ) - assert list(selected.index) == [11, 13] - assert list(rejected.index) == [10, 12, 14] +# @pytest.mark.parametrize( +# "index_list,inverse,reset_index,expected", +# [ +# ([10, 12], False, False, [10, 12]), +# ([10, 12], True, False, [11, 13, 14]), +# ([10, 12], False, True, [0, 1]), +# ], +# ) +# def test_select_rows_by_index(sample_df, index_list, inverse, reset_index, expected): +# df = sample_df +# selected = _select_rows_by_index( +# df, index_list, inverse=inverse, reset_index=reset_index +# ) +# assert list(selected.index) == expected + + +# def test_select_rows_by_index_empty_df(empty_df): +# selected = _select_rows_by_index(empty_df, [0]) +# assert selected.empty + + +# @pytest.mark.parametrize( +# "index_list,inverse,return_rejected,expected_selected,expected_rejected", +# [ +# ([11, 13], False, True, [11, 13], [10, 12, 14]), +# ([11, 13], True, True, [10, 12, 14], [11, 13]), +# ([11, 13], False, False, [11, 13], []), +# ], +# ) +# def test_split_by_index( +# sample_df, +# index_list, +# inverse, +# return_rejected, +# expected_selected, +# expected_rejected, +# ): +# selected, rejected = 
_split_by_index( +# sample_df, index_list, inverse=inverse, return_rejected=return_rejected +# ) +# assert list(selected.index) == expected_selected +# assert list(rejected.index) == expected_rejected + + +# def test_split_by_index_empty_df(empty_df): +# selected, rejected = _split_by_index(empty_df, [0], return_rejected=True) +# assert selected.empty +# assert rejected.empty + + +# @pytest.mark.parametrize( +# "column,boolean,expected_selected,expected_rejected", +# [ +# ("C", True, [10, 12, 14], [11, 13]), +# ("C", False, [11, 13], [10, 12, 14]), +# ], +# ) +# def test_split_by_boolean_mask( +# sample_df, column, boolean, expected_selected, expected_rejected +# ): +# mask = sample_df[[column]] +# selected, rejected = _split_by_boolean_mask( +# sample_df, mask, boolean=boolean, return_rejected=True +# ) +# assert list(selected.index) == expected_selected +# assert list(rejected.index) == expected_rejected + + +# def test_split_by_boolean_mask_empty_mask(sample_df): +# mask = pd.DataFrame(columns=sample_df.columns) +# selected, rejected = _split_by_boolean_mask( +# sample_df, mask, boolean=True, return_rejected=True +# ) +# assert list(selected.index) == list(sample_df.index) +# assert rejected.empty + + +# @pytest.mark.parametrize( +# "col,values,return_rejected,expected_selected,expected_rejected", +# [ +# ("B", ["x", "z"], True, [10, 12, 13], [11, 14]), +# ("B", ["missing"], True, [], [10, 11, 12, 13, 14]), +# ("B", ["x", "z"], False, [10, 12, 13], []), +# ], +# ) +# def test_split_by_column_values( +# sample_df, col, values, return_rejected, expected_selected, expected_rejected +# ): +# selected, rejected = _split_by_column_values( +# sample_df, col, values, return_rejected=return_rejected +# ) +# assert list(selected.index) == expected_selected +# assert list(rejected.index) == expected_rejected + + +# @pytest.mark.parametrize( +# "index_list,inverse,return_rejected,expected_selected,expected_rejected", +# [ +# ([11, 13], False, True, [11, 13], [10, 12, 14]), 
+# ([11, 13], False, False, [11, 13], []), +# ([11, 13], True, True, [10, 12, 14], [11, 13]), +# ], +# ) +# def test_split_by_index_values( +# sample_df, +# index_list, +# inverse, +# return_rejected, +# expected_selected, +# expected_rejected, +# ): +# selected, rejected = _split_by_index_values( +# sample_df, index_list, inverse=inverse, return_rejected=return_rejected +# ) +# assert list(selected.index) == expected_selected +# assert list(rejected.index) == expected_rejected + + +# def test_split_wrapper_index(sample_df): +# selected, rejected = _split( +# sample_df, _split_by_index_values, [11, 13], return_rejected=True +# ) +# assert list(selected.index) == [11, 13] +# assert list(rejected.index) == [10, 12, 14] + + +# def test_split_wrapper_column(sample_df): +# selected, rejected = _split( +# sample_df, _split_by_column_values, "B", ["y"], return_rejected=True +# ) +# assert list(selected.index) == [11, 14] +# assert list(rejected.index) == [10, 12, 13] + + +# def test_split_wrapper_boolean(sample_df, boolean_mask): +# selected, rejected = _split( +# sample_df, +# _split_by_boolean_mask, +# boolean_mask[["mask1"]], +# True, +# return_rejected=True, +# ) +# assert list(selected.index) == [11, 13] +# assert list(rejected.index) == [10, 12, 14] def test_split_by_index_public(sample_df): @@ -438,8 +438,8 @@ def test_make_copy_basic(): def test_make_copy_failure_memory(): parser = make_broken_parser("a,b\n1,2\n") - cp = make_copy(parser) - assert cp is None + with pytest.raises(RuntimeError): + make_copy(parser) def test_restore_basic(): @@ -455,8 +455,8 @@ def test_restore_basic(): def test_restore_failure_memory(): parser = make_broken_parser("a,b\n1,2\n") - restored = restore(parser) - assert restored is None + with pytest.raises(RuntimeError): + restore(parser) def test_is_not_empty_true(): @@ -471,8 +471,8 @@ def test_is_not_empty_false(): def test_is_not_empty_failure_make_copy_memory(): parser = make_broken_parser("a,b\n1,2\n") - result = 
is_not_empty(parser) - assert result is None + with pytest.raises(RuntimeError): + is_not_empty(parser) def test_get_length_basic(): @@ -487,13 +487,14 @@ def test_get_length_empty(): def test_get_length_failure_due_to_bad_line(): parser = make_parser("a,b\n1,2\n1,2,3\n") - assert get_length_hdlr(parser) is None + with pytest.raises(RuntimeError): + get_length_hdlr(parser) def test_get_length_failure_make_copy_memory(): parser = make_broken_parser("a,b\n1,2\n") - result = get_length_hdlr(parser) - assert result is None + with pytest.raises(RuntimeError): + get_length_hdlr(parser) def test_init_logger_returns_logger(): @@ -778,7 +779,7 @@ def test_count_by_cat_broken_parser(): 2,y """ parser = make_broken_parser(text) - with pytest.raises(Exception): + with pytest.raises(RuntimeError): count_by_cat(parser, ["A", "B"]) @@ -945,7 +946,6 @@ def test_load_file_real(tmp_path, within_drs, cache): def test_load_file_invalid_url(): - """Test that load_file raises ValueError for unsafe URLs.""" with pytest.raises(ValueError): load_file(name="file.txt", github_url="ftp://malicious-site.com") From 4cf052d5dbc063d2a3eed4b68a6ab704300c73fd Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 26 Jan 2026 11:10:31 +0100 Subject: [PATCH 02/14] fixing df, TextFileReader not working --- cdm_reader_mapper/common/select.py | 76 ++++++++++++---------------- cdm_reader_mapper/core/databundle.py | 15 +++--- tests/test_common.py | 28 +++++++++- 3 files changed, 66 insertions(+), 53 deletions(-) diff --git a/cdm_reader_mapper/common/select.py b/cdm_reader_mapper/common/select.py index 721f215c..520e0105 100755 --- a/cdm_reader_mapper/common/select.py +++ b/cdm_reader_mapper/common/select.py @@ -14,22 +14,22 @@ import pandas as pd -def _split_by_boolean_df( +def _split_df( df: pd.DataFrame, mask: pd.DataFrame, - boolean: bool, reset_index: bool = False, inverse: bool = False, return_rejected: bool = False, ): - if mask.empty: - selected_mask = pd.Series(boolean, index=df.index) + if 
inverse: + selected = df[~mask] + rejected = df[mask] if return_rejected else df.iloc[0:0] else: - selected_mask = mask.all(axis=1) if boolean else ~mask.any(axis=1) - selected_mask = selected_mask.fillna(boolean) + selected = df[mask] + rejected = df[~mask] if return_rejected else df.iloc[0:0] - selected = df[selected_mask] - rejected = df[~selected_mask] if return_rejected else df.iloc[0:0] + selected.attrs["_prev_index"] = mask.index[mask] + rejected.attrs["_prev_index"] = mask.index[~mask] if reset_index: selected = selected.reset_index(drop=True) @@ -38,54 +38,40 @@ def _split_by_boolean_df( return selected, rejected +def _split_by_boolean_df(df: pd.DataFrame, mask: pd.DataFrame, boolean: bool, **kwargs): + if mask.empty: + mask_sel = pd.Series(boolean, index=df.index) + else: + mask_sel = mask.all(axis=1) if boolean else ~mask.any(axis=1) + mask_sel = mask_sel.fillna(boolean) + + return _split_df(df=df, mask=mask_sel, **kwargs) + + def _split_by_column_df( df: pd.DataFrame, col: str, values: Iterable, - reset_index: bool = False, - inverse: bool = False, - return_rejected: bool = False, + **kwargs, ): - mask = df[col].isin(values) - - if inverse: - mask = ~mask + mask_sel = df[col].isin(values) - selected = df[mask] - rejected = df[~mask] if return_rejected else df.iloc[0:0] - - if reset_index: - selected = selected.reset_index(drop=True) - rejected = rejected.reset_index(drop=True) - - return selected, rejected + return _split_df(df=df, mask=mask_sel, **kwargs) def _split_by_index_df( df: pd.DataFrame, index, - reset_index: bool = False, - inverse: bool = False, - return_rejected: bool = False, + **kwargs, ): index = pd.Index(index if isinstance(index, Iterable) else [index]) - mask = df.index.isin(index) - - if inverse: - mask = ~mask - - selected = df[mask] - rejected = df[~mask] if return_rejected else df.iloc[0:0] - - if reset_index: - selected = selected.reset_index(drop=True) - rejected = rejected.reset_index(drop=True) + mask_sel = 
pd.Series(df.index.isin(index), index=df.index) - return selected, rejected + return _split_df(df=df, mask=mask_sel, **kwargs) def _split_text_reader( - readers, + reader, func: Callable, *args, reset_index=False, @@ -97,9 +83,9 @@ def _split_text_reader( write_opts = {"header": None, "mode": "a", "index": not reset_index} - for chunks in zip(*readers): + for chunk in reader: sel, rej = func( - *chunks, + chunk, *args, reset_index=reset_index, inverse=inverse, @@ -112,8 +98,10 @@ def _split_text_reader( buffer_sel.seek(0) buffer_rej.seek(0) - selected = pd.read_csv(buffer_sel) - rejected = pd.read_csv(buffer_rej) if return_rejected else selected.iloc[0:0] + selected = pd.read_csv(buffer_sel, chunksize=2) + rejected = ( + pd.read_csv(buffer_rej, chunksize=2) if return_rejected else selected.iloc[0:0] + ) return selected, rejected @@ -127,7 +115,7 @@ def _split_dispatch( if isinstance(data, pd.DataFrame): return func(data, *args, **kwargs) - if isinstance(data, list) and isinstance(data[0], pd.io.parsers.TextFileReader): + if isinstance(data, pd.io.parsers.TextFileReader): return _split_text_reader( data, func, diff --git a/cdm_reader_mapper/core/databundle.py b/cdm_reader_mapper/core/databundle.py index e476397b..f45c5758 100755 --- a/cdm_reader_mapper/core/databundle.py +++ b/cdm_reader_mapper/core/databundle.py @@ -236,7 +236,7 @@ def select_where_all_true( _mask = _copy(db_._mask) db_._data = split_by_boolean_true(db_._data, _mask, **kwargs)[0] if do_mask is True: - _prev_index = db_._data.__dict__["_prev_index"] + _prev_index = db_._data.attrs["_prev_index"] db_._mask = split_by_index(db_._mask, _prev_index, **kwargs)[0] return self._return_db(db_, inplace) @@ -284,7 +284,7 @@ def select_where_all_false( _mask = _copy(db_._mask) db_._data = split_by_boolean_false(db_._data, _mask, **kwargs)[0] if do_mask is True: - _prev_index = db_._data.__dict__["_prev_index"] + _prev_index = db_._data.attrs["_prev_index"] db_._mask = split_by_index(db_._mask, _prev_index, 
**kwargs)[0] return self._return_db(db_, inplace) @@ -336,7 +336,7 @@ def select_where_entry_isin( db_ = self._get_db(inplace) db_._data = split_by_column_entries(db_._data, selection, **kwargs)[0] if do_mask is True: - _prev_index = db_._data.__dict__["_prev_index"] + _prev_index = db_._data.attrs["_prev_index"] db_._mask = split_by_index(db_._mask, _prev_index, **kwargs)[0] return self._return_db(db_, inplace) @@ -385,7 +385,7 @@ def select_where_index_isin( db_ = self._get_db(inplace) db_._data = split_by_index(db_._data, index, **kwargs)[0] if do_mask is True: - _prev_index = db_._data.__dict__["_prev_index"] + _prev_index = db_._data.attrs["_prev_index"] db_._mask = split_by_index(db_._mask, _prev_index, **kwargs)[0] return self._return_db(db_, inplace) @@ -428,7 +428,8 @@ def split_by_boolean_true( db1_._data, _mask, return_rejected=True, **kwargs ) if do_mask is True: - _prev_index = db1_._data.__dict__["_prev_index"] + _prev_index = db1_._data.attrs["_prev_index"] + db1_._mask, db2_._mask = split_by_index( db1_._mask, _prev_index, return_rejected=True, **kwargs ) @@ -473,7 +474,7 @@ def split_by_boolean_false( db1_._data, _mask, return_rejected=True, **kwargs ) if do_mask is True: - _prev_index = db1_._data.__dict__["_prev_index"] + _prev_index = db1_._data.attrs["_prev_index"] db1_._mask, db2_._mask = split_by_index( db1_._mask, _prev_index, return_rejected=True, **kwargs ) @@ -522,7 +523,7 @@ def split_by_column_entries( db1_._data, selection, return_rejected=True, **kwargs ) if do_mask is True: - _prev_index = db1_._data.__dict__["_prev_index"] + _prev_index = db1_._data.attrs["_prev_index"] db1_._mask, db2_._mask = split_by_index( db1_._mask, _prev_index, return_rejected=True, **kwargs ) diff --git a/tests/test_common.py b/tests/test_common.py index dcce1cdb..a2d5b991 100755 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -131,11 +131,22 @@ def sample_df(): ) +@pytest.fixture +def sample_reader(): + text = 
",A,B,C\n10,1,x,True\n11,2,y,False\n12,3,z,True\n13,4,x,False\n14,5,y,True" + return make_parser(text, index_col=0) + + @pytest.fixture def empty_df(): return pd.DataFrame(columns=["A", "B", "C"]) +@pytest.fixture +def empty_reader(): + return make_parser("A,B,C") + + @pytest.fixture def boolean_mask(): return pd.DataFrame( @@ -312,8 +323,21 @@ def tmp_json_file(tmp_path): # assert list(rejected.index) == [10, 12, 14] -def test_split_by_index_public(sample_df): - selected, rejected = split_by_index(sample_df, [11, 13], return_rejected=True) +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_index_public(sample_df, sample_reader, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_index(data, [11, 13], return_rejected=True) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + + print(selected) + print(rejected) + assert list(selected.index) == [11, 13] assert list(rejected.index) == [10, 12, 14] From b6ebf2edce87e2708540c9e9a221587a01b350cb Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 26 Jan 2026 12:02:37 +0100 Subject: [PATCH 03/14] fixing TextFileReader --- cdm_reader_mapper/common/select.py | 24 +++-- tests/test_common.py | 145 ++++++++++++++++++++++++----- 2 files changed, 137 insertions(+), 32 deletions(-) diff --git a/cdm_reader_mapper/common/select.py b/cdm_reader_mapper/common/select.py index 520e0105..9c7d0aa9 100755 --- a/cdm_reader_mapper/common/select.py +++ b/cdm_reader_mapper/common/select.py @@ -21,6 +21,7 @@ def _split_df( inverse: bool = False, return_rejected: bool = False, ): + if inverse: selected = df[~mask] rejected = df[mask] if return_rejected else df.iloc[0:0] @@ -44,7 +45,6 @@ def _split_by_boolean_df(df: pd.DataFrame, mask: pd.DataFrame, boolean: bool, ** else: mask_sel = mask.all(axis=1) if boolean else ~mask.any(axis=1) mask_sel = mask_sel.fillna(boolean) - return _split_df(df=df, mask=mask_sel, 
**kwargs) @@ -81,7 +81,17 @@ def _split_text_reader( buffer_sel = StringIO() buffer_rej = StringIO() - write_opts = {"header": None, "mode": "a", "index": not reset_index} + read_params = [ + "chunksize", + "names", + "dtype", + "parse_dates", + "date_parser", + "infer_datetime_format", + ] + + write_dict = {"header": None, "mode": "a", "index": not reset_index} + read_dict = {x: reader.orig_options.get(x) for x in read_params} for chunk in reader: sel, rej = func( @@ -91,17 +101,15 @@ def _split_text_reader( inverse=inverse, return_rejected=return_rejected, ) - sel.to_csv(buffer_sel, **write_opts) + sel.to_csv(buffer_sel, **write_dict) if return_rejected: - rej.to_csv(buffer_rej, **write_opts) + rej.to_csv(buffer_rej, **write_dict) buffer_sel.seek(0) buffer_rej.seek(0) - selected = pd.read_csv(buffer_sel, chunksize=2) - rejected = ( - pd.read_csv(buffer_rej, chunksize=2) if return_rejected else selected.iloc[0:0] - ) + selected = pd.read_csv(buffer_sel, **read_dict) + rejected = pd.read_csv(buffer_rej, **read_dict) return selected, rejected diff --git a/tests/test_common.py b/tests/test_common.py index a2d5b991..eb2240d3 100755 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -134,7 +134,9 @@ def sample_df(): @pytest.fixture def sample_reader(): text = ",A,B,C\n10,1,x,True\n11,2,y,False\n12,3,z,True\n13,4,x,False\n14,5,y,True" - return make_parser(text, index_col=0) + reader = make_parser(text, index_col=0) + reader.orig_options["names"] = ["A", "B", "C"] + return reader @pytest.fixture @@ -144,7 +146,9 @@ def empty_df(): @pytest.fixture def empty_reader(): - return make_parser("A,B,C") + reader = make_parser("A,B,C") + reader.orig_options["names"] = ["A", "B", "C"] + return reader @pytest.fixture @@ -324,7 +328,7 @@ def tmp_json_file(tmp_path): @pytest.mark.parametrize("TextFileReader", [False, True]) -def test_split_by_index_public(sample_df, sample_reader, TextFileReader): +def test_split_by_index_basic(sample_df, sample_reader, TextFileReader): if 
TextFileReader: data = sample_reader else: @@ -335,70 +339,163 @@ def test_split_by_index_public(sample_df, sample_reader, TextFileReader): selected = selected.read() rejected = rejected.read() - print(selected) - print(rejected) - assert list(selected.index) == [11, 13] assert list(rejected.index) == [10, 12, 14] -def test_split_by_column_entries_public(sample_df): +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_column_entries_basic(sample_df, sample_reader, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_column_entries( - sample_df, {"B": ["y"]}, return_rejected=True + data, {"B": ["y"]}, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [11, 14] assert list(rejected.index) == [10, 12, 13] -def test_split_by_boolean_public(sample_df, boolean_mask): +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_basic_false( + sample_df, sample_reader, boolean_mask, TextFileReader +): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_boolean( - sample_df, boolean_mask, boolean=False, return_rejected=True + data, boolean_mask, boolean=False, return_rejected=True ) - assert list(selected.index) == [] + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + + assert selected.empty assert list(rejected.index) == [10, 11, 12, 13, 14] + +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_basic_true( + sample_df, sample_reader, boolean_mask, TextFileReader +): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_boolean( - sample_df, boolean_mask, boolean=True, return_rejected=True + data, boolean_mask, boolean=True, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = 
rejected.read() + assert selected.empty assert list(rejected.index) == [10, 11, 12, 13, 14] -def test_split_by_boolean_true_public(sample_df, boolean_mask_true): +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_true_basic( + sample_df, sample_reader, boolean_mask_true, TextFileReader +): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_boolean_true( - sample_df, boolean_mask_true, return_rejected=True + data, boolean_mask_true, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [10] assert list(rejected.index) == [11, 12, 13, 14] -def test_split_by_boolean_false_public(sample_df, boolean_mask): +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_false_basic( + sample_df, sample_reader, boolean_mask, TextFileReader +): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_boolean_false( - sample_df, boolean_mask, return_rejected=True + data, boolean_mask, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [] assert list(rejected.index) == [10, 11, 12, 13, 14] -def test_split_by_index_empty(empty_df): - selected, rejected = split_by_index(empty_df, [0, 1], return_rejected=True) +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_index_empty(empty_df, empty_reader, TextFileReader): + if TextFileReader: + data = empty_reader + else: + data = empty_df + + selected, rejected = split_by_index(data, [0, 1], return_rejected=True) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert selected.empty assert rejected.empty -def test_split_by_column_empty(empty_df): - selected, rejected = split_by_column_entries( - empty_df, {"A": [1]}, return_rejected=True - ) 
+@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_column_empty(empty_df, empty_reader, TextFileReader): + if TextFileReader: + data = empty_reader + else: + data = empty_df + + selected, rejected = split_by_column_entries(data, {"A": [1]}, return_rejected=True) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert selected.empty assert rejected.empty -def test_split_by_boolean_empty(empty_df): +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_empty(empty_df, empty_reader, TextFileReader): + if TextFileReader: + data = empty_reader + else: + data = empty_df + mask = empty_df.astype(bool) selected, rejected = split_by_boolean( - empty_df, mask, boolean=True, return_rejected=True + data, mask, boolean=True, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert selected.empty assert rejected.empty From c6b88adce35572a9725405b7aae24086470d0335 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 26 Jan 2026 12:57:05 +0100 Subject: [PATCH 04/14] more testfilereader tests --- tests/test_databundle.py | 144 +++++++++++++++++++++++++++++---------- 1 file changed, 107 insertions(+), 37 deletions(-) diff --git a/tests/test_databundle.py b/tests/test_databundle.py index 34d538fb..37b52a06 100755 --- a/tests/test_databundle.py +++ b/tests/test_databundle.py @@ -3,11 +3,19 @@ import pandas as pd import pytest +from io import StringIO + from cdm_reader_mapper import DataBundle +def make_parser(text, **kwargs): + """Helper: create a TextFileReader similar to user code.""" + buffer = StringIO(text) + return pd.read_csv(buffer, chunksize=2, **kwargs) + + @pytest.fixture -def sample_df(): +def sample_db_df(): data = pd.DataFrame( { "A": [19, 26, 27, 41, 91], @@ -23,6 +31,17 @@ def sample_df(): return DataBundle(data=data, mask=mask) +@pytest.fixture +def sample_db_reader(): + text = "A,B,\n19,0\n26,1\n27,2\n41,3\n91,4" + 
reader_data = make_parser(text) + + text = "A,B,\nTrue,True\nTrue,True\nTrue,True\nFalse,False\nTrue,False" + reader_mask = make_parser(text) + + return DataBundle(data=reader_data, mask=reader_mask) + + @pytest.fixture def sample_data(): return pd.DataFrame({"C": [20, 21, 22, 23, 24]}) @@ -33,31 +52,53 @@ def sample_mask(): return pd.DataFrame({"C": [True, False, True, False, False]}) -def test_len(sample_df): - assert len(sample_df) == 5 +def test_len_df(sample_db_df): + assert len(sample_db_df) == 5 + +def test_len_reader(sample_db_reader): + assert len(sample_db_reader) == 5 -def test_print(sample_df, capsys): - print(sample_df) + +def test_print_df(sample_db_df, capsys): + print(sample_db_df) captured = capsys.readouterr() assert captured.out.strip() != "" - for col in sample_df.data.columns: + for col in sample_db_df.columns: assert col in captured.out -def test_copy(sample_df): - db_cp = sample_df.copy() +def test_print_reader(sample_db_reader, capsys): + print(sample_db_reader) + + captured = capsys.readouterr() + + assert captured.out.strip() != "" + + +def test_copy_df(sample_db_df): + db_cp = sample_db_df.copy() + + pd.testing.assert_frame_equal(sample_db_df.data, db_cp.data) + pd.testing.assert_frame_equal(sample_db_df.mask, db_cp.mask) + - pd.testing.assert_frame_equal(sample_df.data, db_cp.data) - pd.testing.assert_frame_equal(sample_df.mask, db_cp.mask) +def test_copy_reader(sample_db_reader): + db_cp = sample_db_reader.copy() + pd.testing.assert_frame_equal(sample_db_reader.data.read(), db_cp.data.read()) + pd.testing.assert_frame_equal(sample_db_reader.mask.read(), db_cp.mask.read()) -def test_add(sample_data, sample_mask): + +def test_add_df(sample_db_df): db = DataBundle() + sample_data = sample_db_df.data + sample_mask = sample_db_df.mask + db_add = db.add({"data": sample_data}) pd.testing.assert_frame_equal(db_add.data, sample_data) @@ -73,26 +114,38 @@ def test_add(sample_data, sample_mask): pd.testing.assert_frame_equal(db_add.mask, 
sample_mask) -def test_stack_v(sample_df, sample_data, sample_mask): +def test_add_reader(sample_db_reader): + raise NotImplementedError + + +def test_stack_v_df(sample_db_df, sample_data, sample_mask): db = DataBundle(data=sample_data, mask=sample_mask) - expected_data = pd.concat([sample_df.data, db.data], ignore_index=True) - expected_mask = pd.concat([sample_df.mask, db.mask], ignore_index=True) + expected_data = pd.concat([sample_db_df.data, db.data], ignore_index=True) + expected_mask = pd.concat([sample_db_df.mask, db.mask], ignore_index=True) + + sample_db_df.stack_v(db) - sample_df.stack_v(db) + pd.testing.assert_frame_equal(sample_db_df.data, expected_data) + pd.testing.assert_frame_equal(sample_db_df.mask, expected_mask) - pd.testing.assert_frame_equal(sample_df.data, expected_data) - pd.testing.assert_frame_equal(sample_df.mask, expected_mask) +def test_stack_v_reader(): + raise NotImplementedError -def test_stack_h(sample_df, sample_data, sample_mask): + +def test_stack_h_df(sample_db_df, sample_data, sample_mask): db = DataBundle(data=sample_data, mask=sample_mask) - expected_data = pd.concat([sample_df.data, db.data], axis=1) - expected_mask = pd.concat([sample_df.mask, db.mask], axis=1) + expected_data = pd.concat([sample_db_df.data, db.data], axis=1) + expected_mask = pd.concat([sample_db_df.mask, db.mask], axis=1) + + sample_db_df.stack_h(db) - sample_df.stack_h(db) + pd.testing.assert_frame_equal(sample_db_df.data, expected_data) + pd.testing.assert_frame_equal(sample_db_df.mask, expected_mask) - pd.testing.assert_frame_equal(sample_df.data, expected_data) - pd.testing.assert_frame_equal(sample_df.mask, expected_mask) + +def test_stack_h_reader(): + raise NotImplementedError @pytest.mark.parametrize( @@ -106,8 +159,8 @@ def test_stack_h(sample_df, sample_data, sample_mask): ) @pytest.mark.parametrize("reset_index", [False, True]) @pytest.mark.parametrize("inverse", [False, True]) -def test_select_operators( - sample_df, +def 
test_select_operators_df( + sample_db_df, func, args, idx_exp, @@ -115,10 +168,12 @@ def test_select_operators( reset_index, inverse, ): - result = getattr(sample_df, func)(*args, reset_index=reset_index, inverse=inverse) + result = getattr(sample_db_df, func)( + *args, reset_index=reset_index, inverse=inverse + ) - expected = sample_df.data - expected_mask = sample_df.mask + expected = sample_db_df.data + expected_mask = sample_db_df.mask selected = result.data selected_mask = result.mask @@ -138,6 +193,10 @@ def test_select_operators( pd.testing.assert_frame_equal(expected_mask, selected_mask) +def test_select_operators_reader(): + raise NotImplementedError + + @pytest.mark.parametrize( "func, args, idx_exp, idx_rej", [ @@ -149,8 +208,8 @@ def test_select_operators( ) @pytest.mark.parametrize("reset_index", [False, True]) @pytest.mark.parametrize("inverse", [False, True]) -def test_split_operators( - sample_df, +def test_split_operators_df( + sample_db_df, func, args, idx_exp, @@ -158,10 +217,12 @@ def test_split_operators( reset_index, inverse, ): - result = getattr(sample_df, func)(*args, reset_index=reset_index, inverse=inverse) + result = getattr(sample_db_df, func)( + *args, reset_index=reset_index, inverse=inverse + ) - expected = sample_df.data - expected_mask = sample_df.mask + expected = sample_db_df.data + expected_mask = sample_db_df.mask selected = result[0].data selected_mask = result[0].mask rejected = result[1].data @@ -191,19 +252,28 @@ def test_split_operators( pd.testing.assert_frame_equal(expected_mask2, rejected_mask) -def test_unique(sample_df): - result = sample_df.unique(columns=("A")) +def test_split_operators_reader(): + raise NotImplementedError + + +def test_unique_df(sample_db_df): + result = sample_db_df.unique(columns=("A")) + assert result == {"A": {19: 1, 26: 1, 27: 1, 41: 1, 91: 1}} + + +def test_unique_reader(sample_db_reader): + result = sample_db_reader.unique(columns=("A")) assert result == {"A": {19: 1, 26: 1, 27: 1, 41: 1, 
91: 1}} -def test_replace_columns(sample_df): +def test_replace_columns(sample_db_df): df_corr = pd.DataFrame( { "A_new": [101, 201, 301, 401, 501], "B": range(5), } ) - result = sample_df.replace_columns( + result = sample_db_df.replace_columns( df_corr, subset=["B", "A"], rep_map={"A": "A_new"}, From e253fbbc3bfffccbce6930517229a74c59d918ec Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 26 Jan 2026 14:14:37 +0100 Subject: [PATCH 05/14] first try: speed-up cdm_mapper --- cdm_reader_mapper/cdm_mapper/mapper.py | 133 +++++++++++++------------ tests/test_cdm_mapper.py | 13 +-- 2 files changed, 72 insertions(+), 74 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/mapper.py b/cdm_reader_mapper/cdm_mapper/mapper.py index 1cd61314..014cd7c2 100755 --- a/cdm_reader_mapper/cdm_mapper/mapper.py +++ b/cdm_reader_mapper/cdm_mapper/mapper.py @@ -80,22 +80,20 @@ def _is_empty(value): def _drop_duplicated_rows(df) -> pd.DataFrame: """Drop duplicates from list.""" + list_cols = [ + col for col in df.columns if df[col].apply(lambda x: isinstance(x, list)).any() + ] + + for col in list_cols: + df[col] = df[col].apply(lambda x: tuple(x) if isinstance(x, list) else x) - def list_to_tuple(v): - if isinstance(v, list): - v = tuple(v) - return v + df.drop_duplicates(ignore_index=True, inplace=True) - def tuple_to_list(v): - if isinstance(v, tuple): - v = list(v) - return v + for col in list_cols: + if df[col].apply(lambda x: isinstance(x, tuple)).any(): + df[col] = df[col].apply(lambda x: list(x) if isinstance(x, tuple) else x) - dtypes = df.dtypes - df = df.map(list_to_tuple) - df = df.drop_duplicates(ignore_index=True) - df = df.map(tuple_to_list) - return df.astype(dtypes) + return df def _get_nested_value(ndict, keys) -> Any | None: @@ -209,7 +207,7 @@ def _fill_value(series, fill_value) -> pd.Series: return series.fillna(value=fill_value).infer_objects(copy=False) -def _extract_input_data(idata, elements, cols, default, logger): +def _extract_input_data(idata, 
elements, default, logger): """Extract the relevant input data based on `elements`.""" def _return_default(): @@ -220,18 +218,14 @@ def _return_default(): logger.debug(f"\telements: {' '.join(map(str, elements))}") - missing_elements = [e for e in elements if e not in cols] - if missing_elements: - logger.warning( - "Missing elements from input data: {}".format( - ",".join(map(str, missing_elements)) - ) - ) - return _return_default() + cols = idata.columns + + for e in elements: + if e not in cols: + logger.warning(f"Missing element from input data: {e}") + return _return_default() - data = idata[elements] - if len(elements) == 1: - data = data.iloc[:, 0] + data = idata[elements[0]] if len(elements) == 1 else idata[elements] if _is_empty(data): return _return_default() @@ -245,7 +239,6 @@ def _column_mapping( imodel_functions, atts, codes_subset, - cols, column, logger, ): @@ -264,40 +257,46 @@ def _column_mapping( data, used_default = _extract_input_data( idata, elements, - cols, default, logger, ) - if transform and not used_default: - data = _transform( - data, - imodel_functions, - transform, - kwargs, - logger=logger, - ) + if not used_default: + if transform: + data = _transform( + data, + imodel_functions, + transform, + kwargs, + logger=logger, + ) + elif code_table: + data = _code_table( + data, + imodel_functions.imodel, + code_table, + logger=logger, + ) - elif code_table and not used_default: - data = _code_table( - data, - imodel_functions.imodel, - code_table, - logger=logger, - ) + if not isinstance(data, pd.Series): + data = pd.Series(data, index=idata.index, copy=False) + + data.name = column + + if fill_value is not None: + data = _fill_value(data, fill_value) - data = pd.Series(data, index=idata.index, name=column) - data = _fill_value(data, fill_value) - atts["decimal_places"] = _decimal_places(decimal_places) + if atts: + atts["decimal_places"] = _decimal_places(decimal_places) + data = _convert_dtype(data, atts) - return data, atts + return 
data def _table_mapping( idata, mapping, atts, - cols, null_label, imodel_functions, codes_subset, @@ -306,31 +305,32 @@ def _table_mapping( drop_duplicates, logger, ) -> pd.DataFrame: - columns = ( - [x for x in atts.keys() if x in idata.columns] - if not cdm_complete - else list(atts.keys()) - ) - - table_df = pd.DataFrame(index=idata.index, columns=columns) + columns = list(atts) if cdm_complete else [c for c in atts if c in idata.columns] + out = {} for column in columns: if column not in mapping.keys(): + out[column] = pd.Series( + [null_label] * len(idata), index=idata.index, name=column + ) continue logger.debug(f"\tElement: {column}") - table_df[column], atts[column] = _column_mapping( + out[column] = _column_mapping( idata, mapping[column], imodel_functions, atts[column], codes_subset, - cols, column, logger, ) - table_df[column] = _convert_dtype(table_df[column], atts.get(column)) + + if not out: + return pd.DataFrame(index=idata.index) + + table_df = pd.DataFrame(out, index=idata.index) if drop_missing_obs is True and "observation_value" in table_df: table_df = table_df.dropna(subset=["observation_value"]) @@ -345,16 +345,21 @@ def _prepare_cdm_tables(cdm_subset): """Prepare table buffers and attributes for CDM tables.""" if isinstance(cdm_subset, str): cdm_subset = [cdm_subset] + cdm_atts = get_cdm_atts(cdm_subset) if not cdm_atts: return {} - return { - table: { + + tables = {} + for table, atts in cdm_atts.items(): + for col, meta in atts.items(): + meta["decimal_places"] = _decimal_places(meta.get("decimal_places")) + tables[table] = { "buffer": StringIO(), - "atts": deepcopy(cdm_atts.get(table)), + "atts": atts, } - for table in cdm_subset - } + + return tables def _process_chunk( @@ -362,7 +367,6 @@ def _process_chunk( imodel_maps, imodel_functions, cdm_tables, - cols, null_label, codes_subset, cdm_complete, @@ -378,7 +382,6 @@ def _process_chunk( idata=idata, mapping=mapping, atts=deepcopy(cdm_tables[table]["atts"]), - cols=cols, 
null_label=null_label, imodel_functions=imodel_functions, codes_subset=codes_subset, @@ -448,13 +451,11 @@ def _map_and_convert( cdm_tables = _prepare_cdm_tables(imodel_maps.keys()) for idata in data_iter: - cols = list(idata.columns) _process_chunk( idata=idata, imodel_maps=imodel_maps, imodel_functions=imodel_functions, cdm_tables=cdm_tables, - cols=cols, null_label=null_label, codes_subset=codes_subset, cdm_complete=cdm_complete, diff --git a/tests/test_cdm_mapper.py b/tests/test_cdm_mapper.py index e1f72a7f..4969fff3 100755 --- a/tests/test_cdm_mapper.py +++ b/tests/test_cdm_mapper.py @@ -354,7 +354,6 @@ def test_extract_input_data( result = _extract_input_data( data_header, elements, - data_header.columns, default, logger, ) @@ -373,13 +372,13 @@ def test_extract_input_data( @pytest.mark.parametrize( "column, expected", [ - ("duplicate_status", [4, 4, 4, 4]), - ("platform_type", [2, 33, 32, 45]), + ("duplicate_status", ["4", "4", "4", "4"]), + ("platform_type", ["2", "33", "32", "45"]), ( "report_id", ["ICOADS-30-5012", "ICOADS-30-8960", "ICOADS-30-0037", "ICOADS-30-1000"], ), - ("location_quality", [2.0, "0", "0", "0"]), + ("location_quality", ["2", "0", "0", "0"]), ("latitude", [None, None, None, None]), ], ) @@ -387,13 +386,12 @@ def test_column_mapping(imodel_maps, imodel_functions, data_header, column, expe logger = logging_hdlr.init_logger(__name__, level="INFO") mapping_column = imodel_maps["header"][column] column_atts = get_cdm_atts("header")["header"][column] - result, _ = _column_mapping( + result = _column_mapping( data_header, mapping_column, imodel_functions, column_atts, None, - data_header.columns, column, logger, ) @@ -409,7 +407,6 @@ def test_table_mapping( data_header, imodel_maps["header"], table_atts, - data_header.columns, "null", imodel_functions, None, @@ -577,7 +574,7 @@ def test_map_model_pub47(): "gdac", ], ) -def test_map_model_test_data(data_model): +def test_map_model_test_data_basic(data_model): _map_model_test_data(data_model) 
From 2bee70bf9d692d19c3d9acfc5653aae015ef17a6 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Mon, 26 Jan 2026 15:26:06 +0100 Subject: [PATCH 06/14] write and read only TextFileReader data on disk --- cdm_reader_mapper/cdm_mapper/mapper.py | 51 +++++++++++++++++--------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/cdm_reader_mapper/cdm_mapper/mapper.py b/cdm_reader_mapper/cdm_mapper/mapper.py index 014cd7c2..f084adec 100755 --- a/cdm_reader_mapper/cdm_mapper/mapper.py +++ b/cdm_reader_mapper/cdm_mapper/mapper.py @@ -20,6 +20,8 @@ import numpy as np import pandas as pd +from pandas.io.parsers import TextFileReader + from cdm_reader_mapper.common import logging_hdlr, pandas_TextParser_hdlr from . import properties @@ -41,7 +43,7 @@ def _log_and_return_empty(msg): return _log_and_return_empty("Input data is empty") return [data] - elif isinstance(data, pd.io.parsers.TextFileReader): + elif isinstance(data, TextFileReader): logger.debug("Input is a pd.TextFileReader") if not pandas_TextParser_hdlr.is_not_empty(data): return _log_and_return_empty("Input data is empty") @@ -373,6 +375,7 @@ def _process_chunk( drop_missing_obs, drop_duplicates, logger, + is_reader, ): """Process one chunk of input data.""" for table, mapping in imodel_maps.items(): @@ -392,13 +395,17 @@ def _process_chunk( ) table_df.columns = pd.MultiIndex.from_product([[table], table_df.columns]) - table_df.to_csv( - cdm_tables[table]["buffer"], - header=False, - index=False, - mode="a", - ) - cdm_tables[table]["columns"] = table_df.columns + + if is_reader: + table_df.to_csv( + cdm_tables[table]["buffer"], + header=False, + index=False, + mode="a", + ) + cdm_tables[table]["columns"] = table_df.columns + else: + cdm_tables[table]["df"] = table_df.astype(object) def _finalize_output(cdm_tables, logger): @@ -408,19 +415,24 @@ def _finalize_output(cdm_tables, logger): for table, meta in cdm_tables.items(): logger.debug(f"\tParse datetime by reader; Table: {table}") - 
meta["buffer"].seek(0) - df = pd.read_csv( - meta["buffer"], - names=meta["columns"], - na_values=[], - dtype="object", - keep_default_na=False, - ) - - meta["buffer"].close() + if "df" not in meta: + meta["buffer"].seek(0) + df = pd.read_csv( + meta["buffer"], + names=meta["columns"], + na_values=[], + dtype="object", + keep_default_na=False, + ) + meta["buffer"].close() + else: + df = meta.get("df", pd.DataFrame()) final_tables.append(df) + if not final_tables: + return pd.DataFrame() + return pd.concat(final_tables, axis=1, join="outer").reset_index(drop=True) @@ -450,6 +462,8 @@ def _map_and_convert( cdm_tables = _prepare_cdm_tables(imodel_maps.keys()) + is_reader = isinstance(data_iter, TextFileReader) + for idata in data_iter: _process_chunk( idata=idata, @@ -462,6 +476,7 @@ def _map_and_convert( drop_missing_obs=drop_missing_obs, drop_duplicates=drop_duplicates, logger=logger, + is_reader=is_reader, ) return _finalize_output(cdm_tables, logger) From b35940ba4daab6a79975ad761ac91246d77b8f94 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 28 Jan 2026 10:20:33 +0100 Subject: [PATCH 07/14] make stack only working with DataFrames --- cdm_reader_mapper/core/_utilities.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cdm_reader_mapper/core/_utilities.py b/cdm_reader_mapper/core/_utilities.py index 6c880c8f..29d603df 100755 --- a/cdm_reader_mapper/core/_utilities.py +++ b/cdm_reader_mapper/core/_utilities.py @@ -277,6 +277,10 @@ def _stack(self, other, datasets, inplace, **kwargs): for data in datasets: _data = f"_{data}" _df = getattr(db_, _data) if hasattr(db_, _data) else pd.DataFrame() + + if isinstance(_df, pd.io.parsers.TextFileReader): + raise ValueError("Data must be a DataFrame not a TextFileReader.") + to_concat = [ getattr(concat, _data) for concat in other if hasattr(concat, _data) ] From 866da1b4a0076a25e953827d5fd8b1f666d2ce1d Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 28 Jan 2026 10:20:49 +0100 Subject: [PATCH 
08/14] add: make a copy first --- cdm_reader_mapper/core/databundle.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cdm_reader_mapper/core/databundle.py b/cdm_reader_mapper/core/databundle.py index f45c5758..78c2a4d2 100755 --- a/cdm_reader_mapper/core/databundle.py +++ b/cdm_reader_mapper/core/databundle.py @@ -117,7 +117,8 @@ def add(self, addition, inplace=False) -> DataBundle | None: """ db_ = self._get_db(inplace) for name, data in addition.items(): - setattr(db_, f"_{name}", data) + data_cp = _copy(data) + setattr(db_, f"_{name}", data_cp) return self._return_db(db_, inplace) def stack_v( @@ -139,7 +140,8 @@ def stack_v( Note ---- - The DataFrames in the :py:class:`~DataBundle` have to have the same data columns! + * This is only working with DataFrames, not with TextFileReaders! + * The DataFrames in the :py:class:`~DataBundle` have to have the same data columns! Returns ------- @@ -175,7 +177,8 @@ def stack_h( Note ---- - The DataFrames in the :py:class:`~DataBundle` may have different data columns! + * This is only working with DataFrames, not with TextFileReaders! + * The DataFrames in the :py:class:`~DataBundle` may have different data columns! 
Examples -------- From f1b4830751d7e46f8b90aaeb6bc91420e994a440 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 28 Jan 2026 10:21:09 +0100 Subject: [PATCH 09/14] make selection running with TextFileReaders --- cdm_reader_mapper/common/select.py | 43 +++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/cdm_reader_mapper/common/select.py b/cdm_reader_mapper/common/select.py index 9c7d0aa9..f682fc14 100755 --- a/cdm_reader_mapper/common/select.py +++ b/cdm_reader_mapper/common/select.py @@ -93,14 +93,47 @@ def _split_text_reader( write_dict = {"header": None, "mode": "a", "index": not reset_index} read_dict = {x: reader.orig_options.get(x) for x in read_params} - for chunk in reader: + new_args = [] + new_readers = [] + + prev_index_sel = None + prev_index_rej = None + + for d in args: + if isinstance(d, pd.io.parsers.TextFileReader): + new_readers.append(d) + else: + new_args.append(d) + + readers = [reader] + new_readers + + for zipped in zip(*readers): + + if not isinstance(zipped, tuple): + zipped = tuple(zipped) + sel, rej = func( - chunk, - *args, + *zipped, + *new_args, reset_index=reset_index, inverse=inverse, return_rejected=return_rejected, ) + + sel_prev_index = sel.attrs["_prev_index"] + + if prev_index_sel is None: + prev_index_sel = sel_prev_index + else: + prev_index_sel = prev_index_sel.union(sel_prev_index) + + rej_prev_index = rej.attrs["_prev_index"] + + if prev_index_rej is None: + prev_index_rej = rej_prev_index + else: + prev_index_rej = prev_index_rej.union(rej_prev_index) + sel.to_csv(buffer_sel, **write_dict) if return_rejected: rej.to_csv(buffer_rej, **write_dict) @@ -111,6 +144,9 @@ def _split_text_reader( selected = pd.read_csv(buffer_sel, **read_dict) rejected = pd.read_csv(buffer_rej, **read_dict) + selected.attrs = {"_prev_index": prev_index_sel} + rejected.attrs = {"_prev_index": prev_index_rej} + return selected, rejected @@ -120,6 +156,7 @@ def _split_dispatch( *args, **kwargs, ): + 
if isinstance(data, pd.DataFrame): return func(data, *args, **kwargs) From d4b76916c106725a9383c9527093bc2be0ad701a Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 28 Jan 2026 10:21:20 +0100 Subject: [PATCH 10/14] update tests --- tests/test_common.py | 298 +++++++++++++++++++-------------------- tests/test_databundle.py | 229 ++++++++++++++++++++++++------ 2 files changed, 325 insertions(+), 202 deletions(-) diff --git a/tests/test_common.py b/tests/test_common.py index eb2240d3..3951447c 100755 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -22,12 +22,11 @@ from cdm_reader_mapper.common.select import ( - # _select_rows_by_index, - # _split_by_index, - # _split_by_boolean_mask, - # _split_by_column_values, - # _split_by_index_values, - # _split, + _split_df, + _split_by_index_df, + _split_by_boolean_df, + _split_by_column_df, + _split_dispatch, split_by_boolean, split_by_boolean_true, split_by_boolean_false, @@ -133,9 +132,8 @@ def sample_df(): @pytest.fixture def sample_reader(): - text = ",A,B,C\n10,1,x,True\n11,2,y,False\n12,3,z,True\n13,4,x,False\n14,5,y,True" - reader = make_parser(text, index_col=0) - reader.orig_options["names"] = ["A", "B", "C"] + text = "10,1,x,True\n11,2,y,False\n12,3,z,True\n13,4,x,False\n14,5,y,True" + reader = make_parser(text, names=["A", "B", "C"]) return reader @@ -146,9 +144,7 @@ def empty_df(): @pytest.fixture def empty_reader(): - reader = make_parser("A,B,C") - reader.orig_options["names"] = ["A", "B", "C"] - return reader + return make_parser("", names=["A", "B", "C"]) @pytest.fixture @@ -181,150 +177,140 @@ def tmp_json_file(tmp_path): return file_path, data -# @pytest.mark.parametrize( -# "index_list,inverse,reset_index,expected", -# [ -# ([10, 12], False, False, [10, 12]), -# ([10, 12], True, False, [11, 13, 14]), -# ([10, 12], False, True, [0, 1]), -# ], -# ) -# def test_select_rows_by_index(sample_df, index_list, inverse, reset_index, expected): -# df = sample_df -# selected = _select_rows_by_index( 
-# df, index_list, inverse=inverse, reset_index=reset_index -# ) -# assert list(selected.index) == expected - - -# def test_select_rows_by_index_empty_df(empty_df): -# selected = _select_rows_by_index(empty_df, [0]) -# assert selected.empty - - -# @pytest.mark.parametrize( -# "index_list,inverse,return_rejected,expected_selected,expected_rejected", -# [ -# ([11, 13], False, True, [11, 13], [10, 12, 14]), -# ([11, 13], True, True, [10, 12, 14], [11, 13]), -# ([11, 13], False, False, [11, 13], []), -# ], -# ) -# def test_split_by_index( -# sample_df, -# index_list, -# inverse, -# return_rejected, -# expected_selected, -# expected_rejected, -# ): -# selected, rejected = _split_by_index( -# sample_df, index_list, inverse=inverse, return_rejected=return_rejected -# ) -# assert list(selected.index) == expected_selected -# assert list(rejected.index) == expected_rejected - - -# def test_split_by_index_empty_df(empty_df): -# selected, rejected = _split_by_index(empty_df, [0], return_rejected=True) -# assert selected.empty -# assert rejected.empty - - -# @pytest.mark.parametrize( -# "column,boolean,expected_selected,expected_rejected", -# [ -# ("C", True, [10, 12, 14], [11, 13]), -# ("C", False, [11, 13], [10, 12, 14]), -# ], -# ) -# def test_split_by_boolean_mask( -# sample_df, column, boolean, expected_selected, expected_rejected -# ): -# mask = sample_df[[column]] -# selected, rejected = _split_by_boolean_mask( -# sample_df, mask, boolean=boolean, return_rejected=True -# ) -# assert list(selected.index) == expected_selected -# assert list(rejected.index) == expected_rejected - - -# def test_split_by_boolean_mask_empty_mask(sample_df): -# mask = pd.DataFrame(columns=sample_df.columns) -# selected, rejected = _split_by_boolean_mask( -# sample_df, mask, boolean=True, return_rejected=True -# ) -# assert list(selected.index) == list(sample_df.index) -# assert rejected.empty - - -# @pytest.mark.parametrize( -# "col,values,return_rejected,expected_selected,expected_rejected", 
-# [ -# ("B", ["x", "z"], True, [10, 12, 13], [11, 14]), -# ("B", ["missing"], True, [], [10, 11, 12, 13, 14]), -# ("B", ["x", "z"], False, [10, 12, 13], []), -# ], -# ) -# def test_split_by_column_values( -# sample_df, col, values, return_rejected, expected_selected, expected_rejected -# ): -# selected, rejected = _split_by_column_values( -# sample_df, col, values, return_rejected=return_rejected -# ) -# assert list(selected.index) == expected_selected -# assert list(rejected.index) == expected_rejected - - -# @pytest.mark.parametrize( -# "index_list,inverse,return_rejected,expected_selected,expected_rejected", -# [ -# ([11, 13], False, True, [11, 13], [10, 12, 14]), -# ([11, 13], False, False, [11, 13], []), -# ([11, 13], True, True, [10, 12, 14], [11, 13]), -# ], -# ) -# def test_split_by_index_values( -# sample_df, -# index_list, -# inverse, -# return_rejected, -# expected_selected, -# expected_rejected, -# ): -# selected, rejected = _split_by_index_values( -# sample_df, index_list, inverse=inverse, return_rejected=return_rejected -# ) -# assert list(selected.index) == expected_selected -# assert list(rejected.index) == expected_rejected - - -# def test_split_wrapper_index(sample_df): -# selected, rejected = _split( -# sample_df, _split_by_index_values, [11, 13], return_rejected=True -# ) -# assert list(selected.index) == [11, 13] -# assert list(rejected.index) == [10, 12, 14] - - -# def test_split_wrapper_column(sample_df): -# selected, rejected = _split( -# sample_df, _split_by_column_values, "B", ["y"], return_rejected=True -# ) -# assert list(selected.index) == [11, 14] -# assert list(rejected.index) == [10, 12, 13] - - -# def test_split_wrapper_boolean(sample_df, boolean_mask): -# selected, rejected = _split( -# sample_df, -# _split_by_boolean_mask, -# boolean_mask[["mask1"]], -# True, -# return_rejected=True, -# ) -# assert list(selected.index) == [11, 13] -# assert list(rejected.index) == [10, 12, 14] +def test_split_df(sample_df): + mask = 
pd.Series([True, False, False, True, False], index=sample_df.index) + selected, rejected = _split_df(sample_df, mask, return_rejected=True) + assert list(selected.index) == [10, 13] + assert list(rejected.index) == [11, 12, 14] + + +@pytest.mark.parametrize( + "column,boolean,expected_selected,expected_rejected", + [ + ("C", True, [10, 12, 14], [11, 13]), + ("C", False, [11, 13], [10, 12, 14]), + ], +) +def test_split_by_boolean_df( + sample_df, column, boolean, expected_selected, expected_rejected +): + mask = sample_df[[column]] + selected, rejected = _split_by_boolean_df( + sample_df, mask, boolean=boolean, return_rejected=True + ) + assert list(selected.index) == expected_selected + assert list(rejected.index) == expected_rejected + + +def test_split_by_boolean_df_empty_mask(sample_df): + mask = pd.DataFrame(columns=sample_df.columns) + selected, rejected = _split_by_boolean_df( + sample_df, mask, boolean=True, return_rejected=True + ) + assert list(selected.index) == list(sample_df.index) + assert rejected.empty + + +@pytest.mark.parametrize( + "col,values,return_rejected,expected_selected,expected_rejected", + [ + ("B", ["x", "z"], True, [10, 12, 13], [11, 14]), + ("B", ["missing"], True, [], [10, 11, 12, 13, 14]), + ("B", ["x", "z"], False, [10, 12, 13], []), + ], +) +def test_split_by_column_df( + sample_df, col, values, return_rejected, expected_selected, expected_rejected +): + selected, rejected = _split_by_column_df( + sample_df, col, values, return_rejected=return_rejected + ) + assert list(selected.index) == expected_selected + assert list(rejected.index) == expected_rejected + + +@pytest.mark.parametrize( + "index_list,inverse,return_rejected,expected_selected,expected_rejected", + [ + ([11, 13], False, True, [11, 13], [10, 12, 14]), + ([11, 13], False, False, [11, 13], []), + ([11, 13], True, True, [10, 12, 14], [11, 13]), + ], +) +def test_split_by_index_df( + sample_df, + index_list, + inverse, + return_rejected, + expected_selected, + 
expected_rejected, +): + selected, rejected = _split_by_index_df( + sample_df, index_list, inverse=inverse, return_rejected=return_rejected + ) + assert list(selected.index) == expected_selected + assert list(rejected.index) == expected_rejected + + +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_wrapper_index(sample_df, sample_reader, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + + selected, rejected = _split_dispatch( + data, _split_by_index_df, [11, 13], return_rejected=True + ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + + assert list(selected.index) == [11, 13] + assert list(rejected.index) == [10, 12, 14] + + +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_wrapper_column(sample_df, sample_reader, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + + selected, rejected = _split_dispatch( + data, _split_by_column_df, "B", ["y"], return_rejected=True + ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + + assert list(selected.index) == [11, 14] + assert list(rejected.index) == [10, 12, 13] + + +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_wrapper_boolean(sample_df, sample_reader, boolean_mask, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + + selected, rejected = _split_dispatch( + data, + _split_by_boolean_df, + boolean_mask[["mask1"]], + True, + return_rejected=True, + ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + + assert list(selected.index) == [11, 13] + assert list(rejected.index) == [10, 12, 14] @pytest.mark.parametrize("TextFileReader", [False, True]) diff --git a/tests/test_databundle.py b/tests/test_databundle.py index 37b52a06..480218ce 100755 --- a/tests/test_databundle.py +++ b/tests/test_databundle.py @@ -33,11 +33,11 @@ def 
sample_db_df(): @pytest.fixture def sample_db_reader(): - text = "A,B,\n19,0\n26,1\n27,2\n41,3\n91,4" - reader_data = make_parser(text) + text = "19,0\n26,1\n27,2\n41,3\n91,4" + reader_data = make_parser(text, names=["A", "B"]) - text = "A,B,\nTrue,True\nTrue,True\nTrue,True\nFalse,False\nTrue,False" - reader_mask = make_parser(text) + text = "True,True\nTrue,True\nTrue,True\nFalse,False\nTrue,False" + reader_mask = make_parser(text, names=["A", "B"]) return DataBundle(data=reader_data, mask=reader_mask) @@ -94,43 +94,78 @@ def test_copy_reader(sample_db_reader): def test_add_df(sample_db_df): - db = DataBundle() - sample_data = sample_db_df.data sample_mask = sample_db_df.mask + db = DataBundle() db_add = db.add({"data": sample_data}) + pd.testing.assert_frame_equal(db_add.data, sample_data) db = DataBundle() - db_add = db.add({"mask": sample_mask}) + pd.testing.assert_frame_equal(db_add.mask, sample_mask) db = DataBundle() - db_add = db.add({"data": sample_data, "mask": sample_mask}) + pd.testing.assert_frame_equal(db_add.data, sample_data) pd.testing.assert_frame_equal(db_add.mask, sample_mask) -def test_add_reader(sample_db_reader): - raise NotImplementedError +def test_add_reader_data(sample_db_reader): + sample_data = sample_db_reader.data + + db = DataBundle() + db_add = db.add({"data": sample_data}) + + pd.testing.assert_frame_equal(db_add.data.read(), sample_data.read()) + + +def test_add_reader_mask(sample_db_reader): + sample_mask = sample_db_reader.mask + + db = DataBundle() + db_add = db.add({"mask": sample_mask}) + + pd.testing.assert_frame_equal(db_add.mask.read(), sample_mask.read()) + + +def test_add_reader_both(sample_db_reader): + sample_data = sample_db_reader.data + sample_mask = sample_db_reader.mask + + db = DataBundle() + db_add = db.add({"data": sample_data, "mask": sample_mask}) + + pd.testing.assert_frame_equal(db_add.data.read(), sample_data.read()) + pd.testing.assert_frame_equal(db_add.mask.read(), sample_mask.read()) + +def 
test_stack_v_df(sample_db_df): + sample_data = sample_db_df.data.copy() + sample_mask = sample_db_df.mask.copy() -def test_stack_v_df(sample_db_df, sample_data, sample_mask): db = DataBundle(data=sample_data, mask=sample_mask) - expected_data = pd.concat([sample_db_df.data, db.data], ignore_index=True) - expected_mask = pd.concat([sample_db_df.mask, db.mask], ignore_index=True) sample_db_df.stack_v(db) + expected_data = pd.concat([sample_data, db.data], ignore_index=True) + expected_mask = pd.concat([sample_mask, db.mask], ignore_index=True) + pd.testing.assert_frame_equal(sample_db_df.data, expected_data) pd.testing.assert_frame_equal(sample_db_df.mask, expected_mask) -def test_stack_v_reader(): - raise NotImplementedError +def test_stack_v_reader(sample_db_reader): + sample_data = sample_db_reader.data + sample_mask = sample_db_reader.mask + + db = DataBundle(data=sample_data, mask=sample_mask) + + with pytest.raises(ValueError): + sample_db_reader.stack_v(db) def test_stack_h_df(sample_db_df, sample_data, sample_mask): @@ -144,8 +179,14 @@ def test_stack_h_df(sample_db_df, sample_data, sample_mask): pd.testing.assert_frame_equal(sample_db_df.mask, expected_mask) -def test_stack_h_reader(): - raise NotImplementedError +def test_stack_h_reader(sample_db_reader): + sample_data = sample_db_reader.data + sample_mask = sample_db_reader.mask + + db = DataBundle(data=sample_data, mask=sample_mask) + + with pytest.raises(ValueError): + sample_db_reader.stack_h(db) @pytest.mark.parametrize( @@ -172,29 +213,72 @@ def test_select_operators_df( *args, reset_index=reset_index, inverse=inverse ) - expected = sample_db_df.data - expected_mask = sample_db_df.mask - selected = result.data + data = sample_db_df.data + mask = sample_db_df.mask + + selected_data = result.data selected_mask = result.mask if inverse is False: - idx = expected.index.isin(idx_exp) + idx = data.index.isin(idx_exp) else: - idx = expected.index.isin(idx_rej) + idx = data.index.isin(idx_rej) - expected = 
expected[idx] - expected_mask = expected_mask[idx] + expected_data = data[idx] + expected_mask = mask[idx] if reset_index is True: - expected = expected.reset_index(drop=True) + expected_data = expected_data.reset_index(drop=True) expected_mask = expected_mask.reset_index(drop=True) - pd.testing.assert_frame_equal(expected, selected) + pd.testing.assert_frame_equal(expected_data, selected_data) pd.testing.assert_frame_equal(expected_mask, selected_mask) -def test_select_operators_reader(): - raise NotImplementedError +@pytest.mark.parametrize( + "func, args, idx_exp, idx_rej", + [ + ("select_where_all_true", [], [0, 1, 2], [3, 4]), + ("select_where_all_false", [], [3], [0, 1, 2, 4]), + ("select_where_index_isin", [[0, 2, 4]], [0, 2, 4], [1, 3]), + ("select_where_entry_isin", [{"A": [26, 41]}], [1, 3], [0, 2, 4]), + ], +) +@pytest.mark.parametrize("reset_index", [False, True]) +@pytest.mark.parametrize("inverse", [False, True]) +def test_select_operators_reader( + sample_db_reader, + func, + args, + idx_exp, + idx_rej, + reset_index, + inverse, +): + result = getattr(sample_db_reader, func)( + *args, reset_index=reset_index, inverse=inverse + ) + + data = sample_db_reader.data.read() + mask = sample_db_reader.mask.read() + + selected_data = result.data.read() + selected_mask = result.mask.read() + + if inverse is False: + idx = data.index.isin(idx_exp) + else: + idx = data.index.isin(idx_rej) + + expected_data = data[idx] + expected_mask = mask[idx] + + if reset_index is True: + expected_data = expected_data.reset_index(drop=True) + expected_mask = expected_mask.reset_index(drop=True) + + pd.testing.assert_frame_equal(expected_data, selected_data) + pd.testing.assert_frame_equal(expected_mask, selected_mask) @pytest.mark.parametrize( @@ -221,39 +305,92 @@ def test_split_operators_df( *args, reset_index=reset_index, inverse=inverse ) - expected = sample_db_df.data - expected_mask = sample_db_df.mask - selected = result[0].data + data = sample_db_df.data + mask = 
sample_db_df.mask + + selected_data = result[0].data selected_mask = result[0].mask - rejected = result[1].data + rejected_data = result[1].data rejected_mask = result[1].mask if inverse is False: - idx1 = expected.index.isin(idx_exp) - idx2 = expected.index.isin(idx_rej) + idx1 = data.index.isin(idx_exp) + idx2 = data.index.isin(idx_rej) else: - idx1 = expected.index.isin(idx_rej) - idx2 = expected.index.isin(idx_exp) + idx1 = data.index.isin(idx_rej) + idx2 = data.index.isin(idx_exp) - expected1 = expected[idx1] - expected2 = expected[idx2] - expected_mask1 = expected_mask[idx1] - expected_mask2 = expected_mask[idx2] + expected_data1 = data[idx1] + expected_data2 = data[idx2] + expected_mask1 = mask[idx1] + expected_mask2 = mask[idx2] if reset_index is True: - expected1 = expected1.reset_index(drop=True) - expected2 = expected2.reset_index(drop=True) + expected_data1 = expected_data1.reset_index(drop=True) + expected_data2 = expected_data2.reset_index(drop=True) expected_mask1 = expected_mask1.reset_index(drop=True) expected_mask2 = expected_mask2.reset_index(drop=True) - pd.testing.assert_frame_equal(expected1, selected) - pd.testing.assert_frame_equal(expected2, rejected) + pd.testing.assert_frame_equal(expected_data1, selected_data) + pd.testing.assert_frame_equal(expected_data2, rejected_data) pd.testing.assert_frame_equal(expected_mask1, selected_mask) pd.testing.assert_frame_equal(expected_mask2, rejected_mask) -def test_split_operators_reader(): - raise NotImplementedError +@pytest.mark.parametrize( + "func, args, idx_exp, idx_rej", + [ + ("split_by_boolean_true", [], [0, 1, 2], [3, 4]), + ("split_by_boolean_false", [], [3], [0, 1, 2, 4]), + ("split_by_index", [[0, 2, 4]], [0, 2, 4], [1, 3]), + ("split_by_column_entries", [{"A": [26, 41]}], [1, 3], [0, 2, 4]), + ], +) +@pytest.mark.parametrize("reset_index", [False, True]) +@pytest.mark.parametrize("inverse", [False, True]) +def test_split_operators_reader( + sample_db_reader, + func, + args, + idx_exp, + 
idx_rej, + reset_index, + inverse, +): + result = getattr(sample_db_reader, func)( + *args, reset_index=reset_index, inverse=inverse + ) + + data = sample_db_reader.data.read() + mask = sample_db_reader.mask.read() + + selected_data = result[0].data.read() + selected_mask = result[0].mask.read() + rejected_data = result[1].data.read() + rejected_mask = result[1].mask.read() + + if inverse is False: + idx1 = data.index.isin(idx_exp) + idx2 = data.index.isin(idx_rej) + else: + idx1 = data.index.isin(idx_rej) + idx2 = data.index.isin(idx_exp) + + expected_data1 = data[idx1] + expected_data2 = data[idx2] + expected_mask1 = mask[idx1] + expected_mask2 = mask[idx2] + + if reset_index is True: + expected_data1 = expected_data1.reset_index(drop=True) + expected_data2 = expected_data2.reset_index(drop=True) + expected_mask1 = expected_mask1.reset_index(drop=True) + expected_mask2 = expected_mask2.reset_index(drop=True) + + pd.testing.assert_frame_equal(expected_data1, selected_data) + pd.testing.assert_frame_equal(expected_data2, rejected_data) + pd.testing.assert_frame_equal(expected_mask1, selected_mask) + pd.testing.assert_frame_equal(expected_mask2, rejected_mask) def test_unique_df(sample_db_df): From 6eb8030c3204dc18d122fa346d9b7b07afb79a81 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 28 Jan 2026 14:07:59 +0100 Subject: [PATCH 11/14] explicitly set dtypes --- cdm_reader_mapper/common/select.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cdm_reader_mapper/common/select.py b/cdm_reader_mapper/common/select.py index f682fc14..fb1332e3 100755 --- a/cdm_reader_mapper/common/select.py +++ b/cdm_reader_mapper/common/select.py @@ -138,6 +138,14 @@ def _split_text_reader( if return_rejected: rej.to_csv(buffer_rej, **write_dict) + dtypes = {} + for col, dtype in sel.dtypes.items(): + if dtype == "object": + dtype = "str" + dtypes[col] = dtype + + read_dict["dtype"] = dtypes + buffer_sel.seek(0) buffer_rej.seek(0) From 
cba96bb2146395b5126a573b9a37a578de7f8d45 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Wed, 28 Jan 2026 16:58:58 +0100 Subject: [PATCH 12/14] update testing suite --- tests/test_common.py | 55 ++++++++++++++++++++++++++++++++++++++++ tests/test_databundle.py | 36 ++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) diff --git a/tests/test_common.py b/tests/test_common.py index 3951447c..dea0cec7 100755 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -137,6 +137,25 @@ def sample_reader(): return reader +@pytest.fixture +def sample_df_multi(): + return pd.DataFrame( + { + "A": [1, 2, 3, 4, 5], + "B": ["x", "y", "z", "x", "y"], + "C": [True, False, True, False, True], + }, + index=[10, 11, 12, 13, 14], + ) + + +@pytest.fixture +def sample_reader_multi(): + text = "10,1,x,True\n11,2,y,False\n12,3,z,True\n13,4,x,False\n14,5,y,True" + reader = make_parser(text, names=[("A", "a"), ("B", "b"), ("C", "c")]) + return reader + + @pytest.fixture def empty_df(): return pd.DataFrame(columns=["A", "B", "C"]) @@ -184,6 +203,30 @@ def test_split_df(sample_df): assert list(rejected.index) == [11, 12, 14] +def _test_split_df_false_mask(sample_df): + mask = pd.Series([False, False, False, False, False], index=sample_df.index) + selected, rejected = _split_df(sample_df, mask, return_rejected=True) + assert list(selected.index) == [10, 13] + assert list(rejected.index) == [11, 12, 14] + + +def test_split_df_multiindex(sample_df): + mask = pd.Series([True, False, False, True, False], index=sample_df.index) + sample_df.columns = pd.MultiIndex.from_tuples( + [ + ("A", "a"), + ( + "B", + "b", + ), + ("C", "c"), + ] + ) + selected, rejected = _split_df(sample_df, mask, return_rejected=True) + assert list(selected.index) == [10, 13] + assert list(rejected.index) == [11, 12, 14] + + @pytest.mark.parametrize( "column,boolean,expected_selected,expected_rejected", [ @@ -329,6 +372,18 @@ def test_split_by_index_basic(sample_df, sample_reader, TextFileReader): assert 
list(rejected.index) == [10, 12, 14] +def test_split_by_index_multiindex(sample_reader_multi): + selected, rejected = split_by_index( + sample_reader_multi, [11, 13], return_rejected=True + ) + + selected = selected.read() + rejected = rejected.read() + + assert list(selected.index) == [11, 13] + assert list(rejected.index) == [10, 12, 14] + + @pytest.mark.parametrize("TextFileReader", [False, True]) def test_split_by_column_entries_basic(sample_df, sample_reader, TextFileReader): if TextFileReader: diff --git a/tests/test_databundle.py b/tests/test_databundle.py index 480218ce..af6adead 100755 --- a/tests/test_databundle.py +++ b/tests/test_databundle.py @@ -42,6 +42,17 @@ def sample_db_reader(): return DataBundle(data=reader_data, mask=reader_mask) +@pytest.fixture +def sample_db_reader_multi(): + text = "19,0\n26,1\n27,2\n41,3\n91,4" + reader_data = make_parser(text, names=[("A", "a"), ("B", "b")]) + + text = "True,True\nTrue,True\nTrue,True\nFalse,False\nTrue,False" + reader_mask = make_parser(text, names=["A", "B"]) + + return DataBundle(data=reader_data, mask=reader_mask) + + @pytest.fixture def sample_data(): return pd.DataFrame({"C": [20, 21, 22, 23, 24]}) @@ -393,6 +404,31 @@ def test_split_operators_reader( pd.testing.assert_frame_equal(expected_mask2, rejected_mask) +def test_split_by_index_multi(sample_db_reader_multi): + result = sample_db_reader_multi.split_by_column_entries({("A", "a"): [26, 41]}) + + data = sample_db_reader_multi.data.read() + mask = sample_db_reader_multi.mask.read() + + selected_data = result[0].data.read() + selected_mask = result[0].mask.read() + rejected_data = result[1].data.read() + rejected_mask = result[1].mask.read() + + idx1 = data.index.isin([1, 3]) + idx2 = data.index.isin([0, 2, 4]) + + expected_data1 = data[idx1] + expected_data2 = data[idx2] + expected_mask1 = mask[idx1] + expected_mask2 = mask[idx2] + + pd.testing.assert_frame_equal(expected_data1, selected_data) + pd.testing.assert_frame_equal(expected_data2, 
rejected_data) + pd.testing.assert_frame_equal(expected_mask1, selected_mask) + pd.testing.assert_frame_equal(expected_mask2, rejected_mask) + + def test_unique_df(sample_db_df): result = sample_db_df.unique(columns=("A")) assert result == {"A": {19: 1, 26: 1, 27: 1, 41: 1, 91: 1}} From 391030692a580a6808a7dd01285ee1a53627781b Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Thu, 29 Jan 2026 11:34:56 +0100 Subject: [PATCH 13/14] read data with chunksize --- .../common/pandas_TextParser_hdlr.py | 2 +- cdm_reader_mapper/mdf_reader/reader.py | 16 +++-- .../mdf_reader/utils/utilities.py | 72 ++++++++++++++++--- tests/test_databundle.py | 53 +++++++++++++- tests/test_mdf_reader.py | 40 ++++++++++- tests/test_reader_utilities.py | 18 +++-- 6 files changed, 177 insertions(+), 24 deletions(-) diff --git a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py index 8d8c3b46..b3a17b8c 100755 --- a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py +++ b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py @@ -75,7 +75,7 @@ def make_copy(parser: TextFileReader) -> TextFileReader | None: try: return _new_reader_from_buffer(parser) except Exception as e: - raise RuntimeError("Failed to copy TextParser") from e + raise RuntimeError(f"Failed to copy TextParser: {e}") from e def restore(parser: TextFileReader) -> TextFileReader | None: diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index c921c031..ef10639e 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -277,19 +277,23 @@ def read_data( pd_kwargs.setdefault("parse_dates", parse_dates) pd_kwargs.setdefault("encoding", encoding) - data = read_csv( + data, infos = read_csv( source, col_subset=col_subset, **pd_kwargs, ) - mask = read_csv(mask, col_subset=col_subset, dtype="boolean") - if not mask.empty: - mask = mask.reindex(columns=data.columns) + + pd_kwargs = kwargs.copy() + 
pd_kwargs.setdefault("dtype", "boolean") + + mask, _ = read_csv( + mask, col_subset=col_subset, columns=infos["columns"], **pd_kwargs + ) return DataBundle( data=data, - columns=data.columns, - dtypes=dtype, + columns=infos["columns"], + dtypes=infos["dtypes"].to_dict(), parse_dates=parse_dates, mask=mask, imodel=imodel, diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py index 5b47ef2c..f7a782bd 100755 --- a/cdm_reader_mapper/mdf_reader/utils/utilities.py +++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py @@ -180,7 +180,43 @@ def update_column_labels(columns: Iterable[str | tuple]) -> pd.Index | pd.MultiI return pd.Index(new_cols) -def read_csv(filepath, col_subset=None, **kwargs) -> pd.DataFrame: +def update_and_select( + df: pd.DataFrame, + subset: str | list | None = None, + columns: pd.Index | pd.MultiIndex | None = None, +) -> tuple[pd.DataFrame, dict[str, Any]]: + """ + Update string column labels and select subset from DataFrame. + + Parameters + ---------- + df : pd.DataFrame + DataFrame to be updated + subset : str or list, optional + Column names to be selected + columns: + Column labels for re-indexing. + + Returns + ------- + tuple[pd.DataFrame, dict] + - The CSV as a DataFrame. Empty if file does not exist. + - dictionary containing data column labels and data types + """ + df.columns = update_column_labels(df.columns) + if subset is not None: + df = df[subset] + if columns is not None and not df.empty: + df = df.reindex(columns=columns) + return df, {"columns": df.columns, "dtypes": df.dtypes} + + +def read_csv( + filepath: Path, + col_subset: str | list | None = None, + columns: pd.Index | pd.MultiIndex | None = None, + **kwargs, +) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]: """ Safe CSV reader that handles missing files and column subsets. @@ -190,24 +226,40 @@ def read_csv(filepath, col_subset=None, **kwargs) -> pd.DataFrame: Path to the CSV file. 
col_subset : list of str, optional Subset of columns to read from the CSV. + columns: + Column labels for re-indexing. kwargs : any Additional keyword arguments passed to pandas.read_csv. Returns ------- - pd.DataFrame - The CSV as a DataFrame. Empty if file does not exist. + tuple[pd.DataFrame, dict] + - The CSV as a DataFrame. Empty if file does not exist. + - dictionary containing data column labels and data types """ if filepath is None or not Path(filepath).is_file(): logging.warning(f"File not found: {filepath}") - return pd.DataFrame() + return pd.DataFrame(), {} - df = pd.read_csv(filepath, delimiter=",", **kwargs) - df.columns = update_column_labels(df.columns) - if col_subset is not None: - df = df[col_subset] + data = pd.read_csv(filepath, delimiter=",", **kwargs) + + if isinstance(data, pd.DataFrame): + data, info = update_and_select(data, subset=col_subset, columns=columns) + return data, info + + write_kwargs = {} + if "encoding" in kwargs: + write_kwargs["encoding"] = kwargs["encoding"] - return df + data, info = process_textfilereader( + data, + func=update_and_select, + func_kwargs={"subset": col_subset, "columns": columns}, + read_kwargs=kwargs, + write_kwargs=write_kwargs, + makecopy=False, + ) + return data, info def convert_dtypes(dtypes) -> tuple[str]: @@ -329,7 +381,7 @@ def process_textfilereader( read_kwargs: dict[str, Any] | tuple[dict[str, Any], ...] | None = None, write_kwargs: dict[str, Any] | None = None, makecopy: bool = True, -) -> tuple[pd.DataFrame, ...]: +) -> tuple[Iterable[pd.DataFrame], ...]: """ Process a stream of DataFrames using a function and return processed results. 
diff --git a/tests/test_databundle.py b/tests/test_databundle.py index af6adead..161a65ce 100755 --- a/tests/test_databundle.py +++ b/tests/test_databundle.py @@ -5,7 +5,7 @@ from io import StringIO -from cdm_reader_mapper import DataBundle +from cdm_reader_mapper import DataBundle, read_data, test_data def make_parser(text, **kwargs): @@ -63,6 +63,26 @@ def sample_mask(): return pd.DataFrame({"C": [True, False, True, False, False]}) +@pytest.fixture +def sample_db_df_testdata(): + data_model = "icoads_r300_d714" + data = test_data[f"test_{data_model}"]["mdf_data"] + mask = test_data[f"test_{data_model}"]["mdf_mask"] + info = test_data[f"test_{data_model}"]["mdf_info"] + + return read_data(data, mask=mask, info=info) + + +@pytest.fixture +def sample_db_reader_testdata(): + data_model = "icoads_r300_d714" + data = test_data[f"test_{data_model}"]["mdf_data"] + mask = test_data[f"test_{data_model}"]["mdf_mask"] + info = test_data[f"test_{data_model}"]["mdf_info"] + + return read_data(data, mask=mask, info=info, chunksize=2) + + def test_len_df(sample_db_df): assert len(sample_db_df) == 5 @@ -292,6 +312,37 @@ def test_select_operators_reader( pd.testing.assert_frame_equal(expected_mask, selected_mask) +@pytest.mark.parametrize( + "func, args, idx_exp", + [ + # ("select_where_all_true", [], [0, 1, 2], [3, 4]), + # ("select_where_all_false", [], [3], [0, 1, 2, 4]), + ("select_where_index_isin", [[0, 2, 4]], [0, 2, 4]), + # ("select_where_entry_isin", [{("core", "ID"): [25629, 26558]}], [1, 3]), + ], +) +def test_select_operators_testdata_reader( + sample_db_reader_testdata, + func, + args, + idx_exp, +): + result = getattr(sample_db_reader_testdata, func)(*args) + data = sample_db_reader_testdata.data.read() + mask = sample_db_reader_testdata.mask.read() + + selected_data = result.data.read() + selected_mask = result.mask.read() + + idx = data.index.isin(idx_exp) + + expected_data = data[idx] + expected_mask = mask[idx] + + pd.testing.assert_frame_equal(expected_data, 
selected_data, check_dtype=False) + pd.testing.assert_frame_equal(expected_mask, selected_mask) + + @pytest.mark.parametrize( "func, args, idx_exp, idx_rej", [ diff --git a/tests/test_mdf_reader.py b/tests/test_mdf_reader.py index 59c75f50..e1467c0a 100755 --- a/tests/test_mdf_reader.py +++ b/tests/test_mdf_reader.py @@ -296,7 +296,7 @@ def test_read_data_no_info(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.MultiIndex) - assert db.dtypes == "object" + assert isinstance(db.dtypes, dict) assert db.parse_dates is False assert db.encoding is None assert db.imodel is None @@ -364,7 +364,7 @@ def test_read_data_encoding(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.Index) - assert db.dtypes == "object" + assert isinstance(db.dtypes, dict) assert db.parse_dates is False assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -376,6 +376,42 @@ def test_read_data_encoding(): assert db.size == 1705 +def test_read_data_textfilereader(): + data_model = "icoads_r300_d721" + data = test_data[f"test_{data_model}"]["mdf_data"] + mask = test_data[f"test_{data_model}"]["mdf_mask"] + info = test_data[f"test_{data_model}"]["mdf_info"] + db = read_data(data, mask=mask, info=info, chunksize=3) + + assert isinstance(db, DataBundle) + + for attr in [ + "data", + "mask", + "columns", + "dtypes", + "parse_dates", + "encoding", + "imodel", + "mode", + ]: + assert hasattr(db, attr) + + assert isinstance(db.data, pd.io.parsers.TextFileReader) + assert isinstance(db.mask, pd.io.parsers.TextFileReader) + assert isinstance(db.columns, pd.MultiIndex) + assert isinstance(db.dtypes, dict) + assert db.parse_dates == [] + assert isinstance(db.encoding, str) + assert db.encoding == "cp1252" + assert db.imodel is None + assert isinstance(db.mode, str) + assert db.mode == "data" + assert len(db) == 5 + assert db.shape == (5, 341) + assert db.size 
== 1705 + + def test_validate_read_mdf_args_pass(tmp_path): source = tmp_path / "file.mdf" source.touch() diff --git a/tests/test_reader_utilities.py b/tests/test_reader_utilities.py index f4a46639..2ae892be 100755 --- a/tests/test_reader_utilities.py +++ b/tests/test_reader_utilities.py @@ -167,21 +167,32 @@ def test_update_column_labels_mixed(): def test_read_csv_file_exists(tmp_csv_file): file_path, data = tmp_csv_file - df = read_csv(file_path) + df, info = read_csv(file_path) pd.testing.assert_frame_equal(df, data) + assert "columns" in info + pd.testing.assert_index_equal(info["columns"], df.columns) + assert "dtypes" in info + pd.testing.assert_series_equal(info["dtypes"], df.dtypes) + def test_read_csv_file_missing(tmp_path): missing_file = tmp_path / "missing.csv" - df = read_csv(missing_file) + df, info = read_csv(missing_file) assert df.empty + assert info == {} def test_read_csv_with_col_subset(tmp_csv_file): file_path, _ = tmp_csv_file - df = read_csv(file_path, col_subset=["B"]) + df, info = read_csv(file_path, col_subset=["B"]) assert list(df.columns) == ["B"] + assert "columns" in info + pd.testing.assert_index_equal(info["columns"], df.columns) + assert "dtypes" in info + pd.testing.assert_series_equal(info["dtypes"], df.dtypes) + def test_convert_dtypes_basic(): dtypes = {"A": "int", "B": "datetime", "C": "float"} @@ -252,7 +263,6 @@ def test_process_textfilereader_only_df(sample_reader): (reader_out,) = process_textfilereader( sample_reader, sample_func_only_df, read_kwargs={"chunksize": 1} ) - print(reader_out) assert isinstance(reader_out, TextFileReader) df_out = reader_out.read() assert df_out.shape == (2, 2) From 3a51a1f15b75a3e1ad3700c6dcd4c2cc640efc7b Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Thu, 29 Jan 2026 11:46:02 +0100 Subject: [PATCH 14/14] update CHANGELOG --- CHANGES.rst | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 10bf9e59..a3be1749 100644 --- a/CHANGES.rst +++ 
b/CHANGES.rst @@ -3,6 +3,22 @@ Changelog ========= +2.2.2 (unreleased) +------------------ +Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`) + +New features and enhancements +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +* ``mdf_reader.read_data`` now supports chunking (:pull:`360`) + +Breaking changes +^^^^^^^^^^^^^^^^ +* ``DataBundle.stack_v`` and ``DataBundle.stack_h`` only support `pd.DataFrames` as input; otherwise a `ValueError` is raised (:pull:`360`) + +Internal changes +^^^^^^^^^^^^^^^^ +* re-work internal structure for more readability and better performance (:pull:`360`) + 2.2.1 (2026-01-23) ------------------ Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`)