From 01070bc058b19ba78f6a1a9b56df0e7d459a9bd2 Mon Sep 17 00:00:00 2001 From: stripodi Date: Tue, 20 Jan 2026 12:23:02 +0100 Subject: [PATCH 01/11] feat: ogc-api-processes-client initial plug --- .../implementations/ogc_api_process.py | 24 ++++++++++++++++++- requirements.txt | 3 ++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index 620a3b5..d55daa7 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -6,7 +6,10 @@ from app.schemas.parameters import Parameter from app.schemas.unit_job import ServiceDetails from stac_pydantic import Collection - +from ogc_api_client import Configuration +from ogc_api_client.api.execute_api import ExecuteApi +from ogc_api_client.api_client_wrapper import ApiClientWrapper +from ogc_api_client.rest import ApiException @register_platform(ProcessTypeEnum.OGC_API_PROCESS) class OGCAPIProcessPlatform(BaseProcessingPlatform): @@ -15,6 +18,15 @@ class OGCAPIProcessPlatform(BaseProcessingPlatform): This class handles the execution of processing jobs on the OGC API Process platform. """ + def _create_api_client_instance( + self, + details: ServiceDetails + ) -> ApiClientWrapper: + configuration: Configuration = Configuration( + host = details.endpoint + ) + return ApiClientWrapper(configuration) + async def execute_job( self, user_token: str, @@ -23,6 +35,8 @@ async def execute_job( parameters: dict, format: OutputFormatEnum, ) -> str: + api_client = self._create_api_client_instance(details) + raise NotImplementedError("OGC API Process job execution not implemented yet.") async def execute_synchronous_job( @@ -33,11 +47,15 @@ async def execute_synchronous_job( parameters: dict, format: OutputFormatEnum, ) -> Response: + # This is currently not supported + raise NotImplementedError("OGC API Process job execution not implemented yet.") async def get_job_status( self, user_token: str, job_id: str, details: ServiceDetails ) -> ProcessingStatusEnum: + api_client = self._create_api_client_instance(details) + raise NotImplementedError( "OGC API Process job status retrieval not implemented yet." ) @@ -45,6 +63,8 @@ async def get_job_status( async def get_job_results( self, user_token: str, job_id: str, details: ServiceDetails ) -> Collection: + api_client = self._create_api_client_instance(details) + raise NotImplementedError( "OGC API Process job result retrieval not implemented yet." ) @@ -52,6 +72,8 @@ async def get_job_results( async def get_service_parameters( self, user_token: str, details: ServiceDetails ) -> List[Parameter]: + api_client = self._create_api_client_instance(details) + raise NotImplementedError( "OGC API Process service parameter retrieval not implemented yet." ) diff --git a/requirements.txt b/requirements.txt index 115e5b6..e8d92ef 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,4 +29,5 @@ SQLAlchemy stac-pydantic types-requests types-shapely -uvicorn[standard] \ No newline at end of file +uvicorn[standard] +ogc-api-client @ git+https://github.com/EOEPCA/ogc-api-client.git@develop#subdirectory=src From aaaa9259d0cac964b31d6ad01b42b52fd0d29504 Mon Sep 17 00:00:00 2001 From: floeschau Date: Wed, 18 Feb 2026 17:21:51 +0100 Subject: [PATCH 02/11] Changes for OGC API platform --- .../implementations/ogc_api_process.py | 134 +++++++++++++++--- app/schemas/parameters.py | 3 + app/schemas/unit_job.py | 8 ++ 3 files changed, 129 insertions(+), 16 deletions(-) diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index d55daa7..3f7a4ef 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -1,5 +1,10 @@ +import json + +import re from typing import List from fastapi import Response +from loguru import logger + from app.platforms.base import BaseProcessingPlatform from app.platforms.dispatcher import register_platform from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum, ProcessingStatusEnum @@ -10,22 +15,41 @@ from ogc_api_client.api.execute_api import ExecuteApi from ogc_api_client.api_client_wrapper import ApiClientWrapper from ogc_api_client.rest import ApiException +from ogc_api_client.models.status_info import StatusInfo, StatusCode @register_platform(ProcessTypeEnum.OGC_API_PROCESS) class OGCAPIProcessPlatform(BaseProcessingPlatform): + + application_path_regex = re.compile(r"(?P.+)/processes/(?P[^/]+)$") + """ OGC API Process processing platform implementation. This class handles the execution of processing jobs on the OGC API Process platform. """ + def _split_job_id(self, job_id): + parts = job_id.split(":", 1) + if len(parts) != 2: + return (None, job_id) + return tuple(parts) + + def _create_api_client_instance( self, - details: ServiceDetails + endpoint: str, + namespace: str, + user_token: str = None, ) -> ApiClientWrapper: configuration: Configuration = Configuration( - host = details.endpoint + host = f"{endpoint}/{namespace}" if namespace else endpoint ) - return ApiClientWrapper(configuration) + + additional_args = {} + if user_token: + additional_args["header_name"] = "Authorization" + additional_args["header_value"] = f"Bearer {user_token}" + + return ApiClientWrapper(configuration, **additional_args) async def execute_job( self, @@ -35,9 +59,32 @@ async def execute_job( parameters: dict, format: OutputFormatEnum, ) -> str: - api_client = self._create_api_client_instance(details) + logger.info(f"Executing OGC API job with title={title}") + # Output format omitted from request + + api_client = self._create_api_client_instance(details.endpoint, details.namespace, user_token) + + headers = { + "accept": "*/*", + #"Prefer": "respond-async;return=representation", + "Content-Type": "application/json" + } + if user_token: + headers["Authorization"] = f"Bearer {user_token}" + + data = { + "inputs": {key: value for key, value in parameters.items()} + } + + content = api_client.execute_simple(process_id=details.application, execute=data, _headers=headers) + + job_id = content.job_id + + # Return the namespace along with the job ID if needed + if details.namespace: + return f"{details.namespace}:{job_id}" + return job_id - raise NotImplementedError("OGC API Process job execution not implemented yet.") async def execute_synchronous_job( self, @@ -51,29 +98,84 @@ async def execute_synchronous_job( raise NotImplementedError("OGC API Process job execution not implemented yet.") + + def _map_ogcapi_status(self, ogcapi_status: str) -> ProcessingStatusEnum: + """ + Map the status returned by OGC API to a status known within the API. + + :param status: Status text returned by OGC API. + :return: ProcessingStatusEnum corresponding to the input. + """ + + logger.debug(f"Mapping OGC API status {ogcapi_status} to ProcessingStatusEnum") + + mapping = { + StatusCode.ACCEPTED: ProcessingStatusEnum.CREATED, + StatusCode.RUNNING: ProcessingStatusEnum.RUNNING, + StatusCode.DISMISSED: ProcessingStatusEnum.CANCELED, + StatusCode.SUCCESSFUL: ProcessingStatusEnum.FINISHED, + StatusCode.FAILED: ProcessingStatusEnum.FAILED, + } + + try: + return mapping[ogcapi_status] + except (AttributeError, KeyError): + logger.warning("Mapping of unknown OGC API status: %r", ogcapi_status) + return ProcessingStatusEnum.UNKNOWN + + + async def get_job_status( self, user_token: str, job_id: str, details: ServiceDetails ) -> ProcessingStatusEnum: - api_client = self._create_api_client_instance(details) + logger.debug(f"Fetching job status for OGC API job with ID {job_id}") + + # Job ID is composed of namespace and internal job id + namespace, internal_job_id = self._split_job_id(job_id) + api_client = self._create_api_client_instance(details.endpoint, namespace, user_token) + + status_info = api_client.get_status(job_id=internal_job_id) + return self._map_ogcapi_status(status_info.status) - raise NotImplementedError( - "OGC API Process job status retrieval not implemented yet." - ) async def get_job_results( self, user_token: str, job_id: str, details: ServiceDetails ) -> Collection: - api_client = self._create_api_client_instance(details) + logger.debug(f"Fetching job result for opfenEO job with ID {job_id}") + + # Job ID is composed of namespace and internal job id + namespace, internal_job_id = self._split_job_id(job_id) + api_client = self._create_api_client_instance(details.endpoint, namespace, user_token) + + result = api_client.get_result(job_id=internal_job_id) + return Collection(result[0]) - raise NotImplementedError( - "OGC API Process job result retrieval not implemented yet." - ) async def get_service_parameters( self, user_token: str, details: ServiceDetails ) -> List[Parameter]: - api_client = self._create_api_client_instance(details) - raise NotImplementedError( - "OGC API Process service parameter retrieval not implemented yet." + parameters = [] + logger.debug( + f"Fetching service parameters for OGC API process with ID {details.application}" ) + + api_client = self._create_api_client_instance(details.endpoint, details.namespace, user_token) + process_description = api_client.get_process_description(details.application) + + for input_id, input_details in process_description.inputs.items(): + input_type = input_id, input_details.model_dump().get("var_schema", {}).get("actual_instance", {}).get("type", "string") + if isinstance(input_type, tuple): + input_type = next((t for t in input_type if t in ["date-interval", "bounding-box", "boolean"]), "string") + + parameters.append( + Parameter( + name=input_id, + description=input_details.description, + default=None, + optional=(input_details.min_occurs == 0), + type="string", + ) + ) + + return parameters diff --git a/app/schemas/parameters.py b/app/schemas/parameters.py index 04e6f46..0e4c286 100644 --- a/app/schemas/parameters.py +++ b/app/schemas/parameters.py @@ -7,9 +7,12 @@ class ParamTypeEnum(str, Enum): + DATETIME = "datetime" DATE_INTERVAL = "date-interval" BOUNDING_BOX = "bounding-box" BOOLEAN = "boolean" + INTEGER = "integer" + DOUBLE = "double" STRING = "string" diff --git a/app/schemas/unit_job.py b/app/schemas/unit_job.py index 2bc997f..5194576 100644 --- a/app/schemas/unit_job.py +++ b/app/schemas/unit_job.py @@ -1,6 +1,7 @@ from datetime import datetime from pydantic import BaseModel, Field +from typing import Optional from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum @@ -13,6 +14,13 @@ class ServiceDetails(BaseModel): "platform API", examples=["https://openeofed.dataspace.copernicus.eu"], ) + namespace: Optional[str] = Field( + default=None, + description="Namespace under the endpoint where the service is hosted. For openEO, this field" + "is not set. For OGC API Processes, this field should include the namespace ID representing" + "under which the namespace-related API is deployed", + examples=["https://openeofed.dataspace.copernicus.eu"] + ) application: str = Field( ..., description="Path to the application that needs to be executed. For openEO this is " From dc6f6c4d4bda567df01b1d50742da01650250d4a Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 13:46:26 +0100 Subject: [PATCH 03/11] fix: integration of token exchange in ogc api implementation --- app/auth.py | 6 ++++-- .../implementations/ogc_api_process.py | 17 +++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/app/auth.py b/app/auth.py index fbced09..1253c42 100644 --- a/app/auth.py +++ b/app/auth.py @@ -98,12 +98,14 @@ async def exchange_token(user_token: str, url: str) -> str: :return: The bearer token as a string. """ + logger.debug(f"Exchanging token for backend {url}") + provider = settings.backend_auth_config[url].token_provider token_prefix = settings.backend_auth_config[url].token_prefix - if not provider or not token_prefix: + if not provider: raise ValueError( - f"Backend '{url}' must define 'token_provider' and 'token_prefix'" + f"Backend '{url}' must define 'token_provider'" ) platform_token = await _exchange_token_for_provider( diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index 3f7a4ef..b719f42 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -2,6 +2,7 @@ import re from typing import List +from app.auth import exchange_token from fastapi import Response from loguru import logger @@ -34,7 +35,7 @@ def _split_job_id(self, job_id): return tuple(parts) - def _create_api_client_instance( + async def _create_api_client_instance( self, endpoint: str, namespace: str, @@ -60,17 +61,21 @@ async def execute_job( format: OutputFormatEnum, ) -> str: logger.info(f"Executing OGC API job with title={title}") - # Output format omitted from request - api_client = self._create_api_client_instance(details.endpoint, details.namespace, user_token) + # Exchanging token + logger.debug("Exchanging user token for OGC API Process execution...") + exchanged_token = await exchange_token(user_token=user_token, url=details.endpoint) + + # Output format omitted from request + api_client = await self._create_api_client_instance(details.endpoint, details.namespace, exchanged_token) headers = { "accept": "*/*", #"Prefer": "respond-async;return=representation", "Content-Type": "application/json" } - if user_token: - headers["Authorization"] = f"Bearer {user_token}" + if exchanged_token: + headers["Authorization"] = f"Bearer {exchanged_token}" data = { "inputs": {key: value for key, value in parameters.items()} @@ -132,7 +137,7 @@ async def get_job_status( # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) - api_client = self._create_api_client_instance(details.endpoint, namespace, user_token) + api_client = await self._create_api_client_instance(details.endpoint, namespace, user_token) status_info = api_client.get_status(job_id=internal_job_id) return self._map_ogcapi_status(status_info.status) From e7ef339bcada05c9554c1b24acc44d3000f1c5ea Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 14:33:03 +0100 Subject: [PATCH 04/11] chore: cleaned up imports --- app/platforms/implementations/ogc_api_process.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index b719f42..607fa61 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -1,5 +1,3 @@ -import json - import re from typing import List from app.auth import exchange_token @@ -13,10 +11,8 @@ from app.schemas.unit_job import ServiceDetails from stac_pydantic import Collection from ogc_api_client import Configuration -from ogc_api_client.api.execute_api import ExecuteApi from ogc_api_client.api_client_wrapper import ApiClientWrapper -from ogc_api_client.rest import ApiException -from ogc_api_client.models.status_info import StatusInfo, StatusCode +from ogc_api_client.models.status_info import StatusCode @register_platform(ProcessTypeEnum.OGC_API_PROCESS) class OGCAPIProcessPlatform(BaseProcessingPlatform): From e28cf064059a0cb9072f078e4179a426eec25109 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 14:33:39 +0100 Subject: [PATCH 05/11] chore: formatting of the doc --- .../implementations/ogc_api_process.py | 65 ++++++++++++------- 1 file changed, 40 insertions(+), 25 deletions(-) diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index 607fa61..09e3ba1 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -14,10 +14,13 @@ from ogc_api_client.api_client_wrapper import ApiClientWrapper from ogc_api_client.models.status_info import StatusCode + @register_platform(ProcessTypeEnum.OGC_API_PROCESS) class OGCAPIProcessPlatform(BaseProcessingPlatform): - application_path_regex = re.compile(r"(?P.+)/processes/(?P[^/]+)$") + application_path_regex = re.compile( + r"(?P.+)/processes/(?P[^/]+)$" + ) """ OGC API Process processing platform implementation. @@ -30,7 +33,6 @@ def _split_job_id(self, job_id): return (None, job_id) return tuple(parts) - async def _create_api_client_instance( self, endpoint: str, @@ -38,7 +40,7 @@ async def _create_api_client_instance( user_token: str = None, ) -> ApiClientWrapper: configuration: Configuration = Configuration( - host = f"{endpoint}/{namespace}" if namespace else endpoint + host=f"{endpoint}/{namespace}" if namespace else endpoint ) additional_args = {} @@ -60,33 +62,36 @@ async def execute_job( # Exchanging token logger.debug("Exchanging user token for OGC API Process execution...") - exchanged_token = await exchange_token(user_token=user_token, url=details.endpoint) + exchanged_token = await exchange_token( + user_token=user_token, url=details.endpoint + ) # Output format omitted from request - api_client = await self._create_api_client_instance(details.endpoint, details.namespace, exchanged_token) + api_client = await self._create_api_client_instance( + details.endpoint, details.namespace, exchanged_token + ) headers = { "accept": "*/*", - #"Prefer": "respond-async;return=representation", - "Content-Type": "application/json" + # "Prefer": "respond-async;return=representation", + "Content-Type": "application/json", } if exchanged_token: headers["Authorization"] = f"Bearer {exchanged_token}" - data = { - "inputs": {key: value for key, value in parameters.items()} - } - - content = api_client.execute_simple(process_id=details.application, execute=data, _headers=headers) + data = {"inputs": {key: value for key, value in parameters.items()}} + + content = api_client.execute_simple( + process_id=details.application, execute=data, _headers=headers + ) job_id = content.job_id - + # Return the namespace along with the job ID if needed if details.namespace: return f"{details.namespace}:{job_id}" return job_id - async def execute_synchronous_job( self, user_token: str, @@ -99,7 +104,6 @@ async def execute_synchronous_job( raise NotImplementedError("OGC API Process job execution not implemented yet.") - def _map_ogcapi_status(self, ogcapi_status: str) -> ProcessingStatusEnum: """ Map the status returned by OGC API to a status known within the API. @@ -124,21 +128,20 @@ def _map_ogcapi_status(self, ogcapi_status: str) -> ProcessingStatusEnum: logger.warning("Mapping of unknown OGC API status: %r", ogcapi_status) return ProcessingStatusEnum.UNKNOWN - - async def get_job_status( self, user_token: str, job_id: str, details: ServiceDetails ) -> ProcessingStatusEnum: logger.debug(f"Fetching job status for OGC API job with ID {job_id}") - + # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) - api_client = await self._create_api_client_instance(details.endpoint, namespace, user_token) + api_client = await self._create_api_client_instance( + details.endpoint, namespace, user_token + ) status_info = api_client.get_status(job_id=internal_job_id) return self._map_ogcapi_status(status_info.status) - async def get_job_results( self, user_token: str, job_id: str, details: ServiceDetails ) -> Collection: @@ -146,12 +149,13 @@ async def get_job_results( # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) - api_client = self._create_api_client_instance(details.endpoint, namespace, user_token) + api_client = self._create_api_client_instance( + details.endpoint, namespace, user_token + ) result = api_client.get_result(job_id=internal_job_id) return Collection(result[0]) - async def get_service_parameters( self, user_token: str, details: ServiceDetails ) -> List[Parameter]: @@ -161,13 +165,24 @@ async def get_service_parameters( f"Fetching service parameters for OGC API process with ID {details.application}" ) - api_client = self._create_api_client_instance(details.endpoint, details.namespace, user_token) + api_client = self._create_api_client_instance( + details.endpoint, details.namespace, user_token + ) process_description = api_client.get_process_description(details.application) for input_id, input_details in process_description.inputs.items(): - input_type = input_id, input_details.model_dump().get("var_schema", {}).get("actual_instance", {}).get("type", "string") + input_type = input_id, input_details.model_dump().get("var_schema", {}).get( + "actual_instance", {} + ).get("type", "string") if isinstance(input_type, tuple): - input_type = next((t for t in input_type if t in ["date-interval", "bounding-box", "boolean"]), "string") + input_type = next( + ( + t + for t in input_type + if t in ["date-interval", "bounding-box", "boolean"] + ), + "string", + ) parameters.append( Parameter( From 4c95099b24adb1fb2b03807a2c5fbb9a9a119112 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 14:34:15 +0100 Subject: [PATCH 06/11] chore: format of the doc --- .vscode/settings.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index c5e6474..3acec9e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -4,5 +4,6 @@ ], "python.testing.unittestEnabled": false, "python.testing.pytestEnabled": true, - "python-envs.pythonProjects": [] + "python-envs.defaultEnvManager": "ms-python.python:conda", + "python-envs.defaultPackageManager": "ms-python.python:conda" } \ No newline at end of file From d0ba1c04883c3dc16e03ef185a001fa5084af34c Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 14:34:57 +0100 Subject: [PATCH 07/11] chore: fix doc formatting --- .../implementations/ogc_api_process.py | 66 ++++++++----------- 1 file changed, 26 insertions(+), 40 deletions(-) diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index 09e3ba1..3d87e10 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -1,3 +1,4 @@ + import re from typing import List from app.auth import exchange_token @@ -14,13 +15,10 @@ from ogc_api_client.api_client_wrapper import ApiClientWrapper from ogc_api_client.models.status_info import StatusCode - @register_platform(ProcessTypeEnum.OGC_API_PROCESS) class OGCAPIProcessPlatform(BaseProcessingPlatform): - application_path_regex = re.compile( - r"(?P.+)/processes/(?P[^/]+)$" - ) + application_path_regex = re.compile(r"(?P.+)/processes/(?P[^/]+)$") """ OGC API Process processing platform implementation. @@ -33,6 +31,7 @@ def _split_job_id(self, job_id): return (None, job_id) return tuple(parts) + async def _create_api_client_instance( self, endpoint: str, @@ -40,7 +39,7 @@ async def _create_api_client_instance( user_token: str = None, ) -> ApiClientWrapper: configuration: Configuration = Configuration( - host=f"{endpoint}/{namespace}" if namespace else endpoint + host = f"{endpoint}/{namespace}" if namespace else endpoint ) additional_args = {} @@ -62,36 +61,33 @@ async def execute_job( # Exchanging token logger.debug("Exchanging user token for OGC API Process execution...") - exchanged_token = await exchange_token( - user_token=user_token, url=details.endpoint - ) + exchanged_token = await exchange_token(user_token=user_token, url=details.endpoint) # Output format omitted from request - api_client = await self._create_api_client_instance( - details.endpoint, details.namespace, exchanged_token - ) + api_client = await self._create_api_client_instance(details.endpoint, details.namespace, exchanged_token) headers = { "accept": "*/*", - # "Prefer": "respond-async;return=representation", - "Content-Type": "application/json", + #"Prefer": "respond-async;return=representation", + "Content-Type": "application/json" } if exchanged_token: headers["Authorization"] = f"Bearer {exchanged_token}" - data = {"inputs": {key: value for key, value in parameters.items()}} - - content = api_client.execute_simple( - process_id=details.application, execute=data, _headers=headers - ) + data = { + "inputs": {key: value for key, value in parameters.items()} + } + + content = api_client.execute_simple(process_id=details.application, execute=data, _headers=headers) job_id = content.job_id - + # Return the namespace along with the job ID if needed if details.namespace: return f"{details.namespace}:{job_id}" return job_id + async def execute_synchronous_job( self, user_token: str, @@ -104,6 +100,7 @@ async def execute_synchronous_job( raise NotImplementedError("OGC API Process job execution not implemented yet.") + def _map_ogcapi_status(self, ogcapi_status: str) -> ProcessingStatusEnum: """ Map the status returned by OGC API to a status known within the API. @@ -128,20 +125,21 @@ def _map_ogcapi_status(self, ogcapi_status: str) -> ProcessingStatusEnum: logger.warning("Mapping of unknown OGC API status: %r", ogcapi_status) return ProcessingStatusEnum.UNKNOWN + + async def get_job_status( self, user_token: str, job_id: str, details: ServiceDetails ) -> ProcessingStatusEnum: logger.debug(f"Fetching job status for OGC API job with ID {job_id}") - + # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) - api_client = await self._create_api_client_instance( - details.endpoint, namespace, user_token - ) + api_client = await self._create_api_client_instance(details.endpoint, namespace, user_token) status_info = api_client.get_status(job_id=internal_job_id) return self._map_ogcapi_status(status_info.status) + async def get_job_results( self, user_token: str, job_id: str, details: ServiceDetails ) -> Collection: @@ -149,13 +147,12 @@ async def get_job_results( # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) - api_client = self._create_api_client_instance( - details.endpoint, namespace, user_token - ) + api_client = self._create_api_client_instance(details.endpoint, namespace, user_token) result = api_client.get_result(job_id=internal_job_id) return Collection(result[0]) + async def get_service_parameters( self, user_token: str, details: ServiceDetails ) -> List[Parameter]: @@ -165,24 +162,13 @@ async def get_service_parameters( f"Fetching service parameters for OGC API process with ID {details.application}" ) - api_client = self._create_api_client_instance( - details.endpoint, details.namespace, user_token - ) + api_client = self._create_api_client_instance(details.endpoint, details.namespace, user_token) process_description = api_client.get_process_description(details.application) for input_id, input_details in process_description.inputs.items(): - input_type = input_id, input_details.model_dump().get("var_schema", {}).get( - "actual_instance", {} - ).get("type", "string") + input_type = input_id, input_details.model_dump().get("var_schema", {}).get("actual_instance", {}).get("type", "string") if isinstance(input_type, tuple): - input_type = next( - ( - t - for t in input_type - if t in ["date-interval", "bounding-box", "boolean"] - ), - "string", - ) + input_type = next((t for t in input_type if t in ["date-interval", "bounding-box", "boolean"]), "string") parameters.append( Parameter( From c0e84b49be8e3df26e58dda8391982a4a172f7b8 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 14:36:05 +0100 Subject: [PATCH 08/11] chore: fix some of the linting --- .../implementations/ogc_api_process.py | 66 +++++++++++-------- 1 file changed, 40 insertions(+), 26 deletions(-) diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index 3d87e10..09e3ba1 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -1,4 +1,3 @@ - import re from typing import List from app.auth import exchange_token @@ -15,10 +14,13 @@ from ogc_api_client.api_client_wrapper import ApiClientWrapper from ogc_api_client.models.status_info import StatusCode + @register_platform(ProcessTypeEnum.OGC_API_PROCESS) class OGCAPIProcessPlatform(BaseProcessingPlatform): - application_path_regex = re.compile(r"(?P.+)/processes/(?P[^/]+)$") + application_path_regex = re.compile( + r"(?P.+)/processes/(?P[^/]+)$" + ) """ OGC API Process processing platform implementation. @@ -31,7 +33,6 @@ def _split_job_id(self, job_id): return (None, job_id) return tuple(parts) - async def _create_api_client_instance( self, endpoint: str, @@ -39,7 +40,7 @@ async def _create_api_client_instance( user_token: str = None, ) -> ApiClientWrapper: configuration: Configuration = Configuration( - host = f"{endpoint}/{namespace}" if namespace else endpoint + host=f"{endpoint}/{namespace}" if namespace else endpoint ) additional_args = {} @@ -61,33 +62,36 @@ async def execute_job( # Exchanging token logger.debug("Exchanging user token for OGC API Process execution...") - exchanged_token = await exchange_token(user_token=user_token, url=details.endpoint) + exchanged_token = await exchange_token( + user_token=user_token, url=details.endpoint + ) # Output format omitted from request - api_client = await self._create_api_client_instance(details.endpoint, details.namespace, exchanged_token) + api_client = await self._create_api_client_instance( + details.endpoint, details.namespace, exchanged_token + ) headers = { "accept": "*/*", - #"Prefer": "respond-async;return=representation", - "Content-Type": "application/json" + # "Prefer": "respond-async;return=representation", + "Content-Type": "application/json", } if exchanged_token: headers["Authorization"] = f"Bearer {exchanged_token}" - data = { - "inputs": {key: value for key, value in parameters.items()} - } - - content = api_client.execute_simple(process_id=details.application, execute=data, _headers=headers) + data = {"inputs": {key: value for key, value in parameters.items()}} + + content = api_client.execute_simple( + process_id=details.application, execute=data, _headers=headers + ) job_id = content.job_id - + # Return the namespace along with the job ID if needed if details.namespace: return f"{details.namespace}:{job_id}" return job_id - async def execute_synchronous_job( self, user_token: str, @@ -100,7 +104,6 @@ async def execute_synchronous_job( raise NotImplementedError("OGC API Process job execution not implemented yet.") - def _map_ogcapi_status(self, ogcapi_status: str) -> ProcessingStatusEnum: """ Map the status returned by OGC API to a status known within the API. @@ -125,21 +128,20 @@ def _map_ogcapi_status(self, ogcapi_status: str) -> ProcessingStatusEnum: logger.warning("Mapping of unknown OGC API status: %r", ogcapi_status) return ProcessingStatusEnum.UNKNOWN - - async def get_job_status( self, user_token: str, job_id: str, details: ServiceDetails ) -> ProcessingStatusEnum: logger.debug(f"Fetching job status for OGC API job with ID {job_id}") - + # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) - api_client = await self._create_api_client_instance(details.endpoint, namespace, user_token) + api_client = await self._create_api_client_instance( + details.endpoint, namespace, user_token + ) status_info = api_client.get_status(job_id=internal_job_id) return self._map_ogcapi_status(status_info.status) - async def get_job_results( self, user_token: str, job_id: str, details: ServiceDetails ) -> Collection: @@ -147,12 +149,13 @@ async def get_job_results( # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) - api_client = self._create_api_client_instance(details.endpoint, namespace, user_token) + api_client = self._create_api_client_instance( + details.endpoint, namespace, user_token + ) result = api_client.get_result(job_id=internal_job_id) return Collection(result[0]) - async def get_service_parameters( self, user_token: str, details: ServiceDetails ) -> List[Parameter]: @@ -162,13 +165,24 @@ async def get_service_parameters( f"Fetching service parameters for OGC API process with ID {details.application}" ) - api_client = self._create_api_client_instance(details.endpoint, details.namespace, user_token) + api_client = self._create_api_client_instance( + details.endpoint, details.namespace, user_token + ) process_description = api_client.get_process_description(details.application) for input_id, input_details in process_description.inputs.items(): - input_type = input_id, input_details.model_dump().get("var_schema", {}).get("actual_instance", {}).get("type", "string") + input_type = input_id, input_details.model_dump().get("var_schema", {}).get( + "actual_instance", {} + ).get("type", "string") if isinstance(input_type, tuple): - input_type = next((t for t in input_type if t in ["date-interval", "bounding-box", "boolean"]), "string") + input_type = next( + ( + t + for t in input_type + if t in ["date-interval", "bounding-box", "boolean"] + ), + "string", + ) parameters.append( Parameter( From 501e766b7e5dcce3d4dcb3104d8f7d990af46dd0 Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 14:45:19 +0100 Subject: [PATCH 09/11] feat: integrated token exchange for other requests --- .../implementations/ogc_api_process.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index 09e3ba1..d45fe46 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -133,10 +133,15 @@ async def get_job_status( ) -> ProcessingStatusEnum: logger.debug(f"Fetching job status for OGC API job with ID {job_id}") + logger.debug("Exchanging user token for OGC API Process execution...") + exchanged_token = await exchange_token( + user_token=user_token, url=details.endpoint + ) + # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) api_client = await self._create_api_client_instance( - details.endpoint, namespace, user_token + details.endpoint, namespace, exchanged_token ) status_info = api_client.get_status(job_id=internal_job_id) @@ -147,10 +152,15 @@ async def get_job_results( ) -> Collection: logger.debug(f"Fetching job result for opfenEO job with ID {job_id}") + logger.debug("Exchanging user token for OGC API Process execution...") + exchanged_token = await exchange_token( + user_token=user_token, url=details.endpoint + ) + # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) api_client = self._create_api_client_instance( - details.endpoint, namespace, user_token + details.endpoint, namespace, exchanged_token ) result = api_client.get_result(job_id=internal_job_id) @@ -165,8 +175,13 @@ async def get_service_parameters( f"Fetching service parameters for OGC API process with ID {details.application}" ) + logger.debug("Exchanging user token for OGC API Process execution...") + exchanged_token = await exchange_token( + user_token=user_token, url=details.endpoint + ) + api_client = self._create_api_client_instance( - details.endpoint, details.namespace, user_token + details.endpoint, details.namespace, exchanged_token ) process_description = api_client.get_process_description(details.application) From d81a9086f915f2401a5fd6b55f81ea7aad3bf6ae Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 14:47:32 +0100 Subject: [PATCH 10/11] fix: fixed async calls --- app/platforms/implementations/ogc_api_process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py index d45fe46..a6cb7b4 100644 --- a/app/platforms/implementations/ogc_api_process.py +++ b/app/platforms/implementations/ogc_api_process.py @@ -159,7 +159,7 @@ async def get_job_results( # Job ID is composed of namespace and internal job id namespace, internal_job_id = self._split_job_id(job_id) - api_client = self._create_api_client_instance( + api_client = await self._create_api_client_instance( details.endpoint, namespace, exchanged_token ) @@ -180,7 +180,7 @@ async def get_service_parameters( user_token=user_token, url=details.endpoint ) - api_client = self._create_api_client_instance( + api_client = await self._create_api_client_instance( details.endpoint, details.namespace, exchanged_token ) process_description = api_client.get_process_description(details.application) From 85cd2f9e822c83f3c60ae151a7d02c851915036f Mon Sep 17 00:00:00 2001 From: bramjanssen Date: Wed, 11 Mar 2026 14:52:09 +0100 Subject: [PATCH 11/11] chore: fixed lint issues --- app/schemas/unit_job.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/app/schemas/unit_job.py b/app/schemas/unit_job.py index 5194576..beff81c 100644 --- a/app/schemas/unit_job.py +++ b/app/schemas/unit_job.py @@ -16,9 +16,9 @@ class ServiceDetails(BaseModel): ) namespace: Optional[str] = Field( default=None, - description="Namespace under the endpoint where the service is hosted. For openEO, this field" - "is not set. For OGC API Processes, this field should include the namespace ID representing" - "under which the namespace-related API is deployed", + description="Namespace under the endpoint where the service is hosted. For openEO, this" + "field is not set. For OGC API Processes, this field should include the namespace ID " + "representing under which the namespace-related API is deployed", examples=["https://openeofed.dataspace.copernicus.eu"] ) application: str = Field(