diff --git a/.github/workflows/cleanup-firebase.yml b/.github/workflows/cleanup-firebase.yml deleted file mode 100644 index ec939d5b7..000000000 --- a/.github/workflows/cleanup-firebase.yml +++ /dev/null @@ -1,22 +0,0 @@ -name: Cleanup Firebase Metadata - -on: - schedule: - - cron: "24 18 * * 1" # Runs at 18:24 UTC every Monday - -jobs: - cleanup: - runs-on: ${{ matrix.os }} - strategy: - matrix: - python-version: [3.11] - os: [ubuntu-latest, windows-latest, macOS-latest] - steps: - - uses: actions/checkout@v4.2.2 - - uses: ./.github/actions/dependencies - - name: Cleanup Firebase Metadata - env: - FIREBASE_TOKEN: ${{ secrets.FIREBASE_TOKEN }} - FIREBASE_EMAIL: ${{ secrets.FIREBASE_EMAIL }} - run: | - uv run python cellpack/bin/cleanup_tasks.py diff --git a/cellpack/autopack/DBRecipeHandler.py b/cellpack/autopack/DBRecipeHandler.py index 8b4e8578a..8e3aec7ff 100644 --- a/cellpack/autopack/DBRecipeHandler.py +++ b/cellpack/autopack/DBRecipeHandler.py @@ -1,7 +1,6 @@ import copy import logging import shutil -from datetime import datetime, timezone from enum import Enum from pathlib import Path @@ -10,7 +9,6 @@ import hashlib import json -import requests from cellpack.autopack.utils import deep_merge @@ -321,36 +319,6 @@ def __init__(self, settings): self.settings = settings -class ResultDoc: - def __init__(self, db): - self.db = db - - def handle_expired_results(self): - """ - Check if the results in the database are expired and delete them if the linked object expired. - """ - current_utc = datetime.now(timezone.utc) - results = self.db.get_all_docs("results") - if results: - for result in results: - result_data = self.db.doc_to_dict(result) - result_age = current_utc - result_data["timestamp"] - if result_age.days > 180 and not self.validate_existence( - result_data["url"] - ): - self.db.delete_doc("results", self.db.doc_id(result)) - logging.info("Results cleanup complete.") - else: - logging.info("No results found in the database.") - - def validate_existence(self, url): - """ - Validate the existence of an S3 object by checking if the URL is accessible. - Returns True if the URL is accessible. - """ - return requests.head(url).status_code == requests.codes.ok - - class DBUploader(object): """ Handles the uploading of data to the database. @@ -529,42 +497,34 @@ def upload_config(self, config_data, source_path): self.db.update_doc("configs", id, config_data) return id - def upload_result_metadata(self, file_name, url, job_id=None): - """ - Upload the metadata of the result file to the database. 
- """ - if self.db: - username = self.db.get_username() - timestamp = self.db.create_timestamp() - self.db.update_or_create( - "results", - file_name, - { - "user": username, - "timestamp": timestamp, - "url": url, - "batch_job_id": job_id, - }, - ) - if job_id: - self.upload_job_status(job_id, "DONE", result_path=url) - - def upload_job_status(self, job_id, status, result_path=None, error_message=None): + def upload_job_status( + self, + dedup_hash, + status, + result_path=None, + error_message=None, + outputs_directory=None, + ): """ - Update status for a given job ID + Update status for a given dedup_hash """ if self.db: - timestamp = self.db.create_timestamp() - self.db.update_or_create( - "job_status", - job_id, - { - "timestamp": timestamp, - "status": str(status), - "result_path": result_path, - "error_message": error_message, - }, - ) + db_handler = self.db + # If db is AWSHandler, switch to firebase handler for job status updates + if hasattr(self.db, "s3_client"): + handler = DATABASE_IDS.handlers().get(DATABASE_IDS.FIREBASE) + db_handler = handler(default_db="staging") + timestamp = db_handler.create_timestamp() + data = { + "timestamp": timestamp, + "status": str(status), + "error_message": error_message, + } + if result_path: + data["result_path"] = result_path + if outputs_directory: + data["outputs_directory"] = outputs_directory + db_handler.update_or_create("job_status", dedup_hash, data) def save_recipe_and_config_to_output(self, output_folder, config_data, recipe_data): output_path = Path(output_folder) @@ -583,7 +543,7 @@ def upload_packing_results_workflow( self, source_folder, recipe_name, - job_id, + dedup_hash, config_data, recipe_data, ): @@ -591,7 +551,7 @@ def upload_packing_results_workflow( Complete packing results upload workflow including folder preparation and s3 upload """ try: - if job_id: + if dedup_hash: source_path = Path(source_folder) if not source_path.exists(): @@ -601,7 +561,7 @@ def upload_packing_results_workflow( # prepare unique S3 upload folder parent_folder = source_path.parent - unique_folder_name = f"{source_path.name}_run_{job_id}" + unique_folder_name = f"{source_path.name}_run_{dedup_hash}" s3_upload_folder = parent_folder / unique_folder_name logging.debug(f"outputs will be copied to: {s3_upload_folder}") @@ -618,7 +578,7 @@ def upload_packing_results_workflow( upload_result = self.upload_outputs_to_s3( output_folder=s3_upload_folder, recipe_name=recipe_name, - job_id=job_id, + dedup_hash=dedup_hash, ) # clean up temporary folder after upload @@ -628,9 +588,12 @@ def upload_packing_results_workflow( f"Cleaned up temporary upload folder: {s3_upload_folder}" ) - # update outputs directory in firebase - self.update_outputs_directory( - job_id, upload_result.get("outputs_directory") + # update outputs directory in job status + self.upload_job_status( + dedup_hash, + "DONE", + result_path=upload_result.get("simularium_url"), + outputs_directory=upload_result.get("outputs_directory"), ) return upload_result @@ -639,7 +602,7 @@ def upload_packing_results_workflow( logging.error(e) return {"success": False, "error": e} - def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): + def upload_outputs_to_s3(self, output_folder, recipe_name, dedup_hash): """ Upload packing outputs to S3 bucket """ @@ -647,7 +610,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): bucket_name = self.db.bucket_name region_name = self.db.region_name sub_folder_name = self.db.sub_folder_name - s3_prefix = 
f"{sub_folder_name}/{recipe_name}/{job_id}" + s3_prefix = f"{sub_folder_name}/{recipe_name}/{dedup_hash}" try: upload_result = self.db.upload_directory( @@ -661,8 +624,11 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): f"{base_url}/{file_info['s3_key']}" for file_info in upload_result["uploaded_files"] ] + simularium_url = None + for url in public_urls: + if url.endswith(".simularium"): + simularium_url = url outputs_directory = f"https://us-west-2.console.aws.amazon.com/s3/buckets/{bucket_name}/{s3_prefix}/" - logging.info( f"Successfully uploaded {upload_result['total_files']} files to {outputs_directory}" ) @@ -671,7 +637,7 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): return { "success": True, - "run_id": job_id, + "dedup_hash": dedup_hash, "s3_bucket": bucket_name, "s3_prefix": s3_prefix, "public_url_base": f"{base_url}/{s3_prefix}/", @@ -680,30 +646,12 @@ def upload_outputs_to_s3(self, output_folder, recipe_name, job_id): "total_size": upload_result["total_size"], "urls": public_urls, "outputs_directory": outputs_directory, + "simularium_url": simularium_url, } except Exception as e: logging.error(e) return {"success": False, "error": e} - def update_outputs_directory(self, job_id, outputs_directory): - if not self.db or self.db.s3_client: - # switch to firebase handler to update job status - handler = DATABASE_IDS.handlers().get("firebase") - initialized_db = handler(default_db="staging") - if job_id: - timestamp = initialized_db.create_timestamp() - initialized_db.update_or_create( - "job_status", - job_id, - { - "timestamp": timestamp, - "outputs_directory": outputs_directory, - }, - ) - logging.debug( - f"Updated outputs s3 location {outputs_directory} for job ID: {job_id}" - ) - class DBRecipeLoader(object): """ @@ -890,23 +838,4 @@ def compile_db_recipe_data(db_recipe_data, obj_dict, grad_dict, comp_dict): return recipe_data -class DBMaintenance(object): - """ - Handles the maintenance of the database. - """ - - def __init__(self, db_handler): - self.db = db_handler - self.result_doc = ResultDoc(self.db) - - def cleanup_results(self): - """ - Check if the results in the database are expired and delete them if the linked object expired. - """ - self.result_doc.handle_expired_results() - - def readme_url(self): - """ - Return the URL to the README file for the database setup section. 
- """ - return "https://github.com/mesoscope/cellpack?tab=readme-ov-file#introduction-to-remote-databases" +DB_SETUP_README_URL = "https://github.com/mesoscope/cellpack?tab=readme-ov-file#introduction-to-remote-databases" diff --git a/cellpack/autopack/interface_objects/default_values.py b/cellpack/autopack/interface_objects/default_values.py index bbdbc452d..969eaa6a8 100644 --- a/cellpack/autopack/interface_objects/default_values.py +++ b/cellpack/autopack/interface_objects/default_values.py @@ -9,7 +9,6 @@ "objects", "gradients", "recipes", - "results", "configs", "recipes_edited", ] diff --git a/cellpack/autopack/loaders/recipe_loader.py b/cellpack/autopack/loaders/recipe_loader.py index 84cd78ac0..bbdb662ad 100644 --- a/cellpack/autopack/loaders/recipe_loader.py +++ b/cellpack/autopack/loaders/recipe_loader.py @@ -30,7 +30,13 @@ class RecipeLoader(object): # TODO: add all default values here default_values = default_recipe_values.copy() - def __init__(self, input_file_path, save_converted_recipe=False, use_docker=False): + def __init__( + self, + input_file_path, + save_converted_recipe=False, + use_docker=False, + json_recipe=None, + ): _, file_extension = os.path.splitext(input_file_path) self.current_version = CURRENT_VERSION self.file_path = input_file_path @@ -38,6 +44,7 @@ def __init__(self, input_file_path, save_converted_recipe=False, use_docker=Fals self.ingredient_list = [] self.compartment_list = [] self.save_converted_recipe = save_converted_recipe + self.json_recipe = json_recipe # set CURRENT_RECIPE_PATH appropriately for remote(firebase) vs local recipes if autopack.is_remote_path(self.file_path): @@ -49,6 +56,15 @@ def __init__(self, input_file_path, save_converted_recipe=False, use_docker=Fals self.recipe_data = self._read(use_docker=use_docker) + @classmethod + def from_json(cls, json_recipe, save_converted_recipe=False, use_docker=False): + return cls( + input_file_path="", + save_converted_recipe=save_converted_recipe, + use_docker=use_docker, + json_recipe=json_recipe, + ) + @staticmethod def _resolve_object(key, objects): current_object = objects[key] @@ -168,9 +184,15 @@ def _migrate_version(self, old_recipe): ) def _read(self, resolve_inheritance=True, use_docker=False): - new_values, database_name, is_unnested_firebase = autopack.load_file( - self.file_path, cache="recipes", use_docker=use_docker - ) + database_name = None + is_unnested_firebase = False + new_values = self.json_recipe + if new_values is None: + # Read recipe from filepath + new_values, database_name, is_unnested_firebase = autopack.load_file( + self.file_path, cache="recipes", use_docker=use_docker + ) + if database_name == "firebase": if is_unnested_firebase: objects = new_values.get("objects", {}) diff --git a/cellpack/autopack/upy/simularium/simularium_helper.py b/cellpack/autopack/upy/simularium/simularium_helper.py index 86af08746..87c616e18 100644 --- a/cellpack/autopack/upy/simularium/simularium_helper.py +++ b/cellpack/autopack/upy/simularium/simularium_helper.py @@ -22,7 +22,7 @@ from simulariumio.cellpack import HAND_TYPE, CellpackConverter from simulariumio.constants import DISPLAY_TYPE, VIZ_TYPE -from cellpack.autopack.DBRecipeHandler import DBMaintenance, DBUploader +from cellpack.autopack.DBRecipeHandler import DB_SETUP_README_URL from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS from cellpack.autopack.upy import hostHelper from cellpack.autopack.upy.simularium.plots import PlotData @@ -1385,64 +1385,29 @@ def raycast(self, **kw): def raycast_test(self, 
obj, start, end, length, **kw): return - def post_and_open_file(self, file_name, open_results_in_browser): + def post_and_open_file(self, file_name, open_results_in_browser, dedup_hash=None): simularium_file = Path(f"{file_name}.simularium") - url = None - job_id = os.environ.get("AWS_BATCH_JOB_ID", None) - file_name, url = simulariumHelper.store_result_file( - simularium_file, storage="aws", batch_job_id=job_id - ) - if file_name and url: - simulariumHelper.store_metadata( - file_name, url, db="firebase", job_id=job_id - ) - if open_results_in_browser: + if dedup_hash is None: + url = simulariumHelper.store_result_file(simularium_file, storage="aws") + if url and open_results_in_browser: simulariumHelper.open_in_simularium(url) @staticmethod - def store_result_file( - file_path, storage=None, batch_job_id=None, sub_folder="simularium" - ): + def store_result_file(file_path, storage=None, sub_folder="simularium"): if storage == "aws": handler = DATABASE_IDS.handlers().get(storage) - # if batch_job_id is not None, then we are in a batch job and should use the temp bucket - # TODO: use cellpack-results bucket for batch jobs once we have the correct permissions - if batch_job_id: - initialized_handler = handler( - bucket_name="cellpack-demo", - sub_folder_name=sub_folder, - region_name="us-west-2", - ) - else: - initialized_handler = handler( - bucket_name="cellpack-results", - sub_folder_name=sub_folder, - region_name="us-west-2", - ) - file_name, url = initialized_handler.save_file_and_get_url(file_path) - if not file_name or not url: - db_maintainer = DBMaintenance(initialized_handler) - logging.warning( - f"Skipping browser opening, upload credentials not configured. For setup instructions see: {db_maintainer.readme_url()}" - ) - return file_name, url - - @staticmethod - def store_metadata(file_name, url, db=None, job_id=None): - if db == "firebase": - handler = DATABASE_IDS.handlers().get(db) - initialized_db = handler( - default_db="staging" - ) # default to staging for metadata uploads - if initialized_db._initialized: - db_uploader = DBUploader(initialized_db) - db_uploader.upload_result_metadata(file_name, url, job_id) - else: - db_maintainer = DBMaintenance(initialized_db) + initialized_handler = handler( + bucket_name="cellpack-results", + sub_folder_name=sub_folder, + region_name="us-west-2", + ) + _, url = initialized_handler.save_file_and_get_url(file_path) + if not url: logging.warning( - f"Firebase credentials not found. For setup instructions see: {db_maintainer.readme_url()}. Or try cellPACK web interface: https://cellpack.allencell.org (no setup required)" + f"Skipping browser opening, upload credentials not configured. 
For setup instructions see: {DB_SETUP_README_URL}" ) - return + return url + return None @staticmethod def open_in_simularium(aws_url): diff --git a/cellpack/autopack/writers/__init__.py b/cellpack/autopack/writers/__init__.py index 6ca931af2..0b09e03a6 100644 --- a/cellpack/autopack/writers/__init__.py +++ b/cellpack/autopack/writers/__init__.py @@ -197,8 +197,11 @@ def save_as_simularium(self, env, seed_to_results_map): number_of_packings = env.config_data.get("number_of_packings", 1) open_results_in_browser = env.config_data.get("open_results_in_browser", False) upload_results = env.config_data.get("upload_results", False) + dedup_hash = getattr(env, "dedup_hash", None) if (number_of_packings == 1 or is_aggregate) and upload_results: - autopack.helper.post_and_open_file(file_name, open_results_in_browser) + autopack.helper.post_and_open_file( + file_name, open_results_in_browser, dedup_hash + ) def save_Mixed_asJson( self, diff --git a/cellpack/bin/cleanup_tasks.py b/cellpack/bin/cleanup_tasks.py deleted file mode 100644 index 08217aa04..000000000 --- a/cellpack/bin/cleanup_tasks.py +++ /dev/null @@ -1,20 +0,0 @@ -from cellpack.autopack.DBRecipeHandler import DBMaintenance -from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS - - -def run_cleanup(db_id=DATABASE_IDS.FIREBASE): - """ - Performs cleanup operations on expired database entries. - This function is executed as part of a scheduled task defined in .github/workflows/cleanup-firebase.yml - - Args: - db_id(str): The database id to use - """ - handler = DATABASE_IDS.handlers().get(db_id) - initialized_db = handler(default_db="staging") - db_maintainer = DBMaintenance(initialized_db) - db_maintainer.cleanup_results() - - -if __name__ == "__main__": - run_cleanup() diff --git a/cellpack/bin/pack.py b/cellpack/bin/pack.py index 9db539372..27c4d0185 100644 --- a/cellpack/bin/pack.py +++ b/cellpack/bin/pack.py @@ -1,6 +1,5 @@ import logging import logging.config -import os import time from pathlib import Path @@ -25,23 +24,32 @@ def pack( - recipe, config_path=None, analysis_config_path=None, docker=False, validate=True + recipe, + config_path=None, + analysis_config_path=None, + docker=False, + hash=None, ): """ Initializes an autopack packing from the command line - :param recipe: string argument, path to recipe + :param recipe: string argument, path to recipe file, or a dictionary representing a recipe :param config_path: string argument, path to packing config file :param analysis_config_path: string argument, path to analysis config file :param docker: boolean argument, are we using docker - :param validate: boolean argument, validate recipe before packing + :param hash: string argument, dedup hash identifier for tracking/caching results :return: void """ packing_config_data = ConfigLoader(config_path, docker).config - recipe_loader = RecipeLoader( - recipe, packing_config_data["save_converted_recipe"], docker - ) + if isinstance(recipe, dict): + # Load recipe from JSON dictionary + recipe_loader = RecipeLoader.from_json(recipe, use_docker=docker) + else: + # Load recipe from file path + recipe_loader = RecipeLoader( + recipe, packing_config_data["save_converted_recipe"], docker + ) recipe_data = recipe_loader.recipe_data analysis_config_data = {} if analysis_config_path is not None: @@ -52,6 +60,7 @@ def pack( autopack.helper = helper env = Environment(config=packing_config_data, recipe=recipe_data) env.helper = helper + env.dedup_hash = hash log.info("Packing recipe: %s", recipe_data["name"]) log.info("Outputs 
will be saved to %s", env.out_folder) @@ -78,24 +87,22 @@ def pack( env.buildGrid(rebuild=True) env.pack_grid(verbose=0, usePP=False) - if docker: - job_id = os.environ.get("AWS_BATCH_JOB_ID", None) - if job_id: - handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS) - # temporarily using demo bucket before permissions are granted - initialized_handler = handler( - bucket_name="cellpack-demo", - sub_folder_name="runs", - region_name="us-west-2", - ) - uploader = DBUploader(db_handler=initialized_handler) - uploader.upload_packing_results_workflow( - source_folder=env.out_folder, - recipe_name=recipe_data["name"], - job_id=job_id, - config_data=packing_config_data, - recipe_data=recipe_loader.serializable_recipe_data, - ) + if docker and hash: + handler = DATABASE_IDS.handlers().get(DATABASE_IDS.AWS) + # temporarily using demo bucket before permissions are granted + initialized_handler = handler( + bucket_name="cellpack-demo", + sub_folder_name="runs", + region_name="us-west-2", + ) + uploader = DBUploader(db_handler=initialized_handler) + uploader.upload_packing_results_workflow( + source_folder=env.out_folder, + recipe_name=recipe_data["name"], + dedup_hash=hash, + config_data=packing_config_data, + recipe_data=recipe_loader.serializable_recipe_data, + ) def main(): diff --git a/cellpack/bin/upload.py b/cellpack/bin/upload.py index b038b4e4e..59fc2104e 100644 --- a/cellpack/bin/upload.py +++ b/cellpack/bin/upload.py @@ -3,7 +3,7 @@ import json from cellpack.autopack.FirebaseHandler import FirebaseHandler -from cellpack.autopack.DBRecipeHandler import DBUploader, DBMaintenance +from cellpack.autopack.DBRecipeHandler import DBUploader, DB_SETUP_README_URL from cellpack.autopack.upy.simularium.simularium_helper import simulariumHelper from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS from cellpack.autopack.loaders.config_loader import ConfigLoader @@ -90,7 +90,7 @@ def upload( id, _ = db_handler.upload_data("editable_fields", field) editable_fields_ids.append(id) if output_file: - _, result_url = simulariumHelper.store_result_file( + result_url = simulariumHelper.store_result_file( output_file, storage="aws", sub_folder="client" ) if studio: @@ -105,9 +105,8 @@ def upload( db_handler.upload_data("example_packings", recipe_metadata) else: - db_maintainer = DBMaintenance(db_handler) sys.exit( - f"The selected database is not initialized. Please set up Firebase credentials to upload recipes. Refer to the instructions at {db_maintainer.readme_url()} " + f"The selected database is not initialized. Please set up Firebase credentials to upload recipes. 
Refer to the instructions at {DB_SETUP_README_URL} " ) diff --git a/cellpack/tests/test_db_uploader.py b/cellpack/tests/test_db_uploader.py index 0c91cbd52..414f6b9c5 100644 --- a/cellpack/tests/test_db_uploader.py +++ b/cellpack/tests/test_db_uploader.py @@ -175,3 +175,56 @@ def test_upload_recipe(): "A": "firebase:composition/test_id", } assert recipe_doc.objects_to_path_map == {"sphere_25": "firebase:objects/test_id"} + + +def test_upload_job_status_with_firebase_handler(): + mock_firebase_db = MagicMock() + mock_firebase_db.create_timestamp.return_value = "test_timestamp" + # firebaseHandler does not have s3_client attribute + del mock_firebase_db.s3_client + + uploader = DBUploader(mock_firebase_db) + uploader.upload_job_status("test_hash", "RUNNING") + + mock_firebase_db.create_timestamp.assert_called_once() + mock_firebase_db.update_or_create.assert_called_once_with( + "job_status", + "test_hash", + { + "timestamp": "test_timestamp", + "status": "RUNNING", + "error_message": None, + }, + ) + + +def test_upload_job_status_with_aws_handler(): + mock_aws_db = MagicMock() + mock_aws_db.s3_client = MagicMock() # AWSHandler has s3_client + + mock_firebase_handler = MagicMock() + mock_firebase_handler.create_timestamp.return_value = "firebase_timestamp" + + with patch( + "cellpack.autopack.DBRecipeHandler.DATABASE_IDS.handlers" + ) as mock_handlers: + mock_handlers.return_value.get.return_value = ( + lambda default_db: mock_firebase_handler + ) + + uploader = DBUploader(mock_aws_db) + uploader.upload_job_status("test_hash", "DONE", result_path="test_path") + + mock_firebase_handler.create_timestamp.assert_called_once() + mock_firebase_handler.update_or_create.assert_called_once_with( + "job_status", + "test_hash", + { + "timestamp": "firebase_timestamp", + "status": "DONE", + "error_message": None, + "result_path": "test_path", + }, + ) + # AWS handler should not be called for timestamp + mock_aws_db.create_timestamp.assert_not_called() diff --git a/docker/Dockerfile.ecs b/docker/Dockerfile.ecs index 7a303c023..5ed89b7bd 100644 --- a/docker/Dockerfile.ecs +++ b/docker/Dockerfile.ecs @@ -4,7 +4,7 @@ WORKDIR /cellpack COPY . /cellpack RUN python -m pip install --upgrade pip --root-user-action=ignore -RUN python -m pip install . -r requirements/linux/requirements.txt --root-user-action=ignore +RUN python -m pip install . 
--root-user-action=ignore EXPOSE 80 diff --git a/docker/server.py b/docker/server.py index 581e0151d..7b0a209de 100644 --- a/docker/server.py +++ b/docker/server.py @@ -1,8 +1,7 @@ import asyncio -from aiohttp import web -import os import uuid -from cellpack.autopack.DBRecipeHandler import DBUploader +from aiohttp import web +from cellpack.autopack.DBRecipeHandler import DataDoc, DBUploader from cellpack.autopack.interface_objects.database_ids import DATABASE_IDS from cellpack.bin.pack import pack @@ -12,41 +11,66 @@ class CellpackServer: def __init__(self): self.packing_tasks = set() - async def run_packing(self, recipe, config, job_id): - os.environ["AWS_BATCH_JOB_ID"] = job_id + def _get_firebase_handler(self, database_name="firebase"): + handler = DATABASE_IDS.handlers().get(database_name) + initialized_db = handler(default_db="staging") + if initialized_db._initialized: + return initialized_db + return None + + def job_exists(self, dedup_hash): + db = self._get_firebase_handler() + if not db: + return False + + job_status, _ = db.get_doc_by_id("job_status", dedup_hash) + return job_status is not None + + async def run_packing(self, job_id, recipe=None, config=None, body=None): self.update_job_status(job_id, "RUNNING") try: - pack(recipe=recipe, config_path=config, docker=True) + # Pack JSON recipe in body if provided, otherwise use recipe path + pack(recipe=(body if body else recipe), config_path=config, docker=True, hash=job_id) except Exception as e: self.update_job_status(job_id, "FAILED", error_message=str(e)) def update_job_status(self, job_id, status, result_path=None, error_message=None): - handler = DATABASE_IDS.handlers().get("firebase") - initialized_db = handler( - default_db="staging" - ) - if initialized_db._initialized: - db_uploader = DBUploader(initialized_db) + db = self._get_firebase_handler() + if db: + db_uploader = DBUploader(db) db_uploader.upload_job_status(job_id, status, result_path, error_message) async def hello_world(self, request: web.Request) -> web.Response: return web.Response(text="Hello from the cellPACK server") async def health_check(self, request: web.Request) -> web.Response: - # healthcheck endpoint needed for AWS load balancer + # health check endpoint needed for AWS load balancer return web.Response() async def pack_handler(self, request: web.Request) -> web.Response: - recipe = request.rel_url.query.get("recipe") - if recipe is None: + recipe = request.rel_url.query.get("recipe") or "" + if request.can_read_body: + body = await request.json() + else: + body = None + if not recipe and not body: raise web.HTTPBadRequest( "Pack requests must include recipe as a query param" ) config = request.rel_url.query.get("config") - job_id = str(uuid.uuid4()) + + if body: + dedup_hash = DataDoc.generate_hash(body) + if self.job_exists(dedup_hash): + return web.json_response({"jobId": dedup_hash}) + job_id = dedup_hash + else: + job_id = str(uuid.uuid4()) # Initiate packing task to run in background - packing_task = asyncio.create_task(self.run_packing(recipe, config, job_id)) + packing_task = asyncio.create_task( + self.run_packing(job_id, recipe, config, body) + ) # Keep track of task references to prevent them from being garbage # collected, then discard after task completion @@ -70,4 +94,4 @@ async def init_app() -> web.Application: ) return app -web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT) \ No newline at end of file +web.run_app(init_app(), host="0.0.0.0", port=SERVER_PORT) diff --git a/docs/DOCKER.md b/docs/DOCKER.md index 
a1214a335..48e4b6ddd 100644 --- a/docs/DOCKER.md +++ b/docs/DOCKER.md @@ -13,6 +13,6 @@ ## AWS ECS Docker Image 1. Build image, running `docker build -f docker/Dockerfile.ecs -t [CONTAINER-NAME] .` 2. Run packings in the container, running: `docker run -v ~/.aws:/root/.aws -p 80:80 [CONTAINER-NAME]` -3. Try hitting the test endpoint on the server, by navigating to `http://0.0.0.0:8443/hello` in your browser. -4. Try running a packing on the server, by hitting the `http://0.0.0.0:80/pack?recipe=firebase:recipes/one_sphere_v_1.0.0` in your browser. +3. Try hitting the test endpoint on the server by navigating to `http://0.0.0.0:80/hello` in your browser. +4. Try running a packing on the server by navigating to `http://0.0.0.0:80/start-packing?recipe=firebase:recipes/one_sphere_v_1.0.0` in your browser. 5. Verify that the packing result path was uploaded to the firebase results table, with the job id specified in the response from the request in step 4.The result simularium file can be found at the s3 path specified there. \ No newline at end of file diff --git a/server_workflow_diagram.md b/server_workflow_diagram.md new file mode 100644 index 000000000..d950a0bf7 --- /dev/null +++ b/server_workflow_diagram.md @@ -0,0 +1,103 @@ +# CellPACK Server Job Workflow Changes + +## Summary of Changes + +The main changes in this PR allow the server to accept recipe JSON directly in the request body, in addition to the existing recipe file path approach. This enables better deduplication and caching of packing jobs. + +## BEFORE: Original Server Workflow +```mermaid +graph TD + A[Client Request] --> B[POST /start-packing] + B --> C{Check for recipe URL param} + C -->|Missing| D[Return 400 Error] + C -->|Present| E[Generate UUID for job_id] + E --> F[Create Background Task] + F --> G[Return job_id immediately] + F --> I[Initiate packing] + I --> J[Load recipe from firebase
using file path from
URL param] + J --> K[Execute packing] + K --> L{Packing succeeds?} + L -->|Success| M[S3: Upload outputs to S3
Firebase: Update job status to SUCCEEDED] + L -->|Failure| N[Firebase: Update job status to FAILED] + + style A fill:#e1f5fe + style G fill:#c8e6c9 + style M fill:#fff3e0 + style N fill:#ffcdd2 +``` + +## AFTER: Enhanced Server Workflow with JSON Recipe Support +```mermaid +graph TD + A[Client Request] --> B[POST /start-packing] + B --> C{Check inputs} + C -->|No recipe - no URL param
and no request body| D[Return 400 Error] + C -->|Has recipe path URL param| E[Generate UUID for job_id] + C -->|Has recipe JSON in request body| F[Generate hash from JSON] + F --> G{Packing result exists
in firebase for this hash?} + G -->|Yes| H[Return existing hash
as job_id] + G -->|No| I[Use hash as job_id] + E --> J[Create Background Task] + I --> J + J --> K[Return job_id immediately] + J --> L[Initiate packing] + L --> M{Input type?} + M -->|Recipe path| N[Load recipe from firebase
using file path from
URL param] + M -->|JSON body| O[Load recipe from JSON dict
from request body] + N --> P[Execute packing] + O --> P + P --> Q{Packing succeeds?} + Q -->|Success| R[S3: Upload outputs to S3
Firebase: Update job status to SUCCEEDED] + Q -->|Failure| S[Firebase: Update job status to FAILED] + + style A fill:#e1f5fe + style K fill:#c8e6c9 + style R fill:#fff3e0 + style S fill:#ffcdd2 + style G fill:#ffeb3b + style H fill:#c8e6c9 +``` + +## Key Server Improvements + +### 1. **Deduplication & Caching** +- **BEFORE**: Each request generated a unique UUID, so no deduplication was possible +- **AFTER**: JSON recipes generate a deterministic hash, enabling job deduplication (a hash sketch appears at the end of this document) + +### 2. **Input Flexibility & Backwards Compatibility** +- **BEFORE**: Only recipe file paths were supported, passed via a query parameter +- **AFTER**: Supports both recipe file paths AND direct JSON recipe objects in the request body, plus an optional config parameter (see the example request at the end of this document) + +### 3. **Smart Job Management** +- **BEFORE**: Generated a UUID for each job without deduplication; every request created a new job regardless of content +- **AFTER**: Uses a deterministic hash for JSON recipes, enabling job reuse for identical recipes + +### 4. **Firebase Request Reduction** +- **BEFORE**: Every edited recipe was uploaded to Firebase by the client and downloaded from Firebase by the server +- **AFTER**: Edited recipes are passed in the body of the packing request, so no Firebase uploads or downloads occur + +### 5. **Unified Results Upload** +- **BEFORE**: The Simularium result file was uploaded to S3 twice per job, once on its own and once as part of the full output files upload +- **AFTER**: The Simularium result file is uploaded only once; its URL is tracked while the full set of output files is uploaded + +## Technical Implementation + +### New Server Components: +1. **`DataDoc.generate_hash()`** - Creates a deterministic hash from the recipe JSON +2. **`job_exists()`** - Checks whether a job record already exists in Firebase +3. **Enhanced request handling** - Reads recipe JSON from the request body +4. **Smart job ID generation** - Uses the hash for JSON recipes, a UUID for file paths + +### Request Flow Changes: +1. **Input validation** now checks both query params and the request body +2. **Hash-based deduplication** for JSON recipes +3. **Backward compatibility** maintained for file-based recipes +4. **Consistent job tracking** with the `hash` parameter + +## Benefits + +1. **Reduced Server Load**: Identical recipes are not reprocessed +2. **Faster Client Response**: Instant return for duplicate JSON requests +3. **Better Resource Utilization**: No redundant compute for identical recipes +4. **Improved API Design**: JSON recipes are easier to submit programmatically +5. **Reduced Firebase Usage**: Recipes are passed directly in the request instead of being uploaded to Firebase \ No newline at end of file
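
## Example: Starting a Packing Job

For illustration, a client can hit the `/start-packing` endpoint in both modes described above. This is a minimal sketch: the endpoint name comes from docs/DOCKER.md and the workflow diagram, the `jobId` response field comes from the duplicate-job path in `pack_handler`, but the HTTP method and the response shape for brand-new jobs are not fully shown in this diff and should be treated as assumptions.

```python
import requests

SERVER = "http://0.0.0.0:80"  # local container, see docs/DOCKER.md

# Mode 1 (existing): reference a recipe already stored in Firebase by path.
resp = requests.post(
    f"{SERVER}/start-packing",
    params={"recipe": "firebase:recipes/one_sphere_v_1.0.0"},
)
print(resp.json())

# Mode 2 (new): send the recipe JSON directly in the request body.
# The body is hashed deterministically, so re-sending an identical recipe
# returns the existing jobId instead of launching a new packing.
recipe_json = {"name": "one_sphere", "version": "1.0.0"}  # placeholder recipe content
resp = requests.post(f"{SERVER}/start-packing", json=recipe_json)
print(resp.json()["jobId"])
```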
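
## Example: Deterministic Recipe Hashing (sketch)

`DataDoc.generate_hash()` is referenced above, but its implementation is not part of this diff. The sketch below shows the general idea under the assumption that it canonicalizes the recipe JSON and digests it with SHA-256; the real method may normalize or truncate differently, and the function name here is a hypothetical stand-in.

```python
import hashlib
import json


def generate_dedup_hash(recipe: dict) -> str:
    """Hypothetical stand-in for DataDoc.generate_hash().

    Serializing with sorted keys and fixed separators makes the digest
    independent of key order, so structurally identical recipes map to
    the same dedup hash (and therefore the same job_status document).
    """
    canonical = json.dumps(recipe, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# Identical recipes produce identical hashes regardless of key order.
assert generate_dedup_hash({"a": 1, "b": 2}) == generate_dedup_hash({"b": 2, "a": 1})
```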