diff --git a/app/platforms/implementations/openeo.py b/app/platforms/implementations/openeo.py index f03bcc6..0a77dfd 100644 --- a/app/platforms/implementations/openeo.py +++ b/app/platforms/implementations/openeo.py @@ -12,12 +12,15 @@ from app.auth import exchange_token from app.config.schemas import AuthMethod from app.config.settings import settings +from app.error import AuthException from app.platforms.base import BaseProcessingPlatform from app.platforms.dispatcher import register_platform from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum from app.schemas.parameters import ParamTypeEnum, Parameter from app.schemas.unit_job import ServiceDetails +from openeo.rest import OpenEoApiError + load_dotenv() @@ -70,10 +73,7 @@ async def _authenticate_user( if url not in settings.backend_auth_config: raise ValueError(f"No OpenEO backend configuration found for URL: {url}") - if ( - settings.backend_auth_config[url].auth_method - == AuthMethod.USER_CREDENTIALS - ): + if settings.backend_auth_config[url].auth_method == AuthMethod.USER_CREDENTIALS: logger.debug("Using user credentials for OpenEO connection authentication") bearer_token = await exchange_token(user_token=user_token, url=url) connection.authenticate_bearer_token(bearer_token=bearer_token) @@ -184,12 +184,22 @@ async def execute_job( parameters: dict, format: OutputFormatEnum, ) -> str: - service = await self._build_datacube(user_token, title, details, parameters) - job = service.create_job(title=title, out_format=format) - logger.info(f"Executing OpenEO batch job with title={title}") - job.start() - - return job.job_id + try: + service = await self._build_datacube(user_token, title, details, parameters) + job = service.create_job(title=title, out_format=format) + logger.info(f"Executing OpenEO batch job with title={title}") + job.start() + + return job.job_id + except OpenEoApiError as e: + if e.http_status_code in (403, 401): + raise AuthException( + e.http_status_code, + f"Authentication error when executing: {e.message}", + ) + raise e + except Exception as e: + raise e async def execute_synchronous_job( self, @@ -199,14 +209,24 @@ async def execute_synchronous_job( parameters: dict, format: OutputFormatEnum, ) -> Response: - service = await self._build_datacube(user_token, title, details, parameters) - logger.info("Executing synchronous OpenEO job") - response = service.execute(auto_decode=False) - return Response( - content=response.content, - status_code=response.status_code, - media_type=response.headers.get("Content-Type"), - ) + try: + service = await self._build_datacube(user_token, title, details, parameters) + logger.info("Executing synchronous OpenEO job") + response = service.execute(auto_decode=False) + return Response( + content=response.content, + status_code=response.status_code, + media_type=response.headers.get("Content-Type"), + ) + except OpenEoApiError as e: + if e.http_status_code in (403, 401): + raise AuthException( + e.http_status_code, + f"Authentication error when executing: {e.message}", + ) + raise e + except Exception as e: + raise e def _map_openeo_status(self, status: str) -> ProcessingStatusEnum: """ @@ -248,10 +268,20 @@ async def get_job_status( async def get_job_results( self, user_token: str, job_id: str, details: ServiceDetails ) -> Collection: - logger.debug(f"Fetching job result for openEO job with ID {job_id}") - connection = await self._setup_connection(user_token, details.endpoint) - job = connection.job(job_id) - return Collection(**job.get_results().get_metadata()) + try: + logger.debug(f"Fetching job result for openEO job with ID {job_id}") + connection = await self._setup_connection(user_token, details.endpoint) + job = connection.job(job_id) + return Collection(**job.get_results().get_metadata()) + except OpenEoApiError as e: + if e.http_status_code in (403, 401): + raise AuthException( + e.http_status_code, + f"Authentication error when fetching job results for job {job_id}: {e.message}", + ) + raise e + except Exception as e: + raise e async def get_service_parameters( self, user_token: str, details: ServiceDetails diff --git a/mypy.ini b/mypy.ini index 11c23f9..9401c22 100644 --- a/mypy.ini +++ b/mypy.ini @@ -1,2 +1,4 @@ -[mypy-openeo] +[mypy] +disable_error_code = import-untyped +[mypy-openeo.*] ignore_missing_imports = True \ No newline at end of file diff --git a/tests/platforms/test_openeo_platform.py b/tests/platforms/test_openeo_platform.py index e65d96f..fa51a36 100644 --- a/tests/platforms/test_openeo_platform.py +++ b/tests/platforms/test_openeo_platform.py @@ -9,6 +9,7 @@ from app.config.schemas import AuthMethod, BackendAuthConfig from app.config.settings import settings +from app.error import AuthException from app.platforms.implementations.openeo import ( OpenEOPlatform, ) @@ -17,6 +18,8 @@ from app.schemas.unit_job import ServiceDetails from stac_pydantic import Collection +from openeo.rest import OpenEoApiError + class DummyOpenEOClient: @@ -131,6 +134,46 @@ async def test_execute_job_process_id_failure( ) +@pytest.mark.asyncio +@patch.object(OpenEOPlatform, "_setup_connection") +@patch.object( + OpenEOPlatform, + "_build_datacube", + side_effect=OpenEoApiError(message="Woops", code="Test", http_status_code=401), +) +async def test_execute_job_process_openeo_auth_error( + mock_pid, mock_connect, platform, service_details +): + with pytest.raises(AuthException, match="Authentication error"): + await platform.execute_job( + user_token="fake_token", + title="Test Job", + details=service_details, + parameters={}, + format=OutputFormatEnum.GEOTIFF, + ) + + +@pytest.mark.asyncio +@patch.object(OpenEOPlatform, "_setup_connection") +@patch.object( + OpenEOPlatform, + "_build_datacube", + side_effect=OpenEoApiError(message="Woops", code="Test", http_status_code=500), +) +async def test_execute_job_process_openeo_error( + mock_pid, mock_connect, platform, service_details +): + with pytest.raises(OpenEoApiError, match="Woops"): + await platform.execute_job( + user_token="fake_token", + title="Test Job", + details=service_details, + parameters={}, + format=OutputFormatEnum.GEOTIFF, + ) + + @pytest.mark.parametrize( "openeo_status, expected_enum", [ @@ -197,6 +240,30 @@ async def test_get_job_results_error(mock_connection, platform): assert "Connection error" in str(exc_info.value) +@pytest.mark.asyncio +@patch.object(OpenEOPlatform, "_setup_connection") +async def test_get_job_results_openeo_auth_error( + mock_connection, platform, service_details +): + mock_connection.side_effect = OpenEoApiError( + message="Woops", code="Test", http_status_code=401 + ) + details = ServiceDetails(endpoint="foo", application="bar") + with pytest.raises(AuthException, match="Authentication error"): + await platform.get_job_results("foobar", "job123", details) + + +@pytest.mark.asyncio +@patch.object(OpenEOPlatform, "_setup_connection") +async def test_get_job_results_openeo_error(mock_connection, platform, service_details): + mock_connection.side_effect = OpenEoApiError( + message="Woops", code="Test", http_status_code=500 + ) + details = ServiceDetails(endpoint="foo", application="bar") + with pytest.raises(OpenEoApiError, match="Woops"): + await platform.get_job_results("foobar", "job123", details) + + def _make_conn_with_token(token: str): # openeo.Connection-like object with auth.bearer that the implementation splits on '/' return SimpleNamespace(auth=SimpleNamespace(bearer=f"prefix/{token}")) @@ -513,6 +580,66 @@ async def test_execute_sync_job_success( mock_connect.assert_called_once_with("fake_token", service_details.endpoint) +@pytest.mark.asyncio +@patch.object(OpenEOPlatform, "_setup_connection") +@patch.object( + OpenEOPlatform, + "_build_datacube", + side_effect=OpenEoApiError(message="Woops", code="Test", http_status_code=401), +) +async def test_execute_sync_job_openeo_auth_error( + mock_pid, mock_connect, platform, service_details +): + with pytest.raises(AuthException, match="Authentication error"): + await platform.execute_synchronous_job( + user_token="fake_token", + title="Test Job", + details=service_details, + parameters={}, + format=OutputFormatEnum.GEOTIFF, + ) + + +@pytest.mark.asyncio +@patch.object(OpenEOPlatform, "_setup_connection") +@patch.object( + OpenEOPlatform, + "_build_datacube", + side_effect=OpenEoApiError(message="Woops", code="Test", http_status_code=500), +) +async def test_execute_sync_job_openeo_error( + mock_pid, mock_connect, platform, service_details +): + with pytest.raises(OpenEoApiError, match="Woops"): + await platform.execute_synchronous_job( + user_token="fake_token", + title="Test Job", + details=service_details, + parameters={}, + format=OutputFormatEnum.GEOTIFF, + ) + + +@pytest.mark.asyncio +@patch.object(OpenEOPlatform, "_setup_connection") +@patch.object( + OpenEOPlatform, + "_build_datacube", + side_effect=RuntimeError("Woops"), +) +async def test_execute_sync_job_error( + mock_pid, mock_connect, platform, service_details +): + with pytest.raises(RuntimeError, match="Woops"): + await platform.execute_synchronous_job( + user_token="fake_token", + title="Test Job", + details=service_details, + parameters={}, + format=OutputFormatEnum.GEOTIFF, + ) + + @pytest.mark.asyncio @patch("app.platforms.implementations.openeo.requests.get") async def test_get_parameters_success(mock_udp_request, platform):