6363import java .util .concurrent .ConcurrentHashMap ;
6464import java .util .concurrent .Executor ;
6565import java .util .concurrent .ScheduledExecutorService ;
66+ import java .util .concurrent .ScheduledFuture ;
6667import java .util .concurrent .TimeUnit ;
6768import java .util .concurrent .atomic .AtomicBoolean ;
6869import java .util .concurrent .atomic .AtomicLong ;
@@ -95,6 +96,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
9596
9697 private final SubscriberStub subscriberStub ;
9798 private final int channelAffinity ;
99+ private final long protocolVersion ;
98100 private final String subscription ;
99101 private final SubscriptionName subscriptionNameObject ;
100102 private final ScheduledExecutorService systemExecutor ;
@@ -127,6 +129,17 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
127129 private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer (null , false );
128130 private final SubscriberShutdownSettings subscriberShutdownSettings ;
129131
132+ private final boolean enableKeepalive ;
133+ private static final long KEEP_ALIVE_SUPPORT_VERSION = 2 ;
134+ private static final Duration CLIENT_PING_INTERVAL = Duration .ofSeconds (30 );
135+ private ScheduledFuture <?> pingSchedulerHandle ;
136+
137+ private static final Duration SERVER_TIMEOUT_DURATION = Duration .ofSeconds (45 );
138+ private static final Duration SERVER_PING_TIMEOUT_DURATION = Duration .ofSeconds (15 );
139+ private final AtomicLong lastServerResponseTime ;
140+ private final AtomicLong lastClientPingTime ;
141+ private ScheduledFuture <?> serverMonitorHandle ;
142+
130143 private StreamingSubscriberConnection (Builder builder ) {
131144 subscription = builder .subscription ;
132145 subscriptionNameObject = SubscriptionName .parse (builder .subscription );
@@ -154,6 +167,7 @@ private StreamingSubscriberConnection(Builder builder) {
154167
155168 subscriberStub = builder .subscriberStub ;
156169 channelAffinity = builder .channelAffinity ;
170+ protocolVersion = builder .protocolVersion ;
157171
158172 MessageDispatcher .Builder messageDispatcherBuilder ;
159173 if (builder .receiver != null ) {
@@ -190,6 +204,9 @@ private StreamingSubscriberConnection(Builder builder) {
190204
191205 flowControlSettings = builder .flowControlSettings ;
192206 useLegacyFlowControl = builder .useLegacyFlowControl ;
207+ enableKeepalive = protocolVersion >= KEEP_ALIVE_SUPPORT_VERSION ;
208+ lastServerResponseTime = new AtomicLong (clock .nanoTime ());
209+ lastClientPingTime = new AtomicLong (clock .nanoTime ());
193210 }
194211
195212 public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled (
@@ -218,6 +235,12 @@ protected void doStop() {
218235 } finally {
219236 lock .unlock ();
220237 }
238+
239+ if (enableKeepalive ) {
240+ stopClientPinger ();
241+ stopServerMonitor ();
242+ }
243+
221244 runShutdown ();
222245 notifyStopped ();
223246 }
@@ -266,6 +289,10 @@ public void onStart(StreamController controller) {
266289
267290 @ Override
268291 public void onResponse (StreamingPullResponse response ) {
292+ if (enableKeepalive ) {
293+ lastServerResponseTime .set (clock .nanoTime ());
294+ }
295+
269296 channelReconnectBackoffMillis .set (INITIAL_CHANNEL_RECONNECT_BACKOFF .toMillis ());
270297
271298 boolean exactlyOnceDeliveryEnabledResponse =
@@ -295,11 +322,19 @@ public void onResponse(StreamingPullResponse response) {
295322
296323 @ Override
297324 public void onError (Throwable t ) {
325+ if (enableKeepalive ) {
326+ stopClientPinger ();
327+ stopServerMonitor ();
328+ }
298329 errorFuture .setException (t );
299330 }
300331
301332 @ Override
302333 public void onComplete () {
334+ if (enableKeepalive ) {
335+ stopClientPinger ();
336+ stopServerMonitor ();
337+ }
303338 logger .fine ("Streaming pull terminated successfully!" );
304339 errorFuture .set (null );
305340 }
@@ -336,6 +371,7 @@ private void initialize() {
336371 this .useLegacyFlowControl
337372 ? 0
338373 : valueOrZero (flowControlSettings .getMaxOutstandingRequestBytes ()))
374+ .setProtocolVersion (protocolVersion )
339375 .build ());
340376
341377 /**
@@ -350,6 +386,11 @@ private void initialize() {
350386 lock .unlock ();
351387 }
352388
389+ if (enableKeepalive ) {
390+ startClientPinger ();
391+ startServerMonitor ();
392+ }
393+
353394 ApiFutures .addCallback (
354395 errorFuture ,
355396 new ApiFutureCallback <Void >() {
@@ -366,6 +407,10 @@ public void onSuccess(@Nullable Void result) {
366407
367408 @ Override
368409 public void onFailure (Throwable cause ) {
410+ if (enableKeepalive ) {
411+ stopClientPinger ();
412+ stopServerMonitor ();
413+ }
369414 if (!isAlive ()) {
370415 // we don't care about subscription failures when we're no longer running.
371416 logger .log (Level .FINE , "pull failure after service no longer running" , cause );
@@ -410,6 +455,101 @@ private boolean isAlive() {
410455 return state == State .RUNNING || state == State .STARTING ;
411456 }
412457
458+ private void startClientPinger () {
459+ if (pingSchedulerHandle != null ) {
460+ pingSchedulerHandle .cancel (false );
461+ }
462+
463+ pingSchedulerHandle =
464+ systemExecutor .scheduleAtFixedRate (
465+ () -> {
466+ try {
467+ lock .lock ();
468+ try {
469+ if (clientStream != null && isAlive ()) {
470+ clientStream .send (StreamingPullRequest .newBuilder ().build ());
471+ lastClientPingTime .set (clock .nanoTime ());
472+ logger .log (Level .FINEST , "Sent client keepalive ping" );
473+ }
474+ } finally {
475+ lock .unlock ();
476+ }
477+ } catch (Exception e ) {
478+ logger .log (Level .FINE , "Error sending client keepalive ping" , e );
479+ }
480+ },
481+ CLIENT_PING_INTERVAL .getSeconds (),
482+ CLIENT_PING_INTERVAL .getSeconds (),
483+ TimeUnit .SECONDS );
484+ }
485+
486+ private void stopClientPinger () {
487+ if (pingSchedulerHandle != null ) {
488+ pingSchedulerHandle .cancel (false );
489+ pingSchedulerHandle = null ;
490+ }
491+ }
492+
493+ private void startServerMonitor () {
494+ if (serverMonitorHandle != null ) {
495+ serverMonitorHandle .cancel (false );
496+ }
497+
498+ Duration checkInterval = Duration .ofSeconds (15 );
499+ serverMonitorHandle =
500+ systemExecutor .scheduleAtFixedRate (
501+ () -> {
502+ try {
503+ if (!isAlive ()) {
504+ return ;
505+ }
506+
507+ long now = clock .nanoTime ();
508+ long lastResponse = lastServerResponseTime .get ();
509+ long lastPing = lastClientPingTime .get ();
510+
511+ if (lastPing <= lastResponse ) {
512+ return ;
513+ }
514+
515+ Duration elapsedSincePing = Duration .ofNanos (now - lastPing );
516+ if (elapsedSincePing .compareTo (SERVER_PING_TIMEOUT_DURATION ) <= 0 ) {
517+ return ;
518+ }
519+
520+ logger .log (
521+ Level .WARNING ,
522+ "No response from server for {0} seconds. Closing stream." ,
523+ elapsedSincePing .getSeconds ());
524+
525+ lock .lock ();
526+ try {
527+ if (clientStream != null ) {
528+ clientStream .closeSendWithError (
529+ Status .UNAVAILABLE
530+ .withDescription ("Keepalive timeout with server" )
531+ .asException ());
532+ }
533+ } finally {
534+ lock .unlock ();
535+ }
536+ stopServerMonitor ();
537+ } catch (Exception e ) {
538+ logger .log (Level .FINE , "Error in server keepaliver monitor" , e );
539+ }
540+ },
541+ checkInterval .getSeconds (),
542+ checkInterval .getSeconds (),
543+ TimeUnit .SECONDS );
544+ }
545+
546+ private void stopServerMonitor () {
547+ if (serverMonitorHandle != null ) {
548+ serverMonitorHandle .cancel (false );
549+ serverMonitorHandle = null ;
550+ }
551+ }
552+
413553 public void setResponseOutstandingMessages (AckResponse ackResponse ) {
414554 // We will close the futures with ackResponse - if there are multiple references to the same
415555 // future they will be handled appropriately
@@ -769,6 +909,7 @@ public static final class Builder {
769909 private Distribution ackLatencyDistribution ;
770910 private SubscriberStub subscriberStub ;
771911 private int channelAffinity ;
912+ private long protocolVersion ;
772913 private FlowController flowController ;
773914 private FlowControlSettings flowControlSettings ;
774915 private boolean useLegacyFlowControl ;
@@ -840,6 +981,11 @@ public Builder setChannelAffinity(int channelAffinity) {
840981 return this ;
841982 }
842983
984+ public Builder setProtocolVersion (long protocolVersion ) {
985+ this .protocolVersion = protocolVersion ;
986+ return this ;
987+ }
988+
843989 public Builder setFlowController (FlowController flowController ) {
844990 this .flowController = flowController ;
845991 return this ;
0 commit comments