147 changes: 147 additions & 0 deletions docs/guides/architecture.md
@@ -0,0 +1,147 @@
# Architecture

> **Audience**: Contributors adding new executors, and users who want to understand why something is failing or how to extend NeMo-Run.
>
> **Prerequisite**: Read [Execution](execution.md), at least one [executor guide](executors/index.md), and [Management](management.md) first.

## `run.run()` vs `run.Experiment`

`run.run()` is a thin convenience wrapper. Internally it creates an `Experiment` with a single task and `detach=False`:

```python
# These two are equivalent
run.run(task, executor=executor)

with run.Experiment("untitled") as exp:
exp.add(task, executor=executor)
exp.run(detach=False)
```

All the mechanics described below apply to both.

---

## Call chain

```{mermaid}
flowchart TD
A["exp.run()"] --> B["Experiment._prepare()"]
B --> C["Job.prepare()"]
C --> D["executor.assign(exp_id, exp_dir, task_id, task_dir)"]
C --> E["executor.create_job_dir()"]
C --> F["package(task, executor) → AppDef + Role(s)"]
A --> G["Job.launch(runner)"]
G --> H["runner.dryrun(AppDef, scheduler_name, cfg=executor)"]
H --> I["scheduler.submit_dryrun(AppDef, executor)"]
G --> J["runner.schedule(dryrun_info)"]
J --> K["scheduler.schedule(dryrun_info) → AppHandle"]
```

1. `_prepare()` calls `Job.prepare()` for each task, which assigns experiment/job directories, syncs code, and builds the TorchX `AppDef`.
2. `Job.launch(runner)` calls `runner.dryrun()` to validate the submission plan, then `runner.schedule()` to submit it.
3. The `AppHandle` returned by `scheduler.schedule()` is stored in the experiment metadata so `Experiment.from_id()` can reconnect.

---

## Executor → TorchX scheduler mapping

Each executor is backed by a TorchX scheduler registered as an entry point in `pyproject.toml` under `torchx.schedulers`:

| Executor | TorchX Scheduler |
|----------|-----------------|
| `LocalExecutor` | `local_persistent` |
| `DockerExecutor` | `docker_persistent` |
| `SlurmExecutor` | `slurm_tunnel` |
| `SkypilotExecutor` | `skypilot` |
| `SkypilotJobsExecutor` | `skypilot_jobs` |
| `DGXCloudExecutor` | `dgx_cloud` |
| `LeptonExecutor` | `lepton` |

Schedulers are discovered at runtime via `torchx.schedulers.get_scheduler_factories()`.

---

## Key TorchX types

| Type | What it represents |
|------|--------------------|
| `AppDef` | Full application: list of `Role`s + metadata |
| `Role` | One execution unit: entrypoint, args, env, image, num_replicas, resources |
| `AppDryRunInfo` | Validated `AppDef` + submission plan (can be inspected without running) |
| `AppHandle` | Running job ID: `"{scheduler}://{runner}/{app_id}"` |
| `AppState` | Status enum: `RUNNING`, `SUCCEEDED`, `FAILED`, `CANCELLED`, `UNKNOWN` |
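As a sketch of how the `AppHandle` format decomposes (TorchX ships its own parser in `torchx.specs.api`; the helper and handle value below are ours, for illustration only):

```python
# Minimal sketch: unpack an AppHandle of the form
# "{scheduler}://{runner}/{app_id}" described in the table above.
def parse_app_handle(handle: str) -> tuple[str, str, str]:
    scheduler, _, rest = handle.partition("://")
    runner, _, app_id = rest.partition("/")
    return scheduler, runner, app_id

# Hypothetical handle for a job submitted via the slurm_tunnel scheduler
scheduler, runner, app_id = parse_app_handle("slurm_tunnel://torchx/my_exp-1234")
# ("slurm_tunnel", "torchx", "my_exp-1234")
```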

---

## How `Executor` fields map to TorchX concepts

| Executor field | TorchX mapping |
|----------------|---------------|
| `nnodes()` + `nproc_per_node()` | `Role.num_replicas` + replica topology |
| `launcher` | `AppDef` structure (`torchrun` / `ft` / basic entrypoint) |
| `retries` | `Role.max_retries` |
| `env_vars` | `Role.env` |
| `packager` | Pre-launch code sync strategy |
| `assign(exp_id, exp_dir, task_id, task_dir)` | Sets path metadata consumed by the scheduler |
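To make the first rows of the table concrete, here is a stand-in for `torchx.specs.Role` (illustration only; the real class has more fields, including entrypoint, args, image, and resources) showing where the executor fields land:

```python
from dataclasses import dataclass, field


# Stand-in for torchx.specs.Role, for illustration only.
@dataclass
class RoleSketch:
    num_replicas: int                                # from executor.nnodes()
    max_retries: int                                 # from executor.retries
    env: dict[str, str] = field(default_factory=dict)  # from executor.env_vars


role = RoleSketch(num_replicas=2, max_retries=3, env={"PYTHONUNBUFFERED": "1"})
```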

---

## Metadata storage layout

All experiment metadata is written under `NEMORUN_HOME` (default `~/.nemo_run`):

```
~/.nemo_run/experiments/{title}/{title}_{exp_id}/
├── {task_id}/
│ ├── configs/
│ │ ├── {task_id}_executor.yaml # serialised executor config
│ │ ├── {task_id}_fn_or_script # zlib-JSON encoded task
│ │ └── {task_id}_packager # zlib-JSON encoded packager
│ └── scripts/{task_id}.sh # generated sbatch/shell script
└── .tasks # serialised Job metadata (JSON)
```

`Experiment.from_id()` reads `.tasks` to reconstruct the experiment and reattach to live jobs via the stored `AppHandle`.
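The layout above can be composed with `pathlib`; the title, experiment ID, and task ID here are hypothetical placeholders, not values NeMo-Run generates:

```python
import os
from pathlib import Path

# NEMORUN_HOME defaults to ~/.nemo_run, as described above.
home = Path(os.environ.get("NEMORUN_HOME", str(Path.home() / ".nemo_run")))
title, exp_id, task_id = "untitled", "1712345678", "task_0"

exp_dir = home / "experiments" / title / f"{title}_{exp_id}"
tasks_file = exp_dir / ".tasks"                                    # Job metadata
executor_yaml = exp_dir / task_id / "configs" / f"{task_id}_executor.yaml"
script = exp_dir / task_id / "scripts" / f"{task_id}.sh"
```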

---

## Adding a new executor

1. **Subclass `Executor`** in `nemo_run/core/execution/`:

```python
from nemo_run.core.execution.base import Executor

@dataclass
class MyExecutor(Executor):
my_param: str = "default"
...
```

2. **Implement a TorchX `Scheduler`** in `nemo_run/run/torchx_backend/schedulers/`:

```python
from torchx.schedulers import Scheduler

class MyScheduler(Scheduler):
def submit_dryrun(self, app, cfg): ...
def schedule(self, dryrun_info): ...
def describe(self, app_id): ...
def cancel(self, app_id): ...
```

3. **Register the scheduler as an entry point** in `pyproject.toml`:

```toml
[project.entry-points."torchx.schedulers"]
my_scheduler = "nemo_run.run.torchx_backend.schedulers.my:create_scheduler"
```

4. **Add to `EXECUTOR_MAPPING`** in `nemo_run/run/torchx_backend/schedulers/api.py`:

```python
EXECUTOR_MAPPING = {
...,
MyExecutor: "my_scheduler",
}
```
220 changes: 11 additions & 209 deletions docs/guides/execution.md
@@ -122,216 +122,18 @@ hybrid_packager = run.HybridPackager(

This would create an archive where the contents of `src` are under a `code/` directory and matched `configs/*.yaml` files are under a `configs/` directory.

### Defining Executors
### Executor guides

Next, we'll describe how to set up each of the executors below.
For per-executor prerequisites, configuration reference, and end-to-end examples, see the **[Executors](executors/index.md)** section:

#### LocalExecutor
- [LocalExecutor](executors/local.md)
- [DockerExecutor](executors/docker.md)
- [SlurmExecutor](executors/slurm.md)
- [SkypilotExecutor](executors/skypilot.md)
- [DGXCloudExecutor](executors/dgxcloud.md)
- [LeptonExecutor](executors/lepton.md)
- [KubeRayExecutor](executors/kuberay.md)

The LocalExecutor is the simplest executor. It executes your task locally, in a separate process or process group, from your current working directory.
Defining executors in Python offers great flexibility — you can mix and match common environment variables, and the separation of tasks from executors lets you run the same `run.Script` on any supported backend.

The easiest way to define one is to call `run.LocalExecutor()`.

#### DockerExecutor

The DockerExecutor enables launching a task using `docker` on your local machine. It requires Docker to be installed and the daemon to be running.

The DockerExecutor uses the [docker python client](https://docker-py.readthedocs.io/en/stable/) and most of the options are passed directly to the client.

Below is an example of configuring a `DockerExecutor`:

```python
run.DockerExecutor(
container_image="python:3.12",
num_gpus=-1,
runtime="nvidia",
ipc_mode="host",
shm_size="30g",
volumes=["/local/path:/path/in/container"],
env_vars={"PYTHONUNBUFFERED": "1"},
packager=run.Packager(),
)
```

#### SlurmExecutor

The SlurmExecutor enables launching the configured task on a Slurm Cluster with Pyxis. Additionally, you can configure a `run.SSHTunnel`, which enables you to execute tasks on the Slurm cluster from your local machine while NeMo-Run manages the SSH connection for you. This setup supports use cases such as launching the same task on multiple Slurm clusters.

Below is an example of configuring a `SlurmExecutor`:

```python
def your_slurm_executor(nodes: int = 1, container_image: str = DEFAULT_IMAGE):
# SSH Tunnel
ssh_tunnel = run.SSHTunnel(
host="your-slurm-host",
user="your-user",
job_dir="directory-to-store-runs-on-the-slurm-cluster",
identity="optional-path-to-your-key-for-auth",
)
# Local Tunnel to use if you're already on the cluster
local_tunnel = run.LocalTunnel()

packager = GitArchivePackager(
# This will also be the working directory in your task.
# If empty, the working directory will be toplevel of your git repo
subpath="optional-subpath-from-toplevel-of-your-git-repo"
)

executor = run.SlurmExecutor(
# Most of these parameters are specific to slurm
account="your-account",
partition="your-partition",
ntasks_per_node=8,
gpus_per_node=8,
nodes=nodes,
tunnel=ssh_tunnel,
container_image=container_image,
time="00:30:00",
env_vars=common_envs(),
container_mounts=mounts_for_your_hubs(),
packager=packager,
    )
    return executor

# You can then call the executor in your script like
executor = your_slurm_executor(nodes=8, container_image="your-nemo-image")
```

Use the SSH Tunnel when launching from your local machine, or the Local Tunnel if you're already on the Slurm cluster.

##### Job Dependencies

`SlurmExecutor` supports defining dependencies between [jobs](management.md#add-tasks), allowing you to create workflows where jobs run in a specific order. Additionally, you can specify the `dependency_type` parameter:

```python
executor = run.SlurmExecutor(
# ... other parameters ...
dependency_type="afterok",
)
```

The `dependency_type` parameter specifies the type of dependency relationship:

- `afterok` (default): Job will start only after the specified jobs have completed successfully
- `afterany`: Job will start after the specified jobs have terminated (regardless of exit code)
- `afternotok`: Job will start after the specified jobs have failed
- Other options are available as defined in the [Slurm documentation](https://slurm.schedmd.com/sbatch.html#OPT_dependency)

This functionality enables you to create complex workflows with proper orchestration between different tasks, such as starting a training job only after data preparation is complete, or running an evaluation only after training finishes successfully.
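Under the hood, this corresponds to `sbatch`'s `--dependency=<type>:<job_id>[:<job_id>...]` flag. A sketch of the directive such a configuration translates to (the helper name is ours, not part of NeMo-Run):

```python
# Sketch only: build the sbatch dependency flag that a dependency_type
# and a list of parent job IDs translate to.
def dependency_flag(dependency_type: str, job_ids: list[str]) -> str:
    return f"--dependency={dependency_type}:{':'.join(job_ids)}"


dependency_flag("afterok", ["12345", "12346"])
# "--dependency=afterok:12345:12346"
```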

#### SkypilotExecutor

This executor is used to configure [Skypilot](https://skypilot.readthedocs.io/en/latest/docs/index.html). Make sure Skypilot is installed using `pip install "nemo_run[skypilot]"` and that at least one cloud is configured using `sky check`.

Here's an example of the `SkypilotExecutor` for Kubernetes:

```python
def your_skypilot_executor(nodes: int, devices: int, container_image: str):
    return run.SkypilotExecutor(
gpus="RTX5880-ADA-GENERATION",
gpus_per_node=devices,
num_nodes=nodes,
env_vars=common_envs(),
container_image=container_image,
cloud="kubernetes",
# Optional to reuse Skypilot cluster
cluster_name="tester",
setup="""
conda deactivate
nvidia-smi
ls -al ./
""",
)

# You can then call the executor in your script like
executor = your_skypilot_executor(nodes=8, devices=8, container_image="your-nemo-image")
```

As demonstrated in the examples, defining executors in Python offers great flexibility. You can easily mix and match things like common environment variables, and the separation of tasks from executors enables you to run the same configured task on any supported executor.

#### DGXCloudExecutor

The `DGXCloudExecutor` integrates with a DGX Cloud cluster's Run:ai API to launch distributed jobs. It uses REST API calls to authenticate, identify the target project and cluster, and submit the job specification.

```{warning}
Currently, the `DGXCloudExecutor` is only supported when launching experiments *from* a pod running on the DGX Cloud cluster itself. Furthermore, this launching pod must have access to a Persistent Volume Claim (PVC) where the experiment/job directories will be created, and this same PVC must also be configured to be mounted by the job being launched.
```

Here's an example configuration:

```python
def your_dgx_executor(nodes: int, gpus_per_node: int, container_image: str):
# Ensure these are set correctly for your DGX Cloud environment
# You might fetch these from environment variables or a config file
base_url = "YOUR_DGX_CLOUD_API_ENDPOINT" # e.g., https://<cluster-name>.<domain>/api/v1
app_id = "YOUR_RUNAI_APP_ID"
app_secret = "YOUR_RUNAI_APP_SECRET"
project_name = "YOUR_RUNAI_PROJECT_NAME"
# Define the PVC that will be mounted in the job pods
# Ensure the path specified here contains your NEMORUN_HOME
pvc_name = "your-pvc-k8s-name" # The Kubernetes name of the PVC
pvc_mount_path = "/your_custom_path" # The path where the PVC will be mounted inside the container

executor = run.DGXCloudExecutor(
base_url=base_url,
app_id=app_id,
app_secret=app_secret,
project_name=project_name,
container_image=container_image,
nodes=nodes,
gpus_per_node=gpus_per_node,
pvcs=[{"name": pvc_name, "path": pvc_mount_path}],
# Optional: Add custom environment variables or Slurm specs if needed
env_vars=common_envs(),
# packager=run.GitArchivePackager() # Choose appropriate packager
)
return executor

# Example usage:
# executor = your_dgx_executor(nodes=4, gpus_per_node=8, container_image="your-nemo-image")

```

For a complete end-to-end example using DGX Cloud with NeMo, refer to the [NVIDIA DGX Cloud NeMo End-to-End Workflow Example](https://docs.nvidia.com/dgx-cloud/run-ai/latest/nemo-e2e-example.html).

#### LeptonExecutor

The `LeptonExecutor` integrates with an NVIDIA DGX Cloud Lepton cluster's Python SDK to launch distributed jobs. It uses the Lepton SDK's API calls to authenticate, identify the target node group and resource shape, and submit the job specification, which is launched as a batch job on the cluster.

Here's an example configuration:

```python
def your_lepton_executor(nodes: int, gpus_per_node: int, container_image: str):
    # Ensure these are set correctly for your DGX Cloud Lepton environment
# You might fetch these from environment variables or a config file
resource_shape = "gpu.8xh100-80gb" # Replace with your desired resource shape representing the number of GPUs in a pod
node_group = "my-node-group" # The node group to run the job in
nemo_run_dir = "/nemo-workspace/nemo-run" # The NeMo-Run directory where experiments are saved
# Define the remote storage directory that will be mounted in the job pods
# Ensure the path specified here contains your NEMORUN_HOME
storage_path = "/nemo-workspace" # The remote storage directory to mount in jobs
mount_path = "/nemo-workspace" # The path where the remote storage directory will be mounted inside the container

executor = run.LeptonExecutor(
resource_shape=resource_shape,
node_group=node_group,
container_image=container_image,
nodes=nodes,
nemo_run_dir=nemo_run_dir,
gpus_per_node=gpus_per_node,
mounts=[{"path": storage_path, "mount_path": mount_path}],
# Optional: Add custom environment variables or PyTorch specs if needed
env_vars=common_envs(),
# Optional: Specify a node reservation to schedule jobs with
# node_reservation="my-node-reservation",
# Optional: Specify commands to run at container launch prior to the job starting
# pre_launch_commands=["nvidia-smi"],
# Optional: Specify image pull secrets for authenticating with container registries
# image_pull_secrets=["my-image-pull-secret"],
# packager=run.GitArchivePackager() # Choose appropriate packager
)
return executor

# Example usage:
executor = your_lepton_executor(nodes=4, gpus_per_node=8, container_image="your-nemo-image")

```
For a deep dive into how executors map to TorchX schedulers and how `run.Experiment` orchestrates execution, see [Architecture](architecture.md).