9 changes: 7 additions & 2 deletions docs/deployment/reference.md
@@ -111,9 +111,14 @@ parameters for the [rabbitMQ](https://github.com/bitnami/charts/tree/master/bitn
| `transformer.persistence.existingClaim` | Existing persistent volume claim | nil |
| `transformer.subdir` | Subdirectory of the mount to write transformer results to (should end with trailing /) | nil |
| `dataLifecycle.enabled` | Enable deployment of data lifecycle jobs | false |
-| `dataLifecycle.image` | Default image for data lifecycle job | `sslhep/servicex_minio_cleanup` |
-| `dataLifecycle.tag` | Data lifecycle job image tag | |
+| `dataLifecycle.image` | Default image for data lifecycle job | `python` |
+| `dataLifecycle.tag` | Data lifecycle job image tag | `3.10` |
| `dataLifecycle.pullPolicy` | Data lifecycle image pull policy | `Always` |
| `dataLifecycle.schedule` | Schedule for minioCleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | `* */8 * * *` (every 8 hours) |
| `dataLifecycle.retention` | Transforms older than this are archived. Accepts GNU `date --date` syntax; see [date command](https://www.geeksforgeeks.org/date-command-linux-examples/#4-how-to-display-past-dates) for examples. | 7 days ago |
| `dataLifecycle.maxDesiredCacheSize` | If the server-side cache is larger than this, the cleanup service keeps deleting transforms forward in time, past the retention date, until the cache fits. Specify units as Mb, Gb, Tb or Pb | "1Tb" |
+| `datasetLifecycle.image` | Default image for dataset cache lifecycle job | `curlimages/curl` |
+| `datasetLifecycle.tag` | Dataset cache lifecycle job image tag | `8.17.0` |
+| `datasetLifecycle.pullPolicy` | Dataset cache lifecycle image pull policy | `Always` |
+| `datasetLifecycle.schedule` | Schedule for dataset cache cleanup cronjob. See [reference](https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/#cron-schedule-syntax) for details on fields | `0 * * * *` (top of every hour) |
+| `datasetLifecycle.cacheLifetime` | Lifetime of dataset cache, in hours | 24 |
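
For example, a deployment that wants a less frequent sweep and a longer-lived dataset cache could override these defaults in its values file. A minimal sketch; the schedule and lifetime below are illustrative values, not recommendations:

    datasetLifecycle:
      schedule: "0 */6 * * *"  # illustrative: run the cleanup every six hours
      cacheLifetime: 48        # illustrative: keep cached datasets for two days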
24 changes: 24 additions & 0 deletions helm/servicex/templates/dataset-lifecycle/cronjob.yaml
@@ -0,0 +1,24 @@
apiVersion: batch/v1
kind: CronJob
metadata:
  name: {{ .Release.Name }}-dataset-lifecycle-job
spec:
  schedule: {{ .Values.datasetLifecycle.schedule | default "0 * * * *" | quote }}
  concurrencyPolicy: "Forbid"
  jobTemplate:
    spec:
      template:
        metadata:
          labels:
            app: {{ .Release.Name }}-dataset-lifecycle-job
        spec:
          containers:
            - name: {{ .Release.Name }}-dataset-lifecycle-job
              image: {{ .Values.datasetLifecycle.image }}:{{ .Values.datasetLifecycle.tag }}
              imagePullPolicy: {{ .Values.datasetLifecycle.pullPolicy }}
              env:
                - name: LIFETIME
                  # env values must be strings, so quote the numeric lifetime
                  value: {{ .Values.datasetLifecycle.cacheLifetime | quote }}
              # The endpoint reads "age" from the JSON body, not the query string,
              # and curl needs each token as a separate list item.
              args:
                - --request
                - POST
                - --header
                - "Content-Type: application/json"
                - --data
                - '{"age": $(LIFETIME)}'
                - http://{{ .Release.Name }}-servicex-app:8000/servicex/internal/dataset-lifecycle
          restartPolicy: OnFailure
12 changes: 12 additions & 0 deletions helm/servicex/values.yaml
@@ -265,3 +265,15 @@ dataLifecycle:
  # The cleanup service will go beyond the retention date to keep the server-side cache size
  # below this threshold. Specify this as a string with Mb, Gb, Tb or Pb units
  maxDesiredCacheSize: "1Tb"

# Marks cached dataset records older than "cacheLifetime" hours as stale.
# Does not touch transforms or output files.
datasetLifecycle:
  # the image must provide curl
  image: curlimages/curl
  tag: "8.17.0"
  pullPolicy: Always
  schedule: "0 * * * *"

  # How long to keep datasets in the cache, in hours
  cacheLifetime: 24
28 changes: 16 additions & 12 deletions servicex_app/servicex_app/resources/datasets/delete_dataset.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, IRIS-HEP
+# Copyright (c) 2024-25, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -30,18 +30,22 @@
from servicex_app.resources.servicex_resource import ServiceXResource


-class DeleteDataset(ServiceXResource):
-    @auth_required
-    def delete(self, dataset_id):
-        dataset = Dataset.find_by_id(dataset_id)
+def delete_dataset(dataset_id):
+    dataset = Dataset.find_by_id(dataset_id)

-        if not dataset:
-            return {"message": f"Dataset {dataset_id} not found"}, 404
+    if not dataset:
+        return {"message": f"Dataset {dataset_id} not found"}, 404

-        if dataset.stale:
-            return {"message": f"Dataset {dataset_id} has already been deleted"}, 400
+    if dataset.stale:
+        return {"message": f"Dataset {dataset_id} has already been deleted"}, 400

-        dataset.stale = True
-        dataset.save_to_db()
+    dataset.stale = True
+    dataset.save_to_db()

-        return {"dataset-id": dataset_id, "stale": True}
+    return {"dataset-id": dataset_id, "stale": True}, 200
+
+
+class DeleteDataset(ServiceXResource):
+    @auth_required
+    def delete(self, dataset_id):
+        return delete_dataset(dataset_id)
24 changes: 15 additions & 9 deletions servicex_app/servicex_app/resources/datasets/get_all.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2024, IRIS-HEP
+# Copyright (c) 2024-25, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -31,20 +31,26 @@
from servicex_app.models import Dataset
from servicex_app.resources.servicex_resource import ServiceXResource

+from typing import List
+
parser = reqparse.RequestParser()
parser.add_argument("did-finder", type=str, location="args", required=False)
parser.add_argument("show-deleted", type=bool, location="args", required=False)


+def get_all_datasets(args={}) -> List[Dataset]:
+    show_deleted = args["show-deleted"] if "show-deleted" in args else False
+    if "did-finder" in args and args["did-finder"]:
+        did_finder = args["did-finder"]
+        datasets = Dataset.get_by_did_finder(did_finder, show_deleted)
+    else:
+        datasets = Dataset.get_all(show_deleted)
+
+    return datasets
+
+
class AllDatasets(ServiceXResource):
    @auth_required
    def get(self):
        args = parser.parse_args()
-        show_deleted = args["show-deleted"] if "show-deleted" in args else False
-        if "did-finder" in args and args["did-finder"]:
-            did_finder = args["did-finder"]
-            datasets = Dataset.get_by_did_finder(did_finder, show_deleted)
-        else:
-            datasets = Dataset.get_all(show_deleted)
-
-        return {"datasets": [dataset.to_json() for dataset in datasets]}
+        return {"datasets": [dataset.to_json() for dataset in get_all_datasets(args)]}
61 changes: 61 additions & 0 deletions servicex_app/servicex_app/resources/internal/dataset_lifecycle_ops.py
@@ -0,0 +1,61 @@
# Copyright (c) 2025, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from datetime import datetime, timedelta, timezone

from flask import request, current_app

from servicex_app.resources.servicex_resource import ServiceXResource
from ..datasets.get_all import get_all_datasets
from ..datasets.delete_dataset import delete_dataset


class DatasetLifecycleOps(ServiceXResource):
    def post(self):
        """
        Obsolete cached datasets older than N hours
        """
        now = datetime.now(timezone.utc)
        try:
            age = float(request.get_json().get("age", 24))
        except Exception:
            return {"message": "Invalid age parameter"}, 422
        delta = timedelta(hours=age)
        datasets = (
            get_all_datasets()
        )  # by default this will only give non-stale datasets
        todelete = [
            _.id for _ in datasets if _.last_updated and (now - _.last_updated) > delta
        ]
        current_app.logger.info(
            f"Obsoletion called for datasets older than {delta}. "
            f"Obsoleting {len(todelete)} datasets."
        )
        for dataset_id in todelete:
            delete_dataset(dataset_id)

        return {"message": "Success"}, 200
5 changes: 4 additions & 1 deletion servicex_app/servicex_app/routes.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2019, IRIS-HEP
+# Copyright (c) 2019-2025, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
@@ -32,6 +32,7 @@
from servicex_app.resources.datasets.get_one import OneDataset
from servicex_app.resources.transformation.delete import DeleteTransform
from servicex_app.resources.internal.data_lifecycle_ops import DataLifecycleOps
+from servicex_app.resources.internal.dataset_lifecycle_ops import DatasetLifecycleOps


def add_routes(
@@ -198,3 +199,5 @@ def add_routes(

    DataLifecycleOps.make_api(object_store)
    api.add_resource(DataLifecycleOps, "/servicex/internal/data-lifecycle")
+
+    api.add_resource(DatasetLifecycleOps, "/servicex/internal/dataset-lifecycle")
@@ -0,0 +1,87 @@
# Copyright (c) 2025, IRIS-HEP
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from datetime import datetime, timezone
from unittest.mock import patch

from pytest import fixture

from servicex_app.models import Dataset

from servicex_app_test.resource_test_base import ResourceTestBase


class TestDatasetLifecycle(ResourceTestBase):
    @fixture
    def fake_dataset_list(self):
        with patch(
            "servicex_app.resources.internal.dataset_lifecycle_ops.get_all_datasets"
        ) as dsfunc:
            dsfunc.return_value = [
                Dataset(
                    last_used=datetime(2022, 1, 1, tzinfo=timezone.utc),
                    last_updated=datetime(2022, 1, 1, tzinfo=timezone.utc),
                    id=1,
                    name="not-orphaned",
                    events=100,
                    size=1000,
                    n_files=1,
                    lookup_status="complete",
                    did_finder="rucio",
                ),
                Dataset(
                    last_used=datetime.now(timezone.utc),
                    last_updated=datetime.now(timezone.utc),
                    id=2,
                    name="orphaned",
                    events=100,
                    size=1000,
                    n_files=1,
                    lookup_status="complete",
                    did_finder="rucio",
                ),
            ]
            yield dsfunc

    def test_fail_on_bad_param(self, client):
        with client.application.app_context():
            response = client.post(
                "/servicex/internal/dataset-lifecycle", json={"age": "string"}
            )
            assert response.status_code == 422

    def test_deletion(self, fake_dataset_list, client):
        with client.application.app_context():
            with patch(
                "servicex_app.resources.internal.dataset_lifecycle_ops.delete_dataset"
            ) as deletion_obj:
                response = client.post(
                    "/servicex/internal/dataset-lifecycle", json={"age": 24}
                )
                fake_dataset_list.assert_called_once()
                deletion_obj.assert_called_once()
                assert response.status_code == 200