diff --git a/README.md b/README.md index a60d89bb29..fcb7e7df10 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ [![Flake8](https://img.shields.io/badge/Flake8-passed-brightgreen)](https://github.com/RuminantFarmSystems/MASM/actions/workflows/combined_format_lint_test_mypy.yml) [![Pytest](https://img.shields.io/badge/Pytest-passed-brightgreen)](https://github.com/RuminantFarmSystems/MASM/actions/workflows/combined_format_lint_test_mypy.yml) [![Coverage](https://img.shields.io/badge/Coverage-99%25-brightgreen)](https://github.com/RuminantFarmSystems/MASM/actions/workflows/combined_format_lint_test_mypy.yml) -[![Mypy](https://img.shields.io/badge/Mypy-1215%20errors-red)](https://github.com/RuminantFarmSystems/MASM/actions/workflows/combined_format_lint_test_mypy.yml) +[![Mypy](https://img.shields.io/badge/Mypy-1212%20errors-red)](https://github.com/RuminantFarmSystems/MASM/actions/workflows/combined_format_lint_test_mypy.yml) # RuFaS: Ruminant Farm Systems diff --git a/RUFAS/EEE/energy.py b/RUFAS/EEE/energy.py index d1ce7d18ba..db1a850c39 100644 --- a/RUFAS/EEE/energy.py +++ b/RUFAS/EEE/energy.py @@ -12,6 +12,115 @@ from RUFAS.EEE.tractor import Tractor from RUFAS.EEE.tractor_implement import TractorImplement +EEE_TO_OM_KEY_MAPPING = { + FieldOperationEvent.PLANTING: { + "crop_type": "crop", + "clay_percent": "average_clay_percent", + "field_production_size": "field_size", + "operation_year": "year", + "operation_day": "day", + "field_name": "field_name", + }, + FieldOperationEvent.HARVEST: { + "crop_type": "crop", + "crop_yield": "dry_yield", + "field_production_size": "field_size", + "operation_year": "harvest_year", + "operation_day": "harvest_day", + "field_name": "field_name", + "harvest_type": "harvest_type", + }, + FieldOperationEvent.MANURE_APPLICATION: { + "mass": "dry_matter_mass", + "dry_matter_fraction": "dry_matter_fraction", + "application_depth": "application_depth", + "field_production_size": "field_size", + "clay_percent": "average_clay_percent", + "operation_year": "year", + "operation_day": "day", + "field_name": "field_name", + }, + FieldOperationEvent.TILLING: { + "application_depth": "tillage_depth", + "tillage_implement": "implement", + "field_production_size": "field_size", + "clay_percent": "average_clay_percent", + "operation_year": "year", + "operation_day": "day", + "field_name": "field_name", + }, + FieldOperationEvent.FERTILIZER_APPLICATION: { + "mass": "mass", + "application_depth": "application_depth", + "field_production_size": "field_size", + "clay_percent": "average_clay_percent", + "operation_year": "year", + "operation_day": "day", + "field_name": "field_name", + }, +} + +CROP_AND_SOIL_FILTERS: list[dict[str, Any]] = [ + { + "name": FieldOperationEvent.FERTILIZER_APPLICATION, + "filters": ["Field._record_fertilizer_application.fertilizer_application.field='.*'"], + "variables": [ + "mass", + "application_depth", + "field_size", + "average_clay_percent", + "year", + "day", + "field_name", + ], + }, + { + "name": FieldOperationEvent.TILLING, + "filters": ["TillageApplication._record_tillage.tillage_record.field='.*'"], + "variables": [ + "tillage_depth", + "implement", + "field_size", + "average_clay_percent", + "year", + "day", + "field_name", + ], + }, + { + "name": FieldOperationEvent.MANURE_APPLICATION, + "filters": ["Field._record_manure_application.manure_application.field='.*'"], + "variables": [ + "dry_matter_mass", + "dry_matter_fraction", + "application_depth", + "field_size", + "average_clay_percent", + "year", + "day", + "field_name", + ], + }, + { + "name": FieldOperationEvent.HARVEST, + "filters": ["CropManagement._record_yield.harvest_yield.field='.*'"], + "variables": [ + "dry_yield", + "crop", + "field_size", + "harvest_year", + "harvest_day", + "field_name", + "harvest_type", + ], + }, + { + "name": FieldOperationEvent.PLANTING, + "filters": ["Field._record_planting.crop_planting.field='.*'"], + "variables": ["crop", "field_size", "average_clay_percent", "year", "day", "field_name"], + }, +] + im = InputManager() om = OutputManager() @@ -29,7 +138,7 @@ def estimate_all() -> None: } estimator = EnergyEstimator() diesel_consumption_data_list = estimator.parse_inputs_for_diesel_consumption_calculation() - total_diesel_consumption_tractor_implement_liter_per_ha = 0 + total_diesel_consumption_tractor_implement_liter_per_ha: float = 0.0 herd_size = im.get_data("animal.herd_information.herd_num") for diesel_consumption_data_item in diesel_consumption_data_list: harvest_type: HarvestOperation | None = None @@ -41,7 +150,7 @@ def estimate_all() -> None: herd_size=herd_size, application_depth=diesel_consumption_data_item.get("application_depth"), tillage_implement=diesel_consumption_data_item.get("tillage_implement"), - harvest_type=harvest_type + harvest_type=harvest_type, ) diesel_consumption_tractor_implement_liter_per_ha = estimator.calculate_diesel_consumption( @@ -88,7 +197,7 @@ def report_diesel_consumption( """ base_info_map = { "class": EnergyEstimator.__name__, - "function": EnergyEstimator.estimate_all.__name__, + "function": EnergyEstimator.report_diesel_consumption.__name__, } operation_event: FieldOperationEvent = diesel_consumption_data["operation_event"] operation_event_str: str = ( @@ -133,23 +242,18 @@ def report_diesel_consumption( diesel_consumption_data.get("application_depth"), {**base_info_map, **{"units": MeasurementUnits.CENTIMETERS}}, ) - if operation_event in [ - FieldOperationEvent.MANURE_APPLICATION, - FieldOperationEvent.FERTILIZER_APPLICATION - ]: + if operation_event in [FieldOperationEvent.MANURE_APPLICATION, FieldOperationEvent.FERTILIZER_APPLICATION]: om.add_variable( f"application_mass_{suffix}", diesel_consumption_data.get("mass"), {**base_info_map, **{"units": MeasurementUnits.KILOGRAMS_PER_HECTARE}}, ) if operation_event == FieldOperationEvent.TILLING: + tillage_implement_enum = diesel_consumption_data.get("tillage_implement") + tillage_implement = tillage_implement_enum.value if tillage_implement_enum is not None else None om.add_variable( f"tillage_implement_for_{suffix}", - ( - diesel_consumption_data.get("tillage_implement").value - if diesel_consumption_data.get("tillage_implement") - else diesel_consumption_data.get("tillage_implement") - ), + tillage_implement, {**base_info_map, **{"units": MeasurementUnits.UNITLESS}}, ) om.add_variable( @@ -162,132 +266,57 @@ def parse_inputs_for_diesel_consumption_calculation(self) -> list[dict[str, Any] """ Parses the OutputManager variables pool for diesel consumption calculation. """ - crop_and_soil_filters = [ - { - "name": FieldOperationEvent.FERTILIZER_APPLICATION, - "use_name": True, - "filters": ["Field._record_fertilizer_application.fertilizer_application.field='.*'"], - "variables": [ - "mass", - "application_depth", - "field_size", - "average_clay_percent", - "year", - "day", - "field_name", - ], - }, - { - "name": FieldOperationEvent.TILLING, - "use_name": True, - "filters": ["TillageApplication._record_tillage.tillage_record.field='.*'"], - "variables": [ - "tillage_depth", - "implement", - "field_size", - "average_clay_percent", - "year", - "day", - "field_name", - ], - }, - { - "name": FieldOperationEvent.MANURE_APPLICATION, - "use_name": True, - "filters": ["Field._record_manure_application.manure_application.field='.*'"], - "variables": [ - "dry_matter_mass", - "dry_matter_fraction", - "application_depth", - "field_size", - "average_clay_percent", - "year", - "day", - "field_name", - ], - }, - { - "name": FieldOperationEvent.HARVEST, - "use_name": True, - "filters": ["CropManagement._record_yield.harvest_yield.field='.*'"], - "variables": [ - "dry_yield", "crop", "field_size", "harvest_year", "harvest_day", "field_name", "harvest_type" - ], - }, - { - "name": FieldOperationEvent.PLANTING, - "use_name": True, - "filters": ["Field._record_planting.crop_planting.field='.*'"], - "variables": ["crop", "field_size", "average_clay_percent", "year", "day", "field_name"], - }, - ] result: list[dict[str, Any]] = [] - eee_to_om_key_mapping = { - FieldOperationEvent.PLANTING: { - "crop_type": "crop", - "clay_percent": "average_clay_percent", - "field_production_size": "field_size", - "operation_year": "year", - "operation_day": "day", - "field_name": "field_name", - }, - FieldOperationEvent.HARVEST: { - "crop_type": "crop", - "crop_yield": "dry_yield", - "field_production_size": "field_size", - "operation_year": "harvest_year", - "operation_day": "harvest_day", - "field_name": "field_name", - "harvest_type": "harvest_type" - }, - FieldOperationEvent.MANURE_APPLICATION: { - "mass": "dry_matter_mass", - "dry_matter_fraction": "dry_matter_fraction", - "application_depth": "application_depth", - "field_production_size": "field_size", - "clay_percent": "average_clay_percent", - "operation_year": "year", - "operation_day": "day", - "field_name": "field_name", - }, - FieldOperationEvent.TILLING: { - "application_depth": "tillage_depth", - "tillage_implement": "implement", - "field_production_size": "field_size", - "clay_percent": "average_clay_percent", - "operation_year": "year", - "operation_day": "day", - "field_name": "field_name", - }, - FieldOperationEvent.FERTILIZER_APPLICATION: { - "mass": "mass", - "application_depth": "application_depth", - "field_production_size": "field_size", - "clay_percent": "average_clay_percent", - "operation_year": "year", - "operation_day": "day", - "field_name": "field_name", - }, - } - for filter in crop_and_soil_filters: - filtered_pool = om.filter_variables_pool(filter) - max_index = Utility.find_max_index_from_keys(filtered_pool) - if max_index is None or max_index < 0: + + for filter_config in CROP_AND_SOIL_FILTERS: + filtered_pool = om.filter_variables_pool(filter_config) + if not filtered_pool: continue - first_key_in_pool = next(iter(filtered_pool.keys())) - for event_type, key_mappings in eee_to_om_key_mapping.items(): - if first_key_in_pool.startswith(event_type.value): - for index in range(max_index + 1): - key_prefix = f"{event_type}_{index}" - _, first_om_key_in_the_map = next(iter(key_mappings.items())) - length = len(filtered_pool[f"{key_prefix}.{first_om_key_in_the_map}"]["values"]) - for i in range(length): - event_data = { - eee_key: filtered_pool[f"{key_prefix}.{om_key_suffix}"]["values"][i] - for eee_key, om_key_suffix in key_mappings.items() - } - event_data["operation_event"] = event_type - result.append(event_data) + + event_type = filter_config["name"] + key_mappings = EEE_TO_OM_KEY_MAPPING[event_type] + required_suffixes = set(key_mappings.values()) + + group_prefixes = Utility.find_group_prefixes_from_keys( + data=filtered_pool, + required_suffixes=required_suffixes, + ) + if not group_prefixes: + continue + + first_required_suffix = next(iter(key_mappings.values())) + + for key_prefix in group_prefixes: + first_key = f"{key_prefix}.{first_required_suffix}" + if first_key not in filtered_pool: + continue + + values = filtered_pool[first_key].get("values", []) + length = len(values) + + for i in range(length): + event_data: dict[str, Any] = {} + + for eee_key, om_key_suffix in key_mappings.items(): + full_key = f"{key_prefix}.{om_key_suffix}" + if full_key not in filtered_pool: + raise KeyError( + f"Expected key '{full_key}' not found in filtered pool for " + f"event type '{event_type.value}'." + ) + + field_values = filtered_pool[full_key].get("values", []) + if i >= len(field_values): + raise IndexError( + f"Index {i} out of range for key '{full_key}' while building " + f"diesel consumption event data." + ) + + event_data[eee_key] = field_values[i] + + event_data["operation_event"] = event_type + result.append(event_data) + return result def calculate_diesel_consumption( @@ -297,7 +326,7 @@ def calculate_diesel_consumption( tractor: Tractor, clay_percent: float, application_mass: float | None = None, - application_dm_content: float | None = None + application_dm_content: float | None = None, ) -> float: """ General estimate of diesel fuel consumption for a given attachment type and tractor size. @@ -328,8 +357,9 @@ def calculate_diesel_consumption( for implement in tractor.implements: crop_yield_ton_ha = crop_yield * GeneralConstants.KILOGRAMS_TO_MEGAGRAMS if application_mass and application_dm_content: - application_mass_per_ha = (application_mass * GeneralConstants.KILOGRAMS_TO_MEGAGRAMS - / application_dm_content) / field_production_size + application_mass_per_ha = ( + application_mass * GeneralConstants.KILOGRAMS_TO_MEGAGRAMS / application_dm_content + ) / field_production_size else: application_mass_per_ha = None diff --git a/RUFAS/output_manager.py b/RUFAS/output_manager.py index ca6dc5871c..d79b1548af 100644 --- a/RUFAS/output_manager.py +++ b/RUFAS/output_manager.py @@ -1302,14 +1302,12 @@ def filter_variables_pool( A filtered variables pool based on either inclusion or exclusion. """ filter_name: str = filter_content.get("name", "NO NAME FOUND") - use_filter_name: bool = filter_content.get("use_name", False) filter_by_exclusion: bool = filter_content.get("filter_by_exclusion", False) info_map = { "class": self.__class__.__name__, "function": self.filter_variables_pool.__name__, "filter_name": filter_name, "filter_by_exclusion": filter_by_exclusion, - "use_filter_name": use_filter_name, } if filter_by_exclusion: filter_excl_msg = f"Performing filtering by exclusion per filter's contents. {filter_name=}" @@ -1334,7 +1332,6 @@ def filter_variables_pool( filtered_pool, selected_variables, filter_name, - use_filter_name, filter_by_exclusion, ) @@ -1368,7 +1365,6 @@ def _parse_filtered_variables( filtered_pool: dict[str, OutputManager.pool_element_type], selected_variables: list[str] | None, filter_name: str, - use_filter_name: bool, filter_by_exclusion: bool, ) -> dict[str, OutputManager.pool_element_type]: """ @@ -1382,8 +1378,6 @@ def _parse_filtered_variables( list of key names to select or exclude from variables containing dictionaries. filter_name : str Name of the filter used to collect variables for the filtered pool. - use_filter_name : bool - Whether to use the filter name when constructing the key name for data pulled from the filtered pool. filter_by_exclusion : bool Whether keys in dictionaries should be filtered by exclusion. @@ -1399,7 +1393,6 @@ def _parse_filtered_variables( "function": self._parse_filtered_variables.__name__, "filter_name": filter_name, "filter_by_exclusion": filter_by_exclusion, - "use_filter_name": use_filter_name, } results: dict[str, OutputManager.pool_element_type] = {} counter: int = 0 @@ -1410,8 +1403,7 @@ def _parse_filtered_variables( data: list[Any] = filtered_pool[key]["values"] is_data_in_dict: bool = all(isinstance(element, dict) for element in data) if selected_variables is None or not is_data_in_dict: - combined_key = f"{filter_name}_{counter}" if use_filter_name else key - results[combined_key] = ({"info_maps": info_maps} if info_maps else {}) | {"values": data} + results[key] = ({"info_maps": info_maps} if info_maps else {}) | {"values": data} self._variables_usage_counter.update([key]) elif is_data_in_dict: if not isinstance(selected_variables, list): @@ -1424,9 +1416,7 @@ def _parse_filtered_variables( temp_data = Utility.convert_list_of_dicts_to_dict_of_lists(data) filtered_data = Utility.filter_dictionary(temp_data, selected_variables, filter_by_exclusion) for filtered_key, filtered_value in filtered_data.items(): - combined_key = ( - f"{filter_name}_{counter}.{filtered_key}" if use_filter_name else f"{key}.{filtered_key}" - ) + combined_key = f"{key}.{filtered_key}" if combined_key in results.keys(): results[combined_key].get("info_maps", []).extend(info_maps) results[combined_key]["values"].extend(filtered_value) diff --git a/RUFAS/util.py b/RUFAS/util.py index e2c49ceece..f45edd677a 100644 --- a/RUFAS/util.py +++ b/RUFAS/util.py @@ -111,32 +111,49 @@ def flatten_keys_to_nested_structure(input_dict: Dict[str, Any]) -> Dict[str, An return nested_structure @staticmethod - def find_max_index_from_keys(data: dict[str, Any]) -> int | None: + def find_group_prefixes_from_keys( + data: dict[str, Any], + required_suffixes: set[str] | None = None, + ) -> list[str]: """ - Extracts and returns the maximum index (n) from the keys of the given dictionary. - Assumes keys follow the format `_.` and number >= 0. + Extracts unique group prefixes from flattened keys of the form: + + . + + For example: + Field._record_fertilizer_application.fertilizer_application.field='field_1'.mass + Field._record_fertilizer_application.fertilizer_application.field='field_1'.year + + would yield the group prefix: + Field._record_fertilizer_application.fertilizer_application.field='field_1' Parameters ---------- - data: Dict[str, Any] - The dictionary whose keys will be analyzed. + data : dict[str, Any] + Dictionary whose keys are flattened variable names. + required_suffixes : set[str] | None, default None + If provided, only prefixes that have at least one matching suffix from this set + will be included. Returns ------- - int | None - The maximum index found among the keys, or None if no numeric index is found. + list[str] + Sorted list of unique group prefixes. """ - pattern = re.compile(r"_([0-9]+)\.") - max_number = -1 + prefixes: set[str] = set() + + for key in data: + if "." not in key: + continue + + prefix, suffix = key.rsplit(".", 1) + + if required_suffixes is not None and suffix not in required_suffixes: + continue - for key in data.keys(): - match = pattern.search(key) - if match: - number = int(match.group(1)) - if number > max_number: - max_number = number + prefixes.add(prefix) - return max_number if max_number != -1 else None + return sorted(prefixes) @staticmethod def expand_data_temporally( diff --git a/changelog.md b/changelog.md index ca144ea1f0..5c2b8b24f1 100644 --- a/changelog.md +++ b/changelog.md @@ -47,8 +47,9 @@ v1.0.0 - [2852](https://github.com/RuminantFarmSystems/MASM/pull/2852) - [minor change] [NoInputChange] [NoOutputChange] Fix AssertionError on `dev`. - [2866](https://github.com/RuminantFarmSystems/MASM/pull/2866) - [minor change] [NoInputChange] [NoOutputChange] Clears all mypy errors in test_field_manager.py. - [2863](https://github.com/RuminantFarmSystems/MASM/pull/2863) - [minor change] [NoInputChange] [NoOutputChange] Updates TaskManager to avoid using multiprocessing when running single tasks. -- [2854](https://github.com/RuminantFarmSystems/MASM/pull/2854) - [minor change] [NoInputChange] [NoOutputChange] Update `emissions.py` filtering process and remove `use_filter_key_name` option in the OM filter. +- [2854](https://github.com/RuminantFarmSystems/MASM/pull/2854) - [minor change] [Emissions][OutputManager] [NoInputChange] [NoOutputChange] Update `emissions.py` filtering process and remove `use_filter_key_name` option in the OM filter. - [2872](https://github.com/RuminantFarmSystems/RuFaS/pull/2872) - [minor change] [NoInputChange] [NoOutputChange] Adds information and links for onboarding videos. +- [2876](https://github.com/RuminantFarmSystems/RuFaS/pull/2876) - [minor change] [DieselConsumption][OutputManager] [NoInputChange] [NoOutputChange] Removes `use_name` output filter option and updates `DieselConsumption` to filter properly without it. - [2850](https://github.com/RuminantFarmSystems/MASM/pull/2850) - [minor change] [NoInputChange] [NoOutputChange] Refactor `Pen.get_manure_stream()`. - [2869](https://github.com/RuminantFarmSystems/RuFaS/pull/2869) - [minor change] [InputChange] [NoOutputChange] Removes `properties` and `cross-validation` file path definitions from metadata JSON files. Properties file paths are now defined as a module-level constant in `input_manager.py`, and cross-validation file paths are moved to the task configuration JSON files. diff --git a/tests/test_output_manager.py b/tests/test_output_manager.py index 26a1ebf0de..61556c0a06 100644 --- a/tests/test_output_manager.py +++ b/tests/test_output_manager.py @@ -2336,21 +2336,6 @@ def test_filter_variables_pool_complex( """Test case for pattern pool with regex patterns and exclude keyword with function filter_variables_pool in output_manager.py""" mock_output_manager.variables_pool = mock_variables_pool_complex - # use filter_name - filter_content: dict[str, Any] = { - "name": "test_case_1", - "filters": ["^DummyClass1.*"], - "filter_by_exclusion": False, - "use_name": True, - "variables": ["var2", "a"], - } - expected_result: dict[str, OutputManager.pool_element_type] = { - "test_case_1_0": {"values": ["value1", "value2", "value3"]}, - "test_case_1_1.a": {"values": ["A", "AA"]}, - "test_case_1_2.a": {"values": ["AAA"]}, - } - assert mock_output_manager.filter_variables_pool(filter_content) == expected_result - # unpacking pool error filter_content = {"filters": ["^DummyClass1.*"], "filter_by_exclusion": False, "variables": "a"} expected_result = { @@ -2372,7 +2357,6 @@ def test_filter_variables_pool_complex( "function": "_parse_filtered_variables", "filter_name": "NO NAME FOUND", "filter_by_exclusion": False, - "use_filter_name": False, }, ), call( @@ -2385,7 +2369,6 @@ def test_filter_variables_pool_complex( "function": "_parse_filtered_variables", "filter_name": "NO NAME FOUND", "filter_by_exclusion": False, - "use_filter_name": False, }, ), ], @@ -2394,12 +2377,10 @@ def test_filter_variables_pool_complex( assert actual == expected_result - # use_name in dict data filter_content = { "name": "test_case_3", "filters": ["^DummyClass1.*"], "filter_by_exclusion": False, - "use_name": False, "variables": ["a", "b", "c"], } expected_result = { @@ -2481,7 +2462,7 @@ def test_parse_filtered_variables( """Tests _parse_filtered_variables in the Output Manager.""" mock_output_manager._variables_usage_counter = Counter() - actual = mock_output_manager._parse_filtered_variables(pool, vars, "test", False, exclusion) + actual = mock_output_manager._parse_filtered_variables(pool, vars, "test", exclusion) assert actual == expected assert mock_output_manager._variables_usage_counter == expected_counter diff --git a/tests/test_util.py b/tests/test_util.py index 2b7e1170e0..e5947ec197 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -343,6 +343,119 @@ def test_flatten_keys_to_nested_structure_dict_w_list() -> None: assert actual == expected +def test_find_group_prefixes_basic() -> None: + """Test extraction of prefixes from standard flattened keys.""" + data: dict[str, Any] = { + "a.b": 1, + "a.c": 2, + "d.e": 3, + } + + result = Utility.find_group_prefixes_from_keys(data) + + assert result == ["a", "d"] + + +def test_find_group_prefixes_deduplication() -> None: + """Ensure duplicate prefixes are only returned once.""" + data: dict[str, Any] = { + "group1.x": 1, + "group1.y": 2, + "group1.z": 3, + } + + result = Utility.find_group_prefixes_from_keys(data) + + assert result == ["group1"] + + +def test_find_group_prefixes_complex_keys() -> None: + """Test realistic complex keys similar to production format.""" + data: dict[str, Any] = { + "Field._record_fertilizer_application.fertilizer_application.field='field_1'.mass": 1, + "Field._record_fertilizer_application.fertilizer_application.field='field_1'.year": 2020, + "Field._record_fertilizer_application.fertilizer_application.field='field_2'.mass": 2, + } + + result = Utility.find_group_prefixes_from_keys(data) + + expected = [ + "Field._record_fertilizer_application.fertilizer_application.field='field_1'", + "Field._record_fertilizer_application.fertilizer_application.field='field_2'", + ] + + assert result == sorted(expected) + + +def test_find_group_prefixes_with_required_suffixes() -> None: + """Test filtering using required_suffixes.""" + data: dict[str, Any] = { + "a.mass": 1, + "a.year": 2020, + "b.day": 10, + "c.mass": 5, + } + + result = Utility.find_group_prefixes_from_keys( + data, + required_suffixes={"mass"}, + ) + + assert result == ["a", "c"] + + +def test_find_group_prefixes_required_suffixes_no_match() -> None: + """Test when no suffix matches the required set.""" + data: dict[str, Any] = { + "a.year": 2020, + "b.day": 10, + } + + result = Utility.find_group_prefixes_from_keys( + data, + required_suffixes={"mass"}, + ) + + assert result == [] + + +def test_find_group_prefixes_ignores_keys_without_dot() -> None: + """Ensure keys without a dot are ignored.""" + data: dict[str, Any] = { + "a": 1, + "b": 2, + "c.d": 3, + } + + result = Utility.find_group_prefixes_from_keys(data) + + assert result == ["c"] + + +def test_find_group_prefixes_empty_input() -> None: + """Test behavior with empty input.""" + result = Utility.find_group_prefixes_from_keys({}) + + assert result == [] + + +def test_find_group_prefixes_multiple_suffix_filtering() -> None: + """Test filtering with multiple allowed suffixes.""" + data: dict[str, Any] = { + "a.mass": 1, + "a.year": 2020, + "b.day": 10, + "c.depth": 5, + } + + result = Utility.find_group_prefixes_from_keys( + data, + required_suffixes={"mass", "year"}, + ) + + assert result == ["a"] + + @pytest.mark.parametrize( "data_to_pad,fill_value,gap_pad,end_pad,expected", [ @@ -892,7 +1005,7 @@ def test_combine( def test_convert_dict_of_lists_to_list_of_dicts_normal_case() -> None: - input_dict = {"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"], "age": [25, 30, 35]} + input_dict: dict[str, list[Any]] = {"id": [1, 2, 3], "name": ["Alice", "Bob", "Charlie"], "age": [25, 30, 35]} expected_output = [ {"id": 1, "name": "Alice", "age": 25}, {"id": 2, "name": "Bob", "age": 30}, @@ -902,13 +1015,13 @@ def test_convert_dict_of_lists_to_list_of_dicts_normal_case() -> None: def test_convert_dict_of_lists_to_list_of_dicts_empty_input() -> None: - input_dict = {} - expected_output = [] + input_dict: dict[str, list[Any]] = {} + expected_output: list[dict[str, Any]] = [] assert Utility.convert_dict_of_lists_to_list_of_dicts(input_dict) == expected_output def test_convert_dict_of_lists_to_list_of_dicts_single_element_lists() -> None: - input_dict = {"id": [1], "name": ["Alice"], "age": [25]} + input_dict: dict[str, list[Any]] = {"id": [1], "name": ["Alice"], "age": [25]} expected_output = [{"id": 1, "name": "Alice", "age": 25}] assert Utility.convert_dict_of_lists_to_list_of_dicts(input_dict) == expected_output @@ -923,13 +1036,13 @@ def test_convert_list_to_dict_by_key_basic() -> None: def test_convert_list_to_dict_by_key_empty_list() -> None: - list_of_dicts = [] - expected_output = {} + list_of_dicts: list[dict[str, Any]] = [] + expected_output: dict[Any, dict[str, Any]] = {} assert Utility.convert_list_to_dict_by_key(list_of_dicts, "ID") == expected_output def test_convert_list_to_dict_by_key_missing_key() -> None: - list_of_dicts = [{"ID": 1, "value": 2}, {"value": 3}] # Missing 'ID' + list_of_dicts = [{"ID": 1, "value": 2}, {"value": 3}] with pytest.raises(KeyError): Utility.convert_list_to_dict_by_key(list_of_dicts, "ID") @@ -940,38 +1053,6 @@ def test_convert_list_to_dict_by_key_different_key() -> None: assert Utility.convert_list_to_dict_by_key(list_of_dicts, "unique_id") == expected_output -def test_find_max_index_from_keys_mixed_single_and_multi_digit_numbers() -> None: - data = { - "Prefix_0.suffix": ["value"], - "Prefix_1.suffix": ["value"], - "Prefix_10.suffix": ["value"], - "Prefix_2.suffix": ["value"], - "Prefix_21.suffix": ["value"], - } - assert Utility.find_max_index_from_keys(data) == 21 - - -def test_find_max_index_from_keys_no_matching_keys() -> None: - data = { - "NoPrefixOrNumber.suffix": ["value"], - "AnotherWithoutNumber": ["value"], - } - assert Utility.find_max_index_from_keys(data) is None - - -def test_find_max_index_from_keys_negative_numbers() -> None: - data = { - "Prefix_-1.suffix": ["value"], - "Prefix_-2.suffix": ["value"], - } - assert Utility.find_max_index_from_keys(data) is None - - -def test_find_max_index_from_keys_empty_dictionary() -> None: - data = {} - assert Utility.find_max_index_from_keys(data) is None - - @pytest.mark.parametrize( "test_list,length,expected", [