From 25c305c73f11500cf5031ac79498c1cf53c98d0e Mon Sep 17 00:00:00 2001
From: Gavin Jeong
Date: Wed, 26 Nov 2025 13:22:20 +0900
Subject: [PATCH 1/3] Add per shard metric for redis proxy

---
 .../network/redis_proxy/v3/redis_proxy.proto  | 18 +++-
 .../extensions/clusters/redis/redis_cluster.h |  1 +
 .../filters/network/common/redis/client.h     |  5 +
 .../network/common/redis/client_impl.cc       |  3 +-
 .../network/common/redis/client_impl.h        |  2 +
 .../network/redis_proxy/conn_pool_impl.cc     | 95 ++++++++++++++++++-
 .../network/redis_proxy/conn_pool_impl.h      | 35 ++++++-
 .../extensions/health_checkers/redis/redis.h  |  1 +
 8 files changed, 154 insertions(+), 6 deletions(-)

diff --git a/api/envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.proto b/api/envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.proto
index 40cc2858dfbc8..1c5161a47c86c 100644
--- a/api/envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.proto
+++ b/api/envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.proto
@@ -33,7 +33,7 @@ message RedisProxy {
       "envoy.config.filter.network.redis_proxy.v2.RedisProxy";
 
   // Redis connection pool settings.
-  // [#next-free-field: 11]
+  // [#next-free-field: 12]
   message ConnPoolSettings {
     option (udpa.annotations.versioning).previous_message_type =
         "envoy.config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings";
@@ -134,6 +134,22 @@ message RedisProxy {
     // storm to busy redis server. This config is a protection to rate limit reconnection rate.
     // If not set, there will be no rate limiting on the reconnection.
     ConnectionRateLimit connection_rate_limit = 10;
+
+    // Enable per-shard statistics for tracking hot shard usage. When enabled, the following
+    // statistics will be emitted per upstream host (shard):
+    //
+    // * ``upstream_rq_total``: Total requests to this shard
+    // * ``upstream_rq_success``: Successful requests to this shard
+    // * ``upstream_rq_failure``: Failed requests to this shard
+    // * ``upstream_rq_active``: Active requests to this shard (gauge)
+    //
+    // The statistics will be emitted under the scope:
+    // ``cluster.<cluster_name>.shard.<shard_address>.*``
+    //
+    // .. note::
+    //   Enabling this option may significantly increase metric cardinality in large clusters
+    //   with many shards. Use with caution in production environments.
+    bool enable_per_shard_stats = 11;
   }
 
   message PrefixRoutes {
diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h
index 0a4b8fe8d29e9..047ce8206094c 100644
--- a/source/extensions/clusters/redis/redis_cluster.h
+++ b/source/extensions/clusters/redis/redis_cluster.h
@@ -241,6 +241,7 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
   std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
   uint32_t maxUpstreamUnknownConnections() const override { return 0; }
   bool enableCommandStats() const override { return true; }
+  bool enablePerShardStats() const override { return false; } // Not needed for discovery
   bool connectionRateLimitEnabled() const override { return false; }
   uint32_t connectionRateLimitPerSec() const override { return 0; }
   // For any readPolicy other than Primary, the RedisClientFactory will send a READONLY command
diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h
index 6879a12b12d4b..0c943446d3753 100644
--- a/source/extensions/filters/network/common/redis/client.h
+++ b/source/extensions/filters/network/common/redis/client.h
@@ -183,6 +183,11 @@ class Config {
    */
   virtual bool enableCommandStats() const PURE;
 
+  /**
+   * @return when enabled, per-shard statistics will be recorded for tracking hot shard usage.
+   */
+  virtual bool enablePerShardStats() const PURE;
+
   /**
    * @return the read policy the proxy should use.
    */
diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc
index 92a9f183c11e3..42bdbcf1b9375 100644
--- a/source/extensions/filters/network/common/redis/client_impl.cc
+++ b/source/extensions/filters/network/common/redis/client_impl.cc
@@ -41,7 +41,8 @@ ConfigImpl::ConfigImpl(
       // as the buffer is flushed on each request immediately.
       max_upstream_unknown_connections_(
           PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)),
-      enable_command_stats_(config.enable_command_stats()) {
+      enable_command_stats_(config.enable_command_stats()),
+      enable_per_shard_stats_(config.enable_per_shard_stats()) {
   switch (config.read_policy()) {
     PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
   case envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::ConnPoolSettings::MASTER:
diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h
index a535084033948..a1f064b50651f 100644
--- a/source/extensions/filters/network/common/redis/client_impl.h
+++ b/source/extensions/filters/network/common/redis/client_impl.h
@@ -54,6 +54,7 @@ class ConfigImpl : public Config {
     return max_upstream_unknown_connections_;
   }
   bool enableCommandStats() const override { return enable_command_stats_; }
+  bool enablePerShardStats() const override { return enable_per_shard_stats_; }
   ReadPolicy readPolicy() const override { return read_policy_; }
   bool connectionRateLimitEnabled() const override { return connection_rate_limit_enabled_; }
   uint32_t connectionRateLimitPerSec() const override { return connection_rate_limit_per_sec_; }
@@ -66,6 +67,7 @@ class ConfigImpl : public Config {
   const std::chrono::milliseconds buffer_flush_timeout_;
   const uint32_t max_upstream_unknown_connections_;
   const bool enable_command_stats_;
+  const bool enable_per_shard_stats_;
   ReadPolicy read_policy_;
   bool connection_rate_limit_enabled_;
   uint32_t connection_rate_limit_per_sec_;
diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
index 2facc0d8852e8..0cfab7ca18f82 100644
--- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
+++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
@@ -270,6 +270,35 @@ void InstanceImpl::ThreadLocalPool::drainClients() {
   }
 }
 
+RedisShardStatsSharedPtr
+InstanceImpl::ThreadLocalPool::getOrCreateShardStats(const std::string& host_address) {
+  auto it = shard_stats_map_.find(host_address);
+  if (it != shard_stats_map_.end()) {
+    return it->second.stats_;
+  }
+
+  // Create a sanitized stat name from the host address (replace ':' and '.' with '_')
+  std::string stat_name = host_address;
+  std::replace(stat_name.begin(), stat_name.end(), ':', '_');
+  std::replace(stat_name.begin(), stat_name.end(), '.', '_');
+
+  Stats::ScopeSharedPtr shard_scope = stats_scope_->createScope(fmt::format("shard.{}.", stat_name));
+  auto shard_stats = std::make_shared<RedisShardStats>(RedisShardStats{
+      REDIS_SHARD_STATS(POOL_COUNTER(*shard_scope), POOL_GAUGE(*shard_scope))});
+  // Store both scope and stats to keep the scope alive
+  shard_stats_map_[host_address] = ShardStatsEntry{shard_scope, shard_stats};
+  return shard_stats;
+}
+
+Stats::ScopeSharedPtr
+InstanceImpl::ThreadLocalPool::getShardScope(const std::string& host_address) {
+  auto it = shard_stats_map_.find(host_address);
+  if (it != shard_stats_map_.end()) {
+    return it->second.scope_;
+  }
+  return nullptr;
+}
+
 InstanceImpl::ThreadLocalActiveClientPtr&
 InstanceImpl::ThreadLocalPool::threadLocalActiveClient(Upstream::HostConstSharedPtr host) {
   TokenBucketPtr& rate_limiter = cx_rate_limiter_map_[host];
@@ -456,7 +485,17 @@ InstanceImpl::ThreadLocalPool::makeRequestToHost(Upstream::HostConstSharedPtr& h
     }
   }
 
-  pending_requests_.emplace_back(*this, std::move(request), callbacks, host);
+  // Get or create per-shard stats for tracking hot shard usage (if enabled)
+  RedisShardStatsSharedPtr shard_stats = nullptr;
+  Stats::ScopeSharedPtr shard_scope = nullptr;
+  if (config_->enablePerShardStats()) {
+    const std::string host_address = host->address()->asString();
+    shard_stats = getOrCreateShardStats(host_address);
+    shard_scope = getShardScope(host_address);
+  }
+
+  pending_requests_.emplace_back(*this, std::move(request), callbacks, host, shard_stats,
+                                 shard_scope);
   PendingRequest& pending_request = pending_requests_.back();
 
   if (!transaction.active_) {
@@ -519,9 +558,23 @@ void InstanceImpl::ThreadLocalActiveClient::onEvent(Network::ConnectionEvent eve
 
 InstanceImpl::PendingRequest::PendingRequest(InstanceImpl::ThreadLocalPool& parent,
                                              RespVariant&& incoming_request,
                                              PoolCallbacks& pool_callbacks,
-                                             Upstream::HostConstSharedPtr& host)
+                                             Upstream::HostConstSharedPtr& host,
+                                             RedisShardStatsSharedPtr shard_stats,
+                                             Stats::ScopeSharedPtr shard_scope)
     : parent_(parent), incoming_request_(std::move(incoming_request)),
-      pool_callbacks_(pool_callbacks), host_(host) {}
+      pool_callbacks_(pool_callbacks), host_(host), shard_stats_(std::move(shard_stats)),
+      shard_scope_(std::move(shard_scope)) {
+  // Track per-shard request metrics and command stats
+  if (shard_stats_) {
+    shard_stats_->upstream_rq_total_.inc();
+    shard_stats_->upstream_rq_active_.inc();
+  }
+  // Extract and track command name for per-shard command stats
+  if (shard_scope_ && parent_.config_->enableCommandStats()) {
+    command_ = parent_.redis_command_stats_->getCommandFromRequest(getRequest(incoming_request_));
+    parent_.redis_command_stats_->updateStatsTotal(*shard_scope_, command_);
+  }
+}
 
 InstanceImpl::PendingRequest::~PendingRequest() {
   cache_load_handle_.reset();
@@ -529,6 +582,15 @@ InstanceImpl::PendingRequest::~PendingRequest() {
   if (request_handler_) {
     request_handler_->cancel();
     request_handler_ = nullptr;
+    // Update per-shard stats - treat cancellation as failure
+    if (shard_stats_) {
+      shard_stats_->upstream_rq_active_.dec();
+      shard_stats_->upstream_rq_failure_.inc();
+    }
+    // Update per-shard command stats (failure due to cancellation)
+    if (shard_scope_ && parent_.config_->enableCommandStats()) {
+      parent_.redis_command_stats_->updateStats(*shard_scope_, command_, false);
+    }
     // If we have to cancel the request on the client, then we'll treat this as failure for pool
     // callback
     pool_callbacks_.onFailure();
   }
 }
 
 void InstanceImpl::PendingRequest::onResponse(Common::Redis::RespValuePtr&& response) {
   request_handler_ = nullptr;
+  // Update per-shard stats
+  if (shard_stats_) {
+    shard_stats_->upstream_rq_active_.dec();
+    shard_stats_->upstream_rq_success_.inc();
+  }
+  // Update per-shard command stats (success)
+  if (shard_scope_ && parent_.config_->enableCommandStats()) {
+    parent_.redis_command_stats_->updateStats(*shard_scope_, command_, true);
+  }
   pool_callbacks_.onResponse(std::move(response));
   parent_.onRequestCompleted();
 }
 
 void InstanceImpl::PendingRequest::onFailure() {
   request_handler_ = nullptr;
+  // Update per-shard stats
+  if (shard_stats_) {
+    shard_stats_->upstream_rq_active_.dec();
+    shard_stats_->upstream_rq_failure_.inc();
+  }
+  // Update per-shard command stats (failure)
+  if (shard_scope_ && parent_.config_->enableCommandStats()) {
+    parent_.redis_command_stats_->updateStats(*shard_scope_, command_, false);
+  }
   pool_callbacks_.onFailure();
   parent_.refresh_manager_->onFailure(parent_.cluster_name_);
   parent_.onRequestCompleted();
@@ -641,6 +721,15 @@ void InstanceImpl::PendingRequest::doRedirection(Common::Redis::RespValuePtr&& v
 void InstanceImpl::PendingRequest::cancel() {
   request_handler_->cancel();
   request_handler_ = nullptr;
+  // Update per-shard stats - treat cancellation as failure
+  if (shard_stats_) {
+    shard_stats_->upstream_rq_active_.dec();
+    shard_stats_->upstream_rq_failure_.inc();
+  }
+  // Update per-shard command stats (failure due to cancellation)
+  if (shard_scope_ && parent_.config_->enableCommandStats()) {
+    parent_.redis_command_stats_->updateStats(*shard_scope_, command_, false);
+  }
   parent_.onRequestCompleted();
 }
diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
index b8f520777ae34..a15ec6208b976 100644
--- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
+++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
@@ -49,6 +49,31 @@ struct RedisClusterStats {
   REDIS_CLUSTER_STATS(GENERATE_COUNTER_STRUCT)
 };
 
+/**
+ * Per-shard statistics for tracking hot shard usage.
+ * These metrics help identify which shards are receiving more traffic.
+ */
+#define REDIS_SHARD_STATS(COUNTER, GAUGE)                                                          \
+  COUNTER(upstream_rq_total)                                                                       \
+  COUNTER(upstream_rq_success)                                                                     \
+  COUNTER(upstream_rq_failure)                                                                     \
+  GAUGE(upstream_rq_active, Accumulate)
+
+struct RedisShardStats {
+  REDIS_SHARD_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
+};
+
+using RedisShardStatsSharedPtr = std::shared_ptr<RedisShardStats>;
+
+/**
+ * Struct to hold per-shard stats and the scope they belong to.
+ * The scope must be kept alive for the stats to remain valid.
+ */
+struct ShardStatsEntry {
+  Stats::ScopeSharedPtr scope_;
+  RedisShardStatsSharedPtr stats_;
+};
+
 class DoNothingPoolCallbacks : public PoolCallbacks {
 public:
   void onResponse(Common::Redis::RespValuePtr&&) override {};
@@ -119,7 +144,8 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl> {
     PendingRequest(ThreadLocalPool& parent, RespVariant&& incoming_request,
-                   PoolCallbacks& pool_callbacks, Upstream::HostConstSharedPtr& host);
+                   PoolCallbacks& pool_callbacks, Upstream::HostConstSharedPtr& host,
+                   RedisShardStatsSharedPtr shard_stats, Stats::ScopeSharedPtr shard_scope);
     ~PendingRequest() override;
 
     // Common::Redis::Client::ClientCallbacks
@@ -148,6 +174,9 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl> {
     void onHostsAdded(const std::vector<Upstream::HostSharedPtr>& hosts_added);
     void onHostsRemoved(const std::vector<Upstream::HostSharedPtr>& hosts_removed);
     void drainClients();
+    RedisShardStatsSharedPtr getOrCreateShardStats(const std::string& host_address);
+    Stats::ScopeSharedPtr getShardScope(const std::string& host_address);
+
     // Upstream::ClusterUpdateCallbacks
     void onClusterAddOrUpdate(absl::string_view cluster_name,
@@ -222,6 +253,8 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl> {
         aws_iam_authenticator_;
     absl::optional<envoy::extensions::filters::network::redis_proxy::v3::AwsIam> aws_iam_config_;
+    // Per-shard stats map keyed by host address (e.g., "10.0.0.1:6379")
+    absl::node_hash_map<std::string, ShardStatsEntry> shard_stats_map_;
   };
 
   const std::string cluster_name_;
diff --git a/source/extensions/health_checkers/redis/redis.h b/source/extensions/health_checkers/redis/redis.h
index 8668904b26447..9f599ec656fac 100644
--- a/source/extensions/health_checkers/redis/redis.h
+++ b/source/extensions/health_checkers/redis/redis.h
@@ -95,6 +95,7 @@ class RedisHealthChecker : public Upstream::HealthCheckerImplBase {
   uint32_t maxUpstreamUnknownConnections() const override { return 0; }
   bool enableCommandStats() const override { return false; }
+  bool enablePerShardStats() const override { return false; } // Not needed for health checks
   bool connectionRateLimitEnabled() const override { return false; }
   uint32_t connectionRateLimitPerSec() const override { return 0; }

From 73b6b34300125989a5b3b292493721b64f20a738 Mon Sep 17 00:00:00 2001
From: Gavin Jeong
Date: Thu, 27 Nov 2025 17:11:37 +0900
Subject: [PATCH 2/3] Support per shard redis proxy latency metric

---
 .../network/redis_proxy/v3/redis_proxy.proto  | 17 ++++-
 .../extensions/clusters/redis/redis_cluster.h |  3 +-
 .../filters/network/common/redis/client.h     |  5 +++
 .../network/common/redis/client_impl.cc       |  3 +-
 .../network/common/redis/client_impl.h        |  2 +
 .../network/redis_proxy/conn_pool_impl.cc     | 44 +++++++++++++++++--
 .../network/redis_proxy/conn_pool_impl.h      |  8 +++-
 .../extensions/health_checkers/redis/redis.h  |  3 +-
 8 files changed, 77 insertions(+), 8 deletions(-)

diff --git a/api/envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.proto b/api/envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.proto
index 1c5161a47c86c..401a843c77193 100644
--- a/api/envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.proto
+++ b/api/envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.proto
@@ -33,7 +33,7 @@ message RedisProxy {
       "envoy.config.filter.network.redis_proxy.v2.RedisProxy";
 
   // Redis connection pool settings.
-  // [#next-free-field: 12]
+  // [#next-free-field: 13]
   message ConnPoolSettings {
     option (udpa.annotations.versioning).previous_message_type =
         "envoy.config.filter.network.redis_proxy.v2.RedisProxy.ConnPoolSettings";
@@ -150,6 +150,21 @@ message RedisProxy {
     //   Enabling this option may significantly increase metric cardinality in large clusters
     //   with many shards. Use with caution in production environments.
     bool enable_per_shard_stats = 11;
+
+    // Enable per-shard latency histogram for tracking request latency per upstream host (shard).
+    // When enabled, the following histogram will be emitted per shard:
+    //
+    // * ``upstream_rq_latency``: Request latency histogram in microseconds
+    //
+    // The histogram will be emitted under the scope:
+    // ``cluster.<cluster_name>.shard.<shard_address>.upstream_rq_latency``
+    //
+    // This option requires ``enable_per_shard_stats`` to be enabled.
+    //
+    // .. note::
+    //   Enabling this option may significantly increase metric cardinality in large clusters
+    //   with many shards. Use with caution in production environments.
+    bool enable_per_shard_latency_stats = 12;
   }
 
   message PrefixRoutes {
diff --git a/source/extensions/clusters/redis/redis_cluster.h b/source/extensions/clusters/redis/redis_cluster.h
index 047ce8206094c..23539995c39ca 100644
--- a/source/extensions/clusters/redis/redis_cluster.h
+++ b/source/extensions/clusters/redis/redis_cluster.h
@@ -241,7 +241,8 @@ class RedisCluster : public Upstream::BaseDynamicClusterImpl {
   std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
   uint32_t maxUpstreamUnknownConnections() const override { return 0; }
   bool enableCommandStats() const override { return true; }
-  bool enablePerShardStats() const override { return false; } // Not needed for discovery
+  bool enablePerShardStats() const override { return false; }        // Not needed for discovery
+  bool enablePerShardLatencyStats() const override { return false; } // Not needed for discovery
   bool connectionRateLimitEnabled() const override { return false; }
   uint32_t connectionRateLimitPerSec() const override { return 0; }
   // For any readPolicy other than Primary, the RedisClientFactory will send a READONLY command
diff --git a/source/extensions/filters/network/common/redis/client.h b/source/extensions/filters/network/common/redis/client.h
index 0c943446d3753..a1fc16347f73d 100644
--- a/source/extensions/filters/network/common/redis/client.h
+++ b/source/extensions/filters/network/common/redis/client.h
@@ -188,6 +188,11 @@ class Config {
    */
   virtual bool enablePerShardStats() const PURE;
 
+  /**
+   * @return when enabled, per-shard latency histograms will be recorded.
+   */
+  virtual bool enablePerShardLatencyStats() const PURE;
+
   /**
    * @return the read policy the proxy should use.
   */
diff --git a/source/extensions/filters/network/common/redis/client_impl.cc b/source/extensions/filters/network/common/redis/client_impl.cc
index 42bdbcf1b9375..83940ca3453fc 100644
--- a/source/extensions/filters/network/common/redis/client_impl.cc
+++ b/source/extensions/filters/network/common/redis/client_impl.cc
@@ -42,7 +42,8 @@ ConfigImpl::ConfigImpl(
       max_upstream_unknown_connections_(
           PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, max_upstream_unknown_connections, 100)),
       enable_command_stats_(config.enable_command_stats()),
-      enable_per_shard_stats_(config.enable_per_shard_stats()) {
+      enable_per_shard_stats_(config.enable_per_shard_stats()),
+      enable_per_shard_latency_stats_(config.enable_per_shard_latency_stats()) {
   switch (config.read_policy()) {
     PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
   case envoy::extensions::filters::network::redis_proxy::v3::RedisProxy::ConnPoolSettings::MASTER:
diff --git a/source/extensions/filters/network/common/redis/client_impl.h b/source/extensions/filters/network/common/redis/client_impl.h
index a1f064b50651f..01970b5ebee7c 100644
--- a/source/extensions/filters/network/common/redis/client_impl.h
+++ b/source/extensions/filters/network/common/redis/client_impl.h
@@ -55,6 +55,7 @@ class ConfigImpl : public Config {
   }
   bool enableCommandStats() const override { return enable_command_stats_; }
   bool enablePerShardStats() const override { return enable_per_shard_stats_; }
+  bool enablePerShardLatencyStats() const override { return enable_per_shard_latency_stats_; }
   ReadPolicy readPolicy() const override { return read_policy_; }
   bool connectionRateLimitEnabled() const override { return connection_rate_limit_enabled_; }
   uint32_t connectionRateLimitPerSec() const override { return connection_rate_limit_per_sec_; }
@@ -68,6 +69,7 @@ class ConfigImpl : public Config {
   const uint32_t max_upstream_unknown_connections_;
   const bool enable_command_stats_;
   const bool enable_per_shard_stats_;
+  const bool enable_per_shard_latency_stats_;
   ReadPolicy read_policy_;
   bool connection_rate_limit_enabled_;
   uint32_t connection_rate_limit_per_sec_;
diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
index 0cfab7ca18f82..a26106fdd62ac 100644
--- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
+++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
@@ -299,6 +299,23 @@ InstanceImpl::ThreadLocalPool::getShardScope(const std::string& host_address) {
   return nullptr;
 }
 
+Stats::Histogram*
+InstanceImpl::ThreadLocalPool::getOrCreateShardLatencyHistogram(const std::string& host_address) {
+  auto it = shard_stats_map_.find(host_address);
+  if (it == shard_stats_map_.end()) {
+    // Create shard stats entry first if it doesn't exist
+    getOrCreateShardStats(host_address);
+    it = shard_stats_map_.find(host_address);
+  }
+
+  if (it->second.latency_histogram_ == nullptr) {
+    // Create the histogram if it doesn't exist
+    it->second.latency_histogram_ = &it->second.scope_->histogramFromString(
+        "upstream_rq_latency", Stats::Histogram::Unit::Microseconds);
+  }
+  return it->second.latency_histogram_;
+}
+
 InstanceImpl::ThreadLocalActiveClientPtr&
 InstanceImpl::ThreadLocalPool::threadLocalActiveClient(Upstream::HostConstSharedPtr host) {
   TokenBucketPtr& rate_limiter = cx_rate_limiter_map_[host];
@@ -488,14 +505,19 @@ InstanceImpl::ThreadLocalPool::makeRequestToHost(Upstream::HostConstSharedPtr& h
   // Get or create per-shard stats for tracking hot shard usage (if enabled)
   RedisShardStatsSharedPtr shard_stats = nullptr;
   Stats::ScopeSharedPtr shard_scope = nullptr;
+  Stats::Histogram* latency_histogram = nullptr;
   if (config_->enablePerShardStats()) {
     const std::string host_address = host->address()->asString();
     shard_stats = getOrCreateShardStats(host_address);
     shard_scope = getShardScope(host_address);
   }
+  if (config_->enablePerShardLatencyStats()) {
+    const std::string host_address = host->address()->asString();
+    latency_histogram = getOrCreateShardLatencyHistogram(host_address);
+  }
 
   pending_requests_.emplace_back(*this, std::move(request), callbacks, host, shard_stats,
-                                 shard_scope);
+                                 shard_scope, latency_histogram);
   PendingRequest& pending_request = pending_requests_.back();
 
   if (!transaction.active_) {
@@ -560,10 +582,12 @@ InstanceImpl::PendingRequest::PendingRequest(InstanceImpl::ThreadLocalPool& pare
                                              PoolCallbacks& pool_callbacks,
                                              Upstream::HostConstSharedPtr& host,
                                              RedisShardStatsSharedPtr shard_stats,
-                                             Stats::ScopeSharedPtr shard_scope)
+                                             Stats::ScopeSharedPtr shard_scope,
+                                             Stats::Histogram* latency_histogram)
     : parent_(parent), incoming_request_(std::move(incoming_request)),
       pool_callbacks_(pool_callbacks), host_(host), shard_stats_(std::move(shard_stats)),
-      shard_scope_(std::move(shard_scope)) {
+      shard_scope_(std::move(shard_scope)), latency_histogram_(latency_histogram),
+      start_time_(parent.dispatcher_.timeSource().monotonicTime()) {
   // Track per-shard request metrics and command stats
   if (shard_stats_) {
     shard_stats_->upstream_rq_total_.inc();
@@ -608,6 +632,13 @@ void InstanceImpl::PendingRequest::onResponse(Common::Redis::RespValuePtr&& resp
   if (shard_scope_ && parent_.config_->enableCommandStats()) {
     parent_.redis_command_stats_->updateStats(*shard_scope_, command_, true);
   }
+  // Record per-shard latency histogram
+  if (latency_histogram_ != nullptr) {
+    const auto end_time = parent_.dispatcher_.timeSource().monotonicTime();
+    const auto latency_us = std::chrono::duration_cast<std::chrono::microseconds>(
+        end_time - start_time_).count();
+    latency_histogram_->recordValue(latency_us);
+  }
   pool_callbacks_.onResponse(std::move(response));
   parent_.onRequestCompleted();
 }
@@ -623,6 +654,13 @@ void InstanceImpl::PendingRequest::onFailure() {
   if (shard_scope_ && parent_.config_->enableCommandStats()) {
     parent_.redis_command_stats_->updateStats(*shard_scope_, command_, false);
   }
+  // Record per-shard latency histogram (even for failures)
+  if (latency_histogram_ != nullptr) {
+    const auto end_time = parent_.dispatcher_.timeSource().monotonicTime();
+    const auto latency_us = std::chrono::duration_cast<std::chrono::microseconds>(
+        end_time - start_time_).count();
+    latency_histogram_->recordValue(latency_us);
+  }
   pool_callbacks_.onFailure();
   parent_.refresh_manager_->onFailure(parent_.cluster_name_);
   parent_.onRequestCompleted();
diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
index a15ec6208b976..013a1a78be2e0 100644
--- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
+++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
@@ -7,6 +7,7 @@
 #include <memory>
 #include <string>
 
+#include "envoy/common/time.h"
 #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
 #include "envoy/stats/stats_macros.h"
 #include "envoy/thread_local/thread_local.h"
 #include "envoy/upstream/cluster_manager.h"
@@ -72,6 +73,7 @@ using RedisShardStatsSharedPtr = std::shared_ptr<RedisShardStats>;
 struct ShardStatsEntry {
   Stats::ScopeSharedPtr scope_;
   RedisShardStatsSharedPtr stats_;
+  Stats::Histogram* latency_histogram_{nullptr}; // Per-shard latency histogram (optional)
 };
 
 class DoNothingPoolCallbacks : public PoolCallbacks {
@@ -145,7 +147,8 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl> {
     PendingRequest(ThreadLocalPool& parent, RespVariant&& incoming_request,
                    PoolCallbacks& pool_callbacks, Upstream::HostConstSharedPtr& host,
-                   RedisShardStatsSharedPtr shard_stats, Stats::ScopeSharedPtr shard_scope);
+                   RedisShardStatsSharedPtr shard_stats, Stats::ScopeSharedPtr shard_scope,
+                   Stats::Histogram* latency_histogram);
     ~PendingRequest() override;
 
     // Common::Redis::Client::ClientCallbacks
@@ -177,6 +180,8 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl> {
     RedisShardStatsSharedPtr shard_stats_;
     Stats::ScopeSharedPtr shard_scope_;
+    Stats::Histogram* latency_histogram_{nullptr};
+    MonotonicTime start_time_;

From: Gavin Jeong
Date: Fri, 28 Nov 2025 00:06:45 +0900
Subject: [PATCH 3/3] Support per shard redis proxy latency per command metrics

---
 .../network/redis_proxy/conn_pool_impl.cc | 24 +++++++++++++++++++
 .../network/redis_proxy/conn_pool_impl.h  |  2 ++
 2 files changed, 26 insertions(+)

diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
index a26106fdd62ac..11b707c5d7fcf 100644
--- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
+++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.cc
@@ -597,6 +597,11 @@ InstanceImpl::PendingRequest::PendingRequest(InstanceImpl::ThreadLocalPool& pare
   if (shard_scope_ && parent_.config_->enableCommandStats()) {
     command_ = parent_.redis_command_stats_->getCommandFromRequest(getRequest(incoming_request_));
     parent_.redis_command_stats_->updateStatsTotal(*shard_scope_, command_);
+    // Create per-shard per-command latency timer when both command stats and per-shard latency are enabled
+    if (parent_.config_->enablePerShardLatencyStats()) {
+      command_latency_timer_ = parent_.redis_command_stats_->createCommandTimer(
+          *shard_scope_, command_, parent_.dispatcher_.timeSource());
+    }
   }
 }
 
@@ -639,6 +644,10 @@ void InstanceImpl::PendingRequest::onResponse(Common::Redis::RespValuePtr&& resp
     latency_histogram_->recordValue(latency_us);
   }
+  // Complete per-shard per-command latency timer
+  if (command_latency_timer_) {
+    command_latency_timer_->complete();
+  }
   pool_callbacks_.onResponse(std::move(response));
   parent_.onRequestCompleted();
 }
@@ -661,6 +670,10 @@ void InstanceImpl::PendingRequest::onFailure() {
     latency_histogram_->recordValue(latency_us);
   }
+  // Complete per-shard per-command latency timer
+  if (command_latency_timer_) {
+    command_latency_timer_->complete();
+  }
   pool_callbacks_.onFailure();
   parent_.refresh_manager_->onFailure(parent_.cluster_name_);
   parent_.onRequestCompleted();
@@ -768,6 +781,17 @@ void InstanceImpl::PendingRequest::cancel() {
   if (shard_scope_ && parent_.config_->enableCommandStats()) {
     parent_.redis_command_stats_->updateStats(*shard_scope_, command_, false);
   }
+  // Record per-shard latency histogram (even for cancellations)
+  if (latency_histogram_ != nullptr) {
+    const auto end_time = parent_.dispatcher_.timeSource().monotonicTime();
+    const auto latency_us = std::chrono::duration_cast<std::chrono::microseconds>(
+        end_time - start_time_).count();
+    latency_histogram_->recordValue(latency_us);
+  }
+  // Complete per-shard per-command latency timer
+  if (command_latency_timer_) {
+    command_latency_timer_->complete();
+  }
   parent_.onRequestCompleted();
 }
diff --git a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
index 013a1a78be2e0..0c720dc1865cf 100644
--- a/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
+++ b/source/extensions/filters/network/redis_proxy/conn_pool_impl.h
@@ -10,6 +10,7 @@
 #include "envoy/common/time.h"
 #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
 #include "envoy/stats/stats_macros.h"
+#include "envoy/stats/timespan.h"
 #include "envoy/thread_local/thread_local.h"
 #include "envoy/upstream/cluster_manager.h"
@@ -182,6 +183,7 @@ class InstanceImpl : public Instance, public std::enable_shared_from_this<InstanceImpl> {
     Stats::Histogram* latency_histogram_{nullptr};
     MonotonicTime start_time_;
+    Stats::TimespanPtr command_latency_timer_;
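
For reviewers who want to exercise the series end to end, below is a minimal sketch of a
redis_proxy listener with both new fields turned on. The listener name, addresses, stat_prefix,
and cluster name are illustrative; only the two enable_per_shard_* fields and their placement
under ConnPoolSettings (the `settings` field) come from the patches above.

  static_resources:
    listeners:
    - name: redis_listener                          # illustrative name
      address:
        socket_address: { address: 0.0.0.0, port_value: 6379 }
      filter_chains:
      - filters:
        - name: envoy.filters.network.redis_proxy
          typed_config:
            "@type": type.googleapis.com/envoy.extensions.filters.network.redis_proxy.v3.RedisProxy
            stat_prefix: redis                      # illustrative prefix
            settings:
              op_timeout: 5s
              enable_per_shard_stats: true          # patch 1: per-shard counters/gauge
              enable_per_shard_latency_stats: true  # patches 2/3: per-shard latency metrics
            prefix_routes:
              catch_all_route:
                cluster: redis_cluster              # illustrative cluster name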
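Given the scope layout described in the proto comments, a shard at 10.0.0.1:6379 in a cluster
named redis_cluster (hypothetical values) would surface names roughly like the following in
/stats; the shard segment is the host address after the ':' and '.' to '_' sanitization in
getOrCreateShardStats(), and the histogram name matches the "upstream_rq_latency" string used
in getOrCreateShardLatencyHistogram():

  cluster.redis_cluster.shard.10_0_0_1_6379.upstream_rq_total: 42
  cluster.redis_cluster.shard.10_0_0_1_6379.upstream_rq_success: 40
  cluster.redis_cluster.shard.10_0_0_1_6379.upstream_rq_failure: 2
  cluster.redis_cluster.shard.10_0_0_1_6379.upstream_rq_active: 0
  cluster.redis_cluster.shard.10_0_0_1_6379.upstream_rq_latency: P0(...) P50(...) P100(...)

When enable_command_stats is also set, the per-command counters and the patch 3 per-command
timers created through RedisCommandStats land under the same per-shard scope.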