diff --git a/perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2 b/perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2 index b95c778951..320d77eeca 100644 --- a/perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2 +++ b/perfkitbenchmarker/data/container/karpenter/nodepool.yaml.j2 @@ -15,19 +15,28 @@ spec: - key: karpenter.sh/capacity-type operator: In values: ["on-demand"] +{% if KARPENTER_INSTANCE_TYPES %} + - key: karpenter.k8s.aws/instance-type + operator: In + values: +{% for instance_type in KARPENTER_INSTANCE_TYPES %} + - "{{ instance_type }}" +{% endfor %} +{% else %} - key: karpenter.k8s.aws/instance-category operator: In - values: ["c", "m", "r"] + values: ["c", "m", "r", "t"] - key: karpenter.k8s.aws/instance-generation operator: Gt values: ["2"] +{% endif %} nodeClassRef: group: karpenter.k8s.aws kind: EC2NodeClass name: default expireAfter: 720h # 30 * 24h = 720h limits: - cpu: 1000 + cpu: {{ KARPENTER_NODEPOOL_CPU_LIMIT }} disruption: consolidationPolicy: WhenEmptyOrUnderutilized consolidateAfter: 1m diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py index d5ce1440c9..4b04815b4c 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py @@ -129,7 +129,9 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec): """Sets additional spec attributes.""" bm_spec.always_call_cleanup = True assert bm_spec.container_cluster - _EnsureEksKarpenterGpuNodepool(bm_spec.container_cluster) + cluster = bm_spec.container_cluster + assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) + _EnsureEksKarpenterGpuNodepool(cluster) def _GetRolloutCreationTime(rollout_name: str) -> int: @@ -378,28 +380,20 @@ def GetStatusConditionsForResourceType( lastTransitionTime. """ - jsonpath = ( - r'{range .items[*]}' - # e.g. '"pod-name-1234": [, ...],\n' - r'{"\""}{.metadata.name}{"\": "}{.status.conditions}{",\n"}' - r'{end}' - ) + # Use full JSON output to avoid invalid JSON when manually building from + # jsonpath with many resources or on connection reset (truncated output). stdout, _, _ = kubectl.RunKubectlCommand( - [ - 'get', - resource_type, - '-o', - 'jsonpath=' + jsonpath, - ], - timeout=60 * 2, # 2 minutes; should be a pretty fast call. - # Output can be quite large, so we'll conditionally suppress it. + ['get', resource_type, '-o', 'json'], + timeout=60 * 5, # 5 minutes for large clusters (e.g. 
1000 pods) suppress_logging=NUM_PODS.value > 20, ) - - # Convert output to valid json and parse it - stdout = stdout.rstrip('\t\n\r ,') - stdout = '{' + stdout + '}' - name_to_conditions = json.loads(stdout) + data = json.loads(stdout) + name_to_conditions = {} + for item in data.get('items', []): + name = item.get('metadata', {}).get('name') + conditions = item.get('status', {}).get('conditions') + if name is not None and conditions is not None: + name_to_conditions[name] = conditions for key in resources_to_ignore: name_to_conditions.pop(key, None) diff --git a/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py b/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py index dbe3a64418..49e7c9cb8c 100644 --- a/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py +++ b/perfkitbenchmarker/providers/aws/elastic_kubernetes_service.py @@ -23,6 +23,7 @@ from collections import abc import json import logging +import math import re from typing import Any from urllib import parse @@ -704,9 +705,7 @@ def _Create(self): }], }, 'iamIdentityMappings': [{ - 'arn': ( - f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}' - ), + 'arn': f'arn:aws:iam::{self.account}:role/KarpenterNodeRole-{self.name}', 'username': 'system:node:{{EC2PrivateDNSName}}', 'groups': ['system:bootstrappers', 'system:nodes'], }], @@ -992,9 +991,29 @@ def _PostIngressNetworkingFixups( '[PKB][EKS] Allowed ALB SG %s -> node SGs on port %s', alb_sg, port ) + @staticmethod + def _DefaultNodepoolInstanceTypes() -> list[str]: + """EC2 types for default NodePool manifest (--eks_karpenter_nodepool_instance_types). + + Empty list means the Jinja template keeps instance-category/generation rules. + """ + return [ + t.strip() + for t in FLAGS.eks_karpenter_nodepool_instance_types + if t.strip() + ] + def _PostCreate(self): """Performs post-creation steps for the cluster.""" super()._PostCreate() + # Karpenter controller resources: default 1/1Gi; scale up when node_scale target is set. + num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', None) + if num_nodes is not None and num_nodes > 1000: + controller_cpu, controller_memory = 4, '16Gi' + elif num_nodes is not None and num_nodes >= 500: + controller_cpu, controller_memory = 2, '8Gi' + else: + controller_cpu, controller_memory = 1, '1Gi' vm_util.IssueCommand([ 'helm', 'upgrade', @@ -1013,13 +1032,13 @@ def _PostCreate(self): '--set', f'settings.interruptionQueue={self.name}', '--set', - 'controller.resources.requests.cpu=1', + f'controller.resources.requests.cpu={controller_cpu}', '--set', - 'controller.resources.requests.memory=1Gi', + f'controller.resources.requests.memory={controller_memory}', '--set', - 'controller.resources.limits.cpu=1', + f'controller.resources.limits.cpu={controller_cpu}', '--set', - 'controller.resources.limits.memory=1Gi', + f'controller.resources.limits.memory={controller_memory}', '--set', 'logLevel=debug', '--wait', @@ -1057,10 +1076,16 @@ def _PostCreate(self): 'v' + full_version.strip().strip('"').split(f'{self.cluster_version}-v')[1] ) + # NodePool CPU limit: scale with benchmark target (nodes * vCPU + 5%), min 1000. 
+ num_nodes = getattr(FLAGS, 'kubernetes_scale_num_nodes', 5) + vcpu_per_node = FLAGS.eks_karpenter_limits_vcpu_per_node + cpu_limit = max(1000, math.ceil(num_nodes * vcpu_per_node * 1.05)) kubernetes_commands.ApplyManifest( 'container/karpenter/nodepool.yaml.j2', CLUSTER_NAME=self.name, ALIAS_VERSION=alias_version, + KARPENTER_NODEPOOL_CPU_LIMIT=cpu_limit, + KARPENTER_INSTANCE_TYPES=self._DefaultNodepoolInstanceTypes(), ) def _Delete(self): @@ -1149,21 +1174,21 @@ def _CleanupKarpenter(self): """Cleanup Karpenter managed nodes before cluster deletion.""" logging.info('Cleaning up Karpenter nodes...') # Delete NodePool resources - this will trigger node termination - kubectl.RunKubectlCommand( + kubectl.RunRetryableKubectlCommand( [ 'delete', 'nodepool,ec2nodeclass', '--all', '--timeout=120s', ], + timeout=300, suppress_failure=lambda stdout, stderr, retcode: ( 'no resources found' in stderr.lower() or 'not found' in stderr.lower() - or 'timed out waiting for the condition' in stderr.lower() ), ) # Wait for all Karpenter nodes to be deleted - kubectl.RunKubectlCommand( + kubectl.RunRetryableKubectlCommand( [ 'wait', '--for=delete', @@ -1172,9 +1197,10 @@ def _CleanupKarpenter(self): 'karpenter.sh/nodepool', '--timeout=120s', ], + timeout=300, suppress_failure=lambda stdout, stderr, retcode: ( 'no matching resources found' in stderr.lower() - or 'timed out' in stderr.lower() + or 'no resources found' in stderr.lower() ), ) # Force terminate remaining EC2 instances @@ -1246,21 +1272,39 @@ def _CleanupKarpenter(self): if eni_ids: logging.info('Deleting %d orphaned network interfaces', len(eni_ids)) for eni_id in eni_ids: - vm_util.IssueCommand( - [ - 'aws', - 'ec2', - 'delete-network-interface', - '--region', - self.region, - '--network-interface-id', - eni_id, - ], - suppress_failure=lambda stdout, stderr, retcode: ( - 'not found' in stderr.lower() - or 'does not exist' in stderr.lower() - ), - ) + # Bind eni_id by default to avoid loop closure issues if this is refactored. + def _delete_one_eni(eni_id=eni_id) -> None: + _, stderr, retcode = vm_util.IssueCommand( + [ + 'aws', + 'ec2', + 'delete-network-interface', + '--region', + self.region, + '--network-interface-id', + eni_id, + ], + raise_on_failure=False, + ) + if retcode == 0: + return + stderr_lower = (stderr or '').lower() + # ENI already deleted (e.g. by another process or previous attempt). + if 'invalidnetworkinterfaceid.notfound' in stderr_lower: + return + # RequestLimitExceeded (throttle): retry via vm_util.Retry. + if 'requestlimitexceeded' in stderr_lower: + raise errors.Resource.RetryableDeletionError(stderr or '') + raise errors.VmUtil.IssueCommandError( + f'DeleteNetworkInterface failed: {stderr}' + ) + + # max_retries=5 yields 6 CLI attempts (tries > 5 on 6th failure). + vm_util.Retry( + poll_interval=10, + max_retries=5, + retryable_exceptions=(errors.Resource.RetryableDeletionError,), + )(_delete_one_eni)() def _IsReady(self): """Returns True if cluster is running. 
Autopilot defaults to 0 nodes.""" diff --git a/perfkitbenchmarker/providers/aws/flags.py b/perfkitbenchmarker/providers/aws/flags.py index f4bc9a1cb3..e631f738d8 100644 --- a/perfkitbenchmarker/providers/aws/flags.py +++ b/perfkitbenchmarker/providers/aws/flags.py @@ -233,6 +233,19 @@ 'Whether to install AWS Load Balancer Controller in EKS Karpenter clusters' 'Default value - do not install unless explicitly requested', ) +flags.DEFINE_integer( + 'eks_karpenter_limits_vcpu_per_node', + 2, + 'Assumed vCPUs per provisioned node when computing Karpenter NodePool ' + 'limits.cpu on EKS (uses kubernetes_scale_num_nodes, this value, and 5% ' + 'headroom; minimum limit 1000). Raise for larger EC2 instance shapes.', +) +flags.DEFINE_list( + 'eks_karpenter_nodepool_instance_types', + [], + 'Comma-separated EC2 types for the Karpenter default NodePool (worker ' + 'nodes only). Empty keeps instance-category/generation in the template.', +) AWS_CAPACITY_BLOCK_RESERVATION_ID = flags.DEFINE_string( 'aws_capacity_block_reservation_id', None, diff --git a/perfkitbenchmarker/resources/container_service/kubectl.py b/perfkitbenchmarker/resources/container_service/kubectl.py index c5e55d5d22..09d5b344e4 100644 --- a/perfkitbenchmarker/resources/container_service/kubectl.py +++ b/perfkitbenchmarker/resources/container_service/kubectl.py @@ -18,6 +18,9 @@ 'error sending request:', '(abnormal closure): unexpected EOF', 'deadline exceeded', + # kubectl wait/delete timeouts and connection errors (retried in EKS cleanup) + 'timed out', + 'unable to connect to the server', ] @@ -38,8 +41,9 @@ def _DetectTimeoutViaSuppressFailure(stdout, stderr, retcode): # Check for kubectl timeout. If found, treat it the same as a regular # timeout. if retcode != 0: + stderr_lower = stderr.lower() for error_substring in RETRYABLE_KUBECTL_ERRORS: - if error_substring in stderr: + if error_substring.lower() in stderr_lower: # Raise timeout error regardless of raise_on_failure - as the intended # semantics is to ignore expected errors caused by invoking the # command not errors from PKB infrastructure. diff --git a/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py b/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py index 73f957bcd1..fa1a228c41 100644 --- a/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py +++ b/perfkitbenchmarker/resources/container_service/kubernetes_cluster.py @@ -294,7 +294,7 @@ def _DeleteAllFromDefaultNamespace(): run_cmd = ['delete', 'job', '--all', '-n', 'default'] kubectl.RunRetryableKubectlCommand(run_cmd) - timeout = 60 * 20 + timeout = 60 * 60 # 1 hour for kubectl delete all -n default (teardown) run_cmd = [ 'delete', 'all', diff --git a/perfkitbenchmarker/traces/kubernetes_tracker.py b/perfkitbenchmarker/traces/kubernetes_tracker.py index 66aacc3b11..5472347b42 100644 --- a/perfkitbenchmarker/traces/kubernetes_tracker.py +++ b/perfkitbenchmarker/traces/kubernetes_tracker.py @@ -141,6 +141,12 @@ def _StopWatchingForNodeChanges(self): """Stop watching the cluster for node add/remove events.""" polled_events = self._cluster.GetEvents() + # Resolve machine type only for current nodes; use "unknown" for the rest. 
+ _node_names = kubernetes_commands.GetNodeNames(suppress_logging=True) + _current_node_names = set(_node_names) + if _node_names: + _GetMachineTypeFromNodeName(self._cluster, _node_names[0]) + for e in polled_events: if e.resource.kind != "Node": continue @@ -156,10 +162,11 @@ def _StopWatchingForNodeChanges(self): if name in self._nodes: continue - machine_type = _GetMachineTypeFromNodeName(self._cluster, name) - logging.info( - "DEBUG: RegisteredNode: %s, %s", name, machine_type - ) + if name in _current_node_names: + machine_type = _GetMachineTypeFromNodeName(self._cluster, name) + else: + machine_type = "unknown" + logging.info("DEBUG: RegisteredNode: %s, %s", name, machine_type) self._nodes[name] = _NodeTracker( name=name, machine_type=machine_type, @@ -173,7 +180,9 @@ def _StopWatchingForNodeChanges(self): "Detected a kubernetes event indicating that a node (%s) is" " to be removed, but we have no record of this node. We'll" " ignore this node - it won't be counted in the" - " %s metric.", name, VM_TIME_METRIC + " %s metric.", + name, + VM_TIME_METRIC, ) continue @@ -242,11 +251,11 @@ def _StartTrackingVMUsage(stage: str, benchmark_spec: bm_spec.BenchmarkSpec): if stage != stages.RUN: return - k8s_cluster: kubernetes_cluster.KubernetesCluster = ( - benchmark_spec.container_cluster - ) - if k8s_cluster is None: + if not isinstance( + benchmark_spec.container_cluster, kubernetes_cluster.KubernetesCluster + ): return + k8s_cluster = benchmark_spec.container_cluster global tracker tracker = KubernetesResourceTracker(k8s_cluster) @@ -264,11 +273,11 @@ def _StopTrackingVMUsage(stage: str, benchmark_spec: bm_spec.BenchmarkSpec): if stage != stages.RUN: return - k8s_cluster: kubernetes_cluster.KubernetesCluster = ( - benchmark_spec.container_cluster - ) - if k8s_cluster is None: + if not isinstance( + benchmark_spec.container_cluster, kubernetes_cluster.KubernetesCluster + ): return + k8s_cluster = benchmark_spec.container_cluster if tracker is not None: tracker.StopTracking() diff --git a/tests/linux_benchmarks/kubernetes_scale_benchmark_test.py b/tests/linux_benchmarks/kubernetes_scale_benchmark_test.py index f470850cbc..b77b9f62c4 100644 --- a/tests/linux_benchmarks/kubernetes_scale_benchmark_test.py +++ b/tests/linux_benchmarks/kubernetes_scale_benchmark_test.py @@ -1,5 +1,6 @@ """Tests for kubernetes_scale_benchmark, especially parsing events.""" +import json import unittest from absl.testing import flagsaver @@ -38,47 +39,59 @@ def testTimestampConvert(self): ) def testPodStatusConditions(self): + stdout = json.dumps({ + 'items': [ + { + 'metadata': {'name': 'pod123'}, + 'status': { + 'conditions': [ + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:19Z', + 'status': 'True', + 'type': 'PodReadyToStartContainers', + }, + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T18:51:17Z', + 'status': 'True', + 'type': 'Initialized', + }, + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:19Z', + 'status': 'True', + 'type': 'Ready', + }, + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:19Z', + 'status': 'True', + 'type': 'ContainersReady', + }, + ] + }, + }, + { + 'metadata': {'name': 'pod456'}, + 'status': { + 'conditions': [ + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T18:51:17Z', + 'status': 'True', + 'type': 'PodScheduled', + }, + ] + }, + }, + ] + }) self.enter_context( mock.patch.object( kubectl, 'RunKubectlCommand', - return_value=( - """ - "pod123": [ - { - "lastProbeTime":null, - 
"lastTransitionTime":"1970-01-01T00:01:19Z", - "status":"True", - "type":"PodReadyToStartContainers" - }, { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T18:51:17Z", - "status":"True", - "type":"Initialized" - }, { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:19Z", - "status":"True", - "type":"Ready" - }, { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:19Z", - "status":"True", - "type":"ContainersReady" - } - ], - "pod456": [ - { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T18:51:17Z", - "status":"True", - "type":"PodScheduled" - } - ], - """, - '', - 0, - ), + return_value=(stdout, '', 0), ) ) conditions = kubernetes_scale_benchmark.GetStatusConditionsForResourceType( @@ -88,37 +101,47 @@ def testPodStatusConditions(self): self.assertLen(conditions, 5) def testPodStatusConditionsWithIgnoredResources(self): + stdout = json.dumps({ + 'items': [ + { + 'metadata': {'name': 'pod123'}, + 'status': { + 'conditions': [ + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:19Z', + 'status': 'True', + 'type': 'PodReadyToStartContainers', + }, + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T18:51:17Z', + 'status': 'True', + 'type': 'Initialized', + }, + ] + }, + }, + { + 'metadata': {'name': 'pod456'}, + 'status': { + 'conditions': [ + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T18:51:17Z', + 'status': 'True', + 'type': 'PodScheduled', + }, + ] + }, + }, + ] + }) self.enter_context( mock.patch.object( kubectl, 'RunKubectlCommand', - return_value=( - """ - "pod123": [ - { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:19Z", - "status":"True", - "type":"PodReadyToStartContainers" - }, { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T18:51:17Z", - "status":"True", - "type":"Initialized" - } - ], - "pod456": [ - { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T18:51:17Z", - "status":"True", - "type":"PodScheduled" - } - ], - """, - '', - 0, - ), + return_value=(stdout, '', 0), ) ) conditions = kubernetes_scale_benchmark.GetStatusConditionsForResourceType( @@ -128,24 +151,28 @@ def testPodStatusConditionsWithIgnoredResources(self): self.assertLen(conditions, 2) def testOneStatForOnePod(self): + stdout = json.dumps({ + 'items': [ + { + 'metadata': {'name': 'pod1'}, + 'status': { + 'conditions': [ + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:00Z', + 'status': 'True', + 'type': 'Ready', + }, + ] + }, + }, + ] + }) self.enter_context( mock.patch.object( kubectl, 'RunKubectlCommand', - side_effect=[ - ( - """ - "pod1": [{ - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:00Z", - "status":"True", - "type":"Ready" - }], - """, - '', - 0, - ), - ], + side_effect=[(stdout, '', 0)], ) ) samples = kubernetes_scale_benchmark.ParseStatusChanges('pod', 50) @@ -159,36 +186,48 @@ def testOneStatForOnePod(self): self.assertEqual(samples_by_metric['pod_Ready_count'].value, 1) def testOneStatForMultiplePods(self): + stdout = json.dumps({ + 'items': [ + { + 'metadata': {'name': 'pod1'}, + 'status': { + 'conditions': [{ + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:00Z', + 'status': 'True', + 'type': 'Ready', + }] + }, + }, + { + 'metadata': {'name': 'pod2'}, + 'status': { + 'conditions': [{ + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:00:40Z', + 'status': 'True', + 'type': 'Ready', + }] + }, + }, + { + 'metadata': {'name': 'pod3'}, + 'status': { + 'conditions': [{ + 'lastProbeTime': 
None, + 'lastTransitionTime': '1970-01-01T00:01:20Z', + 'status': 'True', + 'type': 'Ready', + }] + }, + }, + ] + }) self.enter_context( mock.patch.object( kubectl, 'RunKubectlCommand', - side_effect=[ - ( - """ - "pod1": [{ - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:00Z", - "status":"True", - "type":"Ready" - }], - "pod2": [{ - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:00:40Z", - "status":"True", - "type":"Ready" - }], - "pod3": [{ - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:20Z", - "status":"True", - "type":"Ready" - }], - """, - '', - 0, - ), - ], + side_effect=[(stdout, '', 0)], ) ) samples = kubernetes_scale_benchmark.ParseStatusChanges('pod', 40) @@ -205,31 +244,34 @@ def testOneStatForMultiplePods(self): self.assertEqual(samples_by_metric['pod_Ready_count'].value, 3) def testMultipleStatForOnePod(self): + stdout = json.dumps({ + 'items': [ + { + 'metadata': {'name': 'pod1'}, + 'status': { + 'conditions': [ + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:00Z', + 'status': 'True', + 'type': 'Ready', + }, + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:00Z', + 'status': 'True', + 'type': 'ContainersReady', + }, + ] + }, + }, + ] + }) self.enter_context( mock.patch.object( kubectl, 'RunKubectlCommand', - side_effect=[ - ( - """ - "pod1": [ - { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:00Z", - "status":"True", - "type":"Ready" - }, { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:00Z", - "status":"True", - "type":"ContainersReady" - } - ], - """, - '', - 0, - ), - ], + side_effect=[(stdout, '', 0)], ) ) samples = kubernetes_scale_benchmark.ParseStatusChanges('pod', 40) @@ -241,24 +283,28 @@ def testMultipleStatForOnePod(self): self.assertIn('pod_ContainersReady_p50', samples_by_metric.keys()) def testOneStatForOneNode(self): + stdout = json.dumps({ + 'items': [ + { + 'metadata': {'name': 'node1'}, + 'status': { + 'conditions': [ + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:00Z', + 'status': 'True', + 'type': 'Ready', + }, + ] + }, + }, + ] + }) self.enter_context( mock.patch.object( kubectl, 'RunKubectlCommand', - side_effect=[ - ( - """ - "node1": [{ - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:00Z", - "status":"True", - "type":"Ready" - }], - """, - '', - 0, - ), - ], + side_effect=[(stdout, '', 0)], ) ) samples = kubernetes_scale_benchmark.ParseStatusChanges('node', 50) @@ -274,31 +320,34 @@ def testOneStatForOneNode(self): @flagsaver.flagsaver(kubernetes_scale_report_latency_percentiles=False) @flagsaver.flagsaver(kubernetes_scale_report_individual_latencies=True) def testReportLatenciesMultipleStatsOnePod(self): + stdout = json.dumps({ + 'items': [ + { + 'metadata': {'name': 'pod1'}, + 'status': { + 'conditions': [ + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:00Z', + 'status': 'True', + 'type': 'Ready', + }, + { + 'lastProbeTime': None, + 'lastTransitionTime': '1970-01-01T00:01:00Z', + 'status': 'True', + 'type': 'ContainersReady', + }, + ] + }, + }, + ] + }) self.enter_context( mock.patch.object( kubectl, 'RunKubectlCommand', - side_effect=[ - ( - """ - "pod1": [ - { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:00Z", - "status":"True", - "type":"Ready" - }, { - "lastProbeTime":null, - "lastTransitionTime":"1970-01-01T00:01:00Z", - "status":"True", - "type":"ContainersReady" - } - ], - """, - '', - 0, - ), - ], + side_effect=[(stdout, '', 0)], ) ) 
samples = kubernetes_scale_benchmark.ParseStatusChanges('pod', 40) diff --git a/tests/traces/kubernetes_tracker_test.py b/tests/traces/kubernetes_tracker_test.py index 287130efc0..acf0988775 100644 --- a/tests/traces/kubernetes_tracker_test.py +++ b/tests/traces/kubernetes_tracker_test.py @@ -45,7 +45,7 @@ def testTrackUsageWithFixedNumberOfNodes(self): cluster = CreateMockCluster( name="pkb-cluster", machine_type="e2-standard-8" ) - kubernetes_commands.GetNodeNames = lambda: [ + kubernetes_commands.GetNodeNames = lambda suppress_logging=False: [ "gke-pkb-cluster-default-pool-node-1", "gke-pkb-cluster-default-pool-node-2", "gke-pkb-cluster-default-pool-node-3", @@ -79,9 +79,15 @@ def testTrackUsageWithVariableNumberOfNodes(self): cluster = CreateMockCluster( name="pkb-cluster", machine_type="e2-standard-8" ) - kubernetes_commands.GetNodeNames = lambda: [ - "gke-pkb-cluster-default-pool-node-1" - ] + kubernetes_commands.GetNodeNames = mock.Mock( + side_effect=[ + ["gke-pkb-cluster-default-pool-node-1"], + [ + "gke-pkb-cluster-default-pool-node-1", + "gke-pkb-cluster-default-pool-node-2", + ], + ] + ) # pylint: disable=invalid-name cluster.GetEvents = lambda: [ kubernetes_events.KubernetesEvent( @@ -131,7 +137,7 @@ def testTrackUsageWithMultipleNodePools(self): "node-pool-2": "n2-highcpu-96", }, ) - kubernetes_commands.GetNodeNames = lambda: [ + kubernetes_commands.GetNodeNames = lambda suppress_logging=False: [ "gke-pkb-cluster-node-pool-1-node-1", "gke-pkb-cluster-node-pool-2-node-2", ] @@ -166,7 +172,7 @@ def testTrackUsageWithEventAddingNodeBeforeBenchmarkRun(self): cluster = CreateMockCluster( name="pkb-cluster", machine_type="e2-standard-8" ) - kubernetes_commands.GetNodeNames = lambda: [ + kubernetes_commands.GetNodeNames = lambda suppress_logging=False: [ "gke-pkb-cluster-default-pool-node-1" ] cluster.GetEvents = lambda: [ @@ -242,7 +248,7 @@ def CreateMockCluster( ) ) cluster.GetEvents = lambda: [] - kubernetes_commands.GetNodeNames = lambda: [] + kubernetes_commands.GetNodeNames = lambda suppress_logging=False: [] return cluster
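
Note (outside the patch): a minimal sketch of the NodePool `limits.cpu` computation that the EKS `_PostCreate` change introduces. The helper name `compute_nodepool_cpu_limit` is illustrative only; the real change inlines this logic using the `kubernetes_scale_num_nodes` and `eks_karpenter_limits_vcpu_per_node` flags, scaling the limit with the node target times assumed vCPUs per node plus 5% headroom, and never dropping below the previous hard-coded 1000.

import math

def compute_nodepool_cpu_limit(num_nodes: int, vcpu_per_node: int = 2) -> int:
  """Mirrors the patch: nodes * vCPUs/node * 1.05 headroom, floored at 1000."""
  return max(1000, math.ceil(num_nodes * vcpu_per_node * 1.05))

# With the default of 2 vCPUs/node, small runs keep the old limit of 1000,
# while a 1000-node scale target yields ceil(1000 * 2 * 1.05) = 2100.
assert compute_nodepool_cpu_limit(5) == 1000
assert compute_nodepool_cpu_limit(1000) == 2100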