Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 52 additions & 22 deletions app/platforms/implementations/openeo.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
from app.auth import exchange_token
from app.config.schemas import AuthMethod
from app.config.settings import settings
from app.error import AuthException
from app.platforms.base import BaseProcessingPlatform
from app.platforms.dispatcher import register_platform
from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum
from app.schemas.parameters import ParamTypeEnum, Parameter
from app.schemas.unit_job import ServiceDetails

from openeo.rest import OpenEoApiError

load_dotenv()


Expand Down Expand Up @@ -70,10 +73,7 @@ async def _authenticate_user(
if url not in settings.backend_auth_config:
raise ValueError(f"No OpenEO backend configuration found for URL: {url}")

if (
settings.backend_auth_config[url].auth_method
== AuthMethod.USER_CREDENTIALS
):
if settings.backend_auth_config[url].auth_method == AuthMethod.USER_CREDENTIALS:
logger.debug("Using user credentials for OpenEO connection authentication")
bearer_token = await exchange_token(user_token=user_token, url=url)
connection.authenticate_bearer_token(bearer_token=bearer_token)
Expand Down Expand Up @@ -184,12 +184,22 @@ async def execute_job(
parameters: dict,
format: OutputFormatEnum,
) -> str:
service = await self._build_datacube(user_token, title, details, parameters)
job = service.create_job(title=title, out_format=format)
logger.info(f"Executing OpenEO batch job with title={title}")
job.start()

return job.job_id
try:
service = await self._build_datacube(user_token, title, details, parameters)
job = service.create_job(title=title, out_format=format)
logger.info(f"Executing OpenEO batch job with title={title}")
job.start()

return job.job_id
except OpenEoApiError as e:
if e.http_status_code in (403, 401):
raise AuthException(
e.http_status_code,
f"Authentication error when executing: {e.message}",
)
raise e
except Exception as e:
raise e

async def execute_synchronous_job(
self,
Expand All @@ -199,14 +209,24 @@ async def execute_synchronous_job(
parameters: dict,
format: OutputFormatEnum,
) -> Response:
service = await self._build_datacube(user_token, title, details, parameters)
logger.info("Executing synchronous OpenEO job")
response = service.execute(auto_decode=False)
return Response(
content=response.content,
status_code=response.status_code,
media_type=response.headers.get("Content-Type"),
)
try:
service = await self._build_datacube(user_token, title, details, parameters)
logger.info("Executing synchronous OpenEO job")
response = service.execute(auto_decode=False)
return Response(
content=response.content,
status_code=response.status_code,
media_type=response.headers.get("Content-Type"),
)
except OpenEoApiError as e:
if e.http_status_code in (403, 401):
raise AuthException(
e.http_status_code,
f"Authentication error when executing: {e.message}",
)
raise e
except Exception as e:
raise e

def _map_openeo_status(self, status: str) -> ProcessingStatusEnum:
"""
Expand Down Expand Up @@ -248,10 +268,20 @@ async def get_job_status(
async def get_job_results(
self, user_token: str, job_id: str, details: ServiceDetails
) -> Collection:
logger.debug(f"Fetching job result for openEO job with ID {job_id}")
connection = await self._setup_connection(user_token, details.endpoint)
job = connection.job(job_id)
return Collection(**job.get_results().get_metadata())
try:
logger.debug(f"Fetching job result for openEO job with ID {job_id}")
connection = await self._setup_connection(user_token, details.endpoint)
job = connection.job(job_id)
return Collection(**job.get_results().get_metadata())
except OpenEoApiError as e:
if e.http_status_code in (403, 401):
raise AuthException(
e.http_status_code,
f"Authentication error when fetching job results for job {job_id}: {e.message}",
)
raise e
except Exception as e:
raise e

async def get_service_parameters(
self, user_token: str, details: ServiceDetails
Expand Down
4 changes: 3 additions & 1 deletion mypy.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
[mypy-openeo]
[mypy]
disable_error_code = import-untyped
[mypy-openeo.*]
ignore_missing_imports = True
127 changes: 127 additions & 0 deletions tests/platforms/test_openeo_platform.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from app.config.schemas import AuthMethod, BackendAuthConfig
from app.config.settings import settings
from app.error import AuthException
from app.platforms.implementations.openeo import (
OpenEOPlatform,
)
Expand All @@ -17,6 +18,8 @@
from app.schemas.unit_job import ServiceDetails
from stac_pydantic import Collection

from openeo.rest import OpenEoApiError


class DummyOpenEOClient:

Expand Down Expand Up @@ -131,6 +134,46 @@ async def test_execute_job_process_id_failure(
)


@pytest.mark.asyncio
@patch.object(OpenEOPlatform, "_setup_connection")
@patch.object(
OpenEOPlatform,
"_build_datacube",
side_effect=OpenEoApiError(message="Woops", code="Test", http_status_code=401),
)
async def test_execute_job_process_openeo_auth_error(
mock_pid, mock_connect, platform, service_details
):
with pytest.raises(AuthException, match="Authentication error"):
await platform.execute_job(
user_token="fake_token",
title="Test Job",
details=service_details,
parameters={},
format=OutputFormatEnum.GEOTIFF,
)


@pytest.mark.asyncio
@patch.object(OpenEOPlatform, "_setup_connection")
@patch.object(
OpenEOPlatform,
"_build_datacube",
side_effect=OpenEoApiError(message="Woops", code="Test", http_status_code=500),
)
async def test_execute_job_process_openeo_error(
mock_pid, mock_connect, platform, service_details
):
with pytest.raises(OpenEoApiError, match="Woops"):
await platform.execute_job(
user_token="fake_token",
title="Test Job",
details=service_details,
parameters={},
format=OutputFormatEnum.GEOTIFF,
)


@pytest.mark.parametrize(
"openeo_status, expected_enum",
[
Expand Down Expand Up @@ -197,6 +240,30 @@ async def test_get_job_results_error(mock_connection, platform):
assert "Connection error" in str(exc_info.value)


@pytest.mark.asyncio
@patch.object(OpenEOPlatform, "_setup_connection")
async def test_get_job_results_openeo_auth_error(
mock_connection, platform, service_details
):
mock_connection.side_effect = OpenEoApiError(
message="Woops", code="Test", http_status_code=401
)
details = ServiceDetails(endpoint="foo", application="bar")
with pytest.raises(AuthException, match="Authentication error"):
await platform.get_job_results("foobar", "job123", details)


@pytest.mark.asyncio
@patch.object(OpenEOPlatform, "_setup_connection")
async def test_get_job_results_openeo_error(mock_connection, platform, service_details):
mock_connection.side_effect = OpenEoApiError(
message="Woops", code="Test", http_status_code=500
)
details = ServiceDetails(endpoint="foo", application="bar")
with pytest.raises(OpenEoApiError, match="Woops"):
await platform.get_job_results("foobar", "job123", details)


def _make_conn_with_token(token: str):
# openeo.Connection-like object with auth.bearer that the implementation splits on '/'
return SimpleNamespace(auth=SimpleNamespace(bearer=f"prefix/{token}"))
Expand Down Expand Up @@ -513,6 +580,66 @@ async def test_execute_sync_job_success(
mock_connect.assert_called_once_with("fake_token", service_details.endpoint)


@pytest.mark.asyncio
@patch.object(OpenEOPlatform, "_setup_connection")
@patch.object(
OpenEOPlatform,
"_build_datacube",
side_effect=OpenEoApiError(message="Woops", code="Test", http_status_code=401),
)
async def test_execute_sync_job_openeo_auth_error(
mock_pid, mock_connect, platform, service_details
):
with pytest.raises(AuthException, match="Authentication error"):
await platform.execute_synchronous_job(
user_token="fake_token",
title="Test Job",
details=service_details,
parameters={},
format=OutputFormatEnum.GEOTIFF,
)


@pytest.mark.asyncio
@patch.object(OpenEOPlatform, "_setup_connection")
@patch.object(
OpenEOPlatform,
"_build_datacube",
side_effect=OpenEoApiError(message="Woops", code="Test", http_status_code=500),
)
async def test_execute_sync_job_openeo_error(
mock_pid, mock_connect, platform, service_details
):
with pytest.raises(OpenEoApiError, match="Woops"):
await platform.execute_synchronous_job(
user_token="fake_token",
title="Test Job",
details=service_details,
parameters={},
format=OutputFormatEnum.GEOTIFF,
)


@pytest.mark.asyncio
@patch.object(OpenEOPlatform, "_setup_connection")
@patch.object(
OpenEOPlatform,
"_build_datacube",
side_effect=RuntimeError("Woops"),
)
async def test_execute_sync_job_error(
mock_pid, mock_connect, platform, service_details
):
with pytest.raises(RuntimeError, match="Woops"):
await platform.execute_synchronous_job(
user_token="fake_token",
title="Test Job",
details=service_details,
parameters={},
format=OutputFormatEnum.GEOTIFF,
)


@pytest.mark.asyncio
@patch("app.platforms.implementations.openeo.requests.get")
async def test_get_parameters_success(mock_udp_request, platform):
Expand Down
Loading