diff --git a/doc/workloads/index.rst b/doc/workloads/index.rst index ffb074c6e..44338a933 100644 --- a/doc/workloads/index.rst +++ b/doc/workloads/index.rst @@ -23,6 +23,7 @@ Available Workloads ":doc:`nemo_launcher`", "✅", "❌", "❌", "❌" ":doc:`nemo_run`", "✅", "❌", "❌", "❌" ":doc:`nixl_bench`", "✅", "❌", "❌", "❌" + ":doc:`nixl_ep`", "✅", "❌", "❌", "❌" ":doc:`nixl_kvbench`", "✅", "❌", "❌", "❌" ":doc:`nixl_perftest`", "✅", "❌", "❌", "❌" ":doc:`sleep`", "✅", "✅", "❌", "✅" diff --git a/doc/workloads/nixl_ep.rst b/doc/workloads/nixl_ep.rst new file mode 100644 index 000000000..162c3a06b --- /dev/null +++ b/doc/workloads/nixl_ep.rst @@ -0,0 +1,106 @@ +NIXL EP +======= + +This workload (``test_template_name`` is ``NixlEP``) runs the NIXL Elastic EP benchmark through a Slurm-managed multi-node elastic launcher flow. + +Overview +-------- + +The Slurm launch model is: + +- one ``elastic.py`` process per node, started in sequence as the plan progresses +- the master node starts first and exposes a TCPStore for rank coordination +- follower nodes connect via ``--tcp-server $master_ip`` once the master is ready +- the benchmark runtime comes from the container image +- each run serializes its plan JSON into the output directory + +Plan Format +----------- + +The ``plan`` field is a JSON-encoded list of phases. Each phase is a list of rank indices passed directly to the benchmark. CloudAI uses the following convention to drive the elastic launcher: + +- **Positive rank index** — the rank is active. A rank that is new relative to the previous phase causes CloudAI to fire an additional ``srun`` for that worker. +- **Negative rank index** (e.g. ``-6``) — signals a contraction: the benchmark sees the absolute value and treats it as temporarily removed. No new ``srun`` is launched for negative indices. +- **Omitted rank** — a rank present in an earlier phase but absent from the current phase list is not relaunched. The benchmark's own phase logic handles its inactivity. 
+ +Example: + +.. code-block:: text + + [[0, 1, 2, 3], # phase 0: ranks 0–3 start + [0, 1, 2, 3, 4, 5, 6, 7], # phase 1: ranks 4–7 join (expansion) + [0, 1, 2, 3, 4, -6, 7], # phase 2: rank 6 contracted (no new launch) + [0, 1, 2, 3, 4, 5, 6, 7]] # phase 3: rank 6 rejoins (new launch for rank 6) + +Phase completion is detected by polling the primary log for ``-> end phase N`` markers. + +Usage Examples +-------------- + +Test TOML example: + +.. code-block:: toml + + name = "nixl-ep-expansion-contraction" + description = "NIXL Elastic EP expansion/contraction benchmark" + test_template_name = "NixlEP" + + [cmd_args] + docker_image_url = "" + elastic_script = "/workspace/nixl/examples/device/ep/tests/elastic/elastic.py" + plan = "[[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, -6, 7], [0, 1, 2, 3, 4, 5, 6, 7]]" + num_processes_per_node = 4 + num_tokens = 256 + num_experts_per_rank = 4 + hidden_dim = 8192 + num_topk = 6 + disable_ll_nvlink = true + +Test-in-Scenario example: + +.. code-block:: toml + + name = "nixl-ep-expansion-contraction" + + [[Tests]] + id = "nixl_ep.expansion_contraction" + num_nodes = 3 + time_limit = "00:30:00" + + name = "nixl-ep-expansion-contraction" + description = "NIXL Elastic EP expansion/contraction benchmark" + test_template_name = "NixlEP" + + [Tests.cmd_args] + docker_image_url = "" + elastic_script = "/workspace/nixl/examples/device/ep/tests/elastic/elastic.py" + plan = "[[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, -6, 7], [0, 1, 2, 3, 4, 5, 6, 7]]" + num_processes_per_node = 4 + num_tokens = 256 + num_experts_per_rank = 4 + hidden_dim = 8192 + num_topk = 6 + disable_ll_nvlink = true + +Reporting +--------- + +After a run completes, CloudAI prints a single table with one row per (node, rank) measurement. The ``Phases`` column shows each phase index colour-coded green (passed) or red (failed). Bandwidth columns report dispatch+combine throughput and timing per rank. 
+ +The reported metric (``default``) is the mean dispatch+combine bandwidth in GB/s across all ranks. + +API Documentation +----------------- + +Command Arguments +~~~~~~~~~~~~~~~~~ + +.. autopydantic_model:: cloudai.workloads.nixl_ep.nixl_ep.NixlEPCmdArgs + :members: + +Test Definition +~~~~~~~~~~~~~~~ + +.. autoclass:: cloudai.workloads.nixl_ep.nixl_ep.NixlEPTestDefinition + :members: + :show-inheritance: diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index 866baa945..29fe809a5 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -130,6 +130,11 @@ def register_all(): NIXLBenchSlurmCommandGenStrategy, NIXLBenchTestDefinition, ) + from cloudai.workloads.nixl_ep import ( + NixlEPReportGenerationStrategy, + NixlEPSlurmCommandGenStrategy, + NixlEPTestDefinition, + ) from cloudai.workloads.nixl_kvbench import NIXLKVBenchSlurmCommandGenStrategy, NIXLKVBenchTestDefinition from cloudai.workloads.nixl_perftest import ( NIXLKVBenchDummyReport, @@ -210,6 +215,7 @@ def register_all(): Registry().add_command_gen_strategy(SlurmSystem, NeMoLauncherTestDefinition, NeMoLauncherSlurmCommandGenStrategy) Registry().add_command_gen_strategy(SlurmSystem, NeMoRunTestDefinition, NeMoRunSlurmCommandGenStrategy) Registry().add_command_gen_strategy(SlurmSystem, NIXLBenchTestDefinition, NIXLBenchSlurmCommandGenStrategy) + Registry().add_command_gen_strategy(SlurmSystem, NixlEPTestDefinition, NixlEPSlurmCommandGenStrategy) Registry().add_command_gen_strategy(SlurmSystem, GPTTestDefinition, JaxToolboxSlurmCommandGenStrategy) Registry().add_command_gen_strategy(SlurmSystem, GrokTestDefinition, JaxToolboxSlurmCommandGenStrategy) @@ -260,6 +266,7 @@ def register_all(): Registry().add_test_definition("MegatronBridge", MegatronBridgeTestDefinition) Registry().add_test_definition("TritonInference", TritonInferenceTestDefinition) Registry().add_test_definition("NIXLBench", NIXLBenchTestDefinition) + Registry().add_test_definition("NixlEP", 
NixlEPTestDefinition) Registry().add_test_definition("AIDynamo", AIDynamoTestDefinition) Registry().add_test_definition("BashCmd", BashCmdTestDefinition) Registry().add_test_definition("NixlPerftest", NixlPerftestTestDefinition) @@ -286,6 +293,7 @@ def register_all(): Registry().add_report(UCCTestDefinition, UCCTestReportGenerationStrategy) Registry().add_report(TritonInferenceTestDefinition, TritonInferenceReportGenerationStrategy) Registry().add_report(NIXLBenchTestDefinition, NIXLBenchReportGenerationStrategy) + Registry().add_report(NixlEPTestDefinition, NixlEPReportGenerationStrategy) Registry().add_report(AIDynamoTestDefinition, AIDynamoReportGenerationStrategy) Registry().add_report(AiconfiguratorTestDefinition, AiconfiguratorReportGenerationStrategy) Registry().add_report(NixlPerftestTestDefinition, NIXLKVBenchDummyReport) diff --git a/src/cloudai/workloads/nixl_ep/__init__.py b/src/cloudai/workloads/nixl_ep/__init__.py new file mode 100644 index 000000000..f13454e26 --- /dev/null +++ b/src/cloudai/workloads/nixl_ep/__init__.py @@ -0,0 +1,26 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from .nixl_ep import NixlEPCmdArgs, NixlEPTestDefinition +from .report_generation_strategy import NixlEPReportGenerationStrategy +from .slurm_command_gen_strategy import NixlEPSlurmCommandGenStrategy + +__all__ = [ + "NixlEPCmdArgs", + "NixlEPReportGenerationStrategy", + "NixlEPSlurmCommandGenStrategy", + "NixlEPTestDefinition", +] diff --git a/src/cloudai/workloads/nixl_ep/log_parsing.py b/src/cloudai/workloads/nixl_ep/log_parsing.py new file mode 100644 index 000000000..f64ff5b8a --- /dev/null +++ b/src/cloudai/workloads/nixl_ep/log_parsing.py @@ -0,0 +1,90 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import re +from dataclasses import dataclass +from pathlib import Path + +_FLOAT_RE = r"[-+]?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?" 
+_END_PHASE_RE = re.compile(r"-> end phase (\d+)")
+_COMBINED_BW_RE = re.compile(
+    rf"\[rank (?P<rank>\d+)\] Dispatch \+ combine bandwidth: "
+    rf"(?P<bandwidth>{_FLOAT_RE}) GB/s, "
+    rf"avg_t=(?P<avg_time>{_FLOAT_RE}) us, "
+    rf"min_t=(?P<min_time>{_FLOAT_RE}) us, "
+    rf"max_t=(?P<max_time>{_FLOAT_RE}) us"
+)
+_KINETO_BW_RE = re.compile(
+    rf"\[rank (?P<rank>\d+)\] Dispatch bandwidth: "
+    rf"(?P<dispatch_bandwidth>{_FLOAT_RE}) GB/s \| "
+    rf"Combine bandwidth: (?P<combine_bandwidth>{_FLOAT_RE}) GB/s"
+)
+
+
+@dataclass(frozen=True)
+class NixlEPBandwidthSample:
+    """A parsed bandwidth sample emitted by the NIXL EP benchmark logs."""
+
+    rank: int
+    dispatch_combine_bandwidth_gbps: float | None = None
+    avg_time_us: float | None = None
+    min_time_us: float | None = None
+    max_time_us: float | None = None
+    dispatch_bandwidth_gbps: float | None = None
+    combine_bandwidth_gbps: float | None = None
+
+
+def parse_nixl_ep_completed_phases(path: Path) -> set[int]:
+    """Return the set of phase indices that completed according to the log at path."""
+    if not path.is_file():
+        return set()
+    completed: set[int] = set()
+    for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
+        if match := _END_PHASE_RE.search(line):
+            completed.add(int(match.group(1)))
+    return completed
+
+
+def parse_nixl_ep_bandwidth_samples(path: Path) -> list[NixlEPBandwidthSample]:
+    if not path.is_file():
+        return []
+
+    samples: list[NixlEPBandwidthSample] = []
+    for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
+        if match := _COMBINED_BW_RE.search(line):
+            samples.append(
+                NixlEPBandwidthSample(
+                    rank=int(match.group("rank")),
+                    dispatch_combine_bandwidth_gbps=float(match.group("bandwidth")),
+                    avg_time_us=float(match.group("avg_time")),
+                    min_time_us=float(match.group("min_time")),
+                    max_time_us=float(match.group("max_time")),
+                )
+            )
+            continue
+
+        if match := _KINETO_BW_RE.search(line):
+            samples.append(
+                NixlEPBandwidthSample(
+                    rank=int(match.group("rank")),
+                    dispatch_bandwidth_gbps=float(match.group("dispatch_bandwidth")),
combine_bandwidth_gbps=float(match.group("combine_bandwidth")), + ) + ) + + return samples diff --git a/src/cloudai/workloads/nixl_ep/nixl_ep.py b/src/cloudai/workloads/nixl_ep/nixl_ep.py new file mode 100644 index 000000000..a99020a9e --- /dev/null +++ b/src/cloudai/workloads/nixl_ep/nixl_ep.py @@ -0,0 +1,228 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import re +from pathlib import Path, PurePosixPath +from typing import ClassVar, Optional + +from pydantic import Field, field_validator + +from cloudai.core import DockerImage, Installable, JobStatusResult, TestRun +from cloudai.models.workload import CmdArgs, TestDefinition + +from .log_parsing import parse_nixl_ep_bandwidth_samples + +GENERATED_PLAN_FILE_NAME = "nixl-ep-plan.json" + + +class NixlEPCmdArgs(CmdArgs): + """Command line arguments for the NIXL Elastic EP benchmark.""" + + docker_image_url: str = Field(description="URL of the Docker image that contains the NIXL EP benchmark.") + elastic_script: str = Field( + default="/workspace/nixl/examples/device/ep/tests/elastic/elastic.py", + description=( + "Path to the benchmark entrypoint, relative to the container's NIXL runtime root " + "or absolute in the container." 
+ ), + ) + python_executable: str = Field(default="python3", description="Python executable to use inside the container.") + plan: str | list[str] = Field( + description=( + "Serialized phase plan to write into a per-run JSON file. " + 'Use a single string such as "[[0, 1], [0, 1, 2, 3]]" for a single run, ' + "or a list of such strings to enable DSE mode (one run per plan)." + ), + ) + num_processes_per_node: int | list[int] = Field( + description="Number of local worker processes to spawn on each allocated node.", + ) + service_startup_timeout_seconds: int = Field( + default=60, + ge=1, + description="Seconds to wait for the master node's TCPStore to accept connections.", + ) + store_port: int = Field(default=9999, ge=1, le=65535, description="TCPStore port used by the benchmark.") + + @field_validator("num_processes_per_node", mode="after") + @classmethod + def validate_num_processes_per_node(cls, value: int | list[int]) -> int | list[int]: + values = value if isinstance(value, list) else [value] + if any(item < 1 for item in values): + raise ValueError("num_processes_per_node must contain only positive integers") + return value + + @field_validator("plan", mode="after") + @classmethod + def validate_plan(cls, value: str | list[str]) -> str | list[str]: + if isinstance(value, list): + if not value: + raise ValueError("plan list must not be empty.") + for item in value: + stripped = item.strip() + if not stripped: + raise ValueError("plan list must not contain empty strings.") + cls._parse_plan(stripped) + return value + value = value.strip() + if not value: + raise ValueError("plan must not be empty.") + cls._parse_plan(value) + return value + + @staticmethod + def _parse_plan(plan: str) -> list[list[int]]: + try: + parsed = json.loads(plan) + except json.JSONDecodeError as exc: + raise ValueError(f"plan must be valid JSON: {exc}") from exc + + if not isinstance(parsed, list) or not parsed: + raise ValueError("plan must decode to a non-empty list of phases.") + 
+ for phase in parsed: + if not isinstance(phase, list) or not phase: + raise ValueError("Each plan phase must be a non-empty list of ranks.") + if any(type(rank) is not int for rank in phase): + raise ValueError("Each plan rank must be an integer.") + + return parsed + + def parse_plan(self) -> list[list[int]]: + if not isinstance(self.plan, str): + raise ValueError("parse_plan() requires cmd_args.plan to be a serialized string.") + return self._parse_plan(self.plan) + + +class NixlEPTestDefinition(TestDefinition): + """Test definition for the NIXL Elastic EP benchmark.""" + + container_runtime_root: ClassVar[str] = "/workspace/nixl" + cmd_args: NixlEPCmdArgs + _docker_image: Optional[DockerImage] = None + + @property + def docker_image(self) -> DockerImage: + if not self._docker_image: + self._docker_image = DockerImage(url=self.cmd_args.docker_image_url) + return self._docker_image + + @property + def installables(self) -> list[Installable]: + return [self.docker_image, *self.git_repos] + + @property + def container_runtime_root_path(self) -> PurePosixPath: + return PurePosixPath(self.container_runtime_root) + + @staticmethod + def _tail(path: Path, num_lines: int = 40) -> str: + lines = path.read_text(encoding="utf-8", errors="ignore").splitlines() + return "\n".join(lines[-num_lines:]) + + @staticmethod + def _primary_launch_exit_error_message(content: str) -> str | None: + match = re.search(r"Primary NIXL EP launch exited before phase (\d+) completed", content) + if match is None: + return None + + phase = int(match.group(1)) + if phase == 0: + return ( + "The initial NIXL EP launch exited before phase 0 completed, so later stage launches never " + "started and some node logs may be absent." + ) + + return f"The primary NIXL EP launch exited before phase {phase} completed." 
+ + def _scan_log_for_failures(self, path: Path) -> JobStatusResult | None: + if not path.is_file(): + return None + + launcher_failure_patterns = ( + ("python3: can't open file", "The benchmark entrypoint could not be opened."), + ("Traceback (most recent call last):", "The benchmark launcher raised a Python traceback."), + ("Timed out waiting for NIXL EP master services", "The master services never became ready."), + ("no plan phases were found for rank", "A worker was launched for a rank that never appears in the plan."), + ("recvValueWithTimeout failed", "A worker lost its TCPStore connection before the benchmark completed."), + ("timed out after 300000ms", "A worker timed out waiting on the TCPStore."), + ("Failed to prepare remote memory view", "NIXL EP failed to initialize its UCX remote memory view."), + ("srun: error:", "Slurm reported an srun failure."), + ("Exited with exit code", "A Slurm step exited with a non-zero status."), + ) + content = path.read_text(encoding="utf-8", errors="ignore") + primary_launch_error = self._primary_launch_exit_error_message(content) + if primary_launch_error is not None: + tail = self._tail(path) + error_message = f"{primary_launch_error} See {path}." + if tail: + error_message += f"\n{tail}" + return JobStatusResult(is_successful=False, error_message=error_message) + + for pattern, description in launcher_failure_patterns: + if pattern in content: + tail = self._tail(path) + error_message = f"{description} See {path}." 
+ if tail: + error_message += f"\n{tail}" + return JobStatusResult(is_successful=False, error_message=error_message) + + return None + + def _check_benchmark_output(self, expected_node_logs: list[Path]) -> JobStatusResult | None: + missing_summaries = [path.name for path in expected_node_logs if not parse_nixl_ep_bandwidth_samples(path)] + if not missing_summaries: + return None + + first_log = expected_node_logs[0] + tail = self._tail(first_log) + error_message = ( + "NIXL EP completed at the Slurm level, but benchmark summary lines were missing from " + f"{', '.join(missing_summaries)}. " + "Expected lines such as '[rank N] Dispatch + combine bandwidth: ...'." + ) + if tail: + error_message += f"\n{tail}" + + return JobStatusResult(is_successful=False, error_message=error_message) + + def was_run_successful(self, tr: TestRun) -> JobStatusResult: + output_path = tr.output_path + expected_node_logs = [tr.output_path / f"nixl-ep-node-{node_idx}.log" for node_idx in range(tr.nnodes)] + + for path in [*expected_node_logs, output_path / "stdout.txt", output_path / "stderr.txt"]: + result = self._scan_log_for_failures(path) + if result is not None: + return result + + missing_node_logs = [path.name for path in expected_node_logs if not path.is_file()] + if missing_node_logs: + existing_node_logs = sorted(path.name for path in output_path.glob("nixl-ep-node-*.log")) + existing_logs_str = ", ".join(existing_node_logs) if existing_node_logs else "none" + return JobStatusResult( + is_successful=False, + error_message=( + f"Expected NIXL EP node logs not found in {output_path}: {', '.join(missing_node_logs)}. " + f"Existing node logs: {existing_logs_str}." 
+ ), + ) + + benchmark_output_result = self._check_benchmark_output(expected_node_logs) + if benchmark_output_result is not None: + return benchmark_output_result + + return JobStatusResult(is_successful=True) diff --git a/src/cloudai/workloads/nixl_ep/report_generation_strategy.py b/src/cloudai/workloads/nixl_ep/report_generation_strategy.py new file mode 100644 index 000000000..112742913 --- /dev/null +++ b/src/cloudai/workloads/nixl_ep/report_generation_strategy.py @@ -0,0 +1,157 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import json +from pathlib import Path +from typing import ClassVar + +from rich.console import Console +from rich.table import Table + +from cloudai.core import METRIC_ERROR, ReportGenerationStrategy + +from .log_parsing import parse_nixl_ep_bandwidth_samples, parse_nixl_ep_completed_phases +from .nixl_ep import GENERATED_PLAN_FILE_NAME + + +class NixlEPReportGenerationStrategy(ReportGenerationStrategy): + """Strategy for generating reports from NIXL EP benchmark directories.""" + + metrics: ClassVar[list[str]] = ["default"] + + def _node_logs(self) -> list[Path]: + return [self.test_run.output_path / f"nixl-ep-node-{i}.log" for i in range(self.test_run.nnodes)] + + def can_handle_directory(self) -> bool: + return any(parse_nixl_ep_bandwidth_samples(path) for path in self._node_logs()) + + def _load_plan(self) -> list[list[int]]: + plan_path = self.test_run.output_path / GENERATED_PLAN_FILE_NAME + if not plan_path.is_file(): + return [] + try: + plan = json.loads(plan_path.read_text(encoding="utf-8")) + return plan if isinstance(plan, list) else [] + except (json.JSONDecodeError, OSError): + return [] + + @staticmethod + def _mean(vals: list[float]) -> float | None: + return sum(vals) / len(vals) if vals else None + + @staticmethod + def _fmt(v: float | None) -> str: + return f"{v:.2f}" if v is not None else "—" + + def _phase_cell(self, plan: list[list[int]], completed: set[int]) -> str: + if not plan: + return "—" + parts = [] + for p, ranks in enumerate(plan): + label = str(ranks) + parts.append(f"[green]{label}[/green]" if p in completed else f"[red]{label}[/red]") + return "\n".join(parts) + + def _build_table( + self, + title: str, + plan: list[list[int]], + node_logs: list[Path], + completed_by_node: dict[int, set[int]], + samples_by_node: dict[int, list], + has_combined: bool, + has_kineto: bool, + ) -> Table: + table = Table(title=title, show_lines=True) + table.add_column("Node", justify="right") + 
table.add_column("Phases", justify="left") + if has_combined: + table.add_column("Dispatch+Combine BW (GB/s)", justify="right") + table.add_column("Avg (µs)", justify="right") + table.add_column("Min (µs)", justify="right") + table.add_column("Max (µs)", justify="right") + if has_kineto: + table.add_column("Dispatch BW (GB/s)", justify="right") + table.add_column("Combine BW (GB/s)", justify="right") + + for node_idx in range(len(node_logs)): + completed = completed_by_node.get(node_idx, set()) + samples = samples_by_node.get(node_idx, []) + row = [str(node_idx), self._phase_cell(plan, completed)] + if has_combined: + row += [ + self._fmt( + self._mean( + [ + s.dispatch_combine_bandwidth_gbps + for s in samples + if s.dispatch_combine_bandwidth_gbps is not None + ] + ) + ), + self._fmt(self._mean([s.avg_time_us for s in samples if s.avg_time_us is not None])), + self._fmt(self._mean([s.min_time_us for s in samples if s.min_time_us is not None])), + self._fmt(self._mean([s.max_time_us for s in samples if s.max_time_us is not None])), + ] + if has_kineto: + row += [ + self._fmt( + self._mean( + [s.dispatch_bandwidth_gbps for s in samples if s.dispatch_bandwidth_gbps is not None] + ) + ), + self._fmt( + self._mean([s.combine_bandwidth_gbps for s in samples if s.combine_bandwidth_gbps is not None]) + ), + ] + table.add_row(*row) + return table + + def generate_report(self) -> None: + console = Console() + node_logs = self._node_logs() + plan = self._load_plan() + num_phases = len(plan) + + if not node_logs: + console.print("[yellow]NIXL EP: no node logs found[/yellow]") + return + + completed_by_node = {i: parse_nixl_ep_completed_phases(log) for i, log in enumerate(node_logs)} + samples_by_node = {i: parse_nixl_ep_bandwidth_samples(log) for i, log in enumerate(node_logs)} + + has_combined = any(s.dispatch_combine_bandwidth_gbps is not None for ss in samples_by_node.values() for s in ss) + has_kineto = any(s.dispatch_bandwidth_gbps is not None for ss in 
samples_by_node.values() for s in ss) + + passed = sum(1 for p in range(num_phases) if p in completed_by_node.get(0, set())) + phases_summary = f"{passed}/{num_phases} phases passed" if num_phases else "" + title = f"NIXL EP — {self.test_run.name}" + (f" — {phases_summary}" if phases_summary else "") + + table = self._build_table(title, plan, node_logs, completed_by_node, samples_by_node, has_combined, has_kineto) + console.print(table) + + def get_metric(self, metric: str) -> float: + if metric not in self.metrics: + return METRIC_ERROR + samples = [s for path in self._node_logs() for s in parse_nixl_ep_bandwidth_samples(path)] + bw_values = [ + s.dispatch_combine_bandwidth_gbps for s in samples if s.dispatch_combine_bandwidth_gbps is not None + ] + if not bw_values: + return METRIC_ERROR + return sum(bw_values) / len(bw_values) diff --git a/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py b/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py new file mode 100644 index 000000000..bea7e7dd0 --- /dev/null +++ b/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py @@ -0,0 +1,401 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import shlex +from dataclasses import dataclass +from pathlib import Path +from typing import cast + +from cloudai.systems.slurm import SlurmCommandGenStrategy +from cloudai.util import parse_time_limit + +from .nixl_ep import GENERATED_PLAN_FILE_NAME, NixlEPCmdArgs, NixlEPTestDefinition + + +@dataclass(frozen=True) +class NixlEPLaunch: + """One concrete worker launch on a specific node.""" + + node_idx: int + num_processes: int + include_tcp_server: bool + append_output: bool = False + + +@dataclass(frozen=True) +class NixlEPStage: + """Launches that should be appended when a given plan phase introduces new ranks.""" + + idx: int + launches: tuple[NixlEPLaunch, ...] + + +class NixlEPSlurmCommandGenStrategy(SlurmCommandGenStrategy): + """Command generation strategy for the NIXL Elastic EP benchmark.""" + + @property + def tdef(self) -> NixlEPTestDefinition: + return cast(NixlEPTestDefinition, self.test_run.test) + + def image_path(self) -> str | None: + return str(self.tdef.docker_image.installed_path) + + def _container_mounts(self) -> list[str]: + return [] + + @property + def num_processes_per_node(self) -> int: + num_processes_per_node = self.tdef.cmd_args.num_processes_per_node + if not isinstance(num_processes_per_node, int): + raise ValueError("NIXL EP Slurm command generation requires num_processes_per_node to be an integer.") + return num_processes_per_node + + def _append_sbatch_directives(self, batch_script_content: list[str]) -> None: + super()._append_sbatch_directives(batch_script_content) + batch_script_content.extend( + [ + "", + "nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) )", + "nodes_array=($nodes)", + "master_node=${nodes_array[0]}", + "master_ip=$(srun --nodes=1 --ntasks=1 -w \"$master_node\" hostname --ip-address | awk '{print $1}')", + "", + "echo Nodes: $SLURM_JOB_NODELIST", + "echo Num Nodes: ${#nodes[@]}", + "echo Master Node: $master_node", + "echo Master IP: $master_ip", + "", + ] + ) + + @property + def 
    # NOTE(review): the decorator/def line of this first method is cut off above this
    # chunk; attribute-style access elsewhere (self.env_vars_path.open / .absolute)
    # implies it is a @property — confirm against the full file.
    @property
    def env_vars_path(self) -> Path:
        """Path of the generated ``env_vars.sh`` that every launch sources."""
        return self.test_run.output_path / "env_vars.sh"

    def node_log_path(self, node_idx: int) -> Path:
        """Per-node log file each srun launch writes to via ``--output``."""
        return self.test_run.output_path / f"nixl-ep-node-{node_idx}.log"

    def resolve_plan_path(self) -> str:
        """Absolute path of the serialized plan JSON passed to the benchmark via ``--plan``."""
        return str((self.test_run.output_path / GENERATED_PLAN_FILE_NAME).absolute())

    def _write_plan_file(self) -> None:
        """Serialize the parsed plan into the output directory as pretty-printed JSON."""
        plan_path = self.test_run.output_path / GENERATED_PLAN_FILE_NAME
        plan_path.parent.mkdir(parents=True, exist_ok=True)
        plan_path.write_text(json.dumps(self.tdef.cmd_args.parse_plan(), indent=2) + "\n", encoding="utf-8")

    @property
    def phase_transition_timeout_seconds(self) -> int:
        """Per-phase wait budget: job time limit (default 600 s) split evenly across plan phases.

        Both the budget and the result are clamped to at least one second.
        """
        phase_timeout = 600
        if self.test_run.time_limit:
            phase_timeout = max(int(parse_time_limit(self.test_run.time_limit).total_seconds()), 1)
        num_plan_phases = max(len(self.tdef.cmd_args.parse_plan()), 1)
        # Floor division: the last phase simply gets the same slice as the others.
        return max(phase_timeout // num_plan_phases, 1)

    def _new_process_counts_by_phase(self) -> list[int]:
        """Return, per plan phase, how many ranks are newly active.

        Only non-negative ranks count as active (negative ranks signal a
        contraction, omitted ranks are simply not relaunched), so the number of
        new srun launches for a phase is the set difference against the previous
        phase's positive ranks. Validates the totals before returning.
        """
        counts: list[int] = []
        previous_positive_ranks: set[int] = set()
        for positive_ranks in [{rank for rank in phase if rank >= 0} for phase in self.tdef.cmd_args.parse_plan()]:
            counts.append(len(positive_ranks - previous_positive_ranks))
            previous_positive_ranks = positive_ranks

        self._validate_requested_processes(counts)
        return counts

    def _validate_requested_processes(self, new_process_counts: list[int]) -> None:
        """Check the plan-derived worker totals against the node/process budget.

        Single node: num_processes_per_node must equal the total launched workers.
        Multi node: num_processes_per_node is a per-node capacity ceiling.

        Raises:
            ValueError: if the plan cannot be satisfied by the allocated nodes.
        """
        total_requested_processes = sum(new_process_counts)
        num_nodes, _ = self.get_cached_nodes_spec()
        if num_nodes == 1:
            if self.num_processes_per_node != total_requested_processes:
                raise ValueError(
                    "For single-node NIXL EP runs, num_processes_per_node must match the plan-derived "
                    f"total launched workers ({total_requested_processes}), got {self.num_processes_per_node}."
                )
            return

        total_capacity = self.num_processes_per_node * num_nodes
        if total_requested_processes > total_capacity:
            raise ValueError(
                "For multi-node NIXL EP runs, num_processes_per_node defines the maximum number of workers "
                f"each node can launch across all plan phases. The plan requires {total_requested_processes} total "
                f"workers, but {num_nodes} nodes with capacity {self.num_processes_per_node} only provide "
                f"{total_capacity}."
            )

    def _allocate_stage_launches(
        self, phase_idx: int, new_process_count: int, remaining_capacity: list[int]
    ) -> tuple[NixlEPLaunch, ...]:
        """Greedily pack a phase's new workers onto nodes, first-fit by node index.

        Mutates ``remaining_capacity`` in place as workers are assigned. The
        initial phase's launch on node 0 is the master (no ``--tcp-server``,
        truncating output); every other launch connects to the master and appends.

        Raises:
            ValueError: if the phase's workers do not fit in the remaining capacity.
        """
        if new_process_count == 0:
            return ()

        launches: list[NixlEPLaunch] = []
        remaining_phase_processes = new_process_count
        is_initial_phase = phase_idx == 0
        for node_idx, node_capacity in enumerate(remaining_capacity):
            if remaining_phase_processes == 0:
                break

            assignable = min(node_capacity, remaining_phase_processes)
            if assignable == 0:
                continue

            remaining_capacity[node_idx] -= assignable
            remaining_phase_processes -= assignable
            launches.append(
                NixlEPLaunch(
                    node_idx=node_idx,
                    num_processes=assignable,
                    # Only the very first launch on node 0 hosts the TCPStore itself.
                    include_tcp_server=(not is_initial_phase) or node_idx != 0,
                    append_output=not is_initial_phase,
                )
            )

        if remaining_phase_processes != 0:
            num_nodes, _ = self.get_cached_nodes_spec()
            raise ValueError(
                "For multi-node NIXL EP runs, the plan-derived launches cannot be packed onto "
                f"{num_nodes} nodes with per-node capacity {self.num_processes_per_node}. "
                f"Remaining phase size: {remaining_phase_processes}."
            )

        return tuple(launches)

    @property
    def plan_stages(self) -> tuple[NixlEPStage, ...]:
        """One stage per plan phase, each carrying the launches that phase triggers."""
        new_process_counts = self._new_process_counts_by_phase()

        num_nodes, _ = self.get_cached_nodes_spec()
        remaining_capacity = [self.num_processes_per_node] * num_nodes
        stages: list[NixlEPStage] = []
        for phase_idx, new_process_count in enumerate(new_process_counts):
            launches = self._allocate_stage_launches(phase_idx, new_process_count, remaining_capacity)
            stages.append(NixlEPStage(idx=phase_idx, launches=launches))

        return tuple(stages)

    def _build_benchmark_command(self, launch: NixlEPLaunch) -> list[str]:
        """Assemble the elastic.py command line for one launch.

        Extra (model_extra) cmd args are passed through as ``--kebab-case`` flags,
        sorted for deterministic output; booleans become bare flags, None values
        are dropped.
        """
        cmd_args: NixlEPCmdArgs = self.tdef.cmd_args
        command = [
            cmd_args.python_executable,
            cmd_args.elastic_script,
            "--plan",
            self.resolve_plan_path(),
            "--num-processes",
            str(launch.num_processes),
        ]

        if launch.include_tcp_server:
            # $master_ip is resolved by the sbatch script at runtime, not here.
            command.extend(["--tcp-server", "$master_ip"])
        for arg, value in sorted((cmd_args.model_extra or {}).items()):
            flag = "--" + arg.replace("_", "-")
            if isinstance(value, bool):
                if value:
                    command.append(flag)
            elif value is not None:
                command.extend([flag, str(value)])

        return command

    def generate_wait_for_master_services_function(self) -> str:
        """Emit a bash function that polls the master's TCPStore port until ready.

        The upstream rank server assigns a rank on any successful TCP
        connection, so the readiness probe must not touch that port.
        """
        return f"""\
wait_for_master_services() {{
    local timeout={self.tdef.cmd_args.service_startup_timeout_seconds}
    local interval=1
    local end_time=$(($(date +%s) + timeout))

    while [ "$(date +%s)" -lt "$end_time" ]; do
        if timeout 1 bash -c ": > /dev/tcp/$master_ip/{self.tdef.cmd_args.store_port}" >/dev/null 2>&1; then
            echo "NIXL EP master services are ready on $master_ip"
            return 0
        fi
        sleep "$interval"
    done

    echo "Timed out waiting for NIXL EP master services on $master_ip"
    return 1
}}"""

    def _launch_srun_prefix(self, node_idx: int) -> str:
        """Build the srun prefix for one node: master by nodelist, followers by --relative."""
        target_arg = "--nodelist=$SLURM_JOB_MASTER_NODE" if node_idx == 0 else f"--relative={node_idx}"
        parts = [
            *self.gen_srun_prefix(with_num_nodes=False),
            "--overlap",
            target_arg,
            "--ntasks-per-node=1",
            "--ntasks=1",
            "-N1",
        ]
        return " ".join(parts)

    def _render_launch(self, launch: NixlEPLaunch) -> str:
        """Render one full srun invocation (env sourcing + benchmark command) as a shell line."""
        command = " ".join(self._build_benchmark_command(launch))
        env_file = self.env_vars_path.absolute()
        log_file = self.node_log_path(launch.node_idx).absolute()
        open_mode_arg = " --open-mode=append" if launch.append_output else ""
        # Only double quotes are escaped: $master_ip must still expand inside bash -c "...".
        script = f"source {shlex.quote(str(env_file))}; {command}".replace('"', '\\"')
        return f'{self._launch_srun_prefix(launch.node_idx)}{open_mode_arg} --output={log_file} bash -c "{script}"'

    def generate_wait_for_phase_completion_function(self) -> str:
        """Emit a bash function that waits for '-> end phase N' in the primary log.

        Bails out early on a known benchmark failure marker or if the primary
        launch process has already exited.
        """
        timeout = self.phase_transition_timeout_seconds
        return f"""\
wait_for_phase_completion() {{
    local phase="$1"
    local log_file="$2"
    local primary_pid="$3"
    local timeout={timeout}
    local interval=1
    local end_time=$(($(date +%s) + timeout))

    while [ "$(date +%s)" -lt "$end_time" ]; do
        if [ -f "$log_file" ] && grep -Fq -- "-> end phase $phase" "$log_file"; then
            echo "Detected completion of phase $phase in $log_file"
            return 0
        fi
        if [ -f "$log_file" ] && grep -Fq -- "no plan phases were found for rank" "$log_file"; then
            echo "Detected an early NIXL EP failure while waiting for phase $phase"
            return 1
        fi
        if ! kill -0 "$primary_pid" >/dev/null 2>&1; then
            echo "Primary NIXL EP launch exited before phase $phase completed"
            return 1
        fi
        sleep "$interval"
    done

    echo "Timed out waiting for phase $phase to complete"
    return 1
}}"""

    def _write_env_vars_file(self) -> None:
        """Write final env vars as 'export KEY=value' lines for launches to source.

        NOTE(review): values are written unquoted — presumably intentional so that
        values like '$LD_LIBRARY_PATH' still expand when sourced, but values
        containing spaces would break the script; confirm against callers.
        """
        self.test_run.output_path.mkdir(parents=True, exist_ok=True)
        with self.env_vars_path.open("w", encoding="utf-8") as env_file:
            for key, value in self.final_env_vars.items():
                env_file.write(f"export {key}={value}\n")

    def _background_launches_lines(self, launches: tuple[NixlEPLaunch, ...]) -> list[str]:
        """Render launches as backgrounded srun lines, recording each PID in worker_pids."""
        lines: list[str] = []
        for launch in launches:
            lines.extend([self._render_launch(launch) + " &", "worker_pids+=($!)"])
        return lines

    @staticmethod
    def _finish_with_rc_lines() -> list[str]:
        """Shared script epilogue: success message on rc==0, then exit with rc."""
        return [
            'if [ "$rc" -eq 0 ]; then',
            '    echo "All NIXL EP launches completed successfully"',
            "fi",
            "",
            "exit $rc",
        ]

    @classmethod
    def _wait_for_workers_lines(cls) -> list[str]:
        """Script lines that wait on every backgrounded worker and collect the worst rc."""
        return [
            "",
            "rc=0",
            'for pid in "${worker_pids[@]}"; do',
            '    wait "$pid" || rc=$?',
            "done",
            "",
            *cls._finish_with_rc_lines(),
        ]

    @staticmethod
    def _has_follower_launches(stages: list[NixlEPStage]) -> bool:
        """True if any launch across all stages targets a node other than the master."""
        return any(launch.node_idx != 0 for element in stages for launch in element.launches)

    def _render_single_stage(self, stage: NixlEPStage) -> str:
        """Degenerate single-launch path: run the benchmark in the foreground on the master."""
        return "\n".join(
            [
                'echo "Starting NIXL EP on the master node..."',
                self._render_launch(stage.launches[0]),
                "rc=$?",
                *self._finish_with_rc_lines(),
            ]
        )

    def _plan_helper_function_lines(self, has_followers: bool, has_multiple_stages: bool) -> list[str]:
        """Emit only the bash helper functions the generated script actually needs."""
        master_service_lines = [self.generate_wait_for_master_services_function(), ""] if has_followers else []
        phase_wait_lines = [self.generate_wait_for_phase_completion_function(), ""] if has_multiple_stages else []
        return [*master_service_lines, *phase_wait_lines]

    @staticmethod
    def _wait_for_master_services_lines() -> list[str]:
        """Script lines that block until the master's TCPStore is reachable."""
        return [
            "",
            'echo "Waiting for NIXL EP master services..."',
            "wait_for_master_services || exit 1",
        ]

    def _initial_follower_launch_lines(self, stage: NixlEPStage) -> list[str]:
        """Render the initial stage's non-primary launches (empty if it only has one)."""
        if len(stage.launches) == 1:
            return []
        return [
            "",
            f'echo "Starting the rest of initial phase {stage.idx}..."',
            *self._background_launches_lines(stage.launches[1:]),
        ]

    def _initial_stage_lines(self, stage: NixlEPStage, has_followers: bool) -> list[str]:
        """Render the initial stage: primary launch in background, then (optionally) wait
        for master services before firing the remaining initial launches."""
        primary_launch = stage.launches[0]
        master_service_lines = self._wait_for_master_services_lines() if has_followers else []
        header_lines = [
            "worker_pids=()",
            "",
            'echo "Starting initial NIXL EP stage on the master node..."',
            self._render_launch(primary_launch) + " &",
            "primary_pid=$!",
            "worker_pids+=($primary_pid)",
        ]
        return header_lines + master_service_lines + self._initial_follower_launch_lines(stage)

    def _followup_stage_lines(self, stage: NixlEPStage) -> list[str]:
        """Render a later stage: wait for the preceding phase marker, then launch its workers."""
        wait_phase = stage.idx - 1
        header_lines = [
            "",
            f'echo "Waiting for phase {wait_phase} before starting phase {stage.idx}..."',
            f'wait_for_phase_completion "{wait_phase}" "{self.node_log_path(0).absolute()}" "$primary_pid" || exit 1',
            "",
            f'echo "Starting launches for phase {stage.idx}..."',
        ]
        return header_lines + self._background_launches_lines(stage.launches)

    def _gen_srun_command(self) -> str:
        """Entry point: write support files and assemble the full launch script.

        Stages with no launches (pure contractions) are filtered out; a plan
        that reduces to a single launch takes the simple foreground path.
        """
        self._write_env_vars_file()
        self._write_plan_file()

        stages = [stage for stage in self.plan_stages if stage.launches]
        if not stages:
            raise ValueError("NIXL EP plan does not launch any non-negative ranks.")

        first_stage = stages[0]
        if len(stages) == 1 and len(first_stage.launches) == 1:
            return self._render_single_stage(first_stage)

        has_followers = self._has_follower_launches(stages)
        lines = self._plan_helper_function_lines(
            has_followers=has_followers,
            has_multiple_stages=len(stages) > 1,
        )
        lines += self._initial_stage_lines(first_stage, has_followers=has_followers)

        for stage in stages[1:]:
            lines += self._followup_stage_lines(stage)

        lines += self._wait_for_workers_lines()
        return "\n".join(lines)
self._initial_stage_lines(first_stage, has_followers=has_followers) + + for stage in stages[1:]: + lines += self._followup_stage_lines(stage) + + lines += self._wait_for_workers_lines() + return "\n".join(lines) diff --git a/tests/ref_data/nixl-ep.sbatch b/tests/ref_data/nixl-ep.sbatch new file mode 100644 index 000000000..6ddf36627 --- /dev/null +++ b/tests/ref_data/nixl-ep.sbatch @@ -0,0 +1,106 @@ +#!/bin/bash +# generated by CloudAI@__CLOUDAI_VERSION__ +#SBATCH --job-name=__JOB_NAME__ +#SBATCH --output=__OUTPUT_DIR__/output/stdout.txt +#SBATCH --error=__OUTPUT_DIR__/output/stderr.txt +#SBATCH --partition=main +#SBATCH -N 3 +#SBATCH --gpus-per-node=8 +#SBATCH --gres=gpu:8 + +export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) + +nodes=( $( scontrol show hostnames $SLURM_JOB_NODELIST ) ) +nodes_array=($nodes) +master_node=${nodes_array[0]} +master_ip=$(srun --nodes=1 --ntasks=1 -w "$master_node" hostname --ip-address | awk '{print $1}') + +echo Nodes: $SLURM_JOB_NODELIST +echo Num Nodes: ${#nodes[@]} +echo Master Node: $master_node +echo Master IP: $master_ip + +export LD_LIBRARY_PATH=/workspace/rdma_core/lib:$LD_LIBRARY_PATH +srun --export=ALL --mpi=pmix -N3 --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." 
+ +srun --export=ALL --mpi=pmix -N3 --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --ntasks=3 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +wait_for_master_services() { + local timeout=90 + local interval=1 + local end_time=$(($(date +%s) + timeout)) + + while [ "$(date +%s)" -lt "$end_time" ]; do + if timeout 1 bash -c ": > /dev/tcp/$master_ip/9999" >/dev/null 2>&1; then + echo "NIXL EP master services are ready on $master_ip" + return 0 + fi + sleep "$interval" + done + + echo "Timed out waiting for NIXL EP master services on $master_ip" + return 1 +} + +wait_for_phase_completion() { + local phase="$1" + local log_file="$2" + local primary_pid="$3" + local timeout=150 + local interval=1 + local end_time=$(($(date +%s) + timeout)) + + while [ "$(date +%s)" -lt "$end_time" ]; do + if [ -f "$log_file" ] && grep -Fq -- "-> end phase $phase" "$log_file"; then + echo "Detected completion of phase $phase in $log_file" + return 0 + fi + if [ -f "$log_file" ] && grep -Fq -- "no plan phases were found for rank" "$log_file"; then + echo "Detected an early NIXL EP failure while waiting for phase $phase" + return 1 + fi + if ! kill -0 "$primary_pid" >/dev/null 2>&1; then + echo "Primary NIXL EP launch exited before phase $phase completed" + return 1 + fi + sleep "$interval" + done + + echo "Timed out waiting for phase $phase to complete" + return 1 +} + +worker_pids=() + +echo "Starting initial NIXL EP stage on the master node..." 
+srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --nodelist=$SLURM_JOB_MASTER_NODE --ntasks-per-node=1 --ntasks=1 -N1 --output=__OUTPUT_DIR__/output/nixl-ep-node-0.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 4 --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & +primary_pid=$! +worker_pids+=($primary_pid) + +echo "Waiting for NIXL EP master services..." +wait_for_master_services || exit 1 + +echo "Waiting for phase 0 before starting phase 1..." +wait_for_phase_completion "0" "__OUTPUT_DIR__/output/nixl-ep-node-0.log" "$primary_pid" || exit 1 + +echo "Starting launches for phase 1..." +srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --relative=1 --ntasks-per-node=1 --ntasks=1 -N1 --open-mode=append --output=__OUTPUT_DIR__/output/nixl-ep-node-1.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 4 --tcp-server $master_ip --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & +worker_pids+=($!) + +echo "Waiting for phase 2 before starting phase 3..." +wait_for_phase_completion "2" "__OUTPUT_DIR__/output/nixl-ep-node-0.log" "$primary_pid" || exit 1 + +echo "Starting launches for phase 3..." 
+srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --relative=2 --ntasks-per-node=1 --ntasks=1 -N1 --open-mode=append --output=__OUTPUT_DIR__/output/nixl-ep-node-2.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 2 --tcp-server $master_ip --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & +worker_pids+=($!) + +rc=0 +for pid in "${worker_pids[@]}"; do + wait "$pid" || rc=$? +done + +if [ "$rc" -eq 0 ]; then + echo "All NIXL EP launches completed successfully" +fi + +exit $rc diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index ec1b38399..e62f173fe 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -15,6 +15,7 @@ # limitations under the License. 
import argparse +import json from functools import partial from importlib.metadata import version from pathlib import Path @@ -66,6 +67,7 @@ ) from cloudai.workloads.nemo_run import NeMoRunCmdArgs, NeMoRunTestDefinition from cloudai.workloads.nixl_bench import NIXLBenchCmdArgs, NIXLBenchTestDefinition +from cloudai.workloads.nixl_ep import NixlEPCmdArgs, NixlEPTestDefinition from cloudai.workloads.nixl_kvbench import NIXLKVBenchCmdArgs, NIXLKVBenchTestDefinition from cloudai.workloads.nixl_perftest import NixlPerftestCmdArgs, NixlPerftestTestDefinition from cloudai.workloads.osu_bench import OSUBenchCmdArgs, OSUBenchTestDefinition @@ -266,6 +268,7 @@ def build_special_test_run( "megatron-bridge", "triton-inference", "nixl_bench", + "nixl-ep", "ai-dynamo", "nixl-perftest", "nixl-kvbench", @@ -398,6 +401,34 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - ), ), ), + "nixl-ep": lambda: create_test_run( + partial_tr, + "nixl-ep", + NixlEPTestDefinition( + name="nixl-ep", + description="nixl-ep", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs.model_validate( + { + "docker_image_url": "docker.io/nvidia/nixl-ep:latest", + "elastic_script": "/workspace/nixl/examples/device/ep/tests/elastic/elastic.py", + "plan": json.dumps( + [[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, -6, 7], [0, 1, 2, 3, 4, 5, 6, 7]] + ), + "num_processes_per_node": 4, + "service_startup_timeout_seconds": 90, + "store_port": 9999, + "num_tokens": 256, + "num_experts_per_rank": 4, + "hidden_dim": 8192, + "num_topk": 6, + "disable_ll_nvlink": True, + "kineto": True, + } + ), + extra_env_vars={"LD_LIBRARY_PATH": "/workspace/rdma_core/lib:$LD_LIBRARY_PATH"}, + ), + ), "nixl_bench": lambda: create_test_run( partial_tr, "nixl_bench", @@ -658,6 +689,8 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - tr.test.extra_env_vars["NIM_CACHE_PATH"] = str(tr.output_path) if request.param in {"nixl_bench", "nixl-kvbench"}: 
tr.num_nodes = 2 + if request.param == "nixl-ep": + tr.num_nodes = 3 if request.param == "ai-dynamo": tr.num_nodes = 2 if request.param == "deepep-benchmark": diff --git a/tests/test_init.py b/tests/test_init.py index db998c0e1..e3509545e 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -76,6 +76,7 @@ NIXLBenchSlurmCommandGenStrategy, NIXLBenchTestDefinition, ) +from cloudai.workloads.nixl_ep import NixlEPSlurmCommandGenStrategy, NixlEPTestDefinition from cloudai.workloads.nixl_kvbench import NIXLKVBenchSlurmCommandGenStrategy, NIXLKVBenchTestDefinition from cloudai.workloads.nixl_perftest import NixlPerftestSlurmCommandGenStrategy, NixlPerftestTestDefinition from cloudai.workloads.osu_bench import ( @@ -147,6 +148,7 @@ def test_runners(): (SlurmSystem, NIXLBenchTestDefinition): NIXLBenchSlurmCommandGenStrategy, (SlurmSystem, AIDynamoTestDefinition): AIDynamoSlurmCommandGenStrategy, (SlurmSystem, BashCmdTestDefinition): BashCmdCommandGenStrategy, + (SlurmSystem, NixlEPTestDefinition): NixlEPSlurmCommandGenStrategy, (SlurmSystem, NixlPerftestTestDefinition): NixlPerftestSlurmCommandGenStrategy, (SlurmSystem, NIXLKVBenchTestDefinition): NIXLKVBenchSlurmCommandGenStrategy, (SlurmSystem, OSUBenchTestDefinition): OSUBenchSlurmCommandGenStrategy, @@ -225,7 +227,7 @@ def test_installers(): def test_definitions(): test_defs = Registry().test_definitions_map - assert len(test_defs) == 24 + assert len(test_defs) == 25 for tdef in [ ("UCCTest", UCCTestDefinition), ("DDLBTest", DDLBTestDefinition), @@ -245,6 +247,7 @@ def test_definitions(): ("NIXLBench", NIXLBenchTestDefinition), ("AIDynamo", AIDynamoTestDefinition), ("BashCmd", BashCmdTestDefinition), + ("NixlEP", NixlEPTestDefinition), ("NixlPerftest", NixlPerftestTestDefinition), ("NIXLKVBench", NIXLKVBenchTestDefinition), ("Aiconfigurator", AiconfiguratorTestDefinition), diff --git a/tests/test_test_scenario.py b/tests/test_test_scenario.py index 335d721fc..a1ba44dce 100644 --- a/tests/test_test_scenario.py +++ 
b/tests/test_test_scenario.py @@ -70,6 +70,7 @@ NeMoRunTestDefinition, ) from cloudai.workloads.nixl_bench import NIXLBenchReportGenerationStrategy, NIXLBenchTestDefinition +from cloudai.workloads.nixl_ep import NixlEPReportGenerationStrategy, NixlEPTestDefinition from cloudai.workloads.nixl_perftest import NIXLKVBenchDummyReport, NixlPerftestTestDefinition from cloudai.workloads.osu_bench import OSUBenchReportGenerationStrategy, OSUBenchTestDefinition from cloudai.workloads.sglang import SGLangBenchReportGenerationStrategy, SglangTestDefinition @@ -623,7 +624,7 @@ def test_default(self): assert len(reporters) == 0 def test_default_reporters_size(self): - assert len(Registry().reports_map) == 19 + assert len(Registry().reports_map) == 20 @pytest.mark.parametrize( "tdef,expected_reporters", @@ -644,6 +645,7 @@ def test_default_reporters_size(self): (UCCTestDefinition, {UCCTestReportGenerationStrategy}), (TritonInferenceTestDefinition, {TritonInferenceReportGenerationStrategy}), (NIXLBenchTestDefinition, {NIXLBenchReportGenerationStrategy}), + (NixlEPTestDefinition, {NixlEPReportGenerationStrategy}), (OSUBenchTestDefinition, {OSUBenchReportGenerationStrategy}), (SglangTestDefinition, {SGLangBenchReportGenerationStrategy}), (AIDynamoTestDefinition, {AIDynamoReportGenerationStrategy}), diff --git a/tests/workloads/nixl_ep/__init__.py b/tests/workloads/nixl_ep/__init__.py new file mode 100644 index 000000000..3ac11851d --- /dev/null +++ b/tests/workloads/nixl_ep/__init__.py @@ -0,0 +1,15 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py b/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py new file mode 100644 index 000000000..a3722ed9d --- /dev/null +++ b/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py @@ -0,0 +1,816 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Unit tests for the NIXL EP Slurm command generation strategy.

Covers plan parsing/validation, benchmark command assembly, the generated bash
helper functions, and end-to-end srun script generation for single-node plans.
"""

import json
import re
from importlib.metadata import version
from pathlib import Path

import pytest

from cloudai.core import TestRun
from cloudai.systems.slurm import SlurmSystem
from cloudai.workloads.nixl_ep import (
    NixlEPCmdArgs,
    NixlEPSlurmCommandGenStrategy,
    NixlEPTestDefinition,
)
from cloudai.workloads.nixl_ep.nixl_ep import GENERATED_PLAN_FILE_NAME
from cloudai.workloads.nixl_ep.slurm_command_gen_strategy import NixlEPLaunch

# Canonical plans used across the tests; the *_STR variants mirror how plans
# arrive from TOML (JSON-encoded strings).
EXPANSION_CONTRACTION_PLAN = [
    [0, 1, 2, 3],
    [0, 1, 2, 3, 4, 5, 6, 7],
    [0, 1, 2, 3, 4, -6, 7],
    [0, 1, 2, 3, 4, 5, 6, 7],
]
EXPANSION_CONTRACTION_PLAN_STR = json.dumps(EXPANSION_CONTRACTION_PLAN)
DOUBLE_EXPANSION_PLAN = [
    [0, 1, 2, 3],
    [0, 1, 2, 3, 4, 5],
    [0, 1, 2, 3, 4, 5, 6, 7],
]
DOUBLE_EXPANSION_PLAN_STR = json.dumps(DOUBLE_EXPANSION_PLAN)
SINGLE_EXPANSION_PLAN = [
    [0, 1, 2, 3],
    [0, 1, 2, 3, 4, 5, 6, 7],
]
SINGLE_EXPANSION_PLAN_STR = json.dumps(SINGLE_EXPANSION_PLAN)
SINGLE_RANK_PLAN_STR = json.dumps([[0]])


def make_cmd_args(**overrides: object) -> NixlEPCmdArgs:
    """Build a fully-populated NixlEPCmdArgs, applying keyword overrides on top."""
    payload = {
        "docker_image_url": "docker.io/nvidia/nixl-ep:latest",
        "elastic_script": "/workspace/nixl/examples/device/ep/tests/elastic/elastic.py",
        "plan": EXPANSION_CONTRACTION_PLAN_STR,
        "num_processes_per_node": 4,
        "service_startup_timeout_seconds": 90,
        "store_port": 9999,
        "num_tokens": 256,
        "num_experts_per_rank": 4,
        "hidden_dim": 8192,
        "num_topk": 6,
        "disable_ll_nvlink": True,
        "kineto": True,
    }
    payload.update(overrides)
    return NixlEPCmdArgs.model_validate(payload)


def replace_cmd_args(cmd_args: NixlEPCmdArgs, **overrides: object) -> NixlEPCmdArgs:
    """Re-validate cmd_args with overrides, preserving declared and extra fields."""
    payload = cmd_args.model_dump()
    payload.update(cmd_args.model_extra or {})
    payload.update(overrides)
    return NixlEPCmdArgs.model_validate(payload)


@pytest.fixture
def nixl_ep() -> NixlEPTestDefinition:
    """A baseline NixlEP test definition with the expansion/contraction plan."""
    return NixlEPTestDefinition(
        name="nixl_ep",
        description="NIXL Elastic EP benchmark",
        test_template_name="NixlEP",
        cmd_args=make_cmd_args(),
        extra_env_vars={
            "LD_LIBRARY_PATH": "/workspace/rdma_core/lib:$LD_LIBRARY_PATH",
        },
    )


@pytest.fixture
def nixl_ep_tr(nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem) -> TestRun:
    """A 3-node TestRun wrapping the baseline definition."""
    return TestRun(
        name="nixl-ep",
        num_nodes=3,
        nodes=[],
        test=nixl_ep,
        output_path=slurm_system.output_path,
    )


def normalize_sbatch(content: str, test_run: TestRun, slurm_system: SlurmSystem) -> str:
    """Replace run-specific paths, job name, and version with stable placeholders."""
    normalized = content.replace(str(slurm_system.install_path.absolute()), "__INSTALL_DIR__").replace(
        str(test_run.output_path.parent.absolute()), "__OUTPUT_DIR__"
    )
    normalized = re.sub(
        r"^#SBATCH --job-name=.*$",
        "#SBATCH --job-name=__JOB_NAME__",
        normalized,
        flags=re.MULTILINE,
    )
    return normalized.replace(version("cloudai"), "__CLOUDAI_VERSION__")


def significant_sbatch_lines(content: str) -> list[str]:
    """Drop blank lines and echo-only lines so comparisons focus on real commands."""
    return [line for line in content.splitlines() if line.strip() and not line.lstrip().startswith("echo ")]


def normalize_stages(strategy: NixlEPSlurmCommandGenStrategy) -> list[tuple[int, tuple[int, ...]]]:
    """Summarize plan stages as (phase_idx, per-node process counts) tuples."""
    num_nodes, _ = strategy.get_cached_nodes_spec()
    normalized_stages: list[tuple[int, tuple[int, ...]]] = []
    for stage in strategy.plan_stages:
        per_node_processes = [0] * num_nodes
        for launch in stage.launches:
            per_node_processes[launch.node_idx] = launch.num_processes
        normalized_stages.append((stage.idx, tuple(per_node_processes)))
    return normalized_stages


def test_num_processes_per_node_returns_integer(
    nixl_ep: NixlEPTestDefinition,
    slurm_system: SlurmSystem,
) -> None:
    nixl_ep.cmd_args.num_processes_per_node = 5
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=2,
        nodes=[],
        test=nixl_ep,
        output_path=slurm_system.output_path,
    )

    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    assert strategy.num_processes_per_node == 5


# --- plan field validation -------------------------------------------------


def test_missing_plan_is_rejected() -> None:
    with pytest.raises(ValueError, match="Field required"):
        NixlEPCmdArgs.model_validate(
            {
                "docker_image_url": "docker.io/nvidia/nixl-ep:latest",
                "num_processes_per_node": 4,
            }
        )


def test_plan_accepts_single_string_list() -> None:
    cmd_args = NixlEPCmdArgs(
        docker_image_url="docker.io/nvidia/nixl-ep:latest",
        plan=[EXPANSION_CONTRACTION_PLAN_STR],
        num_processes_per_node=4,
    )

    assert cmd_args.plan == [EXPANSION_CONTRACTION_PLAN_STR]


def test_plan_accepts_multiple_strings_for_dse() -> None:
    cmd_args = NixlEPCmdArgs(
        docker_image_url="docker.io/nvidia/nixl-ep:latest",
        plan=[EXPANSION_CONTRACTION_PLAN_STR, DOUBLE_EXPANSION_PLAN_STR],
        num_processes_per_node=4,
    )

    assert cmd_args.plan == [EXPANSION_CONTRACTION_PLAN_STR, DOUBLE_EXPANSION_PLAN_STR]


def test_plan_rejects_empty_list() -> None:
    with pytest.raises(ValueError, match="plan list must not be empty"):
        NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            plan=[],
            num_processes_per_node=4,
        )


def test_plan_rejects_list_with_empty_string() -> None:
    with pytest.raises(ValueError, match="plan list must not contain empty strings"):
        NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            plan=[EXPANSION_CONTRACTION_PLAN_STR, " "],
            num_processes_per_node=4,
        )


def test_plan_rejects_list_with_invalid_json() -> None:
    with pytest.raises(ValueError, match="plan must be valid JSON"):
        NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            plan=[EXPANSION_CONTRACTION_PLAN_STR, "not-json"],
            num_processes_per_node=4,
        )


def test_plan_rejects_invalid_json() -> None:
    with pytest.raises(ValueError, match="plan must be valid JSON"):
        NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            plan="not-json",
            num_processes_per_node=4,
        )


def test_plan_rejects_non_integer_ranks() -> None:
    with pytest.raises(ValueError, match="Each plan rank must be an integer"):
        NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            plan='[[0, "1"]]',
            num_processes_per_node=2,
        )


def test_num_processes_per_node_rejects_list(nixl_ep_tr: TestRun, slurm_system: SlurmSystem) -> None:
    nixl_ep_tr.test.cmd_args.num_processes_per_node = [4, 4, 2]
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, nixl_ep_tr)

    with pytest.raises(ValueError, match="requires num_processes_per_node to be an integer"):
        _ = strategy.num_processes_per_node


# --- benchmark command assembly --------------------------------------------


def test_build_benchmark_command(nixl_ep_tr: TestRun, slurm_system: SlurmSystem) -> None:
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, nixl_ep_tr)

    master_cmd = strategy._build_benchmark_command(NixlEPLaunch(node_idx=0, num_processes=4, include_tcp_server=False))
    follower_cmd = strategy._build_benchmark_command(NixlEPLaunch(node_idx=0, num_processes=2, include_tcp_server=True))
    generated_plan_path = nixl_ep_tr.output_path / GENERATED_PLAN_FILE_NAME

    assert master_cmd == [
        "python3",
        "/workspace/nixl/examples/device/ep/tests/elastic/elastic.py",
        "--plan",
        str(generated_plan_path.absolute()),
        "--num-processes",
        "4",
        "--disable-ll-nvlink",
        "--hidden-dim",
        "8192",
        "--kineto",
        "--num-experts-per-rank",
        "4",
        "--num-tokens",
        "256",
        "--num-topk",
        "6",
    ]
    assert "--tcp-server" not in master_cmd
    assert follower_cmd[6:10] == [
        "--tcp-server",
        "$master_ip",
        "--disable-ll-nvlink",
        "--hidden-dim",
    ]
    assert "--service-startup-timeout-seconds" not in follower_cmd
    strategy._write_plan_file()
    assert json.loads(generated_plan_path.read_text(encoding="utf-8")) == EXPANSION_CONTRACTION_PLAN


def test_build_benchmark_command_always_uses_generated_plan_json(
    slurm_system: SlurmSystem,
) -> None:
    tdef = NixlEPTestDefinition(
        name="nixl_ep",
        description="NIXL Elastic EP benchmark",
        test_template_name="NixlEP",
        cmd_args=NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            plan=EXPANSION_CONTRACTION_PLAN_STR,
            num_processes_per_node=4,
        ),
    )
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=tdef,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    command = strategy._build_benchmark_command(NixlEPLaunch(node_idx=0, num_processes=4, include_tcp_server=False))

    assert command[1] == "/workspace/nixl/examples/device/ep/tests/elastic/elastic.py"
    assert command[3] == str((test_run.output_path / GENERATED_PLAN_FILE_NAME).absolute())
    strategy._write_plan_file()
    assert json.loads((test_run.output_path / GENERATED_PLAN_FILE_NAME).read_text(encoding="utf-8")) == (
        EXPANSION_CONTRACTION_PLAN
    )


def test_relative_elastic_script_is_resolved_under_container_runtime_root() -> None:
    tdef = NixlEPTestDefinition(
        name="nixl_ep",
        description="NIXL Elastic EP benchmark",
        test_template_name="NixlEP",
        cmd_args=NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            elastic_script="examples/device/ep/tests/elastic/elastic.py",
            plan=EXPANSION_CONTRACTION_PLAN_STR,
            num_processes_per_node=4,
        ),
    )

    assert tdef.installables == [tdef.docker_image]


def test_build_benchmark_command_passes_through_relative_elastic_script_path(
    slurm_system: SlurmSystem,
) -> None:
    tdef = NixlEPTestDefinition(
        name="nixl_ep",
        description="NIXL Elastic EP benchmark",
        test_template_name="NixlEP",
        cmd_args=NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            elastic_script="examples/device/ep/tests/elastic/elastic.py",
            plan=EXPANSION_CONTRACTION_PLAN_STR,
            num_processes_per_node=4,
        ),
    )
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=tdef,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    command = strategy._build_benchmark_command(NixlEPLaunch(node_idx=0, num_processes=4, include_tcp_server=False))

    assert command[1] == "examples/device/ep/tests/elastic/elastic.py"


def test_build_benchmark_command_omits_disable_ll_nvlink_by_default(
    slurm_system: SlurmSystem,
) -> None:
    tdef = NixlEPTestDefinition(
        name="nixl_ep",
        description="NIXL Elastic EP benchmark",
        test_template_name="NixlEP",
        cmd_args=NixlEPCmdArgs(
            docker_image_url="docker.io/nvidia/nixl-ep:latest",
            plan=DOUBLE_EXPANSION_PLAN_STR,
            num_processes_per_node=8,
        ),
    )
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=tdef,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    command = strategy._build_benchmark_command(NixlEPLaunch(node_idx=0, num_processes=4, include_tcp_server=False))

    assert "--disable-ll-nvlink" not in command


def test_build_benchmark_command_passes_through_extra_flags(
    slurm_system: SlurmSystem,
) -> None:
    tdef = NixlEPTestDefinition(
        name="nixl_ep",
        description="NIXL Elastic EP benchmark",
        test_template_name="NixlEP",
        cmd_args=NixlEPCmdArgs.model_validate(
            {
                "docker_image_url": "docker.io/nvidia/nixl-ep:latest",
                "plan": DOUBLE_EXPANSION_PLAN_STR,
                "num_processes_per_node": 8,
                "service_startup_timeout_seconds": 90,
                "store_port": 9999,
                "dry_run": True,
                "custom_arg": "value",
                "ignored_arg": None,
            }
        ),
    )
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=tdef,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    command = strategy._build_benchmark_command(NixlEPLaunch(node_idx=0, num_processes=4, include_tcp_server=True))

    assert "--dry-run" in command
    assert "--custom-arg" in command
    assert "value" in command
    assert "--service-startup-timeout-seconds" not in command
    assert "--store-port" not in command
    assert "--ignored-arg" not in command


# --- generated bash helpers and timeouts ------------------------------------


def test_wait_for_master_services_only_probes_tcpstore(
    nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem
) -> None:
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=2,
        nodes=[],
        test=nixl_ep,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    wait_function = strategy.generate_wait_for_master_services_function()

    assert f"/dev/tcp/$master_ip/{nixl_ep.cmd_args.store_port}" in wait_function
    assert "/dev/tcp/$master_ip/10000" not in wait_function


def test_phase_transition_timeout_divides_job_timeout_by_plan_phases(
    nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem
) -> None:
    nixl_ep.cmd_args.plan = SINGLE_EXPANSION_PLAN_STR
    nixl_ep.cmd_args.num_processes_per_node = 8
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=nixl_ep,
        output_path=slurm_system.output_path,
        time_limit="00:10:00",
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    # 600 s limit / 2 phases = 300 s per phase.
    assert strategy.phase_transition_timeout_seconds == 300
    assert "local timeout=300" in strategy.generate_wait_for_phase_completion_function()


def test_phase_transition_timeout_divides_default_budget_without_job_timeout(
    nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem
) -> None:
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=nixl_ep,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    # Default 600 s budget / 4 phases = 150 s per phase.
    assert strategy.phase_transition_timeout_seconds == 150
    assert "local timeout=150" in strategy.generate_wait_for_phase_completion_function()


# --- end-to-end srun script generation (single node) ------------------------


def test_gen_srun_command_single_node(nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem) -> None:
    nixl_ep.cmd_args.num_processes_per_node = 10
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=nixl_ep,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    srun_command = strategy.gen_srun_command()

    assert "wait_for_master_services" not in srun_command
    assert "wait_for_phase_completion()" in srun_command
    assert 'wait_for_phase_completion "0"' in srun_command
    assert 'wait_for_phase_completion "2"' in srun_command
    assert srun_command.count("--num-processes 4") == 2
    assert srun_command.count("--num-processes 2") == 1
    assert srun_command.count("--tcp-server $master_ip") == 2
    assert srun_command.count("--open-mode=append") == 2
    assert "--nodelist=$SLURM_JOB_MASTER_NODE" in srun_command
    assert "--relative=1" not in srun_command


def test_gen_srun_command_single_node_static_plan(nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem) -> None:
    nixl_ep.cmd_args.plan = json.dumps([[0, 1, 2, 3]])
    nixl_ep.cmd_args.num_processes_per_node = 4
    nixl_ep.cmd_args = replace_cmd_args(nixl_ep.cmd_args, disable_ll_nvlink=False)
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=nixl_ep,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    srun_command = strategy.gen_srun_command()

    assert "wait_for_phase_completion()" not in srun_command
    assert srun_command.count("--num-processes 4") == 1
    assert "--disable-ll-nvlink" not in srun_command


def test_gen_srun_command_single_node_single_rank_plan(
    nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem
) -> None:
    nixl_ep.cmd_args.plan = SINGLE_RANK_PLAN_STR
    nixl_ep.cmd_args.num_processes_per_node = 1
    test_run = TestRun(
        name="nixl-ep",
        num_nodes=1,
        nodes=[],
        test=nixl_ep,
        output_path=slurm_system.output_path,
    )
    strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run)

    srun_command = strategy.gen_srun_command()

    assert "wait_for_master_services()" not in srun_command
    assert "wait_for_phase_completion()" not in srun_command
    assert srun_command.count("--num-processes 1") == 1
    assert "--tcp-server $master_ip" not in srun_command
    assert "--open-mode=append" not in srun_command
test_gen_srun_command_rejects_process_list(slurm_system: SlurmSystem) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=json.dumps([[0, 1, 2, 3]]), + num_processes_per_node=[4], + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=1, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + with pytest.raises(ValueError, match="requires num_processes_per_node to be an integer"): + strategy.gen_srun_command() + + +def test_single_node_stages_cover_each_plan_phase(nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem) -> None: + nixl_ep.cmd_args.num_processes_per_node = 10 + test_run = TestRun( + name="nixl-ep", + num_nodes=1, + nodes=[], + test=nixl_ep, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + assert normalize_stages(strategy) == [(0, (4,)), (1, (4,)), (2, (0,)), (3, (2,))] + + +def test_single_node_stages_follow_double_expansion_public_plan( + nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem +) -> None: + nixl_ep.cmd_args.plan = DOUBLE_EXPANSION_PLAN_STR + nixl_ep.cmd_args.num_processes_per_node = 8 + test_run = TestRun( + name="nixl-ep", + num_nodes=1, + nodes=[], + test=nixl_ep, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + assert normalize_stages(strategy) == [(0, (4,)), (1, (2,)), (2, (2,))] + + +def test_single_node_stages_follow_single_expansion_public_plan( + nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem +) -> None: + nixl_ep.cmd_args.plan = SINGLE_EXPANSION_PLAN_STR + nixl_ep.cmd_args.num_processes_per_node = 8 + test_run = TestRun( + name="nixl-ep", + num_nodes=1, + nodes=[], + test=nixl_ep, + output_path=slurm_system.output_path, + ) + 
strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + assert normalize_stages(strategy) == [(0, (4,)), (1, (4,))] + + +def test_single_node_single_stage_plan_has_one_stage(nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem) -> None: + nixl_ep.cmd_args.plan = json.dumps([[0, 1, 2, 3]]) + nixl_ep.cmd_args.num_processes_per_node = 4 + test_run = TestRun( + name="nixl-ep", + num_nodes=1, + nodes=[], + test=nixl_ep, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + assert normalize_stages(strategy) == [(0, (4,))] + + +def test_multi_node_stages_match_public_two_node_single_expansion( + slurm_system: SlurmSystem, +) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=SINGLE_EXPANSION_PLAN_STR, + num_processes_per_node=4, + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=2, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + assert normalize_stages(strategy) == [(0, (4, 0)), (1, (0, 4))] + + +def test_multi_node_single_stage_plan_splits_initial_launches_across_nodes( + slurm_system: SlurmSystem, +) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=json.dumps([[0, 1, 2, 3, 4, 5, 6, 7]]), + num_processes_per_node=4, + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=2, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + assert normalize_stages(strategy) == [(0, (4, 4))] + + +def test_gen_srun_command_single_node_double_expansion_omits_disable_flag( + nixl_ep: 
NixlEPTestDefinition, slurm_system: SlurmSystem +) -> None: + nixl_ep.cmd_args.plan = DOUBLE_EXPANSION_PLAN_STR + nixl_ep.cmd_args.num_processes_per_node = 8 + nixl_ep.cmd_args = replace_cmd_args(nixl_ep.cmd_args, disable_ll_nvlink=False) + test_run = TestRun( + name="nixl-ep", + num_nodes=1, + nodes=[], + test=nixl_ep, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + srun_command = strategy.gen_srun_command() + + assert 'wait_for_phase_completion "0"' in srun_command + assert 'wait_for_phase_completion "1"' in srun_command + assert srun_command.count("--num-processes 2") == 2 + assert "--disable-ll-nvlink" not in srun_command + + +def test_gen_srun_command_multi_node_public_single_expansion_waits_for_phase_before_second_stage( + slurm_system: SlurmSystem, +) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=SINGLE_EXPANSION_PLAN_STR, + num_processes_per_node=4, + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=2, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + srun_command = strategy.gen_srun_command() + + assert "wait_for_master_services()" in srun_command + assert "wait_for_phase_completion()" in srun_command + assert 'wait_for_phase_completion "0"' in srun_command + assert srun_command.count("--num-processes 4") == 2 + assert srun_command.count("--relative=1") == 1 + assert srun_command.count("--nodelist=$SLURM_JOB_MASTER_NODE") == 1 + assert srun_command.count("--tcp-server $master_ip") == 1 + assert srun_command.count("--open-mode=append") == 1 + + +def test_gen_srun_command_multi_node_single_stage_starts_followers( + slurm_system: SlurmSystem, +) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL 
Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=json.dumps([[0, 1, 2, 3, 4, 5, 6, 7]]), + num_processes_per_node=4, + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=2, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + srun_command = strategy.gen_srun_command() + + assert "wait_for_master_services()" in srun_command + assert "wait_for_phase_completion()" not in srun_command + assert srun_command.count("--num-processes 4") == 2 + assert srun_command.count("--relative=1") == 1 + assert srun_command.count("--tcp-server $master_ip") == 1 + + +def test_single_node_stages_reject_mismatch(nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem) -> None: + nixl_ep.cmd_args.num_processes_per_node = 9 + test_run = TestRun( + name="nixl-ep", + num_nodes=1, + nodes=[], + test=nixl_ep, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + with pytest.raises(ValueError, match="total launched workers \\(10\\), got 9"): + _ = strategy.plan_stages + + +def test_gen_srun_command_single_launch_reports_success( + slurm_system: SlurmSystem, +) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=SINGLE_RANK_PLAN_STR, + num_processes_per_node=1, + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=1, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + srun_command = strategy.gen_srun_command() + + assert 'echo "All NIXL EP launches completed successfully"' in srun_command + assert 'if [ "$rc" -eq 0 ]; then' in srun_command + assert "exit $rc" in srun_command + + +def 
test_gen_exec_command_matches_reference(nixl_ep_tr: TestRun, slurm_system: SlurmSystem) -> None: + slurm_system.container_mount_home = True + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, nixl_ep_tr) + + sbatch_cmd = strategy.gen_exec_command() + + assert sbatch_cmd == f"sbatch {nixl_ep_tr.output_path / 'cloudai_sbatch_script.sh'}" + + content = (nixl_ep_tr.output_path / "cloudai_sbatch_script.sh").read_text().strip() + content = normalize_sbatch(content, nixl_ep_tr, slurm_system) + + ref = (Path(__file__).parents[2] / "ref_data" / "nixl-ep.sbatch").read_text().strip() + ref = normalize_sbatch(ref, nixl_ep_tr, slurm_system) + assert significant_sbatch_lines(content) == significant_sbatch_lines(ref) diff --git a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py new file mode 100644 index 000000000..de0dcab6c --- /dev/null +++ b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py @@ -0,0 +1,244 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import cast + +import pytest + +from cloudai.core import TestRun +from cloudai.workloads.nixl_ep import NixlEPCmdArgs, NixlEPTestDefinition + +EXPANSION_CONTRACTION_PLAN = ( + "[[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, -6, 7], [0, 1, 2, 3, 4, 5, 6, 7]]" +) +SUCCESSFUL_BANDWIDTH_LINE = ( + "[rank 0] Dispatch + combine bandwidth: 12.34 GB/s, avg_t=56.7 us, min_t=50.0 us, max_t=60.0 us\n" +) +TCPSTORE_TIMEOUT_LINE = ( + "recvValueWithTimeout failed on SocketImpl(fd=65, addr=[pool0-01876]:11088, remote=[pool0-01873.cm.cluster]:9999)\n" +) + + +def num_nodes(test_run: TestRun) -> int: + return cast(int, test_run.num_nodes) + + +@pytest.fixture +def nixl_ep_tr(tmp_path) -> TestRun: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + elastic_script="/workspace/nixl/examples/device/ep/tests/elastic/elastic.py", + plan=EXPANSION_CONTRACTION_PLAN, + num_processes_per_node=4, + ), + ) + return TestRun(name="nixl-ep", test=tdef, num_nodes=3, nodes=[], output_path=tmp_path / "output") + + +class TestNixlEPStatusCheck: + def test_successful_job(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "slurm-job.toml").write_text( + 'state = "COMPLETED"\nexit_code = "0:0"\n', + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert result.is_successful + assert result.error_message == "" + + def test_launcher_path_error_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + (nixl_ep_tr.output_path / "nixl-ep-node-0.log").write_text( + ( + "python3: can't open file 
'/workspace/nixl/tests/elastic/elastic.py': " + "[Errno 2] No such file or directory\n" + ), + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "entrypoint could not be opened" in result.error_message + assert "nixl-ep-node-0.log" in result.error_message + + def test_missing_node_logs_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + (nixl_ep_tr.output_path / "nixl-ep-node-0.log").write_text(SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8") + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "nixl-ep-node-1.log, nixl-ep-node-2.log" in result.error_message + + def test_plan_mismatch_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "nixl-ep-node-1.log").write_text( + "Process 0 -> no plan phases were found for rank 9 after phase None, exiting\n", + encoding="utf-8", + ) + (nixl_ep_tr.output_path / "slurm-job.toml").write_text( + 'state = "COMPLETED"\nexit_code = "0:0"\n', + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "never appears in the plan" in result.error_message + + def test_tcpstore_timeout_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "nixl-ep-node-2.log").write_text( + TCPSTORE_TIMEOUT_LINE, + encoding="utf-8", + ) + (nixl_ep_tr.output_path / "slurm-job.toml").write_text( + 'state = 
"COMPLETED"\nexit_code = "0:0"\n', + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "lost its TCPStore connection" in result.error_message + + def test_primary_launch_exit_before_phase_completion_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "nixl-ep-node-1.log").write_text( + "Primary NIXL EP launch exited before phase 1 completed\n", + encoding="utf-8", + ) + (nixl_ep_tr.output_path / "slurm-job.toml").write_text( + 'state = "COMPLETED"\nexit_code = "0:0"\n', + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "primary NIXL EP launch exited before phase 1 completed" in result.error_message + + def test_initial_primary_launch_exit_explains_missing_later_node_logs(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + (nixl_ep_tr.output_path / "nixl-ep-node-0.log").write_text( + "global_rank=0, local_rank=0 -> start phase 0\n", + encoding="utf-8", + ) + (nixl_ep_tr.output_path / "stdout.txt").write_text( + "Primary NIXL EP launch exited before phase 0 completed\n", + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "initial NIXL EP launch exited before phase 0 completed" in result.error_message + assert "later stage launches never started" in result.error_message + assert "some node logs may be absent" in result.error_message + + def test_ucx_remote_memory_view_failure_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + 
(nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "nixl-ep-node-0.log").write_text( + "E0319 04:13:25.442619 950677 ucx_backend.cpp:1486] " + "Failed to prepare remote memory view: Failed to create device memory list(remote): No such device\n", + encoding="utf-8", + ) + (nixl_ep_tr.output_path / "slurm-job.toml").write_text( + 'state = "COMPLETED"\nexit_code = "0:0"\n', + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "failed to initialize its UCX remote memory view" in result.error_message + + @pytest.mark.parametrize( + "log_content, expected_fragment", + [ + ( + "Traceback (most recent call last):\n File elastic.py, line 42\nRuntimeError: boom\n", + "Python traceback", + ), + ( + "Timed out waiting for NIXL EP master services on 192.168.1.1\n", + "master services never became ready", + ), + ( + "recvValueWithTimeout timed out after 300000ms\n", + "worker timed out", + ), + ( + "srun: error: Unable to allocate resources: Invalid node name\n", + "srun failure", + ), + ( + "node001: task 0: Exited with exit code 1\n", + "non-zero status", + ), + ], + ) + def test_launcher_failure_patterns_are_reported( + self, nixl_ep_tr: TestRun, log_content: str, expected_fragment: str + ) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + (nixl_ep_tr.output_path / "nixl-ep-node-0.log").write_text(log_content, encoding="utf-8") + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert expected_fragment in result.error_message + + def test_completed_job_without_benchmark_output_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text("run completed\n", encoding="utf-8") + 
(nixl_ep_tr.output_path / "slurm-job.toml").write_text( + 'state = "COMPLETED"\nexit_code = "0:0"\n', + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "benchmark summary lines were missing from" in result.error_message diff --git a/tests/workloads/nixl_ep/test_log_parsing.py b/tests/workloads/nixl_ep/test_log_parsing.py new file mode 100644 index 000000000..c88885ad6 --- /dev/null +++ b/tests/workloads/nixl_ep/test_log_parsing.py @@ -0,0 +1,62 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from pathlib import Path + +from cloudai.workloads.nixl_ep.log_parsing import parse_nixl_ep_bandwidth_samples + + +def test_parse_combined_bandwidth_output(tmp_path: Path) -> None: + log_path = tmp_path / "nixl-ep-node-0.log" + log_path.write_text( + "[rank 3] Dispatch + combine bandwidth: 45.67 GB/s, avg_t=123.4 us, min_t=120.0 us, max_t=130.0 us\n", + encoding="utf-8", + ) + + samples = parse_nixl_ep_bandwidth_samples(log_path) + + assert len(samples) == 1 + assert samples[0].rank == 3 + assert samples[0].dispatch_combine_bandwidth_gbps == 45.67 + assert samples[0].avg_time_us == 123.4 + assert samples[0].min_time_us == 120.0 + assert samples[0].max_time_us == 130.0 + + +def test_parse_kineto_bandwidth_output(tmp_path: Path) -> None: + log_path = tmp_path / "nixl-ep-node-0.log" + log_path.write_text( + "[rank 7] Dispatch bandwidth: 30.25 GB/s | Combine bandwidth: 28.75 GB/s\n", + encoding="utf-8", + ) + + samples = parse_nixl_ep_bandwidth_samples(log_path) + + assert len(samples) == 1 + assert samples[0].rank == 7 + assert samples[0].dispatch_bandwidth_gbps == 30.25 + assert samples[0].combine_bandwidth_gbps == 28.75 + + +def test_parse_nixl_ep_bandwidth_samples_ignores_unrelated_lines(tmp_path: Path) -> None: + log_path = tmp_path / "nixl-ep-node-0.log" + log_path.write_text("GpuFreq=control_disabled\nrun completed\n", encoding="utf-8") + + assert parse_nixl_ep_bandwidth_samples(log_path) == [] + + +def test_parse_nixl_ep_bandwidth_samples_missing_file_returns_empty_list(tmp_path: Path) -> None: + assert parse_nixl_ep_bandwidth_samples(tmp_path / "missing.log") == []