Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0308139
BREAKING CHANGE : removing cumulus code
wphyojpl Oct 7, 2025
1ebec57
BREAKING CHANGE : removing cumulus code
wphyojpl Oct 15, 2025
513c8d3
feat: new daac delivery logic
wphyojpl Oct 30, 2025
c9677cb
feat: implement some methods with claude help
wphyojpl Oct 30, 2025
4d7620a
fix: update staging file method
wphyojpl Nov 4, 2025
7842a6d
feat: keep it to 1 method
wphyojpl Nov 5, 2025
2b006a5
fix: finish testing update stac with mock stac fast api
wphyojpl Nov 5, 2025
9aa23a4
feat: old extract files method copied
wphyojpl Nov 5, 2025
7181599
feat: update extract_files method
wphyojpl Nov 5, 2025
1b18ccc
feat; add test case
wphyojpl Nov 5, 2025
c418d0a
feat: more test case
wphyojpl Nov 5, 2025
ab3d8ec
feat: ddb for authorizer
wphyojpl Nov 24, 2025
f3f8276
fix: case insensitivity
wphyojpl Nov 24, 2025
9408694
feat: update authorize methods based on main business logic
wphyojpl Nov 25, 2025
9a3e738
feat: re-creating vpc
wphyojpl Nov 25, 2025
daac3b0
fix: add dependency for cidr blk in subnet
wphyojpl Nov 25, 2025
f3e1aa3
chore: rename folder
wphyojpl Nov 25, 2025
305f3b6
feat: adding terraform code for catalya
wphyojpl Dec 4, 2025
23381a0
feat: adding code for catalya use case
wphyojpl Dec 10, 2025
20a5f3d
chore: prep for collection archive
wphyojpl Dec 10, 2025
5aa7f42
feat: untested AI code for collection archive
wphyojpl Dec 10, 2025
38402df
fix: add test case for parallelism processing
wphyojpl Dec 16, 2025
0d33503
feat: update code to store response in ddb
wphyojpl Dec 17, 2025
2744bb2
feat: update terraform
wphyojpl Dec 17, 2025
2c5b2db
fix: using ssm to get details
wphyojpl Jan 6, 2026
061e317
feat: ecs tf
wphyojpl Jan 8, 2026
e3b001f
feat: docker image from var
wphyojpl Jan 21, 2026
1413a6c
feat: api gateway added
wphyojpl Jan 22, 2026
28cee51
fix: add missing module
wphyojpl Jan 22, 2026
41aaeff
feat: bucket creation
wphyojpl Jan 22, 2026
c5b09ff
chore: update description
wphyojpl Jan 22, 2026
1974cbb
feat: starting authorizer
wphyojpl Jan 22, 2026
bc9af43
fix: add backoff lib
wphyojpl Jan 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 204 additions & 0 deletions cumulus_lambda_functions/catalya_uds_api/auth_admin_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
from typing import Union

from cumulus_lambda_functions.daac_archiver.catalia_auth_db import CataliaAuthDb
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from cumulus_lambda_functions.uds_api.fast_api_utils import FastApiUtils
from cumulus_lambda_functions.uds_api.web_service_constants import WebServiceConstants
from fastapi import APIRouter, HTTPException, Request, Response

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


import json
import os

from pydantic import BaseModel

from cumulus_lambda_functions.lib.authorization.uds_authorizer_abstract import UDSAuthorizorAbstract
from cumulus_lambda_functions.lib.authorization.uds_authorizer_factory import UDSAuthorizerFactory
from mdps_ds_lib.lib.utils.json_validator import JsonValidator

from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())


class AuthDeleteModel(BaseModel):
    """Request body for deleting a single authorization mapping."""
    source: str
    target: str
    group_name: str


# JSON Schema used by AuthCrud.delete_record to validate delete requests;
# mirrors the fields of AuthDeleteModel.
delete_schema = {
    'type': 'object',
    'required': ['source', 'target', 'group_name'],
    'properties': {
        'source': {'type': 'string'},
        'target': {'type': 'string'},
        'group_name': {'type': 'string'},
    }
}


class AuthListModel(BaseModel):
    """Query payload for listing authorization mappings.

    NOTE(review): this declares ``group_name`` while ``list_schema`` validates
    ``group_names`` and the GET endpoint builds ``ldap_group_names`` — the
    three keys should be reconciled before this model is wired up.
    """
    group_name: list[str]


# JSON Schema for list queries.
# NOTE(review): no 'required' list, so an empty body passes validation; also
# uses 'group_names' while AuthListModel declares 'group_name' and the GET
# endpoint builds 'ldap_group_names' — confirm the intended key.
list_schema = {
    'type': 'object',
    'properties': {
        'tenant': {'type': 'string'},
        'venue': {'type': 'string'},
        'group_names': {
            'type': 'array',
            'items': {'type': 'string'},
            'minItems': 1,
        },
    }
}


class AuthAddModel(BaseModel):
    """Request body for adding a new authorization mapping."""
    source: str
    target: str
    group_name: str
    # True grants access for the (source, target, group) triple; False denies it
    access: bool


# JSON Schema used by AuthCrud.add_new_record to validate add requests;
# mirrors the fields of AuthAddModel.
add_schema = {
    'type': 'object',
    'required': ['source', 'target', 'group_name', 'access'],
    'properties': {
        'source': {'type': 'string'},
        'target': {'type': 'string'},
        'group_name': {'type': 'string'},
        'access': {'type': 'boolean'},
    }
}


class AuthCrud:
    """Admin-only CRUD operations over the Catalya authorization-mapping table.

    Each public method returns a Lambda-style ``{'statusCode': int, 'body': ...}``
    dict that the FastAPI routes translate into HTTP responses.
    """

    def __init__(self, authorization_info, request_body):
        """Validate configuration and connect to the auth table.

        :param authorization_info: result of FastApiUtils.get_authorization_info;
            must contain an 'ldap_groups' list for the caller.
        :param request_body: parsed request payload; validated per-operation.
        :raises EnvironmentError: when a required env var is missing.
        """
        required_env = ['ADMIN_COMMA_SEP_GROUPS', 'CATALYA_DB_NAME']
        if not all(k in os.environ for k in required_env):
            raise EnvironmentError(f'one or more missing env: {required_env}')
        self.__request_body = request_body
        self.__authorization_info = authorization_info
        # comma-separated env var -> list of admin LDAP group names
        self.__admin_groups = [k.strip() for k in os.getenv('ADMIN_COMMA_SEP_GROUPS').split(',')]
        self.__cad = CataliaAuthDb(os.getenv('CATALYA_DB_NAME'))

    def is_admin(self):
        """Return 200 if the caller is in at least one admin group, else 403."""
        belonged_admin_groups = list(set(self.__admin_groups) & set(self.__authorization_info['ldap_groups']))
        if len(belonged_admin_groups) < 1:
            # Logger.warn is a deprecated alias of warning(); use warning()
            LOGGER.warning(f'unauthorized attempt to admin function: {self.__authorization_info}')
            return {
                'statusCode': 403,
                'body': {'message': f'user is not in admin groups: {self.__admin_groups}'}
            }
        return {
            'statusCode': 200,
            'body': {}
        }

    def list_all_record(self):
        """List all mappings. Not implemented yet; always returns 501."""
        return {
            'statusCode': 501,
            'body': {'message': 'Not Implemented Yet'}
        }

    def add_new_record(self):
        """Validate the body against ``add_schema`` and insert a mapping.

        :return: 400 on a malformed body (client error), 200 on success.
        """
        body_validator_result = JsonValidator(add_schema).validate(self.__request_body)
        if body_validator_result is not None:
            LOGGER.error(f'invalid add body: {body_validator_result}. request_body: {self.__request_body}')
            return {
                'statusCode': 400,  # was 500: a malformed body is the client's fault
                'body': f'invalid add body: {body_validator_result}. request_body: {self.__request_body}'
            }
        self.__cad.add(self.__request_body['group_name'], self.__request_body['source'], self.__request_body['target'], self.__request_body['access'])
        return {
            'statusCode': 200,
            'body': {'message': 'inserted'}
        }

    def delete_record(self):
        """Validate the body against ``delete_schema`` and delete a mapping.

        :return: 400 on a malformed body (client error), 200 on success.
        """
        body_validator_result = JsonValidator(delete_schema).validate(self.__request_body)
        if body_validator_result is not None:
            LOGGER.error(f'invalid delete body: {body_validator_result}. request_body: {self.__request_body}')
            return {
                'statusCode': 400,  # was 500: a malformed body is the client's fault
                'body': f'invalid delete body: {body_validator_result}. request_body: {self.__request_body}'
            }
        self.__cad.delete(self.__request_body['group_name'], self.__request_body['source'], self.__request_body['target'])
        return {
            'statusCode': 200,
            'body': {'message': 'deleted'}
        }


# All admin auth-mapping endpoints are mounted under /{ADMIN}/auth.
router = APIRouter(
    prefix=f'/{WebServiceConstants.ADMIN}/auth',
    tags=["Admin Records CRUD (Admins-Only)"],
    responses={404: {"description": "Not found"}},
)

@router.delete("")
@router.delete("/")
async def delete_auth_mapping(request: Request, delete_body: AuthDeleteModel):
    """
    Deleting one authorization mapping
    """
    LOGGER.debug(f'started delete_auth_mapping')
    auth_info = FastApiUtils.get_authorization_info(request)
    crud = AuthCrud(auth_info, delete_body.model_dump())
    admin_check = crud.is_admin()
    if admin_check['statusCode'] != 200:
        raise HTTPException(status_code=admin_check['statusCode'], detail=admin_check['body'])
    outcome = crud.delete_record()
    if outcome['statusCode'] != 200:
        raise HTTPException(status_code=outcome['statusCode'], detail=outcome['body'])
    return outcome['body']

@router.post("")
@router.post("/")
async def add_auth_mapping(request: Request, new_body: AuthAddModel):
    """
    Adding a new Authorization mapping
    """
    # dropped leftover 'sss' debug marker from the log message
    LOGGER.debug(f'started add_auth_mapping. body: {new_body.model_dump()}')
    auth_info = FastApiUtils.get_authorization_info(request)
    auth_crud = AuthCrud(auth_info, new_body.model_dump())
    is_admin_result = auth_crud.is_admin()
    if is_admin_result['statusCode'] != 200:
        raise HTTPException(status_code=is_admin_result['statusCode'], detail=is_admin_result['body'])
    add_result = auth_crud.add_new_record()
    if add_result['statusCode'] == 200:
        return add_result['body']
    raise HTTPException(status_code=add_result['statusCode'], detail=add_result['body'])


@router.get("")
@router.get("/")
async def list_auth_mappings(request: Request, tenant: Union[str, None]=None, venue: Union[str, None]=None, group_names: Union[str, None]=None):
    """
    Listing all existing Authorization Mapping.

    """
    LOGGER.debug(f'started list_auth_mappings')
    auth_info = FastApiUtils.get_authorization_info(request)
    # group_names arrives as a comma-separated string; split it unless absent
    parsed_groups = None if group_names is None else [each.strip() for each in group_names.split(',')]
    query_body = {
        'tenant': tenant,
        'venue': venue,
        'ldap_group_names': parsed_groups,
    }
    crud = AuthCrud(auth_info, query_body)
    admin_check = crud.is_admin()
    if admin_check['statusCode'] != 200:
        raise HTTPException(status_code=admin_check['statusCode'], detail=admin_check['body'])
    listing = crud.list_all_record()
    if listing['statusCode'] != 200:
        raise HTTPException(status_code=listing['statusCode'], detail=listing['body'])
    return listing['body']
132 changes: 132 additions & 0 deletions cumulus_lambda_functions/catalya_uds_api/granules_archive_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import json
import os
from typing import Optional

from cumulus_lambda_functions.daac_archiver.catalia_auth_db import CataliaAuthDb
from cumulus_lambda_functions.daac_archiver.catalia_daac_handshakes_db import CataliaDaacHandshakesDb
from cumulus_lambda_functions.daac_archiver.daac_archiver_catalia import DaacArchiverCatalia
from cumulus_lambda_functions.lib.lambda_logger_generator import LambdaLoggerGenerator
from cumulus_lambda_functions.uds_api.web_service_constants import WebServiceConstants
from cumulus_lambda_functions.uds_api.fast_api_utils import FastApiUtils
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel

LOGGER = LambdaLoggerGenerator.get_logger(__name__, LambdaLoggerGenerator.get_level_from_env())

# Archive endpoints are mounted under the collections prefix.
router = APIRouter(
    prefix=f'/{WebServiceConstants.COLLECTIONS}',
    tags=["Granules Archive CRUD API"],
    responses={404: {"description": "Not found"}},
)

class ArchivingTypesModel(BaseModel):
    """One archivable data type and the file extensions to deliver for it."""
    data_type: str
    # pydantic deep-copies field defaults per instance, so the [] default is safe
    file_extension: Optional[list[str]] = []

class DaacUpdateModel(BaseModel):
    """Request body for creating or updating a DAAC archive configuration."""
    daac_collection_id: str
    api_key: str
    daac_provider: Optional[str] = None
    daac_data_version: Optional[str] = None
    daac_sns_topic_arn: Optional[str] = None
    daac_role_arn: Optional[str] = None
    daac_role_session_name: Optional[str] = None
    archiving_types: Optional[list[ArchivingTypesModel]] = None

class InternalDDBConnector:
    """Shared helper for the archive routes: opens the auth and DAAC-handshake
    tables and performs the per-request authorization check.
    """

    def __init__(self):
        """Connect to the two DynamoDB-backed tables named by env vars.

        :raises EnvironmentError: when a required env var is missing.
        """
        required_env = ['CATALYA_DAAC_AGREEMENT_DB_NAME', 'CATALYA_DB_NAME']
        if not all(k in os.environ for k in required_env):
            raise EnvironmentError(f'one or more missing env: {required_env}')
        self.cad = CataliaAuthDb(os.getenv('CATALYA_DB_NAME'))
        self.cdhsd = CataliaDaacHandshakesDb(os.getenv('CATALYA_DAAC_AGREEMENT_DB_NAME'))
        self.auth_info = {}  # populated by archive_methods_initiator
        self.configured_daac_configs = []  # DAAC configs found for the collection

    def archive_methods_initiator(self, request, collection_id, daac_collection_id):
        """Resolve the caller's identity and return the DAAC configs they may act on.

        When ``daac_collection_id`` is None, every DAAC configured for the
        collection is considered; otherwise only the one given.

        :raises HTTPException: 403 when the caller is authorized for none of them.
        """
        LOGGER.debug(f'started archive_methods_initiator.')
        self.auth_info = FastApiUtils.get_authorization_info(request)
        if daac_collection_id is None:
            self.configured_daac_configs = self.cdhsd.search(collection_id)
            configured_daac_ids = [k[self.cdhsd.target_project] for k in self.configured_daac_configs]
        else:
            configured_daac_ids = [daac_collection_id]
        authorized_daacs = self.cad.get_authorized_daac_full(self.auth_info.get('ldap_groups'), collection_id, configured_daac_ids)
        if len(authorized_daacs) < 1:
            # an authorization denial is security-relevant; log at warning (was debug)
            LOGGER.warning(f'user: {self.auth_info["username"]} is not authorized for {collection_id}')
            # NOTE(review): FastAPI serializes dict details itself; json.dumps here
            # puts a JSON string inside "detail" — confirm clients expect that shape.
            raise HTTPException(status_code=403, detail=json.dumps({
                'message': 'not authorized to execute this action'
            }))
        return authorized_daacs

@router.post("/{collection_id}/{daac_collection_id}/archive")
@router.post("/{collection_id}/{daac_collection_id}/archive/")
async def add_daac_archive_config(request: Request, collection_id: str, daac_collection_id: str, new_body: DaacUpdateModel):
    """Store the archive (DAAC delivery) configuration for a collection."""
    LOGGER.debug(f'started add_daac_archive_config. {new_body.model_dump()}')
    i1 = InternalDDBConnector()
    authorized_daacs = i1.archive_methods_initiator(request, collection_id, daac_collection_id)
    authorized_ldaps = [k['userGroup'] for k in authorized_daacs]
    b1 = new_body.model_dump()
    try:
        # cdhsd.add(catalia_collection, daac_collection, api_key, provider, data_version,
        #           sns_topic_arn, role_arn, role_session_name, archiving_types, user, user_group)
        i1.cdhsd.add(collection_id, daac_collection_id, b1['api_key'], b1['daac_provider'], b1['daac_data_version'],
                     b1['daac_sns_topic_arn'], b1['daac_role_arn'], b1['daac_role_session_name'], b1['archiving_types'], i1.auth_info['username'], authorized_ldaps)
    except Exception as e:
        LOGGER.exception(f'error while add_daac_archive_config: {b1}')
        # detail must be JSON-serializable; a raw Exception object is not (was detail=e)
        raise HTTPException(status_code=500, detail=str(e))
    return {'message': 'archive config added'}

@router.delete("/{collection_id}/{daac_collection_id}/archive")
@router.delete("/{collection_id}/{daac_collection_id}/archive/")
async def delete_daac_archive_config(request: Request, collection_id: str, daac_collection_id: str):
    """Delete the archive configuration for (collection, DAAC collection)."""
    LOGGER.debug(f'started delete_daac_archive_config.')
    i1 = InternalDDBConnector()
    # raises 403 if the caller is not authorized; the return value is not needed here
    i1.archive_methods_initiator(request, collection_id, daac_collection_id)
    try:
        i1.cdhsd.delete(collection_id, daac_collection_id)
    except Exception as e:
        LOGGER.exception(f'error while delete_daac_archive_config: {collection_id}, {daac_collection_id}')
        # detail must be JSON-serializable; a raw Exception object is not (was detail=e)
        raise HTTPException(status_code=500, detail=str(e))
    return {'message': 'archive config deleted'}

@router.get("/{collection_id}/{daac_collection_id}/archive")
@router.get("/{collection_id}/{daac_collection_id}/archive/")
async def get_daac_archive_config(request: Request, collection_id: str, daac_collection_id: str):
    """Fetch the archive configuration for (collection, DAAC collection)."""
    LOGGER.debug(f'started get_daac_archive_config.')
    i1 = InternalDDBConnector()
    # raises 403 if the caller is not authorized; the return value is not needed here
    i1.archive_methods_initiator(request, collection_id, daac_collection_id)
    try:
        result = i1.cdhsd.get_single(collection_id, daac_collection_id)
    except Exception as e:
        LOGGER.exception(f'error while get_daac_archive_config: {collection_id}, {daac_collection_id}')
        # detail must be JSON-serializable; a raw Exception object is not (was detail=e)
        raise HTTPException(status_code=500, detail=str(e))
    return {'result': result}

@router.put("/{collection_id}/archive/{granule_id}")
@router.put("/{collection_id}/archive/{granule_id}/")
async def archive_single_granule(request: Request, collection_id: str, granule_id: str):
    """Kick off archival of one granule to every DAAC the caller is authorized for."""
    LOGGER.debug(f'started archive_single_granule.')
    connector = InternalDDBConnector()
    allowed = connector.archive_methods_initiator(request, collection_id, None)
    allowed_groups = {entry['userGroup'] for entry in allowed}
    deliverable_configs = [cfg for cfg in connector.configured_daac_configs
                           if cfg[connector.cdhsd.target_project] in allowed_groups]
    archiver = DaacArchiverCatalia()
    # NOTE(review): CATALYA_UDS_STAGING_BUCKET is not validated — confirm it is always set
    archiver.staged_s3_bucket = os.getenv('CATALYA_UDS_STAGING_BUCKET')
    archiver.daac_agreements = deliverable_configs
    archiver.archive_granule(collection_id, granule_id)
    return {'message': 'archive initiated'}

@router.put("/{collection_id}/archive")
@router.put("/{collection_id}/archive/")
async def archive_entire_collection(request: Request, collection_id: str):
    """Kick off archival of an entire collection to every authorized DAAC."""
    LOGGER.debug(f'started archive_entire_collection.')
    connector = InternalDDBConnector()
    allowed = connector.archive_methods_initiator(request, collection_id, None)
    allowed_groups = {entry['userGroup'] for entry in allowed}
    deliverable_configs = [cfg for cfg in connector.configured_daac_configs
                           if cfg[connector.cdhsd.target_project] in allowed_groups]
    archiver = DaacArchiverCatalia()
    archiver.staged_s3_bucket = os.getenv('CATALYA_UDS_STAGING_BUCKET')
    archiver.daac_agreements = deliverable_configs
    archiver.archive_collection(collection_id)  # TODO accept filtering mechanisms?
    return {'message': 'archive initiated'}

Loading
Loading