Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 88 additions & 109 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1064,25 +1064,17 @@ def job_results_canonical_url() -> str:
# TODO: also encrypt user id?
# TODO: encode all stuff (signature, userid, expiry) in a single blob in the URL

if partial:
return url_for(
".list_job_results_signed",
job_id=job_id,
user_base64=user_base64,
expires=expires,
secure_key=secure_key,
_external=True,
partial="true",
)
else:
return url_for(
".list_job_results_signed",
job_id=job_id,
user_base64=user_base64,
expires=expires,
secure_key=secure_key,
_external=True,
)
extra_values = {"partial": "true"} if partial else {}

return url_for(
".list_job_results_signed",
job_id=job_id,
user_base64=user_base64,
expires=expires,
secure_key=secure_key,
_external=True,
**extra_values,
)

with TimingLogger(f"backend_implementation.batch_jobs.get_job_info({job_id=}, {user_id=})", _log):
job_info = backend_implementation.batch_jobs.get_job_info(job_id, user_id)
Expand All @@ -1094,7 +1086,7 @@ def job_results_canonical_url() -> str:
result = {
"openeo:status": PARTIAL_JOB_STATUS.for_job_status(job_info.status),
"type": "Collection",
"stac_version": "1.0.0",
"stac_version": "1.0.0", # no result metadata yet to go by, default
"id": job_id,
"title": job_info.title or "Unfinished batch job {job_id}",
"description": job_info.description or f"Results for batch job {job_id}",
Expand All @@ -1117,9 +1109,6 @@ def job_results_canonical_url() -> str:
result_metadata = backend_implementation.batch_jobs.get_result_metadata(
job_id=job_id, user_id=user_id
)
result_assets = result_metadata.assets
result_items = result_metadata.items
providers = result_metadata.providers

# TODO: remove feature toggle, during refactoring for openeo-geopyspark-driver#440
# https://github.com/Open-EO/openeo-geopyspark-driver/issues/440
Expand Down Expand Up @@ -1155,14 +1144,19 @@ def job_results_canonical_url() -> str:
}
)

assets = {
result_assets = result_metadata.assets
result_items = result_metadata.items
providers = result_metadata.providers

stac_1_1 = bool(result_items)

out_assets = {
filename: _asset_object(
job_id=job_id,
user_id=user_id,
filename=filename,
asset_metadata=asset_metadata,
job_info=job_info,
stac11=False,
)
for filename, asset_metadata in result_assets.items()
if asset_metadata.get("asset", True)
Expand All @@ -1171,11 +1165,11 @@ def job_results_canonical_url() -> str:
if TREAT_JOB_RESULTS_V100_LIKE_V110 or requested_api_version().at_least("1.1.0"):
ml_model_metadata = None

def job_result_item_url(item_id, is11 = False) -> str:
def job_result_item_url(item_id) -> str:
signer = get_backend_config().url_signer

method_start = ".get_job_result_item"
if is11:
if stac_1_1:
method_start = method_start + "11"
if not signer:
return url_for(method_start, job_id=job_id, item_id=item_id, _external=True)
Expand All @@ -1193,33 +1187,18 @@ def job_result_item_url(item_id, is11 = False) -> str:
_external=True,
)

if stac_1_1:
out_assets = {}

for item_id, item_metadata in result_items.items():
for asset_key, asset_metadata in item_metadata.get("assets", {}).items():
# top-level asset keys should be unique
out_assets[f"{item_id}_{asset_key}"] = _publish_asset(asset_metadata, job_id, user_id)

if result_metadata.items :
assets = {}
for item_key, item_metadata in result_items.items():
for asset_key, asset_metadata in item_metadata.get("assets").items():
if "output_dir" in asset_metadata:
out_dir = asset_metadata.get("output_dir")
_log.info(f"asset has output dir {out_dir} and href {asset_metadata.get('href')}")
common = os.path.commonpath([asset_metadata.get('href'), out_dir])
href = os.path.relpath(asset_metadata.get('href'),common)
else:
href = asset_metadata.get("href")
assets[item_key + "_" + asset_key] = _asset_object(
job_id=job_id,
user_id=user_id,
filename= href,
asset_metadata=asset_metadata,
job_info=job_info,
stac11=True,
)
for item_id in result_metadata.items.keys():
links.append(
{"rel": "item", "href": job_result_item_url(item_id=item_id, is11=True), "type": stac_item_media_type}
{"rel": "item", "href": job_result_item_url(item_id=item_id), "type": stac_item_media_type}
)
stac_version = "1.1.0"
else:

for filename, metadata in result_assets.items():
if ("data" in metadata.get("roles", []) and
any(media_type in metadata.get("type", "") for media_type in
Expand All @@ -1233,12 +1212,11 @@ def job_result_item_url(item_id, is11 = False) -> str:
links.append(
{"rel": "item", "href": job_result_item_url(item_id=filename), "type": "application/json"}
)
stac_version = "1.0.0"

result = dict_no_none(
{
"type": "Collection",
"stac_version": stac_version,
"stac_version": "1.1.0" if stac_1_1 else "1.0.0",
"stac_extensions": [
STAC_EXTENSION.EO_V110,
STAC_EXTENSION.FILEINFO,
Expand All @@ -1258,7 +1236,7 @@ def job_result_item_url(item_id, is11 = False) -> str:
"summaries": {"instruments": job_info.instruments } if job_info.instruments else {},
"providers": providers or None,
"links": links,
"assets": assets,
"assets": out_assets,
"openeo:status": PARTIAL_JOB_STATUS.FINISHED,
}
)
Expand All @@ -1285,7 +1263,7 @@ def job_result_item_url(item_id, is11 = False) -> str:
"stac_version": "0.9.0",
"id": job_info.id,
"properties": _properties_from_job_info(job_info),
"assets": assets,
"assets": out_assets,
"links": links,
"openeo:status": PARTIAL_JOB_STATUS.FINISHED,
}
Expand Down Expand Up @@ -1314,6 +1292,38 @@ def job_result_item_url(item_id, is11 = False) -> str:
# TODO "OpenEO-Costs" header?
return jsonify(result)

def _publish_asset(asset_metadata: dict, job_id: str, user_id: str) -> dict: # TODO: for lack of a better name
result_asset = copy.deepcopy(asset_metadata)

public_href = result_asset.get(BatchJobs.ASSET_PUBLIC_HREF)
if not public_href:
# TODO: explicit relative path (https://github.com/Open-EO/openeo-python-driver/issues/449), especially
# because this does not work for S3 assets that lack output_dir (unless you configure PresignedS3AssetUrls)
# asset key is no longer relative asset path so construct it ourselves
output_dir = result_asset.get("output_dir")
if output_dir:
common = os.path.commonpath([result_asset["href"], output_dir])
relative_asset_path = os.path.relpath(result_asset["href"], common)
else:
relative_asset_path = os.path.basename(result_asset["href"]) # TODO: does not work for filepath_per_band (https://github.com/Open-EO/openeo-python-driver/issues/449)

public_href = backend_implementation.config.asset_url.build_url(
asset_metadata=result_asset, asset_name=relative_asset_path, job_id=job_id, user_id=user_id
)

result_asset["href"] = public_href

# TODO: settle on convention for internal properties
for internal_property in [
"asset",
"output_dir",
BatchJobs.ASSET_PUBLIC_HREF,
]:
result_asset.pop(internal_property, None)

return result_asset


# TODO: Issue #232, TBD: refactor download functionality? more abstract, just stream blocks of bytes from S3 or from a directory.
def _download_job_result(
job_id: str, filename: str, user_id: str
Expand Down Expand Up @@ -1435,31 +1445,23 @@ def get_job_result_item11(job_id: str, item_id: str, user: User) -> flask.Respon
return _get_job_result_item11(job_id, item_id, user.user_id)

def _get_job_result_item11(job_id, item_id, user_id):
if item_id == DriverMlModel.METADATA_FILE_NAME:
return _download_ml_model_metadata(job_id, item_id, user_id)

metadata = backend_implementation.batch_jobs.get_result_metadata(
job_id=job_id, user_id=user_id
)

if item_id not in metadata.items:
raise OpenEOApiException("Item with id {item_id!r} not found in job {job_id!r}".format(item_id=item_id, job_id=job_id), status_code=404)
item_metadata = metadata.items.get(item_id,None)
item_metadata = metadata.items.get(item_id)

job_info = backend_implementation.batch_jobs.get_job_info(job_id, user_id)
if not item_metadata:
raise OpenEOApiException(
"Item with id {item_id!r} not found in job {job_id!r}".format(item_id=item_id, job_id=job_id),
status_code=404,
)

assets = {}
for asset_key, asset in item_metadata.get("assets", {}).items():
if "output_dir" in asset:
out_dir = asset.get("output_dir")
_log.info(f"asset has output dir {out_dir} and href {asset.get('href')}")
common = os.path.commonpath([asset.get('href'), out_dir])
href = os.path.relpath(asset.get('href'), common)
else:
_log.info(f"asset has no output dir and href {asset.get('href')}")
href = asset.get("href")
assets[asset_key] = _asset_object(job_id, user_id, href, asset, job_info, stac11=True)
job_info = backend_implementation.batch_jobs.get_job_info(job_id, user_id)

out_assets = {}
for asset_key, asset_metadata in item_metadata.get("assets", {}).items():
out_assets[f"{item_id}_{asset_key}"] = _publish_asset(asset_metadata, job_id, user_id)

properties = item_metadata.get("properties", {"datetime": item_metadata.get("datetime")})
if properties["datetime"] is None:
Expand Down Expand Up @@ -1522,7 +1524,7 @@ def _get_job_result_item11(job_id, item_id, user_id):
},
]
+ auxiliary_links,
"assets": assets,
"assets": out_assets,
"collection": job_id,
}
# Add optional items, if they are present.
Expand Down Expand Up @@ -1689,9 +1691,7 @@ def _get_job_result_item(job_id, item_id, user_id):
"type": "application/json",
},
],
"assets": {
asset_filename: _asset_object(job_id, user_id, asset_filename, asset_metadata, job_info, stac11=False)
},
"assets": {asset_filename: _asset_object(job_id, user_id, asset_filename, asset_metadata, job_info)},
"collection": job_id,
}
# Add optional items, if they are present.
Expand Down Expand Up @@ -1735,9 +1735,7 @@ def _download_ml_model_metadata(job_id: str, file_name: str, user_id) -> flask.R
resp.mimetype = stac_item_media_type
return resp

def _asset_object(
job_id, user_id, filename: str, asset_metadata: dict, job_info: BatchJobMetadata, stac11: bool
) -> dict:
def _asset_object(job_id, user_id, filename: str, asset_metadata: dict, job_info: BatchJobMetadata) -> dict:
result_dict = dict_no_none(
{
"title": asset_metadata.get("title", filename),
Expand All @@ -1748,7 +1746,7 @@ def _asset_object(
"type": asset_metadata.get("type", asset_metadata.get("media_type", "application/octet-stream")),
"roles": asset_metadata.get("roles", ["data"]),
# TODO: eliminate this legacy "raster:bands" construct at some point?
"raster:bands": None if stac11 else asset_metadata.get("raster:bands"),
"raster:bands": asset_metadata.get("raster:bands"),
"file:size": asset_metadata.get("file:size"),
"alternate": asset_metadata.get("alternate"),
}
Expand Down Expand Up @@ -1776,35 +1774,16 @@ def _asset_object(
]

# TODO: eliminate this legacy "eo:bands" construct at some point?
if not stac11:
result_dict["eo:bands"] = [
dict_no_none(
{
"name": band.name,
"common_name": band.common_name,
"center_wavelength": band.wavelength_um,
}
)
for band in bands
]
else:
def raster_bands(band_index) -> dict:
rb = asset_metadata.get("raster:bands", [])
return rb[band_index] if band_index < len(rb) else {}

result_dict["bands"] = [
dict_no_none(
{
**{
"name": band.name,
"eo:common_name": band.common_name,
"eo:center_wavelength": band.wavelength_um,
},
**raster_bands(i),
}
)
for (i, band) in enumerate(bands)
]
result_dict["eo:bands"] = [
dict_no_none(
{
"name": band.name,
"common_name": band.common_name,
"center_wavelength": band.wavelength_um,
}
)
for band in bands
]

result_dict.update(
dict_no_none(
Expand Down
Loading