diff --git a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonArrayEncodingBenchmark.java b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonArrayEncodingBenchmark.java index 0768f4f63c6..f0a59967f0a 100644 --- a/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonArrayEncodingBenchmark.java +++ b/driver-benchmarks/src/main/com/mongodb/benchmark/benchmarks/RawBsonArrayEncodingBenchmark.java @@ -17,7 +17,8 @@ package com.mongodb.benchmark.benchmarks; -import org.bson.BsonArray;import org.bson.BsonDocument; +import org.bson.BsonArray; +import org.bson.BsonDocument; import org.bson.RawBsonDocument; import org.bson.codecs.BsonDocumentCodec; @@ -52,4 +53,4 @@ public void setUp() throws IOException { public int getBytesPerRun() { return documentBytes.length * NUM_INTERNAL_ITERATIONS; } -} \ No newline at end of file +} diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 838c5208807..a263946f5ca 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -109,10 +109,6 @@ public TimeoutContext(final TimeoutSettings timeoutSettings) { this(false, timeoutSettings, startTimeout(timeoutSettings.getTimeoutMS())); } - private TimeoutContext(final TimeoutSettings timeoutSettings, @Nullable final Timeout timeout) { - this(false, timeoutSettings, timeout); - } - private TimeoutContext(final boolean isMaintenanceContext, final TimeoutSettings timeoutSettings, @Nullable final Timeout timeout) { @@ -176,6 +172,7 @@ public Timeout timeoutIncludingRoundTrip() { * @param alternativeTimeoutMS the alternative timeout. * @return timeout to use. */ + @VisibleForTesting(otherwise = PRIVATE) public long timeoutOrAlternative(final long alternativeTimeoutMS) { if (timeout == null) { return alternativeTimeoutMS; @@ -380,11 +377,6 @@ public TimeoutContext withAdditionalReadTimeout(final int additionalReadTimeout) return new TimeoutContext(timeoutSettings.withReadTimeoutMS(newReadTimeout > 0 ? newReadTimeout : Long.MAX_VALUE)); } - // Creates a copy of the timeout context that can be reset without resetting the original. - public TimeoutContext copyTimeoutContext() { - return new TimeoutContext(getTimeoutSettings(), getTimeout()); - } - @Override public String toString() { return "TimeoutContext{" diff --git a/driver-core/src/main/com/mongodb/internal/connection/Time.java b/driver-core/src/main/com/mongodb/internal/connection/Time.java index e3940adf1de..9b7f935e631 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/Time.java +++ b/driver-core/src/main/com/mongodb/internal/connection/Time.java @@ -20,7 +20,11 @@ * To enable unit testing of classes that rely on System.nanoTime * *

This class is not part of the public API and may be removed or changed at any time

+ * + * @deprecated Use {@link com.mongodb.internal.time.SystemNanoTime} in production code, + * and {@code Mockito.mockStatic} in test code to tamper with it. */ +@Deprecated public final class Time { static final long CONSTANT_TIME = 42; diff --git a/driver-core/src/main/com/mongodb/internal/time/ExponentialBackoff.java b/driver-core/src/main/com/mongodb/internal/time/ExponentialBackoff.java new file mode 100644 index 00000000000..db5d2efa996 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/time/ExponentialBackoff.java @@ -0,0 +1,76 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.time; + +import com.mongodb.internal.VisibleForTesting; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.DoubleSupplier; + +import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; + +/** + * Provides exponential backoff calculations with jitter for retry scenarios. + */ +public final class ExponentialBackoff { + + private static final double TRANSACTION_BASE_MS = 5.0; + @VisibleForTesting(otherwise = PRIVATE) + static final double TRANSACTION_MAX_MS = 500.0; + private static final double TRANSACTION_GROWTH = 1.5; + + // TODO-JAVA-6079 + private static DoubleSupplier testJitterSupplier = null; + + private ExponentialBackoff() { + } + + /** + * Calculate the backoff in milliseconds for transaction retries. + * + * @param attemptNumber attempt number > 0 + * @return The calculated backoff in milliseconds. + */ + public static long calculateTransactionBackoffMs(final int attemptNumber) { + assertTrue(attemptNumber > 0, "Attempt number must be at least 1 (1-based) in the context of transaction backoff calculation"); + double jitter = testJitterSupplier != null + ? testJitterSupplier.getAsDouble() + : ThreadLocalRandom.current().nextDouble(); + return Math.round(jitter * Math.min( + TRANSACTION_BASE_MS * Math.pow(TRANSACTION_GROWTH, attemptNumber - 1), + TRANSACTION_MAX_MS)); + } + + /** + * Set a custom jitter supplier for testing purposes. + * + * @param supplier A DoubleSupplier that returns values in [0, 1] range. + */ + @VisibleForTesting(otherwise = PRIVATE) + public static void setTestJitterSupplier(final DoubleSupplier supplier) { + testJitterSupplier = supplier; + } + + /** + * Clear the test jitter supplier, reverting to default ThreadLocalRandom behavior. + */ + @VisibleForTesting(otherwise = PRIVATE) + public static void clearTestJitterSupplier() { + testJitterSupplier = null; + } +} diff --git a/driver-core/src/main/com/mongodb/internal/time/StartTime.java b/driver-core/src/main/com/mongodb/internal/time/StartTime.java index 1d8f186ab67..650f9a0ebb9 100644 --- a/driver-core/src/main/com/mongodb/internal/time/StartTime.java +++ b/driver-core/src/main/com/mongodb/internal/time/StartTime.java @@ -59,6 +59,6 @@ public interface StartTime { * @return a StartPoint, as of now */ static StartTime now() { - return TimePoint.at(System.nanoTime()); + return TimePoint.at(SystemNanoTime.get()); } } diff --git a/driver-core/src/main/com/mongodb/internal/time/SystemNanoTime.java b/driver-core/src/main/com/mongodb/internal/time/SystemNanoTime.java new file mode 100644 index 00000000000..f047108d509 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/time/SystemNanoTime.java @@ -0,0 +1,32 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.time; + +/** + * Avoid using this class directly and prefer using other program elements from {@link com.mongodb.internal.time}, if possible. + *

+ * We do not use {@link System#nanoTime()} directly in the rest of the {@link com.mongodb.internal.time} package, + * and use {@link SystemNanoTime#get()} instead because we need to tamper with it via {@code Mockito.mockStatic}, + * and mocking methods of {@link System} class is both impossible and unwise. + */ +public final class SystemNanoTime { + private SystemNanoTime() { + } + + public static long get() { + return System.nanoTime(); + } +} diff --git a/driver-core/src/main/com/mongodb/internal/time/TimePoint.java b/driver-core/src/main/com/mongodb/internal/time/TimePoint.java index 811065d13a6..c3b130e584d 100644 --- a/driver-core/src/main/com/mongodb/internal/time/TimePoint.java +++ b/driver-core/src/main/com/mongodb/internal/time/TimePoint.java @@ -61,14 +61,14 @@ static TimePoint at(@Nullable final Long nanos) { @VisibleForTesting(otherwise = PRIVATE) long currentNanos() { - return System.nanoTime(); + return SystemNanoTime.get(); } /** * Returns the current {@link TimePoint}. */ static TimePoint now() { - return at(System.nanoTime()); + return at(SystemNanoTime.get()); } /** diff --git a/driver-core/src/test/unit/com/mongodb/internal/time/ExponentialBackoffTest.java b/driver-core/src/test/unit/com/mongodb/internal/time/ExponentialBackoffTest.java new file mode 100644 index 00000000000..a7978e23556 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/time/ExponentialBackoffTest.java @@ -0,0 +1,82 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.time; + +import org.junit.jupiter.api.Test; + +import java.util.function.DoubleSupplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class ExponentialBackoffTest { + /** + * Expected {@linkplain ExponentialBackoff#calculateTransactionBackoffMs(int) backoffs} with 1.0 as + * {@link ExponentialBackoff#setTestJitterSupplier(DoubleSupplier) jitter}. + */ + private static final double[] EXPECTED_BACKOFFS_MAX_VALUES = {5.0, 7.5, 11.25, 16.875, 25.3125, 37.96875, 56.953125, 85.4296875, 128.14453125, + 192.21679688, 288.32519531, 432.48779297, 500.0}; + + @Test + void testCalculateTransactionBackoffMs() { + for (int attemptNumber = 1; attemptNumber <= EXPECTED_BACKOFFS_MAX_VALUES.length; attemptNumber++) { + long backoff = ExponentialBackoff.calculateTransactionBackoffMs(attemptNumber); + long expectedBackoff = Math.round(EXPECTED_BACKOFFS_MAX_VALUES[attemptNumber - 1]); + assertTrue(backoff >= 0 && backoff <= expectedBackoff, + String.format("Attempt %d: backoff should be between 0 ms and %d ms, got: %d", attemptNumber, + expectedBackoff, backoff)); + } + } + + @Test + void testCalculateTransactionBackoffMsRespectsMaximum() { + for (int attemptNumber = 1; attemptNumber < EXPECTED_BACKOFFS_MAX_VALUES.length * 2; attemptNumber++) { + long backoff = ExponentialBackoff.calculateTransactionBackoffMs(attemptNumber); + assertTrue(backoff >= 0 && backoff <= ExponentialBackoff.TRANSACTION_MAX_MS, + String.format("Attempt %d: backoff should be capped at %f ms, got: %d ms", + attemptNumber, ExponentialBackoff.TRANSACTION_MAX_MS, backoff)); + } + } + + @Test + void testCustomJitterWithOne() { + ExponentialBackoff.setTestJitterSupplier(() -> 1.0); + try { + for (int attemptNumber = 1; attemptNumber <= EXPECTED_BACKOFFS_MAX_VALUES.length; attemptNumber++) { + long backoff = ExponentialBackoff.calculateTransactionBackoffMs(attemptNumber); + long expected = Math.round(EXPECTED_BACKOFFS_MAX_VALUES[attemptNumber - 1]); + assertEquals(expected, backoff, + String.format("Attempt %d: with jitter=1.0, backoff should be %d ms", attemptNumber, expected)); + } + } finally { + ExponentialBackoff.clearTestJitterSupplier(); + } + } + + @Test + void testCustomJitterWithZero() { + ExponentialBackoff.setTestJitterSupplier(() -> 0.0); + try { + for (int attemptNumber = 1; attemptNumber <= EXPECTED_BACKOFFS_MAX_VALUES.length; attemptNumber++) { + long backoff = ExponentialBackoff.calculateTransactionBackoffMs(attemptNumber); + assertEquals(0, backoff, "With jitter=0, backoff should always be 0 ms"); + } + } finally { + ExponentialBackoff.clearTestJitterSupplier(); + } + } +} diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionClock.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionClock.java deleted file mode 100644 index a5ba63e3cd6..00000000000 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionClock.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.client.internal; - -/** - *

This class is not part of the public API and may be removed or changed at any time

- */ -public final class ClientSessionClock { - public static final ClientSessionClock INSTANCE = new ClientSessionClock(0L); - - private long currentTime; - - private ClientSessionClock(final long millis) { - currentTime = millis; - } - - public long now() { - if (currentTime == 0L) { - return System.currentTimeMillis(); - } - return currentTime; - } - - public void setTime(final long millis) { - currentTime = millis; - } -} diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java index aa1414dce5d..99f9dc9708b 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java @@ -21,13 +21,15 @@ import com.mongodb.MongoException; import com.mongodb.MongoExecutionTimeoutException; import com.mongodb.MongoInternalException; -import com.mongodb.MongoOperationTimeoutException; +import com.mongodb.MongoTimeoutException; import com.mongodb.ReadConcern; import com.mongodb.TransactionOptions; import com.mongodb.WriteConcern; import com.mongodb.client.ClientSession; import com.mongodb.client.TransactionBody; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.observability.micrometer.TracingManager; +import com.mongodb.internal.observability.micrometer.TransactionSpan; import com.mongodb.internal.operation.AbortTransactionOperation; import com.mongodb.internal.operation.CommitTransactionOperation; import com.mongodb.internal.operation.OperationHelper; @@ -36,20 +38,25 @@ import com.mongodb.internal.operation.WriteOperation; import com.mongodb.internal.session.BaseClientSessionImpl; import com.mongodb.internal.session.ServerSessionPool; -import com.mongodb.internal.observability.micrometer.TracingManager; -import com.mongodb.internal.observability.micrometer.TransactionSpan; +import com.mongodb.internal.time.ExponentialBackoff; +import com.mongodb.internal.time.Timeout; import com.mongodb.lang.Nullable; +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; + import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; final class ClientSessionImpl extends BaseClientSessionImpl implements ClientSession { - private static final int MAX_RETRY_TIME_LIMIT_MS = 120000; + private static final long MAX_RETRY_TIME_LIMIT_MS = 120000; private final OperationExecutor operationExecutor; private TransactionState transactionState = TransactionState.NONE; @@ -152,6 +159,12 @@ public void abortTransaction() { } } + private void abortIfInTransaction() { + if (transactionState == TransactionState.IN) { + abortTransaction(); + } + } + private void startTransaction(final TransactionOptions transactionOptions, final TimeoutContext timeoutContext) { Boolean snapshot = getOptions().isSnapshot(); if (snapshot != null && snapshot) { @@ -249,31 +262,50 @@ public T withTransaction(final TransactionBody transactionBody) { @Override public T withTransaction(final TransactionBody transactionBody, final TransactionOptions options) { notNull("transactionBody", transactionBody); - long startTime = ClientSessionClock.INSTANCE.now(); TimeoutContext withTransactionTimeoutContext = createTimeoutContext(options); + boolean timeoutMsConfigured = withTransactionTimeoutContext.hasTimeoutMS(); + Timeout withTransactionTimeout = assertNotNull(timeoutMsConfigured + ? withTransactionTimeoutContext.getTimeout() + : TimeoutContext.startTimeout(MAX_RETRY_TIME_LIMIT_MS)); + BooleanSupplier withTransactionTimeoutExpired = () -> withTransactionTimeout.call(TimeUnit.MILLISECONDS, + () -> false, ms -> false, () -> true); + int transactionAttempt = 0; + MongoException lastError = null; try { outer: while (true) { - T retVal; + if (transactionAttempt > 0) { + backoff(transactionAttempt, withTransactionTimeout, assertNotNull(lastError), timeoutMsConfigured); + } try { - startTransaction(options, withTransactionTimeoutContext.copyTimeoutContext()); + startTransaction(options, withTransactionTimeoutContext); + transactionAttempt++; if (transactionSpan != null) { transactionSpan.setIsConvenientTransaction(); } + } catch (Throwable e) { + abortIfInTransaction(); + throw e; + } + T retVal; + try { retVal = transactionBody.execute(); } catch (Throwable e) { - if (transactionState == TransactionState.IN) { - abortTransaction(); - } - if (e instanceof MongoException && !(e instanceof MongoOperationTimeoutException)) { - MongoException exceptionToHandle = OperationHelper.unwrap((MongoException) e); - if (exceptionToHandle.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL) - && ClientSessionClock.INSTANCE.now() - startTime < MAX_RETRY_TIME_LIMIT_MS) { + abortIfInTransaction(); + if (e instanceof MongoException) { + MongoException mongoException = (MongoException) e; + MongoException labelCarryingException = OperationHelper.unwrap(mongoException); + if (labelCarryingException.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)) { if (transactionSpan != null) { transactionSpan.spanFinalizing(false); } + lastError = mongoException; continue; + } else if (labelCarryingException.hasErrorLabel(UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) { + throw e; + } else { + throw mongoException; } } throw e; @@ -283,23 +315,22 @@ public T withTransaction(final TransactionBody transactionBody, final Tra try { commitTransaction(false); break; - } catch (MongoException e) { - clearTransactionContextOnError(e); - if (!(e instanceof MongoOperationTimeoutException) - && ClientSessionClock.INSTANCE.now() - startTime < MAX_RETRY_TIME_LIMIT_MS) { + } catch (MongoException mongoException) { + if (mongoException.hasErrorLabel(UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL) + && !(mongoException instanceof MongoExecutionTimeoutException)) { + if (withTransactionTimeoutExpired.getAsBoolean()) { + throw wrapInMongoTimeoutException(mongoException, timeoutMsConfigured); + } applyMajorityWriteConcernToTransactionOptions(); - - if (!(e instanceof MongoExecutionTimeoutException) - && e.hasErrorLabel(UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) { - continue; - } else if (e.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)) { - if (transactionSpan != null) { - transactionSpan.spanFinalizing(true); - } - continue outer; + continue; + } else if (mongoException.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)) { + if (transactionSpan != null) { + transactionSpan.spanFinalizing(true); } + lastError = mongoException; + continue outer; } - throw e; + throw mongoException; } } } @@ -321,9 +352,7 @@ public TransactionSpan getTransactionSpan() { @Override public void close() { try { - if (transactionState == TransactionState.IN) { - abortTransaction(); - } + abortIfInTransaction(); } finally { clearTransactionContext(); super.close(); @@ -359,4 +388,35 @@ private TimeoutContext createTimeoutContext(final TransactionOptions transaction TransactionOptions.merge(transactionOptions, getOptions().getDefaultTransactionOptions()), operationExecutor.getTimeoutSettings())); } + + private static void backoff(final int transactionAttempt, + final Timeout withTransactionTimeout, final MongoException lastError, final boolean timeoutMsConfigured) { + long backoffMs = ExponentialBackoff.calculateTransactionBackoffMs(transactionAttempt); + withTransactionTimeout.shortenBy(backoffMs, TimeUnit.MILLISECONDS).onExpired(() -> { + throw wrapInMongoTimeoutException(lastError, timeoutMsConfigured); + }); + try { + if (backoffMs > 0) { + Thread.sleep(backoffMs); + } + } catch (InterruptedException e) { + throw interruptAndCreateMongoInterruptedException("Transaction retry interrupted", e); + } + } + + private static MongoException wrapInMongoTimeoutException(final MongoException cause, final boolean timeoutMsConfigured) { + MongoException timeoutException = timeoutMsConfigured + ? createMongoTimeoutException(cause) + : wrapInNonTimeoutMsMongoTimeoutException(cause); + if (timeoutException != cause) { + cause.getErrorLabels().forEach(timeoutException::addLabel); + } + return timeoutException; + } + + private static MongoTimeoutException wrapInNonTimeoutMsMongoTimeoutException(final MongoException cause) { + return cause instanceof MongoTimeoutException + ? (MongoTimeoutException) cause + : new MongoTimeoutException("Operation exceeded the timeout limit.", cause); + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 3d2d58dc4c8..7eecdfc4702 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -47,11 +47,6 @@ import com.mongodb.event.ConnectionClosedEvent; import com.mongodb.event.ConnectionCreatedEvent; import com.mongodb.event.ConnectionReadyEvent; - -import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; -import static com.mongodb.internal.connection.CommandHelper.HELLO; -import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO; - import com.mongodb.internal.connection.InternalStreamConnection; import com.mongodb.internal.connection.ServerHelper; import com.mongodb.internal.connection.TestCommandListener; @@ -89,8 +84,11 @@ import static com.mongodb.ClusterFixture.isStandalone; import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static com.mongodb.ClusterFixture.sleep; +import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.client.Fixture.getDefaultDatabaseName; import static com.mongodb.client.Fixture.getPrimary; +import static com.mongodb.internal.connection.CommandHelper.HELLO; +import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO; import static java.lang.Long.MAX_VALUE; import static java.lang.String.join; import static java.util.Arrays.asList; @@ -1122,6 +1120,7 @@ public void setUp() { filesCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), gridFsFileNamespace); chunksCollectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), gridFsChunksNamespace); commandListener = new TestCommandListener(); + } @AfterEach diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutProseTest.java index 4dcbc4d1a0f..bbabcd8f61a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutProseTest.java @@ -19,6 +19,9 @@ import com.mongodb.MongoClientSettings; import com.mongodb.client.gridfs.GridFSBucket; import com.mongodb.client.gridfs.GridFSBuckets; +import com.mongodb.internal.time.ExponentialBackoff; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; /** @@ -36,6 +39,23 @@ protected GridFSBucket createGridFsBucket(final MongoDatabase mongoDatabase, fin return GridFSBuckets.create(mongoDatabase, bucketName); } + @BeforeEach + @Override + public void setUp() { + super.setUp(); + ExponentialBackoff.setTestJitterSupplier(() -> 0); + } + + @AfterEach + @Override + public void tearDown() throws InterruptedException { + try { + super.tearDown(); + } finally { + ExponentialBackoff.clearTestJitterSupplier(); + } + } + @Override protected boolean isAsync() { return false; diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java index cb62545f4e4..9fc2f0e6acc 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java @@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assumptions.assumeFalse; - // See https://github.com/mongodb/specifications/tree/master/source/client-side-operation-timeout/tests public class ClientSideOperationTimeoutTest extends UnifiedSyncTest { diff --git a/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java index a840a83babb..687e9f8dccd 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/WithTransactionProseTest.java @@ -18,20 +18,34 @@ import com.mongodb.ClientSessionOptions; import com.mongodb.MongoClientException; +import com.mongodb.MongoCommandException; import com.mongodb.MongoException; +import com.mongodb.MongoNodeIsRecoveringException; +import com.mongodb.MongoTimeoutException; import com.mongodb.TransactionOptions; -import com.mongodb.client.internal.ClientSessionClock; import com.mongodb.client.model.Sorts; +import com.mongodb.internal.time.ExponentialBackoff; +import com.mongodb.internal.time.StartTime; +import com.mongodb.internal.time.SystemNanoTime; +import org.bson.BsonDocument; import org.bson.Document; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import java.time.Duration; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import static com.mongodb.ClusterFixture.TIMEOUT; import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; import static com.mongodb.ClusterFixture.isSharded; +import static com.mongodb.client.Fixture.getPrimary; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -41,8 +55,7 @@ * Prose Tests. */ public class WithTransactionProseTest extends DatabaseTestCase { - private static final long START_TIME_MS = 1L; - private static final long ERROR_GENERATING_INTERVAL = 121000L; + private static final Duration ERROR_GENERATING_INTERVAL = Duration.ofSeconds(120); @BeforeEach @Override @@ -62,7 +75,7 @@ public void setUp() { public void testCallbackRaisesCustomError() { final String exceptionMessage = "NotTransientOrUnknownError"; try (ClientSession session = client.startSession()) { - session.withTransaction((TransactionBody) () -> { + session.withTransaction(() -> { throw new MongoException(exceptionMessage); }); // should not get here @@ -97,17 +110,20 @@ public void testRetryTimeoutEnforcedTransientTransactionError() { final String errorMessage = "transient transaction error"; try (ClientSession session = client.startSession()) { - ClientSessionClock.INSTANCE.setTime(START_TIME_MS); - session.withTransaction((TransactionBody) () -> { - ClientSessionClock.INSTANCE.setTime(ERROR_GENERATING_INTERVAL); - MongoException e = new MongoException(112, errorMessage); - e.addLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL); - throw e; - }); + doWithSystemNanoTimeHandle(systemNanoTimeHandle -> + session.withTransaction(() -> { + systemNanoTimeHandle.setRelativeToStart(ERROR_GENERATING_INTERVAL); + MongoException e = new MongoException(112, errorMessage); + e.addLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL); + throw e; + })); fail("Test should have thrown an exception."); } catch (Exception e) { - assertEquals(errorMessage, e.getMessage()); - assertTrue(((MongoException) e).getErrorLabels().contains(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)); + MongoTimeoutException exception = assertInstanceOf(MongoTimeoutException.class, e); + assertTrue(exception.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)); + MongoException cause = assertInstanceOf(MongoException.class, exception.getCause()); + assertEquals(errorMessage, cause.getMessage()); + assertTrue(cause.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)); } } @@ -123,16 +139,19 @@ public void testRetryTimeoutEnforcedUnknownTransactionCommit() { + "'data': {'failCommands': ['commitTransaction'], 'errorCode': 91, 'closeConnection': false}}")); try (ClientSession session = client.startSession()) { - ClientSessionClock.INSTANCE.setTime(START_TIME_MS); - session.withTransaction((TransactionBody) () -> { - ClientSessionClock.INSTANCE.setTime(ERROR_GENERATING_INTERVAL); - collection.insertOne(session, new Document("_id", 2)); - return null; - }); + doWithSystemNanoTimeHandle(systemNanoTimeHandle -> + session.withTransaction(() -> { + systemNanoTimeHandle.setRelativeToStart(ERROR_GENERATING_INTERVAL); + collection.insertOne(session, new Document("_id", 2)); + return null; + })); fail("Test should have thrown an exception."); } catch (Exception e) { - assertEquals(91, ((MongoException) e).getCode()); - assertTrue(((MongoException) e).getErrorLabels().contains(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)); + MongoTimeoutException exception = assertInstanceOf(MongoTimeoutException.class, e); + assertTrue(exception.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)); + MongoNodeIsRecoveringException cause = assertInstanceOf(MongoNodeIsRecoveringException.class, exception.getCause()); + assertEquals(91, cause.getCode()); + assertTrue(cause.hasErrorLabel(MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)); } finally { failPointAdminDb.runCommand(Document.parse("{'configureFailPoint': 'failCommand', 'mode': 'off'}")); } @@ -151,16 +170,19 @@ public void testRetryTimeoutEnforcedTransientTransactionErrorOnCommit() { + "'errmsg': 'Transaction 0 has been aborted', 'closeConnection': false}}")); try (ClientSession session = client.startSession()) { - ClientSessionClock.INSTANCE.setTime(START_TIME_MS); - session.withTransaction((TransactionBody) () -> { - ClientSessionClock.INSTANCE.setTime(ERROR_GENERATING_INTERVAL); - collection.insertOne(session, Document.parse("{ _id : 1 }")); - return null; - }); + doWithSystemNanoTimeHandle(systemNanoTimeHandle -> + session.withTransaction(() -> { + systemNanoTimeHandle.setRelativeToStart(ERROR_GENERATING_INTERVAL); + collection.insertOne(session, Document.parse("{ _id : 1 }")); + return null; + })); fail("Test should have thrown an exception."); } catch (Exception e) { - assertEquals(251, ((MongoException) e).getCode()); - assertTrue(((MongoException) e).getErrorLabels().contains(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)); + MongoTimeoutException exception = assertInstanceOf(MongoTimeoutException.class, e); + assertTrue(exception.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)); + MongoCommandException cause = assertInstanceOf(MongoCommandException.class, exception.getCause()); + assertEquals(251, cause.getCode()); + assertTrue(cause.hasErrorLabel(MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL)); } finally { failPointAdminDb.runCommand(Document.parse("{'configureFailPoint': 'failCommand', 'mode': 'off'}")); } @@ -203,7 +225,73 @@ public void testTimeoutMSAndLegacySettings() { } } - private boolean canRunTests() { + /** + * See + * Retry Backoff is Enforced. + */ + @DisplayName("Retry Backoff is Enforced") + @Test + public void testRetryBackoffIsEnforced() throws InterruptedException { + long noBackoffTimeMs = measureTransactionLatencyMs(0.0); + long withBackoffTimeMs = measureTransactionLatencyMs(1.0); + + long sumOfBackoffsMs = 1800; + long toleranceMs = 500; + long actualDifferenceMs = Math.abs(withBackoffTimeMs - (noBackoffTimeMs + sumOfBackoffsMs)); + + assertTrue(actualDifferenceMs < toleranceMs, + String.format("Observed backoff time deviates from expected by %d ms (tolerance: %d ms)", actualDifferenceMs, toleranceMs)); + } + + /** + * This test is not from the specification. + */ + @Test + public void testExponentialBackoffOnTransientError() throws InterruptedException { + BsonDocument failPointDocument = BsonDocument.parse("{'configureFailPoint': 'failCommand', 'mode': {'times': 3}, " + + "'data': {'failCommands': ['insert'], 'errorCode': 112, " + + "'errorLabels': ['TransientTransactionError']}}"); + + try (ClientSession session = client.startSession(); + FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { + AtomicInteger attemptsCount = new AtomicInteger(0); + + session.withTransaction(() -> { + attemptsCount.incrementAndGet(); // Count the attempt before the operation that might fail + return collection.insertOne(session, Document.parse("{}")); + }); + + assertEquals(4, attemptsCount.get(), "Expected 1 initial attempt + 3 retries"); + } + } + + private long measureTransactionLatencyMs(final double jitter) throws InterruptedException { + BsonDocument failPointDocument = BsonDocument.parse("{'configureFailPoint': 'failCommand', 'mode': {'times': 13}, " + + "'data': {'failCommands': ['commitTransaction'], 'errorCode': 251}}"); + ExponentialBackoff.setTestJitterSupplier(() -> jitter); + try (ClientSession session = client.startSession(); + FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { + StartTime startTime = StartTime.now(); + session.withTransaction(() -> collection.insertOne(session, Document.parse("{}"))); + return startTime.elapsed().toMillis(); + } finally { + ExponentialBackoff.clearTestJitterSupplier(); + } + } + + private static boolean canRunTests() { return isSharded() || isDiscoverableReplicaSet(); } + + private static void doWithSystemNanoTimeHandle(final Consumer action) { + long startNanos = SystemNanoTime.get(); + try (MockedStatic mockedStaticSystemNanoTime = Mockito.mockStatic(SystemNanoTime.class)) { + mockedStaticSystemNanoTime.when(SystemNanoTime::get).thenReturn(startNanos); + action.accept(change -> mockedStaticSystemNanoTime.when(SystemNanoTime::get).thenReturn(startNanos + change.toNanos())); + } + } + + private interface SystemNanoTimeHandle { + void setRelativeToStart(Duration change); + } }