From 4c8644ecb8a4f3a1679174aaae201c3a1ac5c0db Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Thu, 5 Mar 2026 11:54:44 +0200 Subject: [PATCH 01/26] Enable autoscaling for additional node pools --- .../providers/gcp/google_kubernetes_engine.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index 90b05878aa..d44bdf25fa 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -387,6 +387,13 @@ def _CreateNodePools(self): nodepool, cmd, ) + if ( + self.min_nodes != self.default_nodepool.num_nodes + or self.max_nodes != self.default_nodepool.num_nodes + ): + cmd.args.append('--enable-autoscaling') + cmd.flags['max-nodes'] = self.max_nodes + cmd.flags['min-nodes'] = self.min_nodes self._IssueResourceCreationCommand(cmd) def _AddNodeParamsToCmd( From a447fc778dd6a39e4b2197dbe037773452e00302 Mon Sep 17 00:00:00 2001 From: Arushi Gaur Date: Thu, 5 Mar 2026 10:44:33 -0800 Subject: [PATCH 02/26] Fix incorrect numjob value in iops under SLA benchmark PiperOrigin-RevId: 879143695 --- .../linux_benchmarks/fio/fio_latency_sla_benchmark.py | 1 + 1 file changed, 1 insertion(+) diff --git a/perfkitbenchmarker/linux_benchmarks/fio/fio_latency_sla_benchmark.py b/perfkitbenchmarker/linux_benchmarks/fio/fio_latency_sla_benchmark.py index 6761979dc1..47f27c0e55 100644 --- a/perfkitbenchmarker/linux_benchmarks/fio/fio_latency_sla_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/fio/fio_latency_sla_benchmark.py @@ -141,6 +141,7 @@ def Run(spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: while numjobs > 0: left_iodepth = 1 right_iodepth = max_iodepth + benchmark_params['numjobs'] = numjobs iodepth_details[numjobs] = {} while left_iodepth <= right_iodepth: iodepth = (left_iodepth + right_iodepth) // 2 From 99fdd28bce1d0963d59ddee387ac3a3811251943 Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Thu, 5 Mar 2026 12:05:46 -0800 Subject: [PATCH 03/26] Make Memtier load command retry on connection failures. PiperOrigin-RevId: 879183201 --- perfkitbenchmarker/linux_packages/memtier.py | 2 +- tests/linux_packages/memtier_test.py | 18 ++++++++++-------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/perfkitbenchmarker/linux_packages/memtier.py b/perfkitbenchmarker/linux_packages/memtier.py index e1a4bc0cfc..1a606e2f6a 100644 --- a/perfkitbenchmarker/linux_packages/memtier.py +++ b/perfkitbenchmarker/linux_packages/memtier.py @@ -485,7 +485,7 @@ def _LoadSingleVM( tls=MEMTIER_TLS.value, expiry_range=MEMTIER_EXPIRY_RANGE.value, ) - load_vm.RemoteCommand(cmd) + _IssueRetryableCommand(load_vm, cmd) def Load( diff --git a/tests/linux_packages/memtier_test.py b/tests/linux_packages/memtier_test.py index 564a747b58..210b96fd35 100644 --- a/tests/linux_packages/memtier_test.py +++ b/tests/linux_packages/memtier_test.py @@ -1781,20 +1781,22 @@ def testLoad(self): vm1 = mock.Mock() vm2 = mock.Mock() test_vms = [vm1, vm2] + vm1.RobustRemoteCommand.return_value = ('', '') + vm2.RobustRemoteCommand.return_value = ('', '') memtier.Load(test_vms, 'test_ip', 9999) - vm1.RemoteCommand.assert_called_once_with( - matchers.HAS('--key-minimum 1 --key-maximum 500') + vm1.RobustRemoteCommand.assert_called_once_with( + matchers.HAS('--key-minimum 1 --key-maximum 500'), timeout=mock.ANY ) - vm2.RemoteCommand.assert_called_once_with( - matchers.HAS('--key-minimum 500 --key-maximum 1000') + vm2.RobustRemoteCommand.assert_called_once_with( + matchers.HAS('--key-minimum 500 --key-maximum 1000'), timeout=mock.ANY ) - vm1.RemoteCommand.assert_called_once_with( - matchers.HAS('--data-size-list 1024:1,32:1') + vm1.RobustRemoteCommand.assert_called_once_with( + matchers.HAS('--data-size-list 1024:1,32:1'), timeout=mock.ANY ) - vm2.RemoteCommand.assert_called_once_with( - matchers.HAS('--data-size-list 1024:1,32:1') + vm2.RobustRemoteCommand.assert_called_once_with( + matchers.HAS('--data-size-list 1024:1,32:1'), timeout=mock.ANY ) @parameterized.named_parameters( From 488ae697df1a6d6773c674c4c9409566e498b5bc Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Thu, 5 Mar 2026 13:27:56 -0800 Subject: [PATCH 04/26] Disable retries for Memtier commands when using multiple client VMs. PiperOrigin-RevId: 879219851 --- perfkitbenchmarker/linux_packages/memtier.py | 9 ++++++++- tests/linux_packages/memtier_test.py | 3 +++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/perfkitbenchmarker/linux_packages/memtier.py b/perfkitbenchmarker/linux_packages/memtier.py index 1a606e2f6a..5cfa460abd 100644 --- a/perfkitbenchmarker/linux_packages/memtier.py +++ b/perfkitbenchmarker/linux_packages/memtier.py @@ -563,6 +563,7 @@ def DistributeClientsToPorts(port_index): clients=clients, password=password, unique_id=str(port_index), + retry_on_failure=len(client_vms) <= 1, ) results = background_tasks.RunThreaded( @@ -646,6 +647,8 @@ def _RunParallelConnections( for conn in connections: connections_by_vm[conn.client_vm].append(conn) + base_args['retry_on_failure'] = len(connections_by_vm) <= 1 + # Currently more than one client VM will cause shards to be distributed # evenly between them. This behavior could be customized later with a flag. if len(connections_by_vm) > 1: @@ -1151,6 +1154,7 @@ def _Run( password: str | None = None, unique_id: str | None = None, shard_addresses: str | None = None, + retry_on_failure: bool = True, ) -> 'MemtierResult': """Runs the memtier benchmark on the vm.""" logging.info( @@ -1223,7 +1227,10 @@ def _Run( logging.info('Memtier command: %s', cmd) # Add a buffer to the timeout to account for command overhead. timeout = test_time + 100 if test_time else None - _IssueRetryableCommand(vm, cmd, timeout=timeout) + if retry_on_failure: + _IssueRetryableCommand(vm, cmd, timeout=timeout) + else: + vm.RobustRemoteCommand(cmd, timeout=timeout) output_path = os.path.join(vm_util.GetTempDir(), memtier_results_file_name) vm_util.IssueCommand(['rm', '-f', output_path]) diff --git a/tests/linux_packages/memtier_test.py b/tests/linux_packages/memtier_test.py index 210b96fd35..7b1ec932dd 100644 --- a/tests/linux_packages/memtier_test.py +++ b/tests/linux_packages/memtier_test.py @@ -1573,6 +1573,7 @@ def testRunParallelSingleVm(self): 'pipeline': 3, 'password': None, 'unique_id': vm1.ip_address, + 'retry_on_failure': True, }, ), ], @@ -1618,6 +1619,7 @@ def testRunParallelMultipleVms(self): '10.0.1.117:6379,10.0.2.104:6379,10.0.3.217:6379' ), 'unique_id': 'vm1', + 'retry_on_failure': False, }, ), ( @@ -1634,6 +1636,7 @@ def testRunParallelMultipleVms(self): '10.0.2.177:6379,10.0.1.174:6379,10.0.3.6:6379' ), 'unique_id': 'vm2', + 'retry_on_failure': False, }, ), ], From e4ce438617dfd9101cda0481d3e0751f8acc9e6d Mon Sep 17 00:00:00 2001 From: p3rf Team Date: Thu, 5 Mar 2026 13:57:54 -0800 Subject: [PATCH 05/26] Add a flag to retry resource creation on insufficient capacity errors. PiperOrigin-RevId: 879232831 --- CHANGES.next.md | 2 ++ perfkitbenchmarker/resource.py | 17 ++++++++++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGES.next.md b/CHANGES.next.md index a6d1fbccef..f48cfe47de 100644 --- a/CHANGES.next.md +++ b/CHANGES.next.md @@ -448,6 +448,8 @@ - Add support for enabling live migration on AMD SEV - Increased maintenance simulation notification timeout to 4 hours in maintenance_simulation_trigger.py. +- Added `--retry_on_insufficient_capacity_cloud_failure` so that resource + creation can be retried on stock outs. ### Bug fixes and maintenance updates: diff --git a/perfkitbenchmarker/resource.py b/perfkitbenchmarker/resource.py index 5ad02af79a..cff757029c 100644 --- a/perfkitbenchmarker/resource.py +++ b/perfkitbenchmarker/resource.py @@ -31,6 +31,13 @@ FLAGS = flags.FLAGS +_RETRY_ON_INSUFFICIENT_CAPACITY_CLOUD_FAILURE = flags.DEFINE_boolean( + 'retry_on_insufficient_capacity_cloud_failure', + False, + 'Whether to retry resource creation on insufficient capacity cloud' + ' failure.', +) + _RESOURCE_REGISTRY = {} RegisteredType = TypeVar('RegisteredType') ResourceType = type[RegisteredType] @@ -286,7 +293,15 @@ def _CreateResource(self): # that the resource was not actually being created on the # backend during previous failed attempts. self.create_start_time = time.time() - self._Create() + try: + self._Create() + except errors.Benchmarks.InsufficientCapacityCloudFailure as e: + if _RETRY_ON_INSUFFICIENT_CAPACITY_CLOUD_FAILURE.value: + raise errors.Resource.RetryableCreationError( + 'Creation of %s failed.' % type(self).__name__ + ) from e + + raise try: if not self._Exists(): raise errors.Resource.RetryableCreationError( From 0161a491073de590922ab8c35c0d97532818bb35 Mon Sep 17 00:00:00 2001 From: Andy Zhu Date: Thu, 5 Mar 2026 15:13:33 -0800 Subject: [PATCH 06/26] Add parsing for more lmbench metrics, including socket bandwidth. PiperOrigin-RevId: 879267755 --- .../linux_benchmarks/lmbench_benchmark.py | 47 +++++++++++++++++-- .../lmbench_benchmark_test.py | 11 ++++- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/lmbench_benchmark.py b/perfkitbenchmarker/linux_benchmarks/lmbench_benchmark.py index 75e8d0784b..9a4c59e791 100644 --- a/perfkitbenchmarker/linux_benchmarks/lmbench_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/lmbench_benchmark.py @@ -20,6 +20,7 @@ import itertools import logging +import re from absl import flags from perfkitbenchmarker import configs from perfkitbenchmarker import regex_util @@ -158,6 +159,32 @@ def _ParseContextSwitching(lines, title, metadata, results): ) +def _ParseSocketBandwidth(lines, unused_title, metadata, results): + """Parse the socket bandwidth test results. + + Args: + lines: The lines following socket bandwidth title. + metadata: A dictionary of metadata. + results: A list of samples to be published. + """ + for line in lines: + parts = line.split() + if len(parts) == 3 and parts[2] == 'MB/sec': + msg_size_mb = float(parts[0]) + bandwidth = float(parts[1]) + unit = parts[2] + current_metadata = metadata.copy() + current_metadata['message_size_MB'] = msg_size_mb + results.append( + sample.Sample( + 'socket_bandwidth', + bandwidth, + unit, + current_metadata, + ) + ) + + def _UpdataMetadata(lmbench_output, metadata): metadata['MB'] = regex_util.ExtractGroup('MB: ([0-9]*)', lmbench_output) metadata['BENCHMARK_HARDWARE'] = regex_util.ExtractGroup( @@ -239,10 +266,13 @@ def _AddProcessorMetricSamples( """ for metric in processor_metric_list: - regex = '%s: (.*)' % metric - value_unit = regex_util.ExtractGroup(regex, lmbench_output) - [value, unit] = value_unit.split(' ') - if unit == 'microseconds': + regex = rf'{metric}: ([^ ]+) (.+)' + match = re.search(regex, lmbench_output) + if not match: + logging.warning('Failed to extract metric: %s', metric) + continue + value, unit = match.groups() + if unit in ['microseconds', 'MB/sec']: results.append( sample.Sample( '%s' % metric.replace('\\', ''), float(value), unit, metadata @@ -278,9 +308,14 @@ def _ParseOutput(lmbench_output): 'Signal handler overhead', 'Protection fault', 'Pipe latency', + 'Pipe bandwidth', + 'AF_UNIX sock stream bandwidth', r'Process fork\+exit', r'Process fork\+execve', r'Process fork\+/bin/sh -c', + 'Pagefaults on /var/tmp/XXX', + 'TCP latency using localhost', + 'TCP/IP connection cost to localhost', ) _AddProcessorMetricSamples( lmbench_output, processor_metric_list, metadata, results @@ -288,6 +323,10 @@ def _ParseOutput(lmbench_output): # Parse some sections from the output. parse_section_func_dict = {} + if 'Socket bandwidth using localhost' in lmbench_output: + parse_section_func_dict['Socket bandwidth using localhost'] = ( + _ParseSocketBandwidth + ) contex_switching_titles = regex_util.ExtractAllMatches( '"size=.* ovr=.*', lmbench_output ) diff --git a/tests/linux_benchmarks/lmbench_benchmark_test.py b/tests/linux_benchmarks/lmbench_benchmark_test.py index d67cd5ba8f..1ad2b9d6e3 100644 --- a/tests/linux_benchmarks/lmbench_benchmark_test.py +++ b/tests/linux_benchmarks/lmbench_benchmark_test.py @@ -35,7 +35,7 @@ def setUp(self): def testParseLmbench(self): samples = lmbench_benchmark._ParseOutput(self.contents) - self.assertEqual(61, len(samples)) + self.assertEqual(74, len(samples)) # Test metadata metadata = samples[0].metadata @@ -63,6 +63,15 @@ def testParseLmbench(self): self.assertAlmostEqual( 800.2188, processor_results['Process fork+/bin/sh -c'] ) + self.assertAlmostEqual( + 11056.28, processor_results['AF_UNIX sock stream bandwidth'] + ) + self.assertAlmostEqual(2648.95, processor_results['Pipe bandwidth']) + + socket_bandwidth = [s for s in samples if s.metric == 'socket_bandwidth'] + self.assertEqual(8, len(socket_bandwidth)) + self.assertAlmostEqual(1.71, socket_bandwidth[0].value) + self.assertEqual(0.000001, socket_bandwidth[0].metadata['message_size_MB']) sample = next( x From 667eecd887616dab48af0a383b19e6e7aadb51d4 Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Fri, 6 Mar 2026 07:54:41 -0800 Subject: [PATCH 07/26] Add wait stats query before HammerDB run. PiperOrigin-RevId: 879636524 --- .../data/capture_db_scoped_configurations.sql | 32 ++++++++++++ .../linux_benchmarks/hammerdbcli_benchmark.py | 5 +- perfkitbenchmarker/relational_db.py | 49 +++++++++++++++++++ .../hammerdbcli_benchmark.py | 5 +- 4 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 perfkitbenchmarker/data/capture_db_scoped_configurations.sql diff --git a/perfkitbenchmarker/data/capture_db_scoped_configurations.sql b/perfkitbenchmarker/data/capture_db_scoped_configurations.sql new file mode 100644 index 0000000000..fd260b3c5f --- /dev/null +++ b/perfkitbenchmarker/data/capture_db_scoped_configurations.sql @@ -0,0 +1,32 @@ +SELECT * FROM sys.databases; + +SELECT * FROM sys.master_files; + +DECLARE @dbname SYSNAME; +DECLARE @cmd NVARCHAR(300); + +DECLARE db_cursor CURSOR LOCAL FAST_FORWARD FOR +SELECT name +FROM sys.databases d +WHERE + d.state = 0 + OPEN db_cursor; + +FETCH NEXT FROM db_cursor INTO @dbname; + +WHILE @@FETCH_STATUS += 0 + BEGIN +SET + @cmd = 'USE ' + + QUOTENAME(@dbname) + + '; SELECT DB_NAME() as databasename,* FROM sys.database_scoped_configurations' + EXEC sp_executesql @cmd; + +FETCH NEXT FROM db_cursor INTO @dbname; + +END; + +CLOSE db_cursor; + +DEALLOCATE db_cursor; diff --git a/perfkitbenchmarker/linux_benchmarks/hammerdbcli_benchmark.py b/perfkitbenchmarker/linux_benchmarks/hammerdbcli_benchmark.py index a8333813f8..4fb09cd140 100644 --- a/perfkitbenchmarker/linux_benchmarks/hammerdbcli_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/hammerdbcli_benchmark.py @@ -361,13 +361,12 @@ def _CheckAlloyDbColumnarEngine( def _PreRun(db: relational_db.BaseRelationalDb) -> None: """Prepares the database for the benchmark run.""" db.ClearWaitStats() - db.QueryIOStats() + db.LogDatabaseDebugInfo() def _PostRun(db: relational_db.BaseRelationalDb) -> None: """Records the database metrics after the benchmark run.""" - db.QueryWaitStats() - db.QueryIOStats() + db.LogDatabaseDebugInfo() def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> list[sample.Sample]: diff --git a/perfkitbenchmarker/relational_db.py b/perfkitbenchmarker/relational_db.py index 309b9f781d..8e4c922cd9 100644 --- a/perfkitbenchmarker/relational_db.py +++ b/perfkitbenchmarker/relational_db.py @@ -252,6 +252,10 @@ 'sys.dm_io_virtual_file_stats(NULL,NULL)' ) +CAPTURE_TRACE_STATUS_SQL = 'DBCC TRACESTATUS(-1)' +CAPTURE_SERVER_EVENT_SESSIONS_SQL = 'SELECT * FROM sys.server_event_sessions' +SELECT_VERSION_SQL = 'SELECT @@VERSION' + class RelationalDbPropertyNotSetError(Exception): pass @@ -429,6 +433,51 @@ def QueryWaitStats(self) -> tuple[str, str]: return self.client_vm_query_tools.IssueSqlCommand(f.read()) return ('', '') + def QueryServerVersion(self) -> tuple[str, str]: + if self.engine_type != sql_engine_utils.SQLSERVER: + return ('', '') + logging.info('Querying server version') + return self.client_vm_query_tools.IssueSqlCommand(SELECT_VERSION_SQL) + + def QueryDatabaseScopedConfigurations(self) -> tuple[str, str]: + if self.engine_type != sql_engine_utils.SQLSERVER: + return ('', '') + logging.info('Querying database scoped configurations') + with open( + data.ResourcePath('capture_db_scoped_configurations.sql'), 'r' + ) as f: + return self.client_vm_query_tools.IssueSqlCommand(f.read()) + + def QueryTraceStatus(self) -> tuple[str, str]: + if self.engine_type != sql_engine_utils.SQLSERVER: + return ('', '') + logging.info('Querying trace status') + return self.client_vm_query_tools.IssueSqlCommand(CAPTURE_TRACE_STATUS_SQL) + + def QueryServerEventSessions(self) -> tuple[str, str]: + if self.engine_type != sql_engine_utils.SQLSERVER: + return ('', '') + logging.info('Querying server event sessions') + return self.client_vm_query_tools.IssueSqlCommand( + CAPTURE_SERVER_EVENT_SESSIONS_SQL + ) + + def LogDatabaseDebugInfo(self) -> None: + """Logs database debug information.""" + queries = [ + (self.QueryWaitStats, 'Wait Stats'), + (self.QueryIOStats, 'IO Stats'), + (self.QueryServerVersion, 'DB Version'), + (self.QueryDatabaseScopedConfigurations, 'DB Configuration'), + (self.QueryTraceStatus, 'Trace Status'), + (self.QueryServerEventSessions, 'Server Event Sessions'), + ] + + for query_method, name in queries: + stdout, _ = query_method() + if stdout: + logging.info('%s:\n%s', name, stdout) + @property def port(self): """Port (int) on which the database server is listening.""" diff --git a/perfkitbenchmarker/windows_benchmarks/hammerdbcli_benchmark.py b/perfkitbenchmarker/windows_benchmarks/hammerdbcli_benchmark.py index 8fb0edc3c6..ea51d09a5d 100644 --- a/perfkitbenchmarker/windows_benchmarks/hammerdbcli_benchmark.py +++ b/perfkitbenchmarker/windows_benchmarks/hammerdbcli_benchmark.py @@ -332,13 +332,12 @@ def SetMinimumRecover(db): def _PreRun(db: relational_db.BaseRelationalDb): """Prepares the database for the benchmark run.""" db.ClearWaitStats() - db.QueryIOStats() + db.LogDatabaseDebugInfo() def _PostRun(db: relational_db.BaseRelationalDb): """Records the database metrics after the benchmark run.""" - db.QueryWaitStats() - db.QueryIOStats() + db.LogDatabaseDebugInfo() def Run(benchmark_spec): From 52c0a29c239d30fa32ecc5ebd1df3c1bfc68cf72 Mon Sep 17 00:00:00 2001 From: Brandon Liu Date: Fri, 6 Mar 2026 09:02:21 -0800 Subject: [PATCH 08/26] Add background collection of SQL Server performance counters during HammerDB runs. PiperOrigin-RevId: 879663501 --- perfkitbenchmarker/relational_db.py | 23 ++++++++++ .../hammerdbcli_benchmark.py | 43 +++++++++++------- .../windows_packages/hammerdb.py | 19 +++++++- .../hammerdbcli_benchmark_test.py | 45 +++++++++++++++++++ 4 files changed, 112 insertions(+), 18 deletions(-) create mode 100644 tests/windows_benchmarks/hammerdbcli_benchmark_test.py diff --git a/perfkitbenchmarker/relational_db.py b/perfkitbenchmarker/relational_db.py index 8e4c922cd9..58ccd772d3 100644 --- a/perfkitbenchmarker/relational_db.py +++ b/perfkitbenchmarker/relational_db.py @@ -254,6 +254,16 @@ CAPTURE_TRACE_STATUS_SQL = 'DBCC TRACESTATUS(-1)' CAPTURE_SERVER_EVENT_SESSIONS_SQL = 'SELECT * FROM sys.server_event_sessions' +CAPTURE_PERFORMANCE_COUNTERS_SQL = ( + 'SELECT SYSDATETIME() AS CURRENTTIME,' + '[cntr_value] AS countervalue,counter_name ' + 'FROM sys.dm_os_performance_counters ' + "WHERE [object_name] LIKE '%Manager%' " + "AND [counter_name] IN ('Page life expectancy'," + "'Buffer cache hit ratio','Buffer cache hit ratio base'," + "'Lazy writes/sec','Memory Grants Pending','Free list stalls/sec'," + "'Target Server Memory (KB)','Total Server Memory (KB)')" +) SELECT_VERSION_SQL = 'SELECT @@VERSION' @@ -478,6 +488,19 @@ def LogDatabaseDebugInfo(self) -> None: if stdout: logging.info('%s:\n%s', name, stdout) + def QueryPerformanceCounters(self) -> tuple[str, str]: + """Queries and logs performance counters. + + Returns: + A tuple of stdout and stderr from the command execution. + """ + if self.engine_type != sql_engine_utils.SQLSERVER: + return ('', '') + logging.info('Querying performance counters') + return self.client_vm_query_tools.IssueSqlCommand( + CAPTURE_PERFORMANCE_COUNTERS_SQL + ) + @property def port(self): """Port (int) on which the database server is listening.""" diff --git a/perfkitbenchmarker/windows_benchmarks/hammerdbcli_benchmark.py b/perfkitbenchmarker/windows_benchmarks/hammerdbcli_benchmark.py index ea51d09a5d..48915dab19 100644 --- a/perfkitbenchmarker/windows_benchmarks/hammerdbcli_benchmark.py +++ b/perfkitbenchmarker/windows_benchmarks/hammerdbcli_benchmark.py @@ -17,8 +17,10 @@ This benchmark uses Windows as the OS for both the database server and the HammerDB client(s). """ + import datetime import logging +import threading import time from absl import flags from perfkitbenchmarker import configs @@ -244,9 +246,7 @@ def Prepare(benchmark_spec): ALTER DATABASE tpcc SET SINGLE_USER WITH ROLLBACK IMMEDIATE; DROP DATABASE tpcc; END;""" - db.client_vm_query_tools.IssueSqlCommand( - drop_sql, timeout=60 * 20 - ) + db.client_vm_query_tools.IssueSqlCommand(drop_sql, timeout=60 * 20) is_azure = FLAGS.cloud == 'Azure' and FLAGS.use_managed_db if ( @@ -256,17 +256,13 @@ def Prepare(benchmark_spec): db_name = linux_hammerdb.MAP_SCRIPT_TO_DATABASE_NAME[ linux_hammerdb.HAMMERDB_SCRIPT.value ] - db.client_vm_query_tools.IssueSqlCommand( - """CREATE DATABASE [{0}]; + db.client_vm_query_tools.IssueSqlCommand("""CREATE DATABASE [{0}]; BACKUP DATABASE [{0}] TO DISK = 'F:\\Backup\\{0}.bak'; ALTER AVAILABILITY GROUP [{1}] ADD DATABASE [{0}]; - """.format(db_name, sql_engine_utils.SQLSERVER_AOAG_NAME) - ) + """.format(db_name, sql_engine_utils.SQLSERVER_AOAG_NAME)) elif is_azure and hammerdb.HAMMERDB_SCRIPT.value == 'tpc_c': # Create the database first only Azure requires creating the database. - db.client_vm_query_tools.IssueSqlCommand( - 'CREATE DATABASE tpcc;' - ) + db.client_vm_query_tools.IssueSqlCommand('CREATE DATABASE tpcc;') hammerdb.SetupConfig( vm, @@ -361,12 +357,27 @@ def Run(benchmark_spec): _PreRun(db) start_time = datetime.datetime.now() - samples = hammerdb.Run( - client_vms[0], - sql_engine_utils.SQLSERVER, - hammerdb.HAMMERDB_SCRIPT.value, - timeout=linux_hammerdb.HAMMERDB_RUN_TIMEOUT.value, - ) + + stop_event = threading.Event() + collection_thread = None + if db.engine_type == sql_engine_utils.SQLSERVER and db.is_managed_db: + collection_thread = threading.Thread( + target=hammerdb.CollectDbPerformanceCounters, args=(db, stop_event) + ) + collection_thread.start() + + try: + samples = hammerdb.Run( + client_vms[0], + sql_engine_utils.SQLSERVER, + hammerdb.HAMMERDB_SCRIPT.value, + timeout=linux_hammerdb.HAMMERDB_RUN_TIMEOUT.value, + ) + finally: + if collection_thread: + stop_event.set() + collection_thread.join() + end_time = datetime.datetime.now() _PostRun(db) samples.extend(db.CollectMetrics(start_time, end_time)) diff --git a/perfkitbenchmarker/windows_packages/hammerdb.py b/perfkitbenchmarker/windows_packages/hammerdb.py index 78d67c4899..90bc3b2fd6 100644 --- a/perfkitbenchmarker/windows_packages/hammerdb.py +++ b/perfkitbenchmarker/windows_packages/hammerdb.py @@ -16,11 +16,13 @@ import ntpath import posixpath +import threading from typing import Any, List from absl import flags from perfkitbenchmarker import data from perfkitbenchmarker import errors +from perfkitbenchmarker import relational_db from perfkitbenchmarker import sample from perfkitbenchmarker import sql_engine_utils from perfkitbenchmarker import vm_util @@ -49,6 +51,7 @@ # Default run timeout TIMEOUT = 60 * 60 * 20 +_COUNTER_QUERY_TIMEOUT = 60 class WindowsHammerDbTclScript(linux_hammerdb.HammerDbTclScript): @@ -170,8 +173,9 @@ def SetupConfig( if db_engine not in linux_hammerdb.SCRIPT_MAPPING: raise ValueError( - '{} is currently not supported for running ' - 'hammerdb benchmarks.'.format(db_engine) + '{} is currently not supported for running hammerdb benchmarks.'.format( + db_engine + ) ) if hammerdb_script not in linux_hammerdb.SCRIPT_MAPPING[db_engine]: @@ -279,3 +283,14 @@ def PushTestFile(vm, data_file: str, path: str): def GetMetadata(db_engine: str): """Returns the meta data needed for hammerdb.""" return linux_hammerdb.GetMetadata(db_engine) + + +def CollectDbPerformanceCounters( + db: relational_db.BaseRelationalDb, + stop_event: threading.Event, +): + """Background task to collect DB performance counters.""" + while not stop_event.is_set(): + db.QueryPerformanceCounters() + # Wait for timeout (default 1 minute) or until stop_event is set. + stop_event.wait(_COUNTER_QUERY_TIMEOUT) diff --git a/tests/windows_benchmarks/hammerdbcli_benchmark_test.py b/tests/windows_benchmarks/hammerdbcli_benchmark_test.py new file mode 100644 index 0000000000..068d4ca98b --- /dev/null +++ b/tests/windows_benchmarks/hammerdbcli_benchmark_test.py @@ -0,0 +1,45 @@ +import time +import unittest +from absl import flags +import mock +from perfkitbenchmarker import relational_db +from perfkitbenchmarker import sql_engine_utils +from perfkitbenchmarker import virtual_machine +from tests import pkb_common_test_case +from perfkitbenchmarker.windows_benchmarks import hammerdbcli_benchmark +from perfkitbenchmarker.windows_packages import hammerdb + +FLAGS = flags.FLAGS + + +class HammerdbcliBenchmarkTest(pkb_common_test_case.PkbCommonTestCase): + + def testRunCallPerformanceCounters(self): + benchmark_spec = mock.Mock() + db_mock = mock.Mock(spec=relational_db.BaseRelationalDb) + db_mock.engine_type = sql_engine_utils.SQLSERVER + db_mock.is_managed_db = True + db_mock.CollectMetrics.return_value = [] + benchmark_spec.relational_db = db_mock + vm_mock = mock.Mock(spec=virtual_machine.BaseVirtualMachine) + benchmark_spec.vm_groups = {'clients': [vm_mock]} + + # Mock hammerdb.Run to simulate a 5-second run. + def mock_hammerdb_run(*args, **kwargs): + del args, kwargs + time.sleep(5) + return [] + + mock_run = self.enter_context( + mock.patch.object(hammerdb, 'Run', side_effect=mock_hammerdb_run) + ) + self.enter_context(mock.patch.object(hammerdb, '_COUNTER_QUERY_TIMEOUT', 1)) + + hammerdbcli_benchmark.Run(benchmark_spec) + + self.assertGreaterEqual(db_mock.QueryPerformanceCounters.call_count, 2) + mock_run.assert_called_once() + + +if __name__ == '__main__': + unittest.main() From a4825b09e3884a03f30ca4769c56c5e7b033725d Mon Sep 17 00:00:00 2001 From: Zach Howell Date: Fri, 6 Mar 2026 09:48:06 -0800 Subject: [PATCH 09/26] Update nodejs package PiperOrigin-RevId: 879682069 --- perfkitbenchmarker/linux_packages/node_js.py | 56 -------------------- perfkitbenchmarker/linux_packages/nodejs.py | 27 ++++++++++ 2 files changed, 27 insertions(+), 56 deletions(-) delete mode 100644 perfkitbenchmarker/linux_packages/node_js.py create mode 100644 perfkitbenchmarker/linux_packages/nodejs.py diff --git a/perfkitbenchmarker/linux_packages/node_js.py b/perfkitbenchmarker/linux_packages/node_js.py deleted file mode 100644 index d402d6c956..0000000000 --- a/perfkitbenchmarker/linux_packages/node_js.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -"""Module containing node.js installation and cleanup functions.""" - -from perfkitbenchmarker import linux_packages - -GIT_REPO = 'https://github.com/joyent/node.git' -GIT_TAG = 'v0.11.14' -NODE_DIR = '%s/node' % linux_packages.INSTALL_DIR - - -def _Install(vm): - """Installs the node.js package on the VM.""" - vm.Install('build_tools') - vm.RemoteCommand('git clone {} {}'.format(GIT_REPO, NODE_DIR)) - vm.RemoteCommand('cd {} && git checkout {}'.format(NODE_DIR, GIT_TAG)) - vm.RemoteCommand('cd {} && ./configure --prefix=/usr'.format(NODE_DIR)) - vm.RemoteCommand('cd {} && make && sudo make install'.format(NODE_DIR)) - - -def YumInstall(vm): - """Installs the node.js package on the VM.""" - _Install(vm) - - -def AptInstall(vm): - """Installs the node.js package on the VM.""" - _Install(vm) - - -def _Uninstall(vm): - """Uninstalls the node.js package on the VM.""" - vm.RemoteCommand('cd {} && sudo make uninstall'.format(NODE_DIR)) - - -def YumUninstall(vm): - """Uninstalls the node.js package on the VM.""" - _Uninstall(vm) - - -def AptUninstall(vm): - """Uninstalls the node.js package on the VM.""" - _Uninstall(vm) diff --git a/perfkitbenchmarker/linux_packages/nodejs.py b/perfkitbenchmarker/linux_packages/nodejs.py new file mode 100644 index 0000000000..191ae3187d --- /dev/null +++ b/perfkitbenchmarker/linux_packages/nodejs.py @@ -0,0 +1,27 @@ +# Copyright 2026 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Module containing nodejs/npm installation and cleanup functions.""" + + +def YumInstall(vm): + """Installs the nodejs & npm packages on the VM.""" + vm.RemoteCommand('sudo yum install -y nodejs npm') + + +def AptInstall(vm): + """Installs the nodejs & npm packages on the VM.""" + vm.RemoteCommand('sudo apt-get install -y nodejs npm') + From 541547faacf5cf0137a0445c561475e2c81d0fe1 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Fri, 30 Jan 2026 18:01:56 +0200 Subject: [PATCH 10/26] Add template and scaling logic --- .../kubernetes_node_scale.yaml.j2 | 54 +++++++++++ .../kubernetes_node_scale_benchmark.py | 90 +++++++++++++------ 2 files changed, 116 insertions(+), 28 deletions(-) create mode 100644 perfkitbenchmarker/data/container/kubernetes_scale/kubernetes_node_scale.yaml.j2 diff --git a/perfkitbenchmarker/data/container/kubernetes_scale/kubernetes_node_scale.yaml.j2 b/perfkitbenchmarker/data/container/kubernetes_scale/kubernetes_node_scale.yaml.j2 new file mode 100644 index 0000000000..e410f73d85 --- /dev/null +++ b/perfkitbenchmarker/data/container/kubernetes_scale/kubernetes_node_scale.yaml.j2 @@ -0,0 +1,54 @@ +{% if cloud == 'GCP' %} +apiVersion: cloud.google.com/v1 +kind: ComputeClass +metadata: + name: app-ccc +spec: + nodePoolAutoCreation: + enabled: true + priorities: + - machineType: e2-medium # smallest machine-type supported by NAP + spot: false + storage: + bootDiskSize: 20 + bootDiskType: pd-standard + whenUnsatisfiable: DoNotScaleUp +{% endif %} +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: app + labels: + app: app +spec: + replicas: 0 + selector: + matchLabels: + app: app + template: + metadata: + labels: + app: app + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: app + operator: In + values: + - app + topologyKey: "kubernetes.io/hostname" + {% if cloud == 'GCP' %} + nodeSelector: + cloud.google.com/compute-class: app-ccc + {% endif %} + containers: + - name: pause + image: registry.k8s.io/pause:3.10 + resources: + requests: + cpu: 100m + memory: 100Mi diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 27471f2f50..4da0c01634 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -1,17 +1,24 @@ -"""Benchmark which scales up to a number of nodes, then down, then back up. +"""Benchmark for Kubernetes node auto-scaling: scale up, down, then up again. -Similar to kubernetes_scale, but only cares about node scaling & has additional -scaling up/down steps. -""" +Deploys a Deployment with pod anti-affinity to force one pod per node, then +measures node provisioning and de-provisioning times across three phases: + + 1. Scale up to NUM_NODES replicas. + 2. Scale down to 0 replicas and wait for nodes to be removed. + 3. Scale up to NUM_NODES replicas again. +Reuses ParseStatusChanges and CheckForFailures from kubernetes_scale_benchmark +for consistent metric collection. +""" from absl import flags from absl import logging from perfkitbenchmarker import benchmark_spec from perfkitbenchmarker import configs +from perfkitbenchmarker import container_service from perfkitbenchmarker import sample from perfkitbenchmarker.linux_benchmarks import kubernetes_scale_benchmark -from perfkitbenchmarker.resources.container_service import kubernetes_cluster +from perfkitbenchmarker.container_service import kubernetes_commands FLAGS = flags.FLAGS @@ -32,49 +39,76 @@ 'kubernetes_scale_num_nodes', 5, 'Number of new nodes to create' ) +MANIFEST_TEMPLATE = 'container/kubernetes_scale/kubernetes_node_scale.yaml.j2' + def CheckPrerequisites(_): - """Validate flags and config.""" - pass + """Validates flags and config.""" def GetConfig(user_config): """Loads and returns benchmark config.""" - config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) - return config + return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME) def Prepare(bm_spec: benchmark_spec.BenchmarkSpec): - """Sets additional spec attributes.""" + """Applies the app deployment manifest with 0 replicas.""" bm_spec.always_call_cleanup = True assert bm_spec.container_cluster + cluster = bm_spec.container_cluster + manifest_kwargs = dict( + cloud=FLAGS.cloud, + ) + + yaml_docs = kubernetes_commands.ConvertManifestToYamlDicts( + MANIFEST_TEMPLATE, + **manifest_kwargs, + ) + cluster.ModifyPodSpecPlacementYaml( + yaml_docs, + 'app', + cluster.default_nodepool.machine_type, + ) + list(kubernetes_commands.ApplyYaml(yaml_docs)) def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: - """Scales a large number of pods on kubernetes.""" + """Runs the scale-up, scale-down, scale-up benchmark sequence. + + Args: + bm_spec: The benchmark specification. + + Returns: + Combined samples from all three phases, each tagged with phase metadata. + """ assert bm_spec.container_cluster cluster = bm_spec.container_cluster - assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) - cluster: kubernetes_cluster.KubernetesCluster = cluster - - # Warm up the cluster by creating a single pod. This compensates for - # differences between Standard & Autopilot, where Standard already has 1 node - # due to its starting nodepool but Autopilot does not. - scale_one_samples, _ = kubernetes_scale_benchmark.ScaleUpPods(cluster, 1) - if not scale_one_samples: - logging.exception( - 'Failed to scale up to 1 pod; now investigating failure reasons.' - ) - unused = 0 - pod_samples = kubernetes_scale_benchmark.ParseStatusChanges('pod', unused) - # Log & check for quota failure. - kubernetes_scale_benchmark.CheckForFailures(cluster, pod_samples, 1) + assert isinstance(cluster, container_service.KubernetesCluster) + cluster: container_service.KubernetesCluster = cluster # Do one scale up, scale down, then scale up again. + _ScaleDeploymentReplicas(NUM_NODES.value) + _ScaleDeploymentReplicas(0) + _ScaleDeploymentReplicas(NUM_NODES.value) return [] +def _ScaleDeploymentReplicas(replicas: int) -> None: + container_service.RunKubectlCommand([ + 'scale', + f'--replicas={replicas}', + 'deployment/app', + ]) + kubernetes_commands.WaitForRollout( + 'deployment/app', + timeout=kubernetes_scale_benchmark._GetScaleTimeout(), + ) + + def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): """Cleanups scale benchmark. Runs before teardown.""" - # Might need to change if kubernetes-scaleup deployment not used. - kubernetes_scale_benchmark.Cleanup(bm_spec) + container_service.RunRetryableKubectlCommand( + ['delete', 'deployment', 'app'], + timeout=kubernetes_scale_benchmark._GetScaleTimeout(), + raise_on_failure=False, + ) From 0994b75cef4875dbe88bbdc5c2131b8a30e8f471 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Mon, 2 Feb 2026 20:02:08 +0200 Subject: [PATCH 11/26] Add scaling down logic and gathering metrics --- .../kubernetes_node_scale_benchmark.py | 41 ++++++++++++++++++- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 4da0c01634..36236fc5c4 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -11,6 +11,8 @@ for consistent metric collection. """ +import time + from absl import flags from absl import logging from perfkitbenchmarker import benchmark_spec @@ -86,11 +88,24 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: assert isinstance(cluster, container_service.KubernetesCluster) cluster: container_service.KubernetesCluster = cluster + initial_node_count = len(kubernetes_commands.GetNodeNames()) + start_time = time.time() + # Do one scale up, scale down, then scale up again. _ScaleDeploymentReplicas(NUM_NODES.value) + samples = kubernetes_scale_benchmark.ParseStatusChanges( + 'node', + start_time, + resources_to_ignore=set(), + ) _ScaleDeploymentReplicas(0) - _ScaleDeploymentReplicas(NUM_NODES.value) - return [] + if _WaitForScaledNodesDeletion(initial_node_count): + _ScaleDeploymentReplicas(NUM_NODES.value) + else: + logging.warning( + 'Skipping final scale up; scaled nodes not deleted within timeout.' + ) + return samples def _ScaleDeploymentReplicas(replicas: int) -> None: @@ -105,6 +120,28 @@ def _ScaleDeploymentReplicas(replicas: int) -> None: ) +def _WaitForScaledNodesDeletion(initial_node_count: int) -> bool: + timeout = 20 * 60 + kubernetes_scale_benchmark._GetScaleTimeout() + start_time = time.monotonic() + while True: + current_node_count = len(kubernetes_commands.GetNodeNames()) + if current_node_count <= initial_node_count: + logging.info('Node count returned to initial level.') + return True + elapsed = time.monotonic() - start_time + if elapsed >= timeout: + logging.warning( + 'Timed out waiting for scaled nodes to delete. Remaining nodes: %d', + max(current_node_count - initial_node_count, 0), + ) + return False + logging.info( + 'Remaining scaled nodes: %d', + max(current_node_count - initial_node_count, 0), + ) + time.sleep(60) + + def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): """Cleanups scale benchmark. Runs before teardown.""" container_service.RunRetryableKubectlCommand( From e6d58d8f94052096b8df6526a83642b38aae116f Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Tue, 10 Feb 2026 11:17:34 +0200 Subject: [PATCH 12/26] Add scaling down logic, phases and gathering metrics --- .../kubernetes_node_scale_benchmark.py | 308 ++++++++++++++++-- 1 file changed, 282 insertions(+), 26 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 36236fc5c4..8b775a0e8c 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -11,7 +11,9 @@ for consistent metric collection. """ +import json import time +from typing import Any from absl import flags from absl import logging @@ -88,64 +90,318 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: assert isinstance(cluster, container_service.KubernetesCluster) cluster: container_service.KubernetesCluster = cluster - initial_node_count = len(kubernetes_commands.GetNodeNames()) - start_time = time.time() + initial_nodes = set(kubernetes_commands.GetNodeNames()) + initial_node_count = len(initial_nodes) # Do one scale up, scale down, then scale up again. - _ScaleDeploymentReplicas(NUM_NODES.value) - samples = kubernetes_scale_benchmark.ParseStatusChanges( - 'node', - start_time, - resources_to_ignore=set(), + scaleup1_samples = _ScaleUpAndCollectSamples( + phase='scaleup1', + replicas=NUM_NODES.value, + cluster=cluster, + initial_nodes=initial_nodes, + initial_pods=set(kubernetes_commands.GetPodNames()), + pod_phase_timeout=3 * 60 * 60, + ) + + scaledown_samples, scaledown_complete = _ScaleDownAndCollectSamples( + phase='scaledown', + initial_nodes=initial_nodes, + initial_node_count=initial_node_count, + node_timeout=2 * 60 * 60, ) - _ScaleDeploymentReplicas(0) - if _WaitForScaledNodesDeletion(initial_node_count): - _ScaleDeploymentReplicas(NUM_NODES.value) + + scaleup2_samples: list[sample.Sample] = [] + if scaledown_complete: + scaleup2_samples = _ScaleUpAndCollectSamples( + phase='scaleup2', + replicas=NUM_NODES.value, + cluster=cluster, + initial_nodes=initial_nodes, + initial_pods=set(kubernetes_commands.GetPodNames()), + pod_phase_timeout=3 * 60 * 60, + ) else: logging.warning( 'Skipping final scale up; scaled nodes not deleted within timeout.' ) - return samples + return scaleup1_samples + scaledown_samples + scaleup2_samples -def _ScaleDeploymentReplicas(replicas: int) -> None: + +def _ScaleDeploymentReplicas(replicas: int, wait_for_rollout: bool = True) -> None: container_service.RunKubectlCommand([ 'scale', f'--replicas={replicas}', 'deployment/app', ]) - kubernetes_commands.WaitForRollout( - 'deployment/app', - timeout=kubernetes_scale_benchmark._GetScaleTimeout(), + if wait_for_rollout: + kubernetes_commands.WaitForRollout( + 'deployment/app', + timeout=GetScaleTimeout(replicas), + ) + + +def _ScaleUpAndCollectSamples( + phase: str, + replicas: int, + cluster: container_service.KubernetesCluster, + initial_nodes: set[str], + initial_pods: set[str], + pod_phase_timeout: int, +) -> list[sample.Sample]: + start_time = time.time() + _ScaleDeploymentReplicas(replicas, wait_for_rollout=False) + + phase_samples: list[sample.Sample] = [] + phase_samples += _LogPodPhaseCountsUntilReady( + phase=phase, + timeout=pod_phase_timeout, + desired_replicas=replicas, + ) + + ready_samples = kubernetes_scale_benchmark.ParseStatusChanges( + 'pod', + start_time, + resources_to_ignore=initial_pods, + ) + kubernetes_scale_benchmark.CheckForFailures(cluster, ready_samples, replicas) + _AddPhaseMetadata(ready_samples, phase) + phase_samples += ready_samples + node_samples = kubernetes_scale_benchmark.ParseStatusChanges( + 'node', + start_time, + resources_to_ignore=initial_nodes, + ) + _AddPhaseMetadata(node_samples, phase) + phase_samples += node_samples + return phase_samples + + +def _ScaleDownAndCollectSamples( + phase: str, + initial_nodes: set[str], + initial_node_count: int, + node_timeout: int, +) -> tuple[list[sample.Sample], bool]: + start_time = time.time() + _ScaleDeploymentReplicas(0, wait_for_rollout=False) + return _LogNodeDeletionUntilGone( + phase=phase, + initial_nodes=initial_nodes, + initial_node_count=initial_node_count, + start_time=start_time, + timeout=node_timeout, ) -def _WaitForScaledNodesDeletion(initial_node_count: int) -> bool: - timeout = 20 * 60 + kubernetes_scale_benchmark._GetScaleTimeout() - start_time = time.monotonic() +def _LogPodPhaseCountsUntilReady( + phase: str, + timeout: int, + desired_replicas: int, +) -> list[sample.Sample]: + samples: list[sample.Sample] = [] + start_monotonic = time.monotonic() while True: - current_node_count = len(kubernetes_commands.GetNodeNames()) - if current_node_count <= initial_node_count: - logging.info('Node count returned to initial level.') + phase_counts, ready_count = _GetPodPhaseCounts() + elapsed = time.monotonic() - start_monotonic + logging.info( + 'Pod phases (%s) after %ds: %s (Ready: %d/%d)', + phase, + int(elapsed), + phase_counts, + ready_count, + desired_replicas, + ) + for pod_phase, count in phase_counts.items(): + samples.append( + sample.Sample( + 'pod_phase_count', + count, + 'count', + metadata={ + 'phase': phase, + 'pod_phase': pod_phase, + 'elapsed_seconds': elapsed, + }, + ) + ) + samples.append( + sample.Sample( + 'pod_phase_count', + ready_count, + 'count', + metadata={ + 'phase': phase, + 'pod_phase': 'Ready', + 'elapsed_seconds': elapsed, + }, + ) + ) + + if ready_count >= desired_replicas: + return samples + if elapsed >= timeout: + logging.warning( + 'Timed out waiting for pods to be Ready (%s). Ready: %d/%d', + phase, + ready_count, + desired_replicas, + ) + return samples + time.sleep(60) + + +def _GetPodPhaseCounts() -> tuple[dict[str, int], int]: + stdout, _, _ = container_service.RunKubectlCommand( + [ + 'get', + 'pods', + '-o', + 'jsonpath={.items[*].metadata.name}', + ], + suppress_logging=True, + ) + pod_names = stdout.split() + pod_list: list[dict[str, Any]] = [] + for pod_name in pod_names: + pod_stdout, _, _ = container_service.RunKubectlCommand( + [ + 'get', + 'pod', + pod_name, + '-o', + 'json', + ], + suppress_logging=True, + ) + pod_list.append(json.loads(pod_stdout)) + phase_counts: dict[str, int] = {} + ready_count = 0 + for pod in pod_list: + labels = pod.get('metadata', {}).get('labels', {}) + if labels.get('app') != 'app': + continue + phase = pod.get('status', {}).get('phase', 'Unknown') + phase_counts[phase] = phase_counts.get(phase, 0) + 1 + if _IsPodReady(pod): + ready_count += 1 + return phase_counts, ready_count + + +def _IsPodReady(pod: dict[str, Any]) -> bool: + for condition in pod.get('status', {}).get('conditions', []): + if condition.get('type') == 'Ready' and condition.get('status') == 'True': return True - elapsed = time.monotonic() - start_time + return False + + +def _LogNodeDeletionUntilGone( + phase: str, + initial_nodes: set[str], + initial_node_count: int, + start_time: float, + timeout: int, +) -> tuple[list[sample.Sample], bool]: + samples: list[sample.Sample] = [] + deletion_times: dict[str, float] = {} + scaled_nodes = set(kubernetes_commands.GetNodeNames()) - initial_nodes + start_monotonic = time.monotonic() + + while True: + current_nodes = set(kubernetes_commands.GetNodeNames()) + remaining_nodes = current_nodes - initial_nodes + elapsed = time.monotonic() - start_monotonic + + for node in list(scaled_nodes): + if node not in current_nodes and node not in deletion_times: + deletion_times[node] = elapsed + scaled_nodes.discard(node) + + samples.append( + sample.Sample( + 'node_remaining_count', + max(len(remaining_nodes), 0), + 'count', + metadata={ + 'phase': phase, + 'elapsed_seconds': elapsed, + }, + ) + ) + + if len(current_nodes) <= initial_node_count: + logging.info('Node count returned to initial level.') + break if elapsed >= timeout: logging.warning( 'Timed out waiting for scaled nodes to delete. Remaining nodes: %d', - max(current_node_count - initial_node_count, 0), + max(len(remaining_nodes), 0), ) - return False + break + logging.info( 'Remaining scaled nodes: %d', - max(current_node_count - initial_node_count, 0), + max(len(remaining_nodes), 0), ) time.sleep(60) + delete_samples = _BuildNodeDeletionSamples( + deletion_times, + phase, + start_time, + ) + samples += delete_samples + return samples, len(current_nodes) <= initial_node_count + + +def _BuildNodeDeletionSamples( + deletion_times: dict[str, float], + phase: str, + start_time: float, +) -> list[sample.Sample]: + if not deletion_times: + return [] + summaries = kubernetes_scale_benchmark._SummarizeTimestamps( # pylint: disable=protected-access + list(deletion_times.values()) + ) + percentiles = {'p50', 'p90', 'p99', 'p99.9', 'p100'} + samples: list[sample.Sample] = [] + for percentile, value in summaries.items(): + if percentile not in percentiles: + continue + samples.append( + sample.Sample( + f'node_delete_{percentile}', + value, + 'seconds', + metadata={ + 'phase': phase, + 'start_time_epoch': start_time, + }, + ) + ) + return samples + + +def GetScaleTimeout(num_nodes: int | None = None) -> int: + """Returns the timeout for scale operations in this benchmark.""" + nodes = num_nodes if num_nodes is not None else NUM_NODES.value + base_timeout = 60 * 10 # 10 minutes + per_node_timeout = nodes * 3 # 3 seconds per node + proposed_timeout = base_timeout + per_node_timeout + max_timeout = 60 * 60 # 1 hour + return min(proposed_timeout, max_timeout) + + +def _AddPhaseMetadata(samples: list[sample.Sample], phase: str) -> None: + for s in samples: + s.metadata['phase'] = phase + def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): """Cleanups scale benchmark. Runs before teardown.""" container_service.RunRetryableKubectlCommand( ['delete', 'deployment', 'app'], - timeout=kubernetes_scale_benchmark._GetScaleTimeout(), + timeout=GetScaleTimeout(), raise_on_failure=False, ) From d0abf087b987a21a166962821068090155df8471 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Fri, 13 Feb 2026 15:05:32 +0200 Subject: [PATCH 13/26] Refactor kubernetes_node_scale benchmark --- .../kubernetes_node_scale_benchmark.py | 315 +----------------- 1 file changed, 13 insertions(+), 302 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 8b775a0e8c..9ff83cb1a8 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -11,16 +11,13 @@ for consistent metric collection. """ -import json -import time -from typing import Any - from absl import flags from absl import logging from perfkitbenchmarker import benchmark_spec from perfkitbenchmarker import configs from perfkitbenchmarker import container_service from perfkitbenchmarker import sample +from perfkitbenchmarker.container_service import kubernetes_commands from perfkitbenchmarker.linux_benchmarks import kubernetes_scale_benchmark from perfkitbenchmarker.container_service import kubernetes_commands @@ -79,9 +76,12 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec): def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: """Runs the scale-up, scale-down, scale-up benchmark sequence. +<<<<<<< HEAD Args: bm_spec: The benchmark specification. +======= +>>>>>>> f178b3d9 (Refactor kubernetes_node_scale benchmark) Returns: Combined samples from all three phases, each tagged with phase metadata. """ @@ -90,318 +90,29 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: assert isinstance(cluster, container_service.KubernetesCluster) cluster: container_service.KubernetesCluster = cluster - initial_nodes = set(kubernetes_commands.GetNodeNames()) - initial_node_count = len(initial_nodes) - # Do one scale up, scale down, then scale up again. - scaleup1_samples = _ScaleUpAndCollectSamples( - phase='scaleup1', - replicas=NUM_NODES.value, - cluster=cluster, - initial_nodes=initial_nodes, - initial_pods=set(kubernetes_commands.GetPodNames()), - pod_phase_timeout=3 * 60 * 60, - ) - - scaledown_samples, scaledown_complete = _ScaleDownAndCollectSamples( - phase='scaledown', - initial_nodes=initial_nodes, - initial_node_count=initial_node_count, - node_timeout=2 * 60 * 60, - ) + _ScaleDeploymentReplicas(NUM_NODES.value) + _ScaleDeploymentReplicas(0) + _ScaleDeploymentReplicas(NUM_NODES.value) + return [] - scaleup2_samples: list[sample.Sample] = [] - if scaledown_complete: - scaleup2_samples = _ScaleUpAndCollectSamples( - phase='scaleup2', - replicas=NUM_NODES.value, - cluster=cluster, - initial_nodes=initial_nodes, - initial_pods=set(kubernetes_commands.GetPodNames()), - pod_phase_timeout=3 * 60 * 60, - ) - else: - logging.warning( - 'Skipping final scale up; scaled nodes not deleted within timeout.' - ) - return scaleup1_samples + scaledown_samples + scaleup2_samples - - -def _ScaleDeploymentReplicas(replicas: int, wait_for_rollout: bool = True) -> None: +def _ScaleDeploymentReplicas(replicas: int) -> None: container_service.RunKubectlCommand([ 'scale', f'--replicas={replicas}', 'deployment/app', ]) - if wait_for_rollout: - kubernetes_commands.WaitForRollout( - 'deployment/app', - timeout=GetScaleTimeout(replicas), - ) - - -def _ScaleUpAndCollectSamples( - phase: str, - replicas: int, - cluster: container_service.KubernetesCluster, - initial_nodes: set[str], - initial_pods: set[str], - pod_phase_timeout: int, -) -> list[sample.Sample]: - start_time = time.time() - _ScaleDeploymentReplicas(replicas, wait_for_rollout=False) - - phase_samples: list[sample.Sample] = [] - phase_samples += _LogPodPhaseCountsUntilReady( - phase=phase, - timeout=pod_phase_timeout, - desired_replicas=replicas, - ) - - ready_samples = kubernetes_scale_benchmark.ParseStatusChanges( - 'pod', - start_time, - resources_to_ignore=initial_pods, - ) - kubernetes_scale_benchmark.CheckForFailures(cluster, ready_samples, replicas) - _AddPhaseMetadata(ready_samples, phase) - phase_samples += ready_samples - node_samples = kubernetes_scale_benchmark.ParseStatusChanges( - 'node', - start_time, - resources_to_ignore=initial_nodes, - ) - _AddPhaseMetadata(node_samples, phase) - phase_samples += node_samples - return phase_samples - - -def _ScaleDownAndCollectSamples( - phase: str, - initial_nodes: set[str], - initial_node_count: int, - node_timeout: int, -) -> tuple[list[sample.Sample], bool]: - start_time = time.time() - _ScaleDeploymentReplicas(0, wait_for_rollout=False) - return _LogNodeDeletionUntilGone( - phase=phase, - initial_nodes=initial_nodes, - initial_node_count=initial_node_count, - start_time=start_time, - timeout=node_timeout, - ) - - -def _LogPodPhaseCountsUntilReady( - phase: str, - timeout: int, - desired_replicas: int, -) -> list[sample.Sample]: - samples: list[sample.Sample] = [] - start_monotonic = time.monotonic() - while True: - phase_counts, ready_count = _GetPodPhaseCounts() - elapsed = time.monotonic() - start_monotonic - logging.info( - 'Pod phases (%s) after %ds: %s (Ready: %d/%d)', - phase, - int(elapsed), - phase_counts, - ready_count, - desired_replicas, - ) - for pod_phase, count in phase_counts.items(): - samples.append( - sample.Sample( - 'pod_phase_count', - count, - 'count', - metadata={ - 'phase': phase, - 'pod_phase': pod_phase, - 'elapsed_seconds': elapsed, - }, - ) - ) - samples.append( - sample.Sample( - 'pod_phase_count', - ready_count, - 'count', - metadata={ - 'phase': phase, - 'pod_phase': 'Ready', - 'elapsed_seconds': elapsed, - }, - ) - ) - - if ready_count >= desired_replicas: - return samples - if elapsed >= timeout: - logging.warning( - 'Timed out waiting for pods to be Ready (%s). Ready: %d/%d', - phase, - ready_count, - desired_replicas, - ) - return samples - time.sleep(60) - - -def _GetPodPhaseCounts() -> tuple[dict[str, int], int]: - stdout, _, _ = container_service.RunKubectlCommand( - [ - 'get', - 'pods', - '-o', - 'jsonpath={.items[*].metadata.name}', - ], - suppress_logging=True, - ) - pod_names = stdout.split() - pod_list: list[dict[str, Any]] = [] - for pod_name in pod_names: - pod_stdout, _, _ = container_service.RunKubectlCommand( - [ - 'get', - 'pod', - pod_name, - '-o', - 'json', - ], - suppress_logging=True, - ) - pod_list.append(json.loads(pod_stdout)) - phase_counts: dict[str, int] = {} - ready_count = 0 - for pod in pod_list: - labels = pod.get('metadata', {}).get('labels', {}) - if labels.get('app') != 'app': - continue - phase = pod.get('status', {}).get('phase', 'Unknown') - phase_counts[phase] = phase_counts.get(phase, 0) + 1 - if _IsPodReady(pod): - ready_count += 1 - return phase_counts, ready_count - - -def _IsPodReady(pod: dict[str, Any]) -> bool: - for condition in pod.get('status', {}).get('conditions', []): - if condition.get('type') == 'Ready' and condition.get('status') == 'True': - return True - return False - - -def _LogNodeDeletionUntilGone( - phase: str, - initial_nodes: set[str], - initial_node_count: int, - start_time: float, - timeout: int, -) -> tuple[list[sample.Sample], bool]: - samples: list[sample.Sample] = [] - deletion_times: dict[str, float] = {} - scaled_nodes = set(kubernetes_commands.GetNodeNames()) - initial_nodes - start_monotonic = time.monotonic() - - while True: - current_nodes = set(kubernetes_commands.GetNodeNames()) - remaining_nodes = current_nodes - initial_nodes - elapsed = time.monotonic() - start_monotonic - - for node in list(scaled_nodes): - if node not in current_nodes and node not in deletion_times: - deletion_times[node] = elapsed - scaled_nodes.discard(node) - - samples.append( - sample.Sample( - 'node_remaining_count', - max(len(remaining_nodes), 0), - 'count', - metadata={ - 'phase': phase, - 'elapsed_seconds': elapsed, - }, - ) - ) - - if len(current_nodes) <= initial_node_count: - logging.info('Node count returned to initial level.') - break - if elapsed >= timeout: - logging.warning( - 'Timed out waiting for scaled nodes to delete. Remaining nodes: %d', - max(len(remaining_nodes), 0), - ) - break - - logging.info( - 'Remaining scaled nodes: %d', - max(len(remaining_nodes), 0), - ) - time.sleep(60) - - delete_samples = _BuildNodeDeletionSamples( - deletion_times, - phase, - start_time, - ) - samples += delete_samples - return samples, len(current_nodes) <= initial_node_count - - -def _BuildNodeDeletionSamples( - deletion_times: dict[str, float], - phase: str, - start_time: float, -) -> list[sample.Sample]: - if not deletion_times: - return [] - summaries = kubernetes_scale_benchmark._SummarizeTimestamps( # pylint: disable=protected-access - list(deletion_times.values()) + kubernetes_commands.WaitForRollout( + 'deployment/app', + timeout=kubernetes_scale_benchmark._GetScaleTimeout(), ) - percentiles = {'p50', 'p90', 'p99', 'p99.9', 'p100'} - samples: list[sample.Sample] = [] - for percentile, value in summaries.items(): - if percentile not in percentiles: - continue - samples.append( - sample.Sample( - f'node_delete_{percentile}', - value, - 'seconds', - metadata={ - 'phase': phase, - 'start_time_epoch': start_time, - }, - ) - ) - return samples - - -def GetScaleTimeout(num_nodes: int | None = None) -> int: - """Returns the timeout for scale operations in this benchmark.""" - nodes = num_nodes if num_nodes is not None else NUM_NODES.value - base_timeout = 60 * 10 # 10 minutes - per_node_timeout = nodes * 3 # 3 seconds per node - proposed_timeout = base_timeout + per_node_timeout - max_timeout = 60 * 60 # 1 hour - return min(proposed_timeout, max_timeout) - - -def _AddPhaseMetadata(samples: list[sample.Sample], phase: str) -> None: - for s in samples: - s.metadata['phase'] = phase def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): """Cleanups scale benchmark. Runs before teardown.""" container_service.RunRetryableKubectlCommand( ['delete', 'deployment', 'app'], - timeout=GetScaleTimeout(), + timeout=kubernetes_scale_benchmark._GetScaleTimeout(), raise_on_failure=False, ) From bb8aaa2c1040b1b202472ba8e6148ae06175d495 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Fri, 30 Jan 2026 18:01:56 +0200 Subject: [PATCH 14/26] Add template and scaling logic --- .../kubernetes_node_scale_benchmark.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 9ff83cb1a8..0d9688a039 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -109,6 +109,18 @@ def _ScaleDeploymentReplicas(replicas: int) -> None: ) +def _ScaleDeploymentReplicas(replicas: int) -> None: + container_service.RunKubectlCommand([ + 'scale', + f'--replicas={replicas}', + 'deployment/app', + ]) + kubernetes_commands.WaitForRollout( + 'deployment/app', + timeout=kubernetes_scale_benchmark._GetScaleTimeout(), + ) + + def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): """Cleanups scale benchmark. Runs before teardown.""" container_service.RunRetryableKubectlCommand( From fc0b00c9b1c60dc3d03d46a98c2d6051b60c90cd Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Mon, 2 Feb 2026 20:02:08 +0200 Subject: [PATCH 15/26] Add scaling down logic and gathering metrics --- .../kubernetes_node_scale_benchmark.py | 399 +++++++++++++++++- 1 file changed, 383 insertions(+), 16 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 0d9688a039..ea4c1f5723 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -11,6 +11,14 @@ for consistent metric collection. """ +<<<<<<< HEAD +import json +import time +from typing import Any +======= +import time +>>>>>>> 9a938ee8 (Add scaling down logic and gathering metrics) + from absl import flags from absl import logging from perfkitbenchmarker import benchmark_spec @@ -76,37 +84,374 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec): def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: """Runs the scale-up, scale-down, scale-up benchmark sequence. -<<<<<<< HEAD - Args: - bm_spec: The benchmark specification. - -======= ->>>>>>> f178b3d9 (Refactor kubernetes_node_scale benchmark) Returns: Combined samples from all three phases, each tagged with phase metadata. """ assert bm_spec.container_cluster cluster = bm_spec.container_cluster assert isinstance(cluster, container_service.KubernetesCluster) - cluster: container_service.KubernetesCluster = cluster - # Do one scale up, scale down, then scale up again. - _ScaleDeploymentReplicas(NUM_NODES.value) - _ScaleDeploymentReplicas(0) - _ScaleDeploymentReplicas(NUM_NODES.value) - return [] +<<<<<<< HEAD + initial_nodes = set(kubernetes_commands.GetNodeNames()) + initial_node_count = len(initial_nodes) + # Phase 1: Scale up. + scaleup1_samples = _ScaleUpAndCollect( + 'scaleup1', + NUM_NODES.value, + cluster, + initial_nodes, + ) -def _ScaleDeploymentReplicas(replicas: int) -> None: + # Phase 2: Scale down and wait for nodes to be removed. + scaledown_samples, nodes_removed = _ScaleDownAndCollect( + 'scaledown', + initial_nodes, + initial_node_count, + ) + + # Phase 3: Scale up again (only if scale-down succeeded). + scaleup2_samples: list[sample.Sample] = [] + if nodes_removed: + scaleup2_samples = _ScaleUpAndCollect( + 'scaleup2', + NUM_NODES.value, + cluster, + initial_nodes, + ) + else: + logging.warning( + 'Skipping second scale up: nodes did not return to baseline.', + ) + + return scaleup1_samples + scaledown_samples + scaleup2_samples + + +def _ScaleUpAndCollect( + phase: str, + replicas: int, + cluster: container_service.KubernetesCluster, + initial_nodes: set[str], +) -> list[sample.Sample]: + """Scales the deployment up and collects pod/node timing samples. + + Args: + phase: Label for this phase (e.g. 'scaleup1'). + replicas: Target replica count. + cluster: The Kubernetes cluster. + initial_nodes: Node names present before scaling. + + Returns: + Samples tagged with phase metadata. + """ + initial_pods = set(kubernetes_commands.GetPodNames()) + start_time = time.time() + _ScaleDeployment(replicas) + + phase_log_samples = _PollPodPhasesUntilReady( + phase, + replicas, + _SCALE_UP_TIMEOUT_SECONDS, + ) + pod_samples = kubernetes_scale_benchmark.ParseStatusChanges( + 'pod', + start_time, + resources_to_ignore=initial_pods, + ) + kubernetes_scale_benchmark.CheckForFailures( + cluster, + pod_samples, + replicas, + ) + node_samples = kubernetes_scale_benchmark.ParseStatusChanges( + 'node', + start_time, + resources_to_ignore=initial_nodes, + ) + + all_samples = phase_log_samples + pod_samples + node_samples + _AddPhaseMetadata(all_samples, phase) + return all_samples + + +def _ScaleDownAndCollect( + phase: str, + initial_nodes: set[str], + initial_node_count: int, +) -> tuple[list[sample.Sample], bool]: + """Scales deployment to 0 and waits for autoscaler to remove nodes. + + Args: + phase: Label for this phase. + initial_nodes: Node names present before any scaling. + initial_node_count: Number of nodes before any scaling. + + Returns: + A tuple of (samples, whether nodes returned to acceptable level). + """ + _ScaleDeployment(0) + return _PollNodeDeletionUntilDone( + phase, + initial_nodes, + initial_node_count, + _SCALE_DOWN_TIMEOUT_SECONDS, + ) + + +def _PollPodPhasesUntilReady( + phase: str, + desired_replicas: int, + timeout: int, +) -> list[sample.Sample]: + """Logs pod phase counts every minute until all pods are Ready or timeout. + + Args: + phase: Label for this phase. + desired_replicas: Number of pods expected to become Ready. + timeout: Maximum wall-clock seconds to poll. + + Returns: + Time-series samples of pod counts per phase. + """ + samples: list[sample.Sample] = [] + start = time.monotonic() + while True: + phase_counts, ready_count = _GetPodPhaseCounts() + elapsed = time.monotonic() - start + logging.info( + 'Pod phases (%s) after %ds: %s (Ready: %d/%d)', + phase, + int(elapsed), + phase_counts, + ready_count, + desired_replicas, + ) + for pod_phase, count in phase_counts.items(): + samples.append( + sample.Sample( + 'pod_phase_count', + count, + 'count', + metadata={ + 'phase': phase, + 'pod_phase': pod_phase, + 'elapsed_seconds': elapsed, + }, + ) + ) + # Always emit a Ready count for a consistent time-series. + samples.append( + sample.Sample( + 'pod_phase_count', + ready_count, + 'count', + metadata={ + 'phase': phase, + 'pod_phase': 'Ready', + 'elapsed_seconds': elapsed, + }, + ) + ) + if ready_count >= desired_replicas: + return samples + if elapsed >= timeout: + logging.warning( + 'Timed out waiting for pods to be Ready (%s). Ready: %d/%d', + phase, + ready_count, + desired_replicas, + ) + return samples + time.sleep(_POLL_INTERVAL_SECONDS) + + +def _GetPodPhaseCounts() -> tuple[dict[str, int], int]: + """Returns pod phase counts and Ready count for the 'app' deployment. + + Uses a single kubectl call with label selector `app=app`. + + Returns: + A tuple of (phase_name -> count, ready_count). + """ + stdout, _, _ = container_service.RunKubectlCommand( + ['get', 'pods', '-l', 'app=app', '-o', 'json'], + suppress_logging=True, + ) + pods = json.loads(stdout).get('items', []) + phase_counts: dict[str, int] = {} + ready_count = 0 + for pod in pods: + pod_phase = pod.get('status', {}).get('phase', 'Unknown') + phase_counts[pod_phase] = phase_counts.get(pod_phase, 0) + 1 + if _IsPodReady(pod): + ready_count += 1 + return phase_counts, ready_count + + +def _IsPodReady(pod: dict[str, Any]) -> bool: + """Returns True if the pod has a Ready=True condition.""" + for condition in pod.get('status', {}).get('conditions', []): + if condition.get('type') == 'Ready' and condition.get('status') == 'True': + return True + return False + + +def _PollNodeDeletionUntilDone( + phase: str, + initial_nodes: set[str], + initial_node_count: int, + timeout: int, +) -> tuple[list[sample.Sample], bool]: + """Polls node count until autoscaler removes scaled nodes or timeout. + + Allows a small buffer (_SCALE_DOWN_NODE_BUFFER) above the initial count to + account for the cluster autoscaler being conservative (e.g. system workloads + preventing removal). + + Args: + phase: Label for this phase. + initial_nodes: Node names present before any scaling. + initial_node_count: Number of nodes before any scaling. + timeout: Maximum time in seconds (_SCALE_DOWN_TIMEOUT_SECONDS) to poll. + + Returns: + A tuple of (samples, whether node count reached acceptable level). + """ + acceptable_count = initial_node_count + _SCALE_DOWN_NODE_BUFFER + scaled_nodes = set(kubernetes_commands.GetNodeNames()) - initial_nodes + deletion_times: dict[str, float] = {} + samples: list[sample.Sample] = [] + start = time.monotonic() + done = False + + while True: + current_nodes = set(kubernetes_commands.GetNodeNames()) + elapsed = time.monotonic() - start + + # Record deletion timestamps for nodes that disappeared. + for node in list(scaled_nodes): + if node not in current_nodes: + deletion_times[node] = elapsed + scaled_nodes.discard(node) + + remaining = max(len(current_nodes) - acceptable_count, 0) + samples.append( + sample.Sample( + 'node_remaining_count', + remaining, + 'count', + metadata={'phase': phase, 'elapsed_seconds': elapsed}, + ) + ) + + if len(current_nodes) <= acceptable_count: + logging.info( + 'Node count (%d) within acceptable threshold (%d).', + len(current_nodes), + acceptable_count, + ) + done = True + break + if elapsed >= timeout: + logging.warning( + 'Timed out waiting for nodes to scale down.' + ' Remaining above threshold: %d', + remaining, + ) + break + + logging.info('Remaining scaled nodes above threshold: %d', remaining) + time.sleep(_POLL_INTERVAL_SECONDS) + + samples += _SummarizeNodeDeletionTimes(deletion_times, phase) + return samples, done + + +def _SummarizeNodeDeletionTimes( + deletion_times: dict[str, float], + phase: str, +) -> list[sample.Sample]: + """Builds percentile samples from per-node deletion durations. + + Args: + deletion_times: Mapping of node name to seconds-until-deleted. + phase: Label for this phase. + + Returns: + Samples for p50, p90, p99, p99.9, p100 of deletion times. + """ + if not deletion_times: + return [] + summaries = kubernetes_scale_benchmark._SummarizeTimestamps( + list(deletion_times.values()) + ) + target_percentiles = {'p50', 'p90', 'p99', 'p99.9', 'p100'} + samples: list[sample.Sample] = [] + for name, value in summaries.items(): + if name in target_percentiles: + samples.append( + sample.Sample( + f'node_delete_{name}', + value, + 'seconds', + metadata={'phase': phase}, + ) + ) + return samples + + +def _ScaleDeployment(replicas: int) -> None: + """Scales the 'app' deployment to the given replica count.""" container_service.RunKubectlCommand([ 'scale', f'--replicas={replicas}', 'deployment/app', ]) - kubernetes_commands.WaitForRollout( - 'deployment/app', - timeout=kubernetes_scale_benchmark._GetScaleTimeout(), + + +def _GetScaleTimeout(num_nodes: int | None = None) -> int: + """Returns the timeout for a scale or cleanup operation. + + Args: + num_nodes: Number of nodes to scale to. Defaults to NUM_NODES flag. + + Returns: + Timeout in seconds, capped at 1 hour. + """ + nodes = num_nodes if num_nodes is not None else NUM_NODES.value + base_timeout = 60 * 10 # 10 minutes + per_node_timeout = nodes * 3 # 3 seconds per node + max_timeout = 60 * 60 # 1 hour + return min(base_timeout + per_node_timeout, max_timeout) + + +def _AddPhaseMetadata( + samples: list[sample.Sample], + phase: str, +) -> None: + """Adds phase metadata to all samples.""" + for s in samples: + s.metadata['phase'] = phase +======= + initial_node_count = len(kubernetes_commands.GetNodeNames()) + start_time = time.time() + + # Do one scale up, scale down, then scale up again. + _ScaleDeploymentReplicas(NUM_NODES.value) + samples = kubernetes_scale_benchmark.ParseStatusChanges( + 'node', + start_time, + resources_to_ignore=set(), ) + _ScaleDeploymentReplicas(0) + if _WaitForScaledNodesDeletion(initial_node_count): + _ScaleDeploymentReplicas(NUM_NODES.value) + else: + logging.warning( + 'Skipping final scale up; scaled nodes not deleted within timeout.' + ) + return samples +>>>>>>> 9a938ee8 (Add scaling down logic and gathering metrics) def _ScaleDeploymentReplicas(replicas: int) -> None: @@ -121,6 +466,28 @@ def _ScaleDeploymentReplicas(replicas: int) -> None: ) +def _WaitForScaledNodesDeletion(initial_node_count: int) -> bool: + timeout = 20 * 60 + kubernetes_scale_benchmark._GetScaleTimeout() + start_time = time.monotonic() + while True: + current_node_count = len(kubernetes_commands.GetNodeNames()) + if current_node_count <= initial_node_count: + logging.info('Node count returned to initial level.') + return True + elapsed = time.monotonic() - start_time + if elapsed >= timeout: + logging.warning( + 'Timed out waiting for scaled nodes to delete. Remaining nodes: %d', + max(current_node_count - initial_node_count, 0), + ) + return False + logging.info( + 'Remaining scaled nodes: %d', + max(current_node_count - initial_node_count, 0), + ) + time.sleep(60) + + def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): """Cleanups scale benchmark. Runs before teardown.""" container_service.RunRetryableKubectlCommand( From 5ceda3da622456a8395084a52644dbecab2c48b6 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Tue, 10 Feb 2026 11:17:34 +0200 Subject: [PATCH 16/26] Add scaling down logic, phases and gathering metrics --- .../kubernetes_node_scale_benchmark.py | 319 ++++++++++++++++-- 1 file changed, 296 insertions(+), 23 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index ea4c1f5723..c76c406943 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -11,6 +11,7 @@ for consistent metric collection. """ +<<<<<<< HEAD <<<<<<< HEAD import json import time @@ -18,6 +19,11 @@ ======= import time >>>>>>> 9a938ee8 (Add scaling down logic and gathering metrics) +======= +import json +import time +from typing import Any +>>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) from absl import flags from absl import logging @@ -91,6 +97,7 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: cluster = bm_spec.container_cluster assert isinstance(cluster, container_service.KubernetesCluster) +<<<<<<< HEAD <<<<<<< HEAD initial_nodes = set(kubernetes_commands.GetNodeNames()) initial_node_count = len(initial_nodes) @@ -435,63 +442,329 @@ def _AddPhaseMetadata( ======= initial_node_count = len(kubernetes_commands.GetNodeNames()) start_time = time.time() +======= + initial_nodes = set(kubernetes_commands.GetNodeNames()) + initial_node_count = len(initial_nodes) +>>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) # Do one scale up, scale down, then scale up again. - _ScaleDeploymentReplicas(NUM_NODES.value) - samples = kubernetes_scale_benchmark.ParseStatusChanges( - 'node', - start_time, - resources_to_ignore=set(), + scaleup1_samples = _ScaleUpAndCollectSamples( + phase='scaleup1', + replicas=NUM_NODES.value, + cluster=cluster, + initial_nodes=initial_nodes, + initial_pods=set(kubernetes_commands.GetPodNames()), + pod_phase_timeout=3 * 60 * 60, + ) + + scaledown_samples, scaledown_complete = _ScaleDownAndCollectSamples( + phase='scaledown', + initial_nodes=initial_nodes, + initial_node_count=initial_node_count, + node_timeout=2 * 60 * 60, ) - _ScaleDeploymentReplicas(0) - if _WaitForScaledNodesDeletion(initial_node_count): - _ScaleDeploymentReplicas(NUM_NODES.value) + + scaleup2_samples: list[sample.Sample] = [] + if scaledown_complete: + scaleup2_samples = _ScaleUpAndCollectSamples( + phase='scaleup2', + replicas=NUM_NODES.value, + cluster=cluster, + initial_nodes=initial_nodes, + initial_pods=set(kubernetes_commands.GetPodNames()), + pod_phase_timeout=3 * 60 * 60, + ) else: logging.warning( 'Skipping final scale up; scaled nodes not deleted within timeout.' ) +<<<<<<< HEAD return samples >>>>>>> 9a938ee8 (Add scaling down logic and gathering metrics) +======= + + return scaleup1_samples + scaledown_samples + scaleup2_samples +>>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) -def _ScaleDeploymentReplicas(replicas: int) -> None: +def _ScaleDeploymentReplicas(replicas: int, wait_for_rollout: bool = True) -> None: container_service.RunKubectlCommand([ 'scale', f'--replicas={replicas}', 'deployment/app', ]) - kubernetes_commands.WaitForRollout( - 'deployment/app', - timeout=kubernetes_scale_benchmark._GetScaleTimeout(), + if wait_for_rollout: + kubernetes_commands.WaitForRollout( + 'deployment/app', + timeout=GetScaleTimeout(replicas), + ) + + +def _ScaleUpAndCollectSamples( + phase: str, + replicas: int, + cluster: container_service.KubernetesCluster, + initial_nodes: set[str], + initial_pods: set[str], + pod_phase_timeout: int, +) -> list[sample.Sample]: + start_time = time.time() + _ScaleDeploymentReplicas(replicas, wait_for_rollout=False) + + phase_samples: list[sample.Sample] = [] + phase_samples += _LogPodPhaseCountsUntilReady( + phase=phase, + timeout=pod_phase_timeout, + desired_replicas=replicas, ) + ready_samples = kubernetes_scale_benchmark.ParseStatusChanges( + 'pod', + start_time, + resources_to_ignore=initial_pods, + ) + kubernetes_scale_benchmark.CheckForFailures(cluster, ready_samples, replicas) + _AddPhaseMetadata(ready_samples, phase) + phase_samples += ready_samples + node_samples = kubernetes_scale_benchmark.ParseStatusChanges( + 'node', + start_time, + resources_to_ignore=initial_nodes, + ) + _AddPhaseMetadata(node_samples, phase) + phase_samples += node_samples + return phase_samples + -def _WaitForScaledNodesDeletion(initial_node_count: int) -> bool: - timeout = 20 * 60 + kubernetes_scale_benchmark._GetScaleTimeout() - start_time = time.monotonic() +def _ScaleDownAndCollectSamples( + phase: str, + initial_nodes: set[str], + initial_node_count: int, + node_timeout: int, +) -> tuple[list[sample.Sample], bool]: + start_time = time.time() + _ScaleDeploymentReplicas(0, wait_for_rollout=False) + return _LogNodeDeletionUntilGone( + phase=phase, + initial_nodes=initial_nodes, + initial_node_count=initial_node_count, + start_time=start_time, + timeout=node_timeout, + ) + + +def _LogPodPhaseCountsUntilReady( + phase: str, + timeout: int, + desired_replicas: int, +) -> list[sample.Sample]: + samples: list[sample.Sample] = [] + start_monotonic = time.monotonic() while True: - current_node_count = len(kubernetes_commands.GetNodeNames()) - if current_node_count <= initial_node_count: - logging.info('Node count returned to initial level.') + phase_counts, ready_count = _GetPodPhaseCounts() + elapsed = time.monotonic() - start_monotonic + logging.info( + 'Pod phases (%s) after %ds: %s (Ready: %d/%d)', + phase, + int(elapsed), + phase_counts, + ready_count, + desired_replicas, + ) + for pod_phase, count in phase_counts.items(): + samples.append( + sample.Sample( + 'pod_phase_count', + count, + 'count', + metadata={ + 'phase': phase, + 'pod_phase': pod_phase, + 'elapsed_seconds': elapsed, + }, + ) + ) + samples.append( + sample.Sample( + 'pod_phase_count', + ready_count, + 'count', + metadata={ + 'phase': phase, + 'pod_phase': 'Ready', + 'elapsed_seconds': elapsed, + }, + ) + ) + + if ready_count >= desired_replicas: + return samples + if elapsed >= timeout: + logging.warning( + 'Timed out waiting for pods to be Ready (%s). Ready: %d/%d', + phase, + ready_count, + desired_replicas, + ) + return samples + time.sleep(60) + + +def _GetPodPhaseCounts() -> tuple[dict[str, int], int]: + stdout, _, _ = container_service.RunKubectlCommand( + [ + 'get', + 'pods', + '-o', + 'jsonpath={.items[*].metadata.name}', + ], + suppress_logging=True, + ) + pod_names = stdout.split() + pod_list: list[dict[str, Any]] = [] + for pod_name in pod_names: + pod_stdout, _, _ = container_service.RunKubectlCommand( + [ + 'get', + 'pod', + pod_name, + '-o', + 'json', + ], + suppress_logging=True, + ) + pod_list.append(json.loads(pod_stdout)) + phase_counts: dict[str, int] = {} + ready_count = 0 + for pod in pod_list: + labels = pod.get('metadata', {}).get('labels', {}) + if labels.get('app') != 'app': + continue + phase = pod.get('status', {}).get('phase', 'Unknown') + phase_counts[phase] = phase_counts.get(phase, 0) + 1 + if _IsPodReady(pod): + ready_count += 1 + return phase_counts, ready_count + + +def _IsPodReady(pod: dict[str, Any]) -> bool: + for condition in pod.get('status', {}).get('conditions', []): + if condition.get('type') == 'Ready' and condition.get('status') == 'True': return True - elapsed = time.monotonic() - start_time + return False + + +def _LogNodeDeletionUntilGone( + phase: str, + initial_nodes: set[str], + initial_node_count: int, + start_time: float, + timeout: int, +) -> tuple[list[sample.Sample], bool]: + samples: list[sample.Sample] = [] + deletion_times: dict[str, float] = {} + scaled_nodes = set(kubernetes_commands.GetNodeNames()) - initial_nodes + start_monotonic = time.monotonic() + + while True: + current_nodes = set(kubernetes_commands.GetNodeNames()) + remaining_nodes = current_nodes - initial_nodes + elapsed = time.monotonic() - start_monotonic + + for node in list(scaled_nodes): + if node not in current_nodes and node not in deletion_times: + deletion_times[node] = elapsed + scaled_nodes.discard(node) + + samples.append( + sample.Sample( + 'node_remaining_count', + max(len(remaining_nodes), 0), + 'count', + metadata={ + 'phase': phase, + 'elapsed_seconds': elapsed, + }, + ) + ) + + if len(current_nodes) <= initial_node_count: + logging.info('Node count returned to initial level.') + break if elapsed >= timeout: logging.warning( 'Timed out waiting for scaled nodes to delete. Remaining nodes: %d', - max(current_node_count - initial_node_count, 0), + max(len(remaining_nodes), 0), ) - return False + break + logging.info( 'Remaining scaled nodes: %d', - max(current_node_count - initial_node_count, 0), + max(len(remaining_nodes), 0), ) time.sleep(60) + delete_samples = _BuildNodeDeletionSamples( + deletion_times, + phase, + start_time, + ) + samples += delete_samples + return samples, len(current_nodes) <= initial_node_count + + +def _BuildNodeDeletionSamples( + deletion_times: dict[str, float], + phase: str, + start_time: float, +) -> list[sample.Sample]: + if not deletion_times: + return [] + summaries = kubernetes_scale_benchmark._SummarizeTimestamps( # pylint: disable=protected-access + list(deletion_times.values()) + ) + percentiles = {'p50', 'p90', 'p99', 'p99.9', 'p100'} + samples: list[sample.Sample] = [] + for percentile, value in summaries.items(): + if percentile not in percentiles: + continue + samples.append( + sample.Sample( + f'node_delete_{percentile}', + value, + 'seconds', + metadata={ + 'phase': phase, + 'start_time_epoch': start_time, + }, + ) + ) + return samples + + +def GetScaleTimeout(num_nodes: int | None = None) -> int: + """Returns the timeout for scale operations in this benchmark.""" + nodes = num_nodes if num_nodes is not None else NUM_NODES.value + base_timeout = 60 * 10 # 10 minutes + per_node_timeout = nodes * 3 # 3 seconds per node + proposed_timeout = base_timeout + per_node_timeout + max_timeout = 60 * 60 # 1 hour + return min(proposed_timeout, max_timeout) + + +def _AddPhaseMetadata(samples: list[sample.Sample], phase: str) -> None: + for s in samples: + s.metadata['phase'] = phase + def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): """Cleanups scale benchmark. Runs before teardown.""" container_service.RunRetryableKubectlCommand( ['delete', 'deployment', 'app'], - timeout=kubernetes_scale_benchmark._GetScaleTimeout(), +<<<<<<< HEAD + timeout=_GetScaleTimeout(), +======= + timeout=GetScaleTimeout(), +>>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) raise_on_failure=False, ) From 8349410a18a37c1f22eb1024ac62d7189a1eabb4 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Fri, 13 Feb 2026 15:05:32 +0200 Subject: [PATCH 17/26] Refactor kubernetes_node_scale benchmark --- .../kubernetes_node_scale_benchmark.py | 332 ++++++++++-------- 1 file changed, 194 insertions(+), 138 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index c76c406943..2feaaf4fc9 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -31,9 +31,13 @@ from perfkitbenchmarker import configs from perfkitbenchmarker import container_service from perfkitbenchmarker import sample +<<<<<<< HEAD from perfkitbenchmarker.container_service import kubernetes_commands from perfkitbenchmarker.linux_benchmarks import kubernetes_scale_benchmark +======= +>>>>>>> 8dff7d10 (Refactor kubernetes_node_scale benchmark) from perfkitbenchmarker.container_service import kubernetes_commands +from perfkitbenchmarker.linux_benchmarks import kubernetes_scale_benchmark FLAGS = flags.FLAGS @@ -95,7 +99,16 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: """ assert bm_spec.container_cluster cluster = bm_spec.container_cluster +<<<<<<< HEAD + assert isinstance(cluster, container_service.KubernetesCluster) +======= +<<<<<<< HEAD + assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) + cluster: kubernetes_cluster.KubernetesCluster = cluster +======= assert isinstance(cluster, container_service.KubernetesCluster) +>>>>>>> f178b3d9 (Refactor kubernetes_node_scale benchmark) +>>>>>>> 8dff7d10 (Refactor kubernetes_node_scale benchmark) <<<<<<< HEAD <<<<<<< HEAD @@ -108,6 +121,7 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: NUM_NODES.value, cluster, initial_nodes, +<<<<<<< HEAD ) # Phase 2: Scale down and wait for nodes to be removed. @@ -455,28 +469,29 @@ def _AddPhaseMetadata( initial_nodes=initial_nodes, initial_pods=set(kubernetes_commands.GetPodNames()), pod_phase_timeout=3 * 60 * 60, +======= +>>>>>>> 8dff7d10 (Refactor kubernetes_node_scale benchmark) ) - scaledown_samples, scaledown_complete = _ScaleDownAndCollectSamples( - phase='scaledown', - initial_nodes=initial_nodes, - initial_node_count=initial_node_count, - node_timeout=2 * 60 * 60, + # Phase 2: Scale down and wait for nodes to be removed. + scaledown_samples, nodes_removed = _ScaleDownAndCollect( + 'scaledown', + initial_nodes, + initial_node_count, ) + # Phase 3: Scale up again (only if scale-down succeeded). scaleup2_samples: list[sample.Sample] = [] - if scaledown_complete: - scaleup2_samples = _ScaleUpAndCollectSamples( - phase='scaleup2', - replicas=NUM_NODES.value, - cluster=cluster, - initial_nodes=initial_nodes, - initial_pods=set(kubernetes_commands.GetPodNames()), - pod_phase_timeout=3 * 60 * 60, + if nodes_removed: + scaleup2_samples = _ScaleUpAndCollect( + 'scaleup2', + NUM_NODES.value, + cluster, + initial_nodes, ) else: logging.warning( - 'Skipping final scale up; scaled nodes not deleted within timeout.' + 'Skipping second scale up: nodes did not return to baseline.', ) <<<<<<< HEAD return samples @@ -487,82 +502,97 @@ def _AddPhaseMetadata( >>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) -def _ScaleDeploymentReplicas(replicas: int, wait_for_rollout: bool = True) -> None: - container_service.RunKubectlCommand([ - 'scale', - f'--replicas={replicas}', - 'deployment/app', - ]) - if wait_for_rollout: - kubernetes_commands.WaitForRollout( - 'deployment/app', - timeout=GetScaleTimeout(replicas), - ) - - -def _ScaleUpAndCollectSamples( +def _ScaleUpAndCollect( phase: str, replicas: int, cluster: container_service.KubernetesCluster, initial_nodes: set[str], - initial_pods: set[str], - pod_phase_timeout: int, ) -> list[sample.Sample]: + """Scales the deployment up and collects pod/node timing samples. + + Args: + phase: Label for this phase (e.g. 'scaleup1'). + replicas: Target replica count. + cluster: The Kubernetes cluster. + initial_nodes: Node names present before scaling. + + Returns: + Samples tagged with phase metadata. + """ + initial_pods = set(kubernetes_commands.GetPodNames()) start_time = time.time() - _ScaleDeploymentReplicas(replicas, wait_for_rollout=False) + _ScaleDeployment(replicas) - phase_samples: list[sample.Sample] = [] - phase_samples += _LogPodPhaseCountsUntilReady( - phase=phase, - timeout=pod_phase_timeout, - desired_replicas=replicas, + phase_log_samples = _PollPodPhasesUntilReady( + phase, + replicas, + _SCALE_UP_TIMEOUT_SECONDS, ) - - ready_samples = kubernetes_scale_benchmark.ParseStatusChanges( + pod_samples = kubernetes_scale_benchmark.ParseStatusChanges( 'pod', start_time, resources_to_ignore=initial_pods, ) - kubernetes_scale_benchmark.CheckForFailures(cluster, ready_samples, replicas) - _AddPhaseMetadata(ready_samples, phase) - phase_samples += ready_samples + kubernetes_scale_benchmark.CheckForFailures( + cluster, + pod_samples, + replicas, + ) node_samples = kubernetes_scale_benchmark.ParseStatusChanges( 'node', start_time, resources_to_ignore=initial_nodes, ) - _AddPhaseMetadata(node_samples, phase) - phase_samples += node_samples - return phase_samples + + all_samples = phase_log_samples + pod_samples + node_samples + _AddPhaseMetadata(all_samples, phase) + return all_samples -def _ScaleDownAndCollectSamples( +def _ScaleDownAndCollect( phase: str, initial_nodes: set[str], initial_node_count: int, - node_timeout: int, ) -> tuple[list[sample.Sample], bool]: - start_time = time.time() - _ScaleDeploymentReplicas(0, wait_for_rollout=False) - return _LogNodeDeletionUntilGone( - phase=phase, - initial_nodes=initial_nodes, - initial_node_count=initial_node_count, - start_time=start_time, - timeout=node_timeout, + """Scales deployment to 0 and waits for autoscaler to remove nodes. + + Args: + phase: Label for this phase. + initial_nodes: Node names present before any scaling. + initial_node_count: Number of nodes before any scaling. + + Returns: + A tuple of (samples, whether nodes returned to acceptable level). + """ + _ScaleDeployment(0) + return _PollNodeDeletionUntilDone( + phase, + initial_nodes, + initial_node_count, + _SCALE_DOWN_TIMEOUT_SECONDS, ) -def _LogPodPhaseCountsUntilReady( +def _PollPodPhasesUntilReady( phase: str, - timeout: int, desired_replicas: int, + timeout: int, ) -> list[sample.Sample]: + """Logs pod phase counts every minute until all pods are Ready or timeout. + + Args: + phase: Label for this phase. + desired_replicas: Number of pods expected to become Ready. + timeout: Maximum wall-clock seconds to poll. + + Returns: + Time-series samples of pod counts per phase. + """ samples: list[sample.Sample] = [] - start_monotonic = time.monotonic() + start = time.monotonic() while True: phase_counts, ready_count = _GetPodPhaseCounts() - elapsed = time.monotonic() - start_monotonic + elapsed = time.monotonic() - start logging.info( 'Pod phases (%s) after %ds: %s (Ready: %d/%d)', phase, @@ -584,6 +614,7 @@ def _LogPodPhaseCountsUntilReady( }, ) ) + # Always emit a Ready count for a consistent time-series. samples.append( sample.Sample( 'pod_phase_count', @@ -596,7 +627,6 @@ def _LogPodPhaseCountsUntilReady( }, ) ) - if ready_count >= desired_replicas: return samples if elapsed >= timeout: @@ -607,152 +637,174 @@ def _LogPodPhaseCountsUntilReady( desired_replicas, ) return samples - time.sleep(60) + time.sleep(_POLL_INTERVAL_SECONDS) def _GetPodPhaseCounts() -> tuple[dict[str, int], int]: + """Returns pod phase counts and Ready count for the 'app' deployment. + + Uses a single kubectl call with label selector `app=app`. + + Returns: + A tuple of (phase_name -> count, ready_count). + """ stdout, _, _ = container_service.RunKubectlCommand( - [ - 'get', - 'pods', - '-o', - 'jsonpath={.items[*].metadata.name}', - ], + ['get', 'pods', '-l', 'app=app', '-o', 'json'], suppress_logging=True, ) - pod_names = stdout.split() - pod_list: list[dict[str, Any]] = [] - for pod_name in pod_names: - pod_stdout, _, _ = container_service.RunKubectlCommand( - [ - 'get', - 'pod', - pod_name, - '-o', - 'json', - ], - suppress_logging=True, - ) - pod_list.append(json.loads(pod_stdout)) + pods = json.loads(stdout).get('items', []) phase_counts: dict[str, int] = {} ready_count = 0 - for pod in pod_list: - labels = pod.get('metadata', {}).get('labels', {}) - if labels.get('app') != 'app': - continue - phase = pod.get('status', {}).get('phase', 'Unknown') - phase_counts[phase] = phase_counts.get(phase, 0) + 1 + for pod in pods: + pod_phase = pod.get('status', {}).get('phase', 'Unknown') + phase_counts[pod_phase] = phase_counts.get(pod_phase, 0) + 1 if _IsPodReady(pod): ready_count += 1 return phase_counts, ready_count def _IsPodReady(pod: dict[str, Any]) -> bool: + """Returns True if the pod has a Ready=True condition.""" for condition in pod.get('status', {}).get('conditions', []): if condition.get('type') == 'Ready' and condition.get('status') == 'True': return True return False -def _LogNodeDeletionUntilGone( +def _PollNodeDeletionUntilDone( phase: str, initial_nodes: set[str], initial_node_count: int, - start_time: float, timeout: int, ) -> tuple[list[sample.Sample], bool]: - samples: list[sample.Sample] = [] - deletion_times: dict[str, float] = {} + """Polls node count until autoscaler removes scaled nodes or timeout. + + Allows a small buffer (_SCALE_DOWN_NODE_BUFFER) above the initial count to + account for the cluster autoscaler being conservative (e.g. system workloads + preventing removal). + + Args: + phase: Label for this phase. + initial_nodes: Node names present before any scaling. + initial_node_count: Number of nodes before any scaling. + timeout: Maximum time in seconds (_SCALE_DOWN_TIMEOUT_SECONDS) to poll. + + Returns: + A tuple of (samples, whether node count reached acceptable level). + """ + acceptable_count = initial_node_count + _SCALE_DOWN_NODE_BUFFER scaled_nodes = set(kubernetes_commands.GetNodeNames()) - initial_nodes - start_monotonic = time.monotonic() + deletion_times: dict[str, float] = {} + samples: list[sample.Sample] = [] + start = time.monotonic() + done = False while True: current_nodes = set(kubernetes_commands.GetNodeNames()) - remaining_nodes = current_nodes - initial_nodes - elapsed = time.monotonic() - start_monotonic + elapsed = time.monotonic() - start + # Record deletion timestamps for nodes that disappeared. for node in list(scaled_nodes): - if node not in current_nodes and node not in deletion_times: + if node not in current_nodes: deletion_times[node] = elapsed scaled_nodes.discard(node) + remaining = max(len(current_nodes) - acceptable_count, 0) samples.append( sample.Sample( 'node_remaining_count', - max(len(remaining_nodes), 0), + remaining, 'count', - metadata={ - 'phase': phase, - 'elapsed_seconds': elapsed, - }, + metadata={'phase': phase, 'elapsed_seconds': elapsed}, ) ) - if len(current_nodes) <= initial_node_count: - logging.info('Node count returned to initial level.') + if len(current_nodes) <= acceptable_count: + logging.info( + 'Node count (%d) within acceptable threshold (%d).', + len(current_nodes), + acceptable_count, + ) + done = True break if elapsed >= timeout: logging.warning( - 'Timed out waiting for scaled nodes to delete. Remaining nodes: %d', - max(len(remaining_nodes), 0), + 'Timed out waiting for nodes to scale down.' + ' Remaining above threshold: %d', + remaining, ) break - logging.info( - 'Remaining scaled nodes: %d', - max(len(remaining_nodes), 0), - ) - time.sleep(60) + logging.info('Remaining scaled nodes above threshold: %d', remaining) + time.sleep(_POLL_INTERVAL_SECONDS) - delete_samples = _BuildNodeDeletionSamples( - deletion_times, - phase, - start_time, - ) - samples += delete_samples - return samples, len(current_nodes) <= initial_node_count + samples += _SummarizeNodeDeletionTimes(deletion_times, phase) + return samples, done -def _BuildNodeDeletionSamples( +def _SummarizeNodeDeletionTimes( deletion_times: dict[str, float], phase: str, - start_time: float, ) -> list[sample.Sample]: + """Builds percentile samples from per-node deletion durations. + + Args: + deletion_times: Mapping of node name to seconds-until-deleted. + phase: Label for this phase. + + Returns: + Samples for p50, p90, p99, p99.9, p100 of deletion times. + """ if not deletion_times: return [] - summaries = kubernetes_scale_benchmark._SummarizeTimestamps( # pylint: disable=protected-access + summaries = kubernetes_scale_benchmark._SummarizeTimestamps( list(deletion_times.values()) ) - percentiles = {'p50', 'p90', 'p99', 'p99.9', 'p100'} + target_percentiles = {'p50', 'p90', 'p99', 'p99.9', 'p100'} samples: list[sample.Sample] = [] - for percentile, value in summaries.items(): - if percentile not in percentiles: - continue - samples.append( - sample.Sample( - f'node_delete_{percentile}', - value, - 'seconds', - metadata={ - 'phase': phase, - 'start_time_epoch': start_time, - }, - ) - ) + for name, value in summaries.items(): + if name in target_percentiles: + samples.append( + sample.Sample( + f'node_delete_{name}', + value, + 'seconds', + metadata={'phase': phase}, + ) + ) return samples -def GetScaleTimeout(num_nodes: int | None = None) -> int: - """Returns the timeout for scale operations in this benchmark.""" +def _ScaleDeployment(replicas: int) -> None: + """Scales the 'app' deployment to the given replica count.""" + container_service.RunKubectlCommand([ + 'scale', + f'--replicas={replicas}', + 'deployment/app', + ]) + + +def _GetScaleTimeout(num_nodes: int | None = None) -> int: + """Returns the timeout for a scale or cleanup operation. + + Args: + num_nodes: Number of nodes to scale to. Defaults to NUM_NODES flag. + + Returns: + Timeout in seconds, capped at 1 hour. + """ nodes = num_nodes if num_nodes is not None else NUM_NODES.value base_timeout = 60 * 10 # 10 minutes per_node_timeout = nodes * 3 # 3 seconds per node - proposed_timeout = base_timeout + per_node_timeout max_timeout = 60 * 60 # 1 hour - return min(proposed_timeout, max_timeout) + return min(base_timeout + per_node_timeout, max_timeout) -def _AddPhaseMetadata(samples: list[sample.Sample], phase: str) -> None: +def _AddPhaseMetadata( + samples: list[sample.Sample], + phase: str, +) -> None: + """Adds phase metadata to all samples.""" for s in samples: s.metadata['phase'] = phase @@ -761,10 +813,14 @@ def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): """Cleanups scale benchmark. Runs before teardown.""" container_service.RunRetryableKubectlCommand( ['delete', 'deployment', 'app'], +<<<<<<< HEAD <<<<<<< HEAD timeout=_GetScaleTimeout(), ======= timeout=GetScaleTimeout(), >>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) +======= + timeout=_GetScaleTimeout(), +>>>>>>> 8dff7d10 (Refactor kubernetes_node_scale benchmark) raise_on_failure=False, ) From f6564f44a249030a3053bbb6cc100f17dd355825 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Tue, 17 Feb 2026 11:12:11 +0200 Subject: [PATCH 18/26] Update import and j2 template --- .../kubernetes_node_scale.yaml.j2 | 21 - .../kubernetes_node_scale_benchmark.py | 406 +----------------- 2 files changed, 10 insertions(+), 417 deletions(-) diff --git a/perfkitbenchmarker/data/container/kubernetes_scale/kubernetes_node_scale.yaml.j2 b/perfkitbenchmarker/data/container/kubernetes_scale/kubernetes_node_scale.yaml.j2 index e410f73d85..260da84b74 100644 --- a/perfkitbenchmarker/data/container/kubernetes_scale/kubernetes_node_scale.yaml.j2 +++ b/perfkitbenchmarker/data/container/kubernetes_scale/kubernetes_node_scale.yaml.j2 @@ -1,20 +1,3 @@ -{% if cloud == 'GCP' %} -apiVersion: cloud.google.com/v1 -kind: ComputeClass -metadata: - name: app-ccc -spec: - nodePoolAutoCreation: - enabled: true - priorities: - - machineType: e2-medium # smallest machine-type supported by NAP - spot: false - storage: - bootDiskSize: 20 - bootDiskType: pd-standard - whenUnsatisfiable: DoNotScaleUp -{% endif %} ---- apiVersion: apps/v1 kind: Deployment metadata: @@ -41,10 +24,6 @@ spec: values: - app topologyKey: "kubernetes.io/hostname" - {% if cloud == 'GCP' %} - nodeSelector: - cloud.google.com/compute-class: app-ccc - {% endif %} containers: - name: pause image: registry.k8s.io/pause:3.10 diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 2feaaf4fc9..a12a37abbc 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -11,19 +11,9 @@ for consistent metric collection. """ -<<<<<<< HEAD -<<<<<<< HEAD import json import time from typing import Any -======= -import time ->>>>>>> 9a938ee8 (Add scaling down logic and gathering metrics) -======= -import json -import time -from typing import Any ->>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) from absl import flags from absl import logging @@ -31,12 +21,9 @@ from perfkitbenchmarker import configs from perfkitbenchmarker import container_service from perfkitbenchmarker import sample -<<<<<<< HEAD -from perfkitbenchmarker.container_service import kubernetes_commands -from perfkitbenchmarker.linux_benchmarks import kubernetes_scale_benchmark -======= ->>>>>>> 8dff7d10 (Refactor kubernetes_node_scale benchmark) -from perfkitbenchmarker.container_service import kubernetes_commands +from perfkitbenchmarker.resources.container_service import kubectl +from perfkitbenchmarker.resources.container_service import kubernetes_cluster +from perfkitbenchmarker.resources.container_service import kubernetes_commands from perfkitbenchmarker.linux_benchmarks import kubernetes_scale_benchmark FLAGS = flags.FLAGS @@ -99,19 +86,8 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: """ assert bm_spec.container_cluster cluster = bm_spec.container_cluster -<<<<<<< HEAD - assert isinstance(cluster, container_service.KubernetesCluster) -======= -<<<<<<< HEAD assert isinstance(cluster, kubernetes_cluster.KubernetesCluster) - cluster: kubernetes_cluster.KubernetesCluster = cluster -======= - assert isinstance(cluster, container_service.KubernetesCluster) ->>>>>>> f178b3d9 (Refactor kubernetes_node_scale benchmark) ->>>>>>> 8dff7d10 (Refactor kubernetes_node_scale benchmark) - -<<<<<<< HEAD -<<<<<<< HEAD + initial_nodes = set(kubernetes_commands.GetNodeNames()) initial_node_count = len(initial_nodes) @@ -121,7 +97,6 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: NUM_NODES.value, cluster, initial_nodes, -<<<<<<< HEAD ) # Phase 2: Scale down and wait for nodes to be removed. @@ -151,7 +126,7 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: def _ScaleUpAndCollect( phase: str, replicas: int, - cluster: container_service.KubernetesCluster, + cluster: kubernetes_cluster.KubernetesCluster, initial_nodes: set[str], ) -> list[sample.Sample]: """Scales the deployment up and collects pod/node timing samples. @@ -294,7 +269,7 @@ def _GetPodPhaseCounts() -> tuple[dict[str, int], int]: Returns: A tuple of (phase_name -> count, ready_count). """ - stdout, _, _ = container_service.RunKubectlCommand( + stdout, _, _ = kubectl.RunKubectlCommand( ['get', 'pods', '-l', 'app=app', '-o', 'json'], suppress_logging=True, ) @@ -423,361 +398,7 @@ def _SummarizeNodeDeletionTimes( def _ScaleDeployment(replicas: int) -> None: """Scales the 'app' deployment to the given replica count.""" - container_service.RunKubectlCommand([ - 'scale', - f'--replicas={replicas}', - 'deployment/app', - ]) - - -def _GetScaleTimeout(num_nodes: int | None = None) -> int: - """Returns the timeout for a scale or cleanup operation. - - Args: - num_nodes: Number of nodes to scale to. Defaults to NUM_NODES flag. - - Returns: - Timeout in seconds, capped at 1 hour. - """ - nodes = num_nodes if num_nodes is not None else NUM_NODES.value - base_timeout = 60 * 10 # 10 minutes - per_node_timeout = nodes * 3 # 3 seconds per node - max_timeout = 60 * 60 # 1 hour - return min(base_timeout + per_node_timeout, max_timeout) - - -def _AddPhaseMetadata( - samples: list[sample.Sample], - phase: str, -) -> None: - """Adds phase metadata to all samples.""" - for s in samples: - s.metadata['phase'] = phase -======= - initial_node_count = len(kubernetes_commands.GetNodeNames()) - start_time = time.time() -======= - initial_nodes = set(kubernetes_commands.GetNodeNames()) - initial_node_count = len(initial_nodes) ->>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) - - # Do one scale up, scale down, then scale up again. - scaleup1_samples = _ScaleUpAndCollectSamples( - phase='scaleup1', - replicas=NUM_NODES.value, - cluster=cluster, - initial_nodes=initial_nodes, - initial_pods=set(kubernetes_commands.GetPodNames()), - pod_phase_timeout=3 * 60 * 60, -======= ->>>>>>> 8dff7d10 (Refactor kubernetes_node_scale benchmark) - ) - - # Phase 2: Scale down and wait for nodes to be removed. - scaledown_samples, nodes_removed = _ScaleDownAndCollect( - 'scaledown', - initial_nodes, - initial_node_count, - ) - - # Phase 3: Scale up again (only if scale-down succeeded). - scaleup2_samples: list[sample.Sample] = [] - if nodes_removed: - scaleup2_samples = _ScaleUpAndCollect( - 'scaleup2', - NUM_NODES.value, - cluster, - initial_nodes, - ) - else: - logging.warning( - 'Skipping second scale up: nodes did not return to baseline.', - ) -<<<<<<< HEAD - return samples ->>>>>>> 9a938ee8 (Add scaling down logic and gathering metrics) -======= - - return scaleup1_samples + scaledown_samples + scaleup2_samples ->>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) - - -def _ScaleUpAndCollect( - phase: str, - replicas: int, - cluster: container_service.KubernetesCluster, - initial_nodes: set[str], -) -> list[sample.Sample]: - """Scales the deployment up and collects pod/node timing samples. - - Args: - phase: Label for this phase (e.g. 'scaleup1'). - replicas: Target replica count. - cluster: The Kubernetes cluster. - initial_nodes: Node names present before scaling. - - Returns: - Samples tagged with phase metadata. - """ - initial_pods = set(kubernetes_commands.GetPodNames()) - start_time = time.time() - _ScaleDeployment(replicas) - - phase_log_samples = _PollPodPhasesUntilReady( - phase, - replicas, - _SCALE_UP_TIMEOUT_SECONDS, - ) - pod_samples = kubernetes_scale_benchmark.ParseStatusChanges( - 'pod', - start_time, - resources_to_ignore=initial_pods, - ) - kubernetes_scale_benchmark.CheckForFailures( - cluster, - pod_samples, - replicas, - ) - node_samples = kubernetes_scale_benchmark.ParseStatusChanges( - 'node', - start_time, - resources_to_ignore=initial_nodes, - ) - - all_samples = phase_log_samples + pod_samples + node_samples - _AddPhaseMetadata(all_samples, phase) - return all_samples - - -def _ScaleDownAndCollect( - phase: str, - initial_nodes: set[str], - initial_node_count: int, -) -> tuple[list[sample.Sample], bool]: - """Scales deployment to 0 and waits for autoscaler to remove nodes. - - Args: - phase: Label for this phase. - initial_nodes: Node names present before any scaling. - initial_node_count: Number of nodes before any scaling. - - Returns: - A tuple of (samples, whether nodes returned to acceptable level). - """ - _ScaleDeployment(0) - return _PollNodeDeletionUntilDone( - phase, - initial_nodes, - initial_node_count, - _SCALE_DOWN_TIMEOUT_SECONDS, - ) - - -def _PollPodPhasesUntilReady( - phase: str, - desired_replicas: int, - timeout: int, -) -> list[sample.Sample]: - """Logs pod phase counts every minute until all pods are Ready or timeout. - - Args: - phase: Label for this phase. - desired_replicas: Number of pods expected to become Ready. - timeout: Maximum wall-clock seconds to poll. - - Returns: - Time-series samples of pod counts per phase. - """ - samples: list[sample.Sample] = [] - start = time.monotonic() - while True: - phase_counts, ready_count = _GetPodPhaseCounts() - elapsed = time.monotonic() - start - logging.info( - 'Pod phases (%s) after %ds: %s (Ready: %d/%d)', - phase, - int(elapsed), - phase_counts, - ready_count, - desired_replicas, - ) - for pod_phase, count in phase_counts.items(): - samples.append( - sample.Sample( - 'pod_phase_count', - count, - 'count', - metadata={ - 'phase': phase, - 'pod_phase': pod_phase, - 'elapsed_seconds': elapsed, - }, - ) - ) - # Always emit a Ready count for a consistent time-series. - samples.append( - sample.Sample( - 'pod_phase_count', - ready_count, - 'count', - metadata={ - 'phase': phase, - 'pod_phase': 'Ready', - 'elapsed_seconds': elapsed, - }, - ) - ) - if ready_count >= desired_replicas: - return samples - if elapsed >= timeout: - logging.warning( - 'Timed out waiting for pods to be Ready (%s). Ready: %d/%d', - phase, - ready_count, - desired_replicas, - ) - return samples - time.sleep(_POLL_INTERVAL_SECONDS) - - -def _GetPodPhaseCounts() -> tuple[dict[str, int], int]: - """Returns pod phase counts and Ready count for the 'app' deployment. - - Uses a single kubectl call with label selector `app=app`. - - Returns: - A tuple of (phase_name -> count, ready_count). - """ - stdout, _, _ = container_service.RunKubectlCommand( - ['get', 'pods', '-l', 'app=app', '-o', 'json'], - suppress_logging=True, - ) - pods = json.loads(stdout).get('items', []) - phase_counts: dict[str, int] = {} - ready_count = 0 - for pod in pods: - pod_phase = pod.get('status', {}).get('phase', 'Unknown') - phase_counts[pod_phase] = phase_counts.get(pod_phase, 0) + 1 - if _IsPodReady(pod): - ready_count += 1 - return phase_counts, ready_count - - -def _IsPodReady(pod: dict[str, Any]) -> bool: - """Returns True if the pod has a Ready=True condition.""" - for condition in pod.get('status', {}).get('conditions', []): - if condition.get('type') == 'Ready' and condition.get('status') == 'True': - return True - return False - - -def _PollNodeDeletionUntilDone( - phase: str, - initial_nodes: set[str], - initial_node_count: int, - timeout: int, -) -> tuple[list[sample.Sample], bool]: - """Polls node count until autoscaler removes scaled nodes or timeout. - - Allows a small buffer (_SCALE_DOWN_NODE_BUFFER) above the initial count to - account for the cluster autoscaler being conservative (e.g. system workloads - preventing removal). - - Args: - phase: Label for this phase. - initial_nodes: Node names present before any scaling. - initial_node_count: Number of nodes before any scaling. - timeout: Maximum time in seconds (_SCALE_DOWN_TIMEOUT_SECONDS) to poll. - - Returns: - A tuple of (samples, whether node count reached acceptable level). - """ - acceptable_count = initial_node_count + _SCALE_DOWN_NODE_BUFFER - scaled_nodes = set(kubernetes_commands.GetNodeNames()) - initial_nodes - deletion_times: dict[str, float] = {} - samples: list[sample.Sample] = [] - start = time.monotonic() - done = False - - while True: - current_nodes = set(kubernetes_commands.GetNodeNames()) - elapsed = time.monotonic() - start - - # Record deletion timestamps for nodes that disappeared. - for node in list(scaled_nodes): - if node not in current_nodes: - deletion_times[node] = elapsed - scaled_nodes.discard(node) - - remaining = max(len(current_nodes) - acceptable_count, 0) - samples.append( - sample.Sample( - 'node_remaining_count', - remaining, - 'count', - metadata={'phase': phase, 'elapsed_seconds': elapsed}, - ) - ) - - if len(current_nodes) <= acceptable_count: - logging.info( - 'Node count (%d) within acceptable threshold (%d).', - len(current_nodes), - acceptable_count, - ) - done = True - break - if elapsed >= timeout: - logging.warning( - 'Timed out waiting for nodes to scale down.' - ' Remaining above threshold: %d', - remaining, - ) - break - - logging.info('Remaining scaled nodes above threshold: %d', remaining) - time.sleep(_POLL_INTERVAL_SECONDS) - - samples += _SummarizeNodeDeletionTimes(deletion_times, phase) - return samples, done - - -def _SummarizeNodeDeletionTimes( - deletion_times: dict[str, float], - phase: str, -) -> list[sample.Sample]: - """Builds percentile samples from per-node deletion durations. - - Args: - deletion_times: Mapping of node name to seconds-until-deleted. - phase: Label for this phase. - - Returns: - Samples for p50, p90, p99, p99.9, p100 of deletion times. - """ - if not deletion_times: - return [] - summaries = kubernetes_scale_benchmark._SummarizeTimestamps( - list(deletion_times.values()) - ) - target_percentiles = {'p50', 'p90', 'p99', 'p99.9', 'p100'} - samples: list[sample.Sample] = [] - for name, value in summaries.items(): - if name in target_percentiles: - samples.append( - sample.Sample( - f'node_delete_{name}', - value, - 'seconds', - metadata={'phase': phase}, - ) - ) - return samples - - -def _ScaleDeployment(replicas: int) -> None: - """Scales the 'app' deployment to the given replica count.""" - container_service.RunKubectlCommand([ + kubectl.RunKubectlCommand([ 'scale', f'--replicas={replicas}', 'deployment/app', @@ -810,17 +431,10 @@ def _AddPhaseMetadata( def Cleanup(bm_spec: benchmark_spec.BenchmarkSpec): - """Cleanups scale benchmark. Runs before teardown.""" - container_service.RunRetryableKubectlCommand( + """Deletes the app deployment. Runs before cluster teardown.""" + del bm_spec # Unused. + kubectl.RunRetryableKubectlCommand( ['delete', 'deployment', 'app'], -<<<<<<< HEAD -<<<<<<< HEAD - timeout=_GetScaleTimeout(), -======= - timeout=GetScaleTimeout(), ->>>>>>> 9fbbc449 (Add scaling down logic, phases and gathering metrics) -======= timeout=_GetScaleTimeout(), ->>>>>>> 8dff7d10 (Refactor kubernetes_node_scale benchmark) raise_on_failure=False, ) From 06577af435000181b3707fe89e3ffcec58834989 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Wed, 18 Feb 2026 13:10:26 +0200 Subject: [PATCH 19/26] Fix issue where the first kubectl get command might failed --- .../linux_benchmarks/kubernetes_node_scale_benchmark.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index a12a37abbc..ad7e8f5b46 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -269,10 +269,13 @@ def _GetPodPhaseCounts() -> tuple[dict[str, int], int]: Returns: A tuple of (phase_name -> count, ready_count). """ - stdout, _, _ = kubectl.RunKubectlCommand( + stdout, _, retcode = kubectl.RunKubectlCommand( ['get', 'pods', '-l', 'app=app', '-o', 'json'], suppress_logging=True, + raise_on_failure=False, ) + if retcode: + return {}, 0 pods = json.loads(stdout).get('items', []) phase_counts: dict[str, int] = {} ready_count = 0 From f2dc3bf565bc419fccdeede2941649a20ac877df Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Mon, 23 Feb 2026 13:59:29 +0200 Subject: [PATCH 20/26] Raise an error when the timeout is reached --- .../kubernetes_node_scale_benchmark.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index ad7e8f5b46..783661e155 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -19,7 +19,7 @@ from absl import logging from perfkitbenchmarker import benchmark_spec from perfkitbenchmarker import configs -from perfkitbenchmarker import container_service +from perfkitbenchmarker import errors from perfkitbenchmarker import sample from perfkitbenchmarker.resources.container_service import kubectl from perfkitbenchmarker.resources.container_service import kubernetes_cluster @@ -116,8 +116,9 @@ def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: initial_nodes, ) else: - logging.warning( - 'Skipping second scale up: nodes did not return to baseline.', + raise errors.Benchmarks.RunError( + 'Nodes did not return to baseline after scale-down. ' + 'Cannot proceed with second scale-up.' ) return scaleup1_samples + scaledown_samples + scaleup2_samples @@ -352,12 +353,10 @@ def _PollNodeDeletionUntilDone( done = True break if elapsed >= timeout: - logging.warning( + raise errors.Benchmarks.RunError( 'Timed out waiting for nodes to scale down.' - ' Remaining above threshold: %d', - remaining, + ' Remaining above threshold: %d' % remaining ) - break logging.info('Remaining scaled nodes above threshold: %d', remaining) time.sleep(_POLL_INTERVAL_SECONDS) From 9911e6835fa7e90bf6641a6a0b32e0de6afdaf81 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Tue, 24 Feb 2026 12:19:32 +0200 Subject: [PATCH 21/26] Add optional argument suppress_logging to 'GetAllNamesForResourceType' and 'GetNodeNames' methods --- .../kubernetes_node_scale_benchmark.py | 7 +++++-- .../container_service/kubernetes_commands.py | 12 ++++++++---- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 783661e155..67ced72f14 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -318,14 +318,17 @@ def _PollNodeDeletionUntilDone( A tuple of (samples, whether node count reached acceptable level). """ acceptable_count = initial_node_count + _SCALE_DOWN_NODE_BUFFER - scaled_nodes = set(kubernetes_commands.GetNodeNames()) - initial_nodes + scaled_nodes = ( + set(kubernetes_commands.GetNodeNames(suppress_logging=True)) + - initial_nodes + ) deletion_times: dict[str, float] = {} samples: list[sample.Sample] = [] start = time.monotonic() done = False while True: - current_nodes = set(kubernetes_commands.GetNodeNames()) + current_nodes = set(kubernetes_commands.GetNodeNames(suppress_logging=True)) elapsed = time.monotonic() - start # Record deletion timestamps for nodes that disappeared. diff --git a/perfkitbenchmarker/resources/container_service/kubernetes_commands.py b/perfkitbenchmarker/resources/container_service/kubernetes_commands.py index 7d63c5e163..c0fadb2f9f 100644 --- a/perfkitbenchmarker/resources/container_service/kubernetes_commands.py +++ b/perfkitbenchmarker/resources/container_service/kubernetes_commands.py @@ -720,15 +720,19 @@ def GetPodNames() -> list[str]: return GetAllNamesForResourceType('pods') -def GetNodeNames() -> list[str]: +def GetNodeNames(suppress_logging: bool = False) -> list[str]: """Get the node names for the cluster.""" - return GetAllNamesForResourceType('nodes') + return GetAllNamesForResourceType('nodes', suppress_logging=suppress_logging) -def GetAllNamesForResourceType(resource_type: str) -> list[str]: +def GetAllNamesForResourceType( + resource_type: str, + suppress_logging: bool = False, +) -> list[str]: """Get all names for the specified resource. Type should be plural.""" stdout, _, _ = kubectl.RunKubectlCommand( - ['get', resource_type, '-o', 'jsonpath={.items[*].metadata.name}'] + ['get', resource_type, '-o', 'jsonpath={.items[*].metadata.name}'], + suppress_logging=suppress_logging, ) return stdout.split() From 7de342b85cf137a04638050335297f171d085e47 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Tue, 24 Feb 2026 19:38:36 +0200 Subject: [PATCH 22/26] Enable autoscaler for additional node pools, add pod cidr /10 for big cluster --- .../providers/azure/azure_kubernetes_service.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/perfkitbenchmarker/providers/azure/azure_kubernetes_service.py b/perfkitbenchmarker/providers/azure/azure_kubernetes_service.py index b547a0f83e..2972420234 100644 --- a/perfkitbenchmarker/providers/azure/azure_kubernetes_service.py +++ b/perfkitbenchmarker/providers/azure/azure_kubernetes_service.py @@ -173,6 +173,9 @@ def _Create(self): '--nodepool-labels', f'pkb_nodepool={container_cluster.DEFAULT_NODEPOOL}', ] + self._GetNodeFlags(self.default_nodepool) + # AKS clusters with more than 256 nodes need to use a larger pod CIDR + if self.max_nodes > 256: + cmd += ['--pod-cidr', '100.64.0.0/10'] if self.enable_vpa: cmd.append('--enable-vpa') if FLAGS.azure_aks_auto_node_provisioning: @@ -214,6 +217,12 @@ def _CreateNodePool(self, nodepool_config: container.BaseNodePoolConfig): '--labels', f'pkb_nodepool={nodepool_config.name}', ] + self._GetNodeFlags(nodepool_config) + if self._IsAutoscalerEnabled(): + cmd += [ + '--enable-cluster-autoscaler', + f'--min-count={self.min_nodes}', + f'--max-count={self.max_nodes}', + ] vm_util.IssueCommand(cmd, timeout=600) def _GetNodeFlags( From 9679fe1a2114fff8f8e8544afe5217a23c700ef6 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Thu, 5 Mar 2026 11:39:17 +0200 Subject: [PATCH 23/26] Add additional params to AKS, address linting warnings --- .../kubernetes_node_scale_benchmark.py | 19 +++++++++++++++++-- .../kubernetes_scale_benchmark.py | 4 ++-- .../azure/azure_kubernetes_service.py | 19 +++++++++++++++++-- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index 67ced72f14..ee868d773e 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -81,6 +81,9 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec): def Run(bm_spec: benchmark_spec.BenchmarkSpec) -> list[sample.Sample]: """Runs the scale-up, scale-down, scale-up benchmark sequence. + Args: + bm_spec: The benchmark specification. + Returns: Combined samples from all three phases, each tagged with phase metadata. """ @@ -328,7 +331,19 @@ def _PollNodeDeletionUntilDone( done = False while True: - current_nodes = set(kubernetes_commands.GetNodeNames(suppress_logging=True)) + stdout, _, retcode = kubectl.RunKubectlCommand( + ['get', 'nodes', '-o', 'jsonpath={.items[*].metadata.name}'], + suppress_logging=True, + raise_on_failure=False, + ) + if retcode: + logging.warning( + 'kubectl get nodes failed (retcode=%d), retrying.', + retcode, + ) + time.sleep(_POLL_INTERVAL_SECONDS) + continue + current_nodes = set(stdout.split()) elapsed = time.monotonic() - start # Record deletion timestamps for nodes that disappeared. @@ -383,7 +398,7 @@ def _SummarizeNodeDeletionTimes( """ if not deletion_times: return [] - summaries = kubernetes_scale_benchmark._SummarizeTimestamps( + summaries = kubernetes_scale_benchmark.SummarizeTimestamps( list(deletion_times.values()) ) target_percentiles = {'p50', 'p90', 'p99', 'p99.9', 'p100'} diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py index ff6f8a0389..d5ce1440c9 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_scale_benchmark.py @@ -463,7 +463,7 @@ def ParseStatusChanges( ) if not REPORT_PERCENTILES.value: continue - summaries = _SummarizeTimestamps(timestamps) + summaries = SummarizeTimestamps(timestamps) for percentile, value in summaries.items(): samples.append( sample.Sample( @@ -491,7 +491,7 @@ def ParseStatusChanges( return samples -def _SummarizeTimestamps(timestamps: list[float]) -> dict[str, float]: +def SummarizeTimestamps(timestamps: list[float]) -> dict[str, float]: """Returns a few metrics about a list of timestamps.""" percentiles = [0, 10, 50, 90, 95, 99.9, 100] summary = { diff --git a/perfkitbenchmarker/providers/azure/azure_kubernetes_service.py b/perfkitbenchmarker/providers/azure/azure_kubernetes_service.py index 2972420234..43fbe75b55 100644 --- a/perfkitbenchmarker/providers/azure/azure_kubernetes_service.py +++ b/perfkitbenchmarker/providers/azure/azure_kubernetes_service.py @@ -173,9 +173,24 @@ def _Create(self): '--nodepool-labels', f'pkb_nodepool={container_cluster.DEFAULT_NODEPOOL}', ] + self._GetNodeFlags(self.default_nodepool) - # AKS clusters with more than 256 nodes need to use a larger pod CIDR if self.max_nodes > 256: - cmd += ['--pod-cidr', '100.64.0.0/10'] + cmd += [ + # Default /16 supports ~250 nodes; /10 provides ~4M IPs for up to 16k. + '--pod-cidr', + '100.64.0.0/10', + # Free tier caps at 1k nodes; standard scales upto 5k nodes. + '--tier', + 'standard', + # Standard LB exhausts SNAT ports at ~1k nodes; NAT Gateway required. + '--outbound-type', + 'managedNATGateway', + # 4 IPs = ~256k SNAT ports (~50 ports/node at 5k nodes). + '--nat-gateway-managed-outbound-ip-count', + '4', + # Prevent connection drops during long bootstrap or idle keep-alives. + '--nat-gateway-idle-timeout', + '10', + ] if self.enable_vpa: cmd.append('--enable-vpa') if FLAGS.azure_aks_auto_node_provisioning: From ba8a0818246f0fe7483c99e9778015f7e885c0b8 Mon Sep 17 00:00:00 2001 From: Zach Howell Date: Fri, 6 Mar 2026 15:10:35 -0800 Subject: [PATCH 24/26] Move static_vm_spec to its own file PiperOrigin-RevId: 879840203 --- .../configs/static_vm_decoders.py | 4 +- perfkitbenchmarker/configs/static_vm_spec.py | 86 +++++++++++++++++++ .../configs/vm_group_decoders.py | 4 +- perfkitbenchmarker/static_virtual_machine.py | 71 +-------------- tests/configs/benchmark_config_spec_test.py | 6 +- .../openfoam_benchmark_test.py | 5 +- tests/static_virtual_machine_test.py | 13 +-- 7 files changed, 107 insertions(+), 82 deletions(-) create mode 100644 perfkitbenchmarker/configs/static_vm_spec.py diff --git a/perfkitbenchmarker/configs/static_vm_decoders.py b/perfkitbenchmarker/configs/static_vm_decoders.py index e5d4f5bbae..3c7c362554 100644 --- a/perfkitbenchmarker/configs/static_vm_decoders.py +++ b/perfkitbenchmarker/configs/static_vm_decoders.py @@ -13,8 +13,8 @@ # limitations under the License. """Module containing Static VM Decoders.""" -from perfkitbenchmarker import static_virtual_machine from perfkitbenchmarker.configs import option_decoders +from perfkitbenchmarker.configs import static_vm_spec class StaticVmDecoder(option_decoders.TypeVerifier): @@ -38,7 +38,7 @@ def Decode(self, value, component_full_name, flag_values): errors.Config.InvalidValue upon invalid input value. """ input_dict = super().Decode(value, component_full_name, flag_values) - return static_virtual_machine.StaticVmSpec( + return static_vm_spec.StaticVmSpec( self._GetOptionFullName(component_full_name), flag_values=flag_values, **input_dict diff --git a/perfkitbenchmarker/configs/static_vm_spec.py b/perfkitbenchmarker/configs/static_vm_spec.py new file mode 100644 index 0000000000..994d29c21d --- /dev/null +++ b/perfkitbenchmarker/configs/static_vm_spec.py @@ -0,0 +1,86 @@ +# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""The static VM spec. + +Used to define static VMs (ie not created by PKB). See static_virtual_machine.py +for more details. +""" + +from perfkitbenchmarker import disk +from perfkitbenchmarker import virtual_machine_spec + + +class StaticVmSpec(virtual_machine_spec.BaseVmSpec): + """Object containing all info needed to create a Static VM.""" + + CLOUD = 'Static' + + def __init__( + self, + component_full_name, + ip_address=None, + user_name=None, + ssh_private_key=None, + internal_ip=None, + internal_ips=None, + ssh_port=22, + password=None, + disk_specs=None, + os_type=None, + tag=None, + zone=None, + **kwargs + ): + """Initialize the StaticVmSpec object. + + Args: + component_full_name: string. Fully qualified name of the configurable + component containing the config options. + ip_address: The public ip address of the VM. + user_name: The username of the VM that the keyfile corresponds to. + ssh_private_key: The absolute path to the private keyfile to use to ssh to + the VM. + internal_ip: The internal ip address of the VM. + internal_ips: The internal ip addresses of the VMs. + ssh_port: The port number to use for SSH and SCP commands. + password: The password used to log into the VM (Windows Only). + disk_specs: None or a list of dictionaries containing kwargs used to + create disk.BaseDiskSpecs. + os_type: The OS type of the VM. See the flag of the same name for more + information. + tag: A string that allows the VM to be included or excluded from a run by + using the 'static_vm_tags' flag. + zone: The VM's zone. + **kwargs: Other args for the superclass. + """ + super().__init__(component_full_name, **kwargs) + self.ip_address = ip_address + self.user_name = user_name + self.ssh_private_key = ssh_private_key + self.internal_ip = internal_ip + self.internal_ips = internal_ips + self.ssh_port = ssh_port + self.password = password + self.os_type = os_type + self.tag = tag + self.zone = zone + self.disk_specs = [ + disk.BaseDiskSpec( + '{}.disk_specs[{}]'.format(component_full_name, i), + flag_values=kwargs.get('flag_values'), + **disk_spec + ) + for i, disk_spec in enumerate(disk_specs or ()) + ] diff --git a/perfkitbenchmarker/configs/vm_group_decoders.py b/perfkitbenchmarker/configs/vm_group_decoders.py index f4d271cfaf..d399a9cf89 100644 --- a/perfkitbenchmarker/configs/vm_group_decoders.py +++ b/perfkitbenchmarker/configs/vm_group_decoders.py @@ -18,11 +18,11 @@ from perfkitbenchmarker import os_types from perfkitbenchmarker import provider_info from perfkitbenchmarker import providers -from perfkitbenchmarker import static_virtual_machine from perfkitbenchmarker import virtual_machine_spec from perfkitbenchmarker.configs import option_decoders from perfkitbenchmarker.configs import spec from perfkitbenchmarker.configs import static_vm_decoders +from perfkitbenchmarker.configs import static_vm_spec _DEFAULT_DISK_COUNT = 1 _DEFAULT_VM_COUNT = 1 @@ -52,7 +52,7 @@ class VmGroupSpec(spec.BaseSpec): disk_type: str disk_spec: disk.BaseDiskSpec os_type: str - static_vms: list[static_virtual_machine.StaticVmSpec] + static_vms: list[static_vm_spec.StaticVmSpec] vm_count: int vm_spec: virtual_machine_spec.BaseVmSpec vm_as_nfs: bool diff --git a/perfkitbenchmarker/static_virtual_machine.py b/perfkitbenchmarker/static_virtual_machine.py index a5a1607036..649a1d7f01 100644 --- a/perfkitbenchmarker/static_virtual_machine.py +++ b/perfkitbenchmarker/static_virtual_machine.py @@ -35,8 +35,9 @@ from perfkitbenchmarker import linux_virtual_machine from perfkitbenchmarker import os_types from perfkitbenchmarker import virtual_machine -from perfkitbenchmarker import virtual_machine_spec from perfkitbenchmarker import windows_virtual_machine +from perfkitbenchmarker.configs import static_vm_spec + FLAGS = flags.FLAGS @@ -56,70 +57,6 @@ ) -class StaticVmSpec(virtual_machine_spec.BaseVmSpec): - """Object containing all info needed to create a Static VM.""" - - CLOUD = 'Static' - - def __init__( - self, - component_full_name, - ip_address=None, - user_name=None, - ssh_private_key=None, - internal_ip=None, - internal_ips=None, - ssh_port=22, - password=None, - disk_specs=None, - os_type=None, - tag=None, - zone=None, - **kwargs - ): - """Initialize the StaticVmSpec object. - - Args: - component_full_name: string. Fully qualified name of the configurable - component containing the config options. - ip_address: The public ip address of the VM. - user_name: The username of the VM that the keyfile corresponds to. - ssh_private_key: The absolute path to the private keyfile to use to ssh to - the VM. - internal_ip: The internal ip address of the VM. - internal_ips: The internal ip addresses of the VMs. - ssh_port: The port number to use for SSH and SCP commands. - password: The password used to log into the VM (Windows Only). - disk_specs: None or a list of dictionaries containing kwargs used to - create disk.BaseDiskSpecs. - os_type: The OS type of the VM. See the flag of the same name for more - information. - tag: A string that allows the VM to be included or excluded from a run by - using the 'static_vm_tags' flag. - zone: The VM's zone. - **kwargs: Other args for the superclass. - """ - super().__init__(component_full_name, **kwargs) - self.ip_address = ip_address - self.user_name = user_name - self.ssh_private_key = ssh_private_key - self.internal_ip = internal_ip - self.internal_ips = internal_ips - self.ssh_port = ssh_port - self.password = password - self.os_type = os_type - self.tag = tag - self.zone = zone - self.disk_specs = [ - disk.BaseDiskSpec( - '{}.disk_specs[{}]'.format(component_full_name, i), - flag_values=kwargs.get('flag_values'), - **disk_spec - ) - for i, disk_spec in enumerate(disk_specs or ()) - ] - - class StaticDisk(disk.BaseDisk): """Object representing a static Disk.""" @@ -148,7 +85,7 @@ class StaticVirtualMachine(virtual_machine.BaseVirtualMachine): vm_pool = collections.deque() vm_pool_lock = threading.Lock() - def __init__(self, vm_spec): + def __init__(self, vm_spec: static_vm_spec.StaticVmSpec): """Initialize a static virtual machine. Args: @@ -326,7 +263,7 @@ def VerifyItemFormat(item): for local_disk in local_disks: disk_kwargs_list.append({'device_path': local_disk}) - vm_spec = StaticVmSpec( + vm_spec = static_vm_spec.StaticVmSpec( 'static_vm_file', ip_address=ip_address, user_name=user_name, diff --git a/tests/configs/benchmark_config_spec_test.py b/tests/configs/benchmark_config_spec_test.py index 8d3ec56d78..dd277409f7 100644 --- a/tests/configs/benchmark_config_spec_test.py +++ b/tests/configs/benchmark_config_spec_test.py @@ -22,11 +22,11 @@ from perfkitbenchmarker import os_types from perfkitbenchmarker import provider_info from perfkitbenchmarker import providers -from perfkitbenchmarker import static_virtual_machine from perfkitbenchmarker import virtual_machine_spec from perfkitbenchmarker.configs import benchmark_config_spec from perfkitbenchmarker.configs import spec from perfkitbenchmarker.configs import static_vm_decoders +from perfkitbenchmarker.configs import static_vm_spec from perfkitbenchmarker.configs import vm_group_decoders from perfkitbenchmarker.providers.aws import aws_disk from perfkitbenchmarker.providers.gcp import gce_virtual_machine @@ -128,7 +128,7 @@ def testNone(self): def testValidInput(self): result = self._decoder.Decode({'ssh_port': 111}, _COMPONENT, {}) - self.assertIsInstance(result, static_virtual_machine.StaticVmSpec) + self.assertIsInstance(result, static_vm_spec.StaticVmSpec) self.assertEqual(result.ssh_port, 111) def testVmSpecFlag(self): @@ -318,7 +318,7 @@ def testValidInput(self): self.assertIsInstance(result.static_vms, list) self.assertEqual(len(result.static_vms), 1) self.assertIsInstance( - result.static_vms[0], static_virtual_machine.StaticVmSpec + result.static_vms[0], static_vm_spec.StaticVmSpec ) self.assertEqual(result.vm_count, 0) self.assertIsInstance(result.vm_spec, virtual_machine_spec.BaseVmSpec) diff --git a/tests/linux_benchmarks/openfoam_benchmark_test.py b/tests/linux_benchmarks/openfoam_benchmark_test.py index bbd1e3421b..6c196be836 100644 --- a/tests/linux_benchmarks/openfoam_benchmark_test.py +++ b/tests/linux_benchmarks/openfoam_benchmark_test.py @@ -23,6 +23,7 @@ from perfkitbenchmarker import sample from perfkitbenchmarker import static_virtual_machine from perfkitbenchmarker import test_util +from perfkitbenchmarker.configs import static_vm_spec from perfkitbenchmarker.linux_benchmarks import openfoam_benchmark from perfkitbenchmarker.linux_packages import openmpi from tests import pkb_common_test_case @@ -95,9 +96,9 @@ def testRunCaseReturnsCorrectlyParsedSamples(self, mock_parseruncommands): ) def testYumInstallRaisesNotImplementedError(self): - static_vm_spec = static_virtual_machine.StaticVmSpec('test_static_vm_spec') + vm_spec = static_vm_spec.StaticVmSpec('test_static_vm_spec') self.mock_vm = static_virtual_machine.Rhel9BasedStaticVirtualMachine( - static_vm_spec + vm_spec ) self.mock_vm.install_packages = True with self.assertRaises(NotImplementedError): diff --git a/tests/static_virtual_machine_test.py b/tests/static_virtual_machine_test.py index 14ee7a65f8..ca0c937a2b 100644 --- a/tests/static_virtual_machine_test.py +++ b/tests/static_virtual_machine_test.py @@ -20,6 +20,7 @@ from perfkitbenchmarker import disk from perfkitbenchmarker import static_virtual_machine as svm from perfkitbenchmarker import vm_util +from perfkitbenchmarker.configs import static_vm_spec from tests import pkb_common_test_case FLAGS = flags.FLAGS @@ -38,14 +39,14 @@ class TestStaticVirtualMachine( def CreateTestStaticVm(): - vm_spec = svm.StaticVmSpec(_COMPONENT) + vm_spec = static_vm_spec.StaticVmSpec(_COMPONENT) return TestStaticVirtualMachine(vm_spec=vm_spec) class StaticVmSpecTest(pkb_common_test_case.PkbCommonTestCase): def testDefaults(self): - spec = svm.StaticVmSpec(_COMPONENT) + spec = static_vm_spec.StaticVmSpec(_COMPONENT) self.assertIsNone(spec.ip_address) self.assertIsNone(spec.user_name) self.assertIsNone(spec.ssh_private_key) @@ -56,7 +57,7 @@ def testDefaults(self): self.assertEqual(spec.disk_specs, []) def testDiskSpecs(self): - spec = svm.StaticVmSpec(_COMPONENT, disk_specs=_DISK_SPEC_DICTS) + spec = static_vm_spec.StaticVmSpec(_COMPONENT, disk_specs=_DISK_SPEC_DICTS) self.assertEqual(len(spec.disk_specs), 2) for disk_spec in spec.disk_specs: self.assertIsInstance(disk_spec, disk.BaseDiskSpec) @@ -128,7 +129,7 @@ def testReadFromFile_NoErr(self): self.assertEqual(2, len(vm_pool)) self._AssertStaticVMsEqual( TestStaticVirtualMachine( - svm.StaticVmSpec( + static_vm_spec.StaticVmSpec( _COMPONENT, ip_address='174.12.14.1', user_name='perfkitbenchmarker', @@ -139,7 +140,7 @@ def testReadFromFile_NoErr(self): ) self._AssertStaticVMsEqual( TestStaticVirtualMachine( - svm.StaticVmSpec( + static_vm_spec.StaticVmSpec( _COMPONENT, ip_address='174.12.14.121', user_name='ubuntu', @@ -181,7 +182,7 @@ def testReadFromFile_UnknownOsTypeDefaultsToLinuxRequiredKeys(self): self.assertEqual(1, len(vm_pool)) self._AssertStaticVMsEqual( TestStaticVirtualMachine( - svm.StaticVmSpec( + static_vm_spec.StaticVmSpec( _COMPONENT, ip_address='174.12.14.1', user_name='perfkitbenchmarker', From 77f369a5a2b51aa0b2e9e202181429d875f41990 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Mon, 9 Mar 2026 13:27:46 +0200 Subject: [PATCH 25/26] Update kubernetes_node_scale_benchmark --- .../kubernetes_node_scale_benchmark.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py index ee868d773e..97bd08f36e 100644 --- a/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py +++ b/perfkitbenchmarker/linux_benchmarks/kubernetes_node_scale_benchmark.py @@ -21,10 +21,10 @@ from perfkitbenchmarker import configs from perfkitbenchmarker import errors from perfkitbenchmarker import sample +from perfkitbenchmarker.linux_benchmarks import kubernetes_scale_benchmark from perfkitbenchmarker.resources.container_service import kubectl from perfkitbenchmarker.resources.container_service import kubernetes_cluster from perfkitbenchmarker.resources.container_service import kubernetes_commands -from perfkitbenchmarker.linux_benchmarks import kubernetes_scale_benchmark FLAGS = flags.FLAGS @@ -47,6 +47,15 @@ MANIFEST_TEMPLATE = 'container/kubernetes_scale/kubernetes_node_scale.yaml.j2' +# Allow this many extra nodes above the baseline when checking whether +# scale-down is complete. The cluster autoscaler can be conservative and +# keep nodes around for system workloads (e.g. metrics-server). +_SCALE_DOWN_NODE_BUFFER = 2 + +_SCALE_UP_TIMEOUT_SECONDS = 3 * 60 * 60 # 3 hours +_SCALE_DOWN_TIMEOUT_SECONDS = 2 * 60 * 60 # 2 hours +_POLL_INTERVAL_SECONDS = 60 + def CheckPrerequisites(_): """Validates flags and config.""" @@ -62,13 +71,9 @@ def Prepare(bm_spec: benchmark_spec.BenchmarkSpec): bm_spec.always_call_cleanup = True assert bm_spec.container_cluster cluster = bm_spec.container_cluster - manifest_kwargs = dict( - cloud=FLAGS.cloud, - ) - yaml_docs = kubernetes_commands.ConvertManifestToYamlDicts( MANIFEST_TEMPLATE, - **manifest_kwargs, + cloud=FLAGS.cloud, ) cluster.ModifyPodSpecPlacementYaml( yaml_docs, From efad1181016cb4d19a9b9ce561b8e040e9c2c568 Mon Sep 17 00:00:00 2001 From: Vladimir Rybnikov Date: Mon, 9 Mar 2026 14:13:53 +0200 Subject: [PATCH 26/26] Fix cluster-ipv4-cidr calculation to account for additional nodepools --- .../providers/gcp/google_kubernetes_engine.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py index d44bdf25fa..884761c395 100644 --- a/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py +++ b/perfkitbenchmarker/providers/gcp/google_kubernetes_engine.py @@ -371,7 +371,11 @@ def _Create(self): cmd.args.append('--enable-autoscaling') cmd.flags['max-nodes'] = self.max_nodes cmd.flags['min-nodes'] = self.min_nodes - cmd.flags['cluster-ipv4-cidr'] = f'/{_CalculateCidrSize(self.max_nodes)}' + total_max_nodes = self.max_nodes + sum( + getattr(np, 'max_nodes', self.max_nodes) + for np in self.nodepools.values() + ) + cmd.flags['cluster-ipv4-cidr'] = f'/{_CalculateCidrSize(total_max_nodes)}' cmd.flags['metadata'] = util.MakeFormattedDefaultTags() self._RunClusterCreateCommand(cmd)