Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@
zip_safe=False,
install_requires=[],
python_requires='>=3',

packages=['swrap'],
package_dir={'': 'src'},
entry_points={
'console_scripts': ['smpiexec=swrap.smpiexec:main']
'console_scripts': ['sexec=swrap.sexec:main', 'smpiexec=swrap.smpiexec:main']
}
)

Expand Down
51 changes: 48 additions & 3 deletions src/swrap/pbs_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
import shutil
import socket
# import subprocess

import sexec
from sexec import flush_print

from utils import flush_print, oscommand
from pathlib import Path

def run_in_ssh(arg_list, init_dir=None):
"""
Expand Down Expand Up @@ -47,3 +48,47 @@ def qsub(job_file, arg_list=None):
return run_in_ssh(["qsub", job_file], init_dir=job_dir)
else:
return run_in_ssh(["qsub", *arg_list, job_file], init_dir=job_dir)


def make_wrapper(dir, cmd, binds):
    """
    Create an executable bash wrapper script named `cmd` in directory `dir`.

    The generated script forwards its arguments through ssh to the machine
    this function runs on, changes to the caller's working directory there
    and executes the host's `cmd`.

    :param dir: directory the wrapper is written to; created when missing
    :param cmd: name of the host command to wrap, also used as the wrapper file name
    :param binds: container bindings; not used yet, see the TODO below
    :return: None
    """
    host_addr = socket.gethostname()
    # Full resolution of the cmd. shutil.which() returns None when the command
    # is not on PATH; fall back to the bare name so the wrapper never calls
    # the literal string "None".
    full_cmd = shutil.which(cmd) or cmd

    # Template translating a container working directory to the host one for
    # a single binding ({0} = host dir, {1} = container dir). Kept for the
    # TODO below, not used yet.
    bind_replace = """
PWD_REL="${{PWD#{1}}}"
if [ "$PWD_REL" != "$PWD" ]
then
PWD_HOST="{0}$PWD_REL"
fi
"""
    Path(dir).mkdir(parents=True, exist_ok=True)
    # TODO: support bindings with path different on host and container
    # pwd_replace_binds = [bind_replace.format(host_dir, cont_dir) for host_dir, cont_dir in binds]
    pwd_replace_binds = []

    ssh_call = f'ssh {host_addr} "cd \'$PWD_HOST\'; {full_cmd} $@"'
    content = [
        "#!/bin/bash",
        "set -x",
        "PWD=\"`pwd`\"",
        "PWD_HOST=\"${PWD}\"",
        *pwd_replace_binds,
        ssh_call
    ]
    wrapper_path = os.path.join(dir, cmd)
    with open(wrapper_path, "w") as f:
        f.write("\n".join(content))
        f.write("\n")
    # Executable for everybody, writable only by the owner: a world-writable
    # script that is later executed would let other users alter it.
    os.chmod(wrapper_path, 0o755)

def make_pbs_wrappers(dir, binds):
    """
    Create wrapper scripts for the PBS commands 'qstat' and 'qsub' in `dir`.

    :param dir: directory the wrappers are written to
    :param binds: container bindings, passed on to make_wrapper
    :return: None
    """
    # The 'PBS_O_HOST' variable is deliberately not used here:
    # it seems to contain the name of the machine on which the qsub command
    # has been executed. It is not clear from which node we want to execute
    # the recursive qsub calls, but the natural choice is the local host
    # (that would be the case without swrap). So the current host is used,
    # which should be the first host from the nodefile as well.
    for pbs_command in ('qstat', 'qsub'):
        make_wrapper(dir, pbs_command, binds)
202 changes: 126 additions & 76 deletions src/swrap/sexec.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
from typing import *
import os
import sys
import shutil
import argparse
import subprocess
#import attrs
import socket

from argparse import RawTextHelpFormatter
from utils import flush_print, oscommand
from pbs_utils import make_pbs_wrappers


def flush_print(*margs, **mkwargs):
print(*margs, file=sys.stdout, flush=True, **mkwargs)


def oscommand(command_string):
flush_print(command_string)
flush_print(os.popen(command_string).read())


def process_image_path(image_path):
def process_image_url(image_path: str) -> str:
if os.path.isfile(image_path):
image = os.path.abspath(image_path)
elif image_path.startswith('docker://'):
Expand All @@ -26,20 +21,83 @@ def process_image_path(image_path):
return image


def copy_and_read_node_file(orig_node_file, directory):
class SingularityCall:
    """
    Collects the pieces of a `singularity exec` command line and runs it.

    Attributes:
        image: singularity image url as returned by process_image_url
        command: command to call in the container with its arguments
        venv: absolute path of a python virtual environment, "" for none
        bindings: paths bound into the container
        env_dict: environment variables passed to the container via --env
        debug: when True, call() only prints the command and does not run it
    """

    def __init__(self, image, command, venv="", debug=False):
        # singularity image url
        self.image: str = process_image_url(image)
        # command to call in the container with its arguments
        self.command: List[str] = command
        flush_print("command with args:", command)
        self.venv: str = os.path.abspath(venv) if venv else ""
        self.bindings: List[str] = []
        self.env_dict: Dict[str, str] = {}
        # honour the constructor argument (it used to be ignored)
        self.debug: bool = debug

    def _add_path(self, key, add_path, front):
        """Add `add_path` to the ':' separated list stored in env_dict[key]."""
        # Drop empty components: an empty PATH entry means the current
        # directory, which must not sneak in through a stray ':'.
        entries = [item for item in self.env_dict.get(key, "").split(":") if item]
        if front:
            # Move an already present path to the front instead of duplicating it.
            entries = [item for item in entries if item != add_path]
            entries.insert(0, add_path)
        elif add_path not in entries:
            entries.append(add_path)
        self.env_dict[key] = ":".join(entries)

    def append_path(self, add_path):
        """Add `add_path` at the end of the APPEND_PATH variable."""
        self._add_path('APPEND_PATH', add_path, front=False)

    def prepend_path(self, add_path):
        """Add `add_path` at the beginning of the PREPEND_PATH variable."""
        self._add_path('PREPEND_PATH', add_path, front=True)

    def form_bindings(self):
        """Return the list of bindings for the -B option."""
        # currently we only support binding of the same paths in host and in container
        return self.bindings

    def form_env_list(self):
        """Return the environment as a list of 'KEY=value' strings."""
        self.env_dict['SWRAP_SINGULARITY_VENV'] = self.venv
        return [f"{key}={str(value)}" for key, value in self.env_dict.items()]

    def cmd_list(self):
        """
        Return the final singularity command as a list of arguments.

        Repeated calls give the same result, because prepend_path does not
        duplicate a path that is already present.
        """
        if len(self.venv) > 0:
            self.prepend_path(os.path.join(os.path.abspath(self.venv), 'bin'))
        # join all the arguments into final singularity container command
        sing_command = ['singularity', 'exec',
                        '-B', ",".join(self.form_bindings()),
                        '--env', ",".join(self.form_env_list()),
                        self.image,
                        *self.command]
        return sing_command

    def call(self):
        """Print the final command and, unless in debug mode, execute it."""
        flush_print("current directory:", os.getcwd())
        # Build the command once so the printed and the executed command agree.
        final_command = self.cmd_list()
        flush_print("final command:", *final_command)
        flush_print("=================== smpiexec.py END ===================")
        if not self.debug:
            flush_print("================== Program output START ==================")
            oscommand(final_command)
            flush_print("=================== Program output END ===================")


def copy_and_read_node_file(directory):
    """
    Place a node file named 'nodefile' into `directory` and read the node names.

    The file named by the PBS_NODEFILE environment variable is copied; when
    the variable is missing, a node file containing only the local host name
    is written instead.

    :param directory: existing directory that receives the 'nodefile' copy
    :return: tuple (path of the node file, list of unique node names in file order)
    """
    node_file = os.path.join(directory, "nodefile")
    orig_node_file = os.environ.get('PBS_NODEFILE', None)
    if orig_node_file is None:
        hostname = socket.gethostname()
        flush_print(f"Warning: missing PBS_NODEFILE variable. Using just local node: {hostname}.")
        with open(node_file, "w") as f:
            f.write(hostname)
    else:
        # create a copy
        shutil.copy(orig_node_file, node_file)

    flush_print("reading host file...")

    # read node names
    with open(node_file) as fp:
        node_names_read = fp.read().splitlines()
    # Remove duplicates while keeping the order of the node file; a set would
    # return the nodes in arbitrary order.
    node_names = list(dict.fromkeys(node_names_read))
    return node_file, node_names


def create_ssh_agent():
Expand Down Expand Up @@ -113,7 +171,14 @@ def process_known_hosts_file(ssh_known_hosts_file, node_names):


def prepare_scratch_dir(scratch_source, node_names):
scratch_dir_path = os.environ['SCRATCHDIR']
if scratch_source == "":
return os.getcwd()

scratch_dir_path = os.environ.get('SCRATCHDIR', None)
if scratch_dir_path is None:
return os.getcwd()


flush_print("Using SCRATCHDIR:", scratch_dir_path)

flush_print("copying to SCRATCHDIR on all nodes...")
Expand All @@ -138,35 +203,41 @@ def prepare_scratch_dir(scratch_source, node_names):
current_dir = os.getcwd()
source_tar_filename = 'scratch.tar'
source_tar_filepath = os.path.join(current_dir, source_tar_filename)
command = ' '.join(['cd', source, '&&', 'tar -cvf', source_tar_filepath, '.', '&& cd', current_dir])
oscommand(command)
oscommand(['tar', '-cvf', source_tar_filepath], cwd=source)

# copy to scratch dir on every used node through ssh
for node in node_names:
destination_name = username + "@" + node
destination_path = destination_name + ':' + scratch_dir_path
command = ' '.join(['scp', source_tar_filepath, destination_path])
oscommand(command)
oscommand(['scp', source_tar_filepath, destination_path])

# command = ' '.join(['ssh', destination_name, 'cd', scratch_dir_path, '&&', 'tar --strip-components 1 -xf', source_tar_filepath, '-C /'])
command = ' '.join(['ssh', destination_name, '"cd', scratch_dir_path, '&&', 'tar -xf', source_tar_filename,
'&&', 'rm ', source_tar_filename, '"'])
oscommand(command)
oscommand(['ssh', destination_name, f'cd "{scratch_dir_path}" && tar -xf "{source_tar_filename}" && rm "source_tar_filename"'])

# remove the scratch tar
oscommand(' '.join(['rm', source_tar_filename]))
oscommand(['rm', source_tar_filename])
return scratch_dir_path


def arguments():
parser = argparse.ArgumentParser(
description='Auxiliary executor for parallel programs running inside (Singularity) container under PBS.',
formatter_class=argparse.RawTextHelpFormatter)
description=\
"""
Auxiliary executor for parallel programs running inside (Singularity) container under PBS.

Provides some tools to start other jobs running in the same image:
1. wrapper sripts 'qsub' and 'qstat' are created in the job auxiliary directory
2. environment variables SINGULARITY_CONTAINER, SINGULARITY_BIND, SWRAP_SINGULARITY_VENV.
3. mpiexec ...
"""
parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument('-d', '--debug', action='store_true',
help='use testing files and print the final command')
parser.add_argument('-i', '--image', type=str, required=True,
help='Singularity SIF image or Docker image (will be converted to SIF)')
parser.add_argument('-B', '--bind', type=str, metavar="PATH,...", default="", required=False,
help='comma separated list of paths to be bind to Singularity container')
parser.add_argument('-e', '--venv', type=str, metavar="PATH", default="", required=False,
help='If specified, the python virtual environment in PATH directory will be activated before given command.')
parser.add_argument('-m', '--mpiexec', type=str, metavar="PATH", default="", required=False,
help="path (inside the container) to mpiexec to be run, default is 'mpiexec'")
parser.add_argument('-s', '--scratch_copy', type=str, metavar="PATH", default="", required=False,
Expand All @@ -193,42 +264,37 @@ def arguments():
return args


def setup_aux_dir():
    """
    Create the auxiliary job directory in the current working directory.

    The directory is named '<job id>_job', where the job id is taken from the
    PBS_JOBID environment variable, or 'pid_<process id>' when it is not set.

    :return: path of the auxiliary job directory
    """
    pbs_job_id = os.environ.get('PBS_JOBID', f"pid_{os.getpid()}")
    flush_print("PBS job id: ", pbs_job_id)
    pbs_job_aux_dir = os.path.join(os.getcwd(), pbs_job_id + '_job')
    # create auxiliary job output directory; exist_ok keeps a repeated run
    # with the same job id from failing on the already existing directory
    os.makedirs(pbs_job_aux_dir, mode=0o775, exist_ok=True)
    return pbs_job_aux_dir


def main():
flush_print("================== smpiexec.py START ==================")
current_dir = os.getcwd()
args = arguments()

# get debug variable
debug = args.debug
# get program and its arguments
prog_args = args.prog[1:]

# get program and its arguments, set absolute path
image = process_image_path(args.image)

sing = SingularityCall(args.image, args.prog, args.venv, debug=args.debug)
###################################################################################################################
# Process node file and setup ssh access to given nodes.
###################################################################################################################

flush_print("Hostname: ", os.popen('hostname').read())
# mprint("os.environ", os.environ)
pbs_job_aux_dir = setup_aux_dir()

pbs_job_id = os.environ['PBS_JOBID']
flush_print("PBS job id: ", pbs_job_id)
pbs_job_aux_dir = os.path.join(current_dir, pbs_job_id + '_job')
# create auxiliary job output directory
os.makedirs(pbs_job_aux_dir, mode=0o775)

# get nodefile, copy it to local dir so that it can be passed into container mpiexec later
if debug:
orig_node_file = "testing_hostfile"
else:
orig_node_file = os.environ['PBS_NODEFILE']
node_file, node_names = copy_and_read_node_file(orig_node_file, pbs_job_aux_dir)
node_file, node_names = copy_and_read_node_file(pbs_job_aux_dir)


# Get ssh keys to nodes and append it to $HOME/.ssh/known_hosts
ssh_known_hosts_to_append = []
if debug:
if sing.debug:
# ssh_known_hosts_file = 'testing_known_hosts'
ssh_known_hosts_file = 'xxx/.ssh/testing_known_hosts'
else:
Expand All @@ -244,29 +310,24 @@ def main():
###################################################################################################################

flush_print("assembling final command...")

scratch_dir_path = None
if 'SCRATCHDIR' in os.environ:
scratch_dir_path = prepare_scratch_dir(args.scratch_copy, node_names)
scratch_dir_path = prepare_scratch_dir(args.scratch_copy, node_names)


# A] process bindings, exclude ssh agent in launcher bindings
common_bindings = ["/etc/ssh/ssh_config", "/etc/ssh/ssh_known_hosts", "/etc/krb5.conf"]
bindings = [*common_bindings, os.environ['SSH_AUTH_SOCK']]
sing.bindings.extend(common_bindings)
sing.bindings.append(os.environ['SSH_AUTH_SOCK'])
# possibly add current dir to container bindings
# bindings = bindings + "," + current_dir + ":" + current_dir
if args.bind != "":
bindings.append(args.bind)
sing.bindings.append(args.bind)

if scratch_dir_path:
bindings.append(scratch_dir_path)

sing_command = ['singularity', 'exec', '-B', ",".join(bindings), image]
sing.bindings.append(scratch_dir_path)

flush_print('sing_command:', *sing_command)

# F] join all the arguments into final singularity container command
final_command_list = [*sing_command, *prog_args]
make_pbs_wrappers(pbs_job_aux_dir, sing.bindings)
sing.append_path(pbs_job_aux_dir)

###################################################################################################################
# Final call.
Expand All @@ -275,18 +336,7 @@ def main():
flush_print("Entering SCRATCHDIR:", scratch_dir_path)
os.chdir(scratch_dir_path)

flush_print("current directory:", os.getcwd())
# mprint(os.popen("ls -l").read())
flush_print("final command:", *final_command_list)
flush_print("=================== smpiexec.py END ===================")
if not debug:
flush_print("================== Program output START ==================")
# proc = subprocess.run(final_command_list)
final_command = " ".join(final_command_list)
oscommand(final_command)

flush_print("=================== Program output END ===================")
# exit(proc.returncode)
sing.call()

if __name__ == "__main__":
main()
main()
Loading