9 changes: 6 additions & 3 deletions pydock3/dockopt/dockopt.py
@@ -96,6 +96,7 @@ class DockoptPipelineComponentRunFuncArgSet: # TODO: rename?
    export_decoys_mol2: bool = False
    delete_intermediate_files: bool = False
    max_scheduler_jobs_running_at_a_time: Optional[int] = None
    job_timeout_minutes: Optional[int] = None


class Dockopt(Script):
@@ -187,6 +188,7 @@ def run(
        retrodock_job_max_reattempts: int = 0,
        allow_failed_retrodock_jobs: bool = False,
        retrodock_job_timeout_minutes: Optional[str] = None,
        job_timeout_minutes: Optional[str] = None,
        max_task_array_size: Optional[int] = None,
        extra_submission_cmd_params_str: Optional[str] = None,
        sleep_seconds_after_copying_output: int = 0,
@@ -262,6 +264,7 @@ def run(
            sleep_seconds_after_copying_output=sleep_seconds_after_copying_output,
            export_decoys_mol2=export_decoys_mol2,
            delete_intermediate_files=delete_intermediate_files,
            job_timeout_minutes=job_timeout_minutes,
            #max_scheduler_jobs_running_at_a_time=max_scheduler_jobs_running_at_a_time, # TODO: move checking of this to this class?
        )

@@ -752,7 +755,7 @@ def _get_unique_partial_docking_configuration_kwargs_sorted(self, dc_kwargs_list

        return new_dc_kwargs_sorted

    def parallel_run_graph_steps(self, scheduler) -> None:
    def parallel_run_graph_steps(self, scheduler, job_timeout_minutes) -> None:
        """Runs steps in parallel while ensuring each unique step is executed only once."""

        g = self.graph
@@ -789,7 +792,7 @@ def parallel_run_graph_steps(self, scheduler) -> None:

        for step_instance, step_id in steps_to_run_scheduler:
            logger.info(f"Submitting {step_instance.__class__.__name__} to the scheduler")
            scheduler.submit_single_step(step_instance, job_name=step_id)
            scheduler.submit_single_step(step_instance, job_name=step_id, job_timeout_minutes=job_timeout_minutes)

        for step_instance in steps_to_run_sequentially:
            step_instance.run()
@@ -820,7 +823,7 @@ def run(
        # run necessary steps to get all dock files
        logger.info("Generating docking configurations")

        self.parallel_run_graph_steps(component_run_func_arg_set.scheduler)
        self.parallel_run_graph_steps(component_run_func_arg_set.scheduler, component_run_func_arg_set.job_timeout_minutes)
        for dc in self.docking_configurations:
            indock_file = dc.get_indock_file(self.pipeline_dir.path)
            indock_file.write(dc.get_dock_files(self.pipeline_dir.path), dc.indock_file_generation_flat_param_dict)
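For orientation, here is a minimal sketch (not part of the diff) of how the new job_timeout_minutes setting flows from Dockopt.run down to the scheduler: it is stored on the run-func arg set and later passed to parallel_run_graph_steps, which forwards it to each submit_single_step call. The names ArgSetSketch and run_component_sketch below are simplified stand-ins, not the real pydock3 classes.

from dataclasses import dataclass
from typing import Optional

@dataclass
class ArgSetSketch:  # hypothetical stand-in for DockoptPipelineComponentRunFuncArgSet
    scheduler: object
    job_timeout_minutes: Optional[int] = None  # new field added in this PR

def run_component_sketch(arg_set, component):
    # Mirrors the change in the component's run(): the timeout travels with the arg set
    # and is forwarded to every scheduler submission made for the graph steps.
    component.parallel_run_graph_steps(arg_set.scheduler, arg_set.job_timeout_minutes)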
62 changes: 41 additions & 21 deletions pydock3/job_schedulers.py
@@ -123,33 +123,29 @@ def submit(

        return procs

    def submit_single_step(
        self,
        step_instance,
        job_name="blaster_step",
    ):
        # TODO: Better handling of the step_dir. Technically the run() function
        # will overwrite this folder. I think its ok for now but silly
    def submit_single_step(self, step_instance, job_name="blaster_step", job_timeout_minutes=None):
        step_dir = step_instance.step_dir.path
        os.makedirs(step_dir, exist_ok=True)

        step_pickle_path = os.path.join(step_dir, "step_instance.pkl")
        with open(step_pickle_path, "wb") as f:
            pickle.dump(step_instance, f)

        slurm_script = f"""#!/bin/bash
#SBATCH --job-name={job_name}
#SBATCH --output={step_dir}/{job_name}_%A_%a.out
#SBATCH --error={step_dir}/{job_name}_%A_%a.err

{sys.executable} -c "import pickle; step = pickle.load(open('{step_pickle_path}', 'rb')); step.run()"
"""
        sub_script_path = os.path.join(step_dir, "submission.sh")
        with open(sub_script_path, "w") as f:
            f.write(slurm_script)

        proc = system_call(f"{self.SBATCH_EXEC} {sub_script_path}")

        return proc
        script_content = f"""#!/bin/bash
{sys.executable} -c "import pickle; step = pickle.load(open('{step_pickle_path}', 'rb')); step.run()"
"""
        script_path = os.path.join(step_dir, "submission.sh")
        with open(script_path, "w") as f:
            f.write(script_content)

        return self.submit(
            job_name=job_name,
            script_path=script_path,
            log_dir_path=step_dir,
            task_ids=[0],
            env_vars_dict={},
            job_timeout_minutes=job_timeout_minutes
        )[0]



@@ -254,6 +250,30 @@ def submit(

        return procs

    def submit_single_step(self, step_instance, job_name="blaster_step", job_timeout_minutes=None):
        step_dir = step_instance.step_dir.path
        os.makedirs(step_dir, exist_ok=True)

        step_pickle_path = os.path.join(step_dir, "step_instance.pkl")
        with open(step_pickle_path, "wb") as f:
            pickle.dump(step_instance, f)

        script_content = f"""#!/bin/bash
{sys.executable} -c "import pickle; step = pickle.load(open('{step_pickle_path}', 'rb')); step.run()"
"""
        script_path = os.path.join(step_dir, "submission.sh")
        with open(script_path, "w") as f:
            f.write(script_content)

        return self.submit(
            job_name=job_name,
            script_path=script_path,
            log_dir_path=step_dir,
            task_ids=[0],
            env_vars_dict={},
            job_timeout_minutes=job_timeout_minutes
        )[0]

    def job_is_on_queue(self, job_name: str) -> bool:
        command_str = f"{self.QSTAT_EXEC} -r | grep '{job_name}'"
        proc = system_call(command_str)
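Note that this diff only shows submit_single_step delegating to each scheduler's existing submit method with the new job_timeout_minutes keyword; how submit turns that value into a scheduler-level time limit is not visible here. As a rough sketch of the kind of translation a Slurm-backed submit might perform (an assumption, not code from this PR; timeout_to_sbatch_time_arg is a hypothetical helper), the minutes budget could be rendered into an sbatch --time argument:

def timeout_to_sbatch_time_arg(job_timeout_minutes=None):
    # Hypothetical helper: convert a minutes budget into sbatch's HH:MM:SS --time format.
    if job_timeout_minutes is None:
        return []  # no explicit limit; fall back to the partition default
    hours, minutes = divmod(int(job_timeout_minutes), 60)
    return [f"--time={hours:02d}:{minutes:02d}:00"]

For example, timeout_to_sbatch_time_arg(90) returns ["--time=01:30:00"], which could be appended to the sbatch command line the scheduler builds.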