From 78b731fddfbeed2bf18c2181fe7f246cbc24605f Mon Sep 17 00:00:00 2001 From: Xue Chen Date: Fri, 30 Jan 2026 19:44:26 +0000 Subject: [PATCH 1/3] Add logic to handle end timestamp of mutable change stream bounded query. --- .../changestreams/action/ActionFactory.java | 6 +- .../action/QueryChangeStreamAction.java | 30 ++++- .../spanner/changestreams/dao/DaoFactory.java | 4 + .../dofn/ReadChangeStreamPartitionDoFn.java | 5 +- .../action/QueryChangeStreamActionTest.java | 120 +++++++++++++++++- .../ReadChangeStreamPartitionDoFnTest.java | 4 +- 6 files changed, 161 insertions(+), 8 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java index e8749a836669..cd84168b23f7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java @@ -187,7 +187,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( PartitionStartRecordAction partitionStartRecordAction, PartitionEndRecordAction partitionEndRecordAction, PartitionEventRecordAction partitionEventRecordAction, - ChangeStreamMetrics metrics) { + ChangeStreamMetrics metrics, + boolean isMutableChangeStream) { if (queryChangeStreamActionInstance == null) { queryChangeStreamActionInstance = new QueryChangeStreamAction( @@ -201,7 +202,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( partitionStartRecordAction, partitionEndRecordAction, partitionEventRecordAction, - metrics); + metrics, + isMutableChangeStream); } return queryChangeStreamActionInstance; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 8da9f3d09515..e9456b169f03 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; @@ -89,6 +90,7 @@ public class QueryChangeStreamAction { private final PartitionEndRecordAction partitionEndRecordAction; private final PartitionEventRecordAction partitionEventRecordAction; private final ChangeStreamMetrics metrics; + private final boolean isMutableChangeStream; /** * Constructs an action class for performing a change stream query for a given partition. @@ -106,6 +108,7 @@ public class QueryChangeStreamAction { * @param PartitionEndRecordAction action class to process {@link PartitionEndRecord}s * @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s * @param metrics metrics gathering class + * @param isMutableChangeStream whether the change stream is mutable or not */ QueryChangeStreamAction( ChangeStreamDao changeStreamDao, @@ -118,7 +121,8 @@ public class QueryChangeStreamAction { PartitionStartRecordAction partitionStartRecordAction, PartitionEndRecordAction partitionEndRecordAction, PartitionEventRecordAction partitionEventRecordAction, - ChangeStreamMetrics metrics) { + ChangeStreamMetrics metrics, + boolean isMutableChangeStream) { this.changeStreamDao = changeStreamDao; this.partitionMetadataDao = partitionMetadataDao; this.changeStreamRecordMapper = changeStreamRecordMapper; @@ -130,6 +134,7 @@ public class QueryChangeStreamAction { this.partitionEndRecordAction = partitionEndRecordAction; this.partitionEventRecordAction = partitionEventRecordAction; this.metrics = metrics; + this.isMutableChangeStream = isMutableChangeStream; } /** @@ -195,13 +200,22 @@ public ProcessContinuation run( final Timestamp endTimestamp = partition.getEndTimestamp(); final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT); final Timestamp changeStreamQueryEndTimestamp = - isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp(); + isBoundedRestriction + ? getBoundedQueryEndTimestamp(endTimestamp) + : getNextReadChangeStreamEndTimestamp(); // Once the changeStreamQuery completes we may need to resume reading from the partition if we // had an unbounded restriction for which we set an arbitrary query end timestamp and for which // we didn't encounter any indications that the partition is done (explicit end records or // exceptions about being out of timestamp range). - boolean stopAfterQuerySucceeds = isBoundedRestriction; + boolean stopAfterQuerySucceeds = false; + if (InitialPartition.isInitialPartition(partition.getPartitionToken())) { + stopAfterQuerySucceeds = true; + } else { + stopAfterQuerySucceeds = + isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp); + } + try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) { @@ -379,4 +393,14 @@ private Timestamp getNextReadChangeStreamEndTimestamp() { final Timestamp current = Timestamp.now(); return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); } + + // For Mutable Change Stream, Spanner only allow the max query end timestamp to be 2 minutes in + // the future. + private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) { + if (this.isMutableChangeStream) { + Timestamp maxTimestamp = getNextReadChangeStreamEndTimestamp(); + return maxTimestamp.compareTo(endTimestamp) < 0 ? maxTimestamp : endTimestamp; + } + return endTimestamp; + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java index 67b58bace70f..95bdbfed7ca5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java @@ -151,4 +151,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() { } return changeStreamDaoInstance; } + + public boolean isMutableChangeStream() { + return this.isMutableChangeStream; + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java index 4f5631c468be..c3650b42761b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java @@ -73,6 +73,7 @@ public class ReadChangeStreamPartitionDoFn extends DoFn(); } @@ -215,7 +217,8 @@ public void setup() { partitionStartRecordAction, partitionEndRecordAction, partitionEventRecordAction, - metrics); + metrics, + isMutableChangeStream); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index cf4c047025c4..26ab41dff878 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -21,7 +21,9 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -116,7 +118,8 @@ public void setUp() throws Exception { partitionStartRecordAction, partitionEndRecordAction, partitionEventRecordAction, - metrics); + metrics, + false); final Struct row = mock(Struct.class); partition = PartitionMetadata.newBuilder() @@ -916,6 +919,121 @@ public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction() verify(partitionMetadataDao, never()).updateWatermark(any(), any()); } + @Test + public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() { + // Initialize action with isMutableChangeStream = true + action = + new QueryChangeStreamAction( + changeStreamDao, + partitionMetadataDao, + changeStreamRecordMapper, + partitionMetadataMapper, + dataChangeRecordAction, + heartbeatRecordAction, + childPartitionsRecordAction, + partitionStartRecordAction, + partitionEndRecordAction, + partitionEventRecordAction, + metrics, + true); + + // Set endTimestamp to 60 minutes in the future + Timestamp now = Timestamp.now(); + Timestamp endTimestamp = + Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos()); + + partition = partition.toBuilder().setEndTimestamp(endTimestamp).build(); + when(restriction.getTo()).thenReturn(endTimestamp); + when(partitionMetadataMapper.from(any())).thenReturn(partition); + + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) + .thenReturn(resultSet); + when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap) + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + // Verify query was capped at ~2 minutes + long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds(); + assertTrue("Query should be capped at approx 2 minutes (120s)", Math.abs(diff - 120) < 10); + + // Crucial: Should RESUME to process the rest later + assertEquals(ProcessContinuation.resume(), result); + } + + @Test + public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() { + action = + new QueryChangeStreamAction( + changeStreamDao, + partitionMetadataDao, + changeStreamRecordMapper, + partitionMetadataMapper, + dataChangeRecordAction, + heartbeatRecordAction, + childPartitionsRecordAction, + partitionStartRecordAction, + partitionEndRecordAction, + partitionEventRecordAction, + metrics, + true); + + // Set endTimestamp to only 10 seconds in the future + Timestamp now = Timestamp.now(); + Timestamp endTimestamp = Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10, now.getNanos()); + + partition = partition.toBuilder().setEndTimestamp(endTimestamp).build(); + when(restriction.getTo()).thenReturn(endTimestamp); + when(partitionMetadataMapper.from(any())).thenReturn(partition); + + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) + .thenReturn(resultSet); + when(resultSet.next()).thenReturn(false); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + // Should use the exact endTimestamp since it is within the limit (10s < 2m) + assertEquals(endTimestamp, timestampCaptor.getValue()); + + // Should STOP because we reached the actual requested endTimestamp + assertEquals(ProcessContinuation.stop(), result); + } + + @Test + public void testQueryChangeStreamUnboundedResumesCorrectly() { + // Unbounded restriction (streaming forever) + setupUnboundedPartition(); + + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + when(changeStreamDao.changeStreamQuery(any(), any(), any(), anyLong())).thenReturn(resultSet); + when(resultSet.next()).thenReturn(false); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + // Should return RESUME to continue reading the stream every 2 minutes + assertEquals(ProcessContinuation.resume(), result); + verify(metrics).incQueryCounter(); + } + private static class BundleFinalizerStub implements BundleFinalizer { @Override public void afterBundleCommit(Instant callbackExpiry, Callback callback) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 62fa39eef55a..a64889b59473 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -20,6 +20,7 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -149,7 +150,8 @@ public void setUp() { partitionStartRecordAction, partitionEndRecordAction, partitionEventRecordAction, - metrics)) + metrics, + anyBoolean())) .thenReturn(queryChangeStreamAction); doFn.setup(); From 5ebada3271bac36bec3dd3de140a4575e7fbf8ed Mon Sep 17 00:00:00 2001 From: Xue Chen Date: Thu, 5 Feb 2026 18:18:21 +0000 Subject: [PATCH 2/3] Resolve comments --- .../changestreams/action/QueryChangeStreamAction.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index e9456b169f03..69e89e74a38b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -207,7 +207,8 @@ public ProcessContinuation run( // Once the changeStreamQuery completes we may need to resume reading from the partition if we // had an unbounded restriction for which we set an arbitrary query end timestamp and for which // we didn't encounter any indications that the partition is done (explicit end records or - // exceptions about being out of timestamp range). + // exceptions about being out of timestamp range). We also special case the InitialPartition, + // which always stops after the query succeeds. boolean stopAfterQuerySucceeds = false; if (InitialPartition.isInitialPartition(partition.getPartitionToken())) { stopAfterQuerySucceeds = true; @@ -394,12 +395,12 @@ private Timestamp getNextReadChangeStreamEndTimestamp() { return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); } - // For Mutable Change Stream, Spanner only allow the max query end timestamp to be 2 minutes in - // the future. + // For Mutable Change Stream bounded queries, update the query end timestamp to be within 2 + // minutes in the future. private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) { if (this.isMutableChangeStream) { - Timestamp maxTimestamp = getNextReadChangeStreamEndTimestamp(); - return maxTimestamp.compareTo(endTimestamp) < 0 ? maxTimestamp : endTimestamp; + Timestamp nextTimestamp = getNextReadChangeStreamEndTimestamp(); + return nextTimestamp.compareTo(endTimestamp) < 0 ? nextTimestamp : endTimestamp; } return endTimestamp; } From b1087f318d4752653ba1d189505996b5c4483fc2 Mon Sep 17 00:00:00 2001 From: Xue Chen Date: Mon, 9 Feb 2026 21:11:21 +0000 Subject: [PATCH 3/3] Fix tests --- .../ReadChangeStreamPartitionDoFnTest.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index a64889b59473..9e588de77a03 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -140,17 +141,17 @@ public void setUp() { when(actionFactory.partitionEventRecordAction(partitionMetadataDao, metrics)) .thenReturn(partitionEventRecordAction); when(actionFactory.queryChangeStreamAction( - changeStreamDao, - partitionMetadataDao, - changeStreamRecordMapper, - partitionMetadataMapper, - dataChangeRecordAction, - heartbeatRecordAction, - childPartitionsRecordAction, - partitionStartRecordAction, - partitionEndRecordAction, - partitionEventRecordAction, - metrics, + eq(changeStreamDao), + eq(partitionMetadataDao), + eq(changeStreamRecordMapper), + eq(partitionMetadataMapper), + eq(dataChangeRecordAction), + eq(heartbeatRecordAction), + eq(childPartitionsRecordAction), + eq(partitionStartRecordAction), + eq(partitionEndRecordAction), + eq(partitionEventRecordAction), + eq(metrics), anyBoolean())) .thenReturn(queryChangeStreamAction);