From c534c2636ebbaba02e31e1785f9bbe15409d4042 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=86=E9=B9=8F=20=E9=BB=84?=
Date: Fri, 24 Oct 2025 11:01:25 +0800
Subject: [PATCH 1/5] fix(grpc): Add keepalive and fix reconnect issue

This commit addresses two issues related to gRPC connection stability
and recovery.

1. **Half-open connections:** In unstable network environments, the
   agent could encounter half-open TCP connections where the server
   side of the connection is torn down but the client side remains
   open. This caused the send queue to grow indefinitely with no
   automatic recovery. To resolve this, this change introduces gRPC
   keepalive probes: the agent now sends keepalive pings to the
   collector, so dead connections are detected and pruned in a timely
   manner. Two new configuration parameters,
   `collector.grpc_keepalive_time` and
   `collector.grpc_keepalive_timeout`, control this behavior.

2. **Reconnect logic:** The existing reconnection logic did not
   immediately re-establish a connection when the same backend
   instance was selected during a reconnect attempt, which could delay
   recovery by up to an hour. The logic now always shuts down and
   recreates the channel, forcing an immediate reconnection attempt
   regardless of which backend is selected.
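For operators, enabling the probes is a matter of setting the two new
properties in agent.config (the values below are the shipped defaults,
shown only as an illustration):

    # ping every 60s; treat the connection as dead if no ack within 30s
    collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:60}
    collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:30}

Setting `collector.grpc_keepalive_time` to 0 or a negative value
disables the keepalive entirely.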
---
 .../apm/agent/core/conf/Config.java           | 14 +++++++
 .../apm/agent/core/remote/GRPCChannel.java    |  7 ++++
 .../agent/core/remote/GRPCChannelManager.java | 41 ++++++++-----------
 apm-sniffer/config/agent.config               |  6 +++
 4 files changed, 44 insertions(+), 24 deletions(-)

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 87cf57955b..f36b5dd8a0 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -210,6 +210,20 @@ public static class Collector {
          * How long grpc client will timeout in sending data to upstream.
          */
         public static int GRPC_UPSTREAM_TIMEOUT = 30;
+        /**
+         * The interval in seconds to send a keepalive ping to the backend.
+         * If this is less than or equal to 0, the keepalive is disabled.
+         *
+         * This maps to `collector.grpc_keepalive_time` in agent.config.
+         */
+        public static long GRPC_KEEPALIVE_TIME = 60L;
+        /**
+         * The timeout in seconds to wait for a keepalive ack from the backend.
+         * If the ack is not received within this time, the connection is considered dead.
+         *
+         * This maps to `collector.grpc_keepalive_timeout` in agent.config.
+         */
+        public static long GRPC_KEEPALIVE_TIMEOUT = 30L;
         /**
          * Get profile task list interval
          */
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
index d7806ad674..aee4c33e95 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
@@ -27,6 +27,8 @@
 import io.grpc.netty.NettyChannelBuilder;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.agent.core.conf.Config;
 
 public class GRPCChannel {
     /**
@@ -39,6 +41,11 @@ private GRPCChannel(String host, int port, List<ChannelBuilder> channelBuilders,
                         List<ChannelDecorator> decorators) throws Exception {
         ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);
 
+        if (Config.Collector.GRPC_KEEPALIVE_TIME > 0) {
+            channelBuilder.keepAliveTime(Config.Collector.GRPC_KEEPALIVE_TIME, TimeUnit.SECONDS)
+                          .keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS);
+        }
+
         NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
 
         for (ChannelBuilder builder : channelBuilders) {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
index 9744398fd1..f3f3755743 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
@@ -130,34 +130,27 @@ public void run() {
             String server = "";
             try {
                 int index = Math.abs(random.nextInt()) % grpcServers.size();
-                if (index != selectedIdx) {
-                    selectedIdx = index;
+                selectedIdx = index;
 
-                    server = grpcServers.get(index);
-                    String[] ipAndPort = server.split(":");
+                server = grpcServers.get(index);
+                String[] ipAndPort = server.split(":");
 
-                    if (managedChannel != null) {
-                        managedChannel.shutdownNow();
-                    }
-
-                    managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
-                                                .addManagedChannelBuilder(new StandardChannelBuilder())
-                                                .addManagedChannelBuilder(new TLSChannelBuilder())
-                                                .addChannelDecorator(new AgentIDDecorator())
-                                                .addChannelDecorator(new AuthenticationDecorator())
-                                                .build();
-                    reconnectCount = 0;
-                    reconnect = false;
-                    notify(GRPCChannelStatus.CONNECTED);
-                } else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
-                    // Reconnect to the same server is automatically done by GRPC,
-                    // therefore we are responsible to check the connectivity and
-                    // set the state and notify listeners
-                    reconnectCount = 0;
-                    reconnect = false;
-                    notify(GRPCChannelStatus.CONNECTED);
+                LOGGER.debug("Attempting to reconnect to gRPC server {}. Shutting down existing channel if any.", server);
+                if (managedChannel != null) {
+                    managedChannel.shutdownNow();
                 }
 
+                managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
+                                            .addManagedChannelBuilder(new StandardChannelBuilder())
+                                            .addManagedChannelBuilder(new TLSChannelBuilder())
+                                            .addChannelDecorator(new AgentIDDecorator())
+                                            .addChannelDecorator(new AuthenticationDecorator())
+                                            .build();
+                LOGGER.debug("Successfully reconnected to gRPC server {}.", server);
+                reconnectCount = 0;
+                reconnect = false;
+                notify(GRPCChannelStatus.CONNECTED);
+                return;
             } catch (Throwable t) {
                 LOGGER.error(t, "Create channel to {} fail.", server);
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index 9056993626..51a315ddb5 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -100,6 +100,12 @@ collector.properties_report_period_factor=${SW_AGENT_COLLECTOR_PROPERTIES_REPORT
 collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
 # How long grpc client will timeout in sending data to upstream. Unit is second.
 collector.grpc_upstream_timeout=${SW_AGENT_COLLECTOR_GRPC_UPSTREAM_TIMEOUT:30}
+# The interval in seconds to send a keepalive ping to the backend.
+# If this is less than or equal to 0, the keepalive is disabled.
+#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:60}
+# The timeout in seconds to wait for a keepalive ack from the backend.
+# If the ack is not received within this time, the connection is considered dead.
+#collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:30}
 # Sniffer get profile task list interval.
 collector.get_profile_task_interval=${SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL:20}
 # Sniffer get agent dynamic config interval.

From 32fd22272bfd349a870b3c5c6d1b614d54585521 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=86=E9=B9=8F=20=E9=BB=84?=
Date: Fri, 7 Nov 2025 02:02:40 +0800
Subject: [PATCH 2/5] fix(grpc): Improve reconnection logic and fix race
 condition
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

1. Restore the original reconnection logic, with TRANSIENT_FAILURE
   monitoring on top:
   - Keep the original behavior: only force a reconnect when a
     different server is selected
   - When the same server is selected, rely on gRPC's built-in
     auto-reconnect mechanism
   - Monitor the TRANSIENT_FAILURE state to detect prolonged failures
   - Force a channel rebuild when either reconnectCount or
     transientFailureCount exceeds the threshold
   - Add keepAliveWithoutCalls(true) to detect half-open connections

2. Fix the race condition between reportError() and run():
   - Wrap all state changes (reconnect flag + notifications) in
     synchronized blocks
   - This keeps the reconnect flag and the listener status consistent
   - Fixes a production issue where reconnect was false while
     listeners were stuck in the DISCONNECT state

3. Additional improvements:
   - Raise the keepalive default from 60s to 120s, halving the ping
     overhead
   - Add a getState() method to GRPCChannel for state monitoring
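In sketch form, the invariant introduced here is that the reconnect
flag and the listener notification always change together under one
lock (field and method names as in this patch):

    private final Object statusLock = new Object();

    private void triggerReconnect() {     // called from reportError()
        synchronized (statusLock) {
            reconnect = true;
            notify(GRPCChannelStatus.DISCONNECT);
        }
    }

    private void markAsConnected() {      // called from run()
        synchronized (statusLock) {
            reconnectCount = 0;
            reconnect = false;
            notify(GRPCChannelStatus.CONNECTED);
        }
    }

Without the shared lock, a reportError() arriving between run()'s
`reconnect = false` and its CONNECTED notification could leave the
flag and the listeners disagreeing.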
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 .../apm/agent/core/conf/Config.java           |   2 +-
 .../apm/agent/core/remote/GRPCChannel.java    |   7 +-
 .../agent/core/remote/GRPCChannelManager.java | 127 +++++++++++++++---
 apm-sniffer/config/agent.config               |   2 +-
 4 files changed, 115 insertions(+), 23 deletions(-)

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index f36b5dd8a0..e0ce5242a6 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -216,7 +216,7 @@ public static class Collector {
          *
          * This maps to `collector.grpc_keepalive_time` in agent.config.
          */
-        public static long GRPC_KEEPALIVE_TIME = 60L;
+        public static long GRPC_KEEPALIVE_TIME = 120L;
         /**
          * The timeout in seconds to wait for a keepalive ack from the backend.
          * If the ack is not received within this time, the connection is considered dead.
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
index aee4c33e95..18b35d31d9 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
@@ -43,7 +43,8 @@ private GRPCChannel(String host, int port, List<ChannelBuilder> channelBuilders,
 
         if (Config.Collector.GRPC_KEEPALIVE_TIME > 0) {
             channelBuilder.keepAliveTime(Config.Collector.GRPC_KEEPALIVE_TIME, TimeUnit.SECONDS)
-                          .keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS);
+                          .keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS)
+                          .keepAliveWithoutCalls(true);
         }
 
         NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
@@ -90,6 +91,10 @@ public boolean isConnected(boolean requestConnection) {
         return originChannel.getState(requestConnection) == ConnectivityState.READY;
     }
 
+    public ConnectivityState getState(boolean requestConnection) {
+        return originChannel.getState(requestConnection);
+    }
+
     public static class Builder {
         private final String host;
         private final int port;
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
index f3f3755743..cfd79948b9 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.apm.agent.core.remote;
 
 import io.grpc.Channel;
+import io.grpc.ConnectivityState;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 
@@ -57,6 +58,8 @@ public class GRPCChannelManager implements BootService, Runnable {
     private volatile List<String> grpcServers;
     private volatile int selectedIdx = -1;
     private volatile int reconnectCount = 0;
+    private volatile int transientFailureCount = 0;
+    private final Object statusLock = new Object();
 
     @Override
     public void prepare() {
@@ -99,7 +102,15 @@ public void shutdown() {
 
     @Override
     public void run() {
-        LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
+        if (reconnect) {
+            LOGGER.warn("Selected collector grpc service running, reconnect:{}.", reconnect);
+        } else {
+            LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
+        }
+
+        // Check channel state even when reconnect is false to detect prolonged failures
+        checkChannelStateAndTriggerReconnectIfNeeded();
+
         if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
             grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
                                 .filter(StringUtil::isNotBlank)
@@ -130,26 +141,34 @@ public void run() {
             String server = "";
             try {
                 int index = Math.abs(random.nextInt()) % grpcServers.size();
-                selectedIdx = index;
 
                 server = grpcServers.get(index);
                 String[] ipAndPort = server.split(":");
 
-                LOGGER.debug("Attempting to reconnect to gRPC server {}. Shutting down existing channel if any.", server);
-                if (managedChannel != null) {
-                    managedChannel.shutdownNow();
-                }
+                if (index != selectedIdx) {
+                    selectedIdx = index;
+                    LOGGER.debug("Connecting to different gRPC server {}. Shutting down existing channel if any.", server);
+                    createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
+                } else {
+                    // Same server, increment reconnectCount and check state
+                    reconnectCount++;
 
-                managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
-                                            .addManagedChannelBuilder(new StandardChannelBuilder())
-                                            .addManagedChannelBuilder(new TLSChannelBuilder())
-                                            .addChannelDecorator(new AgentIDDecorator())
-                                            .addChannelDecorator(new AuthenticationDecorator())
-                                            .build();
-                LOGGER.debug("Successfully reconnected to gRPC server {}.", server);
-                reconnectCount = 0;
-                reconnect = false;
-                notify(GRPCChannelStatus.CONNECTED);
+                    // Force reconnect if reconnectCount or transientFailureCount exceeds threshold
+                    boolean forceReconnect = reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD
+                        || transientFailureCount > Config.Agent.FORCE_RECONNECTION_PERIOD;
+
+                    if (forceReconnect) {
+                        // Failed to reconnect after multiple attempts, force rebuild channel
+                        LOGGER.warn("Force rebuild channel to {} (reconnectCount={}, transientFailureCount={})",
+                            server, reconnectCount, transientFailureCount);
+                        createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
+                    } else if (managedChannel.isConnected(false)) {
+                        // Reconnect to the same server is automatically done by GRPC,
+                        // therefore we are responsible to check the connectivity and
+                        // set the state and notify listeners
+                        markAsConnected();
+                    }
+                }
 
                 return;
             } catch (Throwable t) {
@@ -177,17 +196,85 @@ public Channel getChannel() {
      */
     public void reportError(Throwable throwable) {
         if (isNetworkError(throwable)) {
+            triggerReconnect();
+        }
+    }
+
+    private void notify(GRPCChannelStatus status) {
+        synchronized (listeners) {
+            for (GRPCChannelListener listener : listeners) {
+                try {
+                    listener.statusChanged(status);
+                } catch (Throwable t) {
+                    LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
+                }
+            }
+        }
+    }
+
+    /**
+     * Create a new gRPC channel to the specified server and reset connection state.
+     */
+    private void createNewChannel(String host, int port) throws Exception {
+        if (managedChannel != null) {
+            managedChannel.shutdownNow();
+        }
+
+        managedChannel = GRPCChannel.newBuilder(host, port)
+                                    .addManagedChannelBuilder(new StandardChannelBuilder())
+                                    .addManagedChannelBuilder(new TLSChannelBuilder())
+                                    .addChannelDecorator(new AgentIDDecorator())
+                                    .addChannelDecorator(new AuthenticationDecorator())
+                                    .build();
+
+        markAsConnected();
+    }
+
+    /**
+     * Trigger reconnection by setting reconnect flag and notifying listeners.
+     */
+    private void triggerReconnect() {
+        synchronized (statusLock) {
             reconnect = true;
             notify(GRPCChannelStatus.DISCONNECT);
         }
     }
 
-    private void notify(GRPCChannelStatus status) {
-        for (GRPCChannelListener listener : listeners) {
+    /**
+     * Mark connection as successful and reset connection state.
+     */
+    private void markAsConnected() {
+        synchronized (statusLock) {
+            reconnectCount = 0;
+            reconnect = false;
+            notify(GRPCChannelStatus.CONNECTED);
+        }
+    }
+
+    /**
+     * Check the connectivity state of existing channel and trigger reconnect if needed.
+     * This method monitors TRANSIENT_FAILURE state and triggers reconnect if the failure persists too long.
+     */
+    private void checkChannelStateAndTriggerReconnectIfNeeded() {
+        if (managedChannel != null) {
             try {
-                listener.statusChanged(status);
+                ConnectivityState state = managedChannel.getState(false);
+                LOGGER.debug("Current channel state: {}", state);
+
+                if (state == ConnectivityState.TRANSIENT_FAILURE) {
+                    transientFailureCount++;
+                    LOGGER.warn("Channel in TRANSIENT_FAILURE state, count: {}", transientFailureCount);
+                } else if (state == ConnectivityState.SHUTDOWN) {
+                    LOGGER.warn("Channel is SHUTDOWN");
+                    if (!reconnect) {
+                        triggerReconnect();
+                    }
+                } else {
+                    // IDLE, READY, CONNECTING are all normal states
+                    transientFailureCount = 0;
+                }
             } catch (Throwable t) {
-                LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
+                LOGGER.error(t, "Error checking channel state");
             }
         }
     }
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index 51a315ddb5..2a8b9bfbb5 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -102,7 +102,7 @@ collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
 collector.grpc_upstream_timeout=${SW_AGENT_COLLECTOR_GRPC_UPSTREAM_TIMEOUT:30}
 # The interval in seconds to send a keepalive ping to the backend.
 # If this is less than or equal to 0, the keepalive is disabled.
-#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:60}
+#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:120}
 # The timeout in seconds to wait for a keepalive ack from the backend.
 # If the ack is not received within this time, the connection is considered dead.
 #collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:30}

From c1b8ab43094aa55e335c0b34442e6de77d048bb0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=86=E9=B9=8F=20=E9=BB=84?=
Date: Mon, 10 Nov 2025 16:17:25 +0800
Subject: [PATCH 3/5] refactor: Simplify reconnection logic and address review
 feedback
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Changes:
- Remove the transientFailureCount mechanism: TRANSIENT_FAILURE
  already surfaces as UNAVAILABLE exceptions, which reportError()
  handles
- Remove checkChannelStateAndTriggerReconnectIfNeeded() to simplify
  the logic
- Rename markAsConnected() to notifyConnected() to better reflect the
  method's responsibility
- Reset reconnectCount in createNewChannel() only after an actual
  channel rebuild, to handle half-open connections
- Remove the unnecessary else branch in run() method logging
- Document the minimum safe keepalive time (10 seconds) in Config.java
- Remove the unused stableConnectionCount field

Key improvement: reconnectCount keeps accumulating even when
isConnected() returns false positives, guaranteeing a forced channel
rebuild once the threshold is exceeded. This solves the issue where
connections could remain half-open for extended periods.
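In simplified form, the per-run decision for the same-server case now
reduces to (condensed from the diff below):

    reconnectCount++;
    if (reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD) {
        createNewChannel(host, port);   // rebuild; resets reconnectCount
    } else if (managedChannel.isConnected(false)) {
        notifyConnected();              // does NOT reset reconnectCount
    }
    // otherwise: stay disconnected and wait for the next scheduled run

A half-open channel that keeps reporting READY therefore still gets
force-rebuilt once the threshold is crossed.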
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 .../apm/agent/core/conf/Config.java           |  3 +
 .../apm/agent/core/remote/GRPCChannel.java    |  4 --
 .../agent/core/remote/GRPCChannelManager.java | 70 +++++-------------
 3 files changed, 20 insertions(+), 57 deletions(-)

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index e0ce5242a6..39e7dfc480 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -214,6 +214,9 @@ public static class Collector {
          * The interval in seconds to send a keepalive ping to the backend.
          * If this is less than or equal to 0, the keepalive is disabled.
          *
+         * Note: The minimum safe value is 10 seconds. Values below this
+         * may be rejected by the gRPC server.
+         *
          * This maps to `collector.grpc_keepalive_time` in agent.config.
          */
         public static long GRPC_KEEPALIVE_TIME = 120L;
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
index 18b35d31d9..b752cd62f6 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
@@ -91,10 +91,6 @@ public boolean isConnected(boolean requestConnection) {
         return originChannel.getState(requestConnection) == ConnectivityState.READY;
     }
 
-    public ConnectivityState getState(boolean requestConnection) {
-        return originChannel.getState(requestConnection);
-    }
-
     public static class Builder {
         private final String host;
         private final int port;
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
index cfd79948b9..759cf55f64 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
@@ -19,7 +19,6 @@
 package org.apache.skywalking.apm.agent.core.remote;
 
 import io.grpc.Channel;
-import io.grpc.ConnectivityState;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
 
@@ -58,7 +57,6 @@ public class GRPCChannelManager implements BootService, Runnable {
     private volatile List<String> grpcServers;
     private volatile int selectedIdx = -1;
     private volatile int reconnectCount = 0;
-    private volatile int transientFailureCount = 0;
     private final Object statusLock = new Object();
 
     @Override
@@ -104,13 +102,8 @@ public void shutdown() {
     public void run() {
         if (reconnect) {
             LOGGER.warn("Selected collector grpc service running, reconnect:{}.", reconnect);
-        } else {
-            LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
         }
 
-        // Check channel state even when reconnect is false to detect prolonged failures
-        checkChannelStateAndTriggerReconnectIfNeeded();
-
         if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
             grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
                                 .filter(StringUtil::isNotBlank)
@@ -141,7 +134,6 @@ public void run() {
             String server = "";
             try {
                 int index = Math.abs(random.nextInt()) % grpcServers.size();
-
                 server = grpcServers.get(index);
                 String[] ipAndPort = server.split(":");
 
@@ -150,24 +142,20 @@ public void run() {
                     LOGGER.debug("Connecting to different gRPC server {}. Shutting down existing channel if any.", server);
                     createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
                 } else {
-                    // Same server, increment reconnectCount and check state
+                    // Same server, increment reconnectCount
                     reconnectCount++;
 
-                    // Force reconnect if reconnectCount or transientFailureCount exceeds threshold
-                    boolean forceReconnect = reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD
-                        || transientFailureCount > Config.Agent.FORCE_RECONNECTION_PERIOD;
-
-                    if (forceReconnect) {
-                        // Failed to reconnect after multiple attempts, force rebuild channel
-                        LOGGER.warn("Force rebuild channel to {} (reconnectCount={}, transientFailureCount={})",
-                            server, reconnectCount, transientFailureCount);
+                    if (reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD) {
+                        // Reconnect attempts exceeded threshold, force rebuild channel
+                        LOGGER.warn("Reconnect attempts to {} exceeded threshold ({}), forcing channel rebuild",
+                            server, Config.Agent.FORCE_RECONNECTION_PERIOD);
                         createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
                     } else if (managedChannel.isConnected(false)) {
-                        // Reconnect to the same server is automatically done by GRPC,
-                        // therefore we are responsible to check the connectivity and
-                        // set the state and notify listeners
-                        markAsConnected();
+                        // Channel appears connected, trust it but keep reconnectCount for monitoring
+                        LOGGER.debug("Channel to {} appears connected (reconnect attempt: {})", server, reconnectCount);
+                        notifyConnected();
                     }
+                    // else: Channel is disconnected and under threshold, wait for next retry
                 }
 
                 return;
@@ -227,7 +215,9 @@ private void createNewChannel(String host, int port) throws Exception {
                                     .addChannelDecorator(new AuthenticationDecorator())
                                     .build();
 
-        markAsConnected();
+        // Reset reconnectCount after actually rebuilding the channel
+        reconnectCount = 0;
+        notifyConnected();
     }
 
     /**
@@ -241,44 +231,18 @@ private void triggerReconnect() {
         }
     }
 
     /**
-     * Mark connection as successful and reset connection state.
+     * Notify listeners that connection is established without resetting reconnectCount.
+     * This is used when the channel appears connected but we want to keep monitoring
+     * reconnect attempts in case it's a false positive (half-open connection).
      */
-    private void markAsConnected() {
+    private void notifyConnected() {
         synchronized (statusLock) {
-            reconnectCount = 0;
+            // Don't reset reconnectCount - connection might still be half-open
             reconnect = false;
             notify(GRPCChannelStatus.CONNECTED);
         }
     }
 
-    /**
-     * Check the connectivity state of existing channel and trigger reconnect if needed.
-     * This method monitors TRANSIENT_FAILURE state and triggers reconnect if the failure persists too long.
-     */
-    private void checkChannelStateAndTriggerReconnectIfNeeded() {
-        if (managedChannel != null) {
-            try {
-                ConnectivityState state = managedChannel.getState(false);
-                LOGGER.debug("Current channel state: {}", state);
-
-                if (state == ConnectivityState.TRANSIENT_FAILURE) {
-                    transientFailureCount++;
-                    LOGGER.warn("Channel in TRANSIENT_FAILURE state, count: {}", transientFailureCount);
-                } else if (state == ConnectivityState.SHUTDOWN) {
-                    LOGGER.warn("Channel is SHUTDOWN");
-                    if (!reconnect) {
-                        triggerReconnect();
-                    }
-                } else {
-                    // IDLE, READY, CONNECTING are all normal states
-                    transientFailureCount = 0;
-                }
-            } catch (Throwable t) {
-                LOGGER.error(t, "Error checking channel state");
-            }
-        }
-    }
-
     private boolean isNetworkError(Throwable throwable) {
         if (throwable instanceof StatusRuntimeException) {
             StatusRuntimeException statusRuntimeException = (StatusRuntimeException) throwable;

From 67a44dcc2bb363c430909795ffcb3647c5148909 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=86=E9=B9=8F=20=E9=BB=84?=
Date: Mon, 10 Nov 2025 16:26:22 +0800
Subject: [PATCH 4/5] docs: Add minimum safe value note for
 grpc_keepalive_time in agent.config
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add documentation noting that the minimum safe keepalive time is 10
seconds, as values below this threshold may be rejected by the gRPC
server according to its keepalive enforcement policy.
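As a usage illustration (the values here are hypothetical, not
recommendations), an aggressive-but-compliant setting for a flaky
network could look like:

    collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:30}
    collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:10}

Going below the 10-second floor risks the server treating the pings as
abusive and closing the connection (in gRPC this typically shows up as
a GOAWAY with "too_many_pings").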
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 apm-sniffer/config/agent.config | 1 +
 1 file changed, 1 insertion(+)

diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index 2a8b9bfbb5..c44eebed2a 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -102,6 +102,7 @@ collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
 collector.grpc_upstream_timeout=${SW_AGENT_COLLECTOR_GRPC_UPSTREAM_TIMEOUT:30}
 # The interval in seconds to send a keepalive ping to the backend.
 # If this is less than or equal to 0, the keepalive is disabled.
+# Note: The minimum safe value is 10 seconds. Values below this may be rejected by the gRPC server.
 #collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:120}
 # The timeout in seconds to wait for a keepalive ack from the backend.
 # If the ack is not received within this time, the connection is considered dead.

From af2ddd46540ae11fa954590ccef631e8001a0182 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E5=85=86=E9=B9=8F=20=E9=BB=84?=
Date: Wed, 12 Nov 2025 10:46:25 +0800
Subject: [PATCH 5/5] refactor: Remove unnecessary synchronization in notify()
 method
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Remove the redundant synchronized block in notify(): listeners is
already a Collections.synchronizedList, and the method is only invoked
from the single-threaded ScheduledExecutor.
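One caveat worth recording: Collections.synchronizedList only guards
individual method calls, and its Javadoc still requires manual
synchronization around iteration. The field is declared along these
lines in the agent core (paraphrased):

    private final List<GRPCChannelListener> listeners =
        Collections.synchronizedList(new LinkedList<>());

The removal is safe here because notify()'s callers are already
serialized - both triggerReconnect() and notifyConnected() hold
statusLock when they call it - so only one thread iterates the list at
a time. If notify() ever gains an unsynchronized caller, the
synchronized block should come back.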
🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 .../apm/agent/core/remote/GRPCChannelManager.java | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
index 759cf55f64..846c343bbd 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
@@ -189,13 +189,11 @@ public void reportError(Throwable throwable) {
     }
 
     private void notify(GRPCChannelStatus status) {
-        synchronized (listeners) {
-            for (GRPCChannelListener listener : listeners) {
-                try {
-                    listener.statusChanged(status);
-                } catch (Throwable t) {
-                    LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
-                }
+        for (GRPCChannelListener listener : listeners) {
+            try {
+                listener.statusChanged(status);
+            } catch (Throwable t) {
+                LOGGER.error(t, "Fail to notify {} about channel connected.", listener.getClass().getName());
             }
         }
     }