16 changes: 16 additions & 0 deletions CHANGES.rst
@@ -3,6 +3,22 @@
Changelog
=========

2.2.2 (unreleased)
------------------
Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`)

New features and enhancements
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* ``mdf_reader.read_data`` now supports chunking (:pull:`360`)
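  A hedged usage sketch of the new chunking support; the import path and the
  ``chunksize`` keyword are assumptions mirroring the pandas reader, not taken
  from this PR:

  .. code-block:: python

      from cdm_reader_mapper import mdf_reader

      # Hypothetical call; check the actual read_data signature for the
      # chunking keyword before relying on it.
      data = mdf_reader.read_data("observations.imma", chunksize=10_000)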

Breaking changes
^^^^^^^^^^^^^^^^
* ``DataBundle.stack_v`` and ``DataBundle.stack_h`` now only support ``pd.DataFrame`` objects as input; any other input raises a ``ValueError`` (:pull:`360`)
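  Illustrative only; ``db`` stands in for an existing ``DataBundle`` and the
  call shape is an assumption:

  .. code-block:: python

      import pandas as pd

      db.stack_v(pd.DataFrame({"a": [1]}))  # pd.DataFrame input: accepted
      db.stack_v({"a": [1]})                # any other input: ValueError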

Internal changes
^^^^^^^^^^^^^^^^
* Reworked the internal structure for better readability and performance (:pull:`360`)

2.2.1 (2026-01-23)
------------------
Contributors to this version: Ludwig Lierhammer (:user:`ludwiglierhammer`)
184 changes: 100 additions & 84 deletions cdm_reader_mapper/cdm_mapper/mapper.py
@@ -20,6 +20,8 @@
import numpy as np
import pandas as pd

from pandas.io.parsers import TextFileReader

from cdm_reader_mapper.common import logging_hdlr, pandas_TextParser_hdlr

from . import properties
@@ -41,7 +43,7 @@ def _log_and_return_empty(msg):
return _log_and_return_empty("Input data is empty")
return [data]

elif isinstance(data, pd.io.parsers.TextFileReader):
elif isinstance(data, TextFileReader):
logger.debug("Input is a pd.TextFileReader")
if not pandas_TextParser_hdlr.is_not_empty(data):
return _log_and_return_empty("Input data is empty")
@@ -80,22 +82,20 @@ def _is_empty(value):

def _drop_duplicated_rows(df) -> pd.DataFrame:
"""Drop duplicates from list."""
list_cols = [
col for col in df.columns if df[col].apply(lambda x: isinstance(x, list)).any()
]

for col in list_cols:
df[col] = df[col].apply(lambda x: tuple(x) if isinstance(x, list) else x)

def list_to_tuple(v):
if isinstance(v, list):
v = tuple(v)
return v
df.drop_duplicates(ignore_index=True, inplace=True)

def tuple_to_list(v):
if isinstance(v, tuple):
v = list(v)
return v
for col in list_cols:
if df[col].apply(lambda x: isinstance(x, tuple)).any():
df[col] = df[col].apply(lambda x: list(x) if isinstance(x, tuple) else x)

dtypes = df.dtypes
df = df.map(list_to_tuple)
df = df.drop_duplicates(ignore_index=True)
df = df.map(tuple_to_list)
return df.astype(dtypes)
return df
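
# --- Editor's sketch (not part of the PR) ----------------------------------
# Why the tuple round-trip above: drop_duplicates() hashes row values, and
# Python lists are unhashable, so list-valued columns raise
# "TypeError: unhashable type: 'list'". Converting lists to tuples first,
# deduplicating, then converting back restores the original cell types.
import pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [[1, 2], [1, 2], [3]]})

list_cols = [
    col for col in df.columns if df[col].apply(lambda x: isinstance(x, list)).any()
]
for col in list_cols:
    df[col] = df[col].apply(lambda x: tuple(x) if isinstance(x, list) else x)

df.drop_duplicates(ignore_index=True, inplace=True)

for col in list_cols:
    df[col] = df[col].apply(lambda x: list(x) if isinstance(x, tuple) else x)

assert len(df) == 2 and df["b"].iloc[0] == [1, 2]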


def _get_nested_value(ndict, keys) -> Any | None:
@@ -209,7 +209,7 @@ def _fill_value(series, fill_value) -> pd.Series:
return series.fillna(value=fill_value).infer_objects(copy=False)


def _extract_input_data(idata, elements, cols, default, logger):
def _extract_input_data(idata, elements, default, logger):
"""Extract the relevant input data based on `elements`."""

def _return_default():
@@ -220,18 +220,14 @@ def _return_default():

logger.debug(f"\telements: {' '.join(map(str, elements))}")

missing_elements = [e for e in elements if e not in cols]
if missing_elements:
logger.warning(
"Missing elements from input data: {}".format(
",".join(map(str, missing_elements))
)
)
return _return_default()
cols = idata.columns

for e in elements:
if e not in cols:
logger.warning(f"Missing element from input data: {e}")
return _return_default()

data = idata[elements]
if len(elements) == 1:
data = data.iloc[:, 0]
data = idata[elements[0]] if len(elements) == 1 else idata[elements]

if _is_empty(data):
return _return_default()
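
# --- Editor's sketch (not part of the PR) ----------------------------------
# Standalone illustration of the selection-with-default pattern above; the
# function and variable names here are illustrative, not library API.
import pandas as pd

def extract(idata, elements, default=None):
    # Any missing element falls back to the default.
    for e in elements:
        if e not in idata.columns:
            return default, True
    # A single element yields a Series; several yield a DataFrame.
    data = idata[elements[0]] if len(elements) == 1 else idata[elements]
    return data, False

df = pd.DataFrame({"lat": [10.0], "lon": [20.0]})
data, used_default = extract(df, ["lat"])           # Series, False
data, used_default = extract(df, ["lat", "depth"])  # default path
assert data is None and used_default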
@@ -245,7 +241,6 @@ def _column_mapping(
imodel_functions,
atts,
codes_subset,
cols,
column,
logger,
):
@@ -264,40 +259,46 @@
data, used_default = _extract_input_data(
idata,
elements,
cols,
default,
logger,
)

if transform and not used_default:
data = _transform(
data,
imodel_functions,
transform,
kwargs,
logger=logger,
)
if not used_default:
if transform:
data = _transform(
data,
imodel_functions,
transform,
kwargs,
logger=logger,
)
elif code_table:
data = _code_table(
data,
imodel_functions.imodel,
code_table,
logger=logger,
)

elif code_table and not used_default:
data = _code_table(
data,
imodel_functions.imodel,
code_table,
logger=logger,
)
if not isinstance(data, pd.Series):
data = pd.Series(data, index=idata.index, copy=False)

data.name = column

data = pd.Series(data, index=idata.index, name=column)
data = _fill_value(data, fill_value)
atts["decimal_places"] = _decimal_places(decimal_places)
if fill_value is not None:
data = _fill_value(data, fill_value)

return data, atts
if atts:
atts["decimal_places"] = _decimal_places(decimal_places)
data = _convert_dtype(data, atts)

return data
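
# --- Editor's sketch (not part of the PR) ----------------------------------
# Schematic version of the per-column flow above: transform and code-table
# lookup are mutually exclusive and both are skipped when the column fell
# back to its default; the result is coerced to a Series and filled only
# when a fill value exists. Names are illustrative, not library API.
import pandas as pd

def map_column(values, index, name, transform=None, code_map=None,
               fill_value=None, used_default=False):
    if not used_default:
        if transform:
            values = transform(values)
        elif code_map:
            values = values.map(code_map)
    if not isinstance(values, pd.Series):
        values = pd.Series(values, index=index, copy=False)
    values.name = name
    if fill_value is not None:
        values = values.fillna(fill_value)
    return values

col = map_column(pd.Series([1.0, None]), pd.RangeIndex(2), "observation_value",
                 transform=lambda s: s * 10, fill_value=-999.9)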


def _table_mapping(
idata,
mapping,
atts,
cols,
null_label,
imodel_functions,
codes_subset,
@@ -306,31 +307,32 @@
drop_duplicates,
logger,
) -> pd.DataFrame:
columns = (
[x for x in atts.keys() if x in idata.columns]
if not cdm_complete
else list(atts.keys())
)

table_df = pd.DataFrame(index=idata.index, columns=columns)
columns = list(atts) if cdm_complete else [c for c in atts if c in idata.columns]
out = {}

for column in columns:
if column not in mapping.keys():
out[column] = pd.Series(
[null_label] * len(idata), index=idata.index, name=column
)
continue

logger.debug(f"\tElement: {column}")

table_df[column], atts[column] = _column_mapping(
out[column] = _column_mapping(
idata,
mapping[column],
imodel_functions,
atts[column],
codes_subset,
cols,
column,
logger,
)
table_df[column] = _convert_dtype(table_df[column], atts.get(column))

if not out:
return pd.DataFrame(index=idata.index)

table_df = pd.DataFrame(out, index=idata.index)

if drop_missing_obs is True and "observation_value" in table_df:
table_df = table_df.dropna(subset=["observation_value"])
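
# --- Editor's sketch (not part of the PR) ----------------------------------
# The rework collects per-column Series in a dict and builds the DataFrame
# once, instead of assigning columns into a preallocated frame one by one;
# a single construction avoids repeated block reallocation in pandas.
import pandas as pd

idx = pd.RangeIndex(3)
out = {
    "station_id": pd.Series(["a", "b", "c"], index=idx),
    "observation_value": pd.Series([1.0, 2.0, None], index=idx),
}
table_df = pd.DataFrame(out, index=idx)                   # one construction
table_df = table_df.dropna(subset=["observation_value"])  # drop missing obs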
@@ -345,30 +347,35 @@ def _prepare_cdm_tables(cdm_subset):
"""Prepare table buffers and attributes for CDM tables."""
if isinstance(cdm_subset, str):
cdm_subset = [cdm_subset]

cdm_atts = get_cdm_atts(cdm_subset)
if not cdm_atts:
return {}
return {
table: {

tables = {}
for table, atts in cdm_atts.items():
for col, meta in atts.items():
meta["decimal_places"] = _decimal_places(meta.get("decimal_places"))
tables[table] = {
"buffer": StringIO(),
"atts": deepcopy(cdm_atts.get(table)),
"atts": atts,
}
for table in cdm_subset
}

return tables


def _process_chunk(
idata,
imodel_maps,
imodel_functions,
cdm_tables,
cols,
null_label,
codes_subset,
cdm_complete,
drop_missing_obs,
drop_duplicates,
logger,
is_reader,
):
"""Process one chunk of input data."""
for table, mapping in imodel_maps.items():
@@ -378,7 +385,6 @@
idata=idata,
mapping=mapping,
atts=deepcopy(cdm_tables[table]["atts"]),
cols=cols,
null_label=null_label,
imodel_functions=imodel_functions,
codes_subset=codes_subset,
Expand All @@ -389,13 +395,17 @@ def _process_chunk(
)

table_df.columns = pd.MultiIndex.from_product([[table], table_df.columns])
table_df.to_csv(
cdm_tables[table]["buffer"],
header=False,
index=False,
mode="a",
)
cdm_tables[table]["columns"] = table_df.columns

if is_reader:
table_df.to_csv(
cdm_tables[table]["buffer"],
header=False,
index=False,
mode="a",
)
cdm_tables[table]["columns"] = table_df.columns
else:
cdm_tables[table]["df"] = table_df.astype(object)


def _finalize_output(cdm_tables, logger):
@@ -405,19 +415,24 @@
for table, meta in cdm_tables.items():
logger.debug(f"\tParse datetime by reader; Table: {table}")

meta["buffer"].seek(0)
df = pd.read_csv(
meta["buffer"],
names=meta["columns"],
na_values=[],
dtype="object",
keep_default_na=False,
)

meta["buffer"].close()
if "df" not in meta:
meta["buffer"].seek(0)
df = pd.read_csv(
meta["buffer"],
names=meta["columns"],
na_values=[],
dtype="object",
keep_default_na=False,
)
meta["buffer"].close()
else:
df = meta.get("df", pd.DataFrame())

final_tables.append(df)

if not final_tables:
return pd.DataFrame()

return pd.concat(final_tables, axis=1, join="outer").reset_index(drop=True)
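
# --- Editor's sketch (not part of the PR) ----------------------------------
# Read-back side of the buffer pattern above: parse everything as plain
# strings (dtype="object", default NA coercion disabled) and concatenate
# the per-table frames column-wise. Column names are illustrative.
from io import StringIO

import pandas as pd

buffer = StringIO("1,a\n2,b\n")
df = pd.read_csv(
    buffer,
    names=["observation_value", "quality_flag"],
    dtype="object",
    na_values=[],
    keep_default_na=False,
)
result = pd.concat([df], axis=1, join="outer").reset_index(drop=True)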


@@ -447,20 +462,21 @@ def _map_and_convert(

cdm_tables = _prepare_cdm_tables(imodel_maps.keys())

is_reader = isinstance(data_iter, TextFileReader)

for idata in data_iter:
cols = list(idata.columns)
_process_chunk(
idata=idata,
imodel_maps=imodel_maps,
imodel_functions=imodel_functions,
cdm_tables=cdm_tables,
cols=cols,
null_label=null_label,
codes_subset=codes_subset,
cdm_complete=cdm_complete,
drop_missing_obs=drop_missing_obs,
drop_duplicates=drop_duplicates,
logger=logger,
is_reader=is_reader,
)

return _finalize_output(cdm_tables, logger)