Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions app/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,14 @@ async def exchange_token(user_token: str, url: str) -> str:
:return: The bearer token as a string.
"""

logger.debug(f"Exchanging token for backend {url}")

provider = settings.backend_auth_config[url].token_provider
token_prefix = settings.backend_auth_config[url].token_prefix

if not provider or not token_prefix:
if not provider:
raise ValueError(
f"Backend '{url}' must define 'token_provider' and 'token_prefix'"
f"Backend '{url}' must define 'token_provider'"
)

platform_token = await _exchange_token_for_provider(
Expand Down
169 changes: 162 additions & 7 deletions app/platforms/implementations/ogc_api_process.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,55 @@
import re
from typing import List
from app.auth import exchange_token
from fastapi import Response
from loguru import logger

from app.platforms.base import BaseProcessingPlatform
from app.platforms.dispatcher import register_platform
from app.schemas.enum import OutputFormatEnum, ProcessTypeEnum, ProcessingStatusEnum
from app.schemas.parameters import Parameter
from app.schemas.unit_job import ServiceDetails
from stac_pydantic import Collection
from ogc_api_client import Configuration
from ogc_api_client.api_client_wrapper import ApiClientWrapper
from ogc_api_client.models.status_info import StatusCode


@register_platform(ProcessTypeEnum.OGC_API_PROCESS)
class OGCAPIProcessPlatform(BaseProcessingPlatform):

application_path_regex = re.compile(
r"(?P<namespace>.+)/processes/(?P<process_id>[^/]+)$"
)

"""
OGC API Process processing platform implementation.
This class handles the execution of processing jobs on the OGC API Process platform.
"""

def _split_job_id(self, job_id):
parts = job_id.split(":", 1)
if len(parts) != 2:
return (None, job_id)
return tuple(parts)

async def _create_api_client_instance(
self,
endpoint: str,
namespace: str,
user_token: str = None,
) -> ApiClientWrapper:
configuration: Configuration = Configuration(
host=f"{endpoint}/{namespace}" if namespace else endpoint
)

additional_args = {}
if user_token:
additional_args["header_name"] = "Authorization"
additional_args["header_value"] = f"Bearer {user_token}"

return ApiClientWrapper(configuration, **additional_args)

async def execute_job(
self,
user_token: str,
Expand All @@ -23,7 +58,39 @@ async def execute_job(
parameters: dict,
format: OutputFormatEnum,
) -> str:
raise NotImplementedError("OGC API Process job execution not implemented yet.")
logger.info(f"Executing OGC API job with title={title}")

# Exchanging token
logger.debug("Exchanging user token for OGC API Process execution...")
exchanged_token = await exchange_token(
user_token=user_token, url=details.endpoint
)

# Output format omitted from request
api_client = await self._create_api_client_instance(
details.endpoint, details.namespace, exchanged_token
)

headers = {
"accept": "*/*",
# "Prefer": "respond-async;return=representation",
"Content-Type": "application/json",
}
if exchanged_token:
headers["Authorization"] = f"Bearer {exchanged_token}"

data = {"inputs": {key: value for key, value in parameters.items()}}

content = api_client.execute_simple(
process_id=details.application, execute=data, _headers=headers
)

job_id = content.job_id

# Return the namespace along with the job ID if needed
if details.namespace:
return f"{details.namespace}:{job_id}"
return job_id

async def execute_synchronous_job(
self,
Expand All @@ -33,25 +100,113 @@ async def execute_synchronous_job(
parameters: dict,
format: OutputFormatEnum,
) -> Response:
# This is currently not supported

raise NotImplementedError("OGC API Process job execution not implemented yet.")

def _map_ogcapi_status(self, ogcapi_status: str) -> ProcessingStatusEnum:
"""
Map the status returned by OGC API to a status known within the API.

:param status: Status text returned by OGC API.
:return: ProcessingStatusEnum corresponding to the input.
"""

logger.debug(f"Mapping OGC API status {ogcapi_status} to ProcessingStatusEnum")

mapping = {
StatusCode.ACCEPTED: ProcessingStatusEnum.CREATED,
StatusCode.RUNNING: ProcessingStatusEnum.RUNNING,
StatusCode.DISMISSED: ProcessingStatusEnum.CANCELED,
StatusCode.SUCCESSFUL: ProcessingStatusEnum.FINISHED,
StatusCode.FAILED: ProcessingStatusEnum.FAILED,
}

try:
return mapping[ogcapi_status]
except (AttributeError, KeyError):
logger.warning("Mapping of unknown OGC API status: %r", ogcapi_status)
return ProcessingStatusEnum.UNKNOWN

async def get_job_status(
self, user_token: str, job_id: str, details: ServiceDetails
) -> ProcessingStatusEnum:
raise NotImplementedError(
"OGC API Process job status retrieval not implemented yet."
logger.debug(f"Fetching job status for OGC API job with ID {job_id}")

logger.debug("Exchanging user token for OGC API Process execution...")
exchanged_token = await exchange_token(
user_token=user_token, url=details.endpoint
)

# Job ID is composed of namespace and internal job id
namespace, internal_job_id = self._split_job_id(job_id)
api_client = await self._create_api_client_instance(
details.endpoint, namespace, exchanged_token
)

status_info = api_client.get_status(job_id=internal_job_id)
return self._map_ogcapi_status(status_info.status)

async def get_job_results(
self, user_token: str, job_id: str, details: ServiceDetails
) -> Collection:
raise NotImplementedError(
"OGC API Process job result retrieval not implemented yet."
logger.debug(f"Fetching job result for opfenEO job with ID {job_id}")

logger.debug("Exchanging user token for OGC API Process execution...")
exchanged_token = await exchange_token(
user_token=user_token, url=details.endpoint
)

# Job ID is composed of namespace and internal job id
namespace, internal_job_id = self._split_job_id(job_id)
api_client = await self._create_api_client_instance(
details.endpoint, namespace, exchanged_token
)

result = api_client.get_result(job_id=internal_job_id)
return Collection(result[0])

async def get_service_parameters(
self, user_token: str, details: ServiceDetails
) -> List[Parameter]:
raise NotImplementedError(
"OGC API Process service parameter retrieval not implemented yet."

parameters = []
logger.debug(
f"Fetching service parameters for OGC API process with ID {details.application}"
)

logger.debug("Exchanging user token for OGC API Process execution...")
exchanged_token = await exchange_token(
user_token=user_token, url=details.endpoint
)

api_client = await self._create_api_client_instance(
details.endpoint, details.namespace, exchanged_token
)
process_description = api_client.get_process_description(details.application)

for input_id, input_details in process_description.inputs.items():
input_type = input_id, input_details.model_dump().get("var_schema", {}).get(
"actual_instance", {}
).get("type", "string")
if isinstance(input_type, tuple):
input_type = next(
(
t
for t in input_type
if t in ["date-interval", "bounding-box", "boolean"]
),
"string",
)

parameters.append(
Parameter(
name=input_id,
description=input_details.description,
default=None,
optional=(input_details.min_occurs == 0),
type="string",
)
)

return parameters
2 changes: 2 additions & 0 deletions app/schemas/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@


class ParamTypeEnum(str, Enum):
DATETIME = "datetime"
DATE_INTERVAL = "date-interval"
BOUNDING_BOX = "bounding-box"
POLYGON = "polygon"
BOOLEAN = "boolean"
INTEGER = "integer"
DOUBLE = "double"
STRING = "string"
ARRAY_STRING = "array-string"

Expand Down
8 changes: 8 additions & 0 deletions app/schemas/unit_job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime

from pydantic import BaseModel, Field
from typing import Optional

from app.schemas.enum import OutputFormatEnum, ProcessingStatusEnum, ProcessTypeEnum

Expand All @@ -13,6 +14,13 @@ class ServiceDetails(BaseModel):
"platform API",
examples=["https://openeofed.dataspace.copernicus.eu"],
)
namespace: Optional[str] = Field(
default=None,
description="Namespace under the endpoint where the service is hosted. For openEO, this"
"field is not set. For OGC API Processes, this field should include the namespace ID "
"representing under which the namespace-related API is deployed",
examples=["https://openeofed.dataspace.copernicus.eu"]
)
application: str = Field(
...,
description="Path to the application that needs to be executed. For openEO this is "
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ SQLAlchemy
stac-pydantic
types-requests
types-shapely
uvicorn[standard]
uvicorn[standard]
ogc-api-client @ git+https://github.com/EOEPCA/ogc-api-client.git@develop#subdirectory=src
Loading