17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,23 @@ and this project adheres to [PEP 440](https://www.python.org/dev/peps/pep-0440/)
and uses [Semantic Versioning](https://semver.org/spec/v2.0.0.html).


## [0.7.0]

### Added
- A `meta` and a `bulk_meta` entrypoint were added to the new `hyp3.plugins` entry-point group.
- A `--stac-items-endpoint` option was added to the `__main__.hyp3_meta` function and the `meta` entrypoint for HyP3 plugins, which adds (POST) STAC items to the specified STAC catalog collection's items endpoint.
- A `--stac-exists-ok` option was added to the `__main__.hyp3_meta` function and the `meta` entrypoint for HyP3 plugins, which updates (PUT) existing STAC items in the specified STAC catalog collection instead of raising an HTTP 409 error when adding.
- A `stac` helper module with functions to add (POST) items to, and update (PUT) items in, a STAC catalog collection; a minimal usage sketch follows below.
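
A minimal usage sketch of the new `stac` helpers, mirroring the 409 fallback in `__main__.hyp3_meta`. The items endpoint URL and STAC item filename are placeholders, and the `STAC_API_TOKEN` environment variable is assumed to be set:

```python
import json
from pathlib import Path

from requests import HTTPError

from hyp3_itslive_metadata.stac import add_stac_item_to_catalog, update_stac_item_in_catalog

# Placeholder values for illustration only.
items_endpoint = 'https://stac.example.com/collections/itslive/items'
item_json = json.loads(Path('GRANULE.stac.json').read_text())

try:
    # POST the item to the collection's items endpoint (authenticated via STAC_API_TOKEN).
    add_stac_item_to_catalog(item_json, items_endpoint=items_endpoint)
except HTTPError as e:
    if e.response.status_code == 409:
        # Item already exists: PUT an update instead, which is what --stac-exists-ok enables.
        update_stac_item_in_catalog(item_json, items_endpoint=items_endpoint)
    else:
        raise
```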

### Changed
- The HyP3 `__main__` package entrypoint now allows selecting either the `meta` (default) or `bulk_meta` HyP3 plugin entrypoint with the `++plugin` parameter.
- STAC item JSONs will be published to the same prefix in the `--publish-bucket` as the source granule and other metadata files.
- `generate.save_metadata` now returns the paths to the STAC, premet, spatial, and kerchunk files written. Note: the kerchunk file path is always returned, but the file may not exist if the references weren't generated.
- `process.process_itslive_metadata` now returns the paths to the STAC, premet, and spatial files created (see the sketch below).
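
A short sketch of the new return values, assuming a hypothetical granule URI (the function names and file layout come from this diff, everything else is illustrative):

```python
from hyp3_itslive_metadata.cryoforge.generate import generate_itslive_metadata, save_metadata
from hyp3_itslive_metadata.process import process_itslive_metadata

# Hypothetical granule URI, for illustration only.
granule_uri = 's3://its-live-data/velocity_image_pair/example_granule.nc'

# save_metadata now returns the four file paths; the kerchunk path is returned
# even when no reference file was written.
metadata = generate_itslive_metadata(granule_uri)
stac_item, premet, spatial, kerchunk = save_metadata(metadata, './output')

# process_itslive_metadata wraps the same steps and returns only the files
# that are always written, as pathlib.Path objects.
stac_path, premet_path, spatial_path = process_itslive_metadata(granule_uri)
```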

### Removed
- The now unused `--publish-prefix` argument to the HyP3 `meta` entrypoint has been removed.

## [0.6.0]

### Added
8 changes: 5 additions & 3 deletions pyproject.toml
@@ -78,23 +78,25 @@ develop = [
# "uv >=0.4.7",
]



[project.scripts]
metagen = "hyp3_itslive_metadata.cryoforge.generate:main"
ingest = "hyp3_itslive_metadata.cryoforge.ingestitem:ingest_stac"
generate-catalog = "hyp3_itslive_metadata.cryoforge.generatebulk:generate_stac_catalog"
generate-from-parquet = "hyp3_itslive_metadata.cryoforge.generatebatched:generate_stac_catalog"
search-items = "hyp3_itslive_metadata.cryoforge.search_items:search_items"

[project.entry-points."hyp3.plugins"]
meta = "hyp3_itslive_metadata.__main__:hyp3_meta"
bulk_meta = "hyp3_itslive_metadata.__main__:hyp3_bulk_meta"
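
For reference, a minimal sketch (not part of this change) of how the `hyp3.plugins` entry-point group registered above is discovered and dispatched at runtime, mirroring `__main__.main` later in this diff; the command-line arguments are illustrative:

```python
import sys
from importlib.metadata import entry_points

# Discover the plugins registered in pyproject.toml; expect 'meta' and 'bulk_meta'.
plugins = entry_points(group='hyp3.plugins')
print(sorted(plugin.name for plugin in plugins))

# Load and run the 'meta' plugin with the remaining arguments, as the dispatcher does.
sys.argv = ['meta', '--granule-uri', 's3://example-bucket/example_granule.nc']
plugins['meta'].load()()
```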

[project.urls]
Homepage = "https://github.com/betolink/itslive-metadata"
Documentation = "https://hyp3-docs.asf.alaska.edu"

[tool.pytest.ini_options]
testpaths = ["tests"]
script_launch_mode = "subprocess"
# FIXME: Cryoforge generated items don"t pass STAC item validation
# FIXME: Cryoforge generated items don't pass STAC item validation
norecursedirs = "tests/cryoforge"

[tool.setuptools]
182 changes: 154 additions & 28 deletions src/hyp3_itslive_metadata/__main__.py
@@ -1,28 +1,60 @@
"""itslive-metadata processing for HyP3."""

import argparse
import json
import logging
from argparse import ArgumentParser
import sys
from collections.abc import Iterable
from importlib.metadata import entry_points
from pathlib import Path
from urllib.parse import urlparse

import pandas as pd
from hyp3lib.aws import upload_file_to_s3
from hyp3lib.util import string_is_true
from requests import HTTPError
from tqdm.auto import tqdm

from hyp3_itslive_metadata.aws import determine_granule_uri_from_bucket, upload_file_to_s3_with_publish_access_keys
from hyp3_itslive_metadata.process import process_itslive_metadata
from hyp3_itslive_metadata.stac import add_stac_item_to_catalog, update_stac_item_in_catalog


def _str_without_trailing_slash(s: str) -> str:
return s.rstrip('/')
tqdm.pandas()


def _hyp3_upload_and_publish(
metadata_files: Iterable[Path],
*,
bucket: str | None = None,
bucket_prefix: str = '',
publish_bucket: str | None = None,
publish_prefix: str = '',
) -> None:
if bucket and bucket_prefix:
logging.info(f'Uploading metadata files to s3://{bucket}/{bucket_prefix}/')
for file in metadata_files:
upload_file_to_s3(file, bucket, bucket_prefix)

if publish_bucket:
logging.info(f'Publishing metadata files to s3://{publish_bucket}/{publish_prefix}')
for file in metadata_files:
upload_file_to_s3_with_publish_access_keys(file, bucket=publish_bucket, prefix=publish_prefix)


def _nullable_string(argument_string: str) -> str | None:
argument_string = argument_string.replace('None', '').strip()
return argument_string if argument_string else None


def main() -> None:
def _nullable_int(argument_string: str) -> int | None:
argument_string = argument_string.replace('None', '').strip()
return int(argument_string) if argument_string else None


def hyp3_meta() -> None:
"""HyP3 entrypoint for hyp3_itslive_metadata."""
parser = ArgumentParser()
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
hyp3_group = parser.add_argument_group(
'HyP3 content bucket',
'AWS S3 bucket and prefix to upload metadata product(s) to. Will also be used to find the input granule if `--granule-uri` is not provided.',
@@ -35,17 +35,29 @@ def main() -> None:
help='URI for a granule to generate metadata for. If not provided, will find the first granule in HyP3 content bucket.',
)

publish_group = parser.add_argument_group(
'Data publishing bucket', 'AWS S3 bucket and prefix to publish STAC JSONs to.'
)
publish_group.add_argument(
parser.add_argument(
'--publish-bucket',
type=_nullable_string,
default=None,
help='Additionally, publish products to this bucket. Necessary credentials must be provided '
'via the `PUBLISH_ACCESS_KEY_ID` and `PUBLISH_SECRET_ACCESS_KEY` environment variables.',
)
publish_group.add_argument(
'--publish-prefix',
type=_str_without_trailing_slash,

parser.add_argument(
'--stac-items-endpoint',
type=_nullable_string,
default=None,
help='URI to the STAC items endpoint for the STAC collection you want to add items to. Necessary credentials must be provided '
'via the `STAC_API_TOKEN` environment variable.',
)

parser.add_argument(
'--stac-exists-ok',
type=string_is_true,
default=False,
help='If a STAC item already exists in the collection specified by `--stac-items-endpoint`, update it instead of raising an HTTP 409 error when adding.',
)

args = parser.parse_args()

if args.granule_uri is None:
@@ -54,30 +54,112 @@ def main() -> None:
else:
raise ValueError('Must provide --granule-uri or --bucket')

if args.publish_bucket and not args.publish_prefix:
raise ValueError('If you provide --publish-bucket you must also provide --publish-prefix')

logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO
)
logging.info(f'Processing itslive metadata with args: {args}')

metadata_files = process_itslive_metadata(args.granule_uri)
publish_prefix = str(Path(urlparse(args.granule_uri).path).parent).lstrip('/')
_hyp3_upload_and_publish(
metadata_files,
bucket=args.bucket,
bucket_prefix=args.bucket_prefix,
publish_bucket=args.publish_bucket,
publish_prefix=publish_prefix,
)

if args.bucket and args.bucket_prefix:
logging.info(f'Uploading metadata files to s3://{args.bucket}/{args.bucket_prefix}/')
for file in metadata_files:
upload_file_to_s3(file, args.bucket, args.bucket_prefix)

if args.publish_bucket:
for file in metadata_files:
if '.stac.json' in file.name:
logging.info(f'Publishing STAC JSON to: s3://{args.publish_bucket}/{args.publish_prefix}/{file.name}')
upload_file_to_s3_with_publish_access_keys(file, bucket=args.publish_bucket, prefix=args.publish_prefix)
if args.stac_items_endpoint:
item_json = json.loads(metadata_files[0].read_text())
logging.info(f'Adding {item_json["id"]} to {args.stac_items_endpoint}')
try:
add_stac_item_to_catalog(item_json, items_endpoint=args.stac_items_endpoint)
except HTTPError as e:
if e.response.status_code == 409 and args.stac_exists_ok:
logging.info(f'Exists! Updating {item_json["id"]} in {args.stac_items_endpoint}')
update_stac_item_in_catalog(item_json, items_endpoint=args.stac_items_endpoint)
else:
granule_prefix = str(Path(urlparse(args.granule_uri).path).parent).lstrip('/')
logging.info(f'Publishing {file.suffix} to: s3://{args.publish_bucket}/{granule_prefix}/{file.name}')
upload_file_to_s3_with_publish_access_keys(file, bucket=args.publish_bucket, prefix=granule_prefix)
raise e


def hyp3_bulk_meta() -> None:
"""HyP3 entrypoint for bulk running hyp3_itslive_metadata."""
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
hyp3_group = parser.add_argument_group(
'HyP3 content bucket',
'AWS S3 bucket and prefix to upload metadata product(s) to.',
)
hyp3_group.add_argument('--bucket')
hyp3_group.add_argument('--bucket-prefix', default='')

parser.add_argument(
'--granules-parquet',
default='s3://its-live-data/test-space/stac/granules_to_recrop.parquet',
help='URI to a parquet file containing granule URIs to generate metadata for',
)

parser.add_argument(
'--start-idx', type=_nullable_int, default=0, help='Start index of the granules to generate metadata for.'
)
parser.add_argument(
'--stop-idx', type=_nullable_int, default=None, help='Stop index of the granules to generate metadata for.'
)
parser.add_argument('--keep', action='store_true', help='Keep all generated metadata files on disk.')
parser.add_argument(
'--publish-bucket',
type=_nullable_string,
default=None,
help='Additionally, publish products to this bucket. Necessary credentials must be provided '
'via the `PUBLISH_ACCESS_KEY_ID` and `PUBLISH_SECRET_ACCESS_KEY` environment variables.',
)

args = parser.parse_args()

logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO
)
logging.info(f'Processing bulk itslive metadata with args: {args}')

df = pd.read_parquet(args.granules_parquet, engine='pyarrow')

stac_ndjson = Path.cwd() / f'{Path(args.granules_parquet).stem}_{args.start_idx}-{args.stop_idx}.ndjson'
with stac_ndjson.open('w') as ndjson_file:
for granule_bucket, granule_key in tqdm(
df.loc[args.start_idx : args.stop_idx, ['bucket', 'key']].itertuples(index=False), initial=args.start_idx
):
metadata_files = process_itslive_metadata(f's3://{granule_bucket}/{granule_key}')

stac_line = json.dumps(json.loads(metadata_files[0].read_text()))
ndjson_file.write(stac_line + '\n')

_hyp3_upload_and_publish(
metadata_files, publish_bucket=args.publish_bucket, publish_prefix=str(Path(granule_key).parent)
)

if not args.keep:
for metadata_file in metadata_files:
metadata_file.unlink()

_hyp3_upload_and_publish([stac_ndjson], bucket=args.bucket, bucket_prefix=args.bucket_prefix)


def main() -> None:
"""Main entrypoint for HyP3."""
parser = argparse.ArgumentParser(prefix_chars='+', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'++plugin',
choices=['meta', 'bulk_meta'],
default='meta',
help='Select the hyp3-plugin entrypoint to use', # as specified in `pyproject.toml`
)
parser.add_argument('++omp-num-threads', type=int, help='The number of OpenMP threads to use for parallel regions')

args, unknowns = parser.parse_known_args()

discovered_plugins = entry_points(group='hyp3.plugins')

sys.argv = [args.plugin, *unknowns]
sys.exit(discovered_plugins[args.plugin].load()())


if __name__ == '__main__':
51 changes: 22 additions & 29 deletions src/hyp3_itslive_metadata/cryoforge/generate.py
Expand Up @@ -384,8 +384,8 @@ def create_stac_item(ds, geom, url):
scene_1_frame = scene_1_split[2]
scene_2_frame = scene_2_split[2]
elif mission.startswith('S1'):
scene_1_frame = ds['img_pair_info'].frame_img1
scene_2_frame = ds['img_pair_info'].frame_img2
scene_1_frame = ds['img_pair_info'].attrs.get('frame_img1', 'N/A')
scene_2_frame = ds['img_pair_info'].attrs.get('frame_img2', 'N/A')
elif mission.startswith('S2'):
scene_1_frame = scene_1_split[5]
scene_2_frame = scene_2_split[5]
@@ -510,44 +510,37 @@ def generate_itslive_metadata(url: str, store: Any = None, with_kerchunk: bool =
}


def save_metadata(metadata: dict, outdir: str = '.'):
def save_metadata(metadata: dict, outdir: str = '.') -> tuple[str, str, str, str]:
"""Save STAC item to filesystem or S3"""
fs = fsspec.filesystem(outdir.split('://')[0] if '://' in outdir else 'file')
stac_id = metadata['stac'].id

if outdir.startswith('s3'):
stac_s3_url = metadata['stac'].assets.get('data').extra_fields.get('alternate', [])['s3']['href']
bucket_path = '/'.join(stac_s3_url.split('/')[0:-1])
granule_path = f'{bucket_path}/{metadata["stac"].id}'
logging.info(f'Saving metadata to {bucket_path}/')
with fs.open(f'{granule_path}.stac.json', 'w') as f:
json.dump(metadata['stac'].to_dict(), f, indent=2)

with fs.open(f'{granule_path}.nc.premet', 'w') as f:
f.write(metadata['nsidc_meta'])
with fs.open(f'{granule_path}.nc.spatial', 'w') as f:
f.write(metadata['nsidc_spatial'])

if metadata['kerchunk'] is not None:
with fs.open(f'{granule_path}.ref.json', 'w') as f:
json.dump(metadata['kerchunk'], f, indent=2)

granule_path = '/'.join(stac_s3_url.split('/')[0:-1])
else:
granule_path = Path(outdir)

with fs.open(granule_path / Path(f'{metadata["stac"].id}.stac.json'), 'w') as f:
json.dump(metadata['stac'].to_dict(), f, indent=2)
logging.info(f'Saving metadata to {granule_path}')

with fs.open(granule_path / Path(f'{metadata["stac"].id}.nc.premet'), 'w') as f:
f.write(metadata['nsidc_meta'])
stac_item = f'{granule_path}/{stac_id}.stac.json'
with fs.open(stac_item, 'w') as f:
json.dump(metadata['stac'].to_dict(), f, indent=2)

with fs.open(granule_path / Path(f'{metadata["stac"].id}.nc.spatial'), 'w') as f:
f.write(metadata['nsidc_spatial'])
if metadata['kerchunk'] is not None:
with fs.open(granule_path / Path(f'{metadata["stac"].id}.ref.json'), 'w') as f:
json.dump(metadata['kerchunk'], f, indent=2)
premet = f'{granule_path}/{stac_id}.nc.premet'
with fs.open(premet, 'w') as f:
f.write(metadata['nsidc_meta'])

logging.info(f'Saving metadata to {granule_path}')
spatial = f'{granule_path}/{stac_id}.nc.spatial'
with fs.open(spatial, 'w') as f:
f.write(metadata['nsidc_spatial'])

kerchunk = f'{granule_path}/{stac_id}.ref.json'
if metadata['kerchunk'] is not None:
with fs.open(kerchunk, 'w') as f:
json.dump(metadata['kerchunk'], f, indent=2)

# return the paths to the files written (the kerchunk file may not exist)
return stac_item, premet, spatial, kerchunk


def parse_args() -> argparse.Namespace:
11 changes: 6 additions & 5 deletions src/hyp3_itslive_metadata/process.py
@@ -9,14 +9,16 @@
log = logging.getLogger(__name__)


def process_itslive_metadata(granule_uri: str) -> list[Path]:
def process_itslive_metadata(granule_uri: str) -> tuple[Path, Path, Path]:
"""Generates ITS_LIVE granule metadata files from a source S3 bucket and prefix.

Args:
granule_uri: URI to the granule or folder (s3://<bucket>/<prefix>) for the granule.

Outputs:
str: S3 path of the generated STAC item.
stac_item: local path to the generated STAC item.
premet: local path to the generated NSIDC premet file.
spatial: local path to the generated NSIDC spatial file.
"""
log.info(f'Processing itslive metadata for granule: {granule_uri}')
metadata = generate_itslive_metadata(
@@ -27,7 +29,6 @@ def process_itslive_metadata(granule_uri: str) -> list[Path]:
# saves the stac item and the NSIDC spatial+premet metadata files
output_path = Path('./output')
output_path.mkdir(parents=True, exist_ok=True)
save_metadata(metadata, './output')
file_paths = [f for f in Path('./output').glob('*') if not f.name.endswith('.ref.json')]
stac_item, premet, spatial, _ = save_metadata(metadata, './output')

return file_paths
return Path(stac_item), Path(premet), Path(spatial)