From d9e01258a1e6c5fca89d322e9bae222f653e418a Mon Sep 17 00:00:00 2001 From: martinspetlik Date: Fri, 16 Sep 2022 15:59:17 +0200 Subject: [PATCH] tar dirs --- singularity_exec_mpi.py | 115 ++++++++++++++++++++++++++++------------ 1 file changed, 82 insertions(+), 33 deletions(-) diff --git a/singularity_exec_mpi.py b/singularity_exec_mpi.py index b135b6f..ee7fb35 100755 --- a/singularity_exec_mpi.py +++ b/singularity_exec_mpi.py @@ -3,16 +3,20 @@ import shutil import argparse import subprocess +import tarfile from argparse import RawTextHelpFormatter + def mprint(*margs, **mkwargs): print(*margs, file=sys.stdout, flush=True, **mkwargs) + def oscommand(command_string): mprint(command_string) mprint(os.popen(command_string).read()) + def create_ssh_agent(): mprint("creating ssh agent...") p = subprocess.Popen('ssh-agent -s', @@ -37,6 +41,32 @@ def create_ssh_agent(): os.environ[varname] = varvalue +def dirs_tar(scratch_dir, tar_filepath, dirs_to_scratch): + """ + Create .tar file of passed dirs + Process already created .tar archives + :param scratch_dir: path to scratch directory + :param tar_filepath: path to the tar file that is currently being created + :param dirs_to_scratch: list of dirs that are being compressed and moved to the scratch + :return: list of .tar file names + """ + tar_names = [] + with tarfile.open(tar_filepath, mode='w:xz') as archive: + for path in dirs_to_scratch: + if os.path.isfile(path) and tarfile.is_tarfile(path): + filename = os.path.basename(path) + shutil.copy(path, scratch_dir) + tar_names.append(filename) + archive.add(path, recursive=True) + return tar_names + + +def mpiexec_value(arg_val): + if "mpiexec" not in arg_val and arg_val != "None": + raise argparse.ArgumentTypeError("Invalid value. Pass the path to mpiexec or None if you are not using mpiexec") + return arg_val + + if __name__ == "__main__": mprint("================== singularity_exec_mpi.py START ==================") @@ -51,18 +81,19 @@ def create_ssh_agent(): help='Singularity SIF image or Docker image (will be converted to SIF)') parser.add_argument('-B', '--bind', type=str, metavar="PATH,...", default="", required=False, help='comma separated list of paths to be bind to Singularity container') - parser.add_argument('-m', '--mpiexec', type=str, metavar="PATH", default="", required=False, + parser.add_argument('-m', '--mpiexec', type=mpiexec_value, metavar="PATH", default="", required=False, help="path (inside the container) to mpiexec to be run, default is 'mpiexec'") parser.add_argument('-s', '--scratch_copy', type=str, metavar="PATH", default="", required=False, help=''' directory path, its content will be copied to SCRATCHDIR; ''') - # if file path, each user defined path inside the file will be copied to SCRATCHDIR + parser.add_argument('-ds', '--dirs-to-scratch', nargs='+', default=[]) + # if file path, each user defined path inside the file will be copied to SCRATCHDIR parser.add_argument('prog', nargs=argparse.REMAINDER, help=''' mpiexec arguments and the executable, follow mpiexec doc: "mpiexec args executable pgmargs [ : args executable pgmargs ... ]" - + still can use MPMD (Multiple Program Multiple Data applications): -n 4 program1 : -n 3 program2 : -n 2 program3 ... ''') @@ -114,7 +145,7 @@ def create_ssh_agent(): else: assert 'HOME' in os.environ ssh_known_hosts_file = os.path.join(os.environ['HOME'], '.ssh/known_hosts') - + mprint("host file name:", ssh_known_hosts_file) ssh_known_hosts = [] @@ -140,7 +171,7 @@ def create_ssh_agent(): # add the nodes to known_hosts so the fingerprint verification is skipped later # in shell just append # >> ~ /.ssh / known_hosts # or sort by 3.column in shell: 'sort -k3 -u ~/.ssh/known_hosts' and rewrite - ssh_keys = os.popen('ssh-keyscan -H ' + node) .readlines() + ssh_keys = os.popen('ssh-keyscan -H ' + node).readlines() ssh_keys = list((line for line in ssh_keys if not line.startswith('#'))) for sk in ssh_keys: splits = sk.split(" ") @@ -196,21 +227,35 @@ def create_ssh_agent(): command = ' '.join(['cd', source, '&&', 'tar -cvf', source_tar_filepath, '.', '&& cd', script_dir]) oscommand(command) + dirs_to_scratch = args.dirs_to_scratch + tar_names = [] + if len(dirs_to_scratch) > 0: + source_dirs_tar_filename = 'dirs_scratch.tar.xz' + source_dirs_tar_filepath = os.path.join(script_dir, source_dirs_tar_filename) + tar_names = dirs_tar(source, source_dirs_tar_filepath, dirs_to_scratch) + tar_names.append(source_dirs_tar_filename) + for node in node_names: destination_name = username + "@" + node destination_path = destination_name + ':' + scratch_dir_path command = ' '.join(['scp', source_tar_filepath, destination_path]) oscommand(command) - #command = ' '.join(['ssh', destination_name, 'cd', scratch_dir_path, '&&', 'tar --strip-components 1 -xf', source_tar_filepath, '-C /']) - command = ' '.join(['ssh', destination_name, '"cd', scratch_dir_path, '&&', 'tar -xf', source_tar_filename, - '&&', 'rm ', source_tar_filename, '"']) + # command = ' '.join(['ssh', destination_name, 'cd', scratch_dir_path, '&&', 'tar --strip-components 1 -xf', source_tar_filepath, '-C /']) + + command_list = ['ssh', destination_name, '"cd', scratch_dir_path, '&&', 'tar -xf', source_tar_filename, + '&&', 'rm ', source_tar_filename] + + if len(tar_names) > 0: + for tar_name in tar_names: + command = command_list.extend(['&&', 'tar -xf', tar_name, '&&', 'rm ', tar_name]) + + command = ' '.join(command_list) oscommand(command) # remove the scratch tar oscommand(' '.join(['rm', source_tar_filename])) - # A] process bindings, exclude ssh agent in launcher bindings bindings = "-B " + os.environ['SSH_AUTH_SOCK'] # possibly add current dir to container bindings @@ -221,11 +266,11 @@ def create_ssh_agent(): bindings_in_launcher = "-B " + args.bind if scratch_dir_path: - bindings = bindings + "," + scratch_dir_path - if args.bind == "": - bindings_in_launcher = "-B "+ scratch_dir_path - else: - bindings_in_launcher = bindings_in_launcher + "," + scratch_dir_path + bindings = bindings + "," + scratch_dir_path + if args.bind == "": + bindings_in_launcher = "-B " + scratch_dir_path + else: + bindings_in_launcher = bindings_in_launcher + "," + scratch_dir_path sing_command = ' '.join(['singularity', 'exec', bindings, image]) sing_command_in_launcher = ' '.join(['singularity', 'exec', bindings_in_launcher, image]) @@ -250,31 +295,35 @@ def create_ssh_agent(): f.write('\n'.join(launcher_lines)) oscommand('chmod +x ' + launcher_path) - # C] set mpiexec path inside the container - # if container path to mpiexec is provided, use it - # otherwise try to use the default - mpiexec_path = "mpiexec" - if args.mpiexec != "": - mpiexec_path = args.mpiexec - - # test_mpiexec = os.popen(sing_command + ' which ' + 'mpiexec').read() - # # test_mpiexec = os.popen('singularity exec docker://flow123d/geomop:master_8d5574fc2 which flow123d').read() - # mprint("test_mpiexec: ", test_mpiexec) - # if mpiexec_path == "": - # raise Exception("mpiexec path '" + mpiexec_path + "' not found in container!") - - # D] join mpiexec arguments - mpiexec_args = " ".join([mpiexec_path, '-f', node_file, '-launcher-exec', launcher_path]) + if args.mpiexec != "None": + # C] set mpiexec path inside the container + # if container path to mpiexec is provided, use it + # otherwise try to use the default + mpiexec_path = "mpiexec" + if args.mpiexec != "": + mpiexec_path = args.mpiexec + + # test_mpiexec = os.popen(sing_command + ' which ' + 'mpiexec').read() + # # test_mpiexec = os.popen('singularity exec docker://flow123d/geomop:master_8d5574fc2 which flow123d').read() + # mprint("test_mpiexec: ", test_mpiexec) + # if mpiexec_path == "": + # raise Exception("mpiexec path '" + mpiexec_path + "' not found in container!") + + # D] join mpiexec arguments + mpiexec_args = " ".join([mpiexec_path, '-f', node_file, '-launcher-exec', launcher_path]) + + # F] join all the arguments into final singularity container command + final_command = " ".join([sing_command, mpiexec_args, *prog_args]) + else: + final_command = " ".join([sing_command, *prog_args]) - # F] join all the arguments into final singularity container command - final_command = " ".join([sing_command, mpiexec_args, *prog_args]) ################################################################################################################### # Final call. ################################################################################################################### if scratch_dir_path: - mprint("Entering SCRATCHDIR:", scratch_dir_path) - os.chdir(scratch_dir_path) + mprint("Entering SCRATCHDIR:", scratch_dir_path) + os.chdir(scratch_dir_path) mprint("current directory:", os.getcwd()) # mprint(os.popen("ls -l").read())