From 02e63458386bad5836f2591b6733691bf0586073 Mon Sep 17 00:00:00 2001
From: Jan Brezina
Date: Sat, 7 Jan 2023 11:39:24 +0100
Subject: [PATCH 01/12] pbs wrappers, singularity data class

---
 .idea/.gitignore       |   3 ++
 setup.py               |   3 +-
 src/swrap/pbs_utils.py |  35 +++++++++++--
 src/swrap/sexec.py     | 113 ++++++++++++++++++++++++++---------------
 src/swrap/smpiexec.py  |   7 +--
 src/swrap/utils.py     |  13 +++++
 testing/test_none.py   |   6 ++-
 7 files changed, 128 insertions(+), 52 deletions(-)
 create mode 100644 .idea/.gitignore
 create mode 100644 src/swrap/utils.py

diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/setup.py b/setup.py
index f702869..7bafb79 100644
--- a/setup.py
+++ b/setup.py
@@ -37,11 +37,10 @@
     zip_safe=False,
     install_requires=[],
     python_requires='>=3',
-
     packages=['swrap'],
     package_dir={'': 'src'},
     entry_points={
-        'console_scripts': ['smpiexec=swrap.smpiexec:main']
+        'console_scripts': ['sexec=swrap.sexec:main', 'smpiexec=swrap.smpiexec:main']
     }
 )
diff --git a/src/swrap/pbs_utils.py b/src/swrap/pbs_utils.py
index 18f3066..dba324f 100755
--- a/src/swrap/pbs_utils.py
+++ b/src/swrap/pbs_utils.py
@@ -1,9 +1,8 @@
 import os
 # import subprocess

-import sexec
-from sexec import flush_print
-
+from swrap.utils import flush_print, oscommand
+from pathlib import Path

 def run_in_ssh(arg_list, init_dir=None):
     """
@@ -47,3 +46,33 @@ def qsub(job_file, arg_list=None):
         return run_in_ssh(["qsub", job_file], init_dir=job_dir)
     else:
         return run_in_ssh(["qsub", *arg_list, job_file], init_dir=job_dir)
+
+
+def make_wrapper(dir, cmd, host_addr, binds):
+    bind_replace="""
+    PWD_REL="${{PWD#{1}}}"
+    if [ "$PWD_REL" != "$PWD" ]
+    then
+        PWD_HOST="{0}$PWD_REL"
+    fi
+    """
+    Path(dir).mkdir(parents=True, exist_ok=True)
+    pwd_replace_binds = [bind_replace.format(host_dir, cont_dir) for host_dir, cont_dir in binds.items()]
+    content=[
+        "# !/bin/bash",
+        "PWD=\"`pwd`\"",
+        "PWD_HOST=\"${PWD}\"",
+        *pwd_replace_binds,
+        f"ssh {host_addr} \"cd '$PWD_HOST'; {cmd} @*\""
+    ]
+    cmd_path = os.path.join(dir, cmd)
+    with open(cmd_path, "w") as f:
+        f.write("\n".join(content))
+    os.chmod(cmd_path, 0o477)
+
+def make_pbs_wrappers(dir, binds):
+    host_addr = os.environ.get('PBS_O_HOST', None)
+    if host_addr is None:
+        host_addr = oscommand('hostname')
+    make_wrapper(dir, 'qstat', host_addr, binds)
+    make_wrapper(dir, 'qsub', host_addr, binds)
diff --git a/src/swrap/sexec.py b/src/swrap/sexec.py
index 81fe6d2..eb5b8f0 100755
--- a/src/swrap/sexec.py
+++ b/src/swrap/sexec.py
@@ -1,22 +1,15 @@
+from typing import *
 import os
-import sys
 import shutil
 import argparse
 import subprocess
+import attrs

-from argparse import RawTextHelpFormatter
+from swrap.utils import flush_print, oscommand
+from . 
import pbs_utils -def flush_print(*margs, **mkwargs): - print(*margs, file=sys.stdout, flush=True, **mkwargs) - - -def oscommand(command_string): - flush_print(command_string) - flush_print(os.popen(command_string).read()) - - -def process_image_path(image_path): +def process_image_url(image_path: str) -> str: if os.path.isfile(image_path): image = os.path.abspath(image_path) elif image_path.startswith('docker://'): @@ -26,6 +19,61 @@ def process_image_path(image_path): return image +@attrs.define +class SingularityCall: + image: str = attrs.field(converter=process_image_url) + # singularity image url + command: List[str] + # command to call in the container with its arguments + bindings: List[str] = attrs.field(factory=list) + env_dict: Dict[str, str] = attrs.field(factory=dict) + append_path: List[str] = attrs.field(factory=list) + prepend_path: List[str] = attrs.field(factory=list) + debug: bool = False + + def append_path(self, add_path): + append_path_list = self.env_dict.get('APPEND_PATH', "").split(":") + append_path_list.append(add_path) + append_path_list = ":".join(append_path_list) + self.env_dict['APPEND_PATH'] = append_path_list + + def prepend_path(self, add_path): + append_path_list = self.env_dict.get('PREPEND_PATH', "").split(":") + append_path_list.insert(0, add_path) + append_path_list = ":".join(append_path_list) + self.env_dict['PREPEND_PATH'] = append_path_list + + def form_bindings(self): + # currently we olny support binding of the same paths in host and in container + return self.bindings + + def form_env_list(self): + return [f"{key}={str(value)}" for key, value in self.env_dict.items()] + + def cmd_list(self): + sing_command = ['singularity', 'exec', + '-B', ",".join(self.form_bindings()), + '--env', ",".join(self.form_env_list()), + self.image, + *self.command] + + # F] join all the arguments into final singularity container command + return sing_command + + def call(self): + flush_print("current directory:", os.getcwd()) + # mprint(os.popen("ls -l").read()) + flush_print("final command:", *self.cmd_list()) + flush_print("=================== smpiexec.py END ===================") + if not self.debug: + flush_print("================== Program output START ==================") + # proc = subprocess.run(final_command_list) + final_command = " ".join(self.cmd_list()) + oscommand(final_command) + flush_print("=================== Program output END ===================") + # exit(proc.returncode) + + def copy_and_read_node_file(orig_node_file, directory): flush_print("reading host file...") @@ -198,14 +246,8 @@ def main(): current_dir = os.getcwd() args = arguments() - # get debug variable - debug = args.debug - # get program and its arguments - prog_args = args.prog[1:] - - # get program and its arguments, set absolute path - image = process_image_path(args.image) + sing = SingularityCall(args.image, args.prog[1:], debug=args.debug) ################################################################################################################### # Process node file and setup ssh access to given nodes. 
################################################################################################################### @@ -218,9 +260,9 @@ def main(): pbs_job_aux_dir = os.path.join(current_dir, pbs_job_id + '_job') # create auxiliary job output directory os.makedirs(pbs_job_aux_dir, mode=0o775) - + # get nodefile, copy it to local dir so that it can be passed into container mpiexec later - if debug: + if sing.debug: orig_node_file = "testing_hostfile" else: orig_node_file = os.environ['PBS_NODEFILE'] @@ -228,7 +270,7 @@ def main(): # Get ssh keys to nodes and append it to $HOME/.ssh/known_hosts ssh_known_hosts_to_append = [] - if debug: + if sing.debug: # ssh_known_hosts_file = 'testing_known_hosts' ssh_known_hosts_file = 'xxx/.ssh/testing_known_hosts' else: @@ -252,21 +294,19 @@ def main(): # A] process bindings, exclude ssh agent in launcher bindings common_bindings = ["/etc/ssh/ssh_config", "/etc/ssh/ssh_known_hosts", "/etc/krb5.conf"] - bindings = [*common_bindings, os.environ['SSH_AUTH_SOCK']] + sing.bindings.extend(common_bindings) + sing.bindings.append(os.environ['SSH_AUTH_SOCK']) # possibly add current dir to container bindings # bindings = bindings + "," + current_dir + ":" + current_dir if args.bind != "": - bindings.append(args.bind) + sing.bindings.append(args.bind) if scratch_dir_path: - bindings.append(scratch_dir_path) + sing.bindings.append(scratch_dir_path) - sing_command = ['singularity', 'exec', '-B', ",".join(bindings), image] - flush_print('sing_command:', *sing_command) - - # F] join all the arguments into final singularity container command - final_command_list = [*sing_command, *prog_args] + pbs_utils.make_pbs_wrappers(pbs_job_aux_dir, sing.bindings) + sing.append_path(pbs_job_aux_dir) ################################################################################################################### # Final call. 
@@ -275,18 +315,7 @@ def main(): flush_print("Entering SCRATCHDIR:", scratch_dir_path) os.chdir(scratch_dir_path) - flush_print("current directory:", os.getcwd()) - # mprint(os.popen("ls -l").read()) - flush_print("final command:", *final_command_list) - flush_print("=================== smpiexec.py END ===================") - if not debug: - flush_print("================== Program output START ==================") - # proc = subprocess.run(final_command_list) - final_command = " ".join(final_command_list) - oscommand(final_command) - - flush_print("=================== Program output END ===================") - # exit(proc.returncode) + sing.call() if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/swrap/smpiexec.py b/src/swrap/smpiexec.py index 4060d82..c5b1fd0 100755 --- a/src/swrap/smpiexec.py +++ b/src/swrap/smpiexec.py @@ -2,7 +2,8 @@ import subprocess import sexec -from sexec import flush_print +import swrap.utils +from swrap.utils import flush_print def prepare_mpiexec_launcher(pbs_job_aux_dir, pbs_job_id, sing_command_in_launcher): @@ -31,7 +32,7 @@ def prepare_mpiexec_launcher(pbs_job_aux_dir, pbs_job_id, sing_command_in_launch ] with open(launcher_path, 'w') as f: f.write('\n'.join(launcher_lines)) - sexec.oscommand('chmod +x ' + launcher_path) + swrap.utils.oscommand('chmod +x ' + launcher_path) return launcher_path @@ -149,7 +150,7 @@ def main(): flush_print("================== Program output START ==================") # proc = subprocess.run(final_command_list) final_command = " ".join(final_command_list) - sexec.oscommand(final_command) + swrap.utils.oscommand(final_command) flush_print("=================== Program output END ===================") # exit(proc.returncode) diff --git a/src/swrap/utils.py b/src/swrap/utils.py new file mode 100644 index 0000000..cf0cce8 --- /dev/null +++ b/src/swrap/utils.py @@ -0,0 +1,13 @@ +import os +import sys + + +def flush_print(*margs, **mkwargs): + print(*margs, file=sys.stdout, flush=True, **mkwargs) + + +def oscommand(command_string): + flush_print(command_string) + stdout=os.popen(command_string).read() + flush_print(stdout) + return stdout \ No newline at end of file diff --git a/testing/test_none.py b/testing/test_none.py index 1c49fd9..a957bcb 100644 --- a/testing/test_none.py +++ b/testing/test_none.py @@ -1,2 +1,4 @@ -def test_test(): - print("Test passed.") +from swrap.pbs_utils import make_pbs_wrappers + +def test_pbs_wrappers(): + make_pbs_wrappers('sandbox', 'hostname', {'/host/dir':'/cont/dir', '/H/':'/C/'}) From bdd1a8f9b580ffb349b03023a72a6ddb7ca4d8b4 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Sat, 7 Jan 2023 19:17:26 +0100 Subject: [PATCH 02/12] Working PBS wrappers, sexec refactoring. 
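For orientation, a sketch of what make_wrapper() now emits (illustrative only; SUBMIT_HOST
stands for the host_addr argument): each wrapper is a small bash script placed in the job
auxiliary directory that re-enters the submit host over ssh and replays the PBS command there:

    #!/bin/bash
    set -x
    PWD="`pwd`"
    PWD_HOST="${PWD}"
    ssh SUBMIT_HOST "cd '$PWD_HOST'; qsub $*"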
--- src/swrap/pbs_utils.py | 12 ++++--- src/swrap/sexec.py | 71 +++++++++++++++++++++++++----------------- 2 files changed, 50 insertions(+), 33 deletions(-) diff --git a/src/swrap/pbs_utils.py b/src/swrap/pbs_utils.py index dba324f..51ff3fb 100755 --- a/src/swrap/pbs_utils.py +++ b/src/swrap/pbs_utils.py @@ -49,6 +49,7 @@ def qsub(job_file, arg_list=None): def make_wrapper(dir, cmd, host_addr, binds): + host_addr=host_addr.strip("\n") bind_replace=""" PWD_REL="${{PWD#{1}}}" if [ "$PWD_REL" != "$PWD" ] @@ -57,18 +58,21 @@ def make_wrapper(dir, cmd, host_addr, binds): fi """ Path(dir).mkdir(parents=True, exist_ok=True) - pwd_replace_binds = [bind_replace.format(host_dir, cont_dir) for host_dir, cont_dir in binds.items()] + #pwd_replace_binds = [bind_replace.format(host_dir, cont_dir) for host_dir, cont_dir in binds] + # TODO: support bindings with path different on host and container + pwd_replace_binds=[] content=[ - "# !/bin/bash", + "#!/bin/bash", + "set -x", "PWD=\"`pwd`\"", "PWD_HOST=\"${PWD}\"", *pwd_replace_binds, - f"ssh {host_addr} \"cd '$PWD_HOST'; {cmd} @*\"" + f'ssh {host_addr} "cd \'$PWD_HOST\'; {cmd} $*"' ] cmd_path = os.path.join(dir, cmd) with open(cmd_path, "w") as f: f.write("\n".join(content)) - os.chmod(cmd_path, 0o477) + os.chmod(cmd_path, 0o777) def make_pbs_wrappers(dir, binds): host_addr = os.environ.get('PBS_O_HOST', None) diff --git a/src/swrap/sexec.py b/src/swrap/sexec.py index eb5b8f0..936a5f9 100755 --- a/src/swrap/sexec.py +++ b/src/swrap/sexec.py @@ -4,11 +4,13 @@ import argparse import subprocess import attrs +import socket from swrap.utils import flush_print, oscommand from . import pbs_utils + def process_image_url(image_path: str) -> str: if os.path.isfile(image_path): image = os.path.abspath(image_path) @@ -27,8 +29,6 @@ class SingularityCall: # command to call in the container with its arguments bindings: List[str] = attrs.field(factory=list) env_dict: Dict[str, str] = attrs.field(factory=dict) - append_path: List[str] = attrs.field(factory=list) - prepend_path: List[str] = attrs.field(factory=list) debug: bool = False def append_path(self, add_path): @@ -74,20 +74,27 @@ def call(self): # exit(proc.returncode) -def copy_and_read_node_file(orig_node_file, directory): +def copy_and_read_node_file(directory): + orig_node_file = os.environ.get('PBS_NODEFILE', None) + if orig_node_file is None: + node_file = os.path.join(directory, "nodefile") + hostname = socket.gethostname() + flush_print(f"Warning: missing PBS_NODEFILE variable. 
Using just local node: {hostname}.") + with open(node_file, "w") as f: + f.write(hostname) + else: + # create a copy + node_file = os.path.join(directory, os.path.basename(orig_node_file)) + shutil.copy(orig_node_file, node_file) + flush_print("reading host file...") - # create a copy - node_file = os.path.join(directory, os.path.basename(orig_node_file)) - shutil.copy(orig_node_file, node_file) - # mprint(os.popen("ls -l").read()) - # read node names with open(node_file) as fp: node_names_read = fp.read().splitlines() # remove duplicates - node_names = list(dict.fromkeys(node_names_read)) - return node_file, node_names + node_names = list(set(node_names_read)) + return node_file, node_names def create_ssh_agent(): @@ -161,7 +168,14 @@ def process_known_hosts_file(ssh_known_hosts_file, node_names): def prepare_scratch_dir(scratch_source, node_names): - scratch_dir_path = os.environ['SCRATCHDIR'] + if scratch_source == "": + return os.getcwd() + + scratch_dir_path = os.environ.get('SCRATCHDIR', None) + if scratch_dir_path is None: + return os.getcwd() + + flush_print("Using SCRATCHDIR:", scratch_dir_path) flush_print("copying to SCRATCHDIR on all nodes...") @@ -189,6 +203,7 @@ def prepare_scratch_dir(scratch_source, node_names): command = ' '.join(['cd', source, '&&', 'tar -cvf', source_tar_filepath, '.', '&& cd', current_dir]) oscommand(command) + # copy to scratch dir on every used node through ssh for node in node_names: destination_name = username + "@" + node destination_path = destination_name + ':' + scratch_dir_path @@ -241,32 +256,33 @@ def arguments(): return args +def setup_aux_dir(): + current_dir = os.getcwd() + pbs_job_id = os.environ.get('PBS_JOBID', f"pid_{os.getpid()}") + flush_print("PBS job id: ", pbs_job_id) + pbs_job_aux_dir = os.path.join(current_dir, pbs_job_id + '_job') + # create auxiliary job output directory + os.makedirs(pbs_job_aux_dir, mode=0o775) + return pbs_job_aux_dir + + def main(): flush_print("================== smpiexec.py START ==================") - current_dir = os.getcwd() args = arguments() - sing = SingularityCall(args.image, args.prog[1:], debug=args.debug) + sing = SingularityCall(args.image, args.prog, debug=args.debug) ################################################################################################################### # Process node file and setup ssh access to given nodes. 
################################################################################################################### flush_print("Hostname: ", os.popen('hostname').read()) # mprint("os.environ", os.environ) - - pbs_job_id = os.environ['PBS_JOBID'] - flush_print("PBS job id: ", pbs_job_id) - pbs_job_aux_dir = os.path.join(current_dir, pbs_job_id + '_job') - # create auxiliary job output directory - os.makedirs(pbs_job_aux_dir, mode=0o775) + pbs_job_aux_dir = setup_aux_dir() # get nodefile, copy it to local dir so that it can be passed into container mpiexec later - if sing.debug: - orig_node_file = "testing_hostfile" - else: - orig_node_file = os.environ['PBS_NODEFILE'] - node_file, node_names = copy_and_read_node_file(orig_node_file, pbs_job_aux_dir) + node_file, node_names = copy_and_read_node_file(pbs_job_aux_dir) + # Get ssh keys to nodes and append it to $HOME/.ssh/known_hosts ssh_known_hosts_to_append = [] @@ -286,10 +302,7 @@ def main(): ################################################################################################################### flush_print("assembling final command...") - - scratch_dir_path = None - if 'SCRATCHDIR' in os.environ: - scratch_dir_path = prepare_scratch_dir(args.scratch_copy, node_names) + scratch_dir_path = prepare_scratch_dir(args.scratch_copy, node_names) # A] process bindings, exclude ssh agent in launcher bindings @@ -318,4 +331,4 @@ def main(): sing.call() if __name__ == "__main__": - main() \ No newline at end of file + main() From 345aaecca3ad53f4db8a4a00ab8ef10afa1c7571 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Sun, 8 Jan 2023 01:04:16 +0100 Subject: [PATCH 03/12] drop dependency on attrs --- src/swrap/sexec.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/swrap/sexec.py b/src/swrap/sexec.py index 936a5f9..dd2caff 100755 --- a/src/swrap/sexec.py +++ b/src/swrap/sexec.py @@ -3,7 +3,7 @@ import shutil import argparse import subprocess -import attrs +#import attrs import socket from swrap.utils import flush_print, oscommand @@ -21,15 +21,15 @@ def process_image_url(image_path: str) -> str: return image -@attrs.define class SingularityCall: - image: str = attrs.field(converter=process_image_url) - # singularity image url - command: List[str] - # command to call in the container with its arguments - bindings: List[str] = attrs.field(factory=list) - env_dict: Dict[str, str] = attrs.field(factory=dict) - debug: bool = False + def __init__(self.image, commad, debug=False): + self.image: str = process_image_url(image) + # singularity image url + self.command: List[str] = command + # command to call in the container with its arguments + self.bindings: List[str] = [] + self.env_dict: Dict[str, str] = {} + self.debug: bool = False def append_path(self, add_path): append_path_list = self.env_dict.get('APPEND_PATH', "").split(":") From 226321f471b2909964c9195a1658ff0ec40a0e85 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Sun, 8 Jan 2023 15:18:32 +0100 Subject: [PATCH 04/12] Make install-less runs possible. 
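Illustrative usage under this change (the image name is a placeholder, not part of the
repository): with the absolute package imports replaced by local module imports, sexec can
be started directly from the source tree without installing the package, e.g.

    cd src/swrap
    python3 sexec.py -i my_image.sif hostname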
--- src/swrap/pbs_utils.py | 2 +- src/swrap/sexec.py | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/swrap/pbs_utils.py b/src/swrap/pbs_utils.py index 51ff3fb..4b510f4 100755 --- a/src/swrap/pbs_utils.py +++ b/src/swrap/pbs_utils.py @@ -1,7 +1,7 @@ import os # import subprocess -from swrap.utils import flush_print, oscommand +from utils import flush_print, oscommand from pathlib import Path def run_in_ssh(arg_list, init_dir=None): diff --git a/src/swrap/sexec.py b/src/swrap/sexec.py index dd2caff..484ed60 100755 --- a/src/swrap/sexec.py +++ b/src/swrap/sexec.py @@ -6,8 +6,8 @@ #import attrs import socket -from swrap.utils import flush_print, oscommand -from . import pbs_utils +from utils import flush_print, oscommand +from pbs_utils import make_pbs_wrappers @@ -22,7 +22,7 @@ def process_image_url(image_path: str) -> str: class SingularityCall: - def __init__(self.image, commad, debug=False): + def __init__(self, image, command, debug=False): self.image: str = process_image_url(image) # singularity image url self.command: List[str] = command @@ -318,7 +318,7 @@ def main(): sing.bindings.append(scratch_dir_path) - pbs_utils.make_pbs_wrappers(pbs_job_aux_dir, sing.bindings) + make_pbs_wrappers(pbs_job_aux_dir, sing.bindings) sing.append_path(pbs_job_aux_dir) ################################################################################################################### From 282f873b567440266f048b337dd4f85e6edef222 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Tue, 10 Jan 2023 20:26:04 +0100 Subject: [PATCH 05/12] add venv support --- src/swrap/sexec.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/swrap/sexec.py b/src/swrap/sexec.py index 484ed60..fa8543d 100755 --- a/src/swrap/sexec.py +++ b/src/swrap/sexec.py @@ -22,11 +22,12 @@ def process_image_url(image_path: str) -> str: class SingularityCall: - def __init__(self, image, command, debug=False): + def __init__(self, image, command, venv="", debug=False): self.image: str = process_image_url(image) # singularity image url self.command: List[str] = command # command to call in the container with its arguments + self.venv:str = os.path.abspath(venv) if venv else "" self.bindings: List[str] = [] self.env_dict: Dict[str, str] = {} self.debug: bool = False @@ -48,9 +49,12 @@ def form_bindings(self): return self.bindings def form_env_list(self): + self.env_dict['SWRAP_SINGULARITY_VENV'] = self.venv return [f"{key}={str(value)}" for key, value in self.env_dict.items()] def cmd_list(self): + if len(self.venv) > 0: + self.prepend_path(os.path.join(os.path.abspath(self.venv), 'bin')) sing_command = ['singularity', 'exec', '-B', ",".join(self.form_bindings()), '--env', ",".join(self.form_env_list()), @@ -221,15 +225,24 @@ def prepare_scratch_dir(scratch_source, node_names): def arguments(): - parser = argparse.ArgumentParser( - description='Auxiliary executor for parallel programs running inside (Singularity) container under PBS.', - formatter_class=argparse.RawTextHelpFormatter) + description=\ + """ + Auxiliary executor for parallel programs running inside (Singularity) container under PBS. + + Provides some tools to start other jobs running in the same image: + 1. wrapper sripts 'qsub' and 'qstat' are created in the job auxiliary directory + 2. environment variables SINGULARITY_CONTAINER, SINGULARITY_BIND, SWRAP_SINGULARITY_VENV. + 3. mpiexec ... 
+ """ + parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('-d', '--debug', action='store_true', help='use testing files and print the final command') parser.add_argument('-i', '--image', type=str, required=True, help='Singularity SIF image or Docker image (will be converted to SIF)') parser.add_argument('-B', '--bind', type=str, metavar="PATH,...", default="", required=False, help='comma separated list of paths to be bind to Singularity container') + parser.add_argument('-e', '--venv', type=str, metavar="PATH", default="", required=False, + help='If specified, the python virtual environment in PATH directory will be activated before given command.') parser.add_argument('-m', '--mpiexec', type=str, metavar="PATH", default="", required=False, help="path (inside the container) to mpiexec to be run, default is 'mpiexec'") parser.add_argument('-s', '--scratch_copy', type=str, metavar="PATH", default="", required=False, @@ -271,7 +284,7 @@ def main(): args = arguments() - sing = SingularityCall(args.image, args.prog, debug=args.debug) + sing = SingularityCall(args.image, args.prog, args.venv, debug=args.debug) ################################################################################################################### # Process node file and setup ssh access to given nodes. ################################################################################################################### From 7b5481e468467afd156ba38790982c9b0e1b42d1 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Wed, 11 Jan 2023 18:38:24 +0100 Subject: [PATCH 06/12] Fix qsub wrapper. --- src/swrap/pbs_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/swrap/pbs_utils.py b/src/swrap/pbs_utils.py index 4b510f4..f4949bb 100755 --- a/src/swrap/pbs_utils.py +++ b/src/swrap/pbs_utils.py @@ -79,4 +79,4 @@ def make_pbs_wrappers(dir, binds): if host_addr is None: host_addr = oscommand('hostname') make_wrapper(dir, 'qstat', host_addr, binds) - make_wrapper(dir, 'qsub', host_addr, binds) + make_wrapper(dir, 'qsub --', host_addr, binds) From b8d3f35e441265e3af2501000ce7c8e8875934f2 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Thu, 12 Jan 2023 13:23:10 +0100 Subject: [PATCH 07/12] Undo wrong fix for qsub wrapper. 
--- src/swrap/pbs_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/swrap/pbs_utils.py b/src/swrap/pbs_utils.py index f4949bb..4b510f4 100755 --- a/src/swrap/pbs_utils.py +++ b/src/swrap/pbs_utils.py @@ -79,4 +79,4 @@ def make_pbs_wrappers(dir, binds): if host_addr is None: host_addr = oscommand('hostname') make_wrapper(dir, 'qstat', host_addr, binds) - make_wrapper(dir, 'qsub --', host_addr, binds) + make_wrapper(dir, 'qsub', host_addr, binds) From d37a2d549acb7081390ab315c818406b43765d40 Mon Sep 17 00:00:00 2001 From: Jan Brezina Date: Fri, 13 Jan 2023 16:26:13 +0100 Subject: [PATCH 08/12] Remove debug messages to have corrser qsub stdout --- src/swrap/pbs_utils.py | 36 ++++++++++++++++++++++++------------ src/swrap/sexec.py | 19 +++++++------------ src/swrap/utils.py | 18 +++++++++++------- 3 files changed, 42 insertions(+), 31 deletions(-) diff --git a/src/swrap/pbs_utils.py b/src/swrap/pbs_utils.py index 4b510f4..79d0825 100755 --- a/src/swrap/pbs_utils.py +++ b/src/swrap/pbs_utils.py @@ -1,4 +1,6 @@ import os +import shutil +import socket # import subprocess from utils import flush_print, oscommand @@ -48,8 +50,12 @@ def qsub(job_file, arg_list=None): return run_in_ssh(["qsub", *arg_list, job_file], init_dir=job_dir) -def make_wrapper(dir, cmd, host_addr, binds): - host_addr=host_addr.strip("\n") +def make_wrapper(dir, cmd, binds): + host_addr = socket.gethostname() + host_addr = host_addr.strip("\n") + full_cmd = shutil.which(cmd) + # full resolution of the cmd + bind_replace=""" PWD_REL="${{PWD#{1}}}" if [ "$PWD_REL" != "$PWD" ] @@ -60,23 +66,29 @@ def make_wrapper(dir, cmd, host_addr, binds): Path(dir).mkdir(parents=True, exist_ok=True) #pwd_replace_binds = [bind_replace.format(host_dir, cont_dir) for host_dir, cont_dir in binds] # TODO: support bindings with path different on host and container + + ssh_call=f'ssh {host_addr} "cd \'$PWD_HOST\'; {full_cmd} $@"' pwd_replace_binds=[] content=[ "#!/bin/bash", "set -x", "PWD=\"`pwd`\"", "PWD_HOST=\"${PWD}\"", - *pwd_replace_binds, - f'ssh {host_addr} "cd \'$PWD_HOST\'; {cmd} $*"' + *pwd_replace_binds, + ssh_call ] - cmd_path = os.path.join(dir, cmd) - with open(cmd_path, "w") as f: + wrapper_path = os.path.join(dir, cmd) + with open(wrapper_path, "w") as f: f.write("\n".join(content)) - os.chmod(cmd_path, 0o777) + os.chmod(wrapper_path, 0o777) def make_pbs_wrappers(dir, binds): - host_addr = os.environ.get('PBS_O_HOST', None) - if host_addr is None: - host_addr = oscommand('hostname') - make_wrapper(dir, 'qstat', host_addr, binds) - make_wrapper(dir, 'qsub', host_addr, binds) + #host_addr = os.environ.get('PBS_O_HOST', None) + #if host_addr is None: + + # It seems that 'PBS_O_HOST' contains the name of machin on which the qsub command has been executed. + # Not clear from which node we want to execute the recursive qsub calls, but natural would be from local host (that would be the case without swrap). + # So we use current host, that should be te first host form the nodefile as well. 
+ + make_wrapper(dir, 'qstat', binds) + make_wrapper(dir, 'qsub', binds) diff --git a/src/swrap/sexec.py b/src/swrap/sexec.py index fa8543d..e905c1a 100755 --- a/src/swrap/sexec.py +++ b/src/swrap/sexec.py @@ -26,6 +26,7 @@ def __init__(self, image, command, venv="", debug=False): self.image: str = process_image_url(image) # singularity image url self.command: List[str] = command + print("command with args:", command) # command to call in the container with its arguments self.venv:str = os.path.abspath(venv) if venv else "" self.bindings: List[str] = [] @@ -72,23 +73,21 @@ def call(self): if not self.debug: flush_print("================== Program output START ==================") # proc = subprocess.run(final_command_list) - final_command = " ".join(self.cmd_list()) - oscommand(final_command) + oscommand(self.cmd_list()) flush_print("=================== Program output END ===================") # exit(proc.returncode) def copy_and_read_node_file(directory): + node_file = os.path.join(directory, "nodefile") orig_node_file = os.environ.get('PBS_NODEFILE', None) if orig_node_file is None: - node_file = os.path.join(directory, "nodefile") hostname = socket.gethostname() flush_print(f"Warning: missing PBS_NODEFILE variable. Using just local node: {hostname}.") with open(node_file, "w") as f: f.write(hostname) else: # create a copy - node_file = os.path.join(directory, os.path.basename(orig_node_file)) shutil.copy(orig_node_file, node_file) flush_print("reading host file...") @@ -204,23 +203,19 @@ def prepare_scratch_dir(scratch_source, node_names): current_dir = os.getcwd() source_tar_filename = 'scratch.tar' source_tar_filepath = os.path.join(current_dir, source_tar_filename) - command = ' '.join(['cd', source, '&&', 'tar -cvf', source_tar_filepath, '.', '&& cd', current_dir]) - oscommand(command) + oscommand(['tar', '-cvf', source_tar_filepath], cwd=source) # copy to scratch dir on every used node through ssh for node in node_names: destination_name = username + "@" + node destination_path = destination_name + ':' + scratch_dir_path - command = ' '.join(['scp', source_tar_filepath, destination_path]) - oscommand(command) + oscommand(['scp', source_tar_filepath, destination_path]) # command = ' '.join(['ssh', destination_name, 'cd', scratch_dir_path, '&&', 'tar --strip-components 1 -xf', source_tar_filepath, '-C /']) - command = ' '.join(['ssh', destination_name, '"cd', scratch_dir_path, '&&', 'tar -xf', source_tar_filename, - '&&', 'rm ', source_tar_filename, '"']) - oscommand(command) + oscommand(['ssh', destination_name, f'cd "{scratch_dir_path}" && tar -xf "{source_tar_filename}" && rm "source_tar_filename"']) # remove the scratch tar - oscommand(' '.join(['rm', source_tar_filename])) + oscommand(['rm', source_tar_filename]) return scratch_dir_path diff --git a/src/swrap/utils.py b/src/swrap/utils.py index cf0cce8..7e2df40 100644 --- a/src/swrap/utils.py +++ b/src/swrap/utils.py @@ -1,13 +1,17 @@ -import os import sys - +import subprocess def flush_print(*margs, **mkwargs): print(*margs, file=sys.stdout, flush=True, **mkwargs) -def oscommand(command_string): - flush_print(command_string) - stdout=os.popen(command_string).read() - flush_print(stdout) - return stdout \ No newline at end of file +def oscommand(command_list, **kwargs): + flush_print("Executing: ", command_list) + stdout = subprocess.check_output(command_list, **kwargs) + str_out = stdout.decode("utf-8") + flush_print(str_out) + return str_out + + + + From 064e4f3ed66f4790022029345702c090cffaf5a0 Mon Sep 17 00:00:00 2001 From: 
Jan Brezina Date: Tue, 21 Feb 2023 00:08:13 +0100 Subject: [PATCH 09/12] Bash swrap version, arg parsing. --- src/swrap/swrap.sh | 477 +++++++++++++++++++++++++++++++++++++++++ src/swrap/tst_swrap.sh | 23 ++ 2 files changed, 500 insertions(+) create mode 100755 src/swrap/swrap.sh create mode 100644 src/swrap/tst_swrap.sh diff --git a/src/swrap/swrap.sh b/src/swrap/swrap.sh new file mode 100755 index 0000000..730c337 --- /dev/null +++ b/src/swrap/swrap.sh @@ -0,0 +1,477 @@ +#!/bin/bash +# +# TODO: +# - debug output to given file, practical for wrapper (mpiexec, qsub, qstat) debugging, need suitable print function +# + +# Default debug output. +STDERR="/dev/stderr" + +function arg_assignment_split() { + RESULT_ARG=${1%=*} + if [ "$RESULT_ARG" == "$1" ] + then + RESULT_VALUE= + else + RESULT_VALUE="${1#*=}" + fi +} + +function split_to_array () { + # USAGE: split_to_array + # split a strng + delimiter=$1 + input=$2 + readarray -td${delimiter} RESULT <<<"${input}${delimiter}"; unset 'RESULT[-1]' +} + +function error () { + echo -e "ERROR: $1" + exit 1 + +} + +function dbg () { + if [ -n "$DEBUG" ] + then + echo "$@" >>"$DEBUG" + fi +} + + + +# +# def process_image_url(image_path: str) -> str: +# if os.path.isfile(image_path): +# image = os.path.abspath(image_path) +# elif image_path.startswith('docker://'): +# image = image_path +# else: +# raise Exception("Invalid image: not a file nor docker hub link: " + image_path) +# return image +# +# +# class SingularityCall: +# def __init__(self, image, command, venv="", debug=False): +# self.image: str = process_image_url(image) +# # singularity image url +# self.command: List[str] = command +# print("command with args:", command) +# # command to call in the container with its arguments +# self.venv:str = os.path.abspath(venv) if venv else "" +# self.bindings: List[str] = [] +# self.env_dict: Dict[str, str] = {} +# self.debug: bool = False +# +# def append_path(self, add_path): +# append_path_list = self.env_dict.get('APPEND_PATH', "").split(":") +# append_path_list.append(add_path) +# append_path_list = ":".join(append_path_list) +# self.env_dict['APPEND_PATH'] = append_path_list +# +# def prepend_path(self, add_path): +# append_path_list = self.env_dict.get('PREPEND_PATH', "").split(":") +# append_path_list.insert(0, add_path) +# append_path_list = ":".join(append_path_list) +# self.env_dict['PREPEND_PATH'] = append_path_list +# +# def form_bindings(self): +# # currently we olny support binding of the same paths in host and in container +# return self.bindings +# +# def form_env_list(self): +# self.env_dict['SWRAP_SINGULARITY_VENV'] = self.venv +# return [f"{key}={str(value)}" for key, value in self.env_dict.items()] +# +# def cmd_list(self): +# if len(self.venv) > 0: +# self.prepend_path(os.path.join(os.path.abspath(self.venv), 'bin')) +# sing_command = ['singularity', 'exec', +# '-B', ",".join(self.form_bindings()), +# '--env', ",".join(self.form_env_list()), +# self.image, +# *self.command] +# +# # F] join all the arguments into final singularity container command +# return sing_command +# +# def call(self): +# flush_print("current directory:", os.getcwd()) +# # mprint(os.popen("ls -l").read()) +# flush_print("final command:", *self.cmd_list()) +# flush_print("=================== smpiexec.py END ===================") +# if not self.debug: +# flush_print("================== Program output START ==================") +# # proc = subprocess.run(final_command_list) +# oscommand(self.cmd_list()) +# flush_print("=================== Program output END 
===================") +# # exit(proc.returncode) +# +# +# def copy_and_read_node_file(directory): +# node_file = os.path.join(directory, "nodefile") +# orig_node_file = os.environ.get('PBS_NODEFILE', None) +# if orig_node_file is None: +# hostname = socket.gethostname() +# flush_print(f"Warning: missing PBS_NODEFILE variable. Using just local node: {hostname}.") +# with open(node_file, "w") as f: +# f.write(hostname) +# else: +# # create a copy +# shutil.copy(orig_node_file, node_file) +# +# flush_print("reading host file...") +# +# # read node names +# with open(node_file) as fp: +# node_names_read = fp.read().splitlines() +# # remove duplicates +# node_names = list(set(node_names_read)) +# return node_file, node_names +# +# +# def create_ssh_agent(): +# """ +# Setup ssh agent and set appropriate environment variables. +# :return: +# """ +# create_agent = 'SSH_AUTH_SOCK' not in os.environ +# if not create_agent: +# create_agent = os.environ['SSH_AUTH_SOCK'] == '' +# if not create_agent: +# return +# +# flush_print("creating ssh agent...") +# p = subprocess.Popen('ssh-agent -s', +# stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, +# shell=True, universal_newlines=True) +# outinfo, errinfo = p.communicate('ssh-agent cmd\n') +# # print(outinfo) +# +# lines = outinfo.split('\n') +# for line in lines: +# # trim leading and trailing whitespace +# line = line.strip() +# # ignore blank/empty lines +# if not line: +# continue +# # break off the part before the semicolon +# left, right = line.split(';', 1) +# if '=' in left: +# # get variable and value, put into os.environ +# varname, varvalue = left.split('=', 1) +# flush_print("setting variable from ssh-agent:", varname, "=", varvalue) +# os.environ[varname] = varvalue +# +# assert 'SSH_AUTH_SOCK' in os.environ +# assert os.environ['SSH_AUTH_SOCK'] != "" +# +# +# def process_known_hosts_file(ssh_known_hosts_file, node_names): +# flush_print("host file name:", ssh_known_hosts_file) +# +# ssh_known_hosts = [] +# if os.path.exists(ssh_known_hosts_file): +# with open(ssh_known_hosts_file, 'r') as fp: +# ssh_known_hosts = fp.readlines() +# else: +# flush_print("creating host file...") +# dirname = os.path.dirname(ssh_known_hosts_file) +# if not os.path.exists(dirname): +# os.makedirs(dirname) +# +# flush_print("connecting nodes...") +# ssh_known_hosts_to_append = [] +# for node in node_names: +# # touch all the nodes, so that they are accessible also through container +# os.popen('ssh ' + node + ' exit') +# # add the nodes to known_hosts so the fingerprint verification is skipped later +# # in shell just append # >> ~ /.ssh / known_hosts +# # or sort by 3.column in shell: 'sort -k3 -u ~/.ssh/known_hosts' and rewrite +# ssh_keys = os.popen('ssh-keyscan -H ' + node).readlines() +# ssh_keys = list((line for line in ssh_keys if not line"".startswith('#'))) +# for sk in ssh_keys: +# splits = sk.split(" ") +# if not splits[2] in ssh_known_hosts: +# ssh_known_hosts_to_append.append(sk) +# +# flush_print("finishing host file...") +# with open(ssh_known_hosts_file, 'a') as fp: +# fp.writelines(ssh_known_hosts_to_append) +# +# +# def prepare_scratch_dir(scratch_source, node_names): +# if scratch_source == "": +# return os.getcwd() +# +# scratch_dir_path = os.environ.get('SCRATCHDIR', None) +# if scratch_dir_path is None: +# return os.getcwd() +# +# +# flush_print("Using SCRATCHDIR:", scratch_dir_path) +# +# flush_print("copying to SCRATCHDIR on all nodes...") +# username = os.environ['USER'] +# # get source files +# source = None +# if 
os.path.isdir(scratch_source): +# # source = scratch_source + "/." +# # paths = [os.path.join(scratch_source,fp) for fp in os.listdir(scratch_source)] +# # source = ' '.join(paths) +# source = scratch_source +# else: +# raise Exception("--scratch_copy argument is not a valid directory: " + scratch_source) +# # with open(scratch_source) as fp: +# # paths = fp.read().splitlines() +# # source = ' '.join(paths) +# +# if source is None or source is []: +# flush_print(scratch_source, "is empty") +# +# # create tar +# current_dir = os.getcwd() +# source_tar_filename = 'scratch.tar' +# source_tar_filepath = os.path.join(current_dir, source_tar_filename) +# oscommand(['tar', '-cvf', source_tar_filepath], cwd=source) +# +# # copy to scratch dir on every used node through ssh +# for node in node_names: +# destination_name = username + "@" + node +# destination_path = destination_name + ':' + scratch_dir_path +# oscommand(['scp', source_tar_filepath, destination_path]) +# +# # command = ' '.join(['ssh', destination_name, 'cd', scratch_dir_path, '&&', 'tar --strip-components 1 -xf', source_tar_filepath, '-C /']) +# oscommand(['ssh', destination_name, f'cd "{scratch_dir_path}" && tar -xf "{source_tar_filename}" && rm "source_tar_filename"']) +# +# # remove the scratch tar +# oscommand(['rm', source_tar_filename]) +# return scratch_dir_path + + + +function print_usage() { +cat << EOF + +Usage: + + Execute COMMAND under PBS: + endorse [-d[=PATH]] [-b=] [-e=] [-m=[]] [-s=] [] + + +Options: +-d, --debug[=LOG_PATH] + Output debug messages to stderr or to the file given by the LOG_PATH. + +-b, --bind= + Comma separated list of directory binds. Single bind format follows the Docker -v options, i.e. host dir:container dir[options]. + +-e, --env= + Comma separated list of the exported environment variables to introduce into the container environment. + +-m, --mpiexec[=MPIEXEC_PATH] + Creat a wrapper of mpiexec in the contiainer that manage lunching child mpi processes + in the same container through the cals: SSH -> swrap -> container -> mpiexec. + Optionaly MPIEXEC_PATH provides path to the mpiexec in the container to use. + ? mpi host file + +-s, scratch_copy[=INPUT_DIR] + Every local process copy content of current directory or directory given by INPUT_DIR to the directory given by SCRATCHDIR + environment variable provided by PBS. Copy is done through 'scp' to caluculation nodes. + +EOF + +# TODO direct run endorse_mlmc.py + +} +# +# def arguments(): +# description=\ +# """ +# Auxiliary executor for parallel programs running inside (Singularity) container under PBS. +# +# Provides some tools to start other jobs running in the same image: +# 1. wrapper sripts 'qsub' and 'qstat' are created in the job auxiliary directory +# 2. environment variables SINGULARITY_CONTAINER, SINGULARITY_BIND, SWRAP_SINGULARITY_VENV. +# 3. mpiexec ... 
+# """ +# parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter) +# parser.add_argument('-d', '--debug', action='store_true', +# help='use testing files and print the final command') +# parser.add_argument('-b', '--bind', type=str, metavar="PATH,...", default="", required=False, +# help='comma separated list of paths to be bind to Singularity container') +# parser.add_argument('-e', '--venv', type=str, metavar="PATH", default="", required=False, +# help='If specified, the python virtual environment in PATH directory will be activated before given command.') +# parser.add_argument('-m', '--mpiexec', type=str, metavar="PATH", default="", required=False, +# help="path (inside the container) to mpiexec to be run, default is 'mpiexec'") +# parser.add_argument('-s', '--scratch_copy', type=str, metavar="PATH", default="", required=False, +# help=''' +# directory path, its content will be copied to SCRATCHDIR; +# ''') +# # if file path, each user defined path inside the file will be copied to SCRATCHDIR +# parser.add_argument('prog', nargs=argparse.REMAINDER, +# help=''' +# mpiexec arguments and the executable, follow mpiexec doc: +# "mpiexec args executable pgmargs [ : args executable pgmargs ... ]" +# +# still can use MPMD (Multiple Program Multiple Data applications): +# -n 4 program1 : -n 3 program2 : -n 2 program3 ... +# ''') +# +# # create the parser for the "prog" command +# # parser_prog = parser.add_subparsers().add_parser('prog', help='program to be run and all its arguments') +# # parser_prog.add_argument('args', nargs="+", help="all arguments passed to 'prog'") +# +# # parser.print_help() +# # parser.print_usage() +# +# args = parser.parse_args() +# return args + + +function parse_arguments() { + while [ "${1#-}" != "$1" ] # arg starts with '-' + do + arg_assignment_split "$1" # produce $RESULT_ARG and $RESULT_VALUE + arg="$RESULT_ARG" + value="$RESULT_VALUE" + shift + case $arg in + -d|--debug) + DEBUG=${value:-$STDERR} + ;; + -b|--bind) + split_to_array "," "$value" + CONT_BIND_LIST=( "${RESULT[@]}" ) + # one item in docker-like format: host_dir:container_dir[:options] + ;; + -e|--env) + split_to_array "," "$value" + CONT_ENV_LIST=( "${RESULT[@]}" ) + # contains exported variable names + ;; + -m|--mpiexec) + DEFAULT_MPIEXEC="mpiexec" + MPIEXEC=${value:-$DEFAULT_MPIEXEC} + ;; + -s|--scratch_copy) + SCRATCH_INPUT_DIR=${value:-`pwd`} + ;; + -h|--help) + print_usage + exit 0 + ;; + *) + print_usage + error "Invalid argument '$arg'" + ;; + esac + done + + if [ -z "$1" ] + then + print_usage + error "Missing image url." + fi + IMAGE_URL="$1" + shift + if [ -z "$1" ] + then + print_usage + error "Missing command." + fi + COMMAND_WITH_ARGS=("${@}") +} + +# +# def setup_aux_dir(): +# current_dir = os.getcwd() +# pbs_job_id = os.environ.get('PBS_JOBID', f"pid_{os.getpid()}") +# flush_print("PBS job id: ", pbs_job_id) +# pbs_job_aux_dir = os.path.join(current_dir, pbs_job_id + '_job') +# # create auxiliary job output directory +# os.makedirs(pbs_job_aux_dir, mode=0o775) +# return pbs_job_aux_dir +# +# +# def main(): +# flush_print("================== smpiexec.py START ==================") +# args = arguments() +# +# +# sing = SingularityCall(args.image, args.prog, args.venv, debug=args.debug) +# ################################################################################################################### +# # Process node file and setup ssh access to given nodes. 
+# ################################################################################################################### +# +# flush_print("Hostname: ", os.popen('hostname').read()) +# # mprint("os.environ", os.environ) +# pbs_job_aux_dir = setup_aux_dir() +# +# # get nodefile, copy it to local dir so that it can be passed into container mpiexec later +# node_file, node_names = copy_and_read_node_file(pbs_job_aux_dir) +# +# +# # Get ssh keys to nodes and append it to $HOME/.ssh/known_hosts +# ssh_known_hosts_to_append = [] +# if sing.debug: +# # ssh_known_hosts_file = 'testing_known_hosts' +# ssh_known_hosts_file = 'xxx/.ssh/testing_known_hosts' +# else: +# assert 'HOME' in os.environ +# ssh_known_hosts_file = os.path.join(os.environ['HOME'], '.ssh/known_hosts') +# process_known_hosts_file(ssh_known_hosts_file, node_names) +# +# # mprint(os.environ) +# create_ssh_agent() +# +# ################################################################################################################### +# # Create Singularity container commands. +# ################################################################################################################### +# +# flush_print("assembling final command...") +# scratch_dir_path = prepare_scratch_dir(args.scratch_copy, node_names) +# +# +# # A] process bindings, exclude ssh agent in launcher bindings +# common_bindings = ["/etc/ssh/ssh_config", "/etc/ssh/ssh_known_hosts", "/etc/krb5.conf"] +# sing.bindings.extend(common_bindings) +# sing.bindings.append(os.environ['SSH_AUTH_SOCK']) +# # possibly add current dir to container bindings +# # bindings = bindings + "," + current_dir + ":" + current_dir +# if args.bind != "": +# sing.bindings.append(args.bind) +# +# if scratch_dir_path: +# sing.bindings.append(scratch_dir_path) +# +# +# make_pbs_wrappers(pbs_job_aux_dir, sing.bindings) +# sing.append_path(pbs_job_aux_dir) +# +# ################################################################################################################### +# # Final call. 
+# ################################################################################################################### +# if scratch_dir_path: +# flush_print("Entering SCRATCHDIR:", scratch_dir_path) +# os.chdir(scratch_dir_path) +# +# sing.call() +# +# if __name__ == "__main__": +# main() + + +# =================== MAIN + +WORKDIR=`pwd` +parse_arguments $@ + +# Report parsed arguments +dbg "DEBUG: '$DEBUG'" +dbg "CONT_BIND_LIST: ${CONT_BIND_LIST[@]}" +dbg "CONT_ENV_LIST: ${CONT_ENV_LIST[@]}" +dbg "MPIEXEC: '$MPIEXEC'" +dbg "SCRATCH_INPUT_DIR: '$SCRATCH_INPUT_DIR'" +dbg "IMAGE_URL: '$IMAGE_URL'" +dbg "COMMAND: ${COMMAND_WITH_ARGS[@]}" diff --git a/src/swrap/tst_swrap.sh b/src/swrap/tst_swrap.sh new file mode 100644 index 0000000..4e6b08e --- /dev/null +++ b/src/swrap/tst_swrap.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +rm ./tst_stderr +echo "Running test" +./swrap.sh -d=tst_stderr -b="a:b,c:d" -e=A,B -m -s IMG CMD + +echo "Test completed" + +if cmp tst_stderr -- < Date: Wed, 1 Mar 2023 11:06:36 +0100 Subject: [PATCH 10/12] Nearly working docker - fixed test script for PID - working SSH setup - basic structure of docker and singularity call --- src/swrap/swrap.sh | 608 +++++++++++++++++++++++++---------------- src/swrap/tst_nodefile | 2 + src/swrap/tst_swrap.sh | 68 ++++- 3 files changed, 430 insertions(+), 248 deletions(-) create mode 100644 src/swrap/tst_nodefile diff --git a/src/swrap/swrap.sh b/src/swrap/swrap.sh index 730c337..602ee2c 100755 --- a/src/swrap/swrap.sh +++ b/src/swrap/swrap.sh @@ -3,10 +3,22 @@ # TODO: # - debug output to given file, practical for wrapper (mpiexec, qsub, qstat) debugging, need suitable print function # +set -x # Default debug output. STDERR="/dev/stderr" +# Global variables +# DEBUG - path to the redirection of debug messages +# IMAGE_URL + +function cleanup () { + [ -n "${JOB_AUX_DIR}" ] && rm -r ${JOB_AUX_DIR} +} + +trap "cleanup" EXIT + + function arg_assignment_split() { RESULT_ARG=${1%=*} if [ "$RESULT_ARG" == "$1" ] @@ -26,11 +38,26 @@ function split_to_array () { } function error () { - echo -e "ERROR: $1" - exit 1 + # Report error and exit. + # Usage: error "The error message" + + echo -e "ERROR: $1" + exit 1 +} +function check_var () { + # Check that given var is nonempty (check global variables at beginning of a function. + varname=$1 + [ -z "${!varname}" ] && error "Use of uninitialized var: $varname" } +function check_indexed_array () { + # TODO: check indexed array type + varname=$1 + [[ -v ${varname}[@] ]] && error "Use of uninitialized array: $varname" +} + + function dbg () { if [ -n "$DEBUG" ] then @@ -38,7 +65,276 @@ function dbg () { fi } +function dbgvar { + varname=$1 + shift + dbg "$varname: '${!varname}'" +} + +function dbgarray { + local varname=$1 + local -n array="$varname" + args="" + for item in "${array[@]}" + do args+=" '$item'" + done + dbg "$varname:" $args +} + + + +function print_usage() { +cat << EOF + +Usage: + + Execute COMMAND under PBS: + endorse [-d[=PATH]] [-b=] [-e=] [-m=[]] [-s=] [] + + +Options: +-d, --debug[=LOG_PATH] + Output debug messages to stderr or to the file given by the LOG_PATH. + +-b, --bind= + Comma separated list of directory binds. Single bind format follows the Docker -v options, i.e. host dir:container dir[options]. + +-e, --env= + Comma separated list of the exported environment variables to introduce into the container environment. 
+ +-m, --mpiexec[=MPIEXEC_PATH] + Creat a wrapper of mpiexec in the contiainer that manage lunching child mpi processes + in the same container through the cals: SSH -> swrap -> container -> mpiexec. + Optionaly MPIEXEC_PATH provides path to the mpiexec in the container to use. + ? mpi host file + +-s, scratch_copy[=INPUT_DIR] + Every local process copy content of current directory or directory given by INPUT_DIR to the directory given by SCRATCHDIR + environment variable provided by PBS. Copy is done through 'scp' to caluculation nodes. + +Used environment variables: + +PBS_JOBID +PBS_NODEFILE +SSH_KNOWNHOSTS + + +EOF + +# TODO direct run endorse_mlmc.py +} + + +function parse_arguments() { + while [ "${1#-}" != "$1" ] # arg starts with '-' + do + arg_assignment_split "$1" # produce $RESULT_ARG and $RESULT_VALUE + arg="$RESULT_ARG" + value="$RESULT_VALUE" + shift + case $arg in + -d|--debug) + DEBUG=${value:-$STDERR} + ;; + -b|--bind) + split_to_array "," "$value" + CONT_BIND_LIST=( "${RESULT[@]}" ) + # one item in docker-like format: host_dir:container_dir[:options] + ;; + -e|--env) + split_to_array "," "$value" + CONT_ENV_LIST=( "${RESULT[@]}" ) + # contains exported variable names + ;; + -m|--mpiexec) + DEFAULT_MPIEXEC="mpiexec" + MPIEXEC=${value:-$DEFAULT_MPIEXEC} + ;; + -s|--scratch_copy) + SCRATCH_INPUT_DIR=${value:-`pwd`} + ;; + -h|--help) + print_usage + exit 0 + ;; + *) + print_usage + error "Invalid argument '$arg'" + ;; + esac + done + + if [ -z "$1" ] + then + print_usage + error "Missing image url." + fi + IMAGE_URL="$1" + shift + if [ -z "$1" ] + then + print_usage + error "Missing command." + fi + COMMAND_WITH_ARGS=("$@") +} + + +function read_pbs () { + # read PBS variables, proceed with defaults if running without PBS + PBS_JOBID=${PBS_JOBID:-"pid_$$"} + dbg "PBS_JOBID: '$PBS_JOBID'" + + # read PBS_NODEFILE + declare -a NODE_NAMES + if [ -z "${PBS_NODEFILE}" ]; then + # default just local node + NODE_NAMES+=($(hostname)) + else + readarray -t NODE_NAMES <${PBS_NODEFILE} + fi + dbg "NODE_NAMES list: ${NODE_NAMES[@]}" +} + +function setup_aux_dir () { + # setup aux job dir + check_var PBS_JOBID + JOB_AUX_DIR="$(pwd)/${PBS_JOBID}_job" + mkdir -p -m=775 "$JOB_AUX_DIR" + +} + + + + +function ssh_batch () { + #no_strict_key_check=-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null + ssh -o BatchMode=yes "$@" +} + + + +function update_ssh_known_hosts_file () { +# def process_known_hosts_file(ssh_known_hosts_file, node_names): +# flush_print("host file name:", ssh_known_hosts_file) +# +# ssh_known_hosts = [] +# if os.path.exists(ssh_known_hosts_file): +# with open(ssh_known_hosts_file, 'r') as fp: +# ssh_known_hosts = fp.readlines() +# else: +# flush_print("creating host file...") +# dirname = os.path.dirname(ssh_known_hosts_file) +# if not os.path.exists(dirname): +# os.makedirs(dirname) +# +# flush_print("connecting nodes...") +# ssh_known_hosts_to_append = [] +# for node in node_names: +# # touch all the nodes, so that they are accessible also through container +# os.popen('ssh ' + node + ' exit') +# # add the nodes to known_hosts so the fingerprint verification is skipped later +# # in shell just append # >> ~ /.ssh / known_hosts +# # or sort by 3.column in shell: 'sort -k3 -u ~/.ssh/known_hosts' and rewrite +# ssh_keys = os.popen('ssh-keyscan -H ' + node).readlines() +# ssh_keys = list((line for line in ssh_keys if not line"".startswith('#'))) +# for sk in ssh_keys: +# splits = sk.split(" ") +# if not splits[2] in ssh_known_hosts: +# 
ssh_known_hosts_to_append.append(sk) +# +# flush_print("finishing host file...") +# with open(ssh_known_hosts_file, 'a') as fp: +# fp.writelines(ssh_known_hosts_to_append) + + + local ssh_knownhosts_file="${SSH_KNOWNHOSTS:-${HOME}/.ssh/known_hosts}" + check_indexed_array NODE_NAMES + if [ ! -f "$ssh_knownhosts_file" ] + then + # create default known_hosts file + mkdir -p "${ssh_knownhosts_file%/*}" + touch "${ssh_knownhosts_file}" + fi + + ( + # lock file descriptor + flock -w 30 223 + cp "$ssh_knownhosts_file" ~/.ssh/_tmp_hosts + for node in "${NODE_NAMES[@]}";do + # touch all the nodes, so that they are accessible also through container + # TODO: should we do this in separate loop after updationg known_hosts ?? + # point of thi is to fail early + ssh_batch $node exit + ssh-keyscan -H $node 2>&1 >> ~/.ssh/_tmp_hosts + done + sort -u ~/.ssh/_tmp_hosts >"$ssh_knownhosts_file" + rm ~/.ssh/_tmp_hosts + ) 223>${HOME}/.ssh/_swrap_lock +} + + +function create_ssh_agent () { +# def create_ssh_agent(): +# """ +# Setup ssh agent and set appropriate environment variables. +# :return: +# """ +# create_agent = 'SSH_AUTH_SOCK' not in os.environ +# if not create_agent: +# create_agent = os.environ['SSH_AUTH_SOCK'] == '' +# if not create_agent: +# return +# +# flush_print("creating ssh agent...") +# p = subprocess.Popen('ssh-agent -s', +# stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, +# shell=True, universal_newlines=True) +# outinfo, errinfo = p.communicate('ssh-agent cmd\n') +# # print(outinfo) +# +# lines = outinfo.split('\n') +# for line in lines: +# # trim leading and trailing whitespace +# line = line.strip() +# # ignore blank/empty lines +# if not line: +# continue +# # break off the part before the semicolon +# left, right = line.split(';', 1) +# if '=' in left: +# # get variable and value, put into os.environ +# varname, varvalue = left.split('=', 1) +# flush_print("setting variable from ssh-agent:", varname, "=", varvalue) +# os.environ[varname] = varvalue +# +# assert 'SSH_AUTH_SOCK' in os.environ +# assert os.environ['SSH_AUTH_SOCK'] != "" + + if [ -z ${SSH_AUTH_SOCK} ]; then + cmds=$(ssh-agent -s) + eval $cmds + fi + + dbg "SSH_AUTH_SOCK: '${SSH_AUTH_SOCK}'" + dbg "SSH_AGENT_PID: '${SSH_AGENT_PID}'" +} + + +function make_wrapper () { + # TODO: for docker make transplation of PWD + # use a aux file with bindings, read it into associative array, lookup in it + cmd=$1 + full_cmd=$(command -v $cmd) + cat < str: @@ -108,97 +404,11 @@ function dbg () { # # exit(proc.returncode) # # -# def copy_and_read_node_file(directory): -# node_file = os.path.join(directory, "nodefile") -# orig_node_file = os.environ.get('PBS_NODEFILE', None) -# if orig_node_file is None: -# hostname = socket.gethostname() -# flush_print(f"Warning: missing PBS_NODEFILE variable. Using just local node: {hostname}.") -# with open(node_file, "w") as f: -# f.write(hostname) -# else: -# # create a copy -# shutil.copy(orig_node_file, node_file) -# -# flush_print("reading host file...") -# -# # read node names -# with open(node_file) as fp: -# node_names_read = fp.read().splitlines() -# # remove duplicates -# node_names = list(set(node_names_read)) -# return node_file, node_names -# # -# def create_ssh_agent(): -# """ -# Setup ssh agent and set appropriate environment variables. 
-# :return: -# """ -# create_agent = 'SSH_AUTH_SOCK' not in os.environ -# if not create_agent: -# create_agent = os.environ['SSH_AUTH_SOCK'] == '' -# if not create_agent: -# return -# -# flush_print("creating ssh agent...") -# p = subprocess.Popen('ssh-agent -s', -# stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, -# shell=True, universal_newlines=True) -# outinfo, errinfo = p.communicate('ssh-agent cmd\n') -# # print(outinfo) # -# lines = outinfo.split('\n') -# for line in lines: -# # trim leading and trailing whitespace -# line = line.strip() -# # ignore blank/empty lines -# if not line: -# continue -# # break off the part before the semicolon -# left, right = line.split(';', 1) -# if '=' in left: -# # get variable and value, put into os.environ -# varname, varvalue = left.split('=', 1) -# flush_print("setting variable from ssh-agent:", varname, "=", varvalue) -# os.environ[varname] = varvalue # -# assert 'SSH_AUTH_SOCK' in os.environ -# assert os.environ['SSH_AUTH_SOCK'] != "" # # -# def process_known_hosts_file(ssh_known_hosts_file, node_names): -# flush_print("host file name:", ssh_known_hosts_file) -# -# ssh_known_hosts = [] -# if os.path.exists(ssh_known_hosts_file): -# with open(ssh_known_hosts_file, 'r') as fp: -# ssh_known_hosts = fp.readlines() -# else: -# flush_print("creating host file...") -# dirname = os.path.dirname(ssh_known_hosts_file) -# if not os.path.exists(dirname): -# os.makedirs(dirname) -# -# flush_print("connecting nodes...") -# ssh_known_hosts_to_append = [] -# for node in node_names: -# # touch all the nodes, so that they are accessible also through container -# os.popen('ssh ' + node + ' exit') -# # add the nodes to known_hosts so the fingerprint verification is skipped later -# # in shell just append # >> ~ /.ssh / known_hosts -# # or sort by 3.column in shell: 'sort -k3 -u ~/.ssh/known_hosts' and rewrite -# ssh_keys = os.popen('ssh-keyscan -H ' + node).readlines() -# ssh_keys = list((line for line in ssh_keys if not line"".startswith('#'))) -# for sk in ssh_keys: -# splits = sk.split(" ") -# if not splits[2] in ssh_known_hosts: -# ssh_known_hosts_to_append.append(sk) -# -# flush_print("finishing host file...") -# with open(ssh_known_hosts_file, 'a') as fp: -# fp.writelines(ssh_known_hosts_to_append) -# # # def prepare_scratch_dir(scratch_source, node_names): # if scratch_source == "": @@ -250,148 +460,8 @@ function dbg () { -function print_usage() { -cat << EOF - -Usage: - - Execute COMMAND under PBS: - endorse [-d[=PATH]] [-b=] [-e=] [-m=[]] [-s=] [] - - -Options: --d, --debug[=LOG_PATH] - Output debug messages to stderr or to the file given by the LOG_PATH. - --b, --bind= - Comma separated list of directory binds. Single bind format follows the Docker -v options, i.e. host dir:container dir[options]. - --e, --env= - Comma separated list of the exported environment variables to introduce into the container environment. - --m, --mpiexec[=MPIEXEC_PATH] - Creat a wrapper of mpiexec in the contiainer that manage lunching child mpi processes - in the same container through the cals: SSH -> swrap -> container -> mpiexec. - Optionaly MPIEXEC_PATH provides path to the mpiexec in the container to use. - ? mpi host file - --s, scratch_copy[=INPUT_DIR] - Every local process copy content of current directory or directory given by INPUT_DIR to the directory given by SCRATCHDIR - environment variable provided by PBS. Copy is done through 'scp' to caluculation nodes. 
- -EOF - -# TODO direct run endorse_mlmc.py - -} -# -# def arguments(): -# description=\ -# """ -# Auxiliary executor for parallel programs running inside (Singularity) container under PBS. -# -# Provides some tools to start other jobs running in the same image: -# 1. wrapper sripts 'qsub' and 'qstat' are created in the job auxiliary directory -# 2. environment variables SINGULARITY_CONTAINER, SINGULARITY_BIND, SWRAP_SINGULARITY_VENV. -# 3. mpiexec ... -# """ -# parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter) -# parser.add_argument('-d', '--debug', action='store_true', -# help='use testing files and print the final command') -# parser.add_argument('-b', '--bind', type=str, metavar="PATH,...", default="", required=False, -# help='comma separated list of paths to be bind to Singularity container') -# parser.add_argument('-e', '--venv', type=str, metavar="PATH", default="", required=False, -# help='If specified, the python virtual environment in PATH directory will be activated before given command.') -# parser.add_argument('-m', '--mpiexec', type=str, metavar="PATH", default="", required=False, -# help="path (inside the container) to mpiexec to be run, default is 'mpiexec'") -# parser.add_argument('-s', '--scratch_copy', type=str, metavar="PATH", default="", required=False, -# help=''' -# directory path, its content will be copied to SCRATCHDIR; -# ''') -# # if file path, each user defined path inside the file will be copied to SCRATCHDIR -# parser.add_argument('prog', nargs=argparse.REMAINDER, -# help=''' -# mpiexec arguments and the executable, follow mpiexec doc: -# "mpiexec args executable pgmargs [ : args executable pgmargs ... ]" -# -# still can use MPMD (Multiple Program Multiple Data applications): -# -n 4 program1 : -n 3 program2 : -n 2 program3 ... -# ''') -# -# # create the parser for the "prog" command -# # parser_prog = parser.add_subparsers().add_parser('prog', help='program to be run and all its arguments') -# # parser_prog.add_argument('args', nargs="+", help="all arguments passed to 'prog'") -# -# # parser.print_help() -# # parser.print_usage() -# -# args = parser.parse_args() -# return args - - -function parse_arguments() { - while [ "${1#-}" != "$1" ] # arg starts with '-' - do - arg_assignment_split "$1" # produce $RESULT_ARG and $RESULT_VALUE - arg="$RESULT_ARG" - value="$RESULT_VALUE" - shift - case $arg in - -d|--debug) - DEBUG=${value:-$STDERR} - ;; - -b|--bind) - split_to_array "," "$value" - CONT_BIND_LIST=( "${RESULT[@]}" ) - # one item in docker-like format: host_dir:container_dir[:options] - ;; - -e|--env) - split_to_array "," "$value" - CONT_ENV_LIST=( "${RESULT[@]}" ) - # contains exported variable names - ;; - -m|--mpiexec) - DEFAULT_MPIEXEC="mpiexec" - MPIEXEC=${value:-$DEFAULT_MPIEXEC} - ;; - -s|--scratch_copy) - SCRATCH_INPUT_DIR=${value:-`pwd`} - ;; - -h|--help) - print_usage - exit 0 - ;; - *) - print_usage - error "Invalid argument '$arg'" - ;; - esac - done - - if [ -z "$1" ] - then - print_usage - error "Missing image url." - fi - IMAGE_URL="$1" - shift - if [ -z "$1" ] - then - print_usage - error "Missing command." 
- fi - COMMAND_WITH_ARGS=("${@}") -} # -# def setup_aux_dir(): -# current_dir = os.getcwd() -# pbs_job_id = os.environ.get('PBS_JOBID', f"pid_{os.getpid()}") -# flush_print("PBS job id: ", pbs_job_id) -# pbs_job_aux_dir = os.path.join(current_dir, pbs_job_id + '_job') -# # create auxiliary job output directory -# os.makedirs(pbs_job_aux_dir, mode=0o775) -# return pbs_job_aux_dir # # # def main(): @@ -461,17 +531,83 @@ function parse_arguments() { # if __name__ == "__main__": # main() + +# TODO: add aux dir to path +# ? mount users home ? + +function call_docker() { + if ! docker image inspect $IMAGE_URL &> /dev/null; then + docker pull $IMAGE_URL + fi + + local env_vars="-euid=$uid -egid=$gid -ewho=$uname -ehome=/mnt/$HOME" + local host_workdir=`pwd` + local cont_workdir=${host_workdir#$HOME/*/} + local sub_home=${host_workdir%$cont_workdir} + local binds="-v/$HOME:/mnt/$HOME -v$sub_home:/" + docker run $env_vars ${CONT_ENV_LIST[@]/#/-e } $binds ${CONT_BIND_LIST[@]/#/-v} -w=${CONT_WORKDIR} $IMAGE_URL ${COMMAND_WITH_ARGS[@]} +} + + +function call_singularity() { + # prepare singularity image: download, convert, (install) + LOCAL_IMAGE="$ENDORSE_WORKSPACE/endorse_ci_${tag}.sif" + if [ ! -f $LOCAL_IMAGE ] + then + singularity build $ENDORSE_IMAGE "docker://$IMAGE_URL" + fi + + # call + singularity exec ${CONT_ENV_LIST[@]/#/-e } $LOCAL_IMAGE ${COMMAND_WITH_ARGS[@]} +} + + +function call_container { + # TODO: have concept of work dir or where to place converted images, currently place them in .singularity default location + # communicate with metacentrum what is the best strategy + # + # prefer to build them in tmp or in scratch dir + + + if [ -x `command -v docker` ]; then + call_docker + elif [ -x `command -v singularity` ]; then + call_singularity + else + error "No container tool. Not supported." + fi + +} + + # =================== MAIN WORKDIR=`pwd` -parse_arguments $@ +parse_arguments "$@" # Report parsed arguments -dbg "DEBUG: '$DEBUG'" -dbg "CONT_BIND_LIST: ${CONT_BIND_LIST[@]}" -dbg "CONT_ENV_LIST: ${CONT_ENV_LIST[@]}" -dbg "MPIEXEC: '$MPIEXEC'" -dbg "SCRATCH_INPUT_DIR: '$SCRATCH_INPUT_DIR'" -dbg "IMAGE_URL: '$IMAGE_URL'" -dbg "COMMAND: ${COMMAND_WITH_ARGS[@]}" +dbgvar DEBUG +dbgarray CONT_BIND_LIST +dbgarray CONT_ENV_LIST +dbgvar MPIEXEC +dbgvar SCRATCH_INPUT_DIR +dbgvar IMAGE_URL +dbgarray COMMAND_WITH_ARGS + + +# read PBS_JOBID and NODE_NAMES array +read_pbs + +# set JOB_AUX_DIR, create aux dir +setup_aux_dir + +update_ssh_known_hosts_file + +# export: SSH_AUTH_SOCK, SSH_AGENT_PID +create_ssh_agent + +make_wrapper qstat +make_wrapper qsub + +call_container diff --git a/src/swrap/tst_nodefile b/src/swrap/tst_nodefile new file mode 100644 index 0000000..d96033a --- /dev/null +++ b/src/swrap/tst_nodefile @@ -0,0 +1,2 @@ +charon21.nti.tul.cz +charon22.nti.tul.cz diff --git a/src/swrap/tst_swrap.sh b/src/swrap/tst_swrap.sh index 4e6b08e..fb725f3 100644 --- a/src/swrap/tst_swrap.sh +++ b/src/swrap/tst_swrap.sh @@ -1,23 +1,67 @@ #!/bin/bash -rm ./tst_stderr -echo "Running test" -./swrap.sh -d=tst_stderr -b="a:b,c:d" -e=A,B -m -s IMG CMD +N_TST=1 +function check () { + tst_name=$1 + shift + + # read stdin + input=$(cat) + + output_name="./tst_stderr_${N_TST}" + + rm "$output_name" + echo "RUN #${N_TST}: $tst_name" + ./swrap.sh -d="$output_name" "$@" & + subproc_pid=$! 
+ echo "Last cmd: $subproc_pid" + wait $subproc_pid + echo "DONE #${N_TST}: $tst_name" -echo "Test completed" + # substitute PID in input + substituted_input=$(eval "echo \"$input\"") -if cmp tst_stderr -- < Date: Wed, 1 Mar 2023 13:33:04 +0100 Subject: [PATCH 11/12] working docker call --- src/swrap/swrap.sh | 24 ++++++++++++-------- src/swrap/tst_swrap.sh | 50 +++++++++++++++++++++++++++++++++++++----- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/src/swrap/swrap.sh b/src/swrap/swrap.sh index 602ee2c..86ce5a8 100755 --- a/src/swrap/swrap.sh +++ b/src/swrap/swrap.sh @@ -541,12 +541,20 @@ function call_docker() { fi local env_vars="-euid=$uid -egid=$gid -ewho=$uname -ehome=/mnt/$HOME" - local host_workdir=`pwd` - local cont_workdir=${host_workdir#$HOME/*/} - local sub_home=${host_workdir%$cont_workdir} - local binds="-v/$HOME:/mnt/$HOME -v$sub_home:/" - docker run $env_vars ${CONT_ENV_LIST[@]/#/-e } $binds ${CONT_BIND_LIST[@]/#/-v} -w=${CONT_WORKDIR} $IMAGE_URL ${COMMAND_WITH_ARGS[@]} + #local host_workdir=`pwd` + #local rel_home="${host_workdir#$HOME/}" + #local home_first_subdir="${rel_home%%/*}" + #local abs_first_subdir="$HOME/$home_first_subdir" + local binds="-v$(pwd):$(pwd)" + # --rm remove container after completion + # mount current dir (shuld include JOB_AUX_DIR) + + # assert + [[ -d "$JOB_AUX_DIR" && "$JOB_AUX_DIR" = $(pwd)* ]] || error "JOB_AUX_DIR not subdir of PWD" + + + docker run --rm -u $(id -u):$(id -g) $env_vars ${CONT_ENV_LIST[@]/#/-e } $binds ${CONT_BIND_LIST[@]/#/-v} -w $(pwd) $IMAGE_URL "${COMMAND_WITH_ARGS[@]}" } @@ -564,10 +572,8 @@ function call_singularity() { function call_container { - # TODO: have concept of work dir or where to place converted images, currently place them in .singularity default location - # communicate with metacentrum what is the best strategy - # - # prefer to build them in tmp or in scratch dir + # We automaticaly bind: + # - PWD (including JOB_AUX_DIR) if [ -x `command -v docker` ]; then diff --git a/src/swrap/tst_swrap.sh b/src/swrap/tst_swrap.sh index fb725f3..cf021c0 100644 --- a/src/swrap/tst_swrap.sh +++ b/src/swrap/tst_swrap.sh @@ -2,13 +2,13 @@ N_TST=1 function check () { - tst_name=$1 + tst_name="$1" shift # read stdin input=$(cat) - output_name="./tst_stderr_${N_TST}" + output_name="./tst_stderr_${tst_name}" rm "$output_name" echo "RUN #${N_TST}: $tst_name" @@ -33,8 +33,11 @@ function check () { IMG=alpine -check "local configuration" -b="a:b,c:d" -e=A,B -m -s $IMG touch "file 1" "file 2"< Date: Thu, 2 Mar 2023 22:18:17 +0100 Subject: [PATCH 12/12] working singularity --- src/swrap/swrap.sh | 66 +++++++++++++++++++++++++++++++++++------- src/swrap/tst_swrap.sh | 35 ++++++++++------------ 2 files changed, 70 insertions(+), 31 deletions(-) diff --git a/src/swrap/swrap.sh b/src/swrap/swrap.sh index 86ce5a8..2804fb3 100755 --- a/src/swrap/swrap.sh +++ b/src/swrap/swrap.sh @@ -3,6 +3,11 @@ # TODO: # - debug output to given file, practical for wrapper (mpiexec, qsub, qstat) debugging, need suitable print function # + +# METACETRUM and singularity +# - use project dir for SINGULARTY_CACHE +# - set SINGULARITY_TMPDIR to a HOME dir or to the SCRATCH if in use, SCRATCH is allocated but not its use not checked + set -x # Default debug output. 
@@ -82,6 +87,33 @@
 }
 
 
+function env_list {
+    # produce the global ENV_ASSIGN_LIST from CONT_ENV_LIST
+    # plain assignment instead of 'declare', which would make the array local to this function
+    ENV_ASSIGN_LIST=()
+    if [ ${#CONT_ENV_LIST[@]} -gt 0 ]
+    then
+        for item in ${CONT_ENV_LIST[@]}
+        do
+            if [ "${!item}" ]; then
+                ENV_ASSIGN_LIST+=("--env")
+                ENV_ASSIGN_LIST+=("$item=${!item}")
+            fi
+        done
+    fi
+}
+
+
+function bind_list {
+    bind_option=$1
+    # produce the global BIND_ASSIGN_LIST from CONT_BIND_LIST
+    BIND_ASSIGN_LIST=()
+    if [ ${#CONT_BIND_LIST[@]} -gt 0 ]
+    then
+        for item in ${CONT_BIND_LIST[@]}
+        do
+            BIND_ASSIGN_LIST+=("$bind_option")
+            BIND_ASSIGN_LIST+=("$item")
+        done
+    fi
+}
 
 function print_usage() {
 cat << EOF
@@ -317,8 +349,9 @@ function create_ssh_agent () {
         eval $cmds
     fi
 
-    dbg "SSH_AUTH_SOCK: '${SSH_AUTH_SOCK}'"
-    dbg "SSH_AGENT_PID: '${SSH_AGENT_PID}'"
+    [ "$SSH_AUTH_SOCK" ] || error "Unable to start ssh-agent."
+    #dbg "SSH_AUTH_SOCK: '${SSH_AUTH_SOCK}'"
+    #dbg "SSH_AGENT_PID: '${SSH_AGENT_PID}'"
 }
 
 
@@ -553,21 +586,28 @@ function call_docker() {
 
     # assert
     [[ -d "$JOB_AUX_DIR" && "$JOB_AUX_DIR" = $(pwd)* ]] || error "JOB_AUX_DIR not subdir of PWD"
+    env_list   # produce ENV_ASSIGN_LIST from CONT_ENV_LIST
+    bind_list "-v"
 
-    docker run --rm -u $(id -u):$(id -g) $env_vars ${CONT_ENV_LIST[@]/#/-e } $binds ${CONT_BIND_LIST[@]/#/-v} -w $(pwd) $IMAGE_URL "${COMMAND_WITH_ARGS[@]}"
+    docker run --rm -u $(id -u):$(id -g) $env_vars $binds "${ENV_ASSIGN_LIST[@]}" "${BIND_ASSIGN_LIST[@]}" -w $(pwd) $IMAGE_URL "${COMMAND_WITH_ARGS[@]}"
 }
 
 
 function call_singularity() {
+    # TODO: use Scratch dir if provided
+    # exported so that the singularity child process sees it
+    export SINGULARITY_TMPDIR=${SINGULARITY_TMPDIR:-$HOME/singularity_tmpdir}
+    [ -d "$SINGULARITY_TMPDIR" ] || mkdir -p "$SINGULARITY_TMPDIR"
+
+    local singularity_image_url="docker://$IMAGE_URL"
     # prepare singularity image: download, convert, (install)
-    LOCAL_IMAGE="$ENDORSE_WORKSPACE/endorse_ci_${tag}.sif"
-    if [ ! -f $LOCAL_IMAGE ]
-    then
-        singularity build $ENDORSE_IMAGE "docker://$IMAGE_URL"
-    fi
+    # build the singularity image just once
+    singularity exec $singularity_image_url echo "Image $IMAGE_URL converted to singularity."
+    env_list   # produce ENV_ASSIGN_LIST from CONT_ENV_LIST
+    bind_list "-B"
 
     # call
-    singularity exec ${CONT_ENV_LIST[@]/#/-e } $LOCAL_IMAGE ${COMMAND_WITH_ARGS[@]}
+    singularity exec "${ENV_ASSIGN_LIST[@]}" "${BIND_ASSIGN_LIST[@]}" $singularity_image_url "${COMMAND_WITH_ARGS[@]}"
+
 }
 
 
@@ -576,9 +616,9 @@ function call_container {
     # - PWD (including JOB_AUX_DIR)
 
 
-    if [ -x `command -v docker` ]; then
+    if [ -x "$(command -v docker)" ]; then
         call_docker
-    elif [ -x `command -v singularity` ]; then
+    elif [ -x "$(command -v singularity)" ]; then
         call_singularity
     else
         error "No container tool. Not supported."
@@ -592,6 +632,7 @@ function call_container {
 
 WORKDIR=`pwd`
 
 parse_arguments "$@"
+
 # Report parsed arguments
 dbgvar DEBUG
 dbgarray CONT_BIND_LIST
@@ -616,4 +657,7 @@ create_ssh_agent
 make_wrapper qstat
 make_wrapper qsub
 
+# Always add pwd to the binds
+CONT_BIND_LIST+=("$(pwd):$(pwd)")
+
 call_container
diff --git a/src/swrap/tst_swrap.sh b/src/swrap/tst_swrap.sh
index cf021c0..f0f6f1a 100644
--- a/src/swrap/tst_swrap.sh
+++ b/src/swrap/tst_swrap.sh
@@ -31,23 +31,20 @@ function check () {
     N_TST=$((N_TST+1))
 }
 
-IMG=alpine
-
 function test_local {
 name="test_local"
+IMG=alpine
 rm 'file 1' 'file 2'
 check "$name" -b="a:b,c:d" -e=A,B -m -s $IMG touch "file 1" "file 2"<cont_out' <cont_out'
 PBS_JOBID: '1234'
 NODE_NAMES list: charon21.nti.tul.cz charon22.nti.tul.cz
-SSH_AUTH_SOCK: '/run/user/1000/keyring/ssh'
-SSH_AGENT_PID: ''
 END
 
 # check command effect
-[ -f 'file 1' ] || echo "Missing touched 'file 1'."
-[ -f 'file 2' ] || echo "Missing touched 'file 2'."
+[ "$(cat cont_out)" = "X Y Z" ] || "Container command not performed." } @@ -93,8 +89,6 @@ IMAGE_URL: 'IMG' COMMAND: CMD ARG1 ARG2 PBS_JOBID: '1234' NODE_NAMES list: charon21.nti.tul.cz charon22.nti.tul.cz -SSH_AUTH_SOCK: '/run/user/1000/keyring/ssh' -SSH_AGENT_PID: '' END } @@ -102,6 +96,7 @@ END test_local test_pbs_artificial -if [ command -v qsub ]; then - test_pbs_real +if [ -x "$(command -v qsub)" ] +then + #test_pbs_real fi