Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
PartitionStartRecordAction partitionStartRecordAction,
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
ChangeStreamMetrics metrics) {
ChangeStreamMetrics metrics,
boolean isMutableChangeStream) {
if (queryChangeStreamActionInstance == null) {
queryChangeStreamActionInstance =
new QueryChangeStreamAction(
Expand All @@ -201,7 +202,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics);
metrics,
isMutableChangeStream);
}
return queryChangeStreamActionInstance;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.ChildPartitionsRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.HeartbeatRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEndRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionEventRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class QueryChangeStreamAction {
private final PartitionEndRecordAction partitionEndRecordAction;
private final PartitionEventRecordAction partitionEventRecordAction;
private final ChangeStreamMetrics metrics;
private final boolean isMutableChangeStream;

/**
* Constructs an action class for performing a change stream query for a given partition.
Expand All @@ -106,6 +108,7 @@ public class QueryChangeStreamAction {
* @param PartitionEndRecordAction action class to process {@link PartitionEndRecord}s
* @param PartitionEventRecordAction action class to process {@link PartitionEventRecord}s
* @param metrics metrics gathering class
* @param isMutableChangeStream whether the change stream is mutable or not
*/
QueryChangeStreamAction(
ChangeStreamDao changeStreamDao,
Expand All @@ -118,7 +121,8 @@ public class QueryChangeStreamAction {
PartitionStartRecordAction partitionStartRecordAction,
PartitionEndRecordAction partitionEndRecordAction,
PartitionEventRecordAction partitionEventRecordAction,
ChangeStreamMetrics metrics) {
ChangeStreamMetrics metrics,
boolean isMutableChangeStream) {
this.changeStreamDao = changeStreamDao;
this.partitionMetadataDao = partitionMetadataDao;
this.changeStreamRecordMapper = changeStreamRecordMapper;
Expand All @@ -130,6 +134,7 @@ public class QueryChangeStreamAction {
this.partitionEndRecordAction = partitionEndRecordAction;
this.partitionEventRecordAction = partitionEventRecordAction;
this.metrics = metrics;
this.isMutableChangeStream = isMutableChangeStream;
}

/**
Expand Down Expand Up @@ -195,13 +200,23 @@ public ProcessContinuation run(
final Timestamp endTimestamp = partition.getEndTimestamp();
final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
final Timestamp changeStreamQueryEndTimestamp =
isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp();
isBoundedRestriction
? getBoundedQueryEndTimestamp(endTimestamp)
: getNextReadChangeStreamEndTimestamp();

// Once the changeStreamQuery completes we may need to resume reading from the partition if we
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we update the comment here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

// had an unbounded restriction for which we set an arbitrary query end timestamp and for which
// we didn't encounter any indications that the partition is done (explicit end records or
// exceptions about being out of timestamp range).
boolean stopAfterQuerySucceeds = isBoundedRestriction;
// exceptions about being out of timestamp range). We also special case the InitialPartition,
// which always stops after the query succeeds.
boolean stopAfterQuerySucceeds = false;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line seems not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the definition for the boolean.

if (InitialPartition.isInitialPartition(partition.getPartitionToken())) {
stopAfterQuerySucceeds = true;
} else {
stopAfterQuerySucceeds =
isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp);
}

try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
Expand Down Expand Up @@ -379,4 +394,14 @@ private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
}

// For Mutable Change Stream bounded queries, cap the change stream query end timestamp so it
// never extends more than ~2 minutes into the future (the same horizon used for unbounded
// reads via getNextReadChangeStreamEndTimestamp). Immutable change streams keep the caller's
// end timestamp untouched.
private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
if (!this.isMutableChangeStream) {
return endTimestamp;
}
// Use whichever is earlier: the ~2-minutes-ahead cap or the requested end timestamp.
final Timestamp cappedEndTimestamp = getNextReadChangeStreamEndTimestamp();
if (cappedEndTimestamp.compareTo(endTimestamp) < 0) {
return cappedEndTimestamp;
}
return endTimestamp;
}
Comment on lines 400 to 406
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For better readability and maintainability, it's good practice to extract magic numbers into constants. Also, using Comparators.min can make the intent of the code clearer.

You would need to add the following import:
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;

  private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
    if (this.isMutableChangeStream) {
      final long maxSecondsInFuture = 30 * 60;
      final Timestamp current = Timestamp.now();
      final Timestamp maxTimestamp =
          Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + maxSecondsInFuture, current.getNanos());
      return Comparators.min(maxTimestamp, endTimestamp);
    }
    return endTimestamp;
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe define MAX_FUTURE_END_TIME_SECONDS = 30 * 60 as a constant of the class, in case we need to tune it in the future.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to 2 minutes in the future. Please take a look.

}
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() {
}
return changeStreamDaoInstance;
}

/**
* Returns the configured {@code isMutableChangeStream} flag, indicating whether this factory
* was created for a mutable change stream.
*/
public boolean isMutableChangeStream() {
return isMutableChangeStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
private final MapperFactory mapperFactory;
private final ActionFactory actionFactory;
private final ChangeStreamMetrics metrics;
private final boolean isMutableChangeStream;
/**
* Needs to be set through the {@link
* ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)} call.
Expand Down Expand Up @@ -104,6 +105,7 @@ public ReadChangeStreamPartitionDoFn(
this.mapperFactory = mapperFactory;
this.actionFactory = actionFactory;
this.metrics = metrics;
this.isMutableChangeStream = daoFactory.isMutableChangeStream();
this.throughputEstimator = new NullThroughputEstimator<>();
}

Expand Down Expand Up @@ -215,7 +217,8 @@ public void setup() {
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics);
metrics,
isMutableChangeStream);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -116,7 +118,8 @@ public void setUp() throws Exception {
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics);
metrics,
false);
final Struct row = mock(Struct.class);
partition =
PartitionMetadata.newBuilder()
Expand Down Expand Up @@ -916,6 +919,121 @@ public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction()
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
}

// Verifies that for a mutable change stream with a bounded restriction whose end timestamp is
// far in the future (60 minutes), the change stream query end timestamp is capped to roughly
// 2 minutes ahead of now, and the SDF resumes (rather than stops) so the remainder of the
// restriction is processed by a later query.
@Test
public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
// Initialize action with isMutableChangeStream = true
action =
new QueryChangeStreamAction(
changeStreamDao,
partitionMetadataDao,
changeStreamRecordMapper,
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
true);

// Set endTimestamp to 60 minutes in the future
Timestamp now = Timestamp.now();
Timestamp endTimestamp =
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos());

partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
when(restriction.getTo()).thenReturn(endTimestamp);
when(partitionMetadataMapper.from(any())).thenReturn(partition);

// Capture the end timestamp actually passed to the change stream query so the cap can be
// asserted below.
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
when(changeStreamDao.changeStreamQuery(
eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
.thenReturn(resultSet);
when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);

final ProcessContinuation result =
action.run(
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

// Verify query was capped at ~2 minutes
// 10s tolerance absorbs wall-clock drift between Timestamp.now() here and inside the action.
long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
assertTrue("Query should be capped at approx 2 minutes (120s)", Math.abs(diff - 120) < 10);

// Crucial: Should RESUME to process the rest later
assertEquals(ProcessContinuation.resume(), result);
}
Comment on lines 923 to 969
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using named constants for magic numbers like 60 * 60 and 1800 improves code readability and maintainability.

  public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
    final long sixtyMinutesInSeconds = 60 * 60;
    final long thirtyMinutesInSeconds = 30 * 60;

    // Initialize action with isMutableChangeStream = true
    action =
        new QueryChangeStreamAction(
            changeStreamDao,
            partitionMetadataDao,
            changeStreamRecordMapper,
            partitionMetadataMapper,
            dataChangeRecordAction,
            heartbeatRecordAction,
            childPartitionsRecordAction,
            partitionStartRecordAction,
            partitionEndRecordAction,
            partitionEventRecordAction,
            metrics,
            true);

    // Set endTimestamp to 60 minutes in the future
    Timestamp now = Timestamp.now();
    Timestamp endTimestamp =
        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + sixtyMinutesInSeconds, now.getNanos());

    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
    when(restriction.getTo()).thenReturn(endTimestamp);
    when(partitionMetadataMapper.from(any())).thenReturn(partition);

    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
    final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
    when(changeStreamDao.changeStreamQuery(
            eq(PARTITION_TOKEN),
            eq(PARTITION_START_TIMESTAMP),
            timestampCaptor.capture(),
            eq(PARTITION_HEARTBEAT_MILLIS)))
        .thenReturn(resultSet);
    when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);

    final ProcessContinuation result =
        action.run(
            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

    // Verify query was capped at ~30 minutes
    long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
    assertTrue(
        "Query should be capped at approx 30 minutes (1800s)",
        Math.abs(diff - thirtyMinutesInSeconds) < 10);

    // Crucial: Should RESUME to process the rest later
    assertEquals(ProcessContinuation.resume(), result);
  }


// Verifies that for a mutable change stream with a bounded restriction whose end timestamp is
// already within the ~2-minute cap, the query uses the exact requested end timestamp and the
// SDF stops (the restriction is fully processed in one query).
@Test
public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
action =
new QueryChangeStreamAction(
changeStreamDao,
partitionMetadataDao,
changeStreamRecordMapper,
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
true);

// Set endTimestamp to only 10 seconds in the future
Timestamp now = Timestamp.now();
Timestamp endTimestamp = Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10, now.getNanos());

partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
when(restriction.getTo()).thenReturn(endTimestamp);
when(partitionMetadataMapper.from(any())).thenReturn(partition);

// Capture the end timestamp passed to the change stream query for the exact-match assertion.
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
when(changeStreamDao.changeStreamQuery(
eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
.thenReturn(resultSet);
when(resultSet.next()).thenReturn(false);
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);

final ProcessContinuation result =
action.run(
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

// Should use the exact endTimestamp since it is within the limit (10s < 2m)
assertEquals(endTimestamp, timestampCaptor.getValue());

// Should STOP because we reached the actual requested endTimestamp
assertEquals(ProcessContinuation.stop(), result);
}
Comment on lines 972 to 1015
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using a named constant for the magic number 10 * 60 would improve readability.

  public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
    final long tenMinutesInSeconds = 10 * 60;
    action =
        new QueryChangeStreamAction(
            changeStreamDao,
            partitionMetadataDao,
            changeStreamRecordMapper,
            partitionMetadataMapper,
            dataChangeRecordAction,
            heartbeatRecordAction,
            childPartitionsRecordAction,
            partitionStartRecordAction,
            partitionEndRecordAction,
            partitionEventRecordAction,
            metrics,
            true);

    // Set endTimestamp to only 10 minutes in the future
    Timestamp now = Timestamp.now();
    Timestamp endTimestamp =
        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + tenMinutesInSeconds, now.getNanos());

    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
    when(restriction.getTo()).thenReturn(endTimestamp);
    when(partitionMetadataMapper.from(any())).thenReturn(partition);

    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
    when(changeStreamDao.changeStreamQuery(
            eq(PARTITION_TOKEN),
            eq(PARTITION_START_TIMESTAMP),
            eq(endTimestamp),
            eq(PARTITION_HEARTBEAT_MILLIS)))
        .thenReturn(resultSet);
    when(resultSet.next()).thenReturn(false);
    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
    when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);

    final ProcessContinuation result =
        action.run(
            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

    // Should STOP because we reached the actual requested endTimestamp
    assertEquals(ProcessContinuation.stop(), result);
  }


// Verifies that an unbounded restriction (streaming with no end timestamp) resumes after each
// change stream query completes, so the partition keeps being re-read in ~2-minute windows.
// NOTE(review): relies on setupUnboundedPartition() (defined elsewhere in this test class) to
// configure the partition/restriction mocks with MAX_INCLUSIVE_END_AT.
@Test
public void testQueryChangeStreamUnboundedResumesCorrectly() {
// Unbounded restriction (streaming forever)
setupUnboundedPartition();

// Empty result set: the query window completes without records or end-of-partition signals.
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
when(changeStreamDao.changeStreamQuery(any(), any(), any(), anyLong())).thenReturn(resultSet);
when(resultSet.next()).thenReturn(false);
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);

final ProcessContinuation result =
action.run(
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

// Should return RESUME to continue reading the stream every 2 minutes
assertEquals(ProcessContinuation.resume(), result);
verify(metrics).incQueryCounter();
}

private static class BundleFinalizerStub implements BundleFinalizer {
@Override
public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -139,17 +141,18 @@ public void setUp() {
when(actionFactory.partitionEventRecordAction(partitionMetadataDao, metrics))
.thenReturn(partitionEventRecordAction);
when(actionFactory.queryChangeStreamAction(
changeStreamDao,
partitionMetadataDao,
changeStreamRecordMapper,
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics))
eq(changeStreamDao),
eq(partitionMetadataDao),
eq(changeStreamRecordMapper),
eq(partitionMetadataMapper),
eq(dataChangeRecordAction),
eq(heartbeatRecordAction),
eq(childPartitionsRecordAction),
eq(partitionStartRecordAction),
eq(partitionEndRecordAction),
eq(partitionEventRecordAction),
eq(metrics),
anyBoolean()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

testQueryChangeStreamMode (org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFnTest) failed

sdks/java/io/google-cloud-platform/build/test-results/test/TEST-org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFnTest.xml [took 0s]

org.mockito.exceptions.misusing.InvalidUseOfMatchersException: 
Invalid use of argument matchers!
12 matchers expected, 1 recorded:
-> at org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFnTest.setUp(ReadChangeStreamPartitionDoFnTest.java:154)

This exception may occur if matchers are combined with raw values:
    //incorrect:
    someMethod(any(), "raw String");
When using matchers, all arguments have to be provided by matchers.
For example:
    //correct:
    someMethod(any(), eq("String by matcher"));

For more info see javadoc for Matchers class.

	at app//org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory.queryChangeStreamAction(ActionFactory.java:192)
	at app//org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.ReadChangeStreamPartitionDoFnTest.setUp(ReadChangeStreamPartitionDoFnTest.java:142)
	at java.base@11.0.24/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

looks relevant test failure

.thenReturn(queryChangeStreamAction);

doFn.setup();
Expand Down
Loading