diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java index 87cf57955b..f36b5dd8a0 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java @@ -210,6 +210,20 @@ public static class Collector { * How long grpc client will timeout in sending data to upstream. */ public static int GRPC_UPSTREAM_TIMEOUT = 30; + /** + * The interval in seconds to send a keepalive ping to the backend. + * If this is less than or equal to 0, the keepalive is disabled. + * + * This maps to `collector.grpc_keepalive_time` in agent.config. + */ + public static long GRPC_KEEPALIVE_TIME = 60L; + /** + * The timeout in seconds to wait for a keepalive ack from the backend. + * If the ack is not received within this time, the connection is considered dead. + * + * This maps to `collector.grpc_keepalive_timeout` in agent.config. + */ + public static long GRPC_KEEPALIVE_TIMEOUT = 30L; /** * Get profile task list interval */ diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java index d7806ad674..aee4c33e95 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java @@ -27,6 +27,8 @@ import io.grpc.netty.NettyChannelBuilder; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.skywalking.apm.agent.core.conf.Config; public class GRPCChannel { /** @@ -39,6 +41,11 @@ private GRPCChannel(String host, int port, List channelBuilders, List decorators) throws Exception { ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port); + if (Config.Collector.GRPC_KEEPALIVE_TIME > 0) { + channelBuilder.keepAliveTime(Config.Collector.GRPC_KEEPALIVE_TIME, TimeUnit.SECONDS) + .keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS); + } + NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider()); for (ChannelBuilder builder : channelBuilders) { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java index 9744398fd1..f3f3755743 100755 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java @@ -130,34 +130,27 @@ public void run() { String server = ""; try { int index = Math.abs(random.nextInt()) % grpcServers.size(); - if (index != selectedIdx) { - selectedIdx = index; + selectedIdx = index; - server = grpcServers.get(index); - String[] ipAndPort = server.split(":"); + server = grpcServers.get(index); + String[] ipAndPort = server.split(":"); - if (managedChannel != null) { - managedChannel.shutdownNow(); - } - - managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1])) - .addManagedChannelBuilder(new StandardChannelBuilder()) - .addManagedChannelBuilder(new TLSChannelBuilder()) - .addChannelDecorator(new AgentIDDecorator()) - .addChannelDecorator(new AuthenticationDecorator()) - .build(); - reconnectCount = 0; - reconnect = false; - notify(GRPCChannelStatus.CONNECTED); - } else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) { - // Reconnect to the same server is automatically done by GRPC, - // therefore we are responsible to check the connectivity and - // set the state and notify listeners - reconnectCount = 0; - reconnect = false; - notify(GRPCChannelStatus.CONNECTED); + LOGGER.debug("Attempting to reconnect to gRPC server {}. Shutting down existing channel if any.", server); + if (managedChannel != null) { + managedChannel.shutdownNow(); } + managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1])) + .addManagedChannelBuilder(new StandardChannelBuilder()) + .addManagedChannelBuilder(new TLSChannelBuilder()) + .addChannelDecorator(new AgentIDDecorator()) + .addChannelDecorator(new AuthenticationDecorator()) + .build(); + LOGGER.debug("Successfully reconnected to gRPC server {}.", server); + reconnectCount = 0; + reconnect = false; + notify(GRPCChannelStatus.CONNECTED); + return; } catch (Throwable t) { LOGGER.error(t, "Create channel to {} fail.", server); diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config index 9056993626..51a315ddb5 100755 --- a/apm-sniffer/config/agent.config +++ b/apm-sniffer/config/agent.config @@ -100,6 +100,12 @@ collector.properties_report_period_factor=${SW_AGENT_COLLECTOR_PROPERTIES_REPORT collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800} # How long grpc client will timeout in sending data to upstream. Unit is second. collector.grpc_upstream_timeout=${SW_AGENT_COLLECTOR_GRPC_UPSTREAM_TIMEOUT:30} +# The interval in seconds to send a keepalive ping to the backend. +# If this is less than or equal to 0, the keepalive is disabled. +#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:60} +# The timeout in seconds to wait for a keepalive ack from the backend. +# If the ack is not received within this time, the connection is considered dead. +#collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:30} # Sniffer get profile task list interval. collector.get_profile_task_interval=${SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL:20} # Sniffer get agent dynamic config interval.