diff --git a/docs/deployment/reference.md b/docs/deployment/reference.md
index 77e62c00c..3a88d5a2b 100644
--- a/docs/deployment/reference.md
+++ b/docs/deployment/reference.md
@@ -111,9 +111,14 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn
 | `transformer.persistence.existingClaim` | Existing persistent volume claim | nil |
 | `transformer.subdir` | Subdirectory of the mount to write transformer results to (should end with trailing /) | nil |
 | `dataLifecycle.enabled` | Enable deployment of data lifecycle jobs | false |
-| `dataLifecycle.image` | Default image for data lifecycle job | `sslhep/servicex_minio_cleanup` |
-| `dataLifecycle.tag` | Data lifecycle job image tag | |
+| `dataLifecycle.image` | Default image for data lifecycle job | `python` |
+| `dataLifecycle.tag` | Data lifecycle job image tag | `3.10` |
 | `dataLifecycle.pullPolicy` | Data lifecycle image pull policy | `Always` |
 | `dataLifecycle.schedule` | Schedule for minioCleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | `* */8 * * *` (every 8 hours) |
 | `dataLifecycle.retention` | We will archive any transforms older than this. Use the gnu date command --date argument. See [date command](https://www.geeksforgeeks.org/date-command-linux-examples/#4-how-to-display-past-dates) for examples. | 7 days ago |
 | `dataLifecycle.maxDesiredCacheSize` | If the server-side cache is larger than this cleanup service will keep going forward in time to delete transforms. Specify units as Mb Gb, Tb or Pb | "1Tb" |
+| `datasetLifecycle.image` | Default image for dataset cache lifecycle job | `curlimages/curl` |
+| `datasetLifecycle.tag` | Dataset cache lifecycle job image tag | `8.17.0` |
+| `datasetLifecycle.pullPolicy` | Dataset cache lifecycle image pull policy | `Always` |
+| `datasetLifecycle.schedule` | Schedule for dataset cache cleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | `0 * * * *` (top of every hour) |
+| `datasetLifecycle.cacheLifetime` | Lifetime of dataset cache, in hours | 24 |
diff --git a/helm/servicex/templates/dataset-lifecycle/cronjob.yaml b/helm/servicex/templates/dataset-lifecycle/cronjob.yaml
new file mode 100644
index 000000000..98b9aa83f
--- /dev/null
+++ b/helm/servicex/templates/dataset-lifecycle/cronjob.yaml
@@ -0,0 +1,30 @@
+apiVersion: batch/v1
+kind: CronJob
+metadata:
+  name: {{ .Release.Name }}-dataset-lifecycle-job
+spec:
+  schedule: {{ .Values.datasetLifecycle.schedule | default "0 * * * *" | quote }}
+  concurrencyPolicy: "Forbid"
+  jobTemplate:
+    spec:
+      template:
+        metadata:
+          labels:
+            app: {{ .Release.Name }}-dataset-lifecycle-job
+        spec:
+          containers:
+            - name: {{ .Release.Name }}-dataset-lifecycle-job
+              image: {{ .Values.datasetLifecycle.image }}:{{ .Values.datasetLifecycle.tag }}
+              imagePullPolicy: {{ .Values.datasetLifecycle.pullPolicy }}
+              env:
+                - name: LIFETIME
+                  value: {{ .Values.datasetLifecycle.cacheLifetime | quote }}
+              args:
+                - "--request"
+                - "POST"
+                - "--header"
+                - "Content-Type: application/json"
+                - "--data"
+                - '{"age": $(LIFETIME)}'
+                - "http://{{ .Release.Name }}-servicex-app:8000/servicex/internal/dataset-lifecycle"
+          restartPolicy: OnFailure
diff --git a/helm/servicex/values.yaml b/helm/servicex/values.yaml
index 877df1c03..7aeb80968 100644
--- a/helm/servicex/values.yaml
+++ b/helm/servicex/values.yaml
@@ -265,3 +265,15 @@ dataLifecycle:
   # The cleanup service will go beyond the retention date to keep the server side cache size below
   # this threshold. Specify this as a string with Mb, Gb, Tb or Pb units in the string
   maxDesiredCacheSize: "1Tb"
+
+# Marks cached datasets older than "cacheLifetime" hours as stale.
+# Does not touch transforms or output files.
+datasetLifecycle:
+  # image should support curl
+  image: curlimages/curl
+  tag: "8.17.0"
+  pullPolicy: Always
+  schedule: "0 * * * *"
+
+  # How long to keep datasets in the cache, in hours
+  cacheLifetime: 24
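Note: the CronJob now posts a JSON body, which is what DatasetLifecycleOps.post() reads via request.get_json(); the earlier query-string form ("?age=...") would never have been seen by the handler. For manual testing, the equivalent request looks like the sketch below. The base URL and the use of the requests library are assumptions (e.g. a kubectl port-forward to the servicex-app service); any HTTP client works:

    # Sketch: manual equivalent of the CronJob's curl invocation.
    import requests

    BASE_URL = "http://localhost:8000"  # hypothetical port-forward to servicex-app

    resp = requests.post(
        f"{BASE_URL}/servicex/internal/dataset-lifecycle",
        json={"age": 24},  # hours; same contract as datasetLifecycle.cacheLifetime
        timeout=30,
    )
    resp.raise_for_status()
    print(resp.json())  # expected: {"message": "Success"}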
diff --git a/servicex_app/servicex_app/resources/datasets/delete_dataset.py b/servicex_app/servicex_app/resources/datasets/delete_dataset.py
index b056df187..f9f2a0028 100644
--- a/servicex_app/servicex_app/resources/datasets/delete_dataset.py
+++ b/servicex_app/servicex_app/resources/datasets/delete_dataset.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, IRIS-HEP
+# Copyright (c) 2024-25, IRIS-HEP
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -30,18 +30,22 @@
 from servicex_app.resources.servicex_resource import ServiceXResource


-class DeleteDataset(ServiceXResource):
-    @auth_required
-    def delete(self, dataset_id):
-        dataset = Dataset.find_by_id(dataset_id)
+def delete_dataset(dataset_id):
+    dataset = Dataset.find_by_id(dataset_id)
+
+    if not dataset:
+        return {"message": f"Dataset {dataset_id} not found"}, 404

-        if not dataset:
-            return {"message": f"Dataset {dataset_id} not found"}, 404
+    if dataset.stale:
+        return {"message": f"Dataset {dataset_id} has already been deleted"}, 400

-        if dataset.stale:
-            return {"message": f"Dataset {dataset_id} has already been deleted"}, 400
+    dataset.stale = True
+    dataset.save_to_db()

-        dataset.stale = True
-        dataset.save_to_db()
+    return {"dataset-id": dataset_id, "stale": True}, 200

-        return {"dataset-id": dataset_id, "stale": True}
+
+class DeleteDataset(ServiceXResource):
+    @auth_required
+    def delete(self, dataset_id):
+        return delete_dataset(dataset_id)
diff --git a/servicex_app/servicex_app/resources/datasets/get_all.py b/servicex_app/servicex_app/resources/datasets/get_all.py
index 722f8536c..6f0472857 100644
--- a/servicex_app/servicex_app/resources/datasets/get_all.py
+++ b/servicex_app/servicex_app/resources/datasets/get_all.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, IRIS-HEP
+# Copyright (c) 2024-25, IRIS-HEP
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -31,20 +31,26 @@
 from servicex_app.models import Dataset
 from servicex_app.resources.servicex_resource import ServiceXResource

+from typing import List
+
 parser = reqparse.RequestParser()
 parser.add_argument("did-finder", type=str, location="args", required=False)
 parser.add_argument("show-deleted", type=bool, location="args", required=False)


+def get_all_datasets(args=None) -> List[Dataset]:
+    args = args or {}
+    show_deleted = args.get("show-deleted", False)
+    did_finder = args.get("did-finder")
+    if did_finder:
+        datasets = Dataset.get_by_did_finder(did_finder, show_deleted)
+    else:
+        datasets = Dataset.get_all(show_deleted)
+    return datasets
+
+
 class AllDatasets(ServiceXResource):
     @auth_required
     def get(self):
         args = parser.parse_args()
-        show_deleted = args["show-deleted"] if "show-deleted" in args else False
-        if "did-finder" in args and args["did-finder"]:
-            did_finder = args["did-finder"]
-            datasets = Dataset.get_by_did_finder(did_finder, show_deleted)
-        else:
-            datasets = Dataset.get_all(show_deleted)
-
-        return {"datasets": [dataset.to_json() for dataset in datasets]}
+        return {"datasets": [dataset.to_json() for dataset in get_all_datasets(args)]}
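With the query logic factored out of the Flask resource, other server-side code can reuse it without an HTTP round trip. A minimal sketch of calling it in-process follows; the filter values are illustrative, and the call must run inside the app's Flask application context so the database session is available:

    # Sketch: reusing get_all_datasets() directly (inside an app context).
    from servicex_app.resources.datasets.get_all import get_all_datasets

    # All live (non-stale) datasets, regardless of DID finder:
    live = get_all_datasets()

    # Datasets from one DID finder, including stale ones ("rucio" is an example):
    rucio_sets = get_all_datasets({"did-finder": "rucio", "show-deleted": True})
    for ds in rucio_sets:
        print(ds.id, ds.name, ds.last_updated)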
diff --git a/servicex_app/servicex_app/resources/internal/dataset_lifecycle_ops.py b/servicex_app/servicex_app/resources/internal/dataset_lifecycle_ops.py
new file mode 100644
index 000000000..a3dffe5d4
--- /dev/null
+++ b/servicex_app/servicex_app/resources/internal/dataset_lifecycle_ops.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2025, IRIS-HEP
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice, this
+#   list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+#
+# * Neither the name of the copyright holder nor the names of its
+#   contributors may be used to endorse or promote products derived from
+#   this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from datetime import datetime, timedelta, timezone
+
+from flask import request, current_app
+
+from servicex_app.resources.servicex_resource import ServiceXResource
+from ..datasets.get_all import get_all_datasets
+from ..datasets.delete_dataset import delete_dataset
+
+
+class DatasetLifecycleOps(ServiceXResource):
+    def post(self):
+        """
+        Mark cached datasets older than `age` hours as stale
+        """
+        now = datetime.now(timezone.utc)
+        payload = request.get_json(silent=True) or {}
+        try:
+            age = float(payload.get("age", 24))
+        except (TypeError, ValueError):
+            return {"message": "Invalid age parameter"}, 422
+        delta = timedelta(hours=age)
+        # get_all_datasets() returns only non-stale datasets by default
+        datasets = get_all_datasets()
+        to_delete = [
+            ds.id for ds in datasets if ds.last_updated and (now - ds.last_updated) > delta
+        ]
+        current_app.logger.info(
+            f"Obsoletion called for datasets older than {delta}. "
+            f"Obsoleting {len(to_delete)} datasets."
+        )
+        for dataset_id in to_delete:
+            delete_dataset(dataset_id)
+
+        return {"message": "Success"}, 200
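The cutoff test in post() is plain datetime arithmetic; the sketch below walks through it with a 24-hour lifetime. One caveat worth flagging: `now` is timezone-aware, so the subtraction only works if the `last_updated` values coming back from the database are timezone-aware as well; comparing against a naive datetime would raise a TypeError:

    # Sketch of the age cutoff used by DatasetLifecycleOps.post().
    from datetime import datetime, timedelta, timezone

    now = datetime.now(timezone.utc)
    delta = timedelta(hours=24.0)

    # Updated 30 hours ago: past the cutoff, would be marked stale.
    old = now - timedelta(hours=30)
    assert (now - old) > delta

    # Updated 2 hours ago: inside the lifetime, kept.
    fresh = now - timedelta(hours=2)
    assert not (now - fresh) > delta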
diff --git a/servicex_app/servicex_app/routes.py b/servicex_app/servicex_app/routes.py
index 148946dbd..2799af8c1 100644
--- a/servicex_app/servicex_app/routes.py
+++ b/servicex_app/servicex_app/routes.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2019, IRIS-HEP
+# Copyright (c) 2019-2025, IRIS-HEP
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -32,6 +32,7 @@
 from servicex_app.resources.datasets.get_one import OneDataset
 from servicex_app.resources.transformation.delete import DeleteTransform
 from servicex_app.resources.internal.data_lifecycle_ops import DataLifecycleOps
+from servicex_app.resources.internal.dataset_lifecycle_ops import DatasetLifecycleOps


 def add_routes(
@@ -198,3 +199,5 @@ def add_routes(
     DataLifecycleOps.make_api(object_store)
     api.add_resource(DataLifecycleOps, "/servicex/internal/data-lifecycle")
+
+    api.add_resource(DatasetLifecycleOps, "/servicex/internal/dataset-lifecycle")
diff --git a/servicex_app/servicex_app_test/resources/internal/test_dataset_lifecycle_ops.py b/servicex_app/servicex_app_test/resources/internal/test_dataset_lifecycle_ops.py
new file mode 100644
index 000000000..b08ca7554
--- /dev/null
+++ b/servicex_app/servicex_app_test/resources/internal/test_dataset_lifecycle_ops.py
@@ -0,0 +1,87 @@
+# Copyright (c) 2025, IRIS-HEP
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright notice, this
+#   list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright notice,
+#   this list of conditions and the following disclaimer in the documentation
+#   and/or other materials provided with the distribution.
+#
+# * Neither the name of the copyright holder nor the names of its
+#   contributors may be used to endorse or promote products derived from
+#   this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+from datetime import datetime, timezone
+from unittest.mock import patch
+
+from pytest import fixture
+
+from servicex_app.models import Dataset
+
+from servicex_app_test.resource_test_base import ResourceTestBase
+
+
+class TestDatasetLifecycle(ResourceTestBase):
+    @fixture
+    def fake_dataset_list(self):
+        with patch(
+            "servicex_app.resources.internal.dataset_lifecycle_ops.get_all_datasets"
+        ) as dsfunc:
+            dsfunc.return_value = [
+                Dataset(
+                    last_used=datetime(2022, 1, 1, tzinfo=timezone.utc),
+                    last_updated=datetime(2022, 1, 1, tzinfo=timezone.utc),
+                    id=1,
+                    name="orphaned",
+                    events=100,
+                    size=1000,
+                    n_files=1,
+                    lookup_status="complete",
+                    did_finder="rucio",
+                ),
+                Dataset(
+                    last_used=datetime.now(timezone.utc),
+                    last_updated=datetime.now(timezone.utc),
+                    id=2,
+                    name="not-orphaned",
+                    events=100,
+                    size=1000,
+                    n_files=1,
+                    lookup_status="complete",
+                    did_finder="rucio",
+                ),
+            ]
+            yield dsfunc
+
+    def test_fail_on_bad_param(self, client):
+        with client.application.app_context():
+            response = client.post(
+                "/servicex/internal/dataset-lifecycle", json={"age": "string"}
+            )
+            assert response.status_code == 422
+
+    def test_deletion(self, fake_dataset_list, client):
+        with client.application.app_context():
+            with patch(
+                "servicex_app.resources.internal.dataset_lifecycle_ops.delete_dataset"
+            ) as deletion_obj:
+                response = client.post(
+                    "/servicex/internal/dataset-lifecycle", json={"age": 24}
+                )
+                fake_dataset_list.assert_called_once()
+                deletion_obj.assert_called_once_with(1)
+                assert response.status_code == 200