Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ spec:
values: ["on-demand"]
- key: karpenter.k8s.aws/instance-category
operator: In
values: ["c", "m", "r"]
values: ["c", "m", "r", "t"]
- key: karpenter.k8s.aws/instance-generation
operator: Gt
values: ["2"]
Expand All @@ -27,7 +27,7 @@ spec:
name: default
expireAfter: 720h # 30 * 24h = 720h
limits:
cpu: 1000
cpu: {{ KARPENTER_NODEPOOL_CPU_LIMIT }}
disruption:
consolidationPolicy: WhenEmptyOrUnderutilized
consolidateAfter: 1m
Expand Down
34 changes: 14 additions & 20 deletions perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec):
"""Sets additional spec attributes."""
bm_spec.always_call_cleanup = True
assert bm_spec.container_cluster
_EnsureEksKarpenterGpuNodepool(bm_spec.container_cluster)
cluster = bm_spec.container_cluster
assert isinstance(cluster, kubernetes_cluster.KubernetesCluster)
_EnsureEksKarpenterGpuNodepool(cluster)


def _GetRolloutCreationTime(rollout_name: str) -> int:
Expand Down Expand Up @@ -378,28 +380,20 @@ def GetStatusConditionsForResourceType(
lastTransitionTime.
"""

jsonpath = (
r'{range .items[*]}'
# e.g. '"pod-name-1234": [<condition1>, ...],\n'
r'{"\""}{.metadata.name}{"\": "}{.status.conditions}{",\n"}'
r'{end}'
)
# Use full JSON output to avoid invalid JSON when manually building from
# jsonpath with many resources or on connection reset (truncated output).
stdout, _, _ = kubectl.RunKubectlCommand(
[
'get',
resource_type,
'-o',
'jsonpath=' + jsonpath,
],
timeout=60 * 2, # 2 minutes; should be a pretty fast call.
# Output can be quite large, so we'll conditionally suppress it.
['get', resource_type, '-o', 'json'],
timeout=60 * 5, # 5 minutes for large clusters (e.g. 1000 pods)
suppress_logging=NUM_PODS.value > 20,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice this is clever

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

)

# Convert output to valid json and parse it
stdout = stdout.rstrip('\t\n\r ,')
stdout = '{' + stdout + '}'
name_to_conditions = json.loads(stdout)
data = json.loads(stdout)
name_to_conditions = {}
for item in data.get('items', []):
name = item.get('metadata', {}).get('name')
conditions = item.get('status', {}).get('conditions')
if name is not None and conditions is not None:
name_to_conditions[name] = conditions

for key in resources_to_ignore:
name_to_conditions.pop(key, None)
Expand Down
96 changes: 70 additions & 26 deletions perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
from collections import abc
import json
import logging
import math
import re
import time
from typing import Any
from urllib import parse

Expand Down Expand Up @@ -704,9 +706,7 @@ def _Create(self):
}],
},
'iamIdentityMappings': [{
'arn': (
f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}'
),
'arn': f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}',
'username': 'system:node:{{EC2PrivateDNSName}}',
'groups': ['system:bootstrappers', 'system:nodes'],
}],
Expand Down Expand Up @@ -995,6 +995,14 @@ def _PostIngressNetworkingFixups(
def _PostCreate(self):
"""Performs post-creation steps for the cluster."""
super()._PostCreate()
# Karpenter controller resources: default 1/1Gi; scale up when node_scale target is set.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just not specify anything & let Karpenter decide? Or is this indeed necessary? It seems clever but a little annoying / bad user experience by Karpenter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the resources for the Karpenter controller pod (the node where Karpenter itself runs). Karpenter doesn’t manage that node, so it can’t “decide” these values, we have to set them. For runs with ~10 nodes, 1/1Gi is sufficient; we only increase when node_scale is 500+ or 1000+.

num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', None)
if num_nodes is not None and num_nodes > 1000:
controller_cpu, controller_memory = 4, '16Gi'
elif num_nodes is not None and num_nodes >= 500:
controller_cpu, controller_memory = 2, '8Gi'
else:
controller_cpu, controller_memory = 1, '1Gi'
vm_util.IssueCommand([
'helm',
'upgrade',
Expand All @@ -1013,13 +1021,13 @@ def _PostCreate(self):
'--set',
f'settings.interruptionQueue={self.name}',
'--set',
'controller.resources.requests.cpu=1',
f'controller.resources.requests.cpu={controller_cpu}',
'--set',
'controller.resources.requests.memory=1Gi',
f'controller.resources.requests.memory={controller_memory}',
'--set',
'controller.resources.limits.cpu=1',
f'controller.resources.limits.cpu={controller_cpu}',
'--set',
'controller.resources.limits.memory=1Gi',
f'controller.resources.limits.memory={controller_memory}',
'--set',
'logLevel=debug',
'--wait',
Expand Down Expand Up @@ -1057,10 +1065,14 @@ def _PostCreate(self):
'v'
+ full_version.strip().strip('"').split(f'{self.cluster_version}-v')[1]
)
# NodePool CPU limit: scale with benchmark target (nodes * 2 + 5%), min 1000.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the machine type matter here as well? If I am using a larger machine type, do I need to also set a larger cpu limit? This again seems a little annoying to have to set manually (but maybe makes sense given Karpenter can be machine-type agnostic).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to include machine type adjustment; I'll think about how to cover it.
Thanks.

num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', 5)
cpu_limit = max(1000, math.ceil(num_nodes * 2 * 1.05))
kubernetes_commands.ApplyManifest(
'container/karpenter/nodepool.yaml.j2',
CLUSTER_NAME=self.name,
ALIAS_VERSION=alias_version,
KARPENTER_NODEPOOL_CPU_LIMIT=cpu_limit,
)

def _Delete(self):
Expand Down Expand Up @@ -1149,21 +1161,21 @@ def _CleanupKarpenter(self):
"""Cleanup Karpenter managed nodes before cluster deletion."""
logging.info('Cleaning up Karpenter nodes...')
# Delete NodePool resources - this will trigger node termination
kubectl.RunKubectlCommand(
kubectl.RunRetryableKubectlCommand(
[
'delete',
'nodepool,ec2nodeclass',
'--all',
'--timeout=120s',
],
timeout=300,
suppress_failure=lambda stdout, stderr, retcode: (
'no resources found' in stderr.lower()
or 'not found' in stderr.lower()
or 'timed out waiting for the condition' in stderr.lower()
),
)
# Wait for all Karpenter nodes to be deleted
kubectl.RunKubectlCommand(
kubectl.RunRetryableKubectlCommand(
[
'wait',
'--for=delete',
Expand All @@ -1172,9 +1184,10 @@ def _CleanupKarpenter(self):
'karpenter.sh/nodepool',
'--timeout=120s',
],
timeout=300,
suppress_failure=lambda stdout, stderr, retcode: (
'no matching resources found' in stderr.lower()
or 'timed out' in stderr.lower()
or 'no resources found' in stderr.lower()
),
)
# Force terminate remaining EC2 instances
Expand Down Expand Up @@ -1246,21 +1259,52 @@ def _CleanupKarpenter(self):
if eni_ids:
logging.info('Deleting %d orphaned network interfaces', len(eni_ids))
for eni_id in eni_ids:
vm_util.IssueCommand(
[
'aws',
'ec2',
'delete-network-interface',
'--region',
self.region,
'--network-interface-id',
eni_id,
],
suppress_failure=lambda stdout, stderr, retcode: (
'not found' in stderr.lower()
or 'does not exist' in stderr.lower()
),
)
max_retries = 5
backoff_seconds = 10
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While this backoff logic looks pretty reasonable, prefer reusing backoff logic in vm_util.Retry. Which means moving this code to a subfunction & adding said decorator.

for attempt in range(max_retries):
stdout, stderr, retcode = vm_util.IssueCommand(
[
'aws',
'ec2',
'delete-network-interface',
'--region',
self.region,
'--network-interface-id',
eni_id,
],
# Observed in logs: InvalidNetworkInterfaceID.NotFound (ENI gone),
# RequestLimitExceeded (throttle). Suppress so we can retry or treat as success.
suppress_failure=lambda _stdout, stderr, _retcode: (
'invalidnetworkinterfaceid.notfound' in (stderr or '').lower()
or 'requestlimitexceeded' in (stderr or '').lower()
),
)
if retcode == 0:
break
stderr_lower = (stderr or '').lower()
# ENI already deleted (e.g. by another process or previous attempt).
if 'invalidnetworkinterfaceid.notfound' in stderr_lower:
break
# Throttle: retry with backoff.
if 'requestlimitexceeded' in stderr_lower:
if attempt < max_retries - 1:
logging.warning(
'AWS rate limit on DeleteNetworkInterface for %s, retry'
' in %ds',
eni_id,
backoff_seconds,
)
time.sleep(backoff_seconds)
backoff_seconds = min(backoff_seconds * 2, 60)
else:
raise errors.VmUtil.IssueCommandError(
f'DeleteNetworkInterface failed after {max_retries} retries: '
f'{stderr}'
)
else:
raise errors.VmUtil.IssueCommandError(
f'DeleteNetworkInterface failed: {stderr}'
)

def _IsReady(self):
"""Returns True if cluster is running. Autopilot defaults to 0 nodes."""
Expand Down
6 changes: 5 additions & 1 deletion perfkitbenchmarker/resources/container_service/kubectl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
'error sending request:',
'(abnormal closure): unexpected EOF',
'deadline exceeded',
# kubectl wait/delete timeouts and connection errors (retried in EKS cleanup)
'timed out',
'unable to connect to the server',
]


Expand All @@ -38,8 +41,9 @@ def _DetectTimeoutViaSuppressFailure(stdout, stderr, retcode):
# Check for kubectl timeout. If found, treat it the same as a regular
# timeout.
if retcode != 0:
stderr_lower = stderr.lower()
for error_substring in RETRYABLE_KUBECTL_ERRORS:
if error_substring in stderr:
if error_substring.lower() in stderr_lower:
# Raise timeout error regardless of raise_on_failure - as the intended
# semantics is to ignore expected errors caused by invoking the
# command not errors from PKB infrastructure.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def _DeleteAllFromDefaultNamespace():
run_cmd = ['delete', 'job', '--all', '-n', 'default']
kubectl.RunRetryableKubectlCommand(run_cmd)

timeout = 60 * 20
timeout = 60 * 60 # 1 hour for kubectl delete all -n default (teardown)
run_cmd = [
'delete',
'all',
Expand Down
35 changes: 22 additions & 13 deletions perfkitbenchmarker/traces/kubernetes_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ def _StopWatchingForNodeChanges(self):
"""Stop watching the cluster for node add/remove events."""
polled_events = self._cluster.GetEvents()

# Resolve machine type only for current nodes; use "unknown" for the rest.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this makes sense. Was this causing the cluster to take a long time querying everything?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it was the main reason.

_node_names = kubernetes_commands.GetNodeNames(suppress_logging=True)
_current_node_names = set(_node_names)
if _node_names:
_GetMachineTypeFromNodeName(self._cluster, _node_names[0])

for e in polled_events:
if e.resource.kind != "Node":
continue
Expand All @@ -156,10 +162,11 @@ def _StopWatchingForNodeChanges(self):
if name in self._nodes:
continue

machine_type = _GetMachineTypeFromNodeName(self._cluster, name)
logging.info(
"DEBUG: RegisteredNode: %s, %s", name, machine_type
)
if name in _current_node_names:
machine_type = _GetMachineTypeFromNodeName(self._cluster, name)
else:
machine_type = "unknown"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something around here is probably what is causing the TypeError.

logging.info("DEBUG: RegisteredNode: %s, %s", name, machine_type)
self._nodes[name] = _NodeTracker(
name=name,
machine_type=machine_type,
Expand All @@ -173,7 +180,9 @@ def _StopWatchingForNodeChanges(self):
"Detected a kubernetes event indicating that a node (%s) is"
" to be removed, but we have no record of this node. We'll"
" ignore this node - it won't be counted in the"
" %s metric.", name, VM_TIME_METRIC
" %s metric.",
name,
VM_TIME_METRIC,
)
continue

Expand Down Expand Up @@ -242,11 +251,11 @@ def _StartTrackingVMUsage(stage: str, benchmark_spec: bm_spec.BenchmarkSpec):
if stage != stages.RUN:
return

k8s_cluster: kubernetes_cluster.KubernetesCluster = (
benchmark_spec.container_cluster
)
if k8s_cluster is None:
if not isinstance(
benchmark_spec.container_cluster, kubernetes_cluster.KubernetesCluster
):
return
k8s_cluster = benchmark_spec.container_cluster

global tracker
tracker = KubernetesResourceTracker(k8s_cluster)
Expand All @@ -264,11 +273,11 @@ def _StopTrackingVMUsage(stage: str, benchmark_spec: bm_spec.BenchmarkSpec):
if stage != stages.RUN:
return

k8s_cluster: kubernetes_cluster.KubernetesCluster = (
benchmark_spec.container_cluster
)
if k8s_cluster is None:
if not isinstance(
benchmark_spec.container_cluster, kubernetes_cluster.KubernetesCluster
):
return
k8s_cluster = benchmark_spec.container_cluster

if tracker is not None:
tracker.StopTracking()
Expand Down
Loading