Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/workloads/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Available Workloads
":doc:`nemo_launcher`", "✅", "❌", "❌", "❌"
":doc:`nemo_run`", "✅", "❌", "❌", "❌"
":doc:`nixl_bench`", "✅", "❌", "❌", "❌"
":doc:`nixl_ep`", "✅", "❌", "❌", "❌"
":doc:`nixl_kvbench`", "✅", "❌", "❌", "❌"
":doc:`nixl_perftest`", "✅", "❌", "❌", "❌"
":doc:`sleep`", "✅", "✅", "❌", "✅"
Expand Down
106 changes: 106 additions & 0 deletions doc/workloads/nixl_ep.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
NIXL EP
=======

This workload (``test_template_name`` is ``NixlEP``) runs the NIXL Elastic EP benchmark through a Slurm-managed multi-node elastic launcher flow.

Overview
--------

The Slurm launch model is:

- one ``elastic.py`` process per node, started in sequence as the plan progresses
- the master node starts first and exposes a TCPStore for rank coordination
- follower nodes connect via ``--tcp-server $master_ip`` once the master is ready
- the benchmark runtime comes from the container image
- each run serializes its plan JSON into the output directory

Plan Format
-----------

The ``plan`` field is a JSON-encoded list of phases. Each phase is a list of rank indices passed directly to the benchmark. CloudAI uses the following convention to drive the elastic launcher:

- **Positive rank index** — the rank is active. A rank that is new relative to the previous phase causes CloudAI to fire an additional ``srun`` for that worker.
- **Negative rank index** (e.g. ``-6``) — signals a contraction: the benchmark sees the absolute value and treats it as temporarily removed. No new ``srun`` is launched for negative indices.
- **Omitted rank** — a rank present in an earlier phase but absent from the current phase list is not relaunched. The benchmark's own phase logic handles its inactivity.

Example:

.. code-block:: text

[[0, 1, 2, 3], # phase 0: ranks 0–3 start
[0, 1, 2, 3, 4, 5, 6, 7], # phase 1: ranks 4–7 join (expansion)
[0, 1, 2, 3, 4, -6, 7], # phase 2: rank 6 contracted (no new launch)
[0, 1, 2, 3, 4, 5, 6, 7]] # phase 3: rank 6 rejoins (new launch for rank 6)

Phase completion is detected by polling the primary log for ``-> end phase N`` markers.

Usage Examples
--------------

Test TOML example:

.. code-block:: toml

name = "nixl-ep-expansion-contraction"
description = "NIXL Elastic EP expansion/contraction benchmark"
test_template_name = "NixlEP"

[cmd_args]
docker_image_url = "<docker container url here>"
elastic_script = "/workspace/nixl/examples/device/ep/tests/elastic/elastic.py"
plan = "[[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, -6, 7], [0, 1, 2, 3, 4, 5, 6, 7]]"
num_processes_per_node = 4
num_tokens = 256
num_experts_per_rank = 4
hidden_dim = 8192
num_topk = 6
disable_ll_nvlink = true

Test-in-Scenario example:

.. code-block:: toml

name = "nixl-ep-expansion-contraction"

[[Tests]]
id = "nixl_ep.expansion_contraction"
num_nodes = 3
time_limit = "00:30:00"

name = "nixl-ep-expansion-contraction"
description = "NIXL Elastic EP expansion/contraction benchmark"
test_template_name = "NixlEP"

[Tests.cmd_args]
docker_image_url = "<docker container url here>"
elastic_script = "/workspace/nixl/examples/device/ep/tests/elastic/elastic.py"
plan = "[[0, 1, 2, 3], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, -6, 7], [0, 1, 2, 3, 4, 5, 6, 7]]"
num_processes_per_node = 4
num_tokens = 256
num_experts_per_rank = 4
hidden_dim = 8192
num_topk = 6
disable_ll_nvlink = true

Reporting
---------

After a run completes, CloudAI prints a single table with one row per (node, rank) measurement. The ``Phases`` column shows each phase index colour-coded green (passed) or red (failed). Bandwidth columns report dispatch+combine throughput and timing per rank.

The reported metric (``default``) is the mean dispatch+combine bandwidth in GB/s across all ranks.

API Documentation
-----------------

Command Arguments
~~~~~~~~~~~~~~~~~

.. autopydantic_model:: cloudai.workloads.nixl_ep.nixl_ep.NixlEPCmdArgs
:members:

Test Definition
~~~~~~~~~~~~~~~

.. autoclass:: cloudai.workloads.nixl_ep.nixl_ep.NixlEPTestDefinition
:members:
:show-inheritance:
8 changes: 8 additions & 0 deletions src/cloudai/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ def register_all():
NIXLBenchSlurmCommandGenStrategy,
NIXLBenchTestDefinition,
)
from cloudai.workloads.nixl_ep import (
NixlEPReportGenerationStrategy,
NixlEPSlurmCommandGenStrategy,
NixlEPTestDefinition,
)
from cloudai.workloads.nixl_kvbench import NIXLKVBenchSlurmCommandGenStrategy, NIXLKVBenchTestDefinition
from cloudai.workloads.nixl_perftest import (
NIXLKVBenchDummyReport,
Expand Down Expand Up @@ -210,6 +215,7 @@ def register_all():
Registry().add_command_gen_strategy(SlurmSystem, NeMoLauncherTestDefinition, NeMoLauncherSlurmCommandGenStrategy)
Registry().add_command_gen_strategy(SlurmSystem, NeMoRunTestDefinition, NeMoRunSlurmCommandGenStrategy)
Registry().add_command_gen_strategy(SlurmSystem, NIXLBenchTestDefinition, NIXLBenchSlurmCommandGenStrategy)
Registry().add_command_gen_strategy(SlurmSystem, NixlEPTestDefinition, NixlEPSlurmCommandGenStrategy)

Registry().add_command_gen_strategy(SlurmSystem, GPTTestDefinition, JaxToolboxSlurmCommandGenStrategy)
Registry().add_command_gen_strategy(SlurmSystem, GrokTestDefinition, JaxToolboxSlurmCommandGenStrategy)
Expand Down Expand Up @@ -260,6 +266,7 @@ def register_all():
Registry().add_test_definition("MegatronBridge", MegatronBridgeTestDefinition)
Registry().add_test_definition("TritonInference", TritonInferenceTestDefinition)
Registry().add_test_definition("NIXLBench", NIXLBenchTestDefinition)
Registry().add_test_definition("NixlEP", NixlEPTestDefinition)
Registry().add_test_definition("AIDynamo", AIDynamoTestDefinition)
Registry().add_test_definition("BashCmd", BashCmdTestDefinition)
Registry().add_test_definition("NixlPerftest", NixlPerftestTestDefinition)
Expand All @@ -286,6 +293,7 @@ def register_all():
Registry().add_report(UCCTestDefinition, UCCTestReportGenerationStrategy)
Registry().add_report(TritonInferenceTestDefinition, TritonInferenceReportGenerationStrategy)
Registry().add_report(NIXLBenchTestDefinition, NIXLBenchReportGenerationStrategy)
Registry().add_report(NixlEPTestDefinition, NixlEPReportGenerationStrategy)
Registry().add_report(AIDynamoTestDefinition, AIDynamoReportGenerationStrategy)
Registry().add_report(AiconfiguratorTestDefinition, AiconfiguratorReportGenerationStrategy)
Registry().add_report(NixlPerftestTestDefinition, NIXLKVBenchDummyReport)
Expand Down
26 changes: 26 additions & 0 deletions src/cloudai/workloads/nixl_ep/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .nixl_ep import NixlEPCmdArgs, NixlEPTestDefinition
from .report_generation_strategy import NixlEPReportGenerationStrategy
from .slurm_command_gen_strategy import NixlEPSlurmCommandGenStrategy

__all__ = [
"NixlEPCmdArgs",
"NixlEPReportGenerationStrategy",
"NixlEPSlurmCommandGenStrategy",
"NixlEPTestDefinition",
]
90 changes: 90 additions & 0 deletions src/cloudai/workloads/nixl_ep/log_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import re
from dataclasses import dataclass
from pathlib import Path

_FLOAT_RE = r"[-+]?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?"
_END_PHASE_RE = re.compile(r"-> end phase (\d+)")
_COMBINED_BW_RE = re.compile(
rf"\[rank (?P<rank>\d+)\] Dispatch \+ combine bandwidth: "
rf"(?P<bandwidth>{_FLOAT_RE}) GB/s, "
rf"avg_t=(?P<avg_time>{_FLOAT_RE}) us, "
rf"min_t=(?P<min_time>{_FLOAT_RE}) us, "
rf"max_t=(?P<max_time>{_FLOAT_RE}) us"
)
_KINETO_BW_RE = re.compile(
rf"\[rank (?P<rank>\d+)\] Dispatch bandwidth: "
rf"(?P<dispatch_bandwidth>{_FLOAT_RE}) GB/s \| "
rf"Combine bandwidth: (?P<combine_bandwidth>{_FLOAT_RE}) GB/s"
)


@dataclass(frozen=True)
class NixlEPBandwidthSample:
"""A parsed bandwidth sample emitted by the NIXL EP benchmark logs."""

rank: int
dispatch_combine_bandwidth_gbps: float | None = None
avg_time_us: float | None = None
min_time_us: float | None = None
max_time_us: float | None = None
dispatch_bandwidth_gbps: float | None = None
combine_bandwidth_gbps: float | None = None


def parse_nixl_ep_completed_phases(path: Path) -> set[int]:
"""Return the set of phase indices that completed according to the log at path."""
if not path.is_file():
return set()
completed: set[int] = set()
for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
if match := _END_PHASE_RE.search(line):
completed.add(int(match.group(1)))
return completed


def parse_nixl_ep_bandwidth_samples(path: Path) -> list[NixlEPBandwidthSample]:
if not path.is_file():
return []

samples: list[NixlEPBandwidthSample] = []
for line in path.read_text(encoding="utf-8", errors="ignore").splitlines():
if match := _COMBINED_BW_RE.search(line):
samples.append(
NixlEPBandwidthSample(
rank=int(match.group("rank")),
dispatch_combine_bandwidth_gbps=float(match.group("bandwidth")),
avg_time_us=float(match.group("avg_time")),
min_time_us=float(match.group("min_time")),
max_time_us=float(match.group("max_time")),
)
)
continue

if match := _KINETO_BW_RE.search(line):
samples.append(
NixlEPBandwidthSample(
rank=int(match.group("rank")),
dispatch_bandwidth_gbps=float(match.group("dispatch_bandwidth")),
combine_bandwidth_gbps=float(match.group("combine_bandwidth")),
)
)

return samples
Loading
Loading