Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/cloudai/systems/kubernetes/kubernetes_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import annotations

import json
import logging
import subprocess
import time
Expand All @@ -39,6 +40,7 @@ class KubernetesSystem(System):
scheduler: str = "kubernetes"
monitor_interval: int = 1
gpus_per_node: int = 1
use_host_network: bool | None = None
_core_v1: Optional[k8s.client.CoreV1Api] = None
_batch_v1: Optional[k8s.client.BatchV1Api] = None
_custom_objects_api: Optional[k8s.client.CustomObjectsApi] = None
Expand Down Expand Up @@ -131,6 +133,45 @@ def update(self) -> None:
"""
pass

def get_network_attachment_definitions(self) -> list[str]:
"""Return all NetworkAttachmentDefinitions in the cluster as 'namespace/name' strings."""
cmd = ["kubectl", "get", "network-attachment-definitions", "--all-namespaces", "-o", "json"]
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
except subprocess.CalledProcessError as e:
logging.debug("Failed to list NetworkAttachmentDefinitions: %s", e.stderr)
return []

try:
data = json.loads(result.stdout)
except json.JSONDecodeError as e:
logging.debug("Failed to parse NetworkAttachmentDefinitions output: %s", e)
return []

return [f"{item['metadata']['namespace']}/{item['metadata']['name']}" for item in data.get("items", [])]

def resolve_cni_networks(self) -> list[str] | None:
"""
Determine which CNI networks to use, or None if host networking should be used.

- use_host_network=True: always use hostNetwork, skip CNI discovery.
- use_host_network=False: require CNI; raise if none are found.
- use_host_network=None: try CNI, fall back to hostNetwork if none found.
"""
if self.use_host_network is True:
return None

if networks := self.get_network_attachment_definitions():
return networks

if self.use_host_network is False:
raise RuntimeError(
"use_host_network=False but no NetworkAttachmentDefinitions were found in the cluster. "
"Ensure the CNI operator is installed and net-attach-defs are configured."
)

return None # auto mode: fall back to hostNetwork

def is_job_running(self, job: BaseJob) -> bool:
k_job: KubernetesJob = cast(KubernetesJob, job)
return self._is_job_running(k_job)
Expand Down
103 changes: 55 additions & 48 deletions src/cloudai/workloads/nccl_test/kubernetes_json_gen_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def ssh_port(self) -> int:
def gen_json(self) -> dict[Any, Any]:
k8s_system = cast(KubernetesSystem, self.system)
job_name = self.sanitize_k8s_job_name(self.test_run.name)
cni_networks = k8s_system.resolve_cni_networks()

deployment = {
"apiVersion": "kubeflow.org/v2beta1",
Expand All @@ -50,8 +51,8 @@ def gen_json(self) -> dict[Any, Any]:
"slotsPerWorker": k8s_system.gpus_per_node,
"runPolicy": {"cleanPodPolicy": "Running"},
"mpiReplicaSpecs": {
"Launcher": self._create_launcher_spec(),
"Worker": self._create_worker_spec(),
"Launcher": self._create_launcher_spec(cni_networks),
"Worker": self._create_worker_spec(cni_networks),
},
},
}
Expand All @@ -66,56 +67,53 @@ def container_url(self) -> str:
tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test)
return tdef.cmd_args.docker_image_url.replace("#", "/")

def _create_launcher_spec(self) -> dict[str, Any]:
def _create_launcher_spec(self, cni_networks: list[str] | None) -> dict[str, Any]:
env_vars = self._get_merged_env_vars()
return {
"replicas": 1,
"template": {
"spec": {
"hostNetwork": True,
"containers": [
{
"image": self.container_url,
"name": "nccl-test-launcher",
"imagePullPolicy": "IfNotPresent",
"securityContext": {"privileged": True},
"env": self._generate_env_list(env_vars),
"command": ["/bin/bash", "-c"],
"args": [self._generate_launcher_command()],
"resources": self._prepare_launcher_resources(),
}
],
},
},
spec: dict[str, Any] = {
"containers": [
{
"image": self.container_url,
"name": "nccl-test-launcher",
"imagePullPolicy": "IfNotPresent",
"securityContext": {"privileged": True},
"env": self._generate_env_list(env_vars),
"command": ["/bin/bash", "-c"],
"args": [self._generate_launcher_command()],
"resources": self._prepare_launcher_resources(),
}
],
}
if not cni_networks:
spec["hostNetwork"] = True
return {"replicas": 1, "template": {"spec": spec}}

def _create_worker_spec(self) -> dict[str, Any]:
def _create_worker_spec(self, cni_networks: list[str] | None) -> dict[str, Any]:
env_vars = self._get_merged_env_vars()
return {
"replicas": self.test_run.nnodes,
"template": {
"spec": {
"hostNetwork": True,
"containers": [
{
"image": self.container_url,
"name": "nccl-test-worker",
"ports": [{"containerPort": self.ssh_port, "name": "ssh"}],
"imagePullPolicy": "IfNotPresent",
"securityContext": {"privileged": True},
"env": self._generate_env_list(env_vars),
"command": ["/bin/bash", "-c"],
"args": [self._generate_worker_command()],
"resources": self._prepare_worker_resources(),
"volumeMounts": [
{"mountPath": "/dev/shm", "name": "dev-shm"},
],
}
spec: dict[str, Any] = {
"containers": [
{
"image": self.container_url,
"name": "nccl-test-worker",
"ports": [{"containerPort": self.ssh_port, "name": "ssh"}],
"imagePullPolicy": "IfNotPresent",
"securityContext": {"privileged": True},
"env": self._generate_env_list(env_vars),
"command": ["/bin/bash", "-c"],
"args": [self._generate_worker_command()],
"resources": self._prepare_worker_resources(cni_networks),
"volumeMounts": [
{"mountPath": "/dev/shm", "name": "dev-shm"},
],
"volumes": [{"name": "dev-shm", "emptyDir": {"medium": "Memory", "sizeLimit": "1Gi"}}],
},
},
}
],
"volumes": [{"name": "dev-shm", "emptyDir": {"medium": "Memory", "sizeLimit": "1Gi"}}],
}
if not cni_networks:
spec["hostNetwork"] = True
template: dict[str, Any] = {"spec": spec}
if cni_networks:
template["metadata"] = {"annotations": {"k8s.v1.cni.cncf.io/networks": ",".join(cni_networks)}}
return {"replicas": self.test_run.nnodes, "template": template}

def _generate_worker_command(self) -> str:
"""
Expand Down Expand Up @@ -209,7 +207,16 @@ def _prepare_launcher_resources(self) -> Dict[str, Dict[str, str]]:
"limits": {"cpu": "2", "memory": "8Gi"},
}

def _prepare_worker_resources(self) -> Dict[str, Dict[str, str]]:
def _prepare_worker_resources(self, cni_networks: list[str] | None) -> Dict[str, Dict[str, str]]:
k8s_system = cast(KubernetesSystem, self.system)
gpu_count = str(k8s_system.gpus_per_node)
return {"requests": {"nvidia.com/gpu": gpu_count}, "limits": {"nvidia.com/gpu": gpu_count}}
resources: Dict[str, Dict[str, str]] = {
"requests": {"nvidia.com/gpu": gpu_count},
"limits": {"nvidia.com/gpu": gpu_count},
}
if cni_networks:
for net in cni_networks:
nic_name = net.split("/", 1)[1]
resources["requests"][f"nvidia.com/{nic_name}"] = "1"
resources["limits"][f"nvidia.com/{nic_name}"] = "1"
return resources
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def k8s_system(tmp_path: Path) -> KubernetesSystem:
global_env_vars={},
monitor_interval=1,
gpus_per_node=8,
use_host_network=True,
default_namespace="test-namespace",
kube_config_path=kube_config,
)
Expand Down
75 changes: 74 additions & 1 deletion tests/systems/kubernetes/test_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import subprocess
from pathlib import Path
from typing import Dict, List
from typing import ClassVar, Dict, List
from unittest.mock import MagicMock, Mock, patch

import pytest
Expand Down Expand Up @@ -188,3 +190,74 @@ def test_delete_job(k8s_system: KubernetesSystem, job_kind: str):
assert k8s_system._delete_dynamo_graph_deployment.called
else:
raise AssertionError(f"Unhandled job kind in test: {job_kind}")


def _net_attach_def_json(*pairs: tuple[str, str]) -> str:
items = [{"metadata": {"namespace": ns, "name": n}} for ns, n in pairs]
return json.dumps({"items": items})


class TestGetNetworkAttachmentDefinitions:
@pytest.mark.parametrize(
"pairs, expected",
[
([], []),
([("default", "rail-0"), ("default", "rail-1")], ["default/rail-0", "default/rail-1"]),
(
[("default", "nic0-rail0-plane0"), ("kube-system", "nic1-rail1-plane0")],
["default/nic0-rail0-plane0", "kube-system/nic1-rail1-plane0"],
),
],
ids=["empty", "same-namespace", "multiple-namespaces"],
)
def test_success(self, k8s_system: KubernetesSystem, pairs: list, expected: list) -> None:
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(stdout=_net_attach_def_json(*pairs), returncode=0)
assert k8s_system.get_network_attachment_definitions() == expected

@pytest.mark.parametrize(
"side_effect, stdout",
[
(subprocess.CalledProcessError(1, "kubectl", stderr="No CRD found"), None),
(None, "not valid json"),
],
ids=["kubectl-failure", "invalid-json"],
)
def test_returns_empty_on_error(
self, k8s_system: KubernetesSystem, side_effect: Exception | None, stdout: str | None
) -> None:
with patch("subprocess.run") as mock_run:
if side_effect:
mock_run.side_effect = side_effect
else:
mock_run.return_value = MagicMock(stdout=stdout, returncode=0)
assert k8s_system.get_network_attachment_definitions() == []


class TestResolveCniNetworks:
NETS: ClassVar[list[str]] = ["default/rail-0", "default/rail-1"]

@pytest.mark.parametrize(
"use_host_network, discovered, expected",
[
(True, NETS, None), # forced host network — ignores discovered nets
(None, NETS, NETS), # auto — uses CNI when available
(None, [], None), # auto — falls back to host network when none found
(False, NETS, NETS), # require CNI — uses discovered nets
],
ids=["forced-host", "auto-cni", "auto-fallback", "require-cni"],
)
def test_resolve(
self, k8s_system: KubernetesSystem, use_host_network: bool | None, discovered: list, expected: list | None
) -> None:
k8s_system.use_host_network = use_host_network
with patch.object(KubernetesSystem, "get_network_attachment_definitions", return_value=discovered):
assert k8s_system.resolve_cni_networks() == expected

def test_require_cni_raises_when_none_found(self, k8s_system: KubernetesSystem) -> None:
k8s_system.use_host_network = False
with (
patch.object(KubernetesSystem, "get_network_attachment_definitions", return_value=[]),
pytest.raises(RuntimeError, match="NetworkAttachmentDefinitions"),
):
k8s_system.resolve_cni_networks()
53 changes: 53 additions & 0 deletions tests/workloads/nccl_test/test_json_gen_strategy_kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
# limitations under the License.


from typing import ClassVar
from unittest.mock import patch

import pytest

from cloudai.core import TestRun
Expand Down Expand Up @@ -130,3 +133,53 @@ def test_launcher_command_generation(self, test_run: TestRun, k8s_system: Kubern
assert cmd_args.subtest_name in launcher_args
assert f"--nthreads {cmd_args.nthreads}" in launcher_args
assert f"--ngpus {cmd_args.ngpus}" in launcher_args


class TestNcclCniNetworking:
CNI_NETS: ClassVar[list[str]] = ["default/nic0-rail0-plane0", "default/nic0-rail0-plane1"]

@pytest.fixture
def test_run(self, k8s_system: KubernetesSystem) -> TestRun:
cmd_args = NCCLCmdArgs.model_validate({"subtest_name": "all_reduce_perf", "docker_image_url": "img"})
nccl = NCCLTestDefinition(name="n", description="d", test_template_name="NcclTest", cmd_args=cmd_args)
tr = TestRun(name="t", test=nccl, nodes=["n1", "n2"], num_nodes=2, output_path=k8s_system.output_path / "t")
tr.output_path.mkdir(parents=True, exist_ok=True)
return tr

def gen_json(self, k8s_system: KubernetesSystem, test_run: TestRun, cni_nets: list[str] | None) -> dict:
with patch.object(KubernetesSystem, "resolve_cni_networks", return_value=cni_nets):
return NcclTestKubernetesJsonGenStrategy(k8s_system, test_run).gen_json()

def test_cni_mode_worker_has_annotations(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None:
payload = self.gen_json(k8s_system, test_run, self.CNI_NETS)
worker = payload["spec"]["mpiReplicaSpecs"]["Worker"]
annotations = worker["template"]["metadata"]["annotations"]
assert annotations["k8s.v1.cni.cncf.io/networks"] == ",".join(self.CNI_NETS)

def test_cni_mode_worker_has_nic_resources(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None:
payload = self.gen_json(k8s_system, test_run, self.CNI_NETS)
resources = payload["spec"]["mpiReplicaSpecs"]["Worker"]["template"]["spec"]["containers"][0]["resources"]
for net in self.CNI_NETS:
nic = net.split("/", 1)[1]
assert resources["requests"][f"nvidia.com/{nic}"] == "1"
assert resources["limits"][f"nvidia.com/{nic}"] == "1"

def test_cni_mode_no_host_network(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None:
payload = self.gen_json(k8s_system, test_run, self.CNI_NETS)
worker_spec = payload["spec"]["mpiReplicaSpecs"]["Worker"]["template"]["spec"]
launcher_spec = payload["spec"]["mpiReplicaSpecs"]["Launcher"]["template"]["spec"]
assert "hostNetwork" not in worker_spec
assert "hostNetwork" not in launcher_spec

def test_host_network_mode_no_annotations(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None:
payload = self.gen_json(k8s_system, test_run, None) # resolve_cni_networks returns None → host network
worker = payload["spec"]["mpiReplicaSpecs"]["Worker"]
assert "metadata" not in worker["template"]
assert worker["template"]["spec"]["hostNetwork"] is True

def test_launcher_never_gets_nic_resources(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None:
payload = self.gen_json(k8s_system, test_run, self.CNI_NETS)
launcher_resources = payload["spec"]["mpiReplicaSpecs"]["Launcher"]["template"]["spec"]["containers"][0][
"resources"
]
assert not any(k.startswith("nvidia.com/nic") for k in launcher_resources["requests"])
Loading