49 commits
c09a06f
Change from TextFileReader to ParquetStreamReader for (better) handli…
JanWillruth Jan 15, 2026
c266a4d
Save columns schemas alongside parquet to restore MultiIndex column n…
JanWillruth Jan 15, 2026
8588068
Merge remote-tracking branch 'origin/main' into reader_io
JanWillruth Jan 29, 2026
a6e7b86
try to use ParquetStremReader
ludwiglierhammer Jan 29, 2026
858b4ca
re-add make_copy for TextFileReader objects
ludwiglierhammer Jan 29, 2026
f223253
test_mdf_reader:test_read_data_textfilereader TextFileReader -> Parq…
ludwiglierhammer Jan 29, 2026
691444d
ParquetStreamReader to cdm_reader_mapper.common.iterators
ludwiglierhammer Jan 29, 2026
4d49bde
Update cdm_reader_mapper/mdf_reader/utils/utilities.py
ludwiglierhammer Feb 6, 2026
ae65dff
remove unused variable
ludwiglierhammer Feb 6, 2026
914a725
explicitly set data types
ludwiglierhammer Feb 6, 2026
29f789d
use common.iterators.process_disk_backed
ludwiglierhammer Feb 6, 2026
88b1601
new function: common.iterators.is_valid_iterable
ludwiglierhammer Feb 6, 2026
954be46
make common.iterators.process_disk_backed run with Iterable[pd-Series]
ludwiglierhammer Feb 6, 2026
5416ea8
use common.iterators.process_disk_backed in metmetpy.validate
ludwiglierhammer Feb 6, 2026
0e80b6e
optionally aggregate non data outputs
ludwiglierhammer Feb 9, 2026
75bb38a
remove common.pandas_TextParser_hdlr.get_length
ludwiglierhammer Feb 9, 2026
7202ee2
use common.iterators in common.inspect
ludwiglierhammer Feb 9, 2026
a154ac5
allow indexing
ludwiglierhammer Feb 9, 2026
6c875c4
use common.iterators in common.select
ludwiglierhammer Feb 9, 2026
f30daad
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 9, 2026
9392e9c
set makecopy to False
ludwiglierhammer Feb 9, 2026
35f03af
use commin.iterators in common.replace
ludwiglierhammer Feb 9, 2026
bcfa4fe
Merge branch 'reader_io' of https://github.com/JanWillruth/cdm_reader…
ludwiglierhammer Feb 9, 2026
ce99f77
Merge branch 'main' into reader_io
ludwiglierhammer Feb 10, 2026
f160e29
run databundel with common.iterators
ludwiglierhammer Feb 11, 2026
e2f92d1
cdm_mapper.mapper uses common.iterators
ludwiglierhammer Feb 11, 2026
696a4dc
mdf_reader.utils now uses common.iterators
ludwiglierhammer Feb 11, 2026
0151861
make core using comon.iterators
ludwiglierhammer Feb 12, 2026
85ac14d
re-work indexing
ludwiglierhammer Feb 12, 2026
172dba1
new ParquetStream method: reset_index
ludwiglierhammer Feb 13, 2026
417dba5
internally: reset_index from _split_df to _split_dispatch
ludwiglierhammer Feb 13, 2026
98a103f
preserve indexes while parsing
ludwiglierhammer Feb 13, 2026
d0b1b08
update tests
ludwiglierhammer Feb 13, 2026
5403fff
reduce complexity of process_disk_backed
ludwiglierhammer Feb 13, 2026
8e6f20f
remove TextFileReader references
ludwiglierhammer Feb 13, 2026
c67f361
__getattr__ mkaes real copies
ludwiglierhammer Feb 13, 2026
3147b3f
delete print statement
ludwiglierhammer Feb 17, 2026
651e356
use isinstance ParquetReaderStream
ludwiglierhammer Feb 17, 2026
cf2f186
new postprocessing decorator to apply a function to both pd.DataFrame…
ludwiglierhammer Feb 17, 2026
c7837a2
use postprocessing decorator I
ludwiglierhammer Feb 17, 2026
da40ff6
use postprocessing decorator II
ludwiglierhammer Feb 17, 2026
9598308
use postprocessing decorator III
ludwiglierhammer Feb 17, 2026
eb8f185
introduce ProcessFunction class
ludwiglierhammer Feb 17, 2026
2f33ec1
add AI unit tests
ludwiglierhammer Feb 18, 2026
a42fa76
fixing pylint
ludwiglierhammer Feb 18, 2026
7dc7b2a
Merge branch 'glamod:main' into reader_io
JanWillruth Feb 19, 2026
fe3743f
update CHANGELOG
ludwiglierhammer Feb 19, 2026
ab539f6
Merge branch 'main' into reader_io
ludwiglierhammer Feb 19, 2026
1d66505
fixing merge conflicts manually
ludwiglierhammer Feb 19, 2026
10 changes: 10 additions & 0 deletions CHANGES.rst
@@ -17,6 +17,10 @@ New features and enhancements
* `cdm_mapper.read_tables`
* `cdm_mapper.write_tables`

* introduce `ParquetStreamReader` to replace `pd.io.parsers.TextFileReader` (:issue:`8`, :pull:`348`)
* ``cdm_mapper.map_model`` now supports both `pd.DataFrame` and `ParquetStreamReader` as output (:pull:`348`)
* ``common.replace_columns`` now supports both `pd.DataFrame` and `ParquetStreamReader` as output (:pull:`348`)
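
A minimal usage sketch of the new behaviour (hedged: the input file, chunk size, and the `icoads_r300_d704` model name are illustrative, and `map_model` is assumed to be importable from `cdm_reader_mapper.cdm_mapper`):

```python
import pandas as pd

from cdm_reader_mapper.cdm_mapper import map_model
from cdm_reader_mapper.common.iterators import ParquetStreamReader

# Hypothetical pre-decoded input; a real frame needs imodel-specific columns.
df = pd.read_parquet("imma_decoded.parquet")

# A single DataFrame still maps to a single DataFrame of CDM tables.
tables = map_model(df, "icoads_r300_d704")

# An iterable of DataFrame chunks yields a disk-backed ParquetStreamReader.
chunks = (df.iloc[i : i + 10_000] for i in range(0, len(df), 10_000))
stream = map_model(chunks, "icoads_r300_d704")
if isinstance(stream, ParquetStreamReader):
    for chunk in stream:  # each chunk is a pd.DataFrame of CDM tables
        ...
```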

Breaking changes
^^^^^^^^^^^^^^^^
* ``DataBundle.stack_v`` and ``DataBundle.stack_h`` only support `pd.DataFrame` objects as input and otherwise raise a `ValueError` (:pull:`360`)
@@ -27,12 +31,18 @@ Breaking changes

* set the default for `extension` from ``csv`` to the specified `data_format` in `mdf_reader.write_data` (:pull:`363`)
* `mdf_reader.read_data`: save `dtypes` in the returned DataBundle as `pd.Series` instead of `dict` (:pull:`363`)
* remove ``common.pandas_TextParser_hdlr`` (:issue:`8`, :pull:`348`)
* ``cdm_reader_mapper`` now raises errors instead of logging them (:pull:`348`)
* ``DataBundle`` now converts all iterables of `pd.DataFrame`/`pd.Series` to `ParquetStreamReader` when initialized (:pull:`348`)
* all main functions in `common.select` now return a 4-tuple: selected values, rejected values, original indexes of the selected values, original indexes of the rejected values (:pull:`348`)
* move `ParquetStreamReader` and all corresponding methods to `common.iterators` to handle chunking outside of `mdf_reader`/`cdm_mapper`/`core`/`metmetpy` (:issue:`349`, :pull:`348`)
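
The new 4-tuple contract of `common.select` corresponds to the following plain-pandas decomposition (an illustration of the four return values, not the library call itself):

```python
import pandas as pd

df = pd.DataFrame({"a": [1, 2, 3, 4]})
mask = df["a"] > 2

selected, rejected = df[mask], df[~mask]                      # selected / rejected values
selected_idx, rejected_idx = df.index[mask], df.index[~mask]  # their original indexes
```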

Internal changes
^^^^^^^^^^^^^^^^
* rework the internal structure for better readability and performance (:pull:`360`)
* use pre-defined `Literal` constants in `cdm_reader_mapper.properties` (:pull:`363`)
* `mdf_reader.utils.utilities.read_csv`: rename parameter `columns` to `column_names` (:pull:`363`)
* introduce post-processing decorator that handles both `pd.DataFrame` and `ParquetStreamReader` (:pull:`348`)
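
A simplified in-memory analogue of that decorator (the real implementation lives in `common.iterators` and additionally spills chunks to parquet; the helper name and behaviour below are assumptions):

```python
import functools
from typing import Callable

import pandas as pd


def apply_postprocessing(func: Callable[..., pd.DataFrame]):
    """Apply `func` to a single DataFrame, or lazily to every chunk of an
    iterable of DataFrames (hypothetical stand-in, not the library API)."""

    @functools.wraps(func)
    def wrapper(data, *args, **kwargs):
        if isinstance(data, pd.DataFrame):
            return func(data, *args, **kwargs)
        return (func(chunk, *args, **kwargs) for chunk in data)

    return wrapper


@apply_postprocessing
def drop_empty_rows(df: pd.DataFrame) -> pd.DataFrame:
    return df.dropna(how="all")
```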

2.2.1 (2026-01-23)
------------------
230 changes: 73 additions & 157 deletions cdm_reader_mapper/cdm_mapper/mapper.py
@@ -3,7 +3,7 @@

Created on Thu Apr 11 13:45:38 2019

Maps data contained in a pandas DataFrame (or pd.io.parsers.TextFileReader) to
Maps data contained in a pandas DataFrame (or Iterable[pd.DataFrame]) to
the C3S Climate Data Store Common Data Model (CDM) header and observational
tables using the mapping information available in the tool's mapping library
for the input data model.
@@ -15,14 +15,18 @@

from copy import deepcopy
from io import StringIO
from typing import Any, get_args
from typing import Any, Iterable, get_args

import numpy as np
import pandas as pd

from pandas.io.parsers import TextFileReader
from cdm_reader_mapper.common import logging_hdlr

from cdm_reader_mapper.common import logging_hdlr, pandas_TextParser_hdlr
from cdm_reader_mapper.common.iterators import (
ParquetStreamReader,
ProcessFunction,
process_function,
)

from . import properties
from .codes.codes import get_code_table
@@ -31,41 +35,6 @@
from .utils.mapping_functions import mapping_functions


def _check_input_data_type(data, logger):
"""Check whether inpuit data type is valid."""

def _log_and_return_empty(msg):
logger.error(msg)

if isinstance(data, pd.DataFrame):
logger.debug("Input data is a pd.DataFrame")
if data.empty:
return _log_and_return_empty("Input data is empty")
return [data]

elif isinstance(data, TextFileReader):
logger.debug("Input is a pd.TextFileReader")
if not pandas_TextParser_hdlr.is_not_empty(data):
return _log_and_return_empty("Input data is empty")

return data

return _log_and_return_empty("Input data type " f"{type(data)}" " not supported")


def _normalize_input_data(data, logger):
"""Return an iterator of DataFrames irrespective of input type."""
data = _check_input_data_type(data, logger)

if data is None:
return iter(())

if isinstance(data, list):
return iter(data)

return data


def _is_empty(value):
"""Check whether a value is considered empty."""
if value is None:
Expand Down Expand Up @@ -369,7 +338,7 @@ def _prepare_cdm_tables(cdm_subset):
return tables


def _process_chunk(
def _map_data_model(
idata,
imodel_maps,
imodel_functions,
@@ -380,9 +349,14 @@ def _process_chunk(
drop_missing_obs,
drop_duplicates,
logger,
is_reader,
):
"""Process one chunk of input data."""
if ":" in idata.columns[0]:
idata.columns = pd.MultiIndex.from_tuples(
col.split(":") for col in idata.columns
)

all_tables = []
for table, mapping in imodel_maps.items():
logger.debug(f"Table: {table}")

@@ -400,118 +374,37 @@
)

table_df.columns = pd.MultiIndex.from_product([[table], table_df.columns])
table_df = table_df.astype(object)
all_tables.append(table_df)

if is_reader:
table_df.to_csv(
cdm_tables[table]["buffer"],
header=False,
index=False,
mode="a",
)
cdm_tables[table]["columns"] = table_df.columns
else:
cdm_tables[table]["df"] = table_df.astype(object)


def _finalize_output(cdm_tables, logger):
"""Turn buffers into DataFrames and combine all tables."""
final_tables = []

for table, meta in cdm_tables.items():
logger.debug(f"\tParse datetime by reader; Table: {table}")

if "df" not in meta:
meta["buffer"].seek(0)
df = pd.read_csv(
meta["buffer"],
names=meta["columns"],
na_values=[],
dtype="object",
keep_default_na=False,
)
meta["buffer"].close()
else:
df = meta.get("df", pd.DataFrame())

final_tables.append(df)

if not final_tables:
return pd.DataFrame()

return pd.concat(final_tables, axis=1, join="outer").reset_index(drop=True)


def _map_and_convert(
data_model,
*sub_models,
data=None,
cdm_subset=None,
codes_subset=None,
cdm_complete=True,
drop_missing_obs=True,
drop_duplicates=True,
null_label="null",
logger=None,
) -> pd.DataFrame:
"""Map and convert MDF data to CDM tables."""
data_iter = _normalize_input_data(data, logger)

if data_iter is None:
return pd.DataFrame()

if not cdm_subset:
cdm_subset = properties.cdm_tables

imodel_maps = get_imodel_maps(data_model, *sub_models, cdm_tables=cdm_subset)
imodel_functions = mapping_functions("_".join([data_model] + list(sub_models)))

cdm_tables = _prepare_cdm_tables(imodel_maps.keys())

is_reader = isinstance(data_iter, TextFileReader)

for idata in data_iter:
_process_chunk(
idata=idata,
imodel_maps=imodel_maps,
imodel_functions=imodel_functions,
cdm_tables=cdm_tables,
null_label=null_label,
codes_subset=codes_subset,
cdm_complete=cdm_complete,
drop_missing_obs=drop_missing_obs,
drop_duplicates=drop_duplicates,
logger=logger,
is_reader=is_reader,
)

return _finalize_output(cdm_tables, logger)
return pd.concat(all_tables, axis=1, join="outer").reset_index(drop=True)


def map_model(
data,
imodel,
cdm_subset=None,
codes_subset=None,
null_label="null",
cdm_complete=True,
drop_missing_obs=True,
drop_duplicates=True,
log_level="INFO",
) -> pd.DataFrame:
data: pd.DataFrame | Iterable[pd.DataFrame],
imodel: str,
cdm_subset: str | list[str] | None = None,
codes_subset: str | list[str] | None = None,
null_label: str = "null",
cdm_complete: bool = True,
drop_missing_obs: bool = True,
drop_duplicates: bool = True,
log_level: str = "INFO",
) -> pd.DataFrame | ParquetStreamReader:
"""Map a pandas DataFrame to the CDM header and observational tables.

Parameters
----------
data: pandas.DataFrame, pd.parser.TextFileReader or io.String
data: pandas.DataFrame or Iterable[pd.DataFrame]
input data to map.
imodel: str
A specific mapping from generic data model to CDM, like map a SID-DCK from IMMA1’s core and attachments to
CDM in a specific way.
e.g. ``icoads_r300_d704``
cdm_subset: list, optional
cdm_subset: str or list, optional
subset of CDM model tables to map.
Defaults to the full set of CDM tables defined for the imodel.
codes_subset: list, optional
codes_subset: str or list, optional
subset of code mapping tables to map.
Default to the full set of code mapping tables defined for the imodel.
null_label: str
@@ -536,30 +429,53 @@ def map_model(
cdm_tables: pandas.DataFrame
DataFrame with MultiIndex columns (cdm_table, column_name).
"""

@process_function(data_only=True)
def _map_model():
return ProcessFunction(
data=data,
func=_map_data_model,
func_kwargs={
"imodel_maps": imodel_maps,
"imodel_functions": imodel_functions,
"cdm_tables": cdm_tables,
"null_label": null_label,
"codes_subset": codes_subset,
"cdm_complete": cdm_complete,
"drop_missing_obs": drop_missing_obs,
"drop_duplicates": drop_duplicates,
"logger": logger,
},
makecopy=False,
)

logger = logging_hdlr.init_logger(__name__, level=log_level)

if imodel is None:
logger.error("Input data model 'imodel' is not defined.")
return
raise ValueError("Input data model 'imodel' is not defined.")

if not isinstance(imodel, str):
logger.error(f"Input data model type is not supported: {type(imodel)}")
return
raise TypeError(f"Input data model type is not supported: {type(imodel)}")

imodel = imodel.split("_")
if imodel[0] not in get_args(properties.SupportedDataModels):
logger.error("Input data model " f"{imodel[0]}" " not supported")
return
data_model = imodel.split("_")
if data_model[0] not in get_args(properties.SupportedDataModels):
raise ValueError("Input data model " f"{data_model[0]}" " not supported")

if not cdm_subset:
cdm_subset = properties.cdm_tables

imodel_maps = get_imodel_maps(*data_model, cdm_tables=cdm_subset)
imodel_functions = mapping_functions(imodel)

cdm_tables = _prepare_cdm_tables(imodel_maps.keys())

result = _map_model()

if isinstance(result, pd.DataFrame):
return pd.DataFrame(result)
elif isinstance(result, ParquetStreamReader):
return result

return _map_and_convert(
imodel[0],
*imodel[1:],
data=data,
cdm_subset=cdm_subset,
codes_subset=codes_subset,
null_label=null_label,
cdm_complete=cdm_complete,
drop_missing_obs=drop_missing_obs,
drop_duplicates=drop_duplicates,
logger=logger,
raise ValueError(
f"result mus be a pd.DataFrame or ParquetStreamReader, not {type(result)}."
)
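
The `":"`-splitting step at the top of `_map_data_model` above restores MultiIndex columns that were flattened for parquet storage, since parquet files cannot carry pandas MultiIndex column labels. A self-contained illustration of that round trip:

```python
import pandas as pd

# Chunks read back from parquet arrive with flattened "section:name" labels.
df = pd.DataFrame({"core:YR": [1900], "core:MO": [1]})

if ":" in df.columns[0]:
    df.columns = pd.MultiIndex.from_tuples(col.split(":") for col in df.columns)

print(df.columns.tolist())  # [('core', 'YR'), ('core', 'MO')]
```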
2 changes: 1 addition & 1 deletion cdm_reader_mapper/cdm_mapper/writer.py
@@ -5,7 +5,7 @@

Exports tables written in the C3S Climate Data Store Common Data Model (CDM) format to ascii files,
The tables format is contained in a python dictionary, stored as an attribute in a pandas.DataFrame
(or pd.io.parsers.TextFileReader).
(or Iterable[pd.DataFrame]).

This module uses a set of printer functions to "print" element values to a
string object before exporting them to a final ascii file.