Add logic to handle end timestamp of mutable change stream bounded query#37459
Add logic to handle end timestamp of mutable change stream bounded query#37459chenxuesdu wants to merge 2 commits intoapache:masterfrom
Conversation
Summary of ChangesHello @chenxuesdu, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a specific constraint in Google Cloud Spanner regarding mutable change stream bounded queries, where the end timestamp cannot exceed 30 minutes into the future. The changes introduce a mechanism to detect if a change stream is mutable and, if so, automatically adjust the provided end timestamp to comply with this limitation. This ensures that queries remain valid and prevents potential errors when users specify end timestamps beyond the allowed future window. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
76aa3cd to
56ab9e1
Compare
| if (this.isMutableChangeStream) { | ||
| final Timestamp current = Timestamp.now(); | ||
| Timestamp maxTimestamp = | ||
| Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60, current.getNanos()); |
There was a problem hiding this comment.
Should we leave some margin for the end timestamp, e.g. 25min or 20min in the future, in case that the query is processed too fast and this future timestamp still too far in the future?
There was a problem hiding this comment.
Updated to 2 minutes in the future. Please take a look.
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request correctly implements the logic to handle the end timestamp for mutable change stream bounded queries in Spanner, which limits the end timestamp to a maximum of 30 minutes in the future. The changes are well-implemented, and the new logic is propagated correctly through the different layers of the application. The added tests effectively verify the new behavior. My review includes a few suggestions to improve code readability and maintainability by replacing magic numbers with named constants.
| private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) { | ||
| if (this.isMutableChangeStream) { | ||
| final Timestamp current = Timestamp.now(); | ||
| Timestamp maxTimestamp = | ||
| Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60, current.getNanos()); | ||
| return maxTimestamp.compareTo(endTimestamp) < 0 ? maxTimestamp : endTimestamp; | ||
| } | ||
| return endTimestamp; | ||
| } |
There was a problem hiding this comment.
For better readability and maintainability, it's good practice to extract magic numbers into constants. Also, using Comparators.min can make the intent of the code clearer.
You would need to add the following import:
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;
private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
if (this.isMutableChangeStream) {
final long maxSecondsInFuture = 30 * 60;
final Timestamp current = Timestamp.now();
final Timestamp maxTimestamp =
Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + maxSecondsInFuture, current.getNanos());
return Comparators.min(maxTimestamp, endTimestamp);
}
return endTimestamp;
}There was a problem hiding this comment.
Maybe define MAX_FUTURE_END_TIME_SCEONDS = 30 * 60 into constant value of the class, in case we need to tune it in the future.
There was a problem hiding this comment.
Updated to 2 minutes in the future. Please take a look.
| public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() { | ||
| // Initialize action with isMutableChangeStream = true | ||
| action = | ||
| new QueryChangeStreamAction( | ||
| changeStreamDao, | ||
| partitionMetadataDao, | ||
| changeStreamRecordMapper, | ||
| partitionMetadataMapper, | ||
| dataChangeRecordAction, | ||
| heartbeatRecordAction, | ||
| childPartitionsRecordAction, | ||
| partitionStartRecordAction, | ||
| partitionEndRecordAction, | ||
| partitionEventRecordAction, | ||
| metrics, | ||
| true); | ||
|
|
||
| // Set endTimestamp to 60 minutes in the future | ||
| Timestamp now = Timestamp.now(); | ||
| Timestamp endTimestamp = | ||
| Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos()); | ||
|
|
||
| partition = partition.toBuilder().setEndTimestamp(endTimestamp).build(); | ||
| when(restriction.getTo()).thenReturn(endTimestamp); | ||
| when(partitionMetadataMapper.from(any())).thenReturn(partition); | ||
|
|
||
| final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); | ||
| final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); | ||
| when(changeStreamDao.changeStreamQuery( | ||
| eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), | ||
| timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) | ||
| .thenReturn(resultSet); | ||
| when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap) | ||
| when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); | ||
| when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); | ||
|
|
||
| final ProcessContinuation result = | ||
| action.run( | ||
| partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); | ||
|
|
||
| // Verify query was capped at ~30 minutes | ||
| long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds(); | ||
| assertTrue("Query should be capped at approx 30 minutes (1800s)", Math.abs(diff - 1800) < 10); | ||
|
|
||
| // Crucial: Should RESUME to process the rest later | ||
| assertEquals(ProcessContinuation.resume(), result); | ||
| } |
There was a problem hiding this comment.
Using named constants for magic numbers like 60 * 60 and 1800 improves code readability and maintainability.
public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
final long sixtyMinutesInSeconds = 60 * 60;
final long thirtyMinutesInSeconds = 30 * 60;
// Initialize action with isMutableChangeStream = true
action =
new QueryChangeStreamAction(
changeStreamDao,
partitionMetadataDao,
changeStreamRecordMapper,
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
true);
// Set endTimestamp to 60 minutes in the future
Timestamp now = Timestamp.now();
Timestamp endTimestamp =
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + sixtyMinutesInSeconds, now.getNanos());
partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
when(restriction.getTo()).thenReturn(endTimestamp);
when(partitionMetadataMapper.from(any())).thenReturn(partition);
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
when(changeStreamDao.changeStreamQuery(
eq(PARTITION_TOKEN),
eq(PARTITION_START_TIMESTAMP),
timestampCaptor.capture(),
eq(PARTITION_HEARTBEAT_MILLIS)))
.thenReturn(resultSet);
when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
final ProcessContinuation result =
action.run(
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
// Verify query was capped at ~30 minutes
long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
assertTrue(
"Query should be capped at approx 30 minutes (1800s)",
Math.abs(diff - thirtyMinutesInSeconds) < 10);
// Crucial: Should RESUME to process the rest later
assertEquals(ProcessContinuation.resume(), result);
}| public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() { | ||
| action = | ||
| new QueryChangeStreamAction( | ||
| changeStreamDao, | ||
| partitionMetadataDao, | ||
| changeStreamRecordMapper, | ||
| partitionMetadataMapper, | ||
| dataChangeRecordAction, | ||
| heartbeatRecordAction, | ||
| childPartitionsRecordAction, | ||
| partitionStartRecordAction, | ||
| partitionEndRecordAction, | ||
| partitionEventRecordAction, | ||
| metrics, | ||
| true); | ||
|
|
||
| // Set endTimestamp to only 10 minutes in the future | ||
| Timestamp now = Timestamp.now(); | ||
| Timestamp endTimestamp = | ||
| Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10 * 60, now.getNanos()); | ||
|
|
||
| partition = partition.toBuilder().setEndTimestamp(endTimestamp).build(); | ||
| when(restriction.getTo()).thenReturn(endTimestamp); | ||
| when(partitionMetadataMapper.from(any())).thenReturn(partition); | ||
|
|
||
| final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); | ||
| when(changeStreamDao.changeStreamQuery( | ||
| eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), | ||
| eq(endTimestamp), eq(PARTITION_HEARTBEAT_MILLIS))) | ||
| .thenReturn(resultSet); | ||
| when(resultSet.next()).thenReturn(false); | ||
| when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); | ||
| when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true); | ||
|
|
||
| final ProcessContinuation result = | ||
| action.run( | ||
| partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); | ||
|
|
||
| // Should STOP because we reached the actual requested endTimestamp | ||
| assertEquals(ProcessContinuation.stop(), result); | ||
| } |
There was a problem hiding this comment.
Using a named constant for the magic number 10 * 60 would improve readability.
public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
final long tenMinutesInSeconds = 10 * 60;
action =
new QueryChangeStreamAction(
changeStreamDao,
partitionMetadataDao,
changeStreamRecordMapper,
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
true);
// Set endTimestamp to only 10 minutes in the future
Timestamp now = Timestamp.now();
Timestamp endTimestamp =
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + tenMinutesInSeconds, now.getNanos());
partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
when(restriction.getTo()).thenReturn(endTimestamp);
when(partitionMetadataMapper.from(any())).thenReturn(partition);
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
when(changeStreamDao.changeStreamQuery(
eq(PARTITION_TOKEN),
eq(PARTITION_START_TIMESTAMP),
eq(endTimestamp),
eq(PARTITION_HEARTBEAT_MILLIS)))
.thenReturn(resultSet);
when(resultSet.next()).thenReturn(false);
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
final ProcessContinuation result =
action.run(
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
// Should STOP because we reached the actual requested endTimestamp
assertEquals(ProcessContinuation.stop(), result);
}| // exceptions about being out of timestamp range). | ||
| boolean stopAfterQuerySucceeds = isBoundedRestriction; | ||
| boolean stopAfterQuerySucceeds = | ||
| isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp); |
There was a problem hiding this comment.
For v1, this changeStreamQueryEndTimestamp.equals(endTimestamp) is valid.
For v2, this changeStreamQueryEndTimestamp.equals(endTimestamp) may never happen. Is that intended? Let's walk through v2 scenario a little bit more.
There was a problem hiding this comment.
For v2 bounded query, the changeStreamQueryEndTimestamp.equals(endTimestamp) condition can meet, since we choose the min of the changeStreamQueryEndTimestamp and endTimestamp, so they will equal to each other sometime. Started one connector test it, it stopped after the endTimestamp given by user.
| private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) { | ||
| if (this.isMutableChangeStream) { | ||
| final Timestamp current = Timestamp.now(); | ||
| Timestamp maxTimestamp = |
There was a problem hiding this comment.
For v1 we use endTs,
For v2 we should use getNextReadChangeStreamEndTimestamp() here so that for V2, bounded endTs or unbounded endTs have similar query patterns.
There was a problem hiding this comment.
Updated to 2 minutes in the future. Please take a look.
56ab9e1 to
78b731f
Compare
chenxuesdu
left a comment
There was a problem hiding this comment.
updated. Please take another look.
| ? getBoundedQueryEndTimestamp(endTimestamp) | ||
| : getNextReadChangeStreamEndTimestamp(); | ||
|
|
||
| // Once the changeStreamQuery completes we may need to resume reading from the partition if we |
There was a problem hiding this comment.
Should we update the comment here?
| // the future. | ||
| private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) { | ||
| if (this.isMutableChangeStream) { | ||
| Timestamp maxTimestamp = getNextReadChangeStreamEndTimestamp(); |
There was a problem hiding this comment.
Should we call it nextTimestamp instead of maxTimestamp?
Also in general, for code we modified, can you double check the comments are updated as well. Thanks.
There was a problem hiding this comment.
Updated. Please take a look.
| // we didn't encounter any indications that the partition is done (explicit end records or | ||
| // exceptions about being out of timestamp range). | ||
| boolean stopAfterQuerySucceeds = isBoundedRestriction; | ||
| boolean stopAfterQuerySucceeds = false; |
There was a problem hiding this comment.
this is the definition for the boolean.
9412858 to
94b3d4f
Compare
94b3d4f to
5ebada3
Compare
| partitionEventRecordAction, | ||
| metrics)) | ||
| metrics, | ||
| anyBoolean())) |
There was a problem hiding this comment.
testQueryChangeStreamMode (org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFnTest) failed
sdks/java/io/google-cloud-platform/build/test-results/test/TEST-org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFnTest.xml [took 0s]
org.mockito.exceptions.misusing.InvalidUseOfMatchersException:
Invalid use of argument matchers!
12 matchers expected, 1 recorded:
-> at org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFnTest.setUp(ReadChangeStreamPartitionDoFnTest.java:154)
This exception may occur if matchers are combined with raw values:
//incorrect:
someMethod(any(), "raw String");
When using matchers, all arguments have to be provided by matchers.
For example:
//correct:
someMethod(any(), eq("String by matcher"));
For more info see javadoc for Matchers class.
at app//org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory.queryChangeStreamAction(ActionFactory.java:192)
at app//org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFnTest.setUp(ReadChangeStreamPartitionDoFnTest.java:142)
at java.base@11.0.24/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
looks relevant test failure
Abacn
left a comment
There was a problem hiding this comment.
Besides that, there is also a Spanner ChangeStream Integration failure:
testOrderedWithinKey (org.apache.beam.sdk.io.gcp.spanner.changestreams.it.SpannerChangeStreamOrderedWithinKeyIT) failed
java.lang.AssertionError: ParDo(ToString)/ParMultiDo(ToString).output:
Expected: iterable with items ["{\"SingerId\":\"0\"}\n{\"FirstName\":\"Inserting mutation 0\",\"LastName\":null,\"SingerInfo\":null};{};", "{\"SingerId\":\"1\"}\n{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};{\"FirstName\":\"Updating mutation 1\"};{};{\"FirstName\":\"Inserting mutation 1\",\"LastName\":null,\"SingerInfo\":null};{};", "{\"SingerId\":\"2\"}\n{\"FirstName\":\"Inserting mutation 2\",\"LastName\":null,\"SingerInfo\":null};{\"FirstName\":\"Updating mutation 2\"};{};", "{\"SingerId\":\"3\"}\n{\"FirstName\":\"Inserting mutation 3\",\"LastName\":null,\"SingerInfo\":null};{\"FirstName\":\"Updating mutation 3\"};{};", "{\"SingerId\":\"4\"}\n{\"FirstName\":\"Inserting mutation 4\",\"LastName\":null,\"SingerInfo\":null};{};", "{\"SingerId\":\"5\"}\n{\"FirstName\":\"Updating mutation 5\",\"LastName\":null,\"SingerInfo\":null};{\"FirstName\":\"Updating mutation 5\"};{};"] in any order
but: not matched: "{\"SingerId\":\"0\"}\n{\"FirstName\":\"Inserting mutation 0\",\"LastName\":null,\"SingerInfo\":null};"
at org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:176)
at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:461)
at org.apache.beam.sdk.testing.PAssert.that(PAssert.java:453)
For mutable change stream bounded queries, Spanner only allow the max end timestamp to be 30 minutes in the future. So when user provided a end timestamp to read, we choose the min of max allow timestamp and the provided timestamp.