diff --git a/climada/engine/impact_calc.py b/climada/engine/impact_calc.py index 0586166173..f58316bf56 100644 --- a/climada/engine/impact_calc.py +++ b/climada/engine/impact_calc.py @@ -29,6 +29,8 @@ from climada import CONFIG from climada.engine.impact import Impact +from climada.engine.impact_forecast import ImpactForecast +from climada.hazard.forecast import HazardForecast LOGGER = logging.getLogger(__name__) @@ -217,7 +219,7 @@ def _return_impact(self, imp_mat_gen, save_mat): Returns ------- - Impact + Impact or ImpactForecast Impact Object initialize from the impact matrix See Also @@ -230,12 +232,31 @@ def _return_impact(self, imp_mat_gen, save_mat): at_event, eai_exp, aai_agg = self.risk_metrics( imp_mat, self.hazard.frequency ) + if isinstance(self.hazard, HazardForecast): + eai_exp = np.full_like(eai_exp, np.nan, dtype=eai_exp.dtype) + aai_agg = np.full_like(aai_agg, np.nan, dtype=aai_agg.dtype) + LOGGER.warning( + "eai_exp and aai_agg are undefined with forecasts. " + "Setting them to NaN arrays." + ) + else: + if isinstance(self.hazard, HazardForecast): + raise ValueError( + "Saving impact matrix is required when using HazardForecast." + "Please set save_mat=True." + ) imp_mat = None at_event, eai_exp, aai_agg = self.stitch_risk_metrics(imp_mat_gen) - return Impact.from_eih( + + impact = Impact.from_eih( self.exposures, self.hazard, at_event, eai_exp, aai_agg, imp_mat ) + if isinstance(self.hazard, HazardForecast): + return ImpactForecast.from_impact( + impact, self.hazard.lead_time, self.hazard.member + ) + return impact def _return_empty(self, save_mat): """ @@ -248,21 +269,37 @@ def _return_empty(self, save_mat): Returns ------- - Impact + Impact or ImpactForecast Empty impact object with correct array sizes. """ at_event = np.zeros(self.n_events) - eai_exp = np.zeros(self.n_exp_pnt) - aai_agg = 0.0 + if isinstance(self.hazard, HazardForecast): + eai_exp = np.full(self.n_exp_pnt, np.nan) + aai_agg = np.nan + else: + eai_exp = np.zeros(self.n_exp_pnt) + aai_agg = 0.0 + if save_mat: imp_mat = sparse.csr_matrix( (self.n_events, self.n_exp_pnt), dtype=np.float64 ) else: + if isinstance(self.hazard, HazardForecast): + raise ValueError( + "Saving impact matrix is required when using HazardForecast. " + "Please set save_mat=True." + ) imp_mat = None - return Impact.from_eih( + + impact = Impact.from_eih( self.exposures, self.hazard, at_event, eai_exp, aai_agg, imp_mat ) + if isinstance(self.hazard, HazardForecast): + return ImpactForecast.from_impact( + impact, self.hazard.lead_time, self.hazard.member + ) + return impact def minimal_exp_gdf( self, impf_col, assign_centroids, ignore_cover, ignore_deductible diff --git a/climada/engine/impact_forecast.py b/climada/engine/impact_forecast.py index 6b18c61659..1406f4ae5f 100644 --- a/climada/engine/impact_forecast.py +++ b/climada/engine/impact_forecast.py @@ -90,6 +90,88 @@ def from_impact( haz_type=impact.haz_type, ) + @property + def at_event(self): + """Get the total impact for each member/lead_time combination.""" + LOGGER.warning( + "at_event gives the total impact for one specific combination of member and " + "lead_time." + ) + return self._at_event + + @at_event.setter + def at_event(self, value): + """Set the total impact for each member/lead_time combination.""" + self._at_event = value + + def local_exceedance_impact( + self, + return_periods=(25, 50, 100, 250), + method="interpolate", + min_impact=0, + log_frequency=True, + log_impact=True, + bin_decimals=None, + ): + """Compution of local exceedance impact for given return periods is not + implemented for ImpactForecast. + + See Also + -------- + See :py:meth:`~climada.engine.impact.Impact.local_exceedance_impact` + + Raises + ------ + NotImplementedError + """ + + LOGGER.error("local_exceedance_impact is not defined for ImpactForecast") + raise NotImplementedError( + "local_exceedance_impact is not defined for ImpactForecast" + ) + + def local_return_period( + self, + threshold_impact=(1000.0, 10000.0), + method="interpolate", + min_impact=0, + log_frequency=True, + log_impact=True, + bin_decimals=None, + ): + """Compution of local return period for given impact thresholds is not + implemented for ImpactForecast. + + See Also + -------- + See :py:meth:`~climada.engine.impact.Impact.local_return_period` + + Raises + ------- + NotImplementedError + """ + + LOGGER.error("local_return_period is not defined for ImpactForecast") + raise NotImplementedError( + "local_return_period is not defined for ImpactForecast" + ) + + def calc_freq_curve(self, return_per=None): + """Computation of the impact exceedance frequency curve is not + implemented for ImpactForecast. + + See Also + -------- + See :py:meth:`~climada.engine.impact.Impact.calc_freq_curve` + + Raises + ------ + NotImplementedError + """ + + LOGGER.error("calc_freq_curve is not defined for ImpactForecast") + raise NotImplementedError("calc_freq_curve is not defined for ImpactForecast") + def _check_sizes(self): """Check sizes of forecast data vs. impact data. diff --git a/climada/engine/test/test_impact_calc.py b/climada/engine/test/test_impact_calc.py index bd606c6e19..dd69de7249 100644 --- a/climada/engine/test/test_impact_calc.py +++ b/climada/engine/test/test_impact_calc.py @@ -26,14 +26,18 @@ import geopandas as gpd import numpy as np +import pandas as pd +import pytest from scipy import sparse from climada import CONFIG from climada.engine import Impact, ImpactCalc from climada.engine.impact_calc import LOGGER as ILOG +from climada.engine.impact_forecast import ImpactForecast from climada.entity import Exposures, ImpactFunc, ImpactFuncSet, ImpfTropCyclone from climada.entity.entity_def import Entity from climada.hazard.base import Centroids, Hazard +from climada.hazard.forecast import HazardForecast from climada.test import get_test_file from climada.util.api_client import Client from climada.util.config import Config @@ -46,17 +50,102 @@ DATA_FOLDER.mkdir(exist_ok=True) -def check_impact(self, imp, haz, exp, aai_agg, eai_exp, at_event, imp_mat_array=None): - """Test properties of imapcts""" - self.assertEqual(len(haz.event_id), len(imp.at_event)) - self.assertIsInstance(imp, Impact) +@pytest.fixture(params=[50, 1, 0]) +def exposure(request): + n_exp = request.param + lats = np.linspace(-10, 10, n_exp) + lons = np.linspace(-10, 10, n_exp) + data = gpd.GeoDataFrame( + { + "impf_TC": 1, + "value": 1, + }, + index=range(n_exp), + geometry=gpd.points_from_xy(lons, lats), + crs="EPSG:4326", + ) + exposures = Exposures(data=data) + return exposures + + +@pytest.fixture +def hazard(exposure): + n_events = 10 + centroids = Centroids( + lat=exposure.gdf.geometry.x, + lon=exposure.gdf.geometry.y, + ) + intensity = sparse.csr_matrix( + np.ones((n_events, exposure.gdf.shape[0])) * 50 + ) # uniform intensity + haz = Hazard() + haz.event_id = np.arange(n_events) + haz.event_name = haz.event_id.tolist() + haz.haz_type = "TC" + haz.date = haz.event_id + haz.frequency_unit = "m/s" + haz.centroids = centroids + haz.intensity = intensity + haz.frequency = 1 / 10 * np.ones(n_events) # uniform frequency (10 n_events) + return haz + + +@pytest.fixture +def hazard_forecast(hazard): + n_events = hazard.size + lead_time = pd.timedelta_range("1h", periods=n_events).to_numpy() + member = np.arange(n_events) + haz_fc = HazardForecast.from_hazard( + hazard=hazard, + lead_time=lead_time, + member=member, + ) + return haz_fc + + +@pytest.fixture +def impact_func_set(exposure, hazard): + step_impf = ImpactFunc() + step_impf.id = 1 + try: + step_impf.id = exposure.data[f"impf_{hazard.haz_type}"].unique()[0] + except IndexError: + pass + step_impf.haz_type = hazard.haz_type + step_impf.name = "fixture step function" + step_impf.intensity_unit = "" + step_impf.intensity = np.array([0, 0.495, 0.4955, 0.5, 1, 10]) + step_impf.mdd = np.array([0, 0, 0, 1, 1, 1]) + step_impf.paa = np.sort(np.linspace(1, 1, num=6)) + return ImpactFuncSet([step_impf]) + + +@pytest.fixture +def impact_calc(exposure, hazard): + imp_mat = np.ones((len(hazard.event_id), exposure.gdf.shape[0])) + aai_agg = np.sum(exposure.gdf["value"]) * hazard.frequency[0] + eai_exp = np.ones(exposure.gdf.shape[0]) * hazard.frequency[0] + at_event = np.ones(hazard.size) * np.sum(exposure.gdf["value"]) + return { + "imp_mat": imp_mat, + "aai_agg": aai_agg, + "eai_exp": eai_exp, + "at_event": at_event, + } + + +def check_impact(imp, haz, exp, aai_agg, eai_exp, at_event, imp_mat_array=None): + """Test properties of impacts""" + # NOTE: Correctly compares NaNs! + assert len(haz.event_id) == len(imp.at_event) + assert isinstance(imp, Impact) np.testing.assert_allclose(imp.coord_exp[:, 0], exp.latitude) np.testing.assert_allclose(imp.coord_exp[:, 1], exp.longitude) - self.assertAlmostEqual(imp.aai_agg, aai_agg, 3) + np.testing.assert_allclose(imp.aai_agg, aai_agg, rtol=1e-3) np.testing.assert_allclose(imp.eai_exp, eai_exp, rtol=1e-5) np.testing.assert_allclose(imp.at_event, at_event, rtol=1e-5) if imp_mat_array is not None: - np.testing.assert_allclose(imp.imp_mat.toarray().ravel(), imp_mat_array.ravel()) + np.testing.assert_allclose(imp.imp_mat.todense(), imp_mat_array) class TestImpactCalc(unittest.TestCase): @@ -300,7 +389,7 @@ def test_calc_impact_RF_pass(self): ] ) # fmt: on - check_impact(self, impact, haz, exp, aai_agg, eai_exp, at_event, imp_mat_array) + check_impact(impact, haz, exp, aai_agg, eai_exp, at_event, imp_mat_array) def test_empty_impact(self): """Check that empty impact is returned if no centroids match the exposures""" @@ -311,11 +400,11 @@ def test_empty_impact(self): aai_agg = 0.0 eai_exp = np.zeros(len(exp.gdf)) at_event = np.zeros(HAZ.size) - check_impact(self, impact, HAZ, exp, aai_agg, eai_exp, at_event, None) + check_impact(impact, HAZ, exp, aai_agg, eai_exp, at_event, None) impact = icalc.impact(save_mat=True, assign_centroids=False) imp_mat_array = sparse.csr_matrix((HAZ.size, len(exp.gdf))).toarray() - check_impact(self, impact, HAZ, exp, aai_agg, eai_exp, at_event, imp_mat_array) + check_impact(impact, HAZ, exp, aai_agg, eai_exp, at_event, imp_mat_array) def test_single_event_impact(self): """Check impact for single event""" @@ -325,11 +414,11 @@ def test_single_event_impact(self): aai_agg = 0.0 eai_exp = np.zeros(len(ENT.exposures.gdf)) at_event = np.array([0]) - check_impact(self, impact, haz, ENT.exposures, aai_agg, eai_exp, at_event, None) + check_impact(impact, haz, ENT.exposures, aai_agg, eai_exp, at_event, None) impact = icalc.impact(save_mat=True, assign_centroids=False) imp_mat_array = sparse.csr_matrix((haz.size, len(ENT.exposures.gdf))).toarray() check_impact( - self, impact, haz, ENT.exposures, aai_agg, eai_exp, at_event, imp_mat_array + impact, haz, ENT.exposures, aai_agg, eai_exp, at_event, imp_mat_array ) def test_calc_impact_save_mat_pass(self): @@ -603,7 +692,47 @@ def test_single_exp_zero_mdr(self): imp = ImpactCalc(exp, impf_set, haz).impact( assign_centroids=False, save_mat=True ) - check_impact(self, imp, haz, exp, aai_agg, eai_exp, at_event, at_event) + check_impact(imp, haz, exp, aai_agg, eai_exp, at_event, np.array([at_event]).T) + + +class TestImpactCalcForecast: + """Test impact calc for forecast hazard""" + + @pytest.fixture + def impact_calc_forecast(self, impact_calc): + """Write NaNs to attributes that are not used""" + impact_calc["aai_agg"] = np.full_like(impact_calc["aai_agg"], np.nan) + impact_calc["eai_exp"] = np.full_like(impact_calc["eai_exp"], np.nan) + + def test_impact_forecast( + self, + exposure, + hazard_forecast, + impact_func_set, + impact_calc, + impact_calc_forecast, + ): + """Test that ImpactForecast is returned correctly""" + impact = ImpactCalc(exposure, impact_func_set, hazard_forecast).impact( + assign_centroids=True, save_mat=True + ) + # check that impact is indeed ImpactForecast + impact_calc["imp_mat_array"] = impact_calc.pop("imp_mat") + check_impact(imp=impact, haz=hazard_forecast, exp=exposure, **impact_calc) + assert isinstance(impact, ImpactForecast) + np.testing.assert_array_equal(impact.lead_time, hazard_forecast.lead_time) + assert impact.lead_time.dtype == hazard_forecast.lead_time.dtype + np.testing.assert_array_equal(impact.member, hazard_forecast.member) + + def test_impact_forecast_empty_impmat_error( + self, hazard_forecast, exposure, impact_func_set + ): + """Test that error is raised when trying to compute impact forecast + without saving impact matrix + """ + icalc = ImpactCalc(exposure, impact_func_set, hazard_forecast) + with pytest.raises(ValueError, match="Saving impact matrix is required"): + icalc.impact(assign_centroids=True, save_mat=False) class TestImpactMatrixCalc(unittest.TestCase): @@ -901,6 +1030,7 @@ def test_skip_mat(self, from_eih_mock): # Execute Tests if __name__ == "__main__": TESTS = unittest.TestLoader().loadTestsFromTestCase(TestImpactCalc) + TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestImpactCalcForecast)) TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestReturnImpact)) TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestImpactMatrix)) TESTS.addTests(unittest.TestLoader().loadTestsFromTestCase(TestImpactMatrixCalc)) diff --git a/climada/engine/test/test_impact_forecast.py b/climada/engine/test/test_impact_forecast.py index cc461b6101..33566acd5a 100644 --- a/climada/engine/test/test_impact_forecast.py +++ b/climada/engine/test/test_impact_forecast.py @@ -153,3 +153,15 @@ def test_impact_forecast_concat(impact_forecast, member): [impact_forecast, impact_forecast], reset_event_ids=True ) npt.assert_array_equal(impact_fc.member, np.concatenate([member, member])) + + +def test_impact_forecast_blocked_methods(impact_forecast): + """Check if blocked methods raise NotImplementedError""" + with pytest.raises(NotImplementedError): + impact_forecast.local_exceedance_impact(np.array([10, 50, 100])) + + with pytest.raises(NotImplementedError): + impact_forecast.local_return_period(np.array([10, 50, 100])) + + with pytest.raises(NotImplementedError): + impact_forecast.calc_freq_curve(np.array([10, 50, 100]))