Skip to content

Commit b89478d

Browse files
committed
feat: add keepalive feature to tear down streams in their absence
1 parent 42ad2d0 commit b89478d

File tree

3 files changed

+332
-0
lines changed

3 files changed

+332
-0
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.concurrent.ConcurrentHashMap;
6464
import java.util.concurrent.Executor;
6565
import java.util.concurrent.ScheduledExecutorService;
66+
import java.util.concurrent.ScheduledFuture;
6667
import java.util.concurrent.TimeUnit;
6768
import java.util.concurrent.atomic.AtomicBoolean;
6869
import java.util.concurrent.atomic.AtomicLong;
@@ -95,6 +96,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
9596

9697
private final SubscriberStub subscriberStub;
9798
private final int channelAffinity;
99+
private final long protocolVersion;
98100
private final String subscription;
99101
private final SubscriptionName subscriptionNameObject;
100102
private final ScheduledExecutorService systemExecutor;
@@ -127,6 +129,17 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
127129
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
128130
private final SubscriberShutdownSettings subscriberShutdownSettings;
129131

132+
private final boolean enableKeepalive;
133+
private static final long KEEP_ALIVE_SUPPORT_VERSION = 1;
134+
private static final Duration CLIENT_PING_INTERVAL = Duration.ofSeconds(30);
135+
private ScheduledFuture<?> pingSchedulerHandle;
136+
137+
private static final Duration SERVER_MONITOR_INTERVAL = Duration.ofSeconds(15);
138+
private static final Duration SERVER_PING_TIMEOUT_DURATION = Duration.ofSeconds(15);
139+
private final AtomicLong lastServerResponseTime;
140+
private final AtomicLong lastClientPingTime;
141+
private ScheduledFuture<?> serverMonitorHandle;
142+
130143
private StreamingSubscriberConnection(Builder builder) {
131144
subscription = builder.subscription;
132145
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
@@ -154,6 +167,7 @@ private StreamingSubscriberConnection(Builder builder) {
154167

155168
subscriberStub = builder.subscriberStub;
156169
channelAffinity = builder.channelAffinity;
170+
protocolVersion = builder.protocolVersion;
157171

158172
MessageDispatcher.Builder messageDispatcherBuilder;
159173
if (builder.receiver != null) {
@@ -190,6 +204,9 @@ private StreamingSubscriberConnection(Builder builder) {
190204

191205
flowControlSettings = builder.flowControlSettings;
192206
useLegacyFlowControl = builder.useLegacyFlowControl;
207+
enableKeepalive = protocolVersion >= KEEP_ALIVE_SUPPORT_VERSION;
208+
lastServerResponseTime = new AtomicLong(clock.nanoTime());
209+
lastClientPingTime = new AtomicLong(-1L);
193210
}
194211

195212
public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled(
@@ -218,6 +235,12 @@ protected void doStop() {
218235
} finally {
219236
lock.unlock();
220237
}
238+
239+
if (enableKeepalive) {
240+
stopClientPinger();
241+
stopServerMonitor();
242+
}
243+
221244
runShutdown();
222245
notifyStopped();
223246
}
@@ -266,6 +289,10 @@ public void onStart(StreamController controller) {
266289

267290
@Override
268291
public void onResponse(StreamingPullResponse response) {
292+
if (enableKeepalive) {
293+
lastServerResponseTime.set(clock.nanoTime());
294+
}
295+
269296
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
270297

271298
boolean exactlyOnceDeliveryEnabledResponse =
@@ -295,11 +322,19 @@ public void onResponse(StreamingPullResponse response) {
295322

296323
@Override
297324
public void onError(Throwable t) {
325+
if (enableKeepalive) {
326+
stopClientPinger();
327+
stopServerMonitor();
328+
}
298329
errorFuture.setException(t);
299330
}
300331

301332
@Override
302333
public void onComplete() {
334+
if (enableKeepalive) {
335+
stopClientPinger();
336+
stopServerMonitor();
337+
}
303338
logger.fine("Streaming pull terminated successfully!");
304339
errorFuture.set(null);
305340
}
@@ -336,6 +371,7 @@ private void initialize() {
336371
this.useLegacyFlowControl
337372
? 0
338373
: valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes()))
374+
.setProtocolVersion(protocolVersion)
339375
.build());
340376

341377
/**
@@ -350,6 +386,11 @@ private void initialize() {
350386
lock.unlock();
351387
}
352388

389+
if (enableKeepalive) {
390+
startClientPinger();
391+
startServerMonitor();
392+
}
393+
353394
ApiFutures.addCallback(
354395
errorFuture,
355396
new ApiFutureCallback<Void>() {
@@ -366,6 +407,10 @@ public void onSuccess(@Nullable Void result) {
366407

367408
@Override
368409
public void onFailure(Throwable cause) {
410+
if (enableKeepalive) {
411+
stopClientPinger();
412+
stopServerMonitor();
413+
}
369414
if (!isAlive()) {
370415
// we don't care about subscription failures when we're no longer running.
371416
logger.log(Level.FINE, "pull failure after service no longer running", cause);
@@ -410,6 +455,100 @@ private boolean isAlive() {
410455
return state == State.RUNNING || state == State.STARTING;
411456
}
412457

458+
private void startClientPinger() {
459+
if (pingSchedulerHandle != null) {
460+
pingSchedulerHandle.cancel(false);
461+
}
462+
463+
pingSchedulerHandle =
464+
systemExecutor.scheduleAtFixedRate(
465+
() -> {
466+
try {
467+
lock.lock();
468+
try {
469+
if (clientStream != null && isAlive()) {
470+
clientStream.send(StreamingPullRequest.newBuilder().build());
471+
lastClientPingTime.set(clock.nanoTime());
472+
logger.log(Level.FINEST, "Sent client keepalive ping");
473+
}
474+
} finally {
475+
lock.unlock();
476+
}
477+
} catch (Exception e) {
478+
logger.log(Level.FINE, "Error sending client keepalive ping", e);
479+
}
480+
},
481+
CLIENT_PING_INTERVAL.getSeconds(),
482+
CLIENT_PING_INTERVAL.getSeconds(),
483+
TimeUnit.SECONDS);
484+
}
485+
486+
private void stopClientPinger() {
487+
if (pingSchedulerHandle != null) {
488+
pingSchedulerHandle.cancel(false);
489+
pingSchedulerHandle = null;
490+
}
491+
}
492+
493+
private void startServerMonitor() {
494+
if (serverMonitorHandle != null) {
495+
serverMonitorHandle.cancel(false);
496+
}
497+
498+
serverMonitorHandle =
499+
systemExecutor.scheduleAtFixedRate(
500+
() -> {
501+
try {
502+
if (!isAlive()) {
503+
return;
504+
}
505+
506+
long now = clock.nanoTime();
507+
long lastResponse = lastServerResponseTime.get();
508+
long lastPing = lastClientPingTime.get();
509+
510+
if (lastPing <= lastResponse) {
511+
return;
512+
}
513+
514+
Duration elapsedSincePing = Duration.ofNanos(now - lastPing);
515+
if (elapsedSincePing.compareTo(SERVER_PING_TIMEOUT_DURATION) <= 0) {
516+
return;
517+
}
518+
519+
logger.log(
520+
Level.WARNING,
521+
"No response from server for {0} seconds. Closing stream.",
522+
elapsedSincePing.getSeconds());
523+
524+
lock.lock();
525+
try {
526+
if (clientStream != null) {
527+
clientStream.closeSendWithError(
528+
Status.UNAVAILABLE
529+
.withDescription("Keepalive timeout with server")
530+
.asException());
531+
}
532+
} finally {
533+
lock.unlock();
534+
}
535+
stopServerMonitor();
536+
} catch (Exception e) {
537+
logger.log(Level.FINE, "Error in server keepalive monitor", e);
538+
}
539+
},
540+
SERVER_MONITOR_INTERVAL.getSeconds(),
541+
SERVER_MONITOR_INTERVAL.getSeconds(),
542+
TimeUnit.SECONDS);
543+
}
544+
545+
private void stopServerMonitor() {
546+
if (serverMonitorHandle != null) {
547+
serverMonitorHandle.cancel(false);
548+
serverMonitorHandle = null;
549+
}
550+
}
551+
413552
public void setResponseOutstandingMessages(AckResponse ackResponse) {
414553
// We will close the futures with ackResponse - if there are multiple references to the same
415554
// future they will be handled appropriately
@@ -769,6 +908,7 @@ public static final class Builder {
769908
private Distribution ackLatencyDistribution;
770909
private SubscriberStub subscriberStub;
771910
private int channelAffinity;
911+
private long protocolVersion;
772912
private FlowController flowController;
773913
private FlowControlSettings flowControlSettings;
774914
private boolean useLegacyFlowControl;
@@ -840,6 +980,11 @@ public Builder setChannelAffinity(int channelAffinity) {
840980
return this;
841981
}
842982

983+
public Builder setProtocolVersion(long protocolVersion) {
984+
this.protocolVersion = protocolVersion;
985+
return this;
986+
}
987+
843988
public Builder setFlowController(FlowController flowController) {
844989
this.flowController = flowController;
845990
return this;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
144144
private final boolean maxDurationPerAckExtensionDefaultUsed;
145145
private final java.time.Duration minDurationPerAckExtension;
146146
private final boolean minDurationPerAckExtensionDefaultUsed;
147+
private final long protocolVersion;
147148

148149
// The ExecutorProvider used to generate executors for processing messages.
149150
private final ExecutorProvider executorProvider;
@@ -182,6 +183,7 @@ private Subscriber(Builder builder) {
182183
maxDurationPerAckExtensionDefaultUsed = builder.maxDurationPerAckExtensionDefaultUsed;
183184
minDurationPerAckExtension = builder.minDurationPerAckExtension;
184185
minDurationPerAckExtensionDefaultUsed = builder.minDurationPerAckExtensionDefaultUsed;
186+
protocolVersion = builder.protocolVersion;
185187

186188
clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock();
187189

@@ -428,6 +430,7 @@ private void startStreamingConnections() {
428430
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
429431
.setTracer(tracer)
430432
.setSubscriberShutdownSettings(subscriberShutdownSettings)
433+
.setProtocolVersion(protocolVersion)
431434
.build();
432435

433436
streamingSubscriberConnections.add(streamingSubscriberConnection);
@@ -548,6 +551,8 @@ public static final class Builder {
548551
private boolean enableOpenTelemetryTracing = false;
549552
private OpenTelemetry openTelemetry = null;
550553

554+
private long protocolVersion = 0L;
555+
551556
private SubscriberShutdownSettings subscriberShutdownSettings =
552557
SubscriberShutdownSettings.newBuilder().build();
553558

@@ -771,6 +776,12 @@ Builder setClock(ApiClock clock) {
771776
return this;
772777
}
773778

779+
/** Gives the ability to override the protocol version */
780+
public Builder setProtocolVersion(long protocolVersion) {
781+
this.protocolVersion = protocolVersion;
782+
return this;
783+
}
784+
774785
/**
775786
* OpenTelemetry will be enabled if setEnableOpenTelemetry is true and and instance of
776787
* OpenTelemetry has been provied. Warning: traces are subject to change. The name and

0 commit comments

Comments
 (0)