Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 82 additions & 33 deletions singularity_exec_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
import shutil
import argparse
import subprocess
import tarfile

from argparse import RawTextHelpFormatter


def mprint(*margs, **mkwargs):
print(*margs, file=sys.stdout, flush=True, **mkwargs)


def oscommand(command_string):
mprint(command_string)
mprint(os.popen(command_string).read())


def create_ssh_agent():
mprint("creating ssh agent...")
p = subprocess.Popen('ssh-agent -s',
Expand All @@ -37,6 +41,32 @@ def create_ssh_agent():
os.environ[varname] = varvalue


def dirs_tar(scratch_dir, tar_filepath, dirs_to_scratch):
"""
Create .tar file of passed dirs
Process already created .tar archives
:param scratch_dir: path to scratch directory
:param tar_filepath: path to the tar file that is currently being created
:param dirs_to_scratch: list of dirs that are being compressed and moved to the scratch
:return: list of .tar file names
"""
tar_names = []
with tarfile.open(tar_filepath, mode='w:xz') as archive:
for path in dirs_to_scratch:
if os.path.isfile(path) and tarfile.is_tarfile(path):
filename = os.path.basename(path)
shutil.copy(path, scratch_dir)
tar_names.append(filename)
archive.add(path, recursive=True)
return tar_names


def mpiexec_value(arg_val):
if "mpiexec" not in arg_val and arg_val != "None":
raise argparse.ArgumentTypeError("Invalid value. Pass the path to mpiexec or None if you are not using mpiexec")
return arg_val


if __name__ == "__main__":

mprint("================== singularity_exec_mpi.py START ==================")
Expand All @@ -51,18 +81,19 @@ def create_ssh_agent():
help='Singularity SIF image or Docker image (will be converted to SIF)')
parser.add_argument('-B', '--bind', type=str, metavar="PATH,...", default="", required=False,
help='comma separated list of paths to be bind to Singularity container')
parser.add_argument('-m', '--mpiexec', type=str, metavar="PATH", default="", required=False,
parser.add_argument('-m', '--mpiexec', type=mpiexec_value, metavar="PATH", default="", required=False,
help="path (inside the container) to mpiexec to be run, default is 'mpiexec'")
parser.add_argument('-s', '--scratch_copy', type=str, metavar="PATH", default="", required=False,
help='''
directory path, its content will be copied to SCRATCHDIR;
''')
# if file path, each user defined path inside the file will be copied to SCRATCHDIR
parser.add_argument('-ds', '--dirs-to-scratch', nargs='+', default=[])
# if file path, each user defined path inside the file will be copied to SCRATCHDIR
parser.add_argument('prog', nargs=argparse.REMAINDER,
help='''
mpiexec arguments and the executable, follow mpiexec doc:
"mpiexec args executable pgmargs [ : args executable pgmargs ... ]"

still can use MPMD (Multiple Program Multiple Data applications):
-n 4 program1 : -n 3 program2 : -n 2 program3 ...
''')
Expand Down Expand Up @@ -114,7 +145,7 @@ def create_ssh_agent():
else:
assert 'HOME' in os.environ
ssh_known_hosts_file = os.path.join(os.environ['HOME'], '.ssh/known_hosts')

mprint("host file name:", ssh_known_hosts_file)

ssh_known_hosts = []
Expand All @@ -140,7 +171,7 @@ def create_ssh_agent():
# add the nodes to known_hosts so the fingerprint verification is skipped later
# in shell just append # >> ~ /.ssh / known_hosts
# or sort by 3.column in shell: 'sort -k3 -u ~/.ssh/known_hosts' and rewrite
ssh_keys = os.popen('ssh-keyscan -H ' + node) .readlines()
ssh_keys = os.popen('ssh-keyscan -H ' + node).readlines()
ssh_keys = list((line for line in ssh_keys if not line.startswith('#')))
for sk in ssh_keys:
splits = sk.split(" ")
Expand Down Expand Up @@ -196,21 +227,35 @@ def create_ssh_agent():
command = ' '.join(['cd', source, '&&', 'tar -cvf', source_tar_filepath, '.', '&& cd', script_dir])
oscommand(command)

dirs_to_scratch = args.dirs_to_scratch
tar_names = []
if len(dirs_to_scratch) > 0:
source_dirs_tar_filename = 'dirs_scratch.tar.xz'
source_dirs_tar_filepath = os.path.join(script_dir, source_dirs_tar_filename)
tar_names = dirs_tar(source, source_dirs_tar_filepath, dirs_to_scratch)
tar_names.append(source_dirs_tar_filename)

for node in node_names:
destination_name = username + "@" + node
destination_path = destination_name + ':' + scratch_dir_path
command = ' '.join(['scp', source_tar_filepath, destination_path])
oscommand(command)

#command = ' '.join(['ssh', destination_name, 'cd', scratch_dir_path, '&&', 'tar --strip-components 1 -xf', source_tar_filepath, '-C /'])
command = ' '.join(['ssh', destination_name, '"cd', scratch_dir_path, '&&', 'tar -xf', source_tar_filename,
'&&', 'rm ', source_tar_filename, '"'])
# command = ' '.join(['ssh', destination_name, 'cd', scratch_dir_path, '&&', 'tar --strip-components 1 -xf', source_tar_filepath, '-C /'])

command_list = ['ssh', destination_name, '"cd', scratch_dir_path, '&&', 'tar -xf', source_tar_filename,
'&&', 'rm ', source_tar_filename]

if len(tar_names) > 0:
for tar_name in tar_names:
command = command_list.extend(['&&', 'tar -xf', tar_name, '&&', 'rm ', tar_name])

command = ' '.join(command_list)
oscommand(command)

# remove the scratch tar
oscommand(' '.join(['rm', source_tar_filename]))


# A] process bindings, exclude ssh agent in launcher bindings
bindings = "-B " + os.environ['SSH_AUTH_SOCK']
# possibly add current dir to container bindings
Expand All @@ -221,11 +266,11 @@ def create_ssh_agent():
bindings_in_launcher = "-B " + args.bind

if scratch_dir_path:
bindings = bindings + "," + scratch_dir_path
if args.bind == "":
bindings_in_launcher = "-B "+ scratch_dir_path
else:
bindings_in_launcher = bindings_in_launcher + "," + scratch_dir_path
bindings = bindings + "," + scratch_dir_path
if args.bind == "":
bindings_in_launcher = "-B " + scratch_dir_path
else:
bindings_in_launcher = bindings_in_launcher + "," + scratch_dir_path

sing_command = ' '.join(['singularity', 'exec', bindings, image])
sing_command_in_launcher = ' '.join(['singularity', 'exec', bindings_in_launcher, image])
Expand All @@ -250,31 +295,35 @@ def create_ssh_agent():
f.write('\n'.join(launcher_lines))
oscommand('chmod +x ' + launcher_path)

# C] set mpiexec path inside the container
# if container path to mpiexec is provided, use it
# otherwise try to use the default
mpiexec_path = "mpiexec"
if args.mpiexec != "":
mpiexec_path = args.mpiexec

# test_mpiexec = os.popen(sing_command + ' which ' + 'mpiexec').read()
# # test_mpiexec = os.popen('singularity exec docker://flow123d/geomop:master_8d5574fc2 which flow123d').read()
# mprint("test_mpiexec: ", test_mpiexec)
# if mpiexec_path == "":
# raise Exception("mpiexec path '" + mpiexec_path + "' not found in container!")

# D] join mpiexec arguments
mpiexec_args = " ".join([mpiexec_path, '-f', node_file, '-launcher-exec', launcher_path])
if args.mpiexec != "None":
# C] set mpiexec path inside the container
# if container path to mpiexec is provided, use it
# otherwise try to use the default
mpiexec_path = "mpiexec"
if args.mpiexec != "":
mpiexec_path = args.mpiexec

# test_mpiexec = os.popen(sing_command + ' which ' + 'mpiexec').read()
# # test_mpiexec = os.popen('singularity exec docker://flow123d/geomop:master_8d5574fc2 which flow123d').read()
# mprint("test_mpiexec: ", test_mpiexec)
# if mpiexec_path == "":
# raise Exception("mpiexec path '" + mpiexec_path + "' not found in container!")

# D] join mpiexec arguments
mpiexec_args = " ".join([mpiexec_path, '-f', node_file, '-launcher-exec', launcher_path])

# F] join all the arguments into final singularity container command
final_command = " ".join([sing_command, mpiexec_args, *prog_args])
else:
final_command = " ".join([sing_command, *prog_args])

# F] join all the arguments into final singularity container command
final_command = " ".join([sing_command, mpiexec_args, *prog_args])

###################################################################################################################
# Final call.
###################################################################################################################
if scratch_dir_path:
mprint("Entering SCRATCHDIR:", scratch_dir_path)
os.chdir(scratch_dir_path)
mprint("Entering SCRATCHDIR:", scratch_dir_path)
os.chdir(scratch_dir_path)

mprint("current directory:", os.getcwd())
# mprint(os.popen("ls -l").read())
Expand Down