6363import java .util .concurrent .ConcurrentHashMap ;
6464import java .util .concurrent .Executor ;
6565import java .util .concurrent .ScheduledExecutorService ;
66+ import java .util .concurrent .ScheduledFuture ;
6667import java .util .concurrent .TimeUnit ;
6768import java .util .concurrent .atomic .AtomicBoolean ;
6869import java .util .concurrent .atomic .AtomicLong ;
@@ -95,6 +96,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
9596
9697 private final SubscriberStub subscriberStub ;
9798 private final int channelAffinity ;
99+ private final long protocolVersion ;
98100 private final String subscription ;
99101 private final SubscriptionName subscriptionNameObject ;
100102 private final ScheduledExecutorService systemExecutor ;
@@ -127,6 +129,15 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
127129 private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer (null , false );
128130 private final SubscriberShutdownSettings subscriberShutdownSettings ;
129131
132+ private final boolean enableKeepalive ;
133+ private static final long KEEP_ALIVE_SUPPORT_VERSION = 2 ;
134+ private static final Duration CLIENT_PING_INTERVAL = Duration .ofSeconds (30 );
135+ private ScheduledFuture <?> pingSchedulerHandle ;
136+
137+ private static final Duration SERVER_TIMEOUT_DURATION = Duration .ofSeconds (45 );
138+ private final AtomicLong lastServerResponseTime ;
139+ private ScheduledFuture <?> serverMonitorHandle ;
140+
130141 private StreamingSubscriberConnection (Builder builder ) {
131142 subscription = builder .subscription ;
132143 subscriptionNameObject = SubscriptionName .parse (builder .subscription );
@@ -154,6 +165,7 @@ private StreamingSubscriberConnection(Builder builder) {
154165
155166 subscriberStub = builder .subscriberStub ;
156167 channelAffinity = builder .channelAffinity ;
168+ protocolVersion = builder .protocolVersion ;
157169
158170 MessageDispatcher .Builder messageDispatcherBuilder ;
159171 if (builder .receiver != null ) {
@@ -190,6 +202,8 @@ private StreamingSubscriberConnection(Builder builder) {
190202
191203 flowControlSettings = builder .flowControlSettings ;
192204 useLegacyFlowControl = builder .useLegacyFlowControl ;
205+ enableKeepalive = protocolVersion >= KEEP_ALIVE_SUPPORT_VERSION ;
206+ lastServerResponseTime = new AtomicLong (clock .nanoTime ());
193207 }
194208
195209 public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled (
@@ -218,6 +232,12 @@ protected void doStop() {
218232 } finally {
219233 lock .unlock ();
220234 }
235+
236+ if (enableKeepalive ) {
237+ stopClientPinger ();
238+ stopServerMonitor ();
239+ }
240+
221241 runShutdown ();
222242 notifyStopped ();
223243 }
@@ -266,6 +286,10 @@ public void onStart(StreamController controller) {
266286
267287 @ Override
268288 public void onResponse (StreamingPullResponse response ) {
289+ if (enableKeepalive ) {
290+ lastServerResponseTime .set (clock .nanoTime ());
291+ }
292+
269293 channelReconnectBackoffMillis .set (INITIAL_CHANNEL_RECONNECT_BACKOFF .toMillis ());
270294
271295 boolean exactlyOnceDeliveryEnabledResponse =
@@ -295,11 +319,19 @@ public void onResponse(StreamingPullResponse response) {
295319
296320 @ Override
297321 public void onError (Throwable t ) {
322+ if (enableKeepalive ) {
323+ stopClientPinger ();
324+ stopServerMonitor ();
325+ }
298326 errorFuture .setException (t );
299327 }
300328
301329 @ Override
302330 public void onComplete () {
331+ if (enableKeepalive ) {
332+ stopClientPinger ();
333+ stopServerMonitor ();
334+ }
303335 logger .fine ("Streaming pull terminated successfully!" );
304336 errorFuture .set (null );
305337 }
@@ -336,6 +368,7 @@ private void initialize() {
336368 this .useLegacyFlowControl
337369 ? 0
338370 : valueOrZero (flowControlSettings .getMaxOutstandingRequestBytes ()))
371+ .setProtocolVersion (protocolVersion )
339372 .build ());
340373
341374 /**
@@ -350,6 +383,11 @@ private void initialize() {
350383 lock .unlock ();
351384 }
352385
386+ if (enableKeepalive ) {
387+ startClientPinger ();
388+ startServerMonitor ();
389+ }
390+
353391 ApiFutures .addCallback (
354392 errorFuture ,
355393 new ApiFutureCallback <Void >() {
@@ -366,6 +404,10 @@ public void onSuccess(@Nullable Void result) {
366404
367405 @ Override
368406 public void onFailure (Throwable cause ) {
407+ if (enableKeepalive ) {
408+ stopClientPinger ();
409+ stopServerMonitor ();
410+ }
369411 if (!isAlive ()) {
370412 // we don't care about subscription failures when we're no longer running.
371413 logger .log (Level .FINE , "pull failure after service no longer running" , cause );
@@ -410,6 +452,93 @@ private boolean isAlive() {
410452 return state == State .RUNNING || state == State .STARTING ;
411453 }
412454
455+ private void startClientPinger () {
456+ if (pingSchedulerHandle != null ) {
457+ pingSchedulerHandle .cancel (false );
458+ }
459+
460+ pingSchedulerHandle =
461+ systemExecutor .scheduleAtFixedRate (
462+ () -> {
463+ try {
464+ lock .lock ();
465+ try {
466+ if (clientStream != null && isAlive ()) {
467+ clientStream .send (StreamingPullRequest .newBuilder ().build ());
468+ logger .log (Level .FINEST , "Sent client keepalive ping" );
469+ }
470+ } finally {
471+ lock .unlock ();
472+ }
473+ } catch (Exception e ) {
474+ logger .log (Level .FINE , "Error sending client keepalive ping" , e );
475+ }
476+ },
477+ CLIENT_PING_INTERVAL .getSeconds (),
478+ CLIENT_PING_INTERVAL .getSeconds (),
479+ TimeUnit .SECONDS );
480+ }
481+
482+ private void stopClientPinger () {
483+ if (pingSchedulerHandle != null ) {
484+ pingSchedulerHandle .cancel (false );
485+ pingSchedulerHandle = null ;
486+ }
487+ }
488+
489+ private void startServerMonitor () {
490+ if (serverMonitorHandle != null ) {
491+ serverMonitorHandle .cancel (false );
492+ }
493+
494+ Duration checkInterval = Duration .ofSeconds (15 );
495+ serverMonitorHandle =
496+ systemExecutor .scheduleAtFixedRate (
497+ () -> {
498+ try {
499+ if (!isAlive ()) {
500+ return ;
501+ }
502+
503+ long now = clock .nanoTime ();
504+ long lastResponse = lastServerResponseTime .get ();
505+ Duration elapsed = Duration .ofNanos (now - lastResponse );
506+
507+ if (elapsed .compareTo (SERVER_TIMEOUT_DURATION ) > 0 ) {
508+ logger .log (
509+ Level .WARNING ,
510+ "No response from server for {0} seconds. Closing stream." ,
511+ elapsed .getSeconds ());
512+
513+ lock .lock ();
514+ try {
515+ if (clientStream != null ) {
516+ clientStream .closeSendWithError (
517+ Status .UNAVAILABLE
518+ .withDescription ("Keepalive timeout with server" )
519+ .asException ());
520+ }
521+ } finally {
522+ lock .unlock ();
523+ }
524+ stopServerMonitor ();
525+ }
526+ } catch (Exception e ) {
527+ logger .log (Level .FINE , "Error in server keepaliver monitor" , e );
528+ }
529+ },
530+ checkInterval .getSeconds (),
531+ checkInterval .getSeconds (),
532+ TimeUnit .SECONDS );
533+ }
534+
535+ private void stopServerMonitor () {
536+ if (serverMonitorHandle != null ) {
537+ serverMonitorHandle .cancel (false );
538+ serverMonitorHandle = null ;
539+ }
540+ }
541+
413542 public void setResponseOutstandingMessages (AckResponse ackResponse ) {
414543 // We will close the futures with ackResponse - if there are multiple references to the same
415544 // future they will be handled appropriately
@@ -769,6 +898,7 @@ public static final class Builder {
769898 private Distribution ackLatencyDistribution ;
770899 private SubscriberStub subscriberStub ;
771900 private int channelAffinity ;
901+ private long protocolVersion ;
772902 private FlowController flowController ;
773903 private FlowControlSettings flowControlSettings ;
774904 private boolean useLegacyFlowControl ;
@@ -840,6 +970,11 @@ public Builder setChannelAffinity(int channelAffinity) {
840970 return this ;
841971 }
842972
973+ public Builder setProtocolVersion (long protocolVersion ) {
974+ this .protocolVersion = protocolVersion ;
975+ return this ;
976+ }
977+
843978 public Builder setFlowController (FlowController flowController ) {
844979 this .flowController = flowController ;
845980 return this ;
0 commit comments