diff --git a/CHANGES.rst b/CHANGES.rst
index 10bf9e59..a3be1749 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -3,6 +3,22 @@ Changelog
 =========

+2.2.2 (unreleased)
+------------------
+Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`)
+
+New features and enhancements
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+* ``mdf_reader.read_data`` now supports chunking (:pull:`360`)
+
+Breaking changes
+^^^^^^^^^^^^^^^^
+* ``DataBundle.stack_v`` and ``DataBundle.stack_h`` only support ``pd.DataFrame`` objects as input; otherwise a ``ValueError`` is raised (:pull:`360`)
+
+Internal changes
+^^^^^^^^^^^^^^^^
+* Rework the internal structure for better readability and performance (:pull:`360`)
+
 2.2.1 (2026-01-23)
 ------------------
 Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`)
diff --git a/cdm_reader_mapper/cdm_mapper/mapper.py b/cdm_reader_mapper/cdm_mapper/mapper.py
index 1cd61314..f084adec 100755
--- a/cdm_reader_mapper/cdm_mapper/mapper.py
+++ b/cdm_reader_mapper/cdm_mapper/mapper.py
@@ -20,6 +20,8 @@
 import numpy as np
 import pandas as pd

+from pandas.io.parsers import TextFileReader
+
 from cdm_reader_mapper.common import logging_hdlr, pandas_TextParser_hdlr

 from . import properties
@@ -41,7 +43,7 @@ def _log_and_return_empty(msg):
             return _log_and_return_empty("Input data is empty")
         return [data]

-    elif isinstance(data, pd.io.parsers.TextFileReader):
+    elif isinstance(data, TextFileReader):
         logger.debug("Input is a pd.TextFileReader")
         if not pandas_TextParser_hdlr.is_not_empty(data):
             return _log_and_return_empty("Input data is empty")
@@ -80,22 +82,20 @@ def _is_empty(value):

 def _drop_duplicated_rows(df) -> pd.DataFrame:
     """Drop duplicates from list."""
+    list_cols = [
+        col for col in df.columns if df[col].apply(lambda x: isinstance(x, list)).any()
+    ]
+
+    for col in list_cols:
+        df[col] = df[col].apply(lambda x: tuple(x) if isinstance(x, list) else x)

-    def list_to_tuple(v):
-        if isinstance(v, list):
-            v = tuple(v)
-        return v
+    df.drop_duplicates(ignore_index=True, inplace=True)

-    def tuple_to_list(v):
-        if isinstance(v, tuple):
-            v = list(v)
-        return v
+    for col in list_cols:
+        if df[col].apply(lambda x: isinstance(x, tuple)).any():
+            df[col] = df[col].apply(lambda x: list(x) if isinstance(x, tuple) else x)

-    dtypes = df.dtypes
-    df = df.map(list_to_tuple)
-    df = df.drop_duplicates(ignore_index=True)
-    df = df.map(tuple_to_list)
-    return df.astype(dtypes)
+    return df


 def _get_nested_value(ndict, keys) -> Any | None:
@@ -209,7 +209,7 @@ def _fill_value(series, fill_value) -> pd.Series:
     return series.fillna(value=fill_value).infer_objects(copy=False)


-def _extract_input_data(idata, elements, cols, default, logger):
+def _extract_input_data(idata, elements, default, logger):
     """Extract the relevant input data based on `elements`."""

     def _return_default():
@@ -220,18 +220,14 @@ def _return_default():

     logger.debug(f"\telements: {' '.join(map(str, elements))}")

-    missing_elements = [e for e in elements if e not in cols]
-    if missing_elements:
-        logger.warning(
-            "Missing elements from input data: {}".format(
-                ",".join(map(str, missing_elements))
-            )
-        )
-        return _return_default()
+    cols = idata.columns
+
+    for e in elements:
+        if e not in cols:
+            logger.warning(f"Missing element from input data: {e}")
+            return _return_default()

-    data = idata[elements]
-    if len(elements) == 1:
-        data = data.iloc[:, 0]
+    data = idata[elements[0]] if len(elements) == 1 else idata[elements]

     if _is_empty(data):
         return _return_default()
@@ -245,7 +241,6 @@ def _column_mapping(
imodel_functions, atts, codes_subset, - cols, column, logger, ): @@ -264,40 +259,46 @@ def _column_mapping( data, used_default = _extract_input_data( idata, elements, - cols, default, logger, ) - if transform and not used_default: - data = _transform( - data, - imodel_functions, - transform, - kwargs, - logger=logger, - ) + if not used_default: + if transform: + data = _transform( + data, + imodel_functions, + transform, + kwargs, + logger=logger, + ) + elif code_table: + data = _code_table( + data, + imodel_functions.imodel, + code_table, + logger=logger, + ) - elif code_table and not used_default: - data = _code_table( - data, - imodel_functions.imodel, - code_table, - logger=logger, - ) + if not isinstance(data, pd.Series): + data = pd.Series(data, index=idata.index, copy=False) + + data.name = column - data = pd.Series(data, index=idata.index, name=column) - data = _fill_value(data, fill_value) - atts["decimal_places"] = _decimal_places(decimal_places) + if fill_value is not None: + data = _fill_value(data, fill_value) - return data, atts + if atts: + atts["decimal_places"] = _decimal_places(decimal_places) + data = _convert_dtype(data, atts) + + return data def _table_mapping( idata, mapping, atts, - cols, null_label, imodel_functions, codes_subset, @@ -306,31 +307,32 @@ def _table_mapping( drop_duplicates, logger, ) -> pd.DataFrame: - columns = ( - [x for x in atts.keys() if x in idata.columns] - if not cdm_complete - else list(atts.keys()) - ) - - table_df = pd.DataFrame(index=idata.index, columns=columns) + columns = list(atts) if cdm_complete else [c for c in atts if c in idata.columns] + out = {} for column in columns: if column not in mapping.keys(): + out[column] = pd.Series( + [null_label] * len(idata), index=idata.index, name=column + ) continue logger.debug(f"\tElement: {column}") - table_df[column], atts[column] = _column_mapping( + out[column] = _column_mapping( idata, mapping[column], imodel_functions, atts[column], codes_subset, - cols, column, logger, ) - table_df[column] = _convert_dtype(table_df[column], atts.get(column)) + + if not out: + return pd.DataFrame(index=idata.index) + + table_df = pd.DataFrame(out, index=idata.index) if drop_missing_obs is True and "observation_value" in table_df: table_df = table_df.dropna(subset=["observation_value"]) @@ -345,16 +347,21 @@ def _prepare_cdm_tables(cdm_subset): """Prepare table buffers and attributes for CDM tables.""" if isinstance(cdm_subset, str): cdm_subset = [cdm_subset] + cdm_atts = get_cdm_atts(cdm_subset) if not cdm_atts: return {} - return { - table: { + + tables = {} + for table, atts in cdm_atts.items(): + for col, meta in atts.items(): + meta["decimal_places"] = _decimal_places(meta.get("decimal_places")) + tables[table] = { "buffer": StringIO(), - "atts": deepcopy(cdm_atts.get(table)), + "atts": atts, } - for table in cdm_subset - } + + return tables def _process_chunk( @@ -362,13 +369,13 @@ def _process_chunk( imodel_maps, imodel_functions, cdm_tables, - cols, null_label, codes_subset, cdm_complete, drop_missing_obs, drop_duplicates, logger, + is_reader, ): """Process one chunk of input data.""" for table, mapping in imodel_maps.items(): @@ -378,7 +385,6 @@ def _process_chunk( idata=idata, mapping=mapping, atts=deepcopy(cdm_tables[table]["atts"]), - cols=cols, null_label=null_label, imodel_functions=imodel_functions, codes_subset=codes_subset, @@ -389,13 +395,17 @@ def _process_chunk( ) table_df.columns = pd.MultiIndex.from_product([[table], table_df.columns]) - table_df.to_csv( - cdm_tables[table]["buffer"], 
- header=False, - index=False, - mode="a", - ) - cdm_tables[table]["columns"] = table_df.columns + + if is_reader: + table_df.to_csv( + cdm_tables[table]["buffer"], + header=False, + index=False, + mode="a", + ) + cdm_tables[table]["columns"] = table_df.columns + else: + cdm_tables[table]["df"] = table_df.astype(object) def _finalize_output(cdm_tables, logger): @@ -405,19 +415,24 @@ def _finalize_output(cdm_tables, logger): for table, meta in cdm_tables.items(): logger.debug(f"\tParse datetime by reader; Table: {table}") - meta["buffer"].seek(0) - df = pd.read_csv( - meta["buffer"], - names=meta["columns"], - na_values=[], - dtype="object", - keep_default_na=False, - ) - - meta["buffer"].close() + if "df" not in meta: + meta["buffer"].seek(0) + df = pd.read_csv( + meta["buffer"], + names=meta["columns"], + na_values=[], + dtype="object", + keep_default_na=False, + ) + meta["buffer"].close() + else: + df = meta.get("df", pd.DataFrame()) final_tables.append(df) + if not final_tables: + return pd.DataFrame() + return pd.concat(final_tables, axis=1, join="outer").reset_index(drop=True) @@ -447,20 +462,21 @@ def _map_and_convert( cdm_tables = _prepare_cdm_tables(imodel_maps.keys()) + is_reader = isinstance(data_iter, TextFileReader) + for idata in data_iter: - cols = list(idata.columns) _process_chunk( idata=idata, imodel_maps=imodel_maps, imodel_functions=imodel_functions, cdm_tables=cdm_tables, - cols=cols, null_label=null_label, codes_subset=codes_subset, cdm_complete=cdm_complete, drop_missing_obs=drop_missing_obs, drop_duplicates=drop_duplicates, logger=logger, + is_reader=is_reader, ) return _finalize_output(cdm_tables, logger) diff --git a/cdm_reader_mapper/common/inspect.py b/cdm_reader_mapper/common/inspect.py index 267caafa..148b13d3 100755 --- a/cdm_reader_mapper/common/inspect.py +++ b/cdm_reader_mapper/common/inspect.py @@ -12,13 +12,14 @@ import pandas as pd -from . 
import pandas_TextParser_hdlr
+from .pandas_TextParser_hdlr import make_copy
+from .pandas_TextParser_hdlr import get_length as get_length_hdlr


 def _count_by_cat(series) -> dict:
     """Count unique values in a pandas Series, including NaNs."""
     counts = series.value_counts(dropna=False)
-    counts.index = counts.index.map(lambda x: "nan" if pd.isna(x) else x)
+    counts.index = counts.index.where(~counts.index.isna(), "nan")
     return counts.to_dict()


@@ -51,29 +52,24 @@ def count_by_cat(
     if not isinstance(columns, list):
         columns = [columns]

-    counts = {}
+    counts = {col: {} for col in columns}
     if isinstance(data, pd.DataFrame):
         for column in columns:
             counts[column] = _count_by_cat(data[column])
         return counts

-    for column in columns:
-        data_cp = pandas_TextParser_hdlr.make_copy(data)
-        count_dicts = []
-
-        for df in data_cp:
-            count_dicts.append(_count_by_cat(df[column]))
-
-        data_cp.close()
-
-        merged_counts = {}
-        for d in count_dicts:
-            for k, v in d.items():
-                merged_counts[k] = merged_counts.get(k, 0) + v
+    data_cp = make_copy(data)
+    if data_cp is None:
+        return counts

-        counts[column] = merged_counts
+    for chunk in data_cp:
+        for column in columns:
+            chunk_counts = _count_by_cat(chunk[column])
+            for k, v in chunk_counts.items():
+                counts[column][k] = counts[column].get(k, 0) + v

+    data_cp.close()
     return counts

@@ -98,4 +94,4 @@ def get_length(data: pd.DataFrame | pd.io.parsers.TextFileReader) -> int:
     """
     if not isinstance(data, pd.io.parsers.TextFileReader):
         return len(data)
-    return pandas_TextParser_hdlr.get_length(data)
+    return get_length_hdlr(data)
diff --git a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py
index c6473e89..b3a17b8c 100755
--- a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py
+++ b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py
@@ -8,7 +8,7 @@

 logger = logging.getLogger(__name__)

-read_params = [
+_READ_CSV_KWARGS = [
     "chunksize",
     "names",
     "dtype",
@@ -22,7 +22,37 @@
 ]


-def make_copy(Parser: TextFileReader) -> TextFileReader | None:
+def _get_raw_buffer(parser: TextFileReader) -> str | None:
+    if hasattr(parser, "_raw_buffer"):
+        return parser._raw_buffer
+
+    f = getattr(parser.handles, "handle", None)
+    if f is None:
+        raise ValueError("TextFileReader has no accessible handle for copying.")
+
+    try:
+        raw = f.getvalue()
+        parser._raw_buffer = raw
+        return raw
+    except Exception as e:
+        raise RuntimeError("Failed to read raw buffer") from e
+
+
+def _new_reader_from_buffer(parser: TextFileReader) -> TextFileReader | None:
+    raw = _get_raw_buffer(parser)
+    if raw is None:
+        return None
+
+    read_dict = {
+        k: parser.orig_options.get(k)
+        for k in _READ_CSV_KWARGS
+        if k in parser.orig_options
+    }
+    return pd.read_csv(StringIO(raw), **read_dict)
+
+
+def make_copy(parser: TextFileReader) -> TextFileReader | None:
     """
     Create a duplicate of a pandas TextFileReader object.

@@ -43,16 +73,12 @@ def make_copy(Parser: TextFileReader) -> TextFileReader | None:
     only for in-memory file-like objects such as `StringIO`.
""" try: - f = Parser.handles.handle - new_ref = StringIO(f.getvalue()) - read_dict = {k: Parser.orig_options.get(k) for k in read_params} - return pd.read_csv(new_ref, **read_dict) - except Exception: - logger.error("Failed to copy TextParser", exc_info=True) - return None + return _new_reader_from_buffer(parser) + except Exception as e: + raise RuntimeError(f"Failed to copy TextParser: {e}") from e -def restore(Parser: TextFileReader) -> TextFileReader | None: +def restore(parser: TextFileReader) -> TextFileReader | None: """ Restore a TextFileReader to its initial read position and state. @@ -66,17 +92,10 @@ def restore(Parser: TextFileReader) -> TextFileReader | None: pandas.io.parsers.TextFileReader or None Restored TextFileReader, or None if restoration fails. """ - try: - f = Parser.handles.handle - f.seek(0) - read_dict = {k: Parser.orig_options.get(k) for k in read_params} - return pd.read_csv(f, **read_dict) - except Exception: - logger.error("Failed to restore TextParser", exc_info=True) - return None + return make_copy(parser) -def is_not_empty(Parser: TextFileReader) -> bool | None: +def is_not_empty(parser: TextFileReader) -> bool | None: """ Determine whether a TextFileReader contains at least one row. @@ -92,27 +111,24 @@ def is_not_empty(Parser: TextFileReader) -> bool | None: False if empty. None if an error occurs. """ - try: - parser_copy = make_copy(Parser) - if parser_copy is None: - return None - except Exception: - logger.error( - f"Failed to process input. Input type is {type(Parser)}", - exc_info=True, - ) + if hasattr(parser, "_is_not_empty"): + return parser._is_not_empty + + reader = make_copy(parser) + if reader is None: return None try: - chunk = parser_copy.get_chunk() - parser_copy.close() - return len(chunk) > 0 - except Exception: - logger.debug("Error while checking emptiness", exc_info=True) + chunk = next(reader) + result = not chunk.empty + parser._is_not_empty = result + return result + except StopIteration: + parser._is_not_empty = False return False -def get_length(Parser: TextFileReader) -> int | None: +def get_length(parser: TextFileReader) -> int | None: """ Count total rows in a TextFileReader (consuming a copied stream). @@ -126,22 +142,18 @@ def get_length(Parser: TextFileReader) -> int | None: int or None Total number of rows, or None if processing fails. """ - try: - parser_copy = make_copy(Parser) - if parser_copy is None: - return None - except Exception: - logger.error( - f"Failed to process input. 
Input type is {type(Parser)}",
-            exc_info=True,
-        )
+    if hasattr(parser, "_row_count"):
+        return parser._row_count
+
+    reader = make_copy(parser)
+    if reader is None:
         return None

     total = 0
     try:
-        for df in parser_copy:
-            total += len(df)
+        for chunk in reader:
+            total += len(chunk)
+        parser._row_count = total
         return total
-    except Exception:
-        logger.error("Failed while counting rows", exc_info=True)
-        return None
+    except Exception as e:
+        raise RuntimeError("Failed while counting rows") from e
diff --git a/cdm_reader_mapper/common/replace.py b/cdm_reader_mapper/common/replace.py
index 15426ec0..15bf4351 100755
--- a/cdm_reader_mapper/common/replace.py
+++ b/cdm_reader_mapper/common/replace.py
@@ -73,26 +73,23 @@ def replace_columns(
     # Check inargs
     if not isinstance(df_l, pd.DataFrame) or not isinstance(df_r, pd.DataFrame):
         logger.error("Input left and right data must be pandas DataFrames.")
-        return
+        return None

-    if pivot_c:
+    if pivot_c is not None:
         pivot_l = pivot_r = pivot_c

-    if not (pivot_l and pivot_r):
+    if pivot_l is None or pivot_r is None:
         logger.error(
             "Pivot columns must be declared using `pivot_c` or both `pivot_l` and `pivot_r`."
         )
-        return
-
-    df_l = df_l.copy().set_index(pivot_l, drop=False)
-    df_r = df_r.copy().set_index(pivot_r, drop=False)
+        return None

     if rep_map is None:
         if rep_c is None:
             logger.error(
                 "Replacement columns must be declared using `rep_c` or `rep_map`."
             )
-            return
+            return None
         if isinstance(rep_c, str):
             rep_c = [rep_c]
         rep_map = {col: col for col in rep_c}
@@ -104,12 +101,18 @@ def replace_columns(
         )
         return None

-    # Build right-side replacement DataFrame with left-column names
-    df_r_l = df_r[list(rep_map.values())].rename(
-        columns={v: k for k, v in rep_map.items()}
+    out = df_l.copy()
+    right_lookup = (
+        df_r[[pivot_r, *rep_map.values()]]
+        .set_index(pivot_r)
+        .rename(columns={v: k for k, v in rep_map.items()})
     )

-    # Apply update
-    df_l.update(df_r_l)
+    # Align the right-hand lookup to the left pivot values in a single reindex
+    aligned = right_lookup.reindex(out[pivot_l].values)
+
+    # Assign the aligned columns directly, avoiding a row-wise merge
+    for col in aligned.columns:
+        out[col] = aligned[col].values

-    return df_l.reset_index(drop=True)
+    return out
diff --git a/cdm_reader_mapper/common/select.py b/cdm_reader_mapper/common/select.py
index 047d776e..fb1332e3 100755
--- a/cdm_reader_mapper/common/select.py
+++ b/cdm_reader_mapper/common/select.py
@@ -9,118 +9,78 @@
 from __future__ import annotations

 from io import StringIO
-from typing import Sequence, Iterable, Callable
+from typing import Iterable, Callable

 import pandas as pd


-def _select_rows_by_index(
+def _split_df(
     df: pd.DataFrame,
-    index_list: str | int | Sequence,
-    **kwargs,
-) -> pd.DataFrame:
-    """Select rows from a DataFrame based on index values."""
-    reset_index = kwargs.get("reset_index", False)
-    inverse = kwargs.get("inverse", False)
-
-    if isinstance(index_list, str):
-        index_list = [index_list]
-    if isinstance(index_list, list):
-        index_list = pd.Index(index_list)
-    index = df.index.isin(index_list)
-    if inverse is True:
-        in_df = df[~index]
-    else:
-        in_df = df[index]
+    mask: pd.Series,
+    reset_index: bool = False,
+    inverse: bool = False,
+    return_rejected: bool = False,
+):
+    if inverse:
+        selected = df[~mask]
+        rejected = df[mask] if return_rejected else df.iloc[0:0]
+    else:
+        selected = df[mask]
+        rejected = df[~mask] if return_rejected else df.iloc[0:0]

-    if reset_index is True:
-        in_df = in_df.reset_index(drop=True)
-
-    in_df.__dict__["_prev_index"] = index_list
-    return in_df
+    selected.attrs["_prev_index"] = mask.index[mask]
+    rejected.attrs["_prev_index"] = mask.index[~mask]

-def _split_by_index(
-    df: pd.DataFrame,
-    indexes: str | Sequence,
-    **kwargs,
-) -> tuple[pd.DataFrame, pd.DataFrame]:
-    """Split a DataFrame into two parts based on index values."""
-    return_rejected = kwargs.get("return_rejected", False)
+    if reset_index:
+        selected = selected.reset_index(drop=True)
+        rejected = rejected.reset_index(drop=True)

-    out1 = _select_rows_by_index(
-        df,
-        indexes,
-        **kwargs,
-    )
-    if return_rejected is True:
-        index2 = [idx for idx in df.index if idx not in indexes]
-        out2 = _select_rows_by_index(df, index2, **kwargs)
-    else:
-        out2 = pd.DataFrame(columns=out1.columns)
-        out2.__dict__["_prev_index"] = pd.Index([])
+    return selected, rejected

-    return out1, out2

-def _split_by_boolean_mask_(
-    df: pd.DataFrame, mask: pd.DataFrame, boolean: bool, **kwargs
-) -> tuple[pd.DataFrame, pd.DataFrame]:
-    """Split a DataFrame based on a boolean mask using `_split_by_index`."""
+def _split_by_boolean_df(df: pd.DataFrame, mask: pd.DataFrame, boolean: bool, **kwargs):
     if mask.empty:
-        if boolean:
-            indexes = df.index
-        else:
-            indexes = df.index.difference(df.index)
-        return _split_by_index(df, indexes, **kwargs)
-
-    if boolean is True:
-        global_mask = mask.all(axis=1)
+        mask_sel = pd.Series(boolean, index=df.index)
     else:
-        global_mask = ~(mask.any(axis=1))
-
-    indexes = global_mask[global_mask.fillna(boolean)].index
-    return _split_by_index(df, indexes, **kwargs)
+        mask_sel = mask.all(axis=1) if boolean else ~mask.any(axis=1)
+        mask_sel = mask_sel.fillna(boolean)

+    return _split_df(df=df, mask=mask_sel, **kwargs)

-def _split_by_boolean_mask(
-    df: pd.DataFrame, mask: pd.DataFrame, boolean: bool, **kwargs
-) -> tuple[pd.DataFrame, pd.DataFrame]:
-    """Split a DataFrame based on strict boolean masks (all-True or all-False)."""
-    if mask.empty:
-        selected = pd.Series(boolean, index=df.index)
-    else:
-        selected = mask.all(axis=1) if boolean else ~mask.any(axis=1)
-        selected = selected.fillna(boolean)
+def _split_by_column_df(
+    df: pd.DataFrame,
+    col: str,
+    values: Iterable,
+    **kwargs,
+):
+    mask_sel = df[col].isin(values)

-    indexes = selected[selected].index
-    return _split_by_index(df, indexes, **kwargs)
+    return _split_df(df=df, mask=mask_sel, **kwargs)


-def _split_by_column_values(
-    df: pd.DataFrame, col: str, values: Iterable, **kwargs
-) -> tuple[pd.DataFrame, pd.DataFrame]:
-    """Split a DataFrame based on entries in a specific column using `_split_by_index`."""
-    in_df = df.loc[df[col].isin(values)]
-    index = list(in_df.index)
-    return _split_by_index(
-        df,
-        index,
-        **kwargs,
-    )
+def _split_by_index_df(
+    df: pd.DataFrame,
+    index,
+    **kwargs,
+):
+    index = pd.Index(
+        index if isinstance(index, Iterable) and not isinstance(index, str) else [index]
+    )
+    mask_sel = pd.Series(df.index.isin(index), index=df.index)
+    return _split_df(df=df, mask=mask_sel, **kwargs)


-def _split_by_index_values(
-    df: pd.DataFrame, index, **kwargs
-) -> tuple[pd.DataFrame, pd.DataFrame]:
-    """Split a DataFrame based on index values using `_split_by_index`."""
-    return _split_by_index(df, index, **kwargs)
+def _split_text_reader(
+    reader,
+    func: Callable,
+    *args,
+    reset_index=False,
+    inverse=False,
+    return_rejected=False,
+):
+    buffer_sel = StringIO()
+    buffer_rej = StringIO()

-def _split_parser(
-    data, *args, func=None, reset_index=False, inverse=False, return_rejected=False
-) -> tuple[pd.io.parsers.TextFileReader]:
-    """Common pandas TextFileReader selection function."""
     read_params = [
         "chunksize",
         "names",
@@ -129,66 +89,94 @@ def _split_parser(
         "date_parser",
         "infer_datetime_format",
     ]
+    write_dict = {"header": None, "mode": "a", "index": not reset_index}

-    read_dict = {x: data[0].orig_options.get(x) for x in read_params}
-    buffer1 = StringIO()
-    buffer2 = StringIO()
-    _prev_index1 = None
-    _prev_index2 = None
-    for zipped in zip(*data):
+    read_dict = {x: reader.orig_options.get(x) for x in read_params}
+
+    new_args = []
+    new_readers = []
+
+    prev_index_sel = None
+    prev_index_rej = None
+
+    for d in args:
+        if isinstance(d, pd.io.parsers.TextFileReader):
+            new_readers.append(d)
+        else:
+            new_args.append(d)
+
+    readers = [reader] + new_readers
+
+    for zipped in zip(*readers):
         zipped = tuple(zipped)
-        out1, out2 = func(
+
+        sel, rej = func(
             *zipped,
-            *args,
+            *new_args,
             reset_index=reset_index,
             inverse=inverse,
             return_rejected=return_rejected,
         )
-        if _prev_index1 is None:
-            _prev_index1 = out1.__dict__["_prev_index"]
+
+        sel_prev_index = sel.attrs["_prev_index"]
+
+        if prev_index_sel is None:
+            prev_index_sel = sel_prev_index
         else:
-            _prev_index1 = _prev_index1.union(out1.__dict__["_prev_index"])
-        if _prev_index2 is None:
-            _prev_index2 = out2.__dict__["_prev_index"]
+            prev_index_sel = prev_index_sel.union(sel_prev_index)
+
+        rej_prev_index = rej.attrs["_prev_index"]
+
+        if prev_index_rej is None:
+            prev_index_rej = rej_prev_index
         else:
-            _prev_index2 = _prev_index2.union(out2.__dict__["_prev_index"])
-        out1.to_csv(buffer1, **write_dict)
-        if return_rejected is True:
-            out2.to_csv(buffer2, **write_dict)
+            prev_index_rej = prev_index_rej.union(rej_prev_index)
+
+        sel.to_csv(buffer_sel, **write_dict)
+        if return_rejected:
+            rej.to_csv(buffer_rej, **write_dict)
+
         dtypes = {}
-        for k, v in out1.dtypes.items():
-            if v == "object":
-                v = "str"
-            dtypes[k] = v
+        for col, dtype in sel.dtypes.items():
+            if dtype == "object":
+                dtype = "str"
+            dtypes[col] = dtype
+
         read_dict["dtype"] = dtypes
-    buffer1.seek(0)
-    buffer2.seek(0)
-    TextParser1 = pd.read_csv(buffer1, **read_dict)
-    TextParser1.__dict__["_prev_index"] = _prev_index1
-    TextParser2 = pd.read_csv(buffer2, **read_dict)
-    TextParser2.__dict__["_prev_index"] = _prev_index2
-    return TextParser1, TextParser2
-
-
-def _split(
-    data: pd.DataFrame | Iterable[pd.DataFrame],
-    func: Callable[..., tuple[pd.DataFrame, pd.DataFrame]],
+
+    buffer_sel.seek(0)
+    buffer_rej.seek(0)
+
+    selected = pd.read_csv(buffer_sel, **read_dict)
+    rejected = pd.read_csv(buffer_rej, **read_dict)
+
+    selected.attrs = {"_prev_index": prev_index_sel}
+    rejected.attrs = {"_prev_index": prev_index_rej}
+
+    return selected, rejected
+
+
+def _split_dispatch(
+    data,
+    func: Callable,
     *args,
     **kwargs,
-) -> tuple[pd.DataFrame, pd.DataFrame]:
-    """
-    Apply a split function to one or more DataFrames.
+):

-    - If `data` is a single DataFrame, pass it to `func`.
-    - If `data` is a list of DataFrames, only the first is used here
-      (TextFileReader logic is handled separately).
-    """
-    if not isinstance(data, list):
-        data = [data]
-    if isinstance(data[0], pd.io.parsers.TextFileReader):
-        return _split_parser(data, *args, func=func, **kwargs)
-    return func(*data, *args, **kwargs)
+    if isinstance(data, pd.DataFrame):
+        return func(data, *args, **kwargs)
+
+    if isinstance(data, pd.io.parsers.TextFileReader):
+        return _split_text_reader(
+            data,
+            func,
+            *args,
+            **kwargs,
+        )
+
+    raise TypeError("Unsupported input type for split operation.")


 def split_by_boolean(
@@ -227,10 +215,10 @@ def split_by_boolean(
         Tuple ``(selected, rejected)`` returned by the underlying
         ``split_dataframe_by_boolean`` implementation.
     """
-    func = _split_by_boolean_mask
-    return _split(
-        [data, mask],
-        func,
+    return _split_dispatch(
+        data,
+        _split_by_boolean_df,
+        mask,
         boolean,
         reset_index=reset_index,
         inverse=inverse,
@@ -343,12 +331,10 @@ def split_by_column_entries(
     (pandas.DataFrame, pandas.DataFrame)
         Selected rows (column value in provided list) and rejected rows.
     """
-    func = _split_by_column_values
-    col = next(iter(selection.keys()))
-    values = next(iter(selection.values()))
-    return _split(
+    col, values = next(iter(selection.items()))
+    return _split_dispatch(
         data,
-        func,
+        _split_by_column_df,
         col,
         values,
         reset_index=reset_index,
@@ -385,10 +371,9 @@ def split_by_index(
     (pandas.DataFrame, pandas.DataFrame)
         Selected rows (index in given list) and rejected rows.
     """
-    func = _split_by_index_values
-    return _split(
+    return _split_dispatch(
         data,
-        func,
+        _split_by_index_df,
         index,
         reset_index=reset_index,
         inverse=inverse,
diff --git a/cdm_reader_mapper/core/_utilities.py b/cdm_reader_mapper/core/_utilities.py
index 6c880c8f..29d603df 100755
--- a/cdm_reader_mapper/core/_utilities.py
+++ b/cdm_reader_mapper/core/_utilities.py
@@ -277,6 +277,10 @@ def _stack(self, other, datasets, inplace, **kwargs):
         for data in datasets:
             _data = f"_{data}"
             _df = getattr(db_, _data) if hasattr(db_, _data) else pd.DataFrame()
+
+            if isinstance(_df, pd.io.parsers.TextFileReader):
+                raise ValueError("Data must be a DataFrame, not a TextFileReader.")
+
             to_concat = [
                 getattr(concat, _data) for concat in other if hasattr(concat, _data)
             ]
diff --git a/cdm_reader_mapper/core/databundle.py b/cdm_reader_mapper/core/databundle.py
index e476397b..78c2a4d2 100755
--- a/cdm_reader_mapper/core/databundle.py
+++ b/cdm_reader_mapper/core/databundle.py
@@ -117,7 +117,8 @@ def add(self, addition, inplace=False) -> DataBundle | None:
         """
         db_ = self._get_db(inplace)
         for name, data in addition.items():
-            setattr(db_, f"_{name}", data)
+            data_cp = _copy(data)
+            setattr(db_, f"_{name}", data_cp)
         return self._return_db(db_, inplace)

     def stack_v(
@@ -139,7 +140,8 @@ def stack_v(

         Note
         ----
-        The DataFrames in the :py:class:`~DataBundle` have to have the same data columns!
+        * This only works with DataFrames, not with TextFileReaders!
+        * The DataFrames in the :py:class:`~DataBundle` have to have the same data columns!

         Returns
         -------
@@ -175,7 +177,8 @@ def stack_h(

         Note
         ----
-        The DataFrames in the :py:class:`~DataBundle` may have different data columns!
+        * This only works with DataFrames, not with TextFileReaders!
+        * The DataFrames in the :py:class:`~DataBundle` may have different data columns!
Examples -------- @@ -236,7 +239,7 @@ def select_where_all_true( _mask = _copy(db_._mask) db_._data = split_by_boolean_true(db_._data, _mask, **kwargs)[0] if do_mask is True: - _prev_index = db_._data.__dict__["_prev_index"] + _prev_index = db_._data.attrs["_prev_index"] db_._mask = split_by_index(db_._mask, _prev_index, **kwargs)[0] return self._return_db(db_, inplace) @@ -284,7 +287,7 @@ def select_where_all_false( _mask = _copy(db_._mask) db_._data = split_by_boolean_false(db_._data, _mask, **kwargs)[0] if do_mask is True: - _prev_index = db_._data.__dict__["_prev_index"] + _prev_index = db_._data.attrs["_prev_index"] db_._mask = split_by_index(db_._mask, _prev_index, **kwargs)[0] return self._return_db(db_, inplace) @@ -336,7 +339,7 @@ def select_where_entry_isin( db_ = self._get_db(inplace) db_._data = split_by_column_entries(db_._data, selection, **kwargs)[0] if do_mask is True: - _prev_index = db_._data.__dict__["_prev_index"] + _prev_index = db_._data.attrs["_prev_index"] db_._mask = split_by_index(db_._mask, _prev_index, **kwargs)[0] return self._return_db(db_, inplace) @@ -385,7 +388,7 @@ def select_where_index_isin( db_ = self._get_db(inplace) db_._data = split_by_index(db_._data, index, **kwargs)[0] if do_mask is True: - _prev_index = db_._data.__dict__["_prev_index"] + _prev_index = db_._data.attrs["_prev_index"] db_._mask = split_by_index(db_._mask, _prev_index, **kwargs)[0] return self._return_db(db_, inplace) @@ -428,7 +431,8 @@ def split_by_boolean_true( db1_._data, _mask, return_rejected=True, **kwargs ) if do_mask is True: - _prev_index = db1_._data.__dict__["_prev_index"] + _prev_index = db1_._data.attrs["_prev_index"] + db1_._mask, db2_._mask = split_by_index( db1_._mask, _prev_index, return_rejected=True, **kwargs ) @@ -473,7 +477,7 @@ def split_by_boolean_false( db1_._data, _mask, return_rejected=True, **kwargs ) if do_mask is True: - _prev_index = db1_._data.__dict__["_prev_index"] + _prev_index = db1_._data.attrs["_prev_index"] db1_._mask, db2_._mask = split_by_index( db1_._mask, _prev_index, return_rejected=True, **kwargs ) @@ -522,7 +526,7 @@ def split_by_column_entries( db1_._data, selection, return_rejected=True, **kwargs ) if do_mask is True: - _prev_index = db1_._data.__dict__["_prev_index"] + _prev_index = db1_._data.attrs["_prev_index"] db1_._mask, db2_._mask = split_by_index( db1_._mask, _prev_index, return_rejected=True, **kwargs ) diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py index c921c031..ef10639e 100755 --- a/cdm_reader_mapper/mdf_reader/reader.py +++ b/cdm_reader_mapper/mdf_reader/reader.py @@ -277,19 +277,23 @@ def read_data( pd_kwargs.setdefault("parse_dates", parse_dates) pd_kwargs.setdefault("encoding", encoding) - data = read_csv( + data, infos = read_csv( source, col_subset=col_subset, **pd_kwargs, ) - mask = read_csv(mask, col_subset=col_subset, dtype="boolean") - if not mask.empty: - mask = mask.reindex(columns=data.columns) + + pd_kwargs = kwargs.copy() + pd_kwargs.setdefault("dtype", "boolean") + + mask, _ = read_csv( + mask, col_subset=col_subset, columns=infos["columns"], **pd_kwargs + ) return DataBundle( data=data, - columns=data.columns, - dtypes=dtype, + columns=infos["columns"], + dtypes=infos["dtypes"].to_dict(), parse_dates=parse_dates, mask=mask, imodel=imodel, diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py index 5b47ef2c..f7a782bd 100755 --- a/cdm_reader_mapper/mdf_reader/utils/utilities.py +++ 
b/cdm_reader_mapper/mdf_reader/utils/utilities.py
@@ -180,7 +180,43 @@ def update_column_labels(columns: Iterable[str | tuple]) -> pd.Index | pd.MultiI
     return pd.Index(new_cols)


-def read_csv(filepath, col_subset=None, **kwargs) -> pd.DataFrame:
+def update_and_select(
+    df: pd.DataFrame,
+    subset: str | list | None = None,
+    columns: pd.Index | pd.MultiIndex | None = None,
+) -> tuple[pd.DataFrame, dict[str, Any]]:
+    """
+    Update string column labels and select a subset from a DataFrame.
+
+    Parameters
+    ----------
+    df : pd.DataFrame
+        DataFrame to be updated.
+    subset : str or list, optional
+        Column names to be selected.
+    columns : pd.Index or pd.MultiIndex, optional
+        Column labels for re-indexing.
+
+    Returns
+    -------
+    tuple[pd.DataFrame, dict]
+        - The updated DataFrame.
+        - dictionary containing data column labels and data types
+    """
+    df.columns = update_column_labels(df.columns)
+    if subset is not None:
+        df = df[subset]
+    if columns is not None and not df.empty:
+        df = df.reindex(columns=columns)
+    return df, {"columns": df.columns, "dtypes": df.dtypes}
+
+
+def read_csv(
+    filepath: Path,
+    col_subset: str | list | None = None,
+    columns: pd.Index | pd.MultiIndex | None = None,
+    **kwargs,
+) -> tuple[pd.DataFrame | Iterable[pd.DataFrame], dict[str, Any]]:
     """
     Safe CSV reader that handles missing files and column subsets.

@@ -190,24 +226,40 @@ def read_csv(filepath, col_subset=None, **kwargs) -> pd.DataFrame:
         Path to the CSV file.
     col_subset : list of str, optional
         Subset of columns to read from the CSV.
+    columns : pd.Index or pd.MultiIndex, optional
+        Column labels for re-indexing.
     kwargs : any
         Additional keyword arguments passed to pandas.read_csv.

     Returns
     -------
-    pd.DataFrame
-        The CSV as a DataFrame. Empty if file does not exist.
+    tuple[pd.DataFrame, dict]
+        - The CSV as a DataFrame. Empty if file does not exist.
+        - dictionary containing data column labels and data types
     """
     if filepath is None or not Path(filepath).is_file():
         logging.warning(f"File not found: {filepath}")
-        return pd.DataFrame()
+        return pd.DataFrame(), {}

-    df = pd.read_csv(filepath, delimiter=",", **kwargs)
-    df.columns = update_column_labels(df.columns)
-    if col_subset is not None:
-        df = df[col_subset]
+    data = pd.read_csv(filepath, delimiter=",", **kwargs)
+
+    if isinstance(data, pd.DataFrame):
+        data, info = update_and_select(data, subset=col_subset, columns=columns)
+        return data, info
+
+    write_kwargs = {}
+    if "encoding" in kwargs:
+        write_kwargs["encoding"] = kwargs["encoding"]

-    return df
+    data, info = process_textfilereader(
+        data,
+        func=update_and_select,
+        func_kwargs={"subset": col_subset, "columns": columns},
+        read_kwargs=kwargs,
+        write_kwargs=write_kwargs,
+        makecopy=False,
+    )
+    return data, info


 def convert_dtypes(dtypes) -> tuple[str]:
@@ -329,7 +381,7 @@ def process_textfilereader(
     read_kwargs: dict[str, Any] | tuple[dict[str, Any], ...] | None = None,
     write_kwargs: dict[str, Any] | None = None,
     makecopy: bool = True,
-) -> tuple[pd.DataFrame, ...]:
+) -> tuple[Iterable[pd.DataFrame], ...]:
     """
     Process a stream of DataFrames using a function and return processed results.
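For reference, the reworked reader API behaves as follows: ``read_csv`` returns a ``(data, info)`` pair — ``data`` is a ``TextFileReader`` when a ``chunksize`` is passed, otherwise a ``DataFrame`` — and ``info`` carries the column labels and dtypes that ``read_data`` forwards into the ``DataBundle``. A minimal usage sketch under those assumptions; the file paths here are hypothetical placeholders:

    from cdm_reader_mapper import read_data
    from cdm_reader_mapper.mdf_reader.utils.utilities import read_csv

    # read_csv now returns (data, info); info holds "columns" and "dtypes".
    data, info = read_csv("data.csv", chunksize=2)  # data is a TextFileReader
    print(info["columns"])

    # read_data forwards pandas read_csv kwargs such as chunksize, so data
    # and mask stay chunked TextFileReaders until they are materialized.
    db = read_data("data.csv", mask="mask.csv", info="info.json", chunksize=2)
    df = db.data.read()  # materialize the chunked reader into a DataFrame
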
diff --git a/tests/test_cdm_mapper.py b/tests/test_cdm_mapper.py index e1f72a7f..4969fff3 100755 --- a/tests/test_cdm_mapper.py +++ b/tests/test_cdm_mapper.py @@ -354,7 +354,6 @@ def test_extract_input_data( result = _extract_input_data( data_header, elements, - data_header.columns, default, logger, ) @@ -373,13 +372,13 @@ def test_extract_input_data( @pytest.mark.parametrize( "column, expected", [ - ("duplicate_status", [4, 4, 4, 4]), - ("platform_type", [2, 33, 32, 45]), + ("duplicate_status", ["4", "4", "4", "4"]), + ("platform_type", ["2", "33", "32", "45"]), ( "report_id", ["ICOADS-30-5012", "ICOADS-30-8960", "ICOADS-30-0037", "ICOADS-30-1000"], ), - ("location_quality", [2.0, "0", "0", "0"]), + ("location_quality", ["2", "0", "0", "0"]), ("latitude", [None, None, None, None]), ], ) @@ -387,13 +386,12 @@ def test_column_mapping(imodel_maps, imodel_functions, data_header, column, expe logger = logging_hdlr.init_logger(__name__, level="INFO") mapping_column = imodel_maps["header"][column] column_atts = get_cdm_atts("header")["header"][column] - result, _ = _column_mapping( + result = _column_mapping( data_header, mapping_column, imodel_functions, column_atts, None, - data_header.columns, column, logger, ) @@ -409,7 +407,6 @@ def test_table_mapping( data_header, imodel_maps["header"], table_atts, - data_header.columns, "null", imodel_functions, None, @@ -577,7 +574,7 @@ def test_map_model_pub47(): "gdac", ], ) -def test_map_model_test_data(data_model): +def test_map_model_test_data_basic(data_model): _map_model_test_data(data_model) diff --git a/tests/test_common.py b/tests/test_common.py index 9b016c99..dea0cec7 100755 --- a/tests/test_common.py +++ b/tests/test_common.py @@ -22,12 +22,11 @@ from cdm_reader_mapper.common.select import ( - _select_rows_by_index, - _split_by_index, - _split_by_boolean_mask, - _split_by_column_values, - _split_by_index_values, - _split, + _split_df, + _split_by_index_df, + _split_by_boolean_df, + _split_by_column_df, + _split_dispatch, split_by_boolean, split_by_boolean_true, split_by_boolean_false, @@ -131,11 +130,42 @@ def sample_df(): ) +@pytest.fixture +def sample_reader(): + text = "10,1,x,True\n11,2,y,False\n12,3,z,True\n13,4,x,False\n14,5,y,True" + reader = make_parser(text, names=["A", "B", "C"]) + return reader + + +@pytest.fixture +def sample_df_multi(): + return pd.DataFrame( + { + "A": [1, 2, 3, 4, 5], + "B": ["x", "y", "z", "x", "y"], + "C": [True, False, True, False, True], + }, + index=[10, 11, 12, 13, 14], + ) + + +@pytest.fixture +def sample_reader_multi(): + text = "10,1,x,True\n11,2,y,False\n12,3,z,True\n13,4,x,False\n14,5,y,True" + reader = make_parser(text, names=[("A", "a"), ("B", "b"), ("C", "c")]) + return reader + + @pytest.fixture def empty_df(): return pd.DataFrame(columns=["A", "B", "C"]) +@pytest.fixture +def empty_reader(): + return make_parser("", names=["A", "B", "C"]) + + @pytest.fixture def boolean_mask(): return pd.DataFrame( @@ -166,54 +196,35 @@ def tmp_json_file(tmp_path): return file_path, data -@pytest.mark.parametrize( - "index_list,inverse,reset_index,expected", - [ - ([10, 12], False, False, [10, 12]), - ([10, 12], True, False, [11, 13, 14]), - ([10, 12], False, True, [0, 1]), - ], -) -def test_select_rows_by_index(sample_df, index_list, inverse, reset_index, expected): - df = sample_df - selected = _select_rows_by_index( - df, index_list, inverse=inverse, reset_index=reset_index - ) - assert list(selected.index) == expected +def test_split_df(sample_df): + mask = pd.Series([True, False, False, True, False], 
index=sample_df.index)
+    selected, rejected = _split_df(sample_df, mask, return_rejected=True)
+    assert list(selected.index) == [10, 13]
+    assert list(rejected.index) == [11, 12, 14]


-def test_select_rows_by_index_empty_df(empty_df):
-    selected = _select_rows_by_index(empty_df, [0])
-    assert selected.empty
+def test_split_df_false_mask(sample_df):
+    mask = pd.Series([False, False, False, False, False], index=sample_df.index)
+    selected, rejected = _split_df(sample_df, mask, return_rejected=True)
+    assert selected.empty
+    assert list(rejected.index) == [10, 11, 12, 13, 14]


-@pytest.mark.parametrize(
-    "index_list,inverse,return_rejected,expected_selected,expected_rejected",
-    [
-        ([11, 13], False, True, [11, 13], [10, 12, 14]),
-        ([11, 13], True, True, [10, 12, 14], [11, 13]),
-        ([11, 13], False, False, [11, 13], []),
-    ],
-)
-def test_split_by_index(
-    sample_df,
-    index_list,
-    inverse,
-    return_rejected,
-    expected_selected,
-    expected_rejected,
-):
-    selected, rejected = _split_by_index(
-        sample_df, index_list, inverse=inverse, return_rejected=return_rejected
+def test_split_df_multiindex(sample_df):
+    mask = pd.Series([True, False, False, True, False], index=sample_df.index)
+    sample_df.columns = pd.MultiIndex.from_tuples(
+        [
+            ("A", "a"),
+            ("B", "b"),
+            ("C", "c"),
+        ]
     )
-    assert list(selected.index) == expected_selected
-    assert list(rejected.index) == expected_rejected
-
-
-def test_split_by_index_empty_df(empty_df):
-    selected, rejected = _split_by_index(empty_df, [0], return_rejected=True)
-    assert selected.empty
-    assert rejected.empty
+    selected, rejected = _split_df(sample_df, mask, return_rejected=True)
+    assert list(selected.index) == [10, 13]
+    assert list(rejected.index) == [11, 12, 14]


 @pytest.mark.parametrize(
@@ -223,20 +234,20 @@ def test_split_by_index_empty_df(empty_df):
         ("C", False, [11, 13], [10, 12, 14]),
     ],
 )
-def test_split_by_boolean_mask(
+def test_split_by_boolean_df(
     sample_df, column, boolean, expected_selected, expected_rejected
 ):
     mask = sample_df[[column]]
-    selected, rejected = _split_by_boolean_mask(
+    selected, rejected = _split_by_boolean_df(
         sample_df, mask, boolean=boolean, return_rejected=True
     )
     assert list(selected.index) == expected_selected
     assert list(rejected.index) == expected_rejected


-def test_split_by_boolean_mask_empty_mask(sample_df):
+def test_split_by_boolean_df_empty_mask(sample_df):
     mask = pd.DataFrame(columns=sample_df.columns)
-    selected, rejected = _split_by_boolean_mask(
+    selected, rejected = _split_by_boolean_df(
         sample_df, mask, boolean=True, return_rejected=True
     )
     assert list(selected.index) == list(sample_df.index)
@@ -251,10 +262,10 @@ def test_split_by_boolean_mask_empty_mask(sample_df):
         ("B", ["x", "z"], False, [10, 12, 13], []),
     ],
 )
-def test_split_by_column_values(
+def test_split_by_column_df(
     sample_df, col, values, return_rejected, expected_selected, expected_rejected
 ):
-    selected, rejected = _split_by_column_values(
+    selected, rejected = _split_by_column_df(
         sample_df, col, values, return_rejected=return_rejected
     )
     assert list(selected.index) == expected_selected
@@ -269,7 +280,7 @@ def test_split_by_column_values(
         ([11, 13], True, True, [10, 12, 14], [11, 13]),
     ],
 )
-def test_split_by_index_values(
+def test_split_by_index_df(
     sample_df,
     index_list,
     inverse,
@@ -277,104 +288,255 @@ def test_split_by_index_values(
     expected_selected,
     expected_rejected,
 ):
-    selected, rejected = _split_by_index_values(
+    selected, rejected = _split_by_index_df(
         sample_df, index_list, inverse=inverse,
return_rejected=return_rejected ) assert list(selected.index) == expected_selected assert list(rejected.index) == expected_rejected -def test_split_wrapper_index(sample_df): - selected, rejected = _split( - sample_df, _split_by_index_values, [11, 13], return_rejected=True +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_wrapper_index(sample_df, sample_reader, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + + selected, rejected = _split_dispatch( + data, _split_by_index_df, [11, 13], return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [11, 13] assert list(rejected.index) == [10, 12, 14] -def test_split_wrapper_column(sample_df): - selected, rejected = _split( - sample_df, _split_by_column_values, "B", ["y"], return_rejected=True +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_wrapper_column(sample_df, sample_reader, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + + selected, rejected = _split_dispatch( + data, _split_by_column_df, "B", ["y"], return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [11, 14] assert list(rejected.index) == [10, 12, 13] -def test_split_wrapper_boolean(sample_df, boolean_mask): - selected, rejected = _split( - sample_df, - _split_by_boolean_mask, +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_wrapper_boolean(sample_df, sample_reader, boolean_mask, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + + selected, rejected = _split_dispatch( + data, + _split_by_boolean_df, boolean_mask[["mask1"]], True, return_rejected=True, ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [11, 13] assert list(rejected.index) == [10, 12, 14] -def test_split_by_index_public(sample_df): - selected, rejected = split_by_index(sample_df, [11, 13], return_rejected=True) +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_index_basic(sample_df, sample_reader, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_index(data, [11, 13], return_rejected=True) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [11, 13] assert list(rejected.index) == [10, 12, 14] -def test_split_by_column_entries_public(sample_df): +def test_split_by_index_multiindex(sample_reader_multi): + selected, rejected = split_by_index( + sample_reader_multi, [11, 13], return_rejected=True + ) + + selected = selected.read() + rejected = rejected.read() + + assert list(selected.index) == [11, 13] + assert list(rejected.index) == [10, 12, 14] + + +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_column_entries_basic(sample_df, sample_reader, TextFileReader): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_column_entries( - sample_df, {"B": ["y"]}, return_rejected=True + data, {"B": ["y"]}, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [11, 14] assert list(rejected.index) == [10, 12, 13] -def test_split_by_boolean_public(sample_df, boolean_mask): 
+@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_basic_false( + sample_df, sample_reader, boolean_mask, TextFileReader +): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_boolean( - sample_df, boolean_mask, boolean=False, return_rejected=True + data, boolean_mask, boolean=False, return_rejected=True ) - assert list(selected.index) == [] + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + + assert selected.empty assert list(rejected.index) == [10, 11, 12, 13, 14] + +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_basic_true( + sample_df, sample_reader, boolean_mask, TextFileReader +): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_boolean( - sample_df, boolean_mask, boolean=True, return_rejected=True + data, boolean_mask, boolean=True, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert selected.empty assert list(rejected.index) == [10, 11, 12, 13, 14] -def test_split_by_boolean_true_public(sample_df, boolean_mask_true): +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_true_basic( + sample_df, sample_reader, boolean_mask_true, TextFileReader +): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_boolean_true( - sample_df, boolean_mask_true, return_rejected=True + data, boolean_mask_true, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [10] assert list(rejected.index) == [11, 12, 13, 14] -def test_split_by_boolean_false_public(sample_df, boolean_mask): +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_boolean_false_basic( + sample_df, sample_reader, boolean_mask, TextFileReader +): + if TextFileReader: + data = sample_reader + else: + data = sample_df + selected, rejected = split_by_boolean_false( - sample_df, boolean_mask, return_rejected=True + data, boolean_mask, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert list(selected.index) == [] assert list(rejected.index) == [10, 11, 12, 13, 14] -def test_split_by_index_empty(empty_df): - selected, rejected = split_by_index(empty_df, [0, 1], return_rejected=True) +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_index_empty(empty_df, empty_reader, TextFileReader): + if TextFileReader: + data = empty_reader + else: + data = empty_df + + selected, rejected = split_by_index(data, [0, 1], return_rejected=True) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert selected.empty assert rejected.empty -def test_split_by_column_empty(empty_df): - selected, rejected = split_by_column_entries( - empty_df, {"A": [1]}, return_rejected=True - ) +@pytest.mark.parametrize("TextFileReader", [False, True]) +def test_split_by_column_empty(empty_df, empty_reader, TextFileReader): + if TextFileReader: + data = empty_reader + else: + data = empty_df + + selected, rejected = split_by_column_entries(data, {"A": [1]}, return_rejected=True) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert selected.empty assert rejected.empty -def test_split_by_boolean_empty(empty_df): +@pytest.mark.parametrize("TextFileReader", [False, True]) 
+def test_split_by_boolean_empty(empty_df, empty_reader, TextFileReader): + if TextFileReader: + data = empty_reader + else: + data = empty_df + mask = empty_df.astype(bool) selected, rejected = split_by_boolean( - empty_df, mask, boolean=True, return_rejected=True + data, mask, boolean=True, return_rejected=True ) + + if TextFileReader: + selected = selected.read() + rejected = rejected.read() + assert selected.empty assert rejected.empty @@ -438,8 +600,8 @@ def test_make_copy_basic(): def test_make_copy_failure_memory(): parser = make_broken_parser("a,b\n1,2\n") - cp = make_copy(parser) - assert cp is None + with pytest.raises(RuntimeError): + make_copy(parser) def test_restore_basic(): @@ -455,8 +617,8 @@ def test_restore_basic(): def test_restore_failure_memory(): parser = make_broken_parser("a,b\n1,2\n") - restored = restore(parser) - assert restored is None + with pytest.raises(RuntimeError): + restore(parser) def test_is_not_empty_true(): @@ -471,8 +633,8 @@ def test_is_not_empty_false(): def test_is_not_empty_failure_make_copy_memory(): parser = make_broken_parser("a,b\n1,2\n") - result = is_not_empty(parser) - assert result is None + with pytest.raises(RuntimeError): + is_not_empty(parser) def test_get_length_basic(): @@ -487,13 +649,14 @@ def test_get_length_empty(): def test_get_length_failure_due_to_bad_line(): parser = make_parser("a,b\n1,2\n1,2,3\n") - assert get_length_hdlr(parser) is None + with pytest.raises(RuntimeError): + get_length_hdlr(parser) def test_get_length_failure_make_copy_memory(): parser = make_broken_parser("a,b\n1,2\n") - result = get_length_hdlr(parser) - assert result is None + with pytest.raises(RuntimeError): + get_length_hdlr(parser) def test_init_logger_returns_logger(): @@ -778,7 +941,7 @@ def test_count_by_cat_broken_parser(): 2,y """ parser = make_broken_parser(text) - with pytest.raises(Exception): + with pytest.raises(RuntimeError): count_by_cat(parser, ["A", "B"]) @@ -945,7 +1108,6 @@ def test_load_file_real(tmp_path, within_drs, cache): def test_load_file_invalid_url(): - """Test that load_file raises ValueError for unsafe URLs.""" with pytest.raises(ValueError): load_file(name="file.txt", github_url="ftp://malicious-site.com") diff --git a/tests/test_databundle.py b/tests/test_databundle.py index 34d538fb..161a65ce 100755 --- a/tests/test_databundle.py +++ b/tests/test_databundle.py @@ -3,11 +3,19 @@ import pandas as pd import pytest -from cdm_reader_mapper import DataBundle +from io import StringIO + +from cdm_reader_mapper import DataBundle, read_data, test_data + + +def make_parser(text, **kwargs): + """Helper: create a TextFileReader similar to user code.""" + buffer = StringIO(text) + return pd.read_csv(buffer, chunksize=2, **kwargs) @pytest.fixture -def sample_df(): +def sample_db_df(): data = pd.DataFrame( { "A": [19, 26, 27, 41, 91], @@ -23,6 +31,28 @@ def sample_df(): return DataBundle(data=data, mask=mask) +@pytest.fixture +def sample_db_reader(): + text = "19,0\n26,1\n27,2\n41,3\n91,4" + reader_data = make_parser(text, names=["A", "B"]) + + text = "True,True\nTrue,True\nTrue,True\nFalse,False\nTrue,False" + reader_mask = make_parser(text, names=["A", "B"]) + + return DataBundle(data=reader_data, mask=reader_mask) + + +@pytest.fixture +def sample_db_reader_multi(): + text = "19,0\n26,1\n27,2\n41,3\n91,4" + reader_data = make_parser(text, names=[("A", "a"), ("B", "b")]) + + text = "True,True\nTrue,True\nTrue,True\nFalse,False\nTrue,False" + reader_mask = make_parser(text, names=["A", "B"]) + + return DataBundle(data=reader_data, 
mask=reader_mask) + + @pytest.fixture def sample_data(): return pd.DataFrame({"C": [20, 21, 22, 23, 24]}) @@ -33,66 +63,161 @@ def sample_mask(): return pd.DataFrame({"C": [True, False, True, False, False]}) -def test_len(sample_df): - assert len(sample_df) == 5 +@pytest.fixture +def sample_db_df_testdata(): + data_model = "icoads_r300_d714" + data = test_data[f"test_{data_model}"]["mdf_data"] + mask = test_data[f"test_{data_model}"]["mdf_mask"] + info = test_data[f"test_{data_model}"]["mdf_info"] + + return read_data(data, mask=mask, info=info) + + +@pytest.fixture +def sample_db_reader_testdata(): + data_model = "icoads_r300_d714" + data = test_data[f"test_{data_model}"]["mdf_data"] + mask = test_data[f"test_{data_model}"]["mdf_mask"] + info = test_data[f"test_{data_model}"]["mdf_info"] + + return read_data(data, mask=mask, info=info, chunksize=2) + + +def test_len_df(sample_db_df): + assert len(sample_db_df) == 5 -def test_print(sample_df, capsys): - print(sample_df) +def test_len_reader(sample_db_reader): + assert len(sample_db_reader) == 5 + + +def test_print_df(sample_db_df, capsys): + print(sample_db_df) captured = capsys.readouterr() assert captured.out.strip() != "" - for col in sample_df.data.columns: + for col in sample_db_df.columns: assert col in captured.out -def test_copy(sample_df): - db_cp = sample_df.copy() +def test_print_reader(sample_db_reader, capsys): + print(sample_db_reader) + + captured = capsys.readouterr() - pd.testing.assert_frame_equal(sample_df.data, db_cp.data) - pd.testing.assert_frame_equal(sample_df.mask, db_cp.mask) + assert captured.out.strip() != "" -def test_add(sample_data, sample_mask): - db = DataBundle() +def test_copy_df(sample_db_df): + db_cp = sample_db_df.copy() + + pd.testing.assert_frame_equal(sample_db_df.data, db_cp.data) + pd.testing.assert_frame_equal(sample_db_df.mask, db_cp.mask) + + +def test_copy_reader(sample_db_reader): + db_cp = sample_db_reader.copy() + + pd.testing.assert_frame_equal(sample_db_reader.data.read(), db_cp.data.read()) + pd.testing.assert_frame_equal(sample_db_reader.mask.read(), db_cp.mask.read()) + +def test_add_df(sample_db_df): + sample_data = sample_db_df.data + sample_mask = sample_db_df.mask + + db = DataBundle() db_add = db.add({"data": sample_data}) + pd.testing.assert_frame_equal(db_add.data, sample_data) db = DataBundle() - db_add = db.add({"mask": sample_mask}) + pd.testing.assert_frame_equal(db_add.mask, sample_mask) db = DataBundle() - db_add = db.add({"data": sample_data, "mask": sample_mask}) + pd.testing.assert_frame_equal(db_add.data, sample_data) pd.testing.assert_frame_equal(db_add.mask, sample_mask) -def test_stack_v(sample_df, sample_data, sample_mask): +def test_add_reader_data(sample_db_reader): + sample_data = sample_db_reader.data + + db = DataBundle() + db_add = db.add({"data": sample_data}) + + pd.testing.assert_frame_equal(db_add.data.read(), sample_data.read()) + + +def test_add_reader_mask(sample_db_reader): + sample_mask = sample_db_reader.mask + + db = DataBundle() + db_add = db.add({"mask": sample_mask}) + + pd.testing.assert_frame_equal(db_add.mask.read(), sample_mask.read()) + + +def test_add_reader_both(sample_db_reader): + sample_data = sample_db_reader.data + sample_mask = sample_db_reader.mask + + db = DataBundle() + db_add = db.add({"data": sample_data, "mask": sample_mask}) + + pd.testing.assert_frame_equal(db_add.data.read(), sample_data.read()) + pd.testing.assert_frame_equal(db_add.mask.read(), sample_mask.read()) + + +def test_stack_v_df(sample_db_df): + sample_data = 
sample_db_df.data.copy() + sample_mask = sample_db_df.mask.copy() + db = DataBundle(data=sample_data, mask=sample_mask) - expected_data = pd.concat([sample_df.data, db.data], ignore_index=True) - expected_mask = pd.concat([sample_df.mask, db.mask], ignore_index=True) - sample_df.stack_v(db) + sample_db_df.stack_v(db) - pd.testing.assert_frame_equal(sample_df.data, expected_data) - pd.testing.assert_frame_equal(sample_df.mask, expected_mask) + expected_data = pd.concat([sample_data, db.data], ignore_index=True) + expected_mask = pd.concat([sample_mask, db.mask], ignore_index=True) + pd.testing.assert_frame_equal(sample_db_df.data, expected_data) + pd.testing.assert_frame_equal(sample_db_df.mask, expected_mask) + + +def test_stack_v_reader(sample_db_reader): + sample_data = sample_db_reader.data + sample_mask = sample_db_reader.mask -def test_stack_h(sample_df, sample_data, sample_mask): db = DataBundle(data=sample_data, mask=sample_mask) - expected_data = pd.concat([sample_df.data, db.data], axis=1) - expected_mask = pd.concat([sample_df.mask, db.mask], axis=1) - sample_df.stack_h(db) + with pytest.raises(ValueError): + sample_db_reader.stack_v(db) + + +def test_stack_h_df(sample_db_df, sample_data, sample_mask): + db = DataBundle(data=sample_data, mask=sample_mask) + expected_data = pd.concat([sample_db_df.data, db.data], axis=1) + expected_mask = pd.concat([sample_db_df.mask, db.mask], axis=1) - pd.testing.assert_frame_equal(sample_df.data, expected_data) - pd.testing.assert_frame_equal(sample_df.mask, expected_mask) + sample_db_df.stack_h(db) + + pd.testing.assert_frame_equal(sample_db_df.data, expected_data) + pd.testing.assert_frame_equal(sample_db_df.mask, expected_mask) + + +def test_stack_h_reader(sample_db_reader): + sample_data = sample_db_reader.data + sample_mask = sample_db_reader.mask + + db = DataBundle(data=sample_data, mask=sample_mask) + + with pytest.raises(ValueError): + sample_db_reader.stack_h(db) @pytest.mark.parametrize( @@ -106,8 +231,8 @@ def test_stack_h(sample_df, sample_data, sample_mask): ) @pytest.mark.parametrize("reset_index", [False, True]) @pytest.mark.parametrize("inverse", [False, True]) -def test_select_operators( - sample_df, +def test_select_operators_df( + sample_db_df, func, args, idx_exp, @@ -115,26 +240,106 @@ def test_select_operators( reset_index, inverse, ): - result = getattr(sample_df, func)(*args, reset_index=reset_index, inverse=inverse) + result = getattr(sample_db_df, func)( + *args, reset_index=reset_index, inverse=inverse + ) + + data = sample_db_df.data + mask = sample_db_df.mask - expected = sample_df.data - expected_mask = sample_df.mask - selected = result.data + selected_data = result.data selected_mask = result.mask if inverse is False: - idx = expected.index.isin(idx_exp) + idx = data.index.isin(idx_exp) + else: + idx = data.index.isin(idx_rej) + + expected_data = data[idx] + expected_mask = mask[idx] + + if reset_index is True: + expected_data = expected_data.reset_index(drop=True) + expected_mask = expected_mask.reset_index(drop=True) + + pd.testing.assert_frame_equal(expected_data, selected_data) + pd.testing.assert_frame_equal(expected_mask, selected_mask) + + +@pytest.mark.parametrize( + "func, args, idx_exp, idx_rej", + [ + ("select_where_all_true", [], [0, 1, 2], [3, 4]), + ("select_where_all_false", [], [3], [0, 1, 2, 4]), + ("select_where_index_isin", [[0, 2, 4]], [0, 2, 4], [1, 3]), + ("select_where_entry_isin", [{"A": [26, 41]}], [1, 3], [0, 2, 4]), + ], +) +@pytest.mark.parametrize("reset_index", [False, True]) 
+@pytest.mark.parametrize("inverse", [False, True]) +def test_select_operators_reader( + sample_db_reader, + func, + args, + idx_exp, + idx_rej, + reset_index, + inverse, +): + result = getattr(sample_db_reader, func)( + *args, reset_index=reset_index, inverse=inverse + ) + + data = sample_db_reader.data.read() + mask = sample_db_reader.mask.read() + + selected_data = result.data.read() + selected_mask = result.mask.read() + + if inverse is False: + idx = data.index.isin(idx_exp) else: - idx = expected.index.isin(idx_rej) + idx = data.index.isin(idx_rej) - expected = expected[idx] - expected_mask = expected_mask[idx] + expected_data = data[idx] + expected_mask = mask[idx] if reset_index is True: - expected = expected.reset_index(drop=True) + expected_data = expected_data.reset_index(drop=True) expected_mask = expected_mask.reset_index(drop=True) - pd.testing.assert_frame_equal(expected, selected) + pd.testing.assert_frame_equal(expected_data, selected_data) + pd.testing.assert_frame_equal(expected_mask, selected_mask) + + +@pytest.mark.parametrize( + "func, args, idx_exp", + [ + # ("select_where_all_true", [], [0, 1, 2], [3, 4]), + # ("select_where_all_false", [], [3], [0, 1, 2, 4]), + ("select_where_index_isin", [[0, 2, 4]], [0, 2, 4]), + # ("select_where_entry_isin", [{("core", "ID"): [25629, 26558]}], [1, 3]), + ], +) +def test_select_operators_testdata_reader( + sample_db_reader_testdata, + func, + args, + idx_exp, +): + result = getattr(sample_db_reader_testdata, func)(*args) + data = sample_db_reader_testdata.data.read() + mask = sample_db_reader_testdata.mask.read() + + selected_data = result.data.read() + selected_mask = result.mask.read() + + idx = data.index.isin(idx_exp) + + expected_data = data[idx] + expected_mask = mask[idx] + + pd.testing.assert_frame_equal(expected_data, selected_data, check_dtype=False) pd.testing.assert_frame_equal(expected_mask, selected_mask) @@ -149,8 +354,8 @@ def test_select_operators( ) @pytest.mark.parametrize("reset_index", [False, True]) @pytest.mark.parametrize("inverse", [False, True]) -def test_split_operators( - sample_df, +def test_split_operators_df( + sample_db_df, func, args, idx_exp, @@ -158,52 +363,141 @@ def test_split_operators( reset_index, inverse, ): - result = getattr(sample_df, func)(*args, reset_index=reset_index, inverse=inverse) + result = getattr(sample_db_df, func)( + *args, reset_index=reset_index, inverse=inverse + ) + + data = sample_db_df.data + mask = sample_db_df.mask - expected = sample_df.data - expected_mask = sample_df.mask - selected = result[0].data + selected_data = result[0].data selected_mask = result[0].mask - rejected = result[1].data + rejected_data = result[1].data rejected_mask = result[1].mask if inverse is False: - idx1 = expected.index.isin(idx_exp) - idx2 = expected.index.isin(idx_rej) + idx1 = data.index.isin(idx_exp) + idx2 = data.index.isin(idx_rej) else: - idx1 = expected.index.isin(idx_rej) - idx2 = expected.index.isin(idx_exp) + idx1 = data.index.isin(idx_rej) + idx2 = data.index.isin(idx_exp) - expected1 = expected[idx1] - expected2 = expected[idx2] - expected_mask1 = expected_mask[idx1] - expected_mask2 = expected_mask[idx2] + expected_data1 = data[idx1] + expected_data2 = data[idx2] + expected_mask1 = mask[idx1] + expected_mask2 = mask[idx2] if reset_index is True: - expected1 = expected1.reset_index(drop=True) - expected2 = expected2.reset_index(drop=True) + expected_data1 = expected_data1.reset_index(drop=True) + expected_data2 = expected_data2.reset_index(drop=True) expected_mask1 = 
expected_mask1.reset_index(drop=True)
         expected_mask2 = expected_mask2.reset_index(drop=True)
 
-    pd.testing.assert_frame_equal(expected1, selected)
-    pd.testing.assert_frame_equal(expected2, rejected)
+    pd.testing.assert_frame_equal(expected_data1, selected_data)
+    pd.testing.assert_frame_equal(expected_data2, rejected_data)
     pd.testing.assert_frame_equal(expected_mask1, selected_mask)
     pd.testing.assert_frame_equal(expected_mask2, rejected_mask)
 
 
-def test_unique(sample_df):
-    result = sample_df.unique(columns=("A"))
+@pytest.mark.parametrize(
+    "func, args, idx_exp, idx_rej",
+    [
+        ("split_by_boolean_true", [], [0, 1, 2], [3, 4]),
+        ("split_by_boolean_false", [], [3], [0, 1, 2, 4]),
+        ("split_by_index", [[0, 2, 4]], [0, 2, 4], [1, 3]),
+        ("split_by_column_entries", [{"A": [26, 41]}], [1, 3], [0, 2, 4]),
+    ],
+)
+@pytest.mark.parametrize("reset_index", [False, True])
+@pytest.mark.parametrize("inverse", [False, True])
+def test_split_operators_reader(
+    sample_db_reader,
+    func,
+    args,
+    idx_exp,
+    idx_rej,
+    reset_index,
+    inverse,
+):
+    result = getattr(sample_db_reader, func)(
+        *args, reset_index=reset_index, inverse=inverse
+    )
+
+    data = sample_db_reader.data.read()
+    mask = sample_db_reader.mask.read()
+
+    selected_data = result[0].data.read()
+    selected_mask = result[0].mask.read()
+    rejected_data = result[1].data.read()
+    rejected_mask = result[1].mask.read()
+
+    if inverse is False:
+        idx1 = data.index.isin(idx_exp)
+        idx2 = data.index.isin(idx_rej)
+    else:
+        idx1 = data.index.isin(idx_rej)
+        idx2 = data.index.isin(idx_exp)
+
+    expected_data1 = data[idx1]
+    expected_data2 = data[idx2]
+    expected_mask1 = mask[idx1]
+    expected_mask2 = mask[idx2]
+
+    if reset_index is True:
+        expected_data1 = expected_data1.reset_index(drop=True)
+        expected_data2 = expected_data2.reset_index(drop=True)
+        expected_mask1 = expected_mask1.reset_index(drop=True)
+        expected_mask2 = expected_mask2.reset_index(drop=True)
+
+    pd.testing.assert_frame_equal(expected_data1, selected_data)
+    pd.testing.assert_frame_equal(expected_data2, rejected_data)
+    pd.testing.assert_frame_equal(expected_mask1, selected_mask)
+    pd.testing.assert_frame_equal(expected_mask2, rejected_mask)
+
+
+def test_split_by_column_entries_multi(sample_db_reader_multi):
+    result = sample_db_reader_multi.split_by_column_entries({("A", "a"): [26, 41]})
+
+    data = sample_db_reader_multi.data.read()
+    mask = sample_db_reader_multi.mask.read()
+
+    selected_data = result[0].data.read()
+    selected_mask = result[0].mask.read()
+    rejected_data = result[1].data.read()
+    rejected_mask = result[1].mask.read()
+
+    idx1 = data.index.isin([1, 3])
+    idx2 = data.index.isin([0, 2, 4])
+
+    expected_data1 = data[idx1]
+    expected_data2 = data[idx2]
+    expected_mask1 = mask[idx1]
+    expected_mask2 = mask[idx2]
+
+    pd.testing.assert_frame_equal(expected_data1, selected_data)
+    pd.testing.assert_frame_equal(expected_data2, rejected_data)
+    pd.testing.assert_frame_equal(expected_mask1, selected_mask)
+    pd.testing.assert_frame_equal(expected_mask2, rejected_mask)
+
+
+def test_unique_df(sample_db_df):
+    result = sample_db_df.unique(columns="A")
+    assert result == {"A": {19: 1, 26: 1, 27: 1, 41: 1, 91: 1}}
+
+
+def test_unique_reader(sample_db_reader):
+    result = sample_db_reader.unique(columns="A")
     assert result == {"A": {19: 1, 26: 1, 27: 1, 41: 1, 91: 1}}
 
 
-def test_replace_columns(sample_df):
+def test_replace_columns(sample_db_df):
     df_corr = pd.DataFrame(
         {
             "A_new": [101, 201, 301, 401, 501],
             "B": range(5),
         }
     )
-    result = sample_df.replace_columns(
+    result = 
sample_db_df.replace_columns( df_corr, subset=["B", "A"], rep_map={"A": "A_new"}, diff --git a/tests/test_mdf_reader.py b/tests/test_mdf_reader.py index 59c75f50..e1467c0a 100755 --- a/tests/test_mdf_reader.py +++ b/tests/test_mdf_reader.py @@ -296,7 +296,7 @@ def test_read_data_no_info(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.MultiIndex) - assert db.dtypes == "object" + assert isinstance(db.dtypes, dict) assert db.parse_dates is False assert db.encoding is None assert db.imodel is None @@ -364,7 +364,7 @@ def test_read_data_encoding(): assert isinstance(db.data, pd.DataFrame) assert isinstance(db.mask, pd.DataFrame) assert isinstance(db.columns, pd.Index) - assert db.dtypes == "object" + assert isinstance(db.dtypes, dict) assert db.parse_dates is False assert isinstance(db.encoding, str) assert db.encoding == "cp1252" @@ -376,6 +376,42 @@ def test_read_data_encoding(): assert db.size == 1705 +def test_read_data_textfilereader(): + data_model = "icoads_r300_d721" + data = test_data[f"test_{data_model}"]["mdf_data"] + mask = test_data[f"test_{data_model}"]["mdf_mask"] + info = test_data[f"test_{data_model}"]["mdf_info"] + db = read_data(data, mask=mask, info=info, chunksize=3) + + assert isinstance(db, DataBundle) + + for attr in [ + "data", + "mask", + "columns", + "dtypes", + "parse_dates", + "encoding", + "imodel", + "mode", + ]: + assert hasattr(db, attr) + + assert isinstance(db.data, pd.io.parsers.TextFileReader) + assert isinstance(db.mask, pd.io.parsers.TextFileReader) + assert isinstance(db.columns, pd.MultiIndex) + assert isinstance(db.dtypes, dict) + assert db.parse_dates == [] + assert isinstance(db.encoding, str) + assert db.encoding == "cp1252" + assert db.imodel is None + assert isinstance(db.mode, str) + assert db.mode == "data" + assert len(db) == 5 + assert db.shape == (5, 341) + assert db.size == 1705 + + def test_validate_read_mdf_args_pass(tmp_path): source = tmp_path / "file.mdf" source.touch() diff --git a/tests/test_reader_utilities.py b/tests/test_reader_utilities.py index f4a46639..2ae892be 100755 --- a/tests/test_reader_utilities.py +++ b/tests/test_reader_utilities.py @@ -167,21 +167,32 @@ def test_update_column_labels_mixed(): def test_read_csv_file_exists(tmp_csv_file): file_path, data = tmp_csv_file - df = read_csv(file_path) + df, info = read_csv(file_path) pd.testing.assert_frame_equal(df, data) + assert "columns" in info + pd.testing.assert_index_equal(info["columns"], df.columns) + assert "dtypes" in info + pd.testing.assert_series_equal(info["dtypes"], df.dtypes) + def test_read_csv_file_missing(tmp_path): missing_file = tmp_path / "missing.csv" - df = read_csv(missing_file) + df, info = read_csv(missing_file) assert df.empty + assert info == {} def test_read_csv_with_col_subset(tmp_csv_file): file_path, _ = tmp_csv_file - df = read_csv(file_path, col_subset=["B"]) + df, info = read_csv(file_path, col_subset=["B"]) assert list(df.columns) == ["B"] + assert "columns" in info + pd.testing.assert_index_equal(info["columns"], df.columns) + assert "dtypes" in info + pd.testing.assert_series_equal(info["dtypes"], df.dtypes) + def test_convert_dtypes_basic(): dtypes = {"A": "int", "B": "datetime", "C": "float"} @@ -252,7 +263,6 @@ def test_process_textfilereader_only_df(sample_reader): (reader_out,) = process_textfilereader( sample_reader, sample_func_only_df, read_kwargs={"chunksize": 1} ) - print(reader_out) assert isinstance(reader_out, TextFileReader) df_out = reader_out.read() 
assert df_out.shape == (2, 2)
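
Taken together, the new reader tests pin down the chunked workflow: ``read_data`` with ``chunksize`` returns a ``DataBundle`` backed by ``pd.io.parsers.TextFileReader`` objects, the select/split operators accept such bundles, and stacking stays DataFrame-only. A minimal sketch of that behaviour, assuming a top-level import path and hypothetical input files (neither is part of this diff):

.. code-block:: python

    import pandas as pd

    from cdm_reader_mapper import DataBundle, read_data  # import path assumed

    # With chunksize set, data and mask come back as TextFileReader
    # objects instead of DataFrames ("data.imma"/"mask.csv" are placeholders).
    db = read_data("data.imma", mask="mask.csv", chunksize=2)
    assert isinstance(db.data, pd.io.parsers.TextFileReader)

    # Selection operators work on reader-backed bundles; the result is
    # again reader-backed, and .read() materializes one DataFrame.
    subset = db.select_where_index_isin([0, 2, 4])
    df = subset.data.read()

    # stack_v/stack_h only support pd.DataFrames as input now, so passing
    # a reader-backed bundle raises ValueError (cf. test_stack_v_reader).
    other = read_data("data.imma", mask="mask.csv", chunksize=2)
    try:
        db.stack_v(other)
    except ValueError:
        pass  # expected for TextFileReader-backed input

Note that a ``TextFileReader`` is exhausted once read, which is why the reader-backed pytest fixtures above are function-scoped and hand each test a fresh reader.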