Skip to content

Commit b98176e

Browse files
committed
feat: add keepalive feature to tear down streams in their absence
1 parent 42ad2d0 commit b98176e

File tree

2 files changed

+402
-0
lines changed

2 files changed

+402
-0
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.concurrent.ConcurrentHashMap;
6464
import java.util.concurrent.Executor;
6565
import java.util.concurrent.ScheduledExecutorService;
66+
import java.util.concurrent.ScheduledFuture;
6667
import java.util.concurrent.TimeUnit;
6768
import java.util.concurrent.atomic.AtomicBoolean;
6869
import java.util.concurrent.atomic.AtomicLong;
@@ -95,6 +96,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
9596

9697
private final SubscriberStub subscriberStub;
9798
private final int channelAffinity;
99+
private final long protocolVersion;
98100
private final String subscription;
99101
private final SubscriptionName subscriptionNameObject;
100102
private final ScheduledExecutorService systemExecutor;
@@ -127,6 +129,17 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
127129
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
128130
private final SubscriberShutdownSettings subscriberShutdownSettings;
129131

132+
private final boolean enableKeepalive;
133+
private static final long KEEP_ALIVE_SUPPORT_VERSION = 2;
134+
private static final Duration CLIENT_PING_INTERVAL = Duration.ofSeconds(30);
135+
private ScheduledFuture<?> pingSchedulerHandle;
136+
137+
private static final Duration SERVER_TIMEOUT_DURATION = Duration.ofSeconds(45);
138+
private static final Duration SERVER_PING_TIMEOUT_DURATION = Duration.ofSeconds(15);
139+
private final AtomicLong lastServerResponseTime;
140+
private final AtomicLong lastClientPingTime;
141+
private ScheduledFuture<?> serverMonitorHandle;
142+
130143
private StreamingSubscriberConnection(Builder builder) {
131144
subscription = builder.subscription;
132145
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
@@ -154,6 +167,7 @@ private StreamingSubscriberConnection(Builder builder) {
154167

155168
subscriberStub = builder.subscriberStub;
156169
channelAffinity = builder.channelAffinity;
170+
protocolVersion = builder.protocolVersion;
157171

158172
MessageDispatcher.Builder messageDispatcherBuilder;
159173
if (builder.receiver != null) {
@@ -190,6 +204,9 @@ private StreamingSubscriberConnection(Builder builder) {
190204

191205
flowControlSettings = builder.flowControlSettings;
192206
useLegacyFlowControl = builder.useLegacyFlowControl;
207+
enableKeepalive = protocolVersion >= KEEP_ALIVE_SUPPORT_VERSION;
208+
lastServerResponseTime = new AtomicLong(clock.nanoTime());
209+
lastClientPingTime = new AtomicLong(clock.nanoTime());
193210
}
194211

195212
public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled(
@@ -218,6 +235,12 @@ protected void doStop() {
218235
} finally {
219236
lock.unlock();
220237
}
238+
239+
if (enableKeepalive) {
240+
stopClientPinger();
241+
stopServerMonitor();
242+
}
243+
221244
runShutdown();
222245
notifyStopped();
223246
}
@@ -266,6 +289,10 @@ public void onStart(StreamController controller) {
266289

267290
@Override
268291
public void onResponse(StreamingPullResponse response) {
292+
if (enableKeepalive) {
293+
lastServerResponseTime.set(clock.nanoTime());
294+
}
295+
269296
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
270297

271298
boolean exactlyOnceDeliveryEnabledResponse =
@@ -295,11 +322,19 @@ public void onResponse(StreamingPullResponse response) {
295322

296323
@Override
297324
public void onError(Throwable t) {
325+
// The stream has failed: when keepalive is enabled, cancel the ping and monitor tasks first,
// then publish the failure through errorFuture.
if (enableKeepalive) {
326+
stopClientPinger();
327+
stopServerMonitor();
328+
}
298329
errorFuture.setException(t);
299330
}
300331

301332
@Override
302333
public void onComplete() {
334+
// The stream ended without error: when keepalive is enabled, cancel the ping and monitor tasks
// before completing errorFuture.
if (enableKeepalive) {
335+
stopClientPinger();
336+
stopServerMonitor();
337+
}
303338
logger.fine("Streaming pull terminated successfully!");
304339
// Completing with null signals a normal (non-exceptional) termination to the errorFuture
// callback.
errorFuture.set(null);
305340
}
@@ -336,6 +371,7 @@ private void initialize() {
336371
this.useLegacyFlowControl
337372
? 0
338373
: valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
374+
.setProtocolVersion(protocolVersion)
339375
.build());
340376

341377
/**
@@ -350,6 +386,11 @@ private void initialize() {
350386
lock.unlock();
351387
}
352388

389+
if (enableKeepalive) {
390+
startClientPinger();
391+
startServerMonitor();
392+
}
393+
353394
ApiFutures.addCallback(
354395
errorFuture,
355396
new ApiFutureCallback<Void>() {
@@ -366,6 +407,10 @@ public void onSuccess(@Nullable Void result) {
366407

367408
@Override
368409
public void onFailure(Throwable cause) {
410+
if (enableKeepalive) {
411+
stopClientPinger();
412+
stopServerMonitor();
413+
}
369414
if (!isAlive()) {
370415
// we don't care about subscription failures when we're no longer running.
371416
logger.log(Level.FINE, "pull failure after service no longer running", cause);
@@ -410,6 +455,113 @@ private boolean isAlive() {
410455
return state == State.RUNNING || state == State.STARTING;
411456
}
412457

458+
private void startClientPinger() {
459+
if (pingSchedulerHandle != null) {
460+
pingSchedulerHandle.cancel(false);
461+
}
462+
463+
pingSchedulerHandle =
464+
systemExecutor.scheduleAtFixedRate(
465+
() -> {
466+
try {
467+
lock.lock();
468+
try {
469+
if (clientStream != null && isAlive()) {
470+
clientStream.send(StreamingPullRequest.newBuilder().build());
471+
lastClientPingTime.set(clock.nanoTime());
472+
logger.log(Level.FINEST, "Sent client keepalive ping");
473+
}
474+
} finally {
475+
lock.unlock();
476+
}
477+
} catch (Exception e) {
478+
logger.log(Level.FINE, "Error sending client keepalive ping", e);
479+
}
480+
},
481+
CLIENT_PING_INTERVAL.getSeconds(),
482+
CLIENT_PING_INTERVAL.getSeconds(),
483+
TimeUnit.SECONDS);
484+
}
485+
486+
private void stopClientPinger() {
487+
if (pingSchedulerHandle != null) {
488+
pingSchedulerHandle.cancel(false);
489+
pingSchedulerHandle = null;
490+
}
491+
}
492+
493+
private void startServerMonitor() {
494+
if (serverMonitorHandle != null) {
495+
serverMonitorHandle.cancel(false);
496+
}
497+
498+
Duration checkInterval = Duration.ofSeconds(15);
499+
serverMonitorHandle =
500+
systemExecutor.scheduleAtFixedRate(
501+
() -> {
502+
try {
503+
if (!isAlive()) {
504+
return;
505+
}
506+
507+
long now = clock.nanoTime();
508+
long lastResponse = lastServerResponseTime.get();
509+
Duration elapsedSinceResponse = Duration.ofNanos(now - lastResponse);
510+
511+
if (elapsedSinceResponse.compareTo(SERVER_TIMEOUT_DURATION) <= 0) {
512+
return;
513+
}
514+
515+
long lastPing = lastClientPingTime.get();
516+
if (lastPing > lastResponse) {
517+
Duration elapsedSincePing = Duration.ofNanos(now - lastPing);
518+
if (elapsedSincePing.compareTo(SERVER_PING_TIMEOUT_DURATION) <= 0) {
519+
// waiting for response from ping
520+
return;
521+
}
522+
523+
logger.log(
524+
Level.WARNING,
525+
"No response from server for {0} seconds, and no response to ping sent {1}"
526+
+ " seconds ago. Closing stream.",
527+
new Object[] {
528+
elapsedSinceResponse.getSeconds(), elapsedSincePing.getSeconds()
529+
});
530+
} else {
531+
logger.log(
532+
Level.WARNING,
533+
"No response from server for {0} seconds. Closing stream.",
534+
elapsedSinceResponse.getSeconds());
535+
}
536+
537+
lock.lock();
538+
try {
539+
if (clientStream != null) {
540+
clientStream.closeSendWithError(
541+
Status.UNAVAILABLE
542+
.withDescription("Keepalive timeout with server")
543+
.asException());
544+
}
545+
} finally {
546+
lock.unlock();
547+
}
548+
stopServerMonitor();
549+
} catch (Exception e) {
550+
logger.log(Level.FINE, "Error in server keepaliver monitor", e);
551+
}
552+
},
553+
checkInterval.getSeconds(),
554+
checkInterval.getSeconds(),
555+
TimeUnit.SECONDS);
556+
}
557+
558+
private void stopServerMonitor() {
559+
if (serverMonitorHandle != null) {
560+
serverMonitorHandle.cancel(false);
561+
serverMonitorHandle = null;
562+
}
563+
}
564+
413565
public void setResponseOutstandingMessages(AckResponse ackResponse) {
414566
// We will close the futures with ackResponse - if there are multiple references to the same
415567
// future they will be handled appropriately
@@ -769,6 +921,7 @@ public static final class Builder {
769921
private Distribution ackLatencyDistribution;
770922
private SubscriberStub subscriberStub;
771923
private int channelAffinity;
924+
private long protocolVersion;
772925
private FlowController flowController;
773926
private FlowControlSettings flowControlSettings;
774927
private boolean useLegacyFlowControl;
@@ -840,6 +993,11 @@ public Builder setChannelAffinity(int channelAffinity) {
840993
return this;
841994
}
842995

996+
/**
 * Sets the protocol version for the connection being built.
 *
 * <p>The connection copies this value, puts it on the {@code StreamingPullRequest} built in
 * {@code initialize()}, and enables keepalive when it is at least
 * {@code KEEP_ALIVE_SUPPORT_VERSION}.
 *
 * @param protocolVersion the protocol version to use
 * @return this builder, for chaining
 */
public Builder setProtocolVersion(long protocolVersion) {
997+
this.protocolVersion = protocolVersion;
998+
return this;
999+
}
1000+
8431001
public Builder setFlowController(FlowController flowController) {
8441002
this.flowController = flowController;
8451003
return this;

0 commit comments

Comments
 (0)