From 74147a84c7ce1658f71fceaba454f0e8fbaa1e7b Mon Sep 17 00:00:00 2001
From: Brendan Hall
Date: Thu, 15 Jan 2026 17:24:43 -0800
Subject: [PATCH] Improved SGE support and added job timeouts

---
 pydock3/dockopt/dockopt.py |  9 ++++--
 pydock3/job_schedulers.py  | 62 +++++++++++++++++++++++++-------------
 2 files changed, 47 insertions(+), 24 deletions(-)

diff --git a/pydock3/dockopt/dockopt.py b/pydock3/dockopt/dockopt.py
index 04a235e..553afe1 100755
--- a/pydock3/dockopt/dockopt.py
+++ b/pydock3/dockopt/dockopt.py
@@ -96,6 +96,7 @@ class DockoptPipelineComponentRunFuncArgSet:  # TODO: rename?
     export_decoys_mol2: bool = False
     delete_intermediate_files: bool = False
     max_scheduler_jobs_running_at_a_time: Optional[int] = None
+    job_timeout_minutes: Optional[int] = None
 
 
 class Dockopt(Script):
@@ -187,6 +188,7 @@ def run(
         retrodock_job_max_reattempts: int = 0,
         allow_failed_retrodock_jobs: bool = False,
         retrodock_job_timeout_minutes: Optional[str] = None,
+        job_timeout_minutes: Optional[str] = None,
         max_task_array_size: Optional[int] = None,
         extra_submission_cmd_params_str: Optional[str] = None,
         sleep_seconds_after_copying_output: int = 0,
@@ -262,6 +264,7 @@ def run(
             sleep_seconds_after_copying_output=sleep_seconds_after_copying_output,
             export_decoys_mol2=export_decoys_mol2,
             delete_intermediate_files=delete_intermediate_files,
+            job_timeout_minutes=job_timeout_minutes,
             #max_scheduler_jobs_running_at_a_time=max_scheduler_jobs_running_at_a_time,  # TODO: move checking of this to this class?
         )
 
@@ -752,7 +755,7 @@ def _get_unique_partial_docking_configuration_kwargs_sorted(self, dc_kwargs_list
 
         return new_dc_kwargs_sorted
 
-    def parallel_run_graph_steps(self, scheduler) -> None:
+    def parallel_run_graph_steps(self, scheduler, job_timeout_minutes) -> None:
         """Runs steps in parallel while ensuring each unique step is executed only once."""
 
         g = self.graph
@@ -789,7 +792,7 @@ def parallel_run_graph_steps(self, scheduler) -> None:
 
         for step_instance, step_id in steps_to_run_scheduler:
             logger.info(f"Submitting {step_instance.__class__.__name__} to the scheduler")
-            scheduler.submit_single_step(step_instance, job_name=step_id)
+            scheduler.submit_single_step(step_instance, job_name=step_id, job_timeout_minutes=job_timeout_minutes)
 
         for step_instance in steps_to_run_sequentially:
             step_instance.run()
@@ -820,7 +823,7 @@ def run(
 
         # run necessary steps to get all dock files
         logger.info("Generating docking configurations")
-        self.parallel_run_graph_steps(component_run_func_arg_set.scheduler)
+        self.parallel_run_graph_steps(component_run_func_arg_set.scheduler, component_run_func_arg_set.job_timeout_minutes)
         for dc in self.docking_configurations:
             indock_file = dc.get_indock_file(self.pipeline_dir.path)
             indock_file.write(dc.get_dock_files(self.pipeline_dir.path), dc.indock_file_generation_flat_param_dict)
diff --git a/pydock3/job_schedulers.py b/pydock3/job_schedulers.py
index fae94e2..ae7c8ad 100644
--- a/pydock3/job_schedulers.py
+++ b/pydock3/job_schedulers.py
@@ -123,33 +123,29 @@ def submit(
 
         return procs
 
-    def submit_single_step(
-        self,
-        step_instance,
-        job_name="blaster_step",
-    ):
-        # TODO: Better handling of the step_dir. Technically the run() function
-        # will overwrite this folder. I think its ok for now but silly
+    def submit_single_step(self, step_instance, job_name="blaster_step", job_timeout_minutes=None):
         step_dir = step_instance.step_dir.path
         os.makedirs(step_dir, exist_ok=True)
+
         step_pickle_path = os.path.join(step_dir, "step_instance.pkl")
         with open(step_pickle_path, "wb") as f:
             pickle.dump(step_instance, f)
 
-        slurm_script = f"""#!/bin/bash
-#SBATCH --job-name={job_name}
-#SBATCH --output={step_dir}/{job_name}_%A_%a.out
-#SBATCH --error={step_dir}/{job_name}_%A_%a.err
-
-{sys.executable} -c "import pickle; step = pickle.load(open('{step_pickle_path}', 'rb')); step.run()"
-"""
-        sub_script_path = os.path.join(step_dir, "submission.sh")
-        with open(sub_script_path, "w") as f:
-            f.write(slurm_script)
-
-        proc = system_call(f"{self.SBATCH_EXEC} {sub_script_path}")
-
-        return proc
+        script_content = f"""#!/bin/bash
+        {sys.executable} -c "import pickle; step = pickle.load(open('{step_pickle_path}', 'rb')); step.run()"
+        """
+        script_path = os.path.join(step_dir, "submission.sh")
+        with open(script_path, "w") as f:
+            f.write(script_content)
+
+        return self.submit(
+            job_name=job_name,
+            script_path=script_path,
+            log_dir_path=step_dir,
+            task_ids=[0],
+            env_vars_dict={},
+            job_timeout_minutes=job_timeout_minutes
+        )[0]
@@ -254,6 +250,30 @@ def submit(
 
         return procs
 
+    def submit_single_step(self, step_instance, job_name="blaster_step", job_timeout_minutes=None):
+        step_dir = step_instance.step_dir.path
+        os.makedirs(step_dir, exist_ok=True)
+
+        step_pickle_path = os.path.join(step_dir, "step_instance.pkl")
+        with open(step_pickle_path, "wb") as f:
+            pickle.dump(step_instance, f)
+
+        script_content = f"""#!/bin/bash
+        {sys.executable} -c "import pickle; step = pickle.load(open('{step_pickle_path}', 'rb')); step.run()"
+        """
+        script_path = os.path.join(step_dir, "submission.sh")
+        with open(script_path, "w") as f:
+            f.write(script_content)
+
+        return self.submit(
+            job_name=job_name,
+            script_path=script_path,
+            log_dir_path=step_dir,
+            task_ids=[0],
+            env_vars_dict={},
+            job_timeout_minutes=job_timeout_minutes
+        )[0]
+
     def job_is_on_queue(self, job_name: str) -> bool:
         command_str = f"{self.QSTAT_EXEC} -r | grep '{job_name}'"
         proc = system_call(command_str)
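
Note (not part of the patch): job_timeout_minutes flows from Dockopt.run() into DockoptPipelineComponentRunFuncArgSet, through parallel_run_graph_steps(), and into each scheduler's submit_single_step(), which now delegates to submit(). How submit() converts the minutes value into a scheduler-enforced wall-clock limit is not shown in this diff; the sketch below is only an assumed illustration of one common mapping (Slurm's --time accepts minutes, SGE's -l h_rt accepts seconds), and the helper name timeout_cli_args is hypothetical.

    from typing import Optional

    # Hypothetical helper (assumption, not taken from pydock3): map a job
    # timeout given in minutes onto scheduler-native wall-clock limit flags.
    def timeout_cli_args(job_timeout_minutes: Optional[int], scheduler: str) -> str:
        if job_timeout_minutes is None:
            return ""  # no limit requested; submit without a wall-clock cap
        minutes = int(job_timeout_minutes)
        if scheduler == "slurm":
            # sbatch interprets a bare integer passed to --time as minutes
            return f"--time={minutes}"
        if scheduler == "sge":
            # SGE's h_rt hard resource limit is expressed in seconds
            return f"-l h_rt={minutes * 60}"
        raise ValueError(f"unknown scheduler: {scheduler}")

    # Example: timeout_cli_args(120, "sge") yields "-l h_rt=7200".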