Skip to content
86 changes: 86 additions & 0 deletions nemo_run/run/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,51 @@ def dryrun(self, log: bool = True, exist_ok: bool = False, delete_exp_dir: bool
if delete_exp_dir:
shutil.rmtree(self._exp_dir)

def export(self, output_dir: str, exist_ok: bool = False) -> None:
    """
    Export runnable scripts for all tasks to output_dir without submitting.

    Each task produces a script file in output_dir:
    - SlurmExecutor → <task>_sbatch.sh (sbatch <file>)
    - DGXCloudExecutor → <task>_torchrun_job.sh
    - LocalExecutor → <task>.sh (bash <file>)
    - DockerExecutor → <task>.yaml
    - SkypilotExecutor → <task>.yaml (sky launch <file>)
    - SkypilotJobsExecutor → <task>.yaml
    - LeptonExecutor → <task>.sh

    Also generates submit_all.sh referencing every exported script.

    Args:
        output_dir: Directory to write into. Created if it doesn't exist.
        exist_ok: Passed to mkdir; if False, raises when output_dir exists.
    """
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=exist_ok)

    self._prepare(exist_ok=True)

    # Redirect every executor to write its script into output_dir.
    # A JobGroup may hold either a single shared Executor or one per task.
    for job in self.jobs:
        if isinstance(job, JobGroup):
            executors = (
                [job.executors] if isinstance(job.executors, Executor) else job.executors
            )
        else:
            executors = [job.executor]
        for executor in executors:
            executor.experiment_dir = str(out)

    # Dryrun each job — the scheduler for its executor type writes the
    # corresponding script file into output_dir as a side effect.
    for job in self.jobs:
        job.launch(wait=False, runner=self._runner, dryrun=True, direct=False, log_dryrun=False)

    # Generate submit_all.sh referencing the exported scripts.
    _write_submit_script(out, self._title, self.jobs)

    self.console.log(f"[bold green]Exported scripts to {out}")
    # Export never submits, so the staged experiment dir is no longer needed.
    shutil.rmtree(self._exp_dir)

def run(
self,
sequential: bool = False,
Expand Down Expand Up @@ -1332,6 +1377,47 @@ def _get_sorted_dirs(path: str) -> list[str]:
_LOADED_MAINS = set()


def _write_submit_script(out: Path, title: str, jobs: list) -> None:
    """Write an executable submit_all.sh into *out* that submits every job.

    Each exported script matching ``<job.id>*`` is referenced with the
    scheduler-appropriate launch command (sbatch, sky launch, ...); any
    executor type not in the mapping falls back to ``bash``.

    Args:
        out: Directory containing the exported per-job scripts.
        title: Experiment title, embedded in the header comment.
        jobs: List of Job / JobGroup objects whose scripts were exported.
    """
    submit_cmds = {
        "SlurmExecutor": "sbatch",
        "SkypilotExecutor": "sky launch",
        "SkypilotJobsExecutor": "sky jobs launch",
        "DockerExecutor": "docker compose -f",
    }

    lines = [
        "#!/bin/bash",
        f"# Submit all jobs for experiment: {title}",
        "# Generated by NeMo Run",
        "",
        'SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"',
        "",
    ]

    for job in jobs:
        # Resolve a representative executor: a JobGroup's executors may be a
        # single shared Executor or a list; its first entry picks the command.
        if isinstance(job, JobGroup):
            executors = [job.executors] if isinstance(job.executors, Executor) else job.executors
            executor = executors[0]
        else:
            executor = job.executor
        cmd = submit_cmds.get(type(executor).__name__, "bash")
        for script in sorted(out.glob(f"{job.id}*")):
            # Never have submit_all.sh submit itself.
            if script.name == "submit_all.sh":
                continue
            lines.append(f'{cmd} "$SCRIPT_DIR/{script.name}"')

    submit = out / "submit_all.sh"
    submit.write_text("\n".join(lines) + "\n")
    # Owner rwx so the script is directly runnable.
    submit.chmod(0o700)


def maybe_load_external_main(exp_dir: str):
main_file = Path(exp_dir) / "__main__.py"
if main_file.exists() and main_file not in _LOADED_MAINS:
Expand Down
10 changes: 5 additions & 5 deletions nemo_run/run/torchx_backend/schedulers/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,11 @@ def _submit_dryrun( # type: ignore
)

# Write and copy sbatch script
path = os.path.join(executor.experiment_dir, "torchrun_job.sh")
script = req.materialize()

with open(path, "w") as f:
f.write(script)
if executor.experiment_dir:
path = os.path.join(executor.experiment_dir, f"{executor.job_name}_torchrun_job.sh")
with open(path, "w") as f:
f.write(script)

return AppDryRunInfo(
DGXRequest(app=app, executor=executor, cmd=cmd, name=role.name),
Expand All @@ -145,7 +145,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str:

# The DGXExecutor's launch call typically returns (job_id, handle).
# We'll call it without additional parameters here.
cmd = os.path.join(executor.experiment_dir, "torchrun_job.sh")
cmd = os.path.join(executor.experiment_dir, f"{executor.job_name}_torchrun_job.sh")
req.launch_cmd = ["bash", cmd]
job_id, status = executor.launch(name=req.name, cmd=req.launch_cmd)
if not job_id:
Expand Down
6 changes: 6 additions & 0 deletions nemo_run/run/torchx_backend/schedulers/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[DockerJobR
basename = Path(executor.job_dir).name
app_id = make_unique(basename)
req = DockerJobRequest(id=app_id, executor=executor, containers=containers)

if executor.experiment_dir:
path = os.path.join(executor.experiment_dir, f"{executor.job_name}.yaml")
with open(path, "w") as f:
f.write(str(req))

return AppDryRunInfo(req, repr)

def schedule(self, dryrun_info: AppDryRunInfo[DockerJobRequest]) -> str: # type: ignore
Expand Down
14 changes: 14 additions & 0 deletions nemo_run/run/torchx_backend/schedulers/lepton.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import json
import logging
import os
import shlex
import shutil
import tempfile
from dataclasses import dataclass
Expand Down Expand Up @@ -98,6 +99,19 @@ def _submit_dryrun( # type: ignore
role = values.apply(role)

cmd = [role.entrypoint] + role.args

if executor.experiment_dir:
path = os.path.join(executor.experiment_dir, f"{executor.job_name}.sh")
lines = ["#!/bin/bash", "# Generated by NeMo Run", "# Submit via Lepton AI API", ""]
for key, val in executor.env_vars.items():
lines.append(f"export {key}={shlex.quote(str(val))}")
for key, val in role.env.items():
lines.append(f"export {key}={shlex.quote(str(val))}")
lines.append(" ".join(shlex.quote(p) for p in cmd))
with open(path, "w") as f:
f.write("\n".join(lines) + "\n")
os.chmod(path, 0o700)

return AppDryRunInfo(
LeptonRequest(app=app, executor=executor, cmd=cmd, name=role.name),
# Minimal function to show the config, if any
Expand Down
16 changes: 16 additions & 0 deletions nemo_run/run/torchx_backend/schedulers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import json
import os
import pprint
import shlex
import shutil
import tempfile
import warnings
Expand Down Expand Up @@ -100,6 +101,21 @@ def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[PopenReque
cfg_dict = asdict(cfg)
cfg_dict["log_dir"] = cfg_dict.pop("job_dir")
request = self._to_popen_request(app, cfg_dict) # type: ignore

if cfg.experiment_dir:
path = os.path.join(cfg.experiment_dir, f"{app.name}.sh")
lines = ["#!/bin/bash", "# Generated by NeMo Run", ""]
for key, val in cfg.env_vars.items():
lines.append(f"export {key}={shlex.quote(str(val))}")
for role in app.roles:
for key, val in role.env.items():
lines.append(f"export {key}={shlex.quote(str(val))}")
cmd_parts = [role.entrypoint] + role.args
lines.append(" ".join(shlex.quote(p) for p in cmd_parts))
with open(path, "w") as f:
f.write("\n".join(lines) + "\n")
os.chmod(path, 0o700)

return AppDryRunInfo(request, lambda p: pprint.pformat(asdict(p), indent=2, width=80))

def schedule(self, dryrun_info: AppDryRunInfo[PopenRequest]) -> str:
Expand Down
6 changes: 6 additions & 0 deletions nemo_run/run/torchx_backend/schedulers/skypilot.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ def _submit_dryrun( # type: ignore
task = cfg.to_task(name=role.name, cmd=cmd, env_vars=role.env)

req = SkypilotRequest(task=task, executor=cfg)

if cfg.experiment_dir:
path = os.path.join(cfg.experiment_dir, f"{cfg.job_name}.yaml")
with open(path, "w") as f:
f.write(common_utils.dump_yaml_str(req.task.to_yaml_config()))

return AppDryRunInfo(req, lambda req: common_utils.dump_yaml_str(req.task.to_yaml_config()))

def _validate(self, app: AppDef, scheduler: str) -> None:
Expand Down
6 changes: 6 additions & 0 deletions nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ def _submit_dryrun( # type: ignore
task = cfg.to_task(name=role.name, cmd=cmd, env_vars=role.env)

req = SkypilotJobsRequest(task=task, executor=cfg)

if cfg.experiment_dir:
path = os.path.join(cfg.experiment_dir, f"{cfg.job_name}.yaml")
with open(path, "w") as f:
f.write(common_utils.dump_yaml_str(req.task.to_yaml_config()))

return AppDryRunInfo(req, lambda req: common_utils.dump_yaml_str(req.task.to_yaml_config()))

def _validate(self, app: AppDef, scheduler: str) -> None:
Expand Down
92 changes: 92 additions & 0 deletions test/run/test_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -1511,3 +1511,95 @@ def to_config(self):
# Should pull tunnel and connect
exp._initialize_tunnels(extract_from_executors=True)
assert "t1" in exp.tunnels


def test_experiment_export_local(temp_dir):
    """export() with LocalExecutor writes a .sh script and submit_all.sh."""
    output_dir = os.path.join(temp_dir, "exported")
    with Experiment("test-exp") as exp:
        exp.add(run.Partial(dummy_function, x=1, y=2), name="hello-job")
        exp.export(output_dir)

    exported = os.listdir(output_dir)
    job_scripts = [name for name in exported if name.endswith(".sh") and name != "submit_all.sh"]

    # One executable .sh per job plus submit_all.sh.
    assert len(job_scripts) >= 1, f"Expected a .sh script, got: {exported}"
    assert "submit_all.sh" in exported

    # submit_all.sh must be executable and invoke the job script via "bash".
    submit_path = os.path.join(output_dir, "submit_all.sh")
    assert os.access(submit_path, os.X_OK)
    assert "bash" in Path(submit_path).read_text()

    # The job script itself is executable too.
    assert os.access(os.path.join(output_dir, job_scripts[0]), os.X_OK)

    # export() removes the staged experiment dir once scripts are written.
    assert not os.path.exists(exp._exp_dir)


def test_experiment_export_creates_output_dir(temp_dir):
    """export() creates the output directory if it does not exist."""
    target = os.path.join(temp_dir, "new_dir", "nested")
    with Experiment("test-exp") as exp:
        exp.add(run.Partial(dummy_function, x=1, y=2), name="nested-job")
        exp.export(target)

    # The nested directory was created and populated.
    assert os.path.isdir(target)
    assert "submit_all.sh" in os.listdir(target)


def test_experiment_export_exist_ok(temp_dir):
    """export() with exist_ok=True does not raise if output_dir already exists."""
    existing = os.path.join(temp_dir, "exists")
    os.makedirs(existing)

    with Experiment("test-exp") as exp:
        exp.add(run.Partial(dummy_function, x=1, y=2), name="job")
        # Must tolerate the pre-existing directory.
        exp.export(existing, exist_ok=True)

    assert "submit_all.sh" in os.listdir(existing)


def test_experiment_export_multiple_jobs(temp_dir):
    """export() produces one script per job and all are referenced in submit_all.sh."""
    output_dir = os.path.join(temp_dir, "multi")
    with Experiment("test-exp") as exp:
        for name, (x, y) in (("job-a", (1, 2)), ("job-b", (3, 4))):
            exp.add(run.Partial(dummy_function, x=x, y=y), name=name)
        exp.export(output_dir)

    job_scripts = sorted(
        f for f in os.listdir(output_dir) if f.endswith(".sh") and f != "submit_all.sh"
    )
    assert len(job_scripts) == 2, f"Expected 2 scripts, got: {job_scripts}"

    # Every exported job script must appear in submit_all.sh.
    submit_text = (Path(output_dir) / "submit_all.sh").read_text()
    for name in job_scripts:
        assert name in submit_text, f"{name} not referenced in submit_all.sh"


def test_experiment_export_job_group(temp_dir):
    """export() with a JobGroup redirects all executors and writes scripts."""
    # Hoisted out of the with-block: imports belong at the top of the scope,
    # not interleaved with test logic.
    from typing import Sequence

    output_dir = os.path.join(temp_dir, "group_export")

    # LocalExecutor is not normally a supported group executor, so patch the
    # supported set to let the group be constructed for this test.
    with patch(
        "nemo_run.run.job.JobGroup.SUPPORTED_EXECUTORS", new_callable=PropertyMock
    ) as mock_supported:
        mock_supported.return_value = {LocalExecutor}

        with Experiment("test-exp") as exp:
            tasks: Sequence[run.Partial] = [
                run.Partial(dummy_function, x=1, y=2),
                run.Partial(dummy_function, x=3, y=4),
            ]
            exp.add(tasks, name="group-job")  # type: ignore
            exp.export(output_dir)

    files = os.listdir(output_dir)
    assert "submit_all.sh" in files
11 changes: 11 additions & 0 deletions test/run/torchx_backend/schedulers/test_dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import tempfile
from unittest import mock
from unittest.mock import MagicMock
Expand Down Expand Up @@ -71,6 +72,16 @@ def test_submit_dryrun(dgx_cloud_scheduler, mock_app_def, dgx_cloud_executor):
assert dryrun_info.request is not None


def test_submit_dryrun_writes_script(dgx_cloud_scheduler, mock_app_def, dgx_cloud_executor):
    """_submit_dryrun writes <job_name>_torchrun_job.sh when experiment_dir is set."""
    with tempfile.TemporaryDirectory() as export_dir:
        dgx_cloud_executor.job_name = "test-job"
        dgx_cloud_executor.experiment_dir = export_dir
        # Packaging is irrelevant here; stub it out.
        with mock.patch.object(DGXCloudExecutor, "package"):
            dgx_cloud_scheduler._submit_dryrun(mock_app_def, dgx_cloud_executor)
        expected = os.path.join(export_dir, "test-job_torchrun_job.sh")
        assert os.path.isfile(expected)


def test_dgx_cloud_scheduler_methods(dgx_cloud_scheduler):
# Test that basic methods exist
assert hasattr(dgx_cloud_scheduler, "_submit_dryrun")
Expand Down
10 changes: 10 additions & 0 deletions test/run/torchx_backend/schedulers/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ def test_submit_dryrun(docker_scheduler, mock_app_def, docker_executor):
assert dryrun_info.request is not None


def test_submit_dryrun_writes_yaml(docker_scheduler, mock_app_def, docker_executor):
    """_submit_dryrun writes <job_name>.yaml when experiment_dir is set."""
    with tempfile.TemporaryDirectory() as export_dir:
        docker_executor.job_name = "test-job"
        docker_executor.experiment_dir = export_dir
        # Packaging is irrelevant here; stub it out.
        with mock.patch.object(DockerExecutor, "package"):
            docker_scheduler._submit_dryrun(mock_app_def, docker_executor)
        expected = os.path.join(export_dir, "test-job.yaml")
        assert os.path.isfile(expected)


def test_check_docker_version_success():
with mock.patch("subprocess.check_output") as mock_check_output:
mock_check_output.return_value = b"Docker version 20.10.0, build abcdef\n"
Expand Down
Loading
Loading