Skip to content

Commit 7fd2bbe

Browse files
committed
fix: update initial delay vs period in fake scheduled executor and fix test cases
1 parent a3503ef commit 7fd2bbe

File tree

2 files changed

+20
-16
lines changed

2 files changed

+20
-16
lines changed

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeScheduledExecutorService.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,14 @@ public FakeClock getClock() {
5555
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
5656
return schedulePendingCallable(
5757
new PendingCallable<>(
58-
Duration.ofMillis(unit.toMillis(delay)), command, PendingCallableType.NORMAL));
58+
Duration.ofMillis(unit.toMillis(delay)), command, null, PendingCallableType.NORMAL));
5959
}
6060

6161
@Override
6262
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
6363
return schedulePendingCallable(
6464
new PendingCallable<>(
65-
Duration.ofMillis(unit.toMillis(delay)), callable, PendingCallableType.NORMAL));
65+
Duration.ofMillis(unit.toMillis(delay)), callable, null, PendingCallableType.NORMAL));
6666
}
6767

6868
@Override
@@ -72,6 +72,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(
7272
new PendingCallable<>(
7373
Duration.ofMillis(unit.toMillis(initialDelay)),
7474
command,
75+
Duration.ofMillis(unit.toMillis(period)),
7576
PendingCallableType.FIXED_RATE));
7677
}
7778

@@ -82,6 +83,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(
8283
new PendingCallable<>(
8384
Duration.ofMillis(unit.toMillis(initialDelay)),
8485
command,
86+
Duration.ofMillis(unit.toMillis(delay)),
8587
PendingCallableType.FIXED_DELAY));
8688
}
8789

@@ -212,13 +214,15 @@ enum PendingCallableType {
212214
class PendingCallable<T> implements Comparable<PendingCallable<T>> {
213215
Instant creationTime = Instant.ofEpochMilli(clock.millisTime());
214216
Duration delay;
217+
Duration period;
215218
Callable<T> pendingCallable;
216219
SettableFuture<T> future = SettableFuture.create();
217220
AtomicBoolean cancelled = new AtomicBoolean(false);
218221
AtomicBoolean done = new AtomicBoolean(false);
219222
PendingCallableType type;
220223

221-
PendingCallable(Duration delay, final Runnable runnable, PendingCallableType type) {
224+
PendingCallable(
225+
Duration delay, final Runnable runnable, Duration period, PendingCallableType type) {
222226
pendingCallable =
223227
new Callable<T>() {
224228
@Override
@@ -229,12 +233,15 @@ public T call() {
229233
};
230234
this.type = type;
231235
this.delay = delay;
236+
this.period = period;
232237
}
233238

234-
PendingCallable(Duration delay, Callable<T> callable, PendingCallableType type) {
239+
PendingCallable(
240+
Duration delay, Callable<T> callable, Duration period, PendingCallableType type) {
235241
pendingCallable = callable;
236242
this.type = type;
237243
this.delay = delay;
244+
this.period = period;
238245
}
239246

240247
private Instant getScheduledTime() {
@@ -305,10 +312,12 @@ T call() {
305312
break;
306313
case FIXED_DELAY:
307314
this.creationTime = Instant.ofEpochMilli(clock.millisTime());
315+
this.delay = period;
308316
schedulePendingCallable(this);
309317
break;
310318
case FIXED_RATE:
311319
this.creationTime = this.creationTime.plus(delay);
320+
this.delay = period;
312321
schedulePendingCallable(this);
313322
break;
314323
default:

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ public class StreamingSubscriberConnectionTest {
9696

9797
private static final long KEEP_ALIVE_SUPPORT_VERSION = 1;
9898
private static final Duration CLIENT_PING_INTERVAL = Duration.ofSeconds(30);
99-
private static final Duration SERVER_TIMEOUT_DURATION = Duration.ofSeconds(45);
10099
private static final Duration MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
101100

102101
@Before
@@ -703,8 +702,8 @@ public void testClientPinger_pingSent() {
703702

704703
ArgumentCaptor<StreamingPullRequest> requestCaptor =
705704
ArgumentCaptor.forClass(StreamingPullRequest.class);
706-
// 1 initial request + 2 pings
707-
verify(mockClientStream, times(3)).send(requestCaptor.capture());
705+
// 1 initial request + 3 pings
706+
verify(mockClientStream, times(4)).send(requestCaptor.capture());
708707
List<StreamingPullRequest> requests = requestCaptor.getAllValues();
709708

710709
StreamingPullRequest initialRequest = requests.get(0);
@@ -723,7 +722,7 @@ public void testClientPinger_pingSent() {
723722

724723
// No more pings
725724
systemExecutor.advanceTime(CLIENT_PING_INTERVAL);
726-
verify(mockClientStream, times(3)).send(any(StreamingPullRequest.class));
725+
verify(mockClientStream, times(4)).send(any(StreamingPullRequest.class));
727726
}
728727

729728
@Test
@@ -769,7 +768,7 @@ public void testServerMonitor_timesOut() {
769768
StreamingPullRequest req = invocation.getArgument(0);
770769
// Pings are empty requests
771770
if (req.getSubscription().isEmpty()) {
772-
if (pingCount.incrementAndGet() > 1) { // allow first ping
771+
if (pingCount.incrementAndGet() > 2) { // allow first 2 pings
773772
throw new RuntimeException("ping failed");
774773
}
775774
}
@@ -788,15 +787,10 @@ public void testServerMonitor_timesOut() {
788787
StreamController mockController = mock(StreamController.class);
789788
observer.onStart(mockController);
790789

791-
// Pings are sent every 30s, monitor checks every 15s. Timeout is 15s after ping if no response.
792-
// t=30s: first ping sent, lastClientPingTime=30s.
793-
// t=45s: monitor check. now=45, lastPing=30. 45-30=15. 15>15 is false. No timeout.
794-
systemExecutor.advanceTime(SERVER_TIMEOUT_DURATION);
790+
systemExecutor.advanceTime(CLIENT_PING_INTERVAL);
795791
verify(mockClientStream, never()).closeSendWithError(any(Exception.class));
796792

797-
// t=60s: second ping fails to send. lastClientPingTime remains 30s.
798-
// t=60s: monitor check. now=60, lastPing=30. 60-30=30. 30>15 is true. Timeout.
799-
systemExecutor.advanceTime(Duration.ofSeconds(16));
793+
systemExecutor.advanceTime(CLIENT_PING_INTERVAL);
800794
ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
801795
verify(mockClientStream, times(1)).closeSendWithError(exceptionCaptor.capture());
802796
StatusException exception = (StatusException) exceptionCaptor.getValue();
@@ -832,6 +826,7 @@ public void testServerMonitor_doesNotTimeOutIfResponseReceived() {
832826
systemExecutor.advanceTime(Duration.ofSeconds(40));
833827
observer.onResponse(StreamingPullResponse.getDefaultInstance());
834828
systemExecutor.advanceTime(Duration.ofSeconds(20)); // to t=60s
829+
observer.onResponse(StreamingPullResponse.getDefaultInstance());
835830

836831
verify(mockClientStream, never()).closeSendWithError(any(Exception.class));
837832
}

0 commit comments

Comments
 (0)