diff --git a/cdm_reader_mapper/cdm_mapper/mapper.py b/cdm_reader_mapper/cdm_mapper/mapper.py
index 158e3599..ac1db1e7 100755
--- a/cdm_reader_mapper/cdm_mapper/mapper.py
+++ b/cdm_reader_mapper/cdm_mapper/mapper.py
@@ -3,9 +3,8 @@
 
 Created on Thu Apr 11 13:45:38 2019
 
-Maps data contained in a pandas DataFrame (or pd.io.parsers.TextFileReader) to
-the C3S Climate Data Store Common Data Model (CDM) header and observational
-tables using the mapping information available in the tool's mapping library
+Maps data contained in a pandas DataFrame to the C3S Climate Data Store Common Data Model (CDM)
+header and observational tables using the mapping information available in the tool's mapping library
 for the input data model.
 
 @author: iregon
@@ -13,13 +12,10 @@
 
 from __future__ import annotations
 
-from copy import deepcopy
-from io import StringIO
-
 import numpy as np
 import pandas as pd
 
-from cdm_reader_mapper.common import logging_hdlr, pandas_TextParser_hdlr
+from cdm_reader_mapper.common import logging_hdlr
 
 from . import properties
 from .codes.codes import get_code_table
@@ -223,11 +219,11 @@ def _map_and_convert(
     null_label,
     imodel_functions,
     codes_subset,
-    cdm_tables,
     cdm_complete,
+    cdm_atts,
     logger,
 ):
-    atts = deepcopy(cdm_tables[table]["atts"])
+    atts = cdm_atts.get(table)
     columns = (
         [x for x in atts.keys() if x in idata.columns]
         if not cdm_complete
@@ -258,10 +254,7 @@ def _map_and_convert(
     table_df_i.columns = pd.MultiIndex.from_product([[table], columns])
 
     table_df_i = drop_duplicates(table_df_i)
-    table_df_i = table_df_i.fillna(null_label)
-    table_df_i.to_csv(cdm_tables[table]["buffer"], header=False, index=False, mode="a")
-    cdm_tables[table]["columns"] = table_df_i.columns
-    return cdm_tables
+    return table_df_i.fillna(null_label)
 
 
 def map_and_convert(
@@ -284,11 +277,6 @@ def map_and_convert(
 
     imodel_functions = mapping_functions("_".join([data_model] + list(sub_models)))
 
-    # Initialize dictionary to store temporal tables (buffer) and table attributes
-    cdm_tables = {
-        k: {"buffer": StringIO(), "atts": cdm_atts.get(k)} for k in imodel_maps.keys()
-    }
-
     date_columns = {}
     for table, values in imodel_maps.items():
         date_columns[table] = [
@@ -297,39 +285,22 @@ def map_and_convert(
             if "timestamp" in cdm_atts.get(table, {}).get(x, {}).get("data_type")
         ]
 
-    for idata in data:
-        cols = [x for x in idata]
-        for table, mapping in imodel_maps.items():
-            cdm_tables = _map_and_convert(
-                idata,
-                mapping,
-                table,
-                cols,
-                null_label,
-                imodel_functions,
-                codes_subset,
-                cdm_tables,
-                cdm_complete,
-                logger,
-            )
-
     table_list = []
-    for table in cdm_tables.keys():
-        # Convert dtime to object to be parsed by the reader
-        logger.debug(
-            f"\tParse datetime by reader; Table: {table}; Columns: {date_columns[table]}"
-        )
-        cdm_tables[table]["buffer"].seek(0)
-        data = pd.read_csv(
-            cdm_tables[table]["buffer"],
-            names=cdm_tables[table]["columns"],
-            na_values=[],
-            dtype="object",
-            keep_default_na=False,
+    for table in cdm_subset:
+        mapping = imodel_maps[table]
+        table_df = _map_and_convert(
+            data,
+            mapping,
+            table,
+            data.columns,
+            null_label,
+            imodel_functions,
+            codes_subset,
+            cdm_complete,
+            cdm_atts,
+            logger,
         )
-        cdm_tables[table]["buffer"].close()
-        cdm_tables[table].pop("buffer")
-        table_list.append(data)
+        table_list.append(table_df)
 
     merged = pd.concat(table_list, axis=1, join="outer")
     return merged.reset_index(drop=True)
@@ -377,25 +348,14 @@ def map_model(
         return
 
     # Check input data type and content (empty?)
-    # Make sure data is an iterable: this is to homogenize how we handle
-    # dataframes and textreaders
-    if isinstance(data, pd.DataFrame):
-        logger.debug("Input data is a pd.DataFrame")
-        if len(data) == 0:
-            logger.error("Input data is empty")
-            return
-        else:
-            data = [data]
-    elif isinstance(data, pd.io.parsers.TextFileReader):
-        logger.debug("Input is a pd.TextFileReader")
-        not_empty = pandas_TextParser_hdlr.is_not_empty(data)
-        if not not_empty:
-            logger.error("Input data is empty")
-            return
-    else:
+    if not isinstance(data, pd.DataFrame):
         logger.error("Input data type " f"{type(data)}" " not supported")
         return
+    if data.empty:
+        logger.error("Input data is empty")
+        return
+
     return map_and_convert(
         imodel[0],
         *imodel[1:],
diff --git a/cdm_reader_mapper/cdm_mapper/writer.py b/cdm_reader_mapper/cdm_mapper/writer.py
index 331300b1..8c622841 100755
--- a/cdm_reader_mapper/cdm_mapper/writer.py
+++ b/cdm_reader_mapper/cdm_mapper/writer.py
@@ -4,8 +4,7 @@
 
 Created on Thu Apr 11 13:45:38 2019
 
 Exports tables written in the C3S Climate Data Store Common Data Model (CDM) format to ascii files,
-The tables format is contained in a python dictionary, stored as an attribute in a pandas.DataFrame
-(or pd.io.parsers.TextFileReader).
+The tables format is contained in a python dictionary, stored as an attribute in a pandas.DataFrame.
 
 This module uses a set of printer functions to "print" element values to a string object
 before exporting them to a final ascii file.
diff --git a/cdm_reader_mapper/common/inspect.py b/cdm_reader_mapper/common/inspect.py
index 1e7ea61c..2c9f7d8c 100755
--- a/cdm_reader_mapper/common/inspect.py
+++ b/cdm_reader_mapper/common/inspect.py
@@ -9,9 +9,6 @@
 
 from __future__ import annotations
 
 import numpy as np
-import pandas as pd
-
-from cdm_reader_mapper.common import pandas_TextParser_hdlr
 
 
 def count_by_cat_i(series):
@@ -34,10 +31,7 @@
     int
         Total row count
     """
-    if not isinstance(data, pd.io.parsers.TextFileReader):
-        return len(data)
-    else:
-        return pandas_TextParser_hdlr.get_length(data)
+    return len(data)
 
 
 def count_by_cat(data, columns=None):
@@ -60,23 +54,6 @@
     if not isinstance(columns, list):
         columns = [columns]
     counts = {}
-    if not isinstance(data, pd.io.parsers.TextFileReader):
-        for column in columns:
-            counts[column] = count_by_cat_i(data[column])
-        return counts
-    else:
-        for column in columns:
-            data_cp = pandas_TextParser_hdlr.make_copy(data)
-            count_dicts = []
-            for df in data_cp:
-                count_dicts.append(count_by_cat_i(df[column]))
-
-            data_cp.close()
-            cats = [list(x.keys()) for x in count_dicts]
-            cats = list({x for y in cats for x in y})
-            cats.sort
-            count_dict = {}
-            for cat in cats:
-                count_dict[cat] = sum([x.get(cat) for x in count_dicts if x.get(cat)])
-            counts[column] = count_dict
-        return counts
+    for column in columns:
+        counts[column] = count_by_cat_i(data[column])
+    return counts
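With the TextFileReader paths removed, `map_model` accepts a single `pd.DataFrame` only. A minimal sketch of what callers that used to pass a chunked reader can do instead; the file name is hypothetical, and only the two type checks visible in this diff are assumed:

```python
import pandas as pd

# Previously a TextFileReader (pd.read_csv(..., chunksize=...)) could be
# handed straight to map_model; now the chunks are concatenated up front.
chunks = pd.read_csv("reports.csv", chunksize=10_000)  # hypothetical input file
data = pd.concat(chunks, ignore_index=True)

# These are exactly the two conditions map_model now enforces:
assert isinstance(data, pd.DataFrame)
assert not data.empty
```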
diff --git a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py b/cdm_reader_mapper/common/pandas_TextParser_hdlr.py
deleted file mode 100755
index 34bfce95..00000000
--- a/cdm_reader_mapper/common/pandas_TextParser_hdlr.py
+++ /dev/null
@@ -1,94 +0,0 @@
-"""
-Functions for pandas TextParser objects.
-
-Created on Tue Apr 2 10:34:56 2019
-
-Assumes we are never writing a header!
-
-@author: iregon
-"""
-
-from __future__ import annotations
-
-from io import StringIO
-
-import pandas as pd
-
-from . import logging_hdlr
-
-logger = logging_hdlr.init_logger(__name__, level="DEBUG")
-
-read_params = [
-    "chunksize",
-    "names",
-    "dtype",
-    "parse_dates",
-    "date_parser",
-    "infer_datetime_format",
-    "delimiter",
-    "quotechar",
-    "escapechar",
-]
-
-
-def make_copy(OParser):
-    """Make a copy of a pandas TextParser object."""
-    try:
-        f = OParser.handles.handle
-        NewRef = StringIO(f.getvalue())
-        read_dict = {x: OParser.orig_options.get(x) for x in read_params}
-        NParser = pd.read_csv(NewRef, **read_dict)
-        return NParser
-    except Exception:
-        logger.error("Failed to copy TextParser", exc_info=True)
-        return
-
-
-def restore(Parser):
-    """Restore pandas TextParser object."""
-    try:
-        f = Parser.handles.handle
-        f.seek(0)
-        read_dict = {x: Parser.orig_options.get(x) for x in read_params}
-        Parser = pd.read_csv(f, **read_dict)
-        return Parser
-    except Exception:
-        logger.error("Failed to restore TextParser", exc_info=True)
-        return Parser
-
-
-def is_not_empty(Parser):
-    """Return boolean whether pandas TextParser object is empty."""
-    try:
-        Parser_copy = make_copy(Parser)
-    except Exception:
-        logger.error(
-            f"Failed to process input. Input type is {type(Parser)}", exc_info=True
-        )
-        return
-    try:
-        first_chunk = Parser_copy.get_chunk()
-        Parser_copy.close()
-        if len(first_chunk) > 0:
-            logger.debug("Is not empty")
-            return True
-        else:
-            return False
-    except Exception:
-        logger.debug("Something went wrong", exc_info=True)
-        return False
-
-
-def get_length(Parser):
-    """Get length of pandas TextParser object."""
-    try:
-        Parser_copy = make_copy(Parser)
-    except Exception:
-        logger.error(
-            f"Failed to process input. Input type is {type(Parser)}", exc_info=True
-        )
-        return
-    no_records = 0
-    for df in Parser_copy:
-        no_records += len(df)
-    return no_records
diff --git a/cdm_reader_mapper/common/select.py b/cdm_reader_mapper/common/select.py
index 93c1bf5b..f6970f98 100755
--- a/cdm_reader_mapper/common/select.py
+++ b/cdm_reader_mapper/common/select.py
@@ -8,21 +8,6 @@
 """
 
 from __future__ import annotations
 
-from io import StringIO
-
-import pandas as pd
-
-from cdm_reader_mapper.common import pandas_TextParser_hdlr
-
-# Need to define a general thing for the parser() functions, like we did with
-# the dataframe_apply_index(), because they are all the same but for the
-# selection applied!!!!!
-
-# The index of the resulting dataframe(s) is reinitialized here, it does not
-# inherit from parent df
-#
-# data is a dataframe or a TextFileReader
-
 
 def dataframe_apply_index(
     df,
@@ -67,59 +52,7 @@ def dataframe(
             idx_out_offset=idx_out_offset,
         )
 
-    def parser(data_parser, mask_parser, out_rejected=False, in_index=False):
-        mask_cp = pandas_TextParser_hdlr.make_copy(mask_parser)
-        read_params = [
-            "chunksize",
-            "names",
-            "dtype",
-            "parse_dates",
-            "date_parser",
-            "infer_datetime_format",
-        ]
-        read_dict = {x: data_parser.orig_options.get(x) for x in read_params}
-        in_buffer = StringIO()
-        if out_rejected:
-            out_buffer = StringIO()
-        index = []
-        idx_in_offset = 0
-        idx_out_offset = 0
-        for df, mask_df in zip(data_parser, mask_cp):
-            o = dataframe(
-                df,
-                mask_df,
-                out_rejected=out_rejected,
-                in_index=in_index,
-                idx_in_offset=idx_in_offset,
-                idx_out_offset=idx_out_offset,
-            )
-            o[0].to_csv(in_buffer, header=False, index=False, mode="a")
-            if out_rejected:
-                o[1].to_csv(out_buffer, header=False, index=False, mode="a")
-                idx_out_offset += len(o[1])
-            if in_index and not out_rejected:
-                index.extend(o[1])
-            if in_index and out_rejected:
-                index.extend(o[2])
-            idx_in_offset += len(o[0])
-
-        mask_cp.close()
-        in_buffer.seek(0)
-        output = [pd.read_csv(in_buffer, **read_dict)]
-        if out_rejected:
-            out_buffer.seek(0)
-            output.append(pd.read_csv(out_buffer, **read_dict))
-        if in_index:
-            output.append(index)
-
-        return output
-
-    if not isinstance(data, pd.io.parsers.TextFileReader):
-        output = dataframe(data, mask, out_rejected=out_rejected, in_index=in_index)
-    else:
-        output = parser(data, mask, out_rejected=out_rejected, in_index=in_index)
-
-    return output
+    return dataframe(data, mask, out_rejected=out_rejected, in_index=in_index)
 
 
 def select_from_list(data, selection, out_rejected=False, in_index=False):
@@ -147,62 +80,9 @@ def dataframe(
             idx_out_offset=idx_out_offset,
         )
 
-    def parser(data_parser, col, values, out_rejected=False, in_index=False):
-        read_params = [
-            "chunksize",
-            "names",
-            "dtype",
-            "parse_dates",
-            "date_parser",
-            "infer_datetime_format",
-        ]
-        read_dict = {x: data_parser.orig_options.get(x) for x in read_params}
-        in_buffer = StringIO()
-        if out_rejected:
-            out_buffer = StringIO()
-        index = []
-        idx_in_offset = 0
-        idx_out_offset = 0
-        for df in data_parser:
-            o = dataframe(
-                df,
-                col,
-                values,
-                out_rejected=out_rejected,
-                in_index=in_index,
-                idx_in_offset=idx_in_offset,
-                idx_out_offset=idx_out_offset,
-            )
-            o[0].to_csv(in_buffer, header=False, index=False, mode="a")
-            if out_rejected:
-                o[1].to_csv(out_buffer, header=False, index=False, mode="a")
-                idx_out_offset += len(o[1])
-            if in_index and not out_rejected:
-                index.extend(o[1])
-            if in_index and out_rejected:
-                index.extend(o[2])
-            idx_in_offset += len(o[0])
-
-        in_buffer.seek(0)
-        output = [pd.read_csv(in_buffer, **read_dict)]
-        if out_rejected:
-            out_buffer.seek(0)
-            output.append(pd.read_csv(out_buffer, **read_dict))
-        if in_index:
-            output.append(index)
-
-        return output
-
     col = list(selection.keys())[0]
     values = list(selection.values())[0]
-    if not isinstance(data, pd.io.parsers.TextFileReader):
-        output = dataframe(
-            data, col, values, out_rejected=out_rejected, in_index=in_index
-        )
-    else:
-        output = parser(data, col, values, out_rejected=out_rejected, in_index=in_index)
-
-    return output
+    return dataframe(data, col, values, out_rejected=out_rejected, in_index=in_index)
 
 
 def select_from_index(data, index, out_rejected=False):
@@ -218,36 +98,4 @@ def dataframe(df, index, out_rejected=False, idx_in_offset=0, idx_out_offset=0):
             idx_out_offset=idx_out_offset,
         )
 
-    def parser(data_parser, index, out_rejected=False):
-        read_params = [
-            "chunksize",
-            "names",
-            "dtype",
-            "parse_dates",
-            "date_parser",
-            "infer_datetime_format",
-        ]
-        read_dict = {x: data_parser.orig_options.get(x) for x in read_params}
-        in_buffer = StringIO()
-        if out_rejected:
-            out_buffer = StringIO()
-
-        for df in data_parser:
-            o = dataframe(df, index, out_rejected=out_rejected)
-            o[0].to_csv(in_buffer, header=False, index=False, mode="a")
-            if out_rejected:
-                o[1].to_csv(out_buffer, header=False, index=False, mode="a")
-
-        in_buffer.seek(0)
-        output = [pd.read_csv(in_buffer, **read_dict)]
-        if out_rejected:
-            out_buffer.seek(0)
-            output.append(pd.read_csv(out_buffer, **read_dict))
-        return output
-
-    if not isinstance(data, pd.io.parsers.TextFileReader):
-        output = dataframe(data, index, out_rejected=out_rejected)
-    else:
-        output = parser(data, index, out_rejected=out_rejected)
-
-    return output
+    return dataframe(data, index, out_rejected=out_rejected)
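All three selectors (`select_true`, `select_from_list`, `select_from_index`) now funnel through the single plain-DataFrame `dataframe(...)` path. The selection semantics in miniature; per the deleted comment above, the output index is reinitialized rather than inherited from the parent frame:

```python
import pandas as pd

df = pd.DataFrame({"B1": [19, 26, 27, 41, 91]})
mask = df["B1"].isin([26, 41])

kept = df[mask].reset_index(drop=True)       # the selected rows
rejected = df[~mask].reset_index(drop=True)  # returned when out_rejected=True
```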
diff --git a/cdm_reader_mapper/mdf_reader/reader.py b/cdm_reader_mapper/mdf_reader/reader.py
index 9a0ef14e..f35e7530 100755
--- a/cdm_reader_mapper/mdf_reader/reader.py
+++ b/cdm_reader_mapper/mdf_reader/reader.py
@@ -3,20 +3,17 @@
 from __future__ import annotations
 
 import ast
-import csv
 import logging
 import os
-from io import StringIO as StringIO
 
 import pandas as pd
 
 from cdm_reader_mapper.common.json_dict import open_json_file
-from cdm_reader_mapper.common.pandas_TextParser_hdlr import make_copy
 from cdm_reader_mapper.core.databundle import DataBundle
 
 from . import properties
 from .utils.filereader import FileReader
-from .utils.utilities import adjust_dtype, remove_boolean_values, validate_arg
+from .utils.utilities import adjust_dtypes, remove_boolean_values, validate_arg
 from .utils.validators import validate
 
 
@@ -25,14 +22,12 @@ class MDFFileReader(FileReader):
 
     Attributes
    ----------
-    data : pd.DataFrame or pd.io.parsers.TextFileReader
-        a pandas.DataFrame or pandas.io.parsers.TextFileReader
-        with the output data
-    mask : pd.DataFrame or pd.io.parsers.TextFileReader
-        a pandas.DataFrame or pandas.io.parsers.TextFileReader
-        with the output data validation mask
+    data : pd.DataFrame
+        a pd.DataFrame with the output data
     attrs : dict
         a dictionary with the output data elements attributes
+    mask : pd.DataFrame
+        a pd.DataFrame with the output data validation mask
     """
 
     def __init__(self, *args, **kwargs):
@@ -107,7 +102,6 @@ def convert_and_decode_entries(
         if decoder_dict is None:
             decoder_dict = self.configurations["convert_decode"]["decoder_dict"]
         if not (convert and decode):
-            self.dtypes = "object"
             return data
         if convert is not True:
             converter_dict = {}
@@ -115,121 +109,30 @@ def convert_and_decode_entries(
         if decode is not True:
             decoder_dict = {}
 
-        if isinstance(data, pd.DataFrame):
-            data = self._convert_and_decode(
-                data,
-                converter_dict,
-                converter_kwargs,
-                decoder_dict,
-            )
-        else:
-            data_buffer = StringIO()
-            TextParser = make_copy(data)
-            for i, df_ in enumerate(TextParser):
-                df = self._convert_and_decode(
-                    df_,
-                    converter_dict,
-                    converter_kwargs,
-                    decoder_dict,
-                )
-                df.to_csv(
-                    data_buffer,
-                    header=False,
-                    mode="a",
-                    encoding=self.encoding,
-                    index=False,
-                    quoting=csv.QUOTE_NONE,
-                    sep=properties.internal_delimiter,
-                    quotechar="\0",
-                    escapechar="\0",
-                )
-
-            data_buffer.seek(0)
-            data = pd.read_csv(
-                data_buffer,
-                names=df.columns,
-                chunksize=self.chunksize,
-                dtype=object,
-                delimiter=properties.internal_delimiter,
-                quotechar="\0",
-                escapechar="\0",
-            )
-        return data
+        return self._convert_and_decode(
+            data,
+            converter_dict,
+            converter_kwargs,
+            decoder_dict,
+        )
 
     def validate_entries(self, data, validate):
         """Validate data entries by using a pre-defined data model.
 
         Fill attribute `valid` with boolean mask.
         """
-        if validate is not True:
-            mask = pd.DataFrame()
-        elif isinstance(data, pd.DataFrame):
-            mask = self._validate(data)
-        else:
-            data_buffer = StringIO()
-            TextParser_ = make_copy(data)
-            for i, df_ in enumerate(TextParser_):
-                mask_ = self._validate(df_)
-                mask_.to_csv(
-                    data_buffer,
-                    header=False,
-                    mode="a",
-                    encoding=self.encoding,
-                    index=False,
-                )
-            data_buffer.seek(0)
-            mask = pd.read_csv(
-                data_buffer,
-                names=df_.columns,
-                chunksize=self.chunksize,
-            )
-        return mask
+        if validate is True:
+            return self._validate(data)
+        return pd.DataFrame(columns=data.columns)
 
     def remove_boolean_values(self, data):
-        """DOCUMENTATION"""
-        if isinstance(data, pd.DataFrame):
-            data = data.map(remove_boolean_values)
-            dtype = adjust_dtype(self.dtypes, data)
-            return data.astype(dtype)
-        else:
-            data_buffer = StringIO()
-            TextParser = make_copy(data)
-            for i, df_ in enumerate(TextParser):
-                df = df_.map(remove_boolean_values)
-                dtype = adjust_dtype(self.dtypes, df)
-                date_columns = []
-                df.to_csv(
-                    data_buffer,
-                    header=False,
-                    mode="a",
-                    encoding=self.encoding,
-                    index=False,
-                    quoting=csv.QUOTE_NONE,
-                    sep=properties.internal_delimiter,
-                    quotechar="\0",
-                    escapechar="\0",
-                )
-            date_columns = []
-            for i, element in enumerate(list(dtype)):
-                if dtype.get(element) == "datetime":
-                    date_columns.append(i)
-            dtype = adjust_dtype(dtype, df)
-            data_buffer.seek(0)
-            data = pd.read_csv(
-                data_buffer,
-                names=df.columns,
-                chunksize=self.chunksize,
-                dtype=dtype,
-                parse_dates=date_columns,
-                delimiter=properties.internal_delimiter,
-                quotechar="\0",
-                escapechar="\0",
-            )
-        return data
+        """Replace boolean placeholders with missing values and restore expected column dtypes."""
+        data = data.map(remove_boolean_values)
+        dtypes = adjust_dtypes(self.dtypes, self.columns)
+        return data.astype(dtypes, errors="ignore")
 
     def read(
         self,
-        chunksize=None,
         sections=None,
         skiprows=0,
         convert=True,
@@ -243,8 +146,6 @@ def read(
 
         Parameters
         ----------
-        chunksize : int, optional
-            Number of reports per chunk.
         sections : list, optional
             List with subset of data model sections to output, optional
             If None read pre-defined data model sections.
@@ -266,12 +167,9 @@ def read(
         # 0. VALIDATE INPUT
         if not validate_arg("sections", sections, list):
             return
-        if not validate_arg("chunksize", chunksize, int):
-            return
         if not validate_arg("skiprows", skiprows, int):
             return
 
-        self.chunksize = chunksize
         self.skiprows = skiprows
 
         # 2. READ AND VALIDATE DATA
@@ -284,7 +182,7 @@ def read(
             sections = read_sections_list
 
         # 2.2 Homogenize input data to an iterable with dataframes:
-        # a list with a single dataframe or a pd.io.parsers.TextFileReader
+        # a list with a single dataframe
         logging.info("Getting data string from source...")
         self.configurations = self.get_configurations(read_sections_list, sections)
         data = self.open_data(
@@ -292,7 +190,6 @@ def read(
             sections,
             # INFO: Set default as "pandas" to account for custom schema
            open_with=properties.open_file.get(self.imodel, "pandas"),
-            chunksize=chunksize,
         )
 
         # 2.3. Extract, read and validate data in same loop
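The rewritten `remove_boolean_values` method reduces to three lines. What it does, in miniature, with made-up dtypes standing in for the reader's `self.dtypes`/`self.columns`; `adjust_dtypes` is the helper added in utilities.py further down:

```python
import pandas as pd

dtypes = {"a": "float64", "z": "Int64"}  # stand-in for self.dtypes; "z" absent below
df = pd.DataFrame({"a": ["1.5", True], "b": [False, "x"]})

df = df.map(lambda x: None if isinstance(x, bool) else x)      # remove_boolean_values
usable = {k: v for k, v in dtypes.items() if k in df.columns}  # adjust_dtypes
df = df.astype(usable, errors="ignore")
```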
diff --git a/cdm_reader_mapper/mdf_reader/utils/converters.py b/cdm_reader_mapper/mdf_reader/utils/converters.py
index 4c14aa5e..c35c4d1d 100755
--- a/cdm_reader_mapper/mdf_reader/utils/converters.py
+++ b/cdm_reader_mapper/mdf_reader/utils/converters.py
@@ -5,7 +5,6 @@
 import pandas as pd
 
 from .. import properties
-from .utilities import convert_str_boolean
 
 
 class df_converters:
@@ -20,7 +19,6 @@ def to_numeric(self, data, offset, scale):
         """Convert object type elements of a pandas series to numeric type."""
 
         def _to_numeric(x):
-            x = convert_str_boolean(x)
             if isinstance(x, bool):
                 return x
             if isinstance(x, str):
@@ -61,9 +59,9 @@ def object_to_numeric(self, data, scale=None, offset=None):
             Data series of type self.dtype
 
         """
-        scale = scale if scale else self.numeric_scale
-        offset = offset if offset else self.numeric_offset
         if data.dtype == "object":
+            scale = scale if scale else self.numeric_scale
+            offset = offset if offset else self.numeric_offset
             data = self.to_numeric(data, offset, scale)
         return data
diff --git a/cdm_reader_mapper/mdf_reader/utils/decoders.py b/cdm_reader_mapper/mdf_reader/utils/decoders.py
index e1d75ba1..e2a591ac 100755
--- a/cdm_reader_mapper/mdf_reader/utils/decoders.py
+++ b/cdm_reader_mapper/mdf_reader/utils/decoders.py
@@ -3,7 +3,6 @@
 from __future__ import annotations
 
 from .. import properties
-from .utilities import convert_str_boolean
 
 
 class df_decoders:
@@ -17,7 +16,6 @@ def base36(self, data):
         """DOCUMENTATION."""
 
         def _base36(x):
-            x = convert_str_boolean(x)
             if isinstance(x, bool):
                 return x
             return str(int(str(x), 36))
diff --git a/cdm_reader_mapper/mdf_reader/utils/filereader.py b/cdm_reader_mapper/mdf_reader/utils/filereader.py
index 2ab03403..618da54d 100755
--- a/cdm_reader_mapper/mdf_reader/utils/filereader.py
+++ b/cdm_reader_mapper/mdf_reader/utils/filereader.py
@@ -2,11 +2,9 @@
 
 from __future__ import annotations
 
-import csv
 import logging
 import os
 from copy import deepcopy
-from io import StringIO
 
 import pandas as pd
 import xarray as xr
@@ -156,7 +154,6 @@
     def open_data(
         self,
         order,
         valid,
-        chunksize,
         open_with="pandas",
     ):
         """DOCUMENTATION."""
@@ -168,37 +165,8 @@ def open_data(
                 encoding=encoding,
                 widths=[properties.MAX_FULL_REPORT_WIDTH],
                 skiprows=self.skiprows,
-                chunksize=chunksize,
             )
         else:
             raise ValueError("open_with has to be one of ['pandas', 'netcdf']")
 
-        if isinstance(TextParser, pd.DataFrame) or isinstance(TextParser, xr.Dataset):
-            return self._read_sections(TextParser, order, valid, open_with=open_with)
-        else:
-            data_buffer = StringIO()
-            for i, df_ in enumerate(TextParser):
-                df = self._read_sections(df_, order, valid, open_with=open_with)
-                df.to_csv(
-                    data_buffer,
-                    header=False,
-                    mode="a",
-                    encoding=encoding,
-                    index=False,
-                    quoting=csv.QUOTE_NONE,
-                    sep=properties.internal_delimiter,
-                    quotechar="\0",
-                    escapechar="\0",
-                )
-            data_buffer.seek(0)
-            data = pd.read_csv(
-                data_buffer,
-                names=df.columns,
-                chunksize=self.chunksize,
-                dtype=object,
-                parse_dates=self.parse_dates,
-                delimiter=properties.internal_delimiter,
-                quotechar="\0",
-                escapechar="\0",
-            )
-        return data
+        return self._read_sections(TextParser, order, valid, open_with=open_with)
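Moving the scale/offset defaults inside the dtype guard in `object_to_numeric` means non-object series now pass through completely untouched. The guard in miniature, with `pd.to_numeric` standing in for the class's own `to_numeric` (whose scale/offset handling is not shown here):

```python
import pandas as pd

for s in (pd.Series(["10", "20"], dtype="object"), pd.Series([1.0, 2.0])):
    if s.dtype == "object":                    # only then are defaults resolved
        s = pd.to_numeric(s, errors="coerce")  # stand-in for self.to_numeric
    print(s.dtype)                             # a numeric dtype either way
```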
diff --git a/cdm_reader_mapper/mdf_reader/utils/utilities.py b/cdm_reader_mapper/mdf_reader/utils/utilities.py
index cc455959..bd9c9824 100755
--- a/cdm_reader_mapper/mdf_reader/utils/utilities.py
+++ b/cdm_reader_mapper/mdf_reader/utils/utilities.py
@@ -62,27 +62,17 @@ def validate_path(arg_name, arg_value):
     return True
 
 
-def adjust_dtype(dtype, df):
-    """Adjust dtypes to DataFrame."""
-    if not isinstance(dtype, dict):
-        return dtype
-    return {k: v for k, v in dtype.items() if k in df.columns}
-
-
-def convert_str_boolean(x):
-    """Convert str boolean value to boolean value."""
-    if x == "True":
-        x = True
-    if x == "False":
-        x = False
-    return x
-
-
 def remove_boolean_values(x):
     """Remove boolean values."""
-    x = convert_str_boolean(x)
     if x is True:
         return
     if x is False:
         return
     return x
+
+
+def adjust_dtypes(dtypes, columns):
+    """Subset a dtype mapping to the columns actually present."""
+    if not isinstance(dtypes, dict):
+        return dtypes
+    return {k: v for k, v in dtypes.items() if k in columns}
diff --git a/cdm_reader_mapper/mdf_reader/utils/validators.py b/cdm_reader_mapper/mdf_reader/utils/validators.py
index 8787dfbb..a708ec24 100755
--- a/cdm_reader_mapper/mdf_reader/utils/validators.py
+++ b/cdm_reader_mapper/mdf_reader/utils/validators.py
@@ -10,7 +10,6 @@
 from .. import properties
 from ..codes import codes
 from ..schemas import schemas
-from .utilities import convert_str_boolean
 
 
 def validate_datetime(elements, data):
@@ -34,7 +33,6 @@ def validate_numeric(elements, data, schema):
     def _to_numeric(x):
         if x is None:
             return np.nan
-        x = convert_str_boolean(x)
         if isinstance(x, bool):
             return x
         return float(x)
@@ -87,11 +85,9 @@ def validate_codes(elements, data, schema, imodel, ext_table_path):
         if not table:
             continue
 
-        dtype = properties.pandas_dtypes.get(schema.get(element).get("column_type"))
-
         table_keys = list(table.keys())
         validation_df = data[element]
-        value = validation_df.astype(dtype).astype("str")
+        value = validation_df.astype(str)
         valid = validation_df.notna()
         mask_ = value.isin(table_keys)
         mask[element] = mask_.where(valid, True)
@@ -118,7 +114,6 @@ def _element_tuples(numeric_elements, datetime_elements, coded_elements):
 
 
 def _mask_boolean(x, boolean):
-    x = convert_str_boolean(x)
     if x is boolean:
         return True
     return False
@@ -159,8 +154,7 @@ def validate(
         filename=None,
     )
     # Check input
-    if not isinstance(data, pd.DataFrame):  # or not isinstance(mask0, pd.DataFrame):
-        # logging.error("Input data and mask must be a pandas data frame object")
+    if not isinstance(data, pd.DataFrame):
         logging.error("input data must be a pandas DataFrame.")
         return
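`validate_codes` now casts straight to `str` instead of first routing through the schema's pandas dtype. The resulting comparison, in miniature:

```python
import pandas as pd

values = pd.Series([26, 41, None])  # object dtype here
as_str = values.astype(str)         # "26", "41", "None"
valid = values.notna()
mask = as_str.isin(["26", "41"])    # table keys compared as strings
mask = mask.where(valid, True)      # missing entries pass by default
```

Note that a float series would stringify as "26.0", so code-table keys must match the raw dtype's string form.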
diff --git a/cdm_reader_mapper/mdf_reader/writer.py b/cdm_reader_mapper/mdf_reader/writer.py
index 68fec9db..cb07790e 100755
--- a/cdm_reader_mapper/mdf_reader/writer.py
+++ b/cdm_reader_mapper/mdf_reader/writer.py
@@ -10,7 +10,6 @@
 import pandas as pd
 
 from cdm_reader_mapper.common import get_filename
-from cdm_reader_mapper.common.pandas_TextParser_hdlr import make_copy
 
 
 def _update_dtypes(dtypes, columns):
@@ -107,19 +106,9 @@ def _join(col):
             return ":".join(col)
         return col
 
-    if not isinstance(data, pd.io.parsers.TextFileReader):
-        data = [data]
-    else:
-        data = make_copy(data)
-
     if mask is None:
         mask = pd.DataFrame()
 
-    if not isinstance(mask, pd.io.parsers.TextFileReader):
-        mask = [mask]
-    else:
-        mask = make_copy(mask)
-
     info = {}
     info["dtypes"] = dtypes
     info["parse_dates"] = [_join(parse_date) for parse_date in parse_dates]
@@ -134,35 +123,32 @@ def _join(col):
     filename_info = get_filename(
         [prefix, "info", suffix], path=out_dir, extension="json"
     )
-    for i, (data_df, mask_df) in enumerate(zip(data, mask)):
-        if col_subset is not None:
-            data_df = data_df[col_subset]
-            mask_df = mask_df[col_subset]
-        header = False
-        mode = "a"
-        if i == 0:
-            mode = "w"
-            header = []
-            info["dtypes"] = _update_dtypes(info["dtypes"], data_df.columns)
-            for col in data_df.columns:
-                col_ = _join(col)
-                header.append(col_)
-                info["dtypes"] = _update_col_names(info["dtypes"], col, col_)
-
-            info["parse_dates"] = [
-                parse_date for parse_date in info["parse_dates"] if parse_date in header
-            ]
-
-        kwargs = {
-            "header": header,
-            "mode": mode,
-            "encoding": encoding,
-            "index": False,
-            "sep": delimiter,
-        }
-        data_df.to_csv(os.path.join(out_dir, filename_data), **kwargs)
-        if not mask_df.empty:
-            mask_df.to_csv(os.path.join(out_dir, filename_mask), **kwargs)
+
+    if col_subset is not None:
+        data = data[col_subset]
+        mask = mask[col_subset]
+
+    header = []
+    info["dtypes"] = _update_dtypes(info["dtypes"], data.columns)
+    for col in data.columns:
+        col_ = _join(col)
+        header.append(col_)
+        info["dtypes"] = _update_col_names(info["dtypes"], col, col_)
+
+    info["parse_dates"] = [
+        parse_date for parse_date in info["parse_dates"] if parse_date in header
+    ]
+
+    kwargs = {
+        "header": header,
+        "mode": "w",
+        "encoding": encoding,
+        "index": False,
+        "sep": delimiter,
+    }
+    data.to_csv(os.path.join(out_dir, filename_data), **kwargs)
+    if not mask.empty:
+        mask.to_csv(os.path.join(out_dir, filename_mask), **kwargs)
 
     if info:
         with open(os.path.join(out_dir, filename_info), "w") as fileObj:
diff --git a/cdm_reader_mapper/metmetpy/correct.py b/cdm_reader_mapper/metmetpy/correct.py
index d0c1aaca..afdb014a 100755
--- a/cdm_reader_mapper/metmetpy/correct.py
+++ b/cdm_reader_mapper/metmetpy/correct.py
@@ -59,11 +59,7 @@
 
 from __future__ import annotations
 
-from io import StringIO
-
-import pandas as pd
-
-from cdm_reader_mapper.common import logging_hdlr, pandas_TextParser_hdlr
+from cdm_reader_mapper.common import logging_hdlr
 from cdm_reader_mapper.common.json_dict import collect_json_files, combine_dicts
 
 from . import properties
@@ -170,25 +166,7 @@ def correct_datetime(data, imodel, log_level="INFO", _base=_base):
 
     correction_method = combine_dicts(replacements_method_files, base=_base)
 
-    if isinstance(data, pd.DataFrame):
-        return _correct_dt(data, imodel, dck, correction_method, log_level="INFO")
-    elif isinstance(data, pd.io.parsers.TextFileReader):
-        read_params = [
-            "chunksize",
-            "names",
-            "dtype",
-            "parse_dates",
-            "date_parser",
-            "infer_datetime_format",
-        ]
-        read_dict = {x: data.orig_options.get(x) for x in read_params}
-        buffer = StringIO()
-        data_ = pandas_TextParser_hdlr.make_copy(data)
-        for df in data_:
-            df = _correct_dt(df, imodel, dck, correction_method, log_level="INFO")
-            df.to_csv(buffer, header=False, index=False, mode="a")
-        buffer.seek(0)
-        return pd.read_csv(buffer, **read_dict)
+    return _correct_dt(data, imodel, dck, correction_method, log_level="INFO")
 
 
 def correct_pt(data, imodel, log_level="INFO", _base=_base):
@@ -235,22 +213,4 @@ def correct_pt(data, imodel, log_level="INFO", _base=_base):
         )
         return data
 
-    if isinstance(data, pd.DataFrame):
-        return _correct_pt(data, imodel, dck, pt_col, fix_methods, log_level="INFO")
-    elif isinstance(data, pd.io.parsers.TextFileReader):
-        read_params = [
-            "chunksize",
-            "names",
-            "dtype",
-            "parse_dates",
-            "date_parser",
-            "infer_datetime_format",
-        ]
-        read_dict = {x: data.orig_options.get(x) for x in read_params}
-        buffer = StringIO()
-        for df in data:
-            df = _correct_pt(df, imodel, dck, pt_col, fix_methods, log_level="INFO")
-            df.to_csv(buffer, header=False, index=False, mode="a")
-
-        buffer.seek(0)
-        return pd.read_csv(buffer, **read_dict)
+    return _correct_pt(data, imodel, dck, pt_col, fix_methods, log_level="INFO")
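The single-write path keeps the `_join` header convention: tuple (MultiIndex) columns are flattened with ":" so they survive the ascii round trip. In isolation; the `isinstance` guard is assumed, since the diff only shows the join and the fall-through:

```python
def _join(col):
    if isinstance(col, tuple):  # assumed guard; the diff shows only the two returns
        return ":".join(col)
    return col

print(_join(("c1", "B1")))  # -> "c1:B1"
print(_join("report_id"))   # unchanged
```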
diff --git a/cdm_reader_mapper/metmetpy/validate.py b/cdm_reader_mapper/metmetpy/validate.py
index cc2b3d97..63c6e701 100755
--- a/cdm_reader_mapper/metmetpy/validate.py
+++ b/cdm_reader_mapper/metmetpy/validate.py
@@ -60,7 +60,7 @@
 
 import pandas as pd
 
-from cdm_reader_mapper.common import logging_hdlr, pandas_TextParser_hdlr
+from cdm_reader_mapper.common import logging_hdlr
 from cdm_reader_mapper.common.json_dict import collect_json_files, combine_dicts
 
 from . import properties
@@ -126,10 +126,8 @@ def validate_id(data, imodel, blank=False, log_level="INFO"):
         return
     dck = mrd[2]
 
-    if isinstance(data, pd.io.parsers.TextFileReader):
-        data = pandas_TextParser_hdlr.make_copy(data).read()
-
     id_col = _get_id_col(data, mrd[0], logger)
+
     if id_col is None:
         return
 
@@ -162,9 +160,7 @@ def validate_datetime(data, imodel, log_level="INFO"):
     logger = logging_hdlr.init_logger(__name__, level=log_level)
     model = imodel.split("_")[0]
 
-    if isinstance(data, pd.io.parsers.TextFileReader):
-        data = pandas_TextParser_hdlr.make_copy(data).read()
-    elif not isinstance(data, (pd.DataFrame, pd.Series)):
+    if not isinstance(data, (pd.DataFrame, pd.Series)):
         logger.error(
             f"Input data must be a pd.DataFrame or pd.Series.\
             Input data type is {type(data)}"
         )
diff --git a/tests/test_operations.py b/tests/test_operations.py
index 365c64c4..4b44183d 100755
--- a/tests/test_operations.py
+++ b/tests/test_operations.py
@@ -1,10 +1,10 @@
 from __future__ import annotations
 
 import pandas as pd
-import pytest
+import pytest  # noqa
 
 from cdm_reader_mapper import read, test_data
-from cdm_reader_mapper.common.pandas_TextParser_hdlr import make_copy
+from cdm_reader_mapper.common import replace_columns
 
 from ._results import cdm_header, correction_df
 
@@ -15,63 +15,45 @@ def _read_data(**kwargs):
     return read(**kwargs)
 
 
-def _get_data(TextParser, **kwargs):
-    if TextParser is True:
-        kwargs["chunksize"] = 10000
+def _get_data(**kwargs):
     return _read_data(**data_dict, imodel="icoads_r300_d721", **kwargs)
 
 
-@pytest.mark.parametrize("TextParser", [True, False])
-def test_select_true(TextParser):
-    data = _get_data(TextParser, sections=["c99_data"])
+def test_select_true():
+    data = _get_data(sections=["c99_data"])
     result = data.select_true(out_rejected=True)
 
     expected = data.data
     selected = result.data
 
-    if TextParser is True:
-        expected = make_copy(expected).read()
-        selected = make_copy(selected).read()
-
     expected = expected[:5].reset_index(drop=True)
     pd.testing.assert_frame_equal(expected, selected)
 
 
-@pytest.mark.parametrize("TextParser", [False, True])
-def test_select_from_index(TextParser):
-    data = _get_data(TextParser)
+def test_select_from_index():
+    data = _get_data()
     result = data.select_from_index([0, 2, 4])
 
     expected = data.data
     selected = result.data
 
-    if TextParser is True:
-        expected = make_copy(expected).read()
-        selected = make_copy(selected).read()
-
     idx = expected.index.isin([0, 2, 4])
     expected = expected[idx].reset_index(drop=True)
 
     pd.testing.assert_frame_equal(expected, selected)
 
 
-@pytest.mark.parametrize("TextParser", [True, False])
-def test_select_from_list(TextParser):
-    data = _get_data(TextParser)
+def test_select_from_list():
+    data = _get_data()
     selection = {("c1", "B1"): [26, 41]}
     result = data.select_from_list(selection, out_rejected=True, in_index=True)
 
     expected = data.data
     selected = result.data
 
-    if TextParser is True:
-        expected = make_copy(expected).read()
-        selected = make_copy(selected).read()
-
     idx = expected.index.isin([1, 3])
     expected = expected[idx].reset_index(drop=True)
 
     pd.testing.assert_frame_equal(expected, selected)
 
 
-@pytest.mark.parametrize("TextParser", [True, False])
-def test_inspect_count_by_cat(TextParser):
-    data = _get_data(TextParser)
+def test_inspect_count_by_cat():
+    data = _get_data()
     result = data.unique(columns=("c1", "B1"))
     assert result == {("c1", "B1"): {19: 1, 26: 1, 27: 1, 41: 1, 91: 1}}
diff --git a/tests/test_workflow.py b/tests/test_workflow.py
index b0582b47..3f347d22 100755
--- a/tests/test_workflow.py
+++ b/tests/test_workflow.py
@@ -56,16 +56,6 @@
         None,
         {},
     ),
-    ("icoads_r300_d714", None, None, True, None, {"chunksize": 3}),
-    (
-        "icoads_r300_d714",
-        None,
-        None,
-        False,
-        None,
-        {"sections": ["c99"], "chunksize": 3},
-    ),
-    ("icoads_r300_d721", None, None, True, None, {"chunksize": 3}),
     (
         "icoads_r300_d702",
         None,
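With `chunksize` gone end to end (reader, writers, tests), memory-bounded batch processing, where still needed, moves to the caller. A closing sketch under that assumption; the file name and the per-chunk step are hypothetical:

```python
import pandas as pd

def process(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical per-chunk step, e.g. a correction or a selection."""
    return df

frames = [process(chunk) for chunk in pd.read_csv("imma.csv", chunksize=50_000)]
result = pd.concat(frames, ignore_index=True)
```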