diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java index e8749a836669..cd84168b23f7 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java @@ -187,7 +187,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( PartitionStartRecordAction partitionStartRecordAction, PartitionEndRecordAction partitionEndRecordAction, PartitionEventRecordAction partitionEventRecordAction, - ChangeStreamMetrics metrics) { + ChangeStreamMetrics metrics, + boolean isMutableChangeStream) { if (queryChangeStreamActionInstance == null) { queryChangeStreamActionInstance = new QueryChangeStreamAction( @@ -201,7 +202,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction( partitionStartRecordAction, partitionEndRecordAction, partitionEventRecordAction, - metrics); + metrics, + isMutableChangeStream); } return queryChangeStreamActionInstance; } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java index 8da9f3d09515..69e89e74a38b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java @@ -34,6 +34,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord; import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata; @@ -89,6 +90,7 @@ public class QueryChangeStreamAction { private final PartitionEndRecordAction partitionEndRecordAction; private final PartitionEventRecordAction partitionEventRecordAction; private final ChangeStreamMetrics metrics; + private final boolean isMutableChangeStream; /** * Constructs an action class for performing a change stream query for a given partition. @@ -106,6 +108,7 @@ public class QueryChangeStreamAction { * @param PartitionEndRecordAction action class to process {@link PartitionEndRecord}s * @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s * @param metrics metrics gathering class + * @param isMutableChangeStream whether the change stream is mutable or not */ QueryChangeStreamAction( ChangeStreamDao changeStreamDao, @@ -118,7 +121,8 @@ public class QueryChangeStreamAction { PartitionStartRecordAction partitionStartRecordAction, PartitionEndRecordAction partitionEndRecordAction, PartitionEventRecordAction partitionEventRecordAction, - ChangeStreamMetrics metrics) { + ChangeStreamMetrics metrics, + boolean isMutableChangeStream) { this.changeStreamDao = changeStreamDao; this.partitionMetadataDao = partitionMetadataDao; this.changeStreamRecordMapper = changeStreamRecordMapper; @@ -130,6 +134,7 @@ public class QueryChangeStreamAction { this.partitionEndRecordAction = partitionEndRecordAction; this.partitionEventRecordAction = partitionEventRecordAction; this.metrics = metrics; + this.isMutableChangeStream = isMutableChangeStream; } /** @@ -195,13 +200,23 @@ public ProcessContinuation run( final Timestamp endTimestamp = partition.getEndTimestamp(); final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT); final Timestamp changeStreamQueryEndTimestamp = - isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp(); + isBoundedRestriction + ? getBoundedQueryEndTimestamp(endTimestamp) + : getNextReadChangeStreamEndTimestamp(); // Once the changeStreamQuery completes we may need to resume reading from the partition if we // had an unbounded restriction for which we set an arbitrary query end timestamp and for which // we didn't encounter any indications that the partition is done (explicit end records or - // exceptions about being out of timestamp range). - boolean stopAfterQuerySucceeds = isBoundedRestriction; + // exceptions about being out of timestamp range). We also special case the InitialPartition, + // which always stops after the query succeeds. + boolean stopAfterQuerySucceeds = false; + if (InitialPartition.isInitialPartition(partition.getPartitionToken())) { + stopAfterQuerySucceeds = true; + } else { + stopAfterQuerySucceeds = + isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp); + } + try (ChangeStreamResultSet resultSet = changeStreamDao.changeStreamQuery( token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) { @@ -379,4 +394,14 @@ private Timestamp getNextReadChangeStreamEndTimestamp() { final Timestamp current = Timestamp.now(); return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos()); } + + // For Mutable Change Stream bounded queries, update the query end timestamp to be within 2 + // minutes in the future. + private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) { + if (this.isMutableChangeStream) { + Timestamp nextTimestamp = getNextReadChangeStreamEndTimestamp(); + return nextTimestamp.compareTo(endTimestamp) < 0 ? nextTimestamp : endTimestamp; + } + return endTimestamp; + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java index 67b58bace70f..95bdbfed7ca5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java @@ -151,4 +151,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() { } return changeStreamDaoInstance; } + + public boolean isMutableChangeStream() { + return this.isMutableChangeStream; + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java index 4f5631c468be..c3650b42761b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java @@ -73,6 +73,7 @@ public class ReadChangeStreamPartitionDoFn extends DoFn(); } @@ -215,7 +217,8 @@ public void setup() { partitionStartRecordAction, partitionEndRecordAction, partitionEventRecordAction, - metrics); + metrics, + isMutableChangeStream); } /** diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java index cf4c047025c4..26ab41dff878 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java @@ -21,7 +21,9 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -116,7 +118,8 @@ public void setUp() throws Exception { partitionStartRecordAction, partitionEndRecordAction, partitionEventRecordAction, - metrics); + metrics, + false); final Struct row = mock(Struct.class); partition = PartitionMetadata.newBuilder() @@ -916,6 +919,121 @@ public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction() verify(partitionMetadataDao, never()).updateWatermark(any(), any()); } + @Test + public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() { + // Initialize action with isMutableChangeStream = true + action = + new QueryChangeStreamAction( + changeStreamDao, + partitionMetadataDao, + changeStreamRecordMapper, + partitionMetadataMapper, + dataChangeRecordAction, + heartbeatRecordAction, + childPartitionsRecordAction, + partitionStartRecordAction, + partitionEndRecordAction, + partitionEventRecordAction, + metrics, + true); + + // Set endTimestamp to 60 minutes in the future + Timestamp now = Timestamp.now(); + Timestamp endTimestamp = + Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos()); + + partition = partition.toBuilder().setEndTimestamp(endTimestamp).build(); + when(restriction.getTo()).thenReturn(endTimestamp); + when(partitionMetadataMapper.from(any())).thenReturn(partition); + + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) + .thenReturn(resultSet); + when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap) + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + // Verify query was capped at ~2 minutes + long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds(); + assertTrue("Query should be capped at approx 2 minutes (120s)", Math.abs(diff - 120) < 10); + + // Crucial: Should RESUME to process the rest later + assertEquals(ProcessContinuation.resume(), result); + } + + @Test + public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() { + action = + new QueryChangeStreamAction( + changeStreamDao, + partitionMetadataDao, + changeStreamRecordMapper, + partitionMetadataMapper, + dataChangeRecordAction, + heartbeatRecordAction, + childPartitionsRecordAction, + partitionStartRecordAction, + partitionEndRecordAction, + partitionEventRecordAction, + metrics, + true); + + // Set endTimestamp to only 10 seconds in the future + Timestamp now = Timestamp.now(); + Timestamp endTimestamp = Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10, now.getNanos()); + + partition = partition.toBuilder().setEndTimestamp(endTimestamp).build(); + when(restriction.getTo()).thenReturn(endTimestamp); + when(partitionMetadataMapper.from(any())).thenReturn(partition); + + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + final ArgumentCaptor timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); + when(changeStreamDao.changeStreamQuery( + eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), + timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) + .thenReturn(resultSet); + when(resultSet.next()).thenReturn(false); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + // Should use the exact endTimestamp since it is within the limit (10s < 2m) + assertEquals(endTimestamp, timestampCaptor.getValue()); + + // Should STOP because we reached the actual requested endTimestamp + assertEquals(ProcessContinuation.stop(), result); + } + + @Test + public void testQueryChangeStreamUnboundedResumesCorrectly() { + // Unbounded restriction (streaming forever) + setupUnboundedPartition(); + + final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); + when(changeStreamDao.changeStreamQuery(any(), any(), any(), anyLong())).thenReturn(resultSet); + when(resultSet.next()).thenReturn(false); + when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); + when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); + + final ProcessContinuation result = + action.run( + partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); + + // Should return RESUME to continue reading the stream every 2 minutes + assertEquals(ProcessContinuation.resume(), result); + verify(metrics).incQueryCounter(); + } + private static class BundleFinalizerStub implements BundleFinalizer { @Override public void afterBundleCommit(Instant callbackExpiry, Callback callback) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java index 62fa39eef55a..9e588de77a03 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java @@ -20,6 +20,8 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED; import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -139,17 +141,18 @@ public void setUp() { when(actionFactory.partitionEventRecordAction(partitionMetadataDao, metrics)) .thenReturn(partitionEventRecordAction); when(actionFactory.queryChangeStreamAction( - changeStreamDao, - partitionMetadataDao, - changeStreamRecordMapper, - partitionMetadataMapper, - dataChangeRecordAction, - heartbeatRecordAction, - childPartitionsRecordAction, - partitionStartRecordAction, - partitionEndRecordAction, - partitionEventRecordAction, - metrics)) + eq(changeStreamDao), + eq(partitionMetadataDao), + eq(changeStreamRecordMapper), + eq(partitionMetadataMapper), + eq(dataChangeRecordAction), + eq(heartbeatRecordAction), + eq(childPartitionsRecordAction), + eq(partitionStartRecordAction), + eq(partitionEndRecordAction), + eq(partitionEventRecordAction), + eq(metrics), + anyBoolean())) .thenReturn(queryChangeStreamAction); doFn.setup();