From 80beffa946a1df9d432d946f4e5a7faa197971be Mon Sep 17 00:00:00 2001 From: Maxime Piraux Date: Wed, 23 Oct 2024 11:35:42 +0200 Subject: [PATCH] Adds a utility script to provision and clean up mpf clusters --- README.md | 4 ++- mpf-cluster | 68 +++++++++++++++++++++++++++++++++++++++++++++++++ mpf/__init__.py | 7 ++--- setup.py | 1 + 4 files changed, 76 insertions(+), 4 deletions(-) create mode 100755 mpf-cluster diff --git a/README.md b/README.md index 63ddd99..65d7428 100644 --- a/README.md +++ b/README.md @@ -46,4 +46,6 @@ $ tests/experiments/iperf.ipy -c tests/experiments/cluster.yaml ## Requirements -Running experiments with mpf requires reasonably coherent Python and packages versions between the executor and the remote nodes. `ipyparallel` should be installed on the remote nodes and `rsync` on the node running `mpf`. +Running experiments with mpf requires reasonably coherent Python and package versions between the executor and the remote nodes. +`ipyparallel` should be installed on the remote nodes and `rsync` on the node running `mpf`. +The `mpf-cluster` utility can be used to provision remote nodes automatically over SSH. 
#!/usr/bin/env python3
"""Provision and clean up the machines of an mpf cluster over SSH.

The cluster is described by a YAML file with a ``global.python_path`` entry,
a ``machines`` list and a ``controller`` entry; every machine entry provides
``user`` and ``hostname``.

Two actions are available:

* ``provision`` installs Miniconda and creates a conda environment providing
  the interpreter designated by ``global.python_path`` on each remote machine.
* ``cleanup`` stops the ipyparallel processes and removes the profile
  directories created for the cluster.
"""

import argparse
import os
import subprocess
import sys
from pathlib import PurePosixPath

import yaml

from mpf import REMOTE_PROFILE_DIR

MINICONDA_INSTALL_LINK = 'https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh'
MINICONDA_INSTALLER = 'Miniconda3-latest-Linux-x86_64.sh'


def execute(machine, *command, quiet=False):
    """Run ``command`` on ``machine`` through ssh.

    Args:
        machine: Mapping with the ``user`` and ``hostname`` of the target.
        *command: The command and its arguments, handed to ssh as-is.
        quiet: When true, the output of the command is captured instead of
            being forwarded to the terminal.

    Returns:
        The ``subprocess.CompletedProcess`` of the ssh invocation.
    """
    target = f"{machine['user']}@{machine['hostname']}"
    return subprocess.run(['ssh', target, *command], capture_output=quiet)


def parse_conda_python_path(python_path):
    """Split ``*/{miniconda_dir}/envs/{env_name}/bin/python`` into its parts.

    Args:
        python_path: Path of the Python interpreter on the remote machines.

    Returns:
        A ``(miniconda_dir, env_name)`` tuple of strings.

    Raises:
        ValueError: If the path does not follow the expected layout or if the
            Miniconda directory would be empty or the filesystem root.
    """
    path = PurePosixPath(python_path)
    parts = path.parts
    well_formed = (
        len(parts) >= 5
        and parts[-4] == 'envs'
        and parts[-2:] == ('bin', 'python')
    )
    if not well_formed or str(path.parents[3]) in ('/', '.'):
        raise ValueError("Python path must be in the form */{miniconda_dir}/envs/{env_name}/bin/python")
    return str(path.parents[3]), parts[-3]


def all_machines(cluster):
    """Return the engine machines of ``cluster`` followed by its controller."""
    return cluster['machines'] + [cluster['controller']]


def _require_success(result, message):
    """Abort the script with ``message`` when a remote command failed."""
    # An explicit check is used instead of ``assert`` so that it is not
    # stripped when Python runs with -O.
    if result.returncode != 0:
        sys.exit(message)


def provision(cluster, python_version):
    """Install Miniconda and the mpf conda environment on remote machines.

    Machines whose hostname is ``localhost`` and machines on which
    ``global.python_path`` already exists are skipped.

    Args:
        cluster: The parsed cluster definition.
        python_version: The Python version requested for the environment.
    """
    python_path = cluster['global']['python_path']
    try:
        miniconda_dir, conda_env_name = parse_conda_python_path(python_path)
    except ValueError as error:
        print(error, file=sys.stderr)
        sys.exit(-1)

    for machine in all_machines(cluster):
        hostname = machine['hostname']
        if hostname == 'localhost':
            print("Provisioning local machines is not supported yet", file=sys.stderr)
            continue
        if execute(machine, 'ls', python_path, quiet=True).returncode == 0:
            print(f"Path {python_path} already exists on machine {hostname}, skipping", file=sys.stderr)
            continue

        _require_success(
            execute(machine, 'wget', '--quiet', MINICONDA_INSTALL_LINK),
            f"Couldn't download Miniconda installer on {hostname}",
        )
        _require_success(
            execute(machine, 'bash', f"./{MINICONDA_INSTALLER}", '-b', '-u', '-p', miniconda_dir),
            f"Couldn't install Miniconda in {miniconda_dir}",
        )
        _require_success(
            execute(
                machine, f'{miniconda_dir}/bin/conda', 'create', '-n', conda_env_name, '-y',
                f'Python={python_version}', 'ipyparallel', 'numpy', 'scipy',
            ),
            f"Couldn't create conda environment {conda_env_name} on {hostname}",
        )
        _require_success(
            execute(machine, 'rm', MINICONDA_INSTALLER),
            f"Couldn't remove Miniconda installer on {hostname}",
        )


def cleanup(cluster, cluster_file, force):
    """Stop the mpf processes of the cluster and remove its profile files.

    Args:
        cluster: The parsed cluster definition.
        cluster_file: Name of the YAML file the cluster was loaded from; the
            local profile directory is ``profile_<basename of that file>``
            in the current directory.
        force: When true, ``ipcluster stop`` is skipped and the processes are
            only killed.
    """
    local_profile_dir = f"profile_{os.path.basename(cluster_file)}"
    if not os.path.exists(local_profile_dir):
        print(f"Couldn't locate profile directory {local_profile_dir} of cluster {cluster_file}, make sure it is in the current directory", file=sys.stderr)
        sys.exit(-1)

    if not force:
        subprocess.run(['ipcluster', 'stop', f'--profile-dir=./{local_profile_dir}/'])

    python_path = cluster['global']['python_path']
    # The literal double quotes are part of the argument given to ssh,
    # presumably so that the remote shell keeps the pattern as a single word.
    process_pattern = f'"{python_path} -m ipyparallel"'
    for machine in all_machines(cluster):
        execute(machine, 'pkill', '-f', process_pattern)

    for machine in cluster['machines']:
        execute(machine, 'rm', '-rf', REMOTE_PROFILE_DIR)

    # The controller receives the profile path expressed relative to the local
    # home directory (assumes the controller mirrors that layout - TODO confirm).
    # It is kept separate from the local path so that the local removal below
    # still targets the directory that was checked above.
    remote_profile_dir = os.path.relpath(local_profile_dir, start=os.path.expanduser('~'))
    execute(cluster['controller'], 'rm', '-rf', remote_profile_dir)

    subprocess.run(['rm', '-rf', local_profile_dir])


def build_parser():
    """Build the command-line parser of the utility."""
    parser = argparse.ArgumentParser(description='Control an mpf cluster')
    parser.add_argument('-c', '--cluster', metavar='cluster.yaml', type=argparse.FileType('r'), required=True, help='The YAML file describing the cluster')
    # An action is mandatory: without one the script would load the cluster
    # file and exit without doing anything.
    subparsers = parser.add_subparsers(title='Action to execute on the cluster', dest='action', required=True)
    cleanup_parser = subparsers.add_parser('cleanup', help='Stop all processes and remove all files related to mpf on the cluster')
    cleanup_parser.add_argument('-f', '--force', action='store_true', help='Abortfully clean the cluster')
    provision_parser = subparsers.add_parser('provision', help='Installs the required Python environment with Miniconda')
    provision_parser.add_argument('-p', '--python-version', metavar='version', default=f"{sys.version_info.major}.{sys.version_info.minor}", help='Sets the Python version to provision')
    return parser


def main():
    """Parse the command line and run the requested action."""
    args = build_parser().parse_args()
    with args.cluster as cluster_file:
        cluster_definition = yaml.safe_load(cluster_file)

    if args.action == 'provision':
        provision(cluster_definition, args.python_version)
    elif args.action == 'cleanup':
        cleanup(cluster_definition, args.cluster.name, args.force)


if __name__ == '__main__':
    main()
a/mpf/__init__.py +++ b/mpf/__init__.py @@ -30,6 +30,7 @@ run_logger.addHandler(sh) RESERVED_VARIABLES = {'mpf_ctx'} +REMOTE_PROFILE_DIR = '/tmp/mpf-ipy-profile' setup_done: bool = False variables: Dict[str, 'Variable'] = {} @@ -192,9 +193,9 @@ def create_profile(profile_dir: str, cluster: dict): ipcluster_config += """ c.Cluster.engine_launcher_class = 'ssh' c.SSHEngineSetLauncher.engines = {engines} -c.SSHEngineSetLauncher.remote_profile_dir = '/tmp/mpf-ipy-profile' +c.SSHEngineSetLauncher.remote_profile_dir = '{remote_profile_dir}' c.SSHLauncher.remote_python = '{python_path}' -""".format(engines=repr({e: 1 for e in engines}), python_path=cluster['global']['python_path']) +""".format(engines=repr({e: 1 for e in engines}), python_path=cluster['global']['python_path'], remote_profile_dir=REMOTE_PROFILE_DIR) elif engines_launcher == 'local': ipcluster_config += "c.Cluster.engine_launcher_class = 'local'\n" @@ -211,7 +212,7 @@ def create_profile(profile_dir: str, cluster: dict): if engines_launcher == 'ssh': for e in engines: - p = subprocess.run(['rsync', '--mkpath', magics_profile_filename, f'{e}:/tmp/mpf-ipy-profile/startup/00-mpf_magics.ipy']) + p = subprocess.run(['rsync', '--mkpath', magics_profile_filename, f'{e}:{REMOTE_PROFILE_DIR}/startup/00-mpf_magics.ipy']) assert p.returncode == 0 def add_variable(name: str, values): diff --git a/setup.py b/setup.py index 2ba1dca..260c3fe 100644 --- a/setup.py +++ b/setup.py @@ -17,5 +17,6 @@ 'scipy >= 1.11', 'tqdm' ], + scripts=['mpf-cluster'], include_package_data=True, )