Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pythonpackage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,6 @@ jobs:
sshpass -e ssh ${opt} ${{secrets.CHARON_USER_FRONTEND}} \
"cd _testing; rm -rf swrap; git clone -b ${BRANCH} https://github.com/flow123d/swrap.git; \
pip install --user .; \
cd swrap/testing/integrated; python3 run_test.py 01_mpi4py"
cd swrap/integrated; python3 run_test.py 01_mpi4py"


Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
common: # these fields are used in every case
image: docker://flow123d/geomop-gnu:2.0.0
# image to run in
wrapper: smpiexec
wrapper: smpiexec.py
# wrapper to test, currently just mpiexec

#####
# PBS options see: https://wiki.metacentrum.cz/wiki/About_scheduling_system#How_to_set_number_of_nodes_and_processors
pbs_select: 1
pbs_select: 1:mem=4gb:scratch_local=10gb
# PBS select option: "-l select={pbs_select}"
# syntax select=<chunk_spec> [+ <other chunk specs>]
# <chunk spec> = <n chunks>[:ncpu=<n processes per chunk>][:mem=<memory limit>][:scratch_<type>=<size>]
Expand All @@ -32,9 +32,15 @@ cases:
# the number of processes used must fit the number scheduled on PBS; see pbs_* keys
# the host file is passed automatically
# for mpiexec see: https://www.open-mpi.org/doc/v3.0/man1/mpiexec.1.php
pbs_select: 2
pbs_select: 2:mem=4gb:scratch_local=10gb

- name: script
command: -n 4 python3 script.py
pbs_select: 2:ncpus=2

- name: simple
command: -n 4 python3 simple.py
pbs_select: 2:ncpus=2:mem=4gb:scratch_local=10gb

- name: error_in_process
command: -n 4 python3 error_in_process.py
pbs_select: 2:ncpus=2:mem=4gb:scratch_local=10gb
checkers:
check_return_code: 1
check_find_stdout_regex: ZeroDivisionError
13 changes: 13 additions & 0 deletions integrated/01_mpi4py/error_in_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from mpi4py import MPI


# Test script: rank 0 deliberately raises ZeroDivisionError, every other rank
# just reports that it finished.
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

if rank == 0:
    # error
    print("P{} zero division.".format(rank))
    # Intentional failure: the division raises before anything is assigned.
    a = 2 / 0

# Not reached on rank 0, because the exception above is never caught.
print("P{} finished".format(rank))
File renamed without changes.
37 changes: 37 additions & 0 deletions integrated/01_mpi4py/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from mpi4py import MPI
import hashlib
import random
import numpy as np

# Test script: every rank repeatedly sends random integers to the other ranks,
# accumulates what it sent and received, and combines the per-rank values with
# a Reduce_scatter.
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Seed the generator from the rank so that each rank draws its own
# reproducible sequence.
seed = hashlib.sha256(str(rank).encode()).digest()
random.seed(seed)

rand_max = 1000_000
num_sends = 100_000
num_iter = 10

for i in range(num_iter):
    # Named `checksum` rather than `sum` so the builtin is not shadowed.
    checksum = 0
    for j in range(1, num_sends + 1):
        data_out = random.randrange(rand_max)
        # The destination cycles through all ranks (including this one) as j grows.
        dest = (rank + j) % size
        req = comm.isend(data_out, dest=dest)
        req.wait()

        # No source is given, so the message is taken from whichever rank sent one.
        req = comm.irecv()
        data_in = req.wait()
        checksum += data_out
        checksum += data_in
        checksum %= 275_604_541

    # NOTE(review): the nesting was reconstructed; the reduction is assumed to
    # run once per outer iteration because the accumulator is reset there - confirm.
    recvbuf = np.empty(1, dtype=int)
    sendbuf = checksum * np.ones(size, dtype=int)
    comm.Reduce_scatter(sendbuf, recvbuf)

    print("[{}] {}".format(rank, recvbuf[0]))

print("P{} finished".format(rank))
File renamed without changes.
31 changes: 24 additions & 7 deletions testing/integrated/run_test.py → integrated/run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ def check_return_code(state: JobState, ref_return_code: int) -> str:
return f"Error: return code {state.return_code} != {ref_return_code}"


def check_find_stdout_regex(state: JobState, regex: str) -> str:
    """
    Check that the job output contains a match for a regular expression.

    :param state: job state; only the text returned by ``state.get_output()`` is searched
    :param regex: pattern passed to ``re.search``, so it may match anywhere in the output
    :return: an error message when the pattern is not found, ``None`` when it is found
    """
    if re.search(regex, state.get_output()) is None:
        return f"Error: output don't contain regular expression: {regex}"
    # Explicit "no error" result instead of falling off the end of the function.
    return None


# def as_list(x):
# if type(x) is list:
# return x
Expand Down Expand Up @@ -190,7 +198,7 @@ def __init__(self, queue: Queue, config: Dict[str, Any]):

Run._counter += 1
self.id = Run._counter
config['wrapper'] = os.path.join(test_script_dir, "../../src/swrap/", config['wrapper'])
config['wrapper'] = os.path.join(test_script_dir, "../src/swrap/", config['wrapper'])
self.config = dict_merge(Run._default_config, config)

self.checkers = [(get_check_fn(fn), normalize_args(args)) for fn, args in config['checkers'].items()]
Expand Down Expand Up @@ -290,8 +298,8 @@ def pbs_submit(self):
time.sleep(10)
err_msg = None
for check_fn, args in self.checkers:
err_msh = check_fn(self.state, *args)
if err_msh is not None:
err_msg = check_fn(self.state, *args)
if err_msg is not None:
break
self.summary(err_msg)
print(f"[{self.id}] done")
Expand All @@ -304,27 +312,33 @@ def summary(self, err_msg):
if err_msg is None:
result = "Succeed:"
err_msg = ""
success = True
else:
if self.state.timeout:
result = "Timed out:"
else:
result = "Failed:"
err_msg = f"{err_msg}\n{self.state.get_output()}"
success = False
msg = f"[{self.id}] {result} {self.short_id}, {self.pbs_script}: {self.config['command']} \n{err_msg}"
self.queue.put((self.id, msg))

self.queue.put((self.id, msg, success))


def report(print_queue):
    """
    Drain the queue of run summaries, print them and report the overall result.

    :param print_queue: queue holding ``(id, msg, success)`` tuples
    :return: True when every drained item has a truthy ``success`` flag
        (also when the queue is empty), False otherwise
    """
    messages = []
    while True:
        try:
            item = print_queue.get_nowait()
        except Empty:
            # Nothing left in the queue.
            break
        messages.append(item)
        print_queue.task_done()

    all_success = True
    # Tuples sort by their first element, so messages are printed in id order.
    for _run_id, msg, success in sorted(messages):
        print(msg)
        if not success:
            all_success = False
    return all_success


def parse_arguments():
"""
Expand Down Expand Up @@ -383,7 +397,10 @@ def main():

for f in futures:
print(f.result())
report(print_queue)
all_success = report(print_queue)
if not all_success:
exit(1)


if __name__ == "__main__":
main()
File renamed without changes.
File renamed without changes.
113 changes: 27 additions & 86 deletions src/swrap/smpiexec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,10 @@

from argparse import RawTextHelpFormatter

def flush_print(*margs, **mkwargs):
    """
    Print like the builtin ``print`` and flush the stream immediately.

    :param margs: values to print
    :param mkwargs: keyword arguments forwarded to ``print``; ``file`` defaults
        to ``sys.stdout`` and ``flush`` to ``True`` unless given explicitly
    """
    # Use defaults rather than fixed keywords, so a caller passing file= or
    # flush= does not hit a "multiple values for keyword argument" TypeError.
    mkwargs.setdefault('file', sys.stdout)
    mkwargs.setdefault('flush', True)
    print(*margs, **mkwargs)

def oscommand(command_string):
    """
    Print a shell command, run it and print everything it wrote to stdout.

    :param command_string: command line passed to ``os.popen``
    """
    flush_print(command_string)
    # Read inside a context manager so the pipe is closed as soon as the
    # output has been collected instead of whenever the object is collected.
    with os.popen(command_string) as pipe:
        output = pipe.read()
    flush_print(output)

def create_ssh_agent():
    """
    Setup ssh agent and set appropriate environment variables.

    Runs ``ssh-agent -s`` and scans its standard output line by line. For each
    non-empty line the text before the first ';' is taken; when that text has
    the form ``NAME=VALUE`` the pair is stored into ``os.environ``.

    :return: None
    """
    flush_print("creating ssh agent...")
    p = subprocess.Popen('ssh-agent -s',
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         shell=True, universal_newlines=True)
    outinfo, _errinfo = p.communicate('ssh-agent cmd\n')

    for line in outinfo.split('\n'):
        # trim leading and trailing whitespace
        line = line.strip()
        # ignore blank/empty lines
        if not line:
            continue
        # Keep the part before the first semicolon. partition() also copes with
        # a line that has no semicolon, where unpacking split(';', 1) would raise.
        left = line.partition(';')[0]
        varname, separator, varvalue = left.partition('=')
        if separator:
            # get variable and value, put into os.environ
            flush_print("setting variable from ssh-agent:", varname, "=", varvalue)
            os.environ[varname] = varvalue
sys.path.append(os.path.dirname(os.path.realpath(__file__)))

from tools import flush_print, oscommand, create_ssh_agent, create_known_hosts_file


def arguments():
parser = argparse.ArgumentParser(
Expand All @@ -52,7 +23,11 @@ def arguments():
help='comma separated list of paths to be bind to Singularity container')
parser.add_argument('-m', '--mpiexec', type=str, metavar="PATH", default="", required=False,
help="path (inside the container) to mpiexec to be run, default is 'mpiexec'")
parser.add_argument('-s', '--scratch_copy', type=str, metavar="PATH", default="", required=False,
parser.add_argument('-s', '--scratch_dir', type=str, metavar="PATH", default="", required=False,
help='''
directory path, where SCRATCHDIR is, overwrite SCRATCHDIR from environment;
''')
parser.add_argument('-c', '--scratch_copy', type=str, metavar="PATH", default="", required=False,
help='''
directory path, its content will be copied to SCRATCHDIR;
''')
Expand Down Expand Up @@ -117,50 +92,7 @@ def main():
shutil.copy(orig_node_file, node_file)
# mprint(os.popen("ls -l").read())

# Get ssh keys to nodes and append it to $HOME/.ssh/known_hosts
ssh_known_hosts_to_append = []
if debug:
# ssh_known_hosts_file = 'testing_known_hosts'
ssh_known_hosts_file = 'xxx/.ssh/testing_known_hosts'
else:
assert 'HOME' in os.environ
ssh_known_hosts_file = os.path.join(os.environ['HOME'], '.ssh/known_hosts')

flush_print("host file name:", ssh_known_hosts_file)

ssh_known_hosts = []
if os.path.exists(ssh_known_hosts_file):
with open(ssh_known_hosts_file, 'r') as fp:
ssh_known_hosts = fp.readlines()
else:
flush_print("creating host file...")
dirname = os.path.dirname(ssh_known_hosts_file)
if not os.path.exists(dirname):
os.makedirs(dirname)

flush_print("reading host file...")
with open(node_file) as fp:
node_names_read = fp.read().splitlines()
# remove duplicates
node_names = list(dict.fromkeys(node_names_read))

flush_print("connecting nodes...")
for node in node_names:
# touch all the nodes, so that they are accessible also through container
os.popen('ssh ' + node + ' exit')
# add the nodes to known_hosts so the fingerprint verification is skipped later
# in shell just append # >> ~ /.ssh / known_hosts
# or sort by 3.column in shell: 'sort -k3 -u ~/.ssh/known_hosts' and rewrite
ssh_keys = os.popen('ssh-keyscan -H ' + node) .readlines()
ssh_keys = list((line for line in ssh_keys if not line.startswith('#')))
for sk in ssh_keys:
splits = sk.split(" ")
if not splits[2] in ssh_known_hosts:
ssh_known_hosts_to_append.append(sk)

flush_print("finishing host file...")
with open(ssh_known_hosts_file, 'a') as fp:
fp.writelines(ssh_known_hosts_to_append)
node_names = create_known_hosts_file(current_dir, node_file, debug=debug)

# mprint(os.environ)
create_agent = 'SSH_AUTH_SOCK' not in os.environ
Expand All @@ -179,8 +111,11 @@ def main():
flush_print("assembling final command...")

scratch_dir_path = None
if 'SCRATCHDIR' in os.environ:
if args.scratch_dir:
scratch_dir_path = args.scratch_dir
elif 'SCRATCHDIR' in os.environ:
scratch_dir_path = os.environ['SCRATCHDIR']
if scratch_dir_path and args.scratch_copy:
flush_print("Using SCRATCHDIR:", scratch_dir_path)

flush_print("copying to SCRATCHDIR on all nodes...")
Expand Down Expand Up @@ -210,11 +145,11 @@ def main():
for node in node_names:
destination_name = username + "@" + node
destination_path = destination_name + ':' + scratch_dir_path
command = ' '.join(['scp', source_tar_filepath, destination_path])
command = ' '.join(['scp', '-o', 'UserKnownHostsFile=known_hosts', source_tar_filepath, destination_path])
oscommand(command)

#command = ' '.join(['ssh', destination_name, 'cd', scratch_dir_path, '&&', 'tar --strip-components 1 -xf', source_tar_filepath, '-C /'])
command = ' '.join(['ssh', destination_name, '"cd', scratch_dir_path, '&&', 'tar -xf', source_tar_filename,
command = ' '.join(['ssh', '-o', 'UserKnownHostsFile=known_hosts', destination_name, '"cd', scratch_dir_path, '&&', 'tar -xf', source_tar_filename,
'&&', 'rm ', source_tar_filename, '"'])
oscommand(command)

Expand All @@ -238,7 +173,8 @@ def main():
else:
bindings_in_launcher = bindings_in_launcher + "," + scratch_dir_path

sing_command = ' '.join(['singularity', 'exec', bindings, image])
sing_command_list = ['singularity', 'exec', bindings, image]
sing_command = ' '.join(sing_command_list)
sing_command_in_launcher = ' '.join(['singularity', 'exec', bindings_in_launcher, image])

flush_print('sing_command:', sing_command)
Expand Down Expand Up @@ -286,15 +222,15 @@ def main():
# raise Exception("mpiexec path '" + mpiexec_path + "' not found in container!")

# D] join mpiexec arguments
mpiexec_args = " ".join([mpiexec_path, '-f', node_file, '-launcher-exec', launcher_path])
mpiexec_args = [mpiexec_path, '-f', node_file, '-launcher-exec', launcher_path]

# F] join all the arguments into final singularity container command
final_command_list = [sing_command, mpiexec_args, *prog_args]
final_command_list = [*sing_command_list, *mpiexec_args, *prog_args]

###################################################################################################################
# Final call.
###################################################################################################################
if scratch_dir_path:
if scratch_dir_path and args.scratch_copy:
flush_print("Entering SCRATCHDIR:", scratch_dir_path)
os.chdir(scratch_dir_path)

Expand All @@ -304,7 +240,12 @@ def main():
flush_print("=================== smpiexec.py END ===================")
if not debug:
flush_print("================== Program output START ==================")
proc = subprocess.run(final_command_list)
if scratch_dir_path:
sing_tmp = os.path.join(scratch_dir_path, "singularity_tmp")
else:
sing_tmp = os.path.join(os.environ['HOME'], "singularity_tmp")
os.makedirs(sing_tmp, exist_ok = True)
proc = subprocess.run(final_command_list, env={**os.environ, "SINGULARITY_TMPDIR": sing_tmp})

flush_print("=================== Program output END ===================")
exit(proc.returncode)
Expand Down
Loading