6363import java .util .concurrent .ConcurrentHashMap ;
6464import java .util .concurrent .Executor ;
6565import java .util .concurrent .ScheduledExecutorService ;
66+ import java .util .concurrent .ScheduledFuture ;
6667import java .util .concurrent .TimeUnit ;
6768import java .util .concurrent .atomic .AtomicBoolean ;
6869import java .util .concurrent .atomic .AtomicLong ;
@@ -95,6 +96,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
9596
9697 private final SubscriberStub subscriberStub ;
9798 private final int channelAffinity ;
99+ private final long protocolVersion ;
98100 private final String subscription ;
99101 private final SubscriptionName subscriptionNameObject ;
100102 private final ScheduledExecutorService systemExecutor ;
@@ -127,6 +129,17 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
127129 private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer (null , false );
128130 private final SubscriberShutdownSettings subscriberShutdownSettings ;
129131
132+ private final boolean enableKeepalive ;
133+ private static final long KEEP_ALIVE_SUPPORT_VERSION = 1 ;
134+ private static final Duration CLIENT_PING_INTERVAL = Duration .ofSeconds (30 );
135+ private ScheduledFuture <?> pingSchedulerHandle ;
136+
137+ private static final Duration SERVER_MONITOR_INTERVAL = Duration .ofSeconds (10 );
138+ private static final Duration SERVER_PING_TIMEOUT_DURATION = Duration .ofSeconds (15 );
139+ private final AtomicLong lastServerResponseTime ;
140+ private final AtomicLong lastClientPingTime ;
141+ private ScheduledFuture <?> serverMonitorHandle ;
142+
130143 private StreamingSubscriberConnection (Builder builder ) {
131144 subscription = builder .subscription ;
132145 subscriptionNameObject = SubscriptionName .parse (builder .subscription );
@@ -154,6 +167,7 @@ private StreamingSubscriberConnection(Builder builder) {
154167
155168 subscriberStub = builder .subscriberStub ;
156169 channelAffinity = builder .channelAffinity ;
170+ protocolVersion = builder .protocolVersion ;
157171
158172 MessageDispatcher .Builder messageDispatcherBuilder ;
159173 if (builder .receiver != null ) {
@@ -190,6 +204,9 @@ private StreamingSubscriberConnection(Builder builder) {
190204
191205 flowControlSettings = builder .flowControlSettings ;
192206 useLegacyFlowControl = builder .useLegacyFlowControl ;
207+ enableKeepalive = protocolVersion >= KEEP_ALIVE_SUPPORT_VERSION ;
208+ lastServerResponseTime = new AtomicLong (clock .nanoTime ());
209+ lastClientPingTime = new AtomicLong (-1L );
193210 }
194211
195212 public StreamingSubscriberConnection setExactlyOnceDeliveryEnabled (
@@ -218,6 +235,12 @@ protected void doStop() {
218235 } finally {
219236 lock .unlock ();
220237 }
238+
239+ if (enableKeepalive ) {
240+ stopClientPinger ();
241+ stopServerMonitor ();
242+ }
243+
221244 runShutdown ();
222245 notifyStopped ();
223246 }
@@ -266,6 +289,10 @@ public void onStart(StreamController controller) {
266289
267290 @ Override
268291 public void onResponse (StreamingPullResponse response ) {
292+ if (enableKeepalive ) {
293+ lastServerResponseTime .set (clock .nanoTime ());
294+ }
295+
269296 channelReconnectBackoffMillis .set (INITIAL_CHANNEL_RECONNECT_BACKOFF .toMillis ());
270297
271298 boolean exactlyOnceDeliveryEnabledResponse =
@@ -295,11 +322,19 @@ public void onResponse(StreamingPullResponse response) {
295322
296323 @ Override
297324 public void onError (Throwable t ) {
325+ if (enableKeepalive ) {
326+ stopClientPinger ();
327+ stopServerMonitor ();
328+ }
298329 errorFuture .setException (t );
299330 }
300331
301332 @ Override
302333 public void onComplete () {
334+ if (enableKeepalive ) {
335+ stopClientPinger ();
336+ stopServerMonitor ();
337+ }
303338 logger .fine ("Streaming pull terminated successfully!" );
304339 errorFuture .set (null );
305340 }
@@ -336,6 +371,7 @@ private void initialize() {
336371 this .useLegacyFlowControl
337372 ? 0
338373 : valueOrZero (flowControlSettings .getMaxOutstandingRequestBytes ()))
374+ .setProtocolVersion (protocolVersion )
339375 .build ());
340376
341377 /**
@@ -350,6 +386,13 @@ private void initialize() {
350386 lock .unlock ();
351387 }
352388
389+ if (enableKeepalive ) {
390+ lastServerResponseTime .set (clock .nanoTime ());
391+ lastClientPingTime .set (-1L );
392+ startClientPinger ();
393+ startServerMonitor ();
394+ }
395+
353396 ApiFutures .addCallback (
354397 errorFuture ,
355398 new ApiFutureCallback <Void >() {
@@ -366,6 +409,10 @@ public void onSuccess(@Nullable Void result) {
366409
367410 @ Override
368411 public void onFailure (Throwable cause ) {
412+ if (enableKeepalive ) {
413+ stopClientPinger ();
414+ stopServerMonitor ();
415+ }
369416 if (!isAlive ()) {
370417 // we don't care about subscription failures when we're no longer running.
371418 logger .log (Level .FINE , "pull failure after service no longer running" , cause );
@@ -410,6 +457,100 @@ private boolean isAlive() {
410457 return state == State .RUNNING || state == State .STARTING ;
411458 }
412459
460+ private void startClientPinger () {
461+ if (pingSchedulerHandle != null ) {
462+ pingSchedulerHandle .cancel (false );
463+ }
464+
465+ pingSchedulerHandle =
466+ systemExecutor .scheduleAtFixedRate (
467+ () -> {
468+ try {
469+ lock .lock ();
470+ try {
471+ if (clientStream != null && isAlive ()) {
472+ clientStream .send (StreamingPullRequest .newBuilder ().build ());
473+ lastClientPingTime .set (clock .nanoTime ());
474+ logger .log (Level .FINEST , "Sent client keepalive ping" );
475+ }
476+ } finally {
477+ lock .unlock ();
478+ }
479+ } catch (Exception e ) {
480+ logger .log (Level .FINE , "Error sending client keepalive ping" , e );
481+ }
482+ },
483+ 0 ,
484+ CLIENT_PING_INTERVAL .getSeconds (),
485+ TimeUnit .SECONDS );
486+ }
487+
488+ private void stopClientPinger () {
489+ if (pingSchedulerHandle != null ) {
490+ pingSchedulerHandle .cancel (false );
491+ pingSchedulerHandle = null ;
492+ }
493+ }
494+
495+ private void startServerMonitor () {
496+ if (serverMonitorHandle != null ) {
497+ serverMonitorHandle .cancel (false );
498+ }
499+
500+ serverMonitorHandle =
501+ systemExecutor .scheduleAtFixedRate (
502+ () -> {
503+ try {
504+ if (!isAlive ()) {
505+ return ;
506+ }
507+
508+ long now = clock .nanoTime ();
509+ long lastResponse = lastServerResponseTime .get ();
510+ long lastPing = lastClientPingTime .get ();
511+
512+ if (lastPing <= lastResponse ) {
513+ return ;
514+ }
515+
516+ Duration elapsedSincePing = Duration .ofNanos (now - lastPing );
517+ if (elapsedSincePing .compareTo (SERVER_PING_TIMEOUT_DURATION ) < 0 ) {
518+ return ;
519+ }
520+
521+ logger .log (
522+ Level .WARNING ,
523+ "No response from server for {0} seconds since last ping. Closing stream." ,
524+ elapsedSincePing .getSeconds ());
525+
526+ lock .lock ();
527+ try {
528+ if (clientStream != null ) {
529+ clientStream .closeSendWithError (
530+ Status .UNAVAILABLE
531+ .withDescription ("Keepalive timeout with server" )
532+ .asException ());
533+ }
534+ } finally {
535+ lock .unlock ();
536+ }
537+ stopServerMonitor ();
538+ } catch (Exception e ) {
539+ logger .log (Level .FINE , "Error in server keepalive monitor" , e );
540+ }
541+ },
542+ SERVER_MONITOR_INTERVAL .getSeconds (),
543+ SERVER_MONITOR_INTERVAL .getSeconds (),
544+ TimeUnit .SECONDS );
545+ }
546+
547+ private void stopServerMonitor () {
548+ if (serverMonitorHandle != null ) {
549+ serverMonitorHandle .cancel (false );
550+ serverMonitorHandle = null ;
551+ }
552+ }
553+
413554 public void setResponseOutstandingMessages (AckResponse ackResponse ) {
414555 // We will close the futures with ackResponse - if there are multiple references to the same
415556 // future they will be handled appropriately
@@ -769,6 +910,7 @@ public static final class Builder {
769910 private Distribution ackLatencyDistribution ;
770911 private SubscriberStub subscriberStub ;
771912 private int channelAffinity ;
913+ private long protocolVersion ;
772914 private FlowController flowController ;
773915 private FlowControlSettings flowControlSettings ;
774916 private boolean useLegacyFlowControl ;
@@ -840,6 +982,11 @@ public Builder setChannelAffinity(int channelAffinity) {
840982 return this ;
841983 }
842984
985+ public Builder setProtocolVersion (long protocolVersion ) {
986+ this .protocolVersion = protocolVersion ;
987+ return this ;
988+ }
989+
843990 public Builder setFlowController (FlowController flowController ) {
844991 this .flowController = flowController ;
845992 return this ;
0 commit comments