From 17814fa77d61095da1ca7d7ddcb5426e40e1cfe1 Mon Sep 17 00:00:00 2001
From: Manoj Sasankan
Date: Tue, 24 Feb 2026 20:04:01 -0500
Subject: [PATCH] Feat: Redis-GKE - Update Redis package for replication and add
 HAProxy package

---
 perfkitbenchmarker/linux_packages/haproxy.py  | 149 ++++++++++++++++++
 .../linux_packages/redis_server.py            | 103 ++++++++++---
 2 files changed, 235 insertions(+), 17 deletions(-)
 create mode 100644 perfkitbenchmarker/linux_packages/haproxy.py

diff --git a/perfkitbenchmarker/linux_packages/haproxy.py b/perfkitbenchmarker/linux_packages/haproxy.py
new file mode 100644
index 0000000000..9c89936bbe
--- /dev/null
+++ b/perfkitbenchmarker/linux_packages/haproxy.py
@@ -0,0 +1,149 @@
+# Copyright 2024 PerfKitBenchmarker Authors. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Module containing HAProxy installation and configuration for Redis HA."""
+
+from absl import flags
+from perfkitbenchmarker import errors
+from perfkitbenchmarker import vm_util
+
+FLAGS = flags.FLAGS
+
+HAPROXY_CONFIG = '/etc/haproxy/haproxy.cfg'
+
+
+def Install(vm):
+  """Installs HAProxy on the VM via apt-get and checks that the binary runs.
+
+  Args:
+    vm: The VM to install HAProxy on.
+  """
+  # Refreshing the package index is best-effort (ignore_failure=True); the
+  # install command itself is not.
+  vm.RemoteCommand('sudo apt-get update', ignore_failure=True)
+  vm.RemoteCommand('sudo apt-get install -y haproxy')
+
+  # Verify installation
+  vm.RemoteCommand('haproxy -v')
+
+
+def Configure(vm, primary_ip, replica_ip):
+  """Configures HAProxy to proxy Redis traffic to primary and replica.
+
+  Port 6379 on the proxy forwards to the primary and port 6380 forwards to the
+  replica; both backends are contacted on port 6379.
+
+  Args:
+    vm: The VM running HAProxy.
+    primary_ip: IP address of the primary Redis server.
+    replica_ip: IP address of the replica Redis server.
+  """
+  config_content = f"""
+global
+    log /dev/log local0
+    log /dev/log local1 notice
+    chroot /var/lib/haproxy
+    stats socket /run/haproxy/admin.sock mode 660 level admin
+    stats timeout 30s
+    user haproxy
+    group haproxy
+    daemon
+
+defaults
+    log global
+    mode tcp
+    option tcplog
+    timeout connect 5000
+    timeout client 50000
+    timeout server 50000
+
+# Frontend for writes (port 6379) -> Primary
+frontend redis_write
+    bind *:6379
+    default_backend redis_primary
+
+# Backend for Primary
+backend redis_primary
+    mode tcp
+    server primary {primary_ip}:6379 check
+
+# Frontend for reads (port 6380) -> Replica
+frontend redis_read
+    bind *:6380
+    default_backend redis_replica
+
+# Backend for Replica
+backend redis_replica
+    mode tcp
+    server replica {replica_ip}:6379 check
+"""
+
+  # Write config to file. The quoted heredoc hands the text to tee verbatim, so
+  # quotes, `$` or backticks in the config are never interpreted by the shell.
+  vm.RemoteCommand(
+      f"sudo tee {HAPROXY_CONFIG} <<'PKB_HAPROXY_EOF'"
+      f'{config_content}PKB_HAPROXY_EOF\n'
+  )
+
+
+@vm_util.Retry(poll_interval=2, timeout=30)
+def _WaitForHAProxyUp(vm):
+  """Waits until HAProxy is running and listening on its ports.
+
+  Args:
+    vm: The VM running HAProxy.
+
+  Raises:
+    errors.Resource.RetryableCreationError: If the HAProxy process is not found
+      or ports 6379 and 6380 are not both reachable yet.
+  """
+  # Match on the exact process name: `pgrep -f` matches full command lines, so
+  # it can also match a wrapper shell whose command line contains "haproxy".
+  haproxy_check = vm.RemoteCommand('pgrep -x haproxy', ignore_failure=True)[0]
+  if not haproxy_check:
+    raise errors.Resource.RetryableCreationError(
+        'HAProxy process not found yet.'
+    )
+
+  # Check if HAProxy is listening on ports 6379 and 6380
+  vm.RemoteCommand('echo "=== Checking HAProxy ports ==="')
+  vm.RemoteCommand(
+      'sudo netstat -tlnp | grep haproxy || ss -tlnp | grep haproxy',
+      ignore_failure=True,
+  )
+
+  # Verify ports are reachable via netcat
+  _, _, rc_6379 = vm.RemoteCommandWithReturnCode(
+      'nc -zv localhost 6379', ignore_failure=True
+  )
+  _, _, rc_6380 = vm.RemoteCommandWithReturnCode(
+      'nc -zv localhost 6380', ignore_failure=True
+  )
+
+  if rc_6379 != 0 or rc_6380 != 0:
+    raise errors.Resource.RetryableCreationError(
+        'HAProxy ports not reachable yet.'
+    )
+
+
+def Start(vm):
+  """Starts HAProxy as a background process and waits until it is up.
+
+  Args:
+    vm: The VM running HAProxy.
+  """
+  # Start HAProxy directly (not via systemctl - doesn't exist in containers)
+  vm.RemoteCommand(f'sudo haproxy -f {HAPROXY_CONFIG} -D')
+
+  _WaitForHAProxyUp(vm)
+  vm.RemoteCommand('echo "=== HAProxy started successfully ==="')
diff --git a/perfkitbenchmarker/linux_packages/redis_server.py b/perfkitbenchmarker/linux_packages/redis_server.py
index 8e57a623f2..d4e48fd61a 100644
--- a/perfkitbenchmarker/linux_packages/redis_server.py
+++ b/perfkitbenchmarker/linux_packages/redis_server.py
@@ -37,6 +37,18 @@ class RedisEvictionPolicy:
 _VERSION = flags.DEFINE_string(
     'redis_server_version', '6.2.1', 'Version of redis server to use.'
 )
+_GIT_REPO = flags.DEFINE_string(
+    'redis_git_repo',
+    'https://github.com/redis/redis.git',
+    'Git repository URL for Redis or Valkey. '
+    'Use https://github.com/valkey-io/valkey.git for Valkey.',
+)
+_REDIS_TYPE = flags.DEFINE_enum(
+    'redis_type',
+    'redis',
+    ['redis', 'valkey'],
+    'Type of server to install: redis or valkey.',
+)
 CLUSTER_MODE = flags.DEFINE_bool(
     'redis_server_cluster_mode', False, 'Whether to use cluster mode.'
 )
@@ -113,6 +125,16 @@ def GetRedisDir() -> str:
   return f'{linux_packages.INSTALL_DIR}/redis'
 
 
+def GetRedisServerBinary() -> str:
+  """Returns the server binary name for the selected --redis_type."""
+  return 'valkey-server' if _REDIS_TYPE.value == 'valkey' else 'redis-server'
+
+
+def GetRedisCliBinary() -> str:
+  """Returns the CLI binary name for the selected --redis_type."""
+  return 'valkey-cli' if _REDIS_TYPE.value == 'valkey' else 'redis-cli'
+
+
 def _GetNumProcesses(vm) -> int:
   num_processes = NUM_PROCESSES.value
   if num_processes == 0 and vm is not None:
@@ -184,10 +206,12 @@ def _Install(vm) -> None:
   CheckPrerequisites()
   vm.Install('build_tools')
   vm.Install('wget')
-  vm.RemoteCommand(f'cd {linux_packages.INSTALL_DIR}; git clone {REDIS_GIT}')
+  git_repo = _GIT_REPO.value
+  repo_dir = GetRedisDir()
   vm.RemoteCommand(
-      f'cd {GetRedisDir()} && git checkout {_VERSION.value} && make'
+      f'cd {linux_packages.INSTALL_DIR}; git clone {git_repo} redis'
   )
+  vm.RemoteCommand(f'cd {repo_dir} && git checkout {_VERSION.value} && make')
 
 
 def YumInstall(vm) -> None:
@@ -245,7 +269,8 @@ def _BuildStartCommand(vm, port: int) -> str:
     A command that can be used to start redis in the background.
   """
   redis_dir = GetRedisDir()
-  cmd = 'nohup sudo {redis_dir}/src/redis-server {args} &> /dev/null &'
+  server_binary = GetRedisServerBinary()
+  cmd = 'nohup sudo {redis_dir}/src/{server_binary} {args} &> /dev/null &'
   cmd_args = [
       f'--port {port}',
       '--protected-mode no',
@@ -291,16 +316,19 @@ def _BuildStartCommand(vm, port: int) -> str:
   else:
     max_memory_per_instance = int(vm.total_memory_kb * 0.7 / num_processes)
     cmd_args.append(f'--maxmemory {max_memory_per_instance}kb')
-  return cmd.format(redis_dir=redis_dir, args=' '.join(cmd_args))
+  return cmd.format(
+      redis_dir=redis_dir, server_binary=server_binary, args=' '.join(cmd_args)
+  )
 
 
 @vm_util.Retry(poll_interval=5, timeout=60)
 def _WaitForRedisUp(vm, port):
   """Wait until redis server is up on a given port."""
   localhost = vm.GetLocalhostAddr()
+  cli_binary = GetRedisCliBinary()
   vm.RemoteCommand(
-      f'sudo {GetRedisDir()}/src/redis-cli -h {localhost} -p {port} ping | grep'
-      ' PONG'
+      f'sudo {GetRedisDir()}/src/{cli_binary} -h {localhost} -p {port} ping |'
+      ' grep PONG'
   )
 
 
@@ -314,42 +342,58 @@ def Start(vm) -> None:
     _WaitForRedisUp(vm, port)
 
 
+@vm_util.Retry(poll_interval=1, timeout=15)
+def _WaitForRedisDown(vm, port):
+  """Wait until redis server is down on a given port."""
+  redis_dir = GetRedisDir()
+  localhost = vm.GetLocalhostAddr()
+  cli_binary = GetRedisCliBinary()
+  _, stderr, return_code = vm.RemoteCommandWithReturnCode(
+      f'sudo {redis_dir}/src/{cli_binary} -h {localhost} -p {port} ping',
+      ignore_failure=True,
+  )
+  if return_code == 0 or 'Could not connect to' not in stderr:
+    raise errors.Resource.RetryableDeletionError(
+        f'Redis on port {port} is still running. (RC={return_code}, '
+        f'stderr="{stderr}")'
+    )
+
+
 def Stop(vm) -> None:
   """Stops redis server processes, flushes all keys, and resets the cluster."""
   redis_dir = GetRedisDir()
   ports = GetRedisPorts(vm)
   localhost = vm.GetLocalhostAddr()
+  cli_binary = GetRedisCliBinary()
 
   for port in ports:
     vm.TryRemoteCommand(
-        f'sudo {redis_dir}/src/redis-cli -h {localhost} -p {port} flushall'
+        f'sudo {redis_dir}/src/{cli_binary} -h {localhost} -p {port} flushall'
     )
 
   for port in ports:
     vm.TryRemoteCommand(
-        f'sudo {redis_dir}/src/redis-cli -h {localhost} -p {port} cluster reset'
-        ' hard'
+        f'sudo {redis_dir}/src/{cli_binary} -h {localhost} -p {port} cluster'
+        ' reset hard'
     )
 
   for port in ports:
     # Gracefully send SHUTDOWN command to redis server.
     vm.TryRemoteCommand(
-        f'sudo {redis_dir}/src/redis-cli -h {localhost} -p {port} shutdown'
+        f'sudo {redis_dir}/src/{cli_binary} -h {localhost} -p {port} shutdown'
     )
 
   for port in ports:
     # Check that redis server is not running anymore.
-    _, stderr, return_code = vm.RemoteCommandWithReturnCode(
-        f'sudo {redis_dir}/src/redis-cli -h {localhost} -p {port} ping',
-        ignore_failure=True,
-    )
-    if return_code == 0 or 'Could not connect to Redis' not in stderr:
-      raise errors.Error(f'Redis on port {port} failed to shut down.')
+    _WaitForRedisDown(vm, port)
 
 
 def StartCluster(server_vms) -> None:
   """Start redis cluster; assumes redis shards started with cluster mode."""
-  cluster_create_cmd = f'sudo {GetRedisDir()}/src/redis-cli --cluster create '
+  cli_binary = GetRedisCliBinary()
+  cluster_create_cmd = (
+      f'sudo {GetRedisDir()}/src/{cli_binary} --cluster create '
+  )
   for server_vm in server_vms:
     cluster_create_cmd += f' {server_vm.internal_ip}:{DEFAULT_PORT}'
   stdout, _ = server_vms[0].RemoteCommand(f'echo "yes" | {cluster_create_cmd}')
@@ -360,6 +404,7 @@ def GetMetadata(vm) -> Dict[str, Any]:
   num_processes = _GetNumProcesses(vm)
   return {
       'redis_server_version': _VERSION.value,
+      'redis_type': _REDIS_TYPE.value,
       'redis_server_cluster_mode': CLUSTER_MODE.value,
       'redis_server_io_threads': (
           _GetIOThreads(vm) if _VERSION.value >= '6.2.1' else 0
@@ -372,6 +417,30 @@ def GetMetadata(vm) -> Dict[str, Any]:
   }
 
 
+def ConfigureReplication(replica_vm, primary_ip, primary_port=DEFAULT_PORT):
+  """Configure replica to replicate from primary.
+
+  Args:
+    replica_vm: The replica VM instance.
+    primary_ip: IP address of the primary Redis server.
+    primary_port: Port of the primary Redis server (default: DEFAULT_PORT).
+  """
+  cli_binary = GetRedisCliBinary()
+  localhost = replica_vm.GetLocalhostAddr()
+  replica_port = GetRedisPorts(replica_vm)[0]
+
+  # Configure the replica's first port to follow the primary.
+  cmd = (
+      f'sudo {GetRedisDir()}/src/{cli_binary} -h {localhost} -p {replica_port}'
+      f' REPLICAOF {primary_ip} {primary_port}'
+  )
+  replica_vm.RemoteCommand(cmd)
+  logging.info(
+      f'Configured replica on {replica_vm.name} to replicate from'
+      f' {primary_ip}:{primary_port}'
+  )
+
+
 def GetRedisPorts(vm=None) -> List[int]:
   """Returns a list of redis port(s)."""
   num_processes = _GetNumProcesses(vm)