diff --git a/CHANGES.rst b/CHANGES.rst
index 727b5abd..e3f1f4ad 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -20,6 +20,10 @@ New features and enhancements
 
 * new Pub47 testdata (test_data["test_pub47"]) (:pull:`327`)
 
+Breaking changes
+^^^^^^^^^^^^^^^^
+* ``cdm_reader_mapper.cdm_mapper``: rename `map_and_convert` to helper function `_map_and_convert` (:pull:`343`)
+
 Internal changes
 ^^^^^^^^^^^^^^^^
 * implement map_model test for Pub47 data (:issue:`310`, :pull:`327`)
@@ -29,6 +33,7 @@ Internal changes
 * ``cdm_reader_mapper.cdm_mapper``: update mapping functions for more readability (:pull:`324`)
 * ``cdm_reader_mapper.cdm_mapper``: introdice some helper functions (:pull:`324`)
 * add more unit tests (:issue:`311`, :pull:`324`)
+* ``cdm_reader_mapper.cdm_mapper``: split `map_and_convert` into multiple helper functions (:issue:`333`, :pull:`343`)
 
 Bug fixes
 ^^^^^^^^^
diff --git a/cdm_reader_mapper/cdm_mapper/mapper.py b/cdm_reader_mapper/cdm_mapper/mapper.py
index 09e1e880..7013a725 100755
--- a/cdm_reader_mapper/cdm_mapper/mapper.py
+++ b/cdm_reader_mapper/cdm_mapper/mapper.py
@@ -15,6 +15,7 @@
 
 from copy import deepcopy
 from io import StringIO
+from typing import Any
 
 import numpy as np
 import pandas as pd
@@ -28,7 +29,56 @@
 from .utils.mapping_functions import mapping_functions
 
 
-def drop_duplicated_rows(df) -> pd.DataFrame:
+def _check_input_data_type(data, logger):
+    """Check whether the input data type is valid."""
+
+    def _log_and_return_empty(msg):
+        logger.error(msg)
+
+    if isinstance(data, pd.DataFrame):
+        logger.debug("Input data is a pd.DataFrame")
+        if data.empty:
+            return _log_and_return_empty("Input data is empty")
+        return [data]
+
+    elif isinstance(data, pd.io.parsers.TextFileReader):
+        logger.debug("Input is a pd.TextFileReader")
+        if not pandas_TextParser_hdlr.is_not_empty(data):
+            return _log_and_return_empty("Input data is empty")
+
+        return data
+
+    return _log_and_return_empty(f"Input data type {type(data)} not supported")
+
+
+def _normalize_input_data(data, logger):
+    """Return an iterator of DataFrames, or None for empty or unsupported input."""
+    data = _check_input_data_type(data, logger)
+
+    if data is None:
+        return None
+
+    if isinstance(data, list):
+        return iter(data)
+
+    return data
+
+
+def _is_empty(value):
+    """Check whether a value is considered empty."""
+    if value is None:
+        return True
+
+    if hasattr(value, "empty"):
+        return bool(value.empty)
+
+    if not value:
+        return True
+
+    return False
+
+
+def _drop_duplicated_rows(df) -> pd.DataFrame:
     """Drop duplicates from list."""
 
     def list_to_tuple(v):
@@ -48,115 +98,158 @@ def tuple_to_list(v):
 
     return df.astype(dtypes)
 
 
-def _map_to_df(m, x):
-    if not isinstance(m, dict):
-        return
-    for x_ in x:
-        if x_ in m.keys():
-            v = m[x_]
-            if isinstance(v, dict):
-                m = v
-                continue
-            else:
-                return v
+def _get_nested_value(ndict, keys) -> Any | None:
+    """Traverse nested dictionaries along a sequence of keys."""
+    if not isinstance(ndict, dict):
         return
+    current = ndict
+    for key in keys:
+        if not isinstance(current, dict):
+            return
+        if key not in current:
+            return
+        value = current[key]
+        if isinstance(value, dict):
+            current = value
+            continue
+        return value
 
 
-def _decimal_places(
-    entry,
-    decimal_places,
-) -> int:
-    if decimal_places is not None:
-        if isinstance(decimal_places, int):
-            entry["decimal_places"] = decimal_places
-        else:
-            entry["decimal_places"] = properties.default_decimal_places
+def _convert_dtype(series, atts) -> pd.Series | float:
+    """Convert data to the type specified in `atts`."""
+    if atts is None:
+        return np.nan
 
-    return entry
+    dtype = atts.get("data_type")
+    if not dtype:
+        return series
+
+    converter = converters.get(dtype)
+    if not converter:
+        return series
+
+    converter_keys = iconverters_kwargs.get(dtype)
+    if converter_keys:
+        kwargs = {key: atts.get(key) for key in converter_keys}
+    else:
+        kwargs = {}
+
+    return converter(series, np.nan, **kwargs)
+
+
+def _decimal_places(decimal_places) -> int:
+    """Return `decimal_places` if it is an integer, otherwise the default."""
+    if not isinstance(decimal_places, int):
+        return properties.default_decimal_places
+
+    return decimal_places
 
 
 def _transform(
-    series,
+    data,
     imodel_functions,
     transform,
     kwargs,
     logger,
 ) -> pd.Series:
-    logger.debug(f"\ttransform: {transform}")
-    logger.debug("\tkwargs: {}".format(",".join(list(kwargs.keys()))))
-    trans = getattr(imodel_functions, transform)
-    return trans(series, **kwargs)
+    """Apply a transformation function from imodel_functions to a pandas Series."""
+    logger.debug(f"Applying transform: {transform}")
+    if kwargs:
+        logger.debug(f"With kwargs: {', '.join(kwargs.keys())}")
+    try:
+        trans_func = getattr(imodel_functions, transform)
+    except AttributeError:
+        logger.error(f"Transform '{transform}' not found in imodel_functions")
+        return data
+
+    return trans_func(data, **kwargs)
 
 
 def _code_table(
-    series,
+    data,
     data_model,
     code_table,
     logger,
 ) -> pd.Series:
+    """Map values in a Series or DataFrame using a (possibly nested) code table."""
+    logger.debug(f"Mapping code table: {code_table}")
     table_map = get_code_table(*data_model.split("_"), code_table=code_table)
+
     try:
-        series = series.to_frame()
+        df = data.to_frame() if isinstance(data, pd.Series) else data.copy()
     except Exception:
-        logger.warning(f"Could not convert {series} to frame.")
+        logger.warning(f"Could not convert {data} to a DataFrame.")
+        return pd.Series([None] * len(data), index=data.index)
+
+    df = df.astype(str)
+
+    df.columns = [
+        "_".join(col) if isinstance(col, tuple) else str(col) for col in df.columns
+    ]
 
-    series_str = series.astype(str)
-    series_str.columns = ["_".join(col) for col in series_str.columns.values]
-    return series_str.apply(lambda x: _map_to_df(table_map, x), axis=1)
+    def _map_col(col):
+        return _get_nested_value(table_map, col.tolist())
+
+    return df.apply(_map_col, axis=1)
 
 
 def _default(
     default,
     length,
-) -> list | int:
-    if isinstance(default, list):
-        return [default] * length
-    return default
+) -> list:
+    """Return a list of a given length filled with the default value."""
+    return [default] * length
 
 
-def _fill_value(series, fill_value) -> pd.Series | int:
+def _fill_value(series, fill_value) -> pd.Series:
+    """Fill missing values in series."""
     if fill_value is None:
         return series
-    if series is None:
-        return fill_value
     return series.fillna(value=fill_value).infer_objects(copy=False)
 
 
-def _map_data(
-    series,
-    transform,
-    code_table,
-    default,
-    fill_value,
-    imodel_functions,
-    kwargs,
-    length,
-    logger,
-) -> pd.Series:
-    if (series is None or series.empty) and not transform:
-        series = _default(default, length)
-    elif transform:
-        series = _transform(
-            series,
-            imodel_functions,
-            transform,
-            kwargs,
-            logger=logger,
-        )
-    elif code_table:
-        series = _code_table(
-            series,
-            imodel_functions.imodel,
-            code_table,
-            logger=logger,
+def _extract_input_data(idata, elements, cols, default, logger):
+    """Extract the relevant input data based on `elements`."""
+
+    def _return_default():
+        return pd.Series(_default(default, len(idata))), True
+
+    if not elements:
+        return _return_default()
+
+    logger.debug(f"\telements: {' '.join(map(str, elements))}")
+
+    missing_elements = [e for e in elements if e not in cols]
+    if missing_elements:
+        logger.warning(
+            "Missing elements from input data: {}".format(
+                ",".join(map(str, missing_elements))
+            )
         )
-    return _fill_value(series, fill_value)
+        return _return_default()
 
+    data = idata[elements]
+    if len(elements) == 1:
+        data = data.iloc[:, 0]
 
-def _mapping(
-    idata, imapping, imodel_functions, atts, codes_subset, cols, logger
-) -> pd.DataFrame:
+    if _is_empty(data):
+        return _return_default()
+
+    return data, False
+
+
+def _column_mapping(
+    idata,
+    imapping,
+    imodel_functions,
+    atts,
+    codes_subset,
+    cols,
+    column,
+    logger,
+):
+    """Map a column (or multiple elements) in input data according to mapping rules."""
     elements = imapping.get("elements")
     transform = imapping.get("transform")
     kwargs = imapping.get("kwargs", {})
@@ -165,112 +258,173 @@ def _mapping(
     fill_value = imapping.get("fill_value")
     decimal_places = imapping.get("decimal_places")
 
-    if codes_subset:
-        if code_table not in codes_subset:
-            code_table = None
-
-    to_map = None
-    if elements:
-        logger.debug("\telements: {}".format(" ".join([str(x) for x in elements])))
-        missing_els = [x for x in elements if x not in cols]
-        if len(missing_els) > 0:
-            logger.warning(
-                "Following elements from data model missing from input data: {} to map.".format(
-                    ",".join([str(x) for x in missing_els])
-                )
-            )
-            return _default(None, len(idata)), atts
-
-        to_map = idata[elements]
-        if len(elements) == 1:
-            to_map = to_map.iloc[:, 0]
+    if codes_subset and code_table not in codes_subset:
+        code_table = None
 
-    data = _map_data(
-        to_map,
-        transform,
-        code_table,
+    data, used_default = _extract_input_data(
+        idata,
+        elements,
+        cols,
         default,
-        fill_value,
-        imodel_functions,
-        kwargs,
-        len(idata),
         logger,
     )
-    atts = _decimal_places(atts, decimal_places)
-    return data, atts
+
+    if transform and not used_default:
+        data = _transform(
+            data,
+            imodel_functions,
+            transform,
+            kwargs,
+            logger=logger,
+        )
+    elif code_table and not used_default:
+        data = _code_table(
+            data,
+            imodel_functions.imodel,
+            code_table,
+            logger=logger,
+        )
 
-def _convert_dtype(data, atts) -> pd.DataFrame:
-    if atts is None:
-        return np.nan
-    itype = atts.get("data_type")
-    if converters.get(itype):
-        iconverter_kwargs = iconverters_kwargs.get(itype)
-        if iconverter_kwargs:
-            kwargs = {x: atts.get(x) for x in iconverter_kwargs}
-        else:
-            kwargs = {}
-        data = converters.get(itype)(data, np.nan, **kwargs)
+    data = pd.Series(data, index=idata.index, name=column)
+    data = _fill_value(data, fill_value)
+    atts["decimal_places"] = _decimal_places(decimal_places)
 
-    return data
+    return data, atts
 
 
-def _map_and_convert(
+def _table_mapping(
     idata,
     mapping,
-    table,
+    atts,
     cols,
     null_label,
     imodel_functions,
     codes_subset,
-    cdm_tables,
     cdm_complete,
     drop_missing_obs,
     drop_duplicates,
     logger,
 ) -> pd.DataFrame:
-    atts = deepcopy(cdm_tables[table]["atts"])
     columns = (
         [x for x in atts.keys() if x in idata.columns]
         if not cdm_complete
         else list(atts.keys())
     )
-    table_df_i = pd.DataFrame(index=idata.index, columns=columns)
-    logger.debug(f"Table: {table}")
+    table_df = pd.DataFrame(index=idata.index, columns=columns)
+
     for column in columns:
         if column not in mapping.keys():
             continue
+
         logger.debug(f"\tElement: {column}")
-        table_df_i[column], atts[column] = _mapping(
+
+        table_df[column], atts[column] = _column_mapping(
             idata,
             mapping[column],
             imodel_functions,
             atts[column],
             codes_subset,
             cols,
+            column,
             logger,
         )
-        table_df_i[column] = _convert_dtype(table_df_i[column], atts.get(column))
+        table_df[column] = _convert_dtype(table_df[column], atts.get(column))
 
-    if drop_missing_obs is True and "observation_value" in table_df_i:
-        table_df_i = table_df_i.dropna(subset=["observation_value"])
+    if drop_missing_obs is True and "observation_value" in table_df:
+        table_df = table_df.dropna(subset=["observation_value"])
 
-    table_df_i.columns = pd.MultiIndex.from_product([[table], columns])
     if drop_duplicates:
-        table_df_i = drop_duplicated_rows(table_df_i)
+        table_df = _drop_duplicated_rows(table_df)
+
+    return table_df.fillna(null_label)
+
+
+def _prepare_cdm_tables(cdm_subset):
+    """Prepare table buffers and attributes for CDM tables."""
+    if isinstance(cdm_subset, str):
+        cdm_subset = [cdm_subset]
+    cdm_atts = get_cdm_atts(cdm_subset)
+    if not cdm_atts:
+        return {}
+    return {
+        table: {
+            "buffer": StringIO(),
+            "atts": deepcopy(cdm_atts.get(table)),
+        }
+        for table in cdm_atts  # skip names that get_cdm_atts dropped as invalid
+    }
+
+
+def _process_chunk(
+    idata,
+    imodel_maps,
+    imodel_functions,
+    cdm_tables,
+    cols,
+    null_label,
+    codes_subset,
+    cdm_complete,
+    drop_missing_obs,
+    drop_duplicates,
+    logger,
+):
+    """Process one chunk of input data."""
+    for table, mapping in imodel_maps.items():
+        logger.debug(f"Table: {table}")
+
+        table_df = _table_mapping(
+            idata=idata,
+            mapping=mapping,
+            atts=deepcopy(cdm_tables[table]["atts"]),
+            cols=cols,
+            null_label=null_label,
+            imodel_functions=imodel_functions,
+            codes_subset=codes_subset,
+            cdm_complete=cdm_complete,
+            drop_missing_obs=drop_missing_obs,
+            drop_duplicates=drop_duplicates,
+            logger=logger,
+        )
 
-    table_df_i = table_df_i.fillna(null_label)
-    table_df_i.to_csv(cdm_tables[table]["buffer"], header=False, index=False, mode="a")
-    cdm_tables[table]["columns"] = table_df_i.columns
-    return cdm_tables
+        table_df.columns = pd.MultiIndex.from_product([[table], table_df.columns])
+        table_df.to_csv(
+            cdm_tables[table]["buffer"],
+            header=False,
+            index=False,
+            mode="a",
+        )
+        cdm_tables[table]["columns"] = table_df.columns
+
+
+def _finalize_output(cdm_tables, logger):
+    """Turn buffers into DataFrames and combine all tables."""
+    final_tables = []
+
+    for table, meta in cdm_tables.items():
+        logger.debug(f"\tParse datetime by reader; Table: {table}")
+
+        meta["buffer"].seek(0)
+        df = pd.read_csv(
+            meta["buffer"],
+            names=meta["columns"],
+            na_values=[],
+            dtype="object",
+            keep_default_na=False,
+        )
+        meta["buffer"].close()
 
-def map_and_convert(
+        final_tables.append(df)
+
+    return pd.concat(final_tables, axis=1, join="outer").reset_index(drop=True)
+
+
+def _map_and_convert(
     data_model,
     *sub_models,
-    data=pd.DataFrame(),
+    data=None,
     cdm_subset=None,
     codes_subset=None,
     cdm_complete=True,
@@ -280,66 +434,36 @@
     logger=None,
 ) -> pd.DataFrame:
     """Map and convert MDF data to CDM tables."""
+    data_iter = _normalize_input_data(data, logger)
+
+    if data_iter is None:
+        return pd.DataFrame()
+
     if not cdm_subset:
         cdm_subset = properties.cdm_tables
 
-    cdm_atts = get_cdm_atts(cdm_subset)
     imodel_maps = get_imodel_maps(data_model, *sub_models, cdm_tables=cdm_subset)
     imodel_functions = mapping_functions("_".join([data_model] + list(sub_models)))
 
-    # Initialize dictionary to store temporal tables (buffer) and table attributes
-    cdm_tables = {
-        k: {"buffer": StringIO(), "atts": cdm_atts.get(k)} for k in imodel_maps.keys()
-    }
-
-    date_columns = {}
-    for table, values in imodel_maps.items():
-        date_columns[table] = [
-            i
-            for i, x in enumerate(list(values))
-            if
"timestamp" in cdm_atts.get(table, {}).get(x, {}).get("data_type") - ] - - for idata in data: - cols = [x for x in idata] - for table, mapping in imodel_maps.items(): - cdm_tables = _map_and_convert( - idata, - mapping, - table, - cols, - null_label, - imodel_functions, - codes_subset, - cdm_tables, - cdm_complete, - drop_missing_obs, - drop_duplicates, - logger, - ) - - table_list = [] - for table in cdm_tables.keys(): - # Convert dtime to object to be parsed by the reader - logger.debug( - f"\tParse datetime by reader; Table: {table}; Columns: {date_columns[table]}" - ) - cdm_tables[table]["buffer"].seek(0) - data = pd.read_csv( - cdm_tables[table]["buffer"], - names=cdm_tables[table]["columns"], - na_values=[], - dtype="object", - keep_default_na=False, + cdm_tables = _prepare_cdm_tables(imodel_maps.keys()) + + for idata in data_iter: + cols = list(idata.columns) + _process_chunk( + idata=idata, + imodel_maps=imodel_maps, + imodel_functions=imodel_functions, + cdm_tables=cdm_tables, + cols=cols, + null_label=null_label, + codes_subset=codes_subset, + cdm_complete=cdm_complete, + drop_missing_obs=drop_missing_obs, + drop_duplicates=drop_duplicates, + logger=logger, ) - cdm_tables[table]["buffer"].close() - cdm_tables[table].pop("buffer") - table_list.append(data) - merged = pd.concat(table_list, axis=1, join="outer") - return merged.reset_index(drop=True) + return _finalize_output(cdm_tables, logger) def map_model( @@ -393,32 +517,11 @@ def map_model( """ logger = logging_hdlr.init_logger(__name__, level=log_level) imodel = imodel.split("_") - # Check we have imodel registered, leave otherwise if imodel[0] not in properties.supported_data_models: logger.error("Input data model " f"{imodel[0]}" " not supported") return - # Check input data type and content (empty?) 
-    # Make sure data is an iterable: this is to homogenize how we handle
-    # dataframes and textreaders
-    if isinstance(data, pd.DataFrame):
-        logger.debug("Input data is a pd.DataFrame")
-        if len(data) == 0:
-            logger.error("Input data is empty")
-            return
-        else:
-            data = [data]
-    elif isinstance(data, pd.io.parsers.TextFileReader):
-        logger.debug("Input is a pd.TextFileReader")
-        not_empty = pandas_TextParser_hdlr.is_not_empty(data)
-        if not not_empty:
-            logger.error("Input data is empty")
-            return
-    else:
-        logger.error("Input data type " f"{type(data)}" " not supported")
-        return
-
-    return map_and_convert(
+    return _map_and_convert(
         imodel[0],
         *imodel[1:],
         data=data,
diff --git a/cdm_reader_mapper/cdm_mapper/tables/tables.py b/cdm_reader_mapper/cdm_mapper/tables/tables.py
index cfe64a6d..7b22a617 100755
--- a/cdm_reader_mapper/cdm_mapper/tables/tables.py
+++ b/cdm_reader_mapper/cdm_mapper/tables/tables.py
@@ -63,6 +63,8 @@ def get_cdm_atts(
 
     cdm_atts = {}
     for cdm_table in cdm_table_list:
+        if cdm_table not in properties.cdm_tables:
+            continue
         if cdm_table == "header":
             cdm_atts[cdm_table] = deepcopy(header_dict)
         else:
diff --git a/tests/test_cdm_mapper.py b/tests/test_cdm_mapper.py
index 6fc7ff1a..e1f72a7f 100755
--- a/tests/test_cdm_mapper.py
+++ b/tests/test_cdm_mapper.py
@@ -6,18 +6,21 @@
 from io import StringIO
 
 from cdm_reader_mapper.cdm_mapper.mapper import (
-    drop_duplicated_rows,
-    _map_to_df,
+    _check_input_data_type,
+    _is_empty,
+    _drop_duplicated_rows,
+    _get_nested_value,
     _decimal_places,
     _transform,
     _code_table,
     _default,
     _fill_value,
-    _map_data,
-    _mapping,
+    _extract_input_data,
+    _column_mapping,
     _convert_dtype,
+    _table_mapping,
     _map_and_convert,
-    map_and_convert,
+    _prepare_cdm_tables,
     map_model,
 )
 
@@ -31,6 +34,33 @@
 from cdm_reader_mapper.data import test_data
 
 
+@pytest.fixture
+def sample_df():
+    return pd.DataFrame({"A": [1, 2]})
+
+
+@pytest.fixture
+def sample_df_empty():
+    return pd.DataFrame()
+
+
+@pytest.fixture
+def sample_tfr():
+    csv_data = "A\n1\n2"
+    return pd.read_csv(StringIO(csv_data), chunksize=1)
+
+
+@pytest.fixture
+def sample_tfr_empty():
+    csv_data = "A\n"
+    return pd.read_csv(StringIO(csv_data), chunksize=1)
+
+
+@pytest.fixture
+def sample_string():
+    return "A"
+
+
 @pytest.fixture
 def imodel_maps():
     return get_imodel_maps("icoads", "r300", "d720", cdm_tables=["header"])
@@ -100,11 +130,63 @@ def _map_model_test_data(data_model, encoding="utf-8", select=None, **kwargs):
     pd.testing.assert_frame_equal(result_table, expected)
 
 
+def test_check_input_data_type_df_non_empty(sample_df):
+    logger = logging_hdlr.init_logger(__name__, level="INFO")
+    result = _check_input_data_type(sample_df, logger)
+
+    assert result == [sample_df]
+
+
+def test_check_input_data_type_df_empty(sample_df_empty):
+    logger = logging_hdlr.init_logger(__name__, level="INFO")
+    result = _check_input_data_type(sample_df_empty, logger)
+
+    assert result is None
+
+
+def test_check_input_data_type_textfilereader_non_empty(sample_tfr):
+    logger = logging_hdlr.init_logger(__name__, level="INFO")
+    result = _check_input_data_type(sample_tfr, logger)
+
+    assert result is sample_tfr
+
+
+def test_check_input_data_type_textfilereader_empty(sample_tfr_empty):
+    logger = logging_hdlr.init_logger(__name__, level="INFO")
+    result = _check_input_data_type(sample_tfr_empty, logger)
+
+    assert result is None
+
+
+def test_check_input_data_type_invalid_type(sample_string):
+    logger = logging_hdlr.init_logger(__name__, level="INFO")
+    result = _check_input_data_type(sample_string, logger)
+
+    assert result is None
+
+
+@pytest.mark.parametrize(
+    "value, expected",
+    [
+        (None, True),
+        (pd.DataFrame(), True),
+        (pd.DataFrame({"a": [1]}), False),
+        (123, False),
+        ("string", False),
+        ([], True),
+        ({}, True),
+        ("", True),
+    ],
+)
+def test_is_empty(value, expected):
+    assert _is_empty(value) is expected
+
+
 def test_drop_duplicated_rows():
     data = pd.DataFrame(
         data={"col1": [1, 2, 3, 4, 3], "col2": [[5, 9], [6, 9], [7, 9], [8, 9], [7, 9]]}
     )
-    result = drop_duplicated_rows(data)
+    result = _drop_duplicated_rows(data)
     expected = pd.DataFrame(
         data={"col1": [1, 2, 3, 4], "col2": [[5, 9], [6, 9], [7, 9], [8, 9]]}
     )
@@ -120,21 +202,55 @@
         (["4", "5"], 5000),
     ],
 )
-def test_map_to_df(value, expected):
+def test_get_nested_value(value, expected):
     mapping_table = {"1": 1000, "2": 2000, "4": {"5": 5000}}
-    assert _map_to_df(mapping_table, value) == expected
+    assert _get_nested_value(mapping_table, value) == expected
+
+
+def test_get_nested_value_none():
+    assert _get_nested_value(None, "4") is None
+
+
+@pytest.mark.parametrize(
+    "table",
+    [
+        "header",
+        "observations-at",
+        "observations-dpt",
+        "observations-slp",
+        "observations-sst",
+        "observations-wbt",
+        "observations-wd",
+        "observations-ws",
+    ],
+)
+@pytest.mark.parametrize("is_list", [True, False])
+def test_prepare_cdm_tables(table, is_list):
+    if is_list is True:
+        table_in = [table]
+    else:
+        table_in = table
+
+    result = _prepare_cdm_tables(table_in)
+    assert isinstance(result, dict)
+    assert list(result.keys()) == [table]
+    assert "buffer" in result[table]
+    assert isinstance(result[table]["buffer"], StringIO)
+    assert "atts" in result[table]
 
-def test_map_to_df_none():
-    assert _map_to_df(None, "4") is None
+
+def test_prepare_cdm_tables_invalid():
+    result = _prepare_cdm_tables("invalid")
+    assert result == {}
 
 
 @pytest.mark.parametrize(
     "decimal_places,expected",
-    [(None, {}), (4, {"decimal_places": 4}), ("4", {"decimal_places": 5})],
+    [(None, 5), (4, 4), ("4", 5)],
 )
 def test_decimal_places(decimal_places, expected):
-    assert _decimal_places({}, decimal_places) == expected
+    assert _decimal_places(decimal_places) == expected
 
 
 def test_transform(imodel_functions):
@@ -145,6 +261,13 @@
     pd.testing.assert_series_equal(result, expected)
 
 
+def test_transform_notfound(imodel_functions):
+    series = pd.Series(data={"a": 1, "b": 2, "c": np.nan}, index=["a", "b", "c"])
+    logger = logging_hdlr.init_logger(__name__, level="INFO")
+    result = _transform(series, imodel_functions, "invalid_function", {}, logger)
+    pd.testing.assert_series_equal(result, series)
+
+
 def test_code_table():
     series = pd.Series(
         data={"a": "1", "b": "2", "c": np.nan},
@@ -154,7 +277,8 @@
     logger = logging_hdlr.init_logger(__name__, level="INFO")
     result = _code_table(series, "icoads_r300_d721", "baro_units", logger)
     expected = pd.Series(
-        data={"a": 1001.0, "b": 1004.0, "c": np.nan}, index=["a", "b", "c"]
+        data={"a": 1001.0, "b": 1004.0, "c": np.nan},
+        index=["a", "b", "c"],
     )
     pd.testing.assert_series_equal(result, expected)
 
@@ -162,8 +286,8 @@
     "default,length,expected",
     [
-        (5, 1, 5),
-        (5, 5, 5),
+        (5, 1, [5]),
+        (5, 5, [5, 5, 5, 5, 5]),
         ([5], 5, [[5], [5], [5], [5], [5]]),
         ([5, 6], 5, [[5, 6], [5, 6], [5, 6], [5, 6], [5, 6]]),
     ],
 )
@@ -180,7 +304,6 @@
             None,
             pd.Series(data={"a": 1, "b": 2, "c": 3}, index=["a", "b", "c"]),
         ),
-        (None, 5, 5),
         (
"b": None, "c": np.nan}, index=["a", "b", "c"]), 5, @@ -197,168 +320,115 @@ def test_fill_value(series, fill_value, expected): @pytest.mark.parametrize( - "series,transform,code_table,default,fill_value,expected", + "value,atts,expected", [ - (None, None, None, 5, None, 5), - (pd.Series(), None, None, 5, None, 5), - ( - pd.Series(data={"a": 1, "b": None, "c": np.nan}, index=["a", "b", "c"]), - None, - None, - None, - 5, - pd.Series(data={"a": 1, "b": 5, "c": 5.0}, index=["a", "b", "c"]), - ), - ( - pd.Series( - data={"a": "1", "b": "2", "c": np.nan}, - index=["a", "b", "c"], - name=("test", "data"), - ), - None, - "baro_units", - None, - None, - pd.Series( - data={"a": 1001.0, "b": 1002.0, "c": np.nan}, index=["a", "b", "c"] - ), - ), - ( - pd.Series(data={"a": 1, "b": 2, "c": np.nan}, index=["a", "b", "c"]), - "integer_to_float", - None, - None, - None, - pd.Series(data={"a": 1.0, "b": 2.0, "c": np.nan}, index=["a", "b", "c"]), - ), - ( - pd.Series( - data={"a": "1", "b": "2", "c": np.nan}, - index=["a", "b", "c"], - name=("test", "data"), - ), - None, - "baro_units", - None, - 5000.0, - pd.Series( - data={"a": 1001.0, "b": 1002.0, "c": 5000.0}, index=["a", "b", "c"] - ), - ), + (5, {"data_type": "numeric", "decimal_places": 2}, "5.00"), + (5, None, np.nan), + (5, {"data_type": "invalid"}, 5), + ("5", {"data_type": "int"}, "5"), ], ) -def test_map_data(series, transform, code_table, default, fill_value, expected): - logger = logging_hdlr.init_logger(__name__, level="INFO") - kwargs = { - "kwargs": {}, - "length": None, - "logger": logger, - } - imodel_functions = mapping_functions("icoads_r300_d701") - result = _map_data( - series, transform, code_table, default, fill_value, imodel_functions, **kwargs - ) +def test_convert_dtype(value, atts, expected): + idata = pd.Series(value) + result = _convert_dtype(idata, atts) if isinstance(result, pd.Series): - pd.testing.assert_series_equal(result, expected) + pd.testing.assert_series_equal(result, pd.Series(expected)) else: - assert result == expected + assert result, expected @pytest.mark.parametrize( - "column,expected", + "column, elements, default, use_default, exp", [ - ("duplicate_status", 4), + ("duplicate_status", None, 4, True, [4, 4, 4, 4]), + ("platform_type", [("c1", "PT")], None, False, "idata"), + ("report_id", [("c98", "UID")], None, False, "idata"), + ("latitude", [("core", "LAT")], None, True, [None, None, None, None]), + ("location_quality", [("c1", "LZ")], None, False, "idata"), + ], +) +def test_extract_input_data( + imodel_maps, data_header, column, elements, default, use_default, exp +): + logger = logging_hdlr.init_logger(__name__, level="INFO") + result = _extract_input_data( + data_header, + elements, + data_header.columns, + default, + logger, + ) + assert isinstance(result, tuple) + + assert result[1] is use_default + + if exp == "idata": + exp = data_header[elements[0]] + elif isinstance(exp, list): + exp = pd.Series(exp) + + pd.testing.assert_series_equal(result[0], exp) + + +@pytest.mark.parametrize( + "column, expected", + [ + ("duplicate_status", [4, 4, 4, 4]), ("platform_type", [2, 33, 32, 45]), ( "report_id", ["ICOADS-30-5012", "ICOADS-30-8960", "ICOADS-30-0037", "ICOADS-30-1000"], ), ("location_quality", [2.0, "0", "0", "0"]), + ("latitude", [None, None, None, None]), ], ) -def test_mapping(imodel_maps, imodel_functions, data_header, column, expected): +def test_column_mapping(imodel_maps, imodel_functions, data_header, column, expected): logger = logging_hdlr.init_logger(__name__, level="INFO") - imapping = 
imodel_maps["header"][column] - result = _mapping( + mapping_column = imodel_maps["header"][column] + column_atts = get_cdm_atts("header")["header"][column] + result, _ = _column_mapping( data_header, - imapping, + mapping_column, imodel_functions, - {}, + column_atts, None, data_header.columns, + column, logger, - )[0] - if isinstance(result, pd.Series): - expected = pd.Series(expected) - pd.testing.assert_series_equal(result, pd.Series(expected)) - elif isinstance(result, np.ndarray): - np.testing.assert_array_equal(result, np.array(expected)) - else: - assert result, expected - - -@pytest.mark.parametrize( - "value,atts,expected", - [ - (5, {"data_type": "numeric", "decimal_places": 2}, "5.00"), - (5, None, np.nan), - (5, {"data_type": "invalid"}, 5), - ("5", {"data_type": "int"}, "5"), - ], -) -def test_convert_dtype(value, atts, expected): - idata = pd.Series(value) - result = _convert_dtype(idata, atts) - if isinstance(result, pd.Series): - pd.testing.assert_series_equal(result, pd.Series(expected)) - else: - assert result, expected + ) + pd.testing.assert_series_equal(result, pd.Series(expected, name=column)) -def test_map_and_convert( +def test_table_mapping( imodel_maps, imodel_functions, data_header, data_header_expected ): logger = logging_hdlr.init_logger(__name__, level="INFO") - cdm_atts = get_cdm_atts("header") - cdm_tables = { - "header": {"buffer": StringIO(), "atts": cdm_atts["header"]}, - } - cdm_tables = _map_and_convert( + table_atts = get_cdm_atts("header")["header"] + result = _table_mapping( data_header, imodel_maps["header"], - "header", + table_atts, data_header.columns, "null", imodel_functions, None, - cdm_tables, True, False, False, logger, ) - cdm_tables["header"]["buffer"].seek(0) - result = pd.read_csv( - cdm_tables["header"]["buffer"], - names=cdm_tables["header"]["columns"], - na_values=[], - dtype="object", - keep_default_na=False, - ) - cdm_tables["header"]["buffer"].close() - cdm_tables["header"].pop("buffer") - pd.testing.assert_frame_equal( - result[data_header_expected.columns], data_header_expected - ) + expected = data_header_expected["header"] + pd.testing.assert_frame_equal(result[expected.columns], expected) -def test_map_and_convert_func(data_header, data_header_expected): +def test_map_and_convert(data_header, data_header_expected): logger = logging_hdlr.init_logger(__name__, level="INFO") - result = map_and_convert( + result = _map_and_convert( "icoads", "r300", "d720", - data=[data_header], + data=data_header, cdm_subset=["header"], logger=logger, ) diff --git a/tests/test_mapper_tables_codes.py b/tests/test_mapper_tables_codes.py index 943c1dae..b278801d 100755 --- a/tests/test_mapper_tables_codes.py +++ b/tests/test_mapper_tables_codes.py @@ -131,6 +131,10 @@ def test_get_cdm_atts(cdm_tables): _assert_dict_keys(cdm_atts, expected_tables) +def test_get_cdm_atts_invalid(): + assert get_cdm_atts("invalid") == {} + + @pytest.mark.parametrize( "dataset,cdm_tables", [