Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ $ tests/experiments/iperf.ipy -c tests/experiments/cluster.yaml

## Requirements

Running experiments with mpf requires reasonably coherent Python and packages versions between the executor and the remote nodes. `ipyparallel` should be installed on the remote nodes and `rsync` on the node running `mpf`.
Running experiments with mpf requires reasonably consistent Python and package versions between the executor and the remote nodes.
`ipyparallel` should be installed on the remote nodes and `rsync` on the node running `mpf`.
The `mpf-cluster` utility can be used to provision remote nodes automatically over SSH.
68 changes: 68 additions & 0 deletions mpf-cluster
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python3

import argparse
import subprocess
import yaml
import sys
import os

from mpf import REMOTE_PROFILE_DIR

# URL the Miniconda installer is downloaded from on each remote node.
MINICONDA_INSTALL_LINK = 'https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh'
# File name of the installer: the last path component of the URL above.
MINICONDA_INSTALLER = MINICONDA_INSTALL_LINK.rsplit('/', 1)[-1]

def execute(machine, *command, quiet=False):
    """Run *command* on *machine* through ``ssh``.

    Args:
        machine: Mapping providing the ``'user'`` and ``'hostname'`` keys used
            to build the ``user@hostname`` SSH destination.
        *command: Words of the remote command, appended after the destination.
        quiet: Forwarded as ``capture_output`` to ``subprocess.run``; when
            true the command's output is captured instead of being shown.

    Returns:
        The object returned by ``subprocess.run`` for the ``ssh`` invocation.
    """
    destination = f"{machine['user']}@{machine['hostname']}"
    ssh_argv = ['ssh', destination, *command]
    return subprocess.run(ssh_argv, capture_output=quiet)

# Command-line interface: a mandatory cluster description file followed by one
# action (`cleanup` or `provision`), each with its own options.
parser = argparse.ArgumentParser(description='Control an mpf cluster')
parser.add_argument('-c', '--cluster', metavar='cluster.yaml', type=argparse.FileType('r'), required=True, help='The YAML file describing the cluster')
# required=True makes argparse report a usage error when no action is given,
# instead of letting the script silently do nothing and exit successfully.
subparsers = parser.add_subparsers(title='Action to execute on the cluster', dest='action', required=True)
cleanup_parser = subparsers.add_parser('cleanup', help='Stop all processes and remove all files related to mpf on the cluster')
cleanup_parser.add_argument('-f', '--force', action='store_true', help='Abortfully clean the cluster')
provision_parser = subparsers.add_parser('provision', help='Installs the required Python environment with Miniconda')
# The provisioned Python version defaults to the major.minor version of the
# interpreter running this script.
provision_parser.add_argument('-p', '--python-version', metavar='version', default=f"{sys.version_info.major}.{sys.version_info.minor}", help='Sets the Python version to provision')
args = parser.parse_args()

# Close the file once parsed; `args.cluster.name` remains readable afterwards.
with args.cluster:
    cluster_definition = yaml.safe_load(args.cluster)

def _run_or_exit(machine, failure_message, *command):
    """Run *command* on *machine* via `execute`; exit with *failure_message* if it fails."""
    if execute(machine, *command).returncode != 0:
        # sys.exit with a string prints it to stderr and exits with status 1.
        sys.exit(failure_message)


# Provision: install Miniconda and create the conda environment that provides
# the cluster's configured `python_path` on every remote machine lacking it.
if args.action == 'provision':
    path = cluster_definition['global']['python_path']

    # Expected layout: {miniconda_dir}/envs/{env_name}/bin/python
    bin_dir = os.path.dirname(path)
    env_dir = os.path.dirname(bin_dir)
    envs_dir = os.path.dirname(env_dir)
    miniconda_dir = os.path.dirname(envs_dir)
    conda_env_name = os.path.basename(env_dir)

    # Check each path component rather than a substring, and reject an empty
    # or root install prefix (a relative path would yield an empty prefix).
    well_formed = (
        os.path.basename(path) == 'python'
        and os.path.basename(bin_dir) == 'bin'
        and os.path.basename(envs_dir) == 'envs'
        and miniconda_dir not in ('', '/')
    )
    if not well_formed:
        print("Python path must be in the form */{miniconda_dir}/envs/{env_name}/bin/python", file=sys.stderr)
        sys.exit(-1)

    for m in cluster_definition['machines'] + [cluster_definition['controller']]:
        if m['hostname'] == 'localhost':
            print("Provisioning local machines is not supported yet", file=sys.stderr)
            continue
        # A machine on which the interpreter path already exists is left untouched.
        if execute(m, 'ls', path, quiet=True).returncode == 0:
            print(f"Path {path} already exists on machine {m['hostname']}, skipping", file=sys.stderr)
            continue
        # -O pins the output name so an installer left over from an earlier
        # failed run is overwritten instead of wget saving to a "*.1" file.
        _run_or_exit(
            m, f"Couldn't download Miniconda installer on {m['hostname']}",
            'wget', '--quiet', '-O', MINICONDA_INSTALLER, MINICONDA_INSTALL_LINK,
        )
        # The return code is checked explicitly: the object returned by
        # `execute` is always truthy, so asserting on it never fails.
        _run_or_exit(
            m, f"Couldn't install Miniconda in {miniconda_dir}",
            'bash', f"./{MINICONDA_INSTALLER}", '-b', '-u', '-p', miniconda_dir,
        )
        _run_or_exit(
            m, f"Couldn't create conda environment {conda_env_name} on {m['hostname']}",
            f'{miniconda_dir}/bin/conda', 'create', '-n', conda_env_name, '-y',
            f'Python={args.python_version}', 'ipyparallel', 'numpy', 'scipy',
        )
        _run_or_exit(
            m, f"Couldn't remove Miniconda installer on {m['hostname']}",
            'rm', MINICONDA_INSTALLER,
        )


elif args.action == 'cleanup':
profile_dir = f"profile_{os.path.basename(args.cluster.name)}"
if not os.path.exists(profile_dir):
print(f"Couldn't locate profile directory {profile_dir} of cluster {args.cluster.name}, make sure it is in the current directory", file=sys.stderr)
exit(-1)

if not args.force:
subprocess.run(['ipcluster', 'stop', f'--profile-dir=./{profile_dir}/'])

for m in cluster_definition['machines'] + [cluster_definition['controller']]:
execute(m, 'pkill', '-f', f'"{cluster_definition['global']['python_path']} -m ipyparallel"')

for m in cluster_definition['machines']:
execute(m, 'rm', '-rf', REMOTE_PROFILE_DIR)

profile_dir = os.path.relpath(f"profile_{os.path.basename(args.cluster.name)}", start=os.path.expanduser('~'))
execute(cluster_definition['controller'], 'rm', '-rf', profile_dir)

subprocess.run(['rm', '-rf', profile_dir])
7 changes: 4 additions & 3 deletions mpf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
run_logger.addHandler(sh)

RESERVED_VARIABLES = {'mpf_ctx'}
REMOTE_PROFILE_DIR = '/tmp/mpf-ipy-profile'

setup_done: bool = False
variables: Dict[str, 'Variable'] = {}
Expand Down Expand Up @@ -192,9 +193,9 @@ def create_profile(profile_dir: str, cluster: dict):
ipcluster_config += """
c.Cluster.engine_launcher_class = 'ssh'
c.SSHEngineSetLauncher.engines = {engines}
c.SSHEngineSetLauncher.remote_profile_dir = '/tmp/mpf-ipy-profile'
c.SSHEngineSetLauncher.remote_profile_dir = '{remote_profile_dir}'
c.SSHLauncher.remote_python = '{python_path}'
""".format(engines=repr({e: 1 for e in engines}), python_path=cluster['global']['python_path'])
""".format(engines=repr({e: 1 for e in engines}), python_path=cluster['global']['python_path'], remote_profile_dir=REMOTE_PROFILE_DIR)

elif engines_launcher == 'local':
ipcluster_config += "c.Cluster.engine_launcher_class = 'local'\n"
Expand All @@ -211,7 +212,7 @@ def create_profile(profile_dir: str, cluster: dict):

if engines_launcher == 'ssh':
for e in engines:
p = subprocess.run(['rsync', '--mkpath', magics_profile_filename, f'{e}:/tmp/mpf-ipy-profile/startup/00-mpf_magics.ipy'])
p = subprocess.run(['rsync', '--mkpath', magics_profile_filename, f'{e}:{REMOTE_PROFILE_DIR}/startup/00-mpf_magics.ipy'])
assert p.returncode == 0

def add_variable(name: str, values):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
'scipy >= 1.11',
'tqdm'
],
scripts=['mpf-cluster'],
include_package_data=True,
)