diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java index bee3a5c1f..c8d1dfd5e 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/Mutiny.java @@ -5,6 +5,11 @@ */ package org.hibernate.reactive.mutiny; +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; + import org.hibernate.Cache; import org.hibernate.CacheMode; import org.hibernate.Filter; @@ -44,10 +49,6 @@ import jakarta.persistence.criteria.CriteriaUpdate; import jakarta.persistence.metamodel.Attribute; import jakarta.persistence.metamodel.Metamodel; -import java.lang.invoke.MethodHandles; -import java.util.List; -import java.util.function.BiFunction; -import java.util.function.Function; import static org.hibernate.engine.internal.ManagedTypeHelper.asPersistentAttributeInterceptable; import static org.hibernate.engine.internal.ManagedTypeHelper.isPersistentAttributeInterceptable; @@ -1971,6 +1972,59 @@ interface Transaction { */ interface SessionFactory extends AutoCloseable { + /** + * Obtain a new {@link Session reactive session}, the main + * interaction point between the user's program and Hibernate + * Reactive. + *

+ * The underlying database connection is obtained lazily + * when the returned {@link Session} needs to access the + * database. + *

+ * The client must close the session using {@link Session#close()}. + */ + @Incubating + Session createSession(); + + /** + * Obtain a new {@link Session reactive session}. + *

+ * The underlying database connection is obtained lazily + * when the returned {@link Session} needs to access the + * database. + *

+ * The client must close the session using {@link Session#close()}. + * @param tenantId the id of the tenant + */ + @Incubating + Session createSession(String tenantId); + + /** + * Obtain a new {@link StatelessSession reactive stateless session}. + *

+ * The underlying database connection is obtained lazily + * when the returned {@link StatelessSession} needs to access the + * database. + *

+ * The client must close the session using {@link Session#close()}. + */ + @Incubating + StatelessSession createStatelessSession(); + + /** + * Obtain a new {@link StatelessSession reactive stateless session}. + *

+ * The underlying database connection is obtained lazily + * when the returned {@link StatelessSession} needs to access the + * database. + *

+ * The client must close the session using {@link Session#close()}. + * + * @param tenantId the id of the tenant + */ + @Incubating + StatelessSession createStatelessSession(String tenantId); + /** * Obtain a new {@link Session reactive session} {@link Uni}, the main * interaction point between the user's program and Hibernate diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java index 93d5d5386..1959872ad 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionFactoryImpl.java @@ -28,6 +28,8 @@ import org.hibernate.reactive.mutiny.Mutiny; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.pool.ReactiveConnectionPool; +import org.hibernate.reactive.session.ReactiveSession; +import org.hibernate.reactive.session.ReactiveStatelessSession; import org.hibernate.reactive.session.impl.ReactiveSessionImpl; import org.hibernate.reactive.session.impl.ReactiveStatelessSessionImpl; import org.hibernate.service.ServiceRegistry; @@ -88,21 +90,48 @@ public Context getContext() { return context; } + @Override + public Mutiny.Session createSession() { + return createSession( getTenantIdentifier( options() ) ); + } + + @Override + public Mutiny.Session createSession(String tenantId) { + final SessionCreationOptions options = options(); + ReactiveConnectionPool pool = delegate.getServiceRegistry().getService( ReactiveConnectionPool.class ); + ReactiveSession sessionImpl = new ReactiveSessionImpl( delegate, options, pool.getProxyConnection( tenantId ) ); + return new MutinySessionImpl( sessionImpl, this ); + } + + @Override + public Mutiny.StatelessSession createStatelessSession() { + return createStatelessSession( getTenantIdentifier( options() ) ); + } + + @Override + public Mutiny.StatelessSession createStatelessSession(String tenantId) { + final SessionCreationOptions options = options(); + ReactiveConnectionPool pool = delegate.getServiceRegistry().getService( ReactiveConnectionPool.class ); + ReactiveStatelessSession sessionImpl = new ReactiveStatelessSessionImpl( delegate, options, pool.getProxyConnection( tenantId ) ); + return new MutinyStatelessSessionImpl( sessionImpl, this ); + } + @Override public Uni openSession() { SessionCreationOptions options = options(); return uni( () -> connection( getTenantIdentifier( options ) ) ) - .chain( reactiveConnection -> create( reactiveConnection, - () -> new ReactiveSessionImpl( delegate, options, reactiveConnection ) ) ) - .map( s -> new MutinySessionImpl(s, this) ); + .chain( reactiveConnection -> create( + reactiveConnection, + () -> new ReactiveSessionImpl( delegate, options, reactiveConnection ) + ) ) + .map( s -> new MutinySessionImpl( s, this ) ); } @Override public Uni openSession(String tenantId) { return uni( () -> connection( tenantId ) ) - .chain( reactiveConnection -> create( reactiveConnection, - () -> new ReactiveSessionImpl( delegate, options( tenantId ), reactiveConnection ) ) ) - .map( s -> new MutinySessionImpl(s, this) ); + .chain( reactiveConnection -> create( reactiveConnection, () -> new ReactiveSessionImpl( delegate, options( tenantId ), reactiveConnection ) ) ) + .map( s -> new MutinySessionImpl( s, this ) ); } /** @@ -122,16 +151,20 @@ private static Uni close(ReactiveConnection connection) { public Uni openStatelessSession() { SessionCreationOptions options = options(); return uni( () -> connection( getTenantIdentifier( options ) ) ) - .chain( reactiveConnection -> create( reactiveConnection, - () -> new ReactiveStatelessSessionImpl( delegate, options, reactiveConnection ) ) ) - .map( s -> new MutinyStatelessSessionImpl(s, this) ); + .chain( reactiveConnection -> create( + reactiveConnection, + () -> new ReactiveStatelessSessionImpl( delegate, options, reactiveConnection ) + ) ) + .map( s -> new MutinyStatelessSessionImpl( s, this ) ); } @Override public Uni openStatelessSession(String tenantId) { return uni( () -> connection( tenantId ) ) - .chain( reactiveConnection -> create( reactiveConnection, - () -> new ReactiveStatelessSessionImpl( delegate, options( tenantId ), reactiveConnection ) ) ) + .chain( reactiveConnection -> create( + reactiveConnection, + () -> new ReactiveStatelessSessionImpl( delegate, options( tenantId ), reactiveConnection ) + ) ) .map( s -> new MutinyStatelessSessionImpl( s, this ) ); } @@ -190,8 +223,8 @@ public Uni withSession(String tenantId, Function> Objects.requireNonNull( tenantId, "parameter 'tenantId' is required" ); Objects.requireNonNull( work, "parameter 'work' is required" ); Context.Key key = new MultitenantKey<>( contextKeyForSession, tenantId ); - Mutiny.Session current = context.get(key); - if ( current!=null && current.isOpen() ) { + Mutiny.Session current = context.get( key ); + if ( current != null && current.isOpen() ) { LOG.debugf( "Reusing existing open Mutiny.Session which was found in the current Vert.x context for current tenant '%s'", tenantId ); return work.apply( current ); } @@ -227,11 +260,11 @@ public Uni withStatelessSession(String tenantId, Function Uni withSession( + private Uni withSession( Uni sessionUni, Function> work, Context.Key contextKey) { @@ -246,25 +279,25 @@ private Uni withSession( @Override public Uni withTransaction(BiFunction> work) { Objects.requireNonNull( work, "parameter 'work' is required" ); - return withSession( s -> s.withTransaction( t -> work.apply(s, t) ) ); + return withSession( s -> s.withTransaction( t -> work.apply( s, t ) ) ); } @Override public Uni withStatelessTransaction(BiFunction> work) { Objects.requireNonNull( work, "parameter 'work' is required" ); - return withStatelessSession( s -> s.withTransaction( t -> work.apply(s, t) ) ); + return withStatelessSession( s -> s.withTransaction( t -> work.apply( s, t ) ) ); } @Override public Uni withTransaction(String tenantId, BiFunction> work) { Objects.requireNonNull( work, "parameter 'work' is required" ); - return withSession( tenantId, s -> s.withTransaction( t -> work.apply(s, t) ) ); + return withSession( tenantId, s -> s.withTransaction( t -> work.apply( s, t ) ) ); } @Override public Uni withStatelessTransaction(String tenantId, BiFunction> work) { Objects.requireNonNull( work, "parameter 'work' is required" ); - return withStatelessSession( tenantId, s -> s.withTransaction( t -> work.apply(s, t) ) ); + return withStatelessSession( tenantId, s -> s.withTransaction( t -> work.apply( s, t ) ) ); } @Override diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java index ec1bdbd52..ad259652b 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/ReactiveConnectionPool.java @@ -37,12 +37,25 @@ */ @Incubating public interface ReactiveConnectionPool extends Service { + /** + * Obtain a lazily-initializing reactive connection. The + * actual connection might be made when the returned + * instance if {@link ReactiveConnection} is first used. + */ + ReactiveConnection getProxyConnection(); /** * Obtain a reactive connection, returning the connection - * via a {@link CompletionStage}. + * via a {@link CompletionStage} and overriding the default + * {@link SqlExceptionHelper} for the pool. */ - CompletionStage getConnection(); + ReactiveConnection getProxyConnection(SqlExceptionHelper sqlExceptionHelper); + + /** + * Obtain a reactive connection for the given tenant id, + * returning the connection via a {@link CompletionStage}. + */ + ReactiveConnection getProxyConnection(String tenantId); /** * Obtain a reactive connection, returning the connection @@ -57,6 +70,12 @@ public interface ReactiveConnectionPool extends Service { */ CompletionStage getConnection(String tenantId); + /** + * Obtain a reactive connection, returning the connection + * via a {@link CompletionStage}. + */ + CompletionStage getConnection(); + /** * Obtain a reactive connection for the given tenant id, * returning the connection via a {@link CompletionStage} diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java index 5e2a59517..bfbf8bc76 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/pool/impl/SqlClientPool.java @@ -7,10 +7,12 @@ import java.sql.ResultSet; import java.sql.SQLException; +import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; +import java.util.function.Supplier; import org.hibernate.engine.jdbc.internal.FormatStyle; import org.hibernate.engine.jdbc.spi.SqlExceptionHelper; @@ -26,8 +28,11 @@ import io.vertx.sqlclient.RowSet; import io.vertx.sqlclient.SqlConnection; import io.vertx.sqlclient.Tuple; +import io.vertx.sqlclient.spi.DatabaseMetadata; +import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; /** * A pool of reactive connections backed by a supplier of @@ -72,6 +77,7 @@ public abstract class SqlClientPool implements ReactiveConnectionPool { * subclasses which support multitenancy. * * @param tenantId the id of the tenant + * * @throws UnsupportedOperationException if multitenancy is not supported * @see ReactiveConnectionPool#getConnection(String) */ @@ -79,6 +85,23 @@ protected Pool getTenantPool(String tenantId) { throw new UnsupportedOperationException( "multitenancy not supported by built-in SqlClientPool" ); } + @Override + public ReactiveConnection getProxyConnection() { + return new ProxyConnection( this::getConnection ); + } + + @Override + public ReactiveConnection getProxyConnection(String tenantId) { + return tenantId == null + ? new ProxyConnection( this::getConnection ) + : new ProxyConnection( () -> getConnection( tenantId ) ); + } + + @Override + public ReactiveConnection getProxyConnection(SqlExceptionHelper sqlExceptionHelper) { + return new ProxyConnection( () -> getConnection( sqlExceptionHelper ) ); + } + @Override public CompletionStage getConnection() { return getConnectionFromPool( getPool() ); @@ -143,10 +166,13 @@ private T convertException(T rows, String sql, Throwable sqlException) { if ( sqlException == null ) { return rows; } - if ( sqlException instanceof DatabaseException ) { - DatabaseException de = (DatabaseException) sqlException; + if ( sqlException instanceof DatabaseException de ) { sqlException = getSqlExceptionHelper() - .convert( new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), "error executing SQL statement", sql ); + .convert( + new SQLException( de.getMessage(), de.getSqlState(), de.getErrorCode() ), + "error executing SQL statement", + sql + ); } return rethrow( sqlException ); } @@ -186,4 +212,170 @@ private SqlClientConnection newConnection(SqlConnection connection) { private SqlClientConnection newConnection(SqlConnection connection, SqlExceptionHelper sqlExceptionHelper) { return new SqlClientConnection( connection, getPool(), getSqlStatementLogger(), sqlExceptionHelper ); } + + private static class ProxyConnection implements ReactiveConnection { + private final Supplier> connectionSupplier; + private Integer batchSize; + private ReactiveConnection connection; + + public ProxyConnection(Supplier> connectionSupplier) { + this.connectionSupplier = connectionSupplier; + } + + /** + * @return the existing {@link ReactiveConnection}, or open a new one + */ + CompletionStage connection() { + if ( connection == null ) { + return connectionSupplier.get() + .thenApply( conn -> { + if ( batchSize != null ) { + conn.withBatchSize( batchSize ); + } + connection = conn; + return connection; + } ); + } + return completedFuture( connection ); + } + + @Override + public boolean isTransactionInProgress() { + return connection != null && connection.isTransactionInProgress(); + } + + @Override + public DatabaseMetadata getDatabaseMetadata() { + Objects.requireNonNull( connection, "Database metadata not available until the connection is opened" ); + return connection.getDatabaseMetadata(); + } + + @Override + public CompletionStage execute(String sql) { + return connection().thenCompose( conn -> conn.execute( sql ) ); + } + + @Override + public CompletionStage executeOutsideTransaction(String sql) { + return connection().thenCompose( conn -> conn.executeOutsideTransaction( sql ) ); + } + + @Override + public CompletionStage executeUnprepared(String sql) { + return connection().thenCompose( conn -> conn.executeUnprepared( sql ) ); + } + + @Override + public CompletionStage update(String sql) { + return connection().thenCompose( conn -> conn.update( sql ) ); + } + + @Override + public CompletionStage update(String sql, Object[] paramValues) { + return connection().thenCompose( conn -> conn.update( sql, paramValues ) ); + } + + @Override + public CompletionStage update(String sql, Object[] paramValues, boolean allowBatching, Expectation expectation) { + return connection().thenCompose( conn -> conn.update( sql, paramValues, allowBatching, expectation ) ); + } + + @Override + public CompletionStage update(String sql, List paramValues) { + return connection().thenCompose( conn -> conn.update( sql, paramValues ) ); + } + + @Override + public CompletionStage select(String sql) { + return connection().thenCompose( conn -> conn.select( sql ) ); + } + + @Override + public CompletionStage select(String sql, Object[] paramValues) { + return connection().thenCompose( conn -> conn.select( sql ) ); + } + + @Override + public CompletionStage selectJdbc(String sql, Object[] paramValues) { + return connection().thenCompose( conn -> conn.selectJdbc( sql, paramValues ) ); + } + + @Override + public CompletionStage insertAndSelectIdentifier( + String sql, + Object[] paramValues, + Class idClass, + String idColumnName) { + return connection().thenCompose( conn -> conn + .insertAndSelectIdentifier( sql, paramValues, idClass, idColumnName ) ); + } + + @Override + public CompletionStage insertAndSelectIdentifierAsResultSet( + String sql, + Object[] paramValues, + Class idClass, + String idColumnName) { + return connection().thenCompose( conn -> conn + .insertAndSelectIdentifierAsResultSet( sql, paramValues, idClass, idColumnName ) ); + } + + @Override + public CompletionStage selectJdbc(String sql) { + return connection().thenCompose( conn -> conn.selectJdbc( sql ) ); + } + + @Override + public CompletionStage executeAndSelectGeneratedValues( + String sql, + Object[] paramValues, + List> idClass, + List generatedColumnName) { + return connection().thenCompose( conn -> conn + .executeAndSelectGeneratedValues( sql, paramValues, idClass, generatedColumnName ) ); + } + + @Override + public CompletionStage selectIdentifier(String sql, Object[] paramValues, Class idClass) { + return connection().thenCompose( conn -> conn.selectIdentifier( sql, paramValues, idClass ) ); + } + + @Override + public CompletionStage beginTransaction() { + return connection().thenCompose( ReactiveConnection::beginTransaction ); + } + + @Override + public CompletionStage commitTransaction() { + return connection().thenCompose( ReactiveConnection::commitTransaction ); + } + + @Override + public CompletionStage rollbackTransaction() { + return connection().thenCompose( ReactiveConnection::rollbackTransaction ); + } + + @Override + public ReactiveConnection withBatchSize(int batchSize) { + if ( connection == null ) { + this.batchSize = batchSize; + } + else { + connection = connection.withBatchSize( batchSize ); + } + return this; + } + + @Override + public CompletionStage executeBatch() { + return connection().thenCompose( ReactiveConnection::executeBatch ); + } + + @Override + public CompletionStage close() { + return connection != null + ? connection.close().thenAccept( v -> connection = null ) + : voidFuture(); + } + } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java index 2b74f2d77..0ecd4f266 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/session/impl/ReactiveSessionImpl.java @@ -151,7 +151,6 @@ import static org.hibernate.reactive.util.impl.CompletionStages.nullFuture; import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; import static org.hibernate.reactive.util.impl.CompletionStages.returnNullorRethrow; -import static org.hibernate.reactive.util.impl.CompletionStages.returnOrRethrow; import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; @@ -171,10 +170,7 @@ public class ReactiveSessionImpl extends SessionImpl implements ReactiveSession, //Lazily initialized private transient ExceptionConverter exceptionConverter; - public ReactiveSessionImpl( - SessionFactoryImpl delegate, - SessionCreationOptions options, - ReactiveConnection connection) { + public ReactiveSessionImpl(SessionFactoryImpl delegate, SessionCreationOptions options, ReactiveConnection connection) { super( delegate, options ); InternalStateAssertions.assertUseOnEventLoop(); this.associatedWorkThread = Thread.currentThread(); @@ -977,20 +973,13 @@ private CompletionStage fireRemove(DeleteEvent event) { return getFactory().getEventListenerGroups().eventListenerGroup_DELETE .fireEventOnEachListener( event, (ReactiveDeleteEventListener l) -> l::reactiveOnDelete ) - .handle( (v, e) -> { + .handle( CompletionStages::handle ) + .thenCompose( handler -> { delayedAfterCompletion(); - - if ( e instanceof ObjectDeletedException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e ) ); - } - else if ( e instanceof MappingException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e.getMessage(), e ) ); - } - else if ( e instanceof RuntimeException ) { - //including HibernateException - throw getExceptionConverter().convert( (RuntimeException) e ); - } - return returnNullorRethrow( e ); + final Throwable e = handler.getThrowable(); + return e != null + ? failedFuture( convertException( e ) ) + : voidFuture(); } ); } @@ -999,20 +988,13 @@ private CompletionStage fireRemove(DeleteEvent event, DeleteContext transi return getFactory().getEventListenerGroups().eventListenerGroup_DELETE .fireEventOnEachListener( event, transientEntities, (ReactiveDeleteEventListener l) -> l::reactiveOnDelete ) - .handle( (v, e) -> { + .handle( CompletionStages::handle ) + .thenCompose( handler -> { delayedAfterCompletion(); - - if ( e instanceof ObjectDeletedException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e ) ); - } - else if ( e instanceof MappingException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e.getMessage(), e ) ); - } - else if ( e instanceof RuntimeException ) { - //including HibernateException - throw getExceptionConverter().convert( (RuntimeException) e ); - } - return returnNullorRethrow( e ); + final Throwable e = handler.getThrowable(); + return e != null + ? failedFuture( convertException( e ) ) + : voidFuture(); } ); } @@ -1036,42 +1018,45 @@ private CompletionStage fireMerge(MergeEvent event) { return getFactory().getEventListenerGroups().eventListenerGroup_MERGE .fireEventOnEachListener( event, (ReactiveMergeEventListener l) -> l::reactiveOnMerge ) - .handle( (v, e) -> { + .handle( CompletionStages::handle ) + .thenCompose( handler -> { checkNoUnresolvedActionsAfterOperation(); - - if ( e instanceof ObjectDeletedException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e ) ); - } - else if ( e instanceof MappingException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e.getMessage(), e ) ); - } - else if ( e instanceof RuntimeException ) { - //including HibernateException - throw getExceptionConverter().convert( (RuntimeException) e ); - } - return returnOrRethrow( e, (T) event.getResult() ); + final Throwable e = handler.getThrowable(); + return e != null + ? failedFuture( convertException( e ) ) + : completedFuture( (T) event.getResult() ); } ); } + private Throwable convertException(Throwable e) { + if ( e instanceof CompletionException) { + return convertException( e.getCause() ); + } + if ( e instanceof ObjectDeletedException ) { + return getExceptionConverter().convert( new IllegalArgumentException( e ) ); + } + if ( e instanceof MappingException ) { + return getExceptionConverter().convert( new IllegalArgumentException( e.getMessage(), e ) ); + } + if ( e instanceof RuntimeException ) { + //including HibernateException + return getExceptionConverter().convert( (RuntimeException) e ); + } + return e; + } + private CompletionStage fireMerge(MergeContext copiedAlready, MergeEvent event) { pulseTransactionCoordinator(); return getFactory().getEventListenerGroups().eventListenerGroup_MERGE .fireEventOnEachListener( event, copiedAlready,(ReactiveMergeEventListener l) -> l::reactiveOnMerge ) - .handle( (v, e) -> { + .handle( CompletionStages::handle ) + .thenCompose( handler -> { delayedAfterCompletion(); - - if ( e instanceof ObjectDeletedException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e ) ); - } - else if ( e instanceof MappingException ) { - throw getExceptionConverter().convert( new IllegalArgumentException( e.getMessage(), e ) ); - } - else if ( e instanceof RuntimeException ) { - //including HibernateException - throw getExceptionConverter().convert( (RuntimeException) e ); - } - return returnNullorRethrow( e ); + final Throwable e = handler.getThrowable(); + return e != null + ? failedFuture( convertException( e ) ) + : voidFuture(); } ); } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java index dccb3a072..9941bd257 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/Stage.java @@ -1979,6 +1979,55 @@ interface Transaction { */ interface SessionFactory extends AutoCloseable { + /** + * Obtain a new {@link Session reactive session}. + *

+ * The underlying database connection is obtained lazily + * when the returned {@link Session} needs to access the + * database. + *

+ * The client must close the session using {@link Session#close()}. + */ + @Incubating + Session createSession(); + + /** + * Obtain a new {@link Session reactive session}. + *

+ * The underlying database connection is obtained lazily + * when the returned {@link Session} needs to access the + * database. + *

+ * The client must close the session using {@link Session#close()}. + */ + @Incubating + Session createSession(String tenantId); + + /** + * Obtain a new {@link Session reactive session}. + *

+ * The underlying database connection is obtained lazily + * when the returned {@link Session} needs to access the + * database. + *

+ * The client must close the session using {@link Session#close()}. + */ + @Incubating + StatelessSession createStatelessSession(); + + /** + * Obtain a new {@link StatelessSession reactive stateless session}. + *

+ * The underlying database connection is obtained lazily + * when the returned {@link StatelessSession} needs to access the + * database. + *

+ * The client must close the session using {@link Session#close()}. + * @param tenantId the id of the tenant + */ + @Incubating + StatelessSession createStatelessSession(String tenantId); + /** * Obtain a new {@linkplain Session reactive session} {@link CompletionStage}, the main * interaction point between the user's program and Hibernate diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionFactoryImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionFactoryImpl.java index 2c8a3c473..6a04e25ae 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionFactoryImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionFactoryImpl.java @@ -5,7 +5,13 @@ */ package org.hibernate.reactive.stage.impl; -import jakarta.persistence.metamodel.Metamodel; +import java.lang.invoke.MethodHandles; +import java.util.Objects; +import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; + import org.hibernate.Cache; import org.hibernate.engine.creation.internal.SessionBuilderImpl; import org.hibernate.engine.creation.internal.SessionCreationOptions; @@ -21,18 +27,14 @@ import org.hibernate.reactive.logging.impl.LoggerFactory; import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.pool.ReactiveConnectionPool; +import org.hibernate.reactive.session.ReactiveStatelessSession; import org.hibernate.reactive.session.impl.ReactiveSessionImpl; import org.hibernate.reactive.session.impl.ReactiveStatelessSessionImpl; import org.hibernate.reactive.stage.Stage; import org.hibernate.service.ServiceRegistry; import org.hibernate.stat.Statistics; -import java.lang.invoke.MethodHandles; -import java.util.Objects; -import java.util.concurrent.CompletionStage; -import java.util.function.BiFunction; -import java.util.function.Function; -import java.util.function.Supplier; +import jakarta.persistence.metamodel.Metamodel; import static org.hibernate.reactive.util.impl.CompletionStages.completedFuture; import static org.hibernate.reactive.util.impl.CompletionStages.rethrow; @@ -76,6 +78,32 @@ public Context getContext() { return context; } + @Override + public Stage.Session createSession() { + return createSession( getTenantIdentifier( options() ) ); + } + + @Override + public Stage.Session createSession(String tenantId) { + final SessionCreationOptions options = options(); + ReactiveConnectionPool pool = delegate.getServiceRegistry().getService( ReactiveConnectionPool.class ); + ReactiveSessionImpl sessionImpl = new ReactiveSessionImpl( delegate, options, pool.getProxyConnection( tenantId ) ); + return new StageSessionImpl( sessionImpl ); + } + + @Override + public Stage.StatelessSession createStatelessSession() { + return createStatelessSession( getTenantIdentifier( options() ) ); + } + + @Override + public Stage.StatelessSession createStatelessSession(String tenantId) { + final SessionCreationOptions options = options(); + ReactiveConnectionPool pool = delegate.getServiceRegistry().getService( ReactiveConnectionPool.class ); + ReactiveStatelessSession sessionImpl = new ReactiveStatelessSessionImpl( delegate, options, pool.getProxyConnection( tenantId ) ); + return new StageStatelessSessionImpl( sessionImpl ); + } + @Override public CompletionStage openSession() { SessionCreationOptions options = options(); @@ -306,7 +334,8 @@ public HibernateCriteriaBuilder getCriteriaBuilder() { } private String getTenantIdentifier(SessionCreationOptions options) { - return options.getTenantIdentifierValue() == null ? null : delegate.getTenantIdentifierJavaType().toString( - options.getTenantIdentifierValue() ); + return options.getTenantIdentifierValue() == null + ? null + : delegate.getTenantIdentifierJavaType().toString( options.getTenantIdentifierValue() ); } } diff --git a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java index daf467b2e..882393ed9 100644 --- a/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java +++ b/hibernate-reactive-core/src/main/java/org/hibernate/reactive/stage/impl/StageSessionImpl.java @@ -391,7 +391,7 @@ public Stage.Session setSubselectFetchingEnabled(boolean enabled) { @Override public CompletionStage withTransaction(Function> work) { - return currentTransaction==null ? new Transaction().execute(work) : work.apply(currentTransaction); + return currentTransaction == null ? new Transaction().execute( work ) : work.apply( currentTransaction ); } private Transaction currentTransaction; diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java index 75d90375e..869d2ab9b 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/BatchingConnectionTest.java @@ -5,13 +5,13 @@ */ package org.hibernate.reactive; - import org.hibernate.boot.registry.StandardServiceRegistryBuilder; import org.hibernate.cfg.AvailableSettings; import org.hibernate.cfg.Configuration; import org.hibernate.reactive.mutiny.impl.MutinySessionImpl; import org.hibernate.reactive.mutiny.impl.MutinyStatelessSessionImpl; import org.hibernate.reactive.pool.BatchingConnection; +import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.pool.impl.SqlClientConnection; import org.hibernate.reactive.stage.impl.StageSessionImpl; import org.hibernate.reactive.stage.impl.StageStatelessSessionImpl; @@ -30,7 +30,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Timeout(value = 10, timeUnit = MINUTES) - public class BatchingConnectionTest extends ReactiveSessionTest { private static SqlStatementTracker sqlTracker; @@ -65,6 +64,27 @@ private static boolean filter(String s) { return false; } + @Override + protected void assertConnectionIsLazy(ReactiveConnection connection) { + assertConnectionIsLazy( connection, false ); + } + + @Override + protected void assertConnectionIsLazy(ReactiveConnection connection, boolean stateless) { + final ReactiveConnection actualConnection; + if ( !stateless ) { + // Only the stateful session creates a batching connection + assertThat( connection ).isInstanceOf( BatchingConnection.class ); + // A little hack, withBatchSize returns the underlying connection when the parameter is less than 1 + actualConnection = connection.withBatchSize( -1 ); + } + else { + actualConnection = connection; + } + assertThat( actualConnection.getClass().getName() ) + .isEqualTo( org.hibernate.reactive.pool.impl.SqlClientPool.class.getName() + "$ProxyConnection" ); + } + @Test public void testBatchingWithPersistAll(VertxTestContext context) { test( context, openSession().thenCompose( s -> s diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedIdentityGenerationTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedIdentityGenerationTest.java index a41cefd72..5134a32e5 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedIdentityGenerationTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedIdentityGenerationTest.java @@ -212,7 +212,7 @@ public void start(Promise startPromise) { startPromise.fail( "Thread switch detected!" ); } else { - allResults.deliverResulst( generatedIds ); + allResults.deliverResults( generatedIds ); startPromise.complete(); } } @@ -233,7 +233,7 @@ private static class ResultsCollector { private final ConcurrentMap> resultsByThread = new ConcurrentHashMap<>(); - public void deliverResulst(List generatedIds) { + public void deliverResults(List generatedIds) { final String threadName = Thread.currentThread().getName(); resultsByThread.put( threadName, generatedIds ); } diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionTest.java index bd2a36d1e..7fcf1c8c4 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionTest.java @@ -98,15 +98,7 @@ public class MultithreadedInsertionTest { @BeforeAll public static void setupSessionFactory() { - final VertxOptions vertxOptions = new VertxOptions(); - vertxOptions.setEventLoopPoolSize( N_THREADS ); - //We relax the blocked thread checks as we'll actually use latches to block them - //intentionally for the purpose of the test; functionally this isn't required - //but it's useful as self-test in the design of this, to ensure that the way - //things are setup are indeed being run in multiple, separate threads. - vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES ); - vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES ); - vertx = Vertx.vertx( vertxOptions ); + vertx = Vertx.vertx( getVertxOptions() ); Configuration configuration = new Configuration(); setDefaultProperties( configuration ); configuration.addAnnotatedClass( EntityWithGeneratedId.class ); @@ -121,6 +113,18 @@ public static void setupSessionFactory() { stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class ); } + private static VertxOptions getVertxOptions() { + final VertxOptions vertxOptions = new VertxOptions(); + vertxOptions.setEventLoopPoolSize( N_THREADS ); + //We relax the blocked thread checks as we'll actually use latches to block them + //intentionally for the purpose of the test; functionally this isn't required, + //but it's useful as self-test in the design of this, to ensure that the way + //things are set up are indeed being run in multiple, separate threads. + vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES ); + vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES ); + return vertxOptions; + } + @AfterAll public static void closeSessionFactory() { stageSessionFactory.close(); @@ -158,10 +162,12 @@ public void start(Promise startPromise) { .whenComplete( (o, throwable) -> { endLatch.reached(); if ( throwable != null ) { + prettyOut( throwable.getMessage() ); startPromise.fail( throwable ); } else { if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) { + prettyOut( "Thread switch detected. Expecting " + initialThreadName + ", actual " + Thread.currentThread().getName() ); startPromise.fail( "Thread switch detected!" ); } else { @@ -203,7 +209,7 @@ public void stop() { */ @Entity @Table(name="Entity") - private static class EntityWithGeneratedId { + public static class EntityWithGeneratedId { @Id @GeneratedValue Long id; diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java new file mode 100644 index 000000000..2948cce27 --- /dev/null +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/MultithreadedInsertionWithLazyConnectionTest.java @@ -0,0 +1,278 @@ +/* Hibernate, Relational Persistence for Idiomatic Java + * + * SPDX-License-Identifier: Apache-2.0 + * Copyright: Red Hat Inc. and Hibernate Authors + */ +package org.hibernate.reactive; + +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.hibernate.SessionFactory; +import org.hibernate.boot.registry.StandardServiceRegistry; +import org.hibernate.boot.registry.StandardServiceRegistryBuilder; +import org.hibernate.cfg.Configuration; +import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder; +import org.hibernate.reactive.stage.Stage; +import org.hibernate.reactive.util.impl.CompletionStages; +import org.hibernate.reactive.vertx.VertxInstance; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.extension.ExtendWith; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.junit5.Timeout; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.assertj.core.api.Assertions.fail; +import static org.hibernate.cfg.AvailableSettings.SHOW_SQL; +import static org.hibernate.reactive.BaseReactiveTest.setDefaultProperties; +import static org.hibernate.reactive.provider.Settings.POOL_CONNECT_TIMEOUT; +import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture; +import static org.hibernate.reactive.util.impl.CompletionStages.loop; +import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; + +/** + * This is a multi-threaded stress test, intentionally consuming some time + * that also opens the connection lazily. + * The purpose is to verify that the sequence optimizer used by Hibernate Reactive + * is indeed able to generate unique IDs backed by the database sequences, while + * running multiple operations in different threads and on multiple Vert.x eventloops. + * This is very similar to MultithreadedIdentityGenerationTest except it models + * the full operations including the insert statements, while the latter focuses + * on the generated IDs to be unique; it's useful to maintain both tests as: + * - ID generation needs to be unique so it's good to stress that aspect + * in isolation + * - insert operations are downstream events, so this allows us to test that + * such downstream events are not being unintentionally duplicated/dropped, + * which could actually happen when the id generator triggers unintended + * threading behaviours. + * + * N.B. We actually had a case in which the IDs were uniquely generated but the + * downstream event was being processed twice (or more) concurrently, so it's + * useful to have both integration tests. + * + * A typical reactive application will not require multiple threads, but we + * specifically want to test for the case in which the single ID source is being + * shared across multiple threads and also multiple eventloops. + * @see MultithreadedInsertionTest + */ +@ExtendWith(VertxExtension.class) +@TestInstance(TestInstance.Lifecycle.PER_METHOD) +@Timeout(value = MultithreadedInsertionWithLazyConnectionTest.TIMEOUT_MINUTES, timeUnit = MINUTES) +public class MultithreadedInsertionWithLazyConnectionTest { + + /** + * The number of threads should be higher than the default size of the connection pool so that + * this test is also effective in detecting problems with resource starvation. + */ + private static final int N_THREADS = 12; + private static final int ENTITIES_STORED_PER_THREAD = 2000; + + //Should finish much sooner, but generating this amount of IDs could be slow on some CIs + public static final int TIMEOUT_MINUTES = 10; + + // Keeping this disabled because it generates a lot of queries + private static final boolean LOG_SQL = false; + + /** + * If true, it will print info about the threads + */ + private static final boolean THREAD_PRETTY_MSG = true; + + private static final Latch startLatch = new Latch( "start", N_THREADS ); + private static final Latch endLatch = new Latch( "end", N_THREADS ); + + private static Stage.SessionFactory stageSessionFactory; + private static Vertx vertx; + private static SessionFactory sessionFactory; + + @BeforeAll + public static void setupSessionFactory() { + vertx = Vertx.vertx( getVertxOptions() ); + Configuration configuration = new Configuration(); + setDefaultProperties( configuration ); + configuration.addAnnotatedClass( EntityWithGeneratedId.class ); + configuration.setProperty( SHOW_SQL, String.valueOf( LOG_SQL ) ); + configuration.setProperty( POOL_CONNECT_TIMEOUT, String.valueOf( TIMEOUT_MINUTES * 60 * 1000 ) ); + StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder() + .applySettings( configuration.getProperties() ) + //Inject our custom vert.x instance: + .addService( VertxInstance.class, () -> vertx ); + StandardServiceRegistry registry = builder.build(); + sessionFactory = configuration.buildSessionFactory( registry ); + stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class ); + } + + private static VertxOptions getVertxOptions() { + final VertxOptions vertxOptions = new VertxOptions(); + vertxOptions.setEventLoopPoolSize( N_THREADS ); + //We relax the blocked thread checks as we'll actually use latches to block them + //intentionally for the purpose of the test; functionally this isn't required + //but it's useful as self-test in the design of this, to ensure that the way + //things are setup are indeed being run in multiple, separate threads. + vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES ); + vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES ); + return vertxOptions; + } + + @AfterAll + public static void closeSessionFactory() { + stageSessionFactory.close(); + } + + @Test + public void testIdentityGenerator(VertxTestContext context) { + final DeploymentOptions deploymentOptions = new DeploymentOptions(); + deploymentOptions.setInstances( N_THREADS ); + + vertx + .deployVerticle( InsertEntitiesVerticle::new, deploymentOptions ) + .onSuccess( res -> { + endLatch.waitForEveryone(); + context.completeNow(); + } ) + .onFailure( context::failNow ) + .eventually( () -> vertx.close() ); + } + + private static class InsertEntitiesVerticle extends AbstractVerticle { + + int sequentialOperation = 0; + + public InsertEntitiesVerticle() { + } + + @Override + public void start(Promise startPromise) { + startLatch.reached(); + startLatch.waitForEveryone();//Not essential, but to ensure a good level of parallelism + final String initialThreadName = Thread.currentThread().getName(); + final Stage.Session session = stageSessionFactory.createSession(); + storeMultipleEntities( session ) + .handle( CompletionStages::handle ) + .thenCompose( handler -> session + .close() + .thenCompose( handler::getResultAsCompletionStage ) + ) + .whenComplete( (o, throwable) -> { + endLatch.reached(); + if ( throwable != null ) { + prettyOut( throwable.getMessage() ); + startPromise.fail( throwable ); + } + else { + if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) { + prettyOut( "Thread switch detected. Expecting " + initialThreadName + ", actual " + Thread.currentThread().getName() ); + startPromise.fail( "Thread switch detected!" ); + } + else { + startPromise.complete(); + } + } + } ); + } + + private CompletionStage storeMultipleEntities(Stage.Session s) { + return loop( 0, ENTITIES_STORED_PER_THREAD, index -> storeEntity( s ) ); + } + + private CompletionStage storeEntity(Stage.Session s) { + final Thread beforeOperationThread = Thread.currentThread(); + final int localVerticleOperationSequence = sequentialOperation++; + final EntityWithGeneratedId entity = new EntityWithGeneratedId(); + entity.name = beforeOperationThread + "__" + localVerticleOperationSequence; + + return s + .withTransaction( t -> s.persist( entity ) ) + .thenCompose( v -> beforeOperationThread != Thread.currentThread() + ? failedFuture( new IllegalStateException( "Detected an unexpected switch of carrier threads!" ) ) + : voidFuture() ); + } + + @Override + public void stop() { + prettyOut( "Verticle stopped " + super.toString() ); + } + } + + /** + * Trivial entity using default id generation + */ + @Entity + @Table(name = "Entity") + private static class EntityWithGeneratedId { + @Id + @GeneratedValue + Long id; + + String name; + + public EntityWithGeneratedId() { + } + } + + /** + * Custom latch which is rather verbose about threads reaching the milestones, to help verifying the design + */ + private static final class Latch { + private final String label; + private final CountDownLatch countDownLatch; + + public Latch(String label, int membersCount) { + this.label = label; + this.countDownLatch = new CountDownLatch( membersCount ); + } + + public void reached() { + final long count = countDownLatch.getCount(); + countDownLatch.countDown(); + prettyOut( "Reached latch '" + label + "', current countdown is " + ( count - 1 ) ); + } + + public void waitForEveryone() { + try { + boolean reachedZero = countDownLatch.await( TIMEOUT_MINUTES, MINUTES ); + if ( reachedZero ) { + prettyOut( "Everyone has now breached '" + label + "'" ); + } + else { + fail( "Time out reached" ); + } + } + catch ( InterruptedException e ) { + fail( e ); + } + } + } + + private static void prettyOut(final String message) { + if ( THREAD_PRETTY_MSG ) { + final String threadName = Thread.currentThread().getName(); + final long l = System.currentTimeMillis(); + final long seconds = ( l / 1000 ) - initialSecond; + //We prefix log messages by seconds since bootstrap; I'm preferring this over millisecond precision + //as it's not very relevant to see exactly how long each stage took (it's actually distracting) + //but it's more useful to group things coarsely when some lock or timeout introduces a significant + //divide between some operations (when a starvation or timeout happens it takes some seconds). + System.out.println( seconds + " - " + threadName + ": " + message ); + } + } + + private static final long initialSecond = ( System.currentTimeMillis() / 1000 ); + +} diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveMultitenantTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveMultitenantTest.java index 4173668be..15e41716a 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveMultitenantTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveMultitenantTest.java @@ -10,8 +10,9 @@ import org.hibernate.LockMode; import org.hibernate.cfg.AvailableSettings; import org.hibernate.cfg.Configuration; -import org.hibernate.reactive.provider.Settings; import org.hibernate.reactive.annotations.EnabledFor; +import org.hibernate.reactive.provider.Settings; +import org.hibernate.reactive.stage.Stage; import org.junit.jupiter.api.Test; @@ -23,14 +24,11 @@ import jakarta.persistence.Version; import static java.util.concurrent.TimeUnit.MINUTES; +import static org.assertj.core.api.Assertions.assertThat; import static org.hibernate.reactive.MyCurrentTenantIdentifierResolver.Tenant.DEFAULT; import static org.hibernate.reactive.MyCurrentTenantIdentifierResolver.Tenant.TENANT_1; import static org.hibernate.reactive.MyCurrentTenantIdentifierResolver.Tenant.TENANT_2; import static org.hibernate.reactive.containers.DatabaseConfiguration.DBType.POSTGRESQL; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; /** * This class creates multiple additional databases so that we can check that queries run @@ -46,10 +44,8 @@ public class ReactiveMultitenantTest extends BaseReactiveTest { protected Configuration constructConfiguration() { Configuration configuration = super.constructConfiguration(); configuration.addAnnotatedClass( GuineaPig.class ); - configuration.setProperty( - AvailableSettings.MULTI_TENANT_CONNECTION_PROVIDER, - "anything" - );//FIXME this is terrible? + // FIXME this is terrible? + configuration.setProperty( AvailableSettings.MULTI_TENANT_CONNECTION_PROVIDER, "anything" ); configuration.getProperties().put( Settings.MULTI_TENANT_IDENTIFIER_RESOLVER, TENANT_RESOLVER ); // Contains the SQL scripts for the creation of the additional databases configuration.setProperty( Settings.HBM2DDL_IMPORT_FILES, "/multitenancy-test.sql" ); @@ -61,27 +57,27 @@ protected Configuration constructConfiguration() { public void reactivePersistFindDelete(VertxTestContext context) { TENANT_RESOLVER.setTenantIdentifier( DEFAULT ); final GuineaPig guineaPig = new GuineaPig( 5, "Aloi" ); - test( - context, - getSessionFactory().openSession() - .thenCompose( session -> session.withTransaction( t -> session - .persist( guineaPig ) - .thenCompose( v -> session.flush() ) - .thenAccept( v -> session.detach( guineaPig ) ) - .thenAccept( v -> assertFalse( session.contains( guineaPig ) ) ) - .thenCompose( v -> session.find( GuineaPig.class, guineaPig.getId() ) ) - .thenAccept( actualPig -> { - assertThatPigsAreEqual( guineaPig, actualPig ); - assertTrue( session.contains( actualPig ) ); - assertFalse( session.contains( guineaPig ) ); - assertEquals( LockMode.READ, session.getLockMode( actualPig ) ); - session.detach( actualPig ); - assertFalse( session.contains( actualPig ) ); - } ) - .thenCompose( v -> session.find( GuineaPig.class, guineaPig.getId() ) ) - .thenCompose( session::remove ) - .thenCompose( v -> session.flush() ) ) - ) + test( context, openSession() + .thenCompose( session -> session + .persist( guineaPig ) + .thenCompose( v -> session.flush() ) + .thenAccept( v -> session.detach( guineaPig ) ) + .thenAccept( v -> assertThat( session.contains( guineaPig ) ).isFalse() ) + .thenCompose( v -> session.find( GuineaPig.class, guineaPig.getId() ) ) + .thenAccept( actualPig -> { + assertThat( actualPig ).isNotNull(); + assertThat( actualPig.getId() ).isEqualTo( guineaPig.getId() ); + assertThat( actualPig.getName() ).isEqualTo( guineaPig.getName() ); + assertThat( session.contains( actualPig ) ).isTrue(); + assertThat( session.contains( guineaPig ) ).isFalse(); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.READ ); + session.detach( actualPig ); + assertThat( session.contains( actualPig ) ).isFalse(); + } ) + .thenCompose( v -> session.find( GuineaPig.class, guineaPig.getId() ) ) + .thenCompose( session::remove ) + .thenCompose( v -> session.flush() ) + ) ); } @@ -92,13 +88,34 @@ public void testTenantSelection(VertxTestContext context) { .thenCompose( session -> session .createNativeQuery( "select current_database()" ) .getSingleResult() - .thenAccept( result -> assertEquals( TENANT_1.getDbName(), result ) ) ) + .thenAccept( result -> assertThat( result ).isEqualTo( TENANT_1.getDbName() ) ) ) .thenAccept( unused -> TENANT_RESOLVER.setTenantIdentifier( TENANT_2 ) ) .thenCompose( unused -> openSession() ) .thenCompose( session -> session .createNativeQuery( "select current_database()" ) .getSingleResult() - .thenAccept( result -> assertEquals( TENANT_2.getDbName(), result ) ) ) + .thenAccept( result -> assertThat( result ).isEqualTo( TENANT_2.getDbName() ) ) ) + ); + } + + @Test + public void testTenantSelectionWithProxy(VertxTestContext context) { + TENANT_RESOLVER.setTenantIdentifier( TENANT_1 ); + Stage.Session t1Session = getSessionFactory().createSession(); + test( + context, t1Session + .createNativeQuery( "select current_database()" ) + .getSingleResult() + .thenAccept( result -> assertThat( result ).isEqualTo( TENANT_1.getDbName() ) ) + .thenCompose( v -> t1Session.close() ) + .thenAccept( v -> TENANT_RESOLVER.setTenantIdentifier( TENANT_2 ) ) + .thenApply( v -> getSessionFactory().createSession() ) + .thenCompose( t2Session -> t2Session + .createNativeQuery( "select current_database()" ) + .getSingleResult() + .thenAccept( result -> assertThat( result ).isEqualTo( TENANT_2.getDbName() ) ) + .thenCompose( v -> t2Session.close() ) + ) ); } @@ -109,14 +126,14 @@ public void testTenantSelectionStatelessSession(VertxTestContext context) { .thenCompose( t1Session -> t1Session .createNativeQuery( "select current_database()" ) .getSingleResult() - .thenAccept( result -> assertEquals( TENANT_1.getDbName(), result ) ) + .thenAccept( result -> assertThat( result ).isEqualTo( TENANT_1.getDbName() ) ) .thenCompose( unused -> t1Session.close() ) ) .thenAccept( unused -> TENANT_RESOLVER.setTenantIdentifier( TENANT_2 ) ) .thenCompose( v -> getSessionFactory().openStatelessSession() ) .thenCompose( t2Session -> t2Session .createNativeQuery( "select current_database()" ) .getSingleResult() - .thenAccept( result -> assertEquals( TENANT_2.getDbName(), result ) ) + .thenAccept( result -> assertThat( result ).isEqualTo( TENANT_2.getDbName() ) ) .thenCompose( v -> t2Session.close() ) ) ); } @@ -124,26 +141,21 @@ public void testTenantSelectionStatelessSession(VertxTestContext context) { @Test public void testTenantSelectionStatelessSessionMutiny(VertxTestContext context) { TENANT_RESOLVER.setTenantIdentifier( TENANT_1 ); - test( context, getMutinySessionFactory().withStatelessSession( t1Session -> - t1Session - .createNativeQuery( "select current_database()" ) - .getSingleResult() - .invoke( result -> assertEquals( TENANT_1.getDbName(), result ) ) - ) + test( context, getMutinySessionFactory() + .withStatelessSession( t1Session -> t1Session + .createNativeQuery( "select current_database()" ) + .getSingleResult() + .invoke( result -> assertThat( result ).isEqualTo( TENANT_1.getDbName() ) ) + ) .invoke( result -> TENANT_RESOLVER.setTenantIdentifier( TENANT_2 ) ) .chain( () -> getMutinySessionFactory().withStatelessSession( t2Session -> t2Session .createNativeQuery( "select current_database()" ) .getSingleResult() - .invoke( result -> assertEquals( TENANT_2.getDbName(), result ) ) ) ) + .invoke( result -> assertThat( result ).isEqualTo( TENANT_2.getDbName() ) ) + ) ) ); } - private void assertThatPigsAreEqual( GuineaPig expected, GuineaPig actual) { - assertNotNull( actual ); - assertEquals( expected.getId(), actual.getId() ); - assertEquals( expected.getName(), actual.getName() ); - } - @Entity(name = "GuineaPig") @Table(name = "Pig") public static class GuineaPig { diff --git a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java index b471ab1b0..d83cb64cd 100644 --- a/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java +++ b/hibernate-reactive-core/src/test/java/org/hibernate/reactive/ReactiveSessionTest.java @@ -10,12 +10,17 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; +import org.hibernate.HibernateException; import org.hibernate.LockMode; import org.hibernate.reactive.common.AffectedEntities; +import org.hibernate.reactive.mutiny.Mutiny; +import org.hibernate.reactive.mutiny.impl.MutinySessionImpl; +import org.hibernate.reactive.mutiny.impl.MutinyStatelessSessionImpl; +import org.hibernate.reactive.pool.ReactiveConnection; import org.hibernate.reactive.stage.Stage; +import org.hibernate.reactive.stage.impl.StageSessionImpl; +import org.hibernate.reactive.stage.impl.StageStatelessSessionImpl; - -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -31,8 +36,8 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static org.assertj.core.api.Assertions.assertThat; +import static org.hibernate.reactive.testing.ReactiveAssertions.assertThrown; import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture; -import static org.junit.jupiter.api.Assertions.*; @Timeout(value = 10, timeUnit = MINUTES) public class ReactiveSessionTest extends BaseReactiveTest { @@ -56,6 +61,104 @@ private CompletionStage selectNameFromId(Integer id) { ); } + @Test + public void reactivePersistFindRemoveWithSessionProxy(VertxTestContext context) { + final GuineaPig guineaPig = new GuineaPig( 5, "Aloi" ); + Stage.Session session = getSessionFactory().createSession(); + assertConnectionIsLazy( ( (StageSessionImpl) session ).getReactiveConnection() ); + session.setBatchSize( 55 ); + + test( context, session + .persist( guineaPig ) + .thenCompose( v -> session.flush() ) + .thenAccept( v -> session.detach( guineaPig ) ) + .thenAccept( v -> assertThat( session.contains( guineaPig ) ).isFalse() ) + .thenCompose( v -> session.find( GuineaPig.class, guineaPig.getId() ) ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( guineaPig, actualPig ); + assertThat( session.contains( actualPig ) ).isTrue(); + assertThat( session.contains( guineaPig ) ).isFalse(); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.READ ); + assertThat( session.getBatchSize() ).isEqualTo( 55 ); + session.detach( actualPig ); + assertThat( session.contains( actualPig ) ).isFalse(); + } ) + .thenCompose( v -> session.find( GuineaPig.class, guineaPig.getId() ) ) + .thenCompose( session::remove ) + .thenCompose( v -> session.flush() ) + .thenCompose( v -> session.close() ) + ); + } + + @Test + public void reactiveInsertGetDeleteWithStatelessSessionProxy(VertxTestContext context) { + final GuineaPig guineaPig = new GuineaPig( 5, "Aloi" ); + Stage.StatelessSession session = getSessionFactory().createStatelessSession(); + assertConnectionIsLazy( ( (StageStatelessSessionImpl) session ).getReactiveConnection(), true ); + test( context, session + .insert( guineaPig ) + .thenCompose( v -> session.get( GuineaPig.class, guineaPig.getId() ) ) + .thenAccept( actualPig -> assertThatPigsAreEqual( guineaPig, actualPig ) ) + .thenCompose( v -> session.get( GuineaPig.class, guineaPig.getId() ) ) + .thenCompose( session::delete ) + .thenCompose( v -> session.close() ) + ); + } + + @Test + public void reactivePersistFindRemoveWithSessionProxyAndMutiny(VertxTestContext context) { + final GuineaPig guineaPig = new GuineaPig( 5, "Aloi" ); + Mutiny.Session session = getMutinySessionFactory().createSession(); + assertConnectionIsLazy( ( (MutinySessionImpl) session ).getReactiveConnection() ); + session.setBatchSize( 55 ); + test( context, session + .persist( guineaPig ) + .call( session::flush ) + .chain( () -> { + session.detach( guineaPig ); + assertThat( session.contains( guineaPig ) ).isFalse(); + return session.find( GuineaPig.class, guineaPig.getId() ); + } ) + .chain( actualPig -> { + assertThatPigsAreEqual( guineaPig, actualPig ); + assertThat( session.contains( actualPig ) ).isTrue(); + assertThat( session.contains( guineaPig ) ).isFalse(); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.READ ); + assertThat( session.getBatchSize() ).isEqualTo( 55 ); + session.detach( actualPig ); + assertThat( session.contains( actualPig ) ).isFalse(); + return session.find( GuineaPig.class, guineaPig.getId() ); + } ) + .chain( session::remove ) + .call( session::flush ) + .eventually( session::close ) + ); + } + + protected void assertConnectionIsLazy(ReactiveConnection connection, boolean stateless) { + assertConnectionIsLazy( connection ); + } + + protected void assertConnectionIsLazy(ReactiveConnection connection) { + assertThat( connection.getClass().getName() ) + .isEqualTo( org.hibernate.reactive.pool.impl.SqlClientPool.class.getName() + "$ProxyConnection" ); + } + + @Test + public void reactiveInsertGetDeleteWithStatelessSessionProxyAndMutiny(VertxTestContext context) { + final GuineaPig guineaPig = new GuineaPig( 5, "Aloi" ); + Mutiny.StatelessSession session = getMutinySessionFactory().createStatelessSession(); + assertConnectionIsLazy( ( (MutinyStatelessSessionImpl) session ).getReactiveConnection(), true ); + test( context, session + .insert( guineaPig ) + .chain( () -> session.get( GuineaPig.class, guineaPig.getId() ) ) + .invoke( actualPig -> assertThatPigsAreEqual( guineaPig, actualPig ) ) + .chain( () -> session.get( GuineaPig.class, guineaPig.getId() ) ) + .call( session::delete ) + .eventually( session::close ) + ); + } + @Test public void reactiveFind(VertxTestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); @@ -66,11 +169,11 @@ public void reactiveFind(VertxTestContext context) { .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertTrue( session.contains( actualPig ) ); - assertFalse( session.contains( expectedPig ) ); - assertEquals( LockMode.READ, session.getLockMode( actualPig ) ); + assertThat( session.contains( actualPig ) ).isTrue(); + assertThat( session.contains( expectedPig ) ).isFalse(); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.READ ); session.detach( actualPig ); - assertFalse( session.contains( actualPig ) ); + assertThat( session.contains( actualPig ) ).isFalse(); } ) ) ); @@ -86,9 +189,7 @@ context, populateDB() .thenCompose( v -> getSessionFactory().withTransaction( s -> s .find( GuineaPig.class, emma.getId(), rump.getId() ) ) ) - .thenAccept( pigs -> { - org.assertj.core.api.Assertions.assertThat( pigs ).containsExactlyInAnyOrder( emma, rump ); - } ) + .thenAccept( pigs -> assertThat( pigs ).containsExactlyInAnyOrder( emma, rump ) ) ); } @@ -144,15 +245,15 @@ public void reactivePersistFindDelete(VertxTestContext context) { .persist( guineaPig ) .thenCompose( v -> session.flush() ) .thenAccept( v -> session.detach( guineaPig ) ) - .thenAccept( v -> assertFalse( session.contains( guineaPig ) ) ) + .thenAccept( v -> assertThat( session.contains( guineaPig ) ).isFalse() ) .thenCompose( v -> session.find( GuineaPig.class, guineaPig.getId() ) ) .thenAccept( actualPig -> { assertThatPigsAreEqual( guineaPig, actualPig ); - assertTrue( session.contains( actualPig ) ); - assertFalse( session.contains( guineaPig ) ); - assertEquals( LockMode.READ, session.getLockMode( actualPig ) ); + assertThat( session.contains( actualPig ) ).isTrue(); + assertThat( session.contains( guineaPig ) ).isFalse(); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.READ ); session.detach( actualPig ); - assertFalse( session.contains( actualPig ) ); + assertThat( session.contains( actualPig ) ).isFalse(); } ) .thenCompose( v -> session.find( GuineaPig.class, guineaPig.getId() ) ) .thenCompose( session::remove ) @@ -170,7 +271,7 @@ context, populateDB().thenCompose( v -> getSessionFactory() .find( GuineaPig.class, expectedPig.getId(), LockMode.PESSIMISTIC_WRITE ) .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_WRITE ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_WRITE ); } ) ) ) ); @@ -188,10 +289,7 @@ context, populateDB() .refresh( pig, LockMode.PESSIMISTIC_WRITE ) .thenAccept( vv -> { assertThatPigsAreEqual( expectedPig, pig ); - assertEquals( - session.getLockMode( pig ), - LockMode.PESSIMISTIC_WRITE - ); + assertThat( session.getLockMode( pig ) ).isEqualTo( LockMode.PESSIMISTIC_WRITE ); } ) ) ) ) @@ -212,8 +310,8 @@ public void reactiveFindReadOnlyRefreshWithLock(VertxTestContext context) { return session.flush() .thenCompose( v -> session.refresh( pig ) ) .thenAccept( v -> { - assertEquals( expectedPig.name, pig.name ); - assertTrue( session.isReadOnly( pig ) ); + assertThat( expectedPig.getName() ).isEqualTo( pig.getName() ); + assertThat( session.isReadOnly( pig ) ).isTrue(); } ); } ) ) @@ -225,8 +323,8 @@ public void reactiveFindReadOnlyRefreshWithLock(VertxTestContext context) { return session.flush() .thenCompose( v -> session.refresh( pig ) ) .thenAccept( v -> { - assertEquals( "XXXX", pig.name ); - assertFalse( session.isReadOnly( pig ) ); + assertThat( "XXXX" ).isEqualTo( pig.getName() ); + assertThat( session.isReadOnly( pig ) ).isFalse(); } ); } ) ) @@ -245,10 +343,7 @@ context, populateDB() .lock( pig, LockMode.PESSIMISTIC_READ ) .thenAccept( v -> { assertThatPigsAreEqual( expectedPig, pig ); - assertEquals( - session.getLockMode( pig ), - LockMode.PESSIMISTIC_READ - ); + assertThat( session.getLockMode( pig ) ).isEqualTo( LockMode.PESSIMISTIC_READ ); } ) ) ) @@ -267,8 +362,8 @@ context, populateDB().thenCompose( v -> getSessionFactory() .lock( pig, LockMode.PESSIMISTIC_WRITE ) .thenAccept( vv -> { assertThatPigsAreEqual( expectedPig, pig ); - assertEquals( session.getLockMode( pig ), LockMode.PESSIMISTIC_WRITE ); - assertEquals( pig.version, 0 ); + assertThat( session.getLockMode( pig ) ).isEqualTo( LockMode.PESSIMISTIC_WRITE ); + assertThat( pig.version ).isEqualTo( 0 ); } ) ) ) ) @@ -287,15 +382,12 @@ public void reactiveFindThenForceLock(VertxTestContext context) { .thenApply( v -> pig ) ) .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( - session.getLockMode( actualPig ), - LockMode.PESSIMISTIC_FORCE_INCREMENT - ); - assertEquals( actualPig.version, 1 ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_FORCE_INCREMENT ); + assertThat( actualPig.version ).isEqualTo( 1 ); } ) .thenCompose( v -> session.createSelectionQuery( "select version from GuineaPig", Integer.class ) .getSingleResult() ) - .thenAccept( version -> assertEquals( 1, version ) ) + .thenAccept( version -> assertThat( version ).isEqualTo( 1 ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) @@ -303,16 +395,13 @@ public void reactiveFindThenForceLock(VertxTestContext context) { .thenApply( v -> pig ) ) .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( - session.getLockMode( actualPig ), - LockMode.PESSIMISTIC_FORCE_INCREMENT - ); - assertEquals( actualPig.version, 2 ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_FORCE_INCREMENT ); + assertThat( actualPig.version ).isEqualTo( 2 ); } ) .thenCompose( v -> session .createSelectionQuery( "select version from GuineaPig", Integer.class ) .getSingleResult() ) - .thenAccept( version -> assertEquals( 2, version ) ) + .thenAccept( version -> assertThat( version ).isEqualTo( 2 ) ) ) ); } @@ -327,13 +416,13 @@ public void reactiveFindWithPessimisticIncrementLock(VertxTestContext context) { .withTransaction( session -> session.find( GuineaPig.class, expectedPig.getId(), LockMode.PESSIMISTIC_FORCE_INCREMENT ) .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( LockMode.PESSIMISTIC_FORCE_INCREMENT, session.getLockMode( actualPig ) ); // grrr, lame - assertEquals( 1, actualPig.version ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_FORCE_INCREMENT ); + assertThat( actualPig.version ).isEqualTo( 1 ); } ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 1, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 1 ) ) ); } @@ -341,48 +430,41 @@ public void reactiveFindWithPessimisticIncrementLock(VertxTestContext context) { public void reactiveFindWithOptimisticIncrementLock(VertxTestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); test( context, populateDB() - .thenCompose( v -> getSessionFactory() - .withTransaction( session -> session - .find( GuineaPig.class, expectedPig.getId(), LockMode.OPTIMISTIC_FORCE_INCREMENT ) - .thenAccept( actualPig -> { - assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( LockMode.OPTIMISTIC_FORCE_INCREMENT, session.getLockMode( actualPig ) ); - assertEquals( 0, actualPig.version ); - } ) - ) + .thenCompose( v -> getSessionFactory().withTransaction( session -> session + .find( GuineaPig.class, expectedPig.getId(), LockMode.OPTIMISTIC_FORCE_INCREMENT ) + .thenAccept( actualPig -> { + assertThatPigsAreEqual( expectedPig, actualPig ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.OPTIMISTIC_FORCE_INCREMENT ); + assertThat( actualPig.version ).isEqualTo( 0 ); + } ) + ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 1, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 1 ) ) ); } @Test public void reactiveLockWithOptimisticIncrement(VertxTestContext context) { final GuineaPig expectedPig = new GuineaPig( 5, "Aloi" ); - test( - context, - populateDB() - .thenCompose( v -> getSessionFactory().withTransaction( - (session, transaction) -> session.find( GuineaPig.class, expectedPig.getId() ) - .thenCompose( actualPig -> session.lock( - actualPig, - LockMode.OPTIMISTIC_FORCE_INCREMENT - ) - .thenAccept( vv -> { - assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( - session.getLockMode( actualPig ), - LockMode.OPTIMISTIC_FORCE_INCREMENT - ); - assertEquals( 0, actualPig.version ); - } ) - ) - ) + test( context, populateDB() + .thenCompose( v -> getSessionFactory() + .withTransaction( session -> session + .find( GuineaPig.class, expectedPig.getId() ) + .thenCompose( actualPig -> session + .lock( actualPig, LockMode.OPTIMISTIC_FORCE_INCREMENT ) + .thenAccept( vv -> { + assertThatPigsAreEqual( expectedPig, actualPig ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.OPTIMISTIC_FORCE_INCREMENT ); + assertThat( actualPig.version ).isEqualTo( 0 ); + } ) + ) ) - .thenCompose( v -> openSession() ) - .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 1, actualPig.version ) ) + ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 1 ) ) ); } @@ -400,18 +482,15 @@ public void reactiveLockWithIncrement(VertxTestContext context) { ) .thenAccept( vv -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( - session.getLockMode( actualPig ), - LockMode.PESSIMISTIC_FORCE_INCREMENT - ); - assertEquals( 1, actualPig.version ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_FORCE_INCREMENT ); + assertThat( actualPig.version ).isEqualTo( 1 ); } ) ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 1, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 1 ) ) ); } @@ -426,12 +505,12 @@ public void reactiveFindWithOptimisticVerifyLock(VertxTestContext context) { .find( GuineaPig.class, expectedPig.getId(), LockMode.OPTIMISTIC ) .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( LockMode.OPTIMISTIC, session.getLockMode( actualPig ) ); - assertEquals( 0, actualPig.version ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.OPTIMISTIC ); + assertThat( actualPig.version ).isEqualTo( 0 ); } ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 0, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 0 ) ) ); } @@ -446,12 +525,12 @@ public void reactiveLockWithOptimisticVerify(VertxTestContext context) { .thenCompose( actualPig -> session.lock( actualPig, LockMode.OPTIMISTIC ) .thenAccept( vv -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( LockMode.OPTIMISTIC, session.getLockMode( actualPig ) ); - assertEquals( 0, actualPig.version ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.OPTIMISTIC ); + assertThat( actualPig.version ).isEqualTo( 0 ); } ) ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 0, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 0 ) ) ); } @@ -467,12 +546,12 @@ public void reactiveFindWithPessimisticRead(VertxTestContext context) { .find( GuineaPig.class, expectedPig.getId(), LockMode.PESSIMISTIC_READ ) .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( session.getLockMode( actualPig ), LockMode.PESSIMISTIC_READ ); - assertEquals( 0, actualPig.version ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_READ ); + assertThat( actualPig.version ).isEqualTo( 0 ); } ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 0, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 0 ) ) ); } @@ -489,12 +568,12 @@ public void reactiveLockWithPessimisticRead(VertxTestContext context) { .thenCompose( actualPig -> session.lock( actualPig, LockMode.PESSIMISTIC_READ ) .thenAccept( vv -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( LockMode.PESSIMISTIC_READ, session.getLockMode( actualPig ) ); - assertEquals( 0, actualPig.version ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_READ ); + assertThat( actualPig.version ).isEqualTo( 0 ); } ) ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 0, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 0 ) ) ); } @@ -510,12 +589,12 @@ public void reactiveFindWithPessimisticWrite(VertxTestContext context) { .find( GuineaPig.class, expectedPig.getId(), LockMode.PESSIMISTIC_WRITE ) .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( LockMode.PESSIMISTIC_WRITE, session.getLockMode( actualPig ) ); - assertEquals( 0, actualPig.version ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_WRITE ); + assertThat( actualPig.version ).isEqualTo( 0 ); } ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 0, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 0 ) ) ); } @@ -532,12 +611,12 @@ public void reactiveLockWithPessimisticWrite(VertxTestContext context) { .thenCompose( actualPig -> session.lock( actualPig, LockMode.PESSIMISTIC_WRITE ) .thenAccept( vv -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( LockMode.PESSIMISTIC_WRITE, session.getLockMode( actualPig ) ); - assertEquals( 0, actualPig.version ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_WRITE ); + assertThat( actualPig.version ).isEqualTo( 0 ); } ) ) ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, expectedPig.getId() ) ) - .thenAccept( actualPig -> assertEquals( 0, actualPig.version ) ) + .thenAccept( actualPig -> assertThat( actualPig.version ).isEqualTo( 0 ) ) ); } @@ -554,7 +633,7 @@ public void reactiveQueryWithLock(VertxTestContext context) { .getSingleResult() .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( LockMode.PESSIMISTIC_WRITE, session.getLockMode( actualPig ) ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_WRITE ); } ) ) ) ); } @@ -571,10 +650,7 @@ context, populateDB() .getSingleResult() .thenAccept( actualPig -> { assertThatPigsAreEqual( expectedPig, actualPig ); - assertEquals( - LockMode.PESSIMISTIC_WRITE, - session.getLockMode( actualPig ) - ); + assertThat( session.getLockMode( actualPig ) ).isEqualTo( LockMode.PESSIMISTIC_WRITE ); } ) ) ) @@ -591,7 +667,7 @@ public void reactivePersist(VertxTestContext context) { .thenCompose( v -> s.close() ) ) .thenCompose( v -> selectNameFromId( 10 ) ) - .thenAccept( selectRes -> assertEquals( "Tulip", selectRes ) ) + .thenAccept( selectRes -> assertThat( selectRes ).isEqualTo( "Tulip" ) ) ); } @@ -604,80 +680,70 @@ public void reactivePersistInTx(VertxTestContext context) { .withTransaction( t -> s.persist( new GuineaPig( 10, "Tulip" ) ) ) .thenCompose( v -> s.close() ) ) .thenCompose( vv -> selectNameFromId( 10 ) ) - .thenAccept( selectRes -> assertEquals( "Tulip", selectRes ) ) + .thenAccept( selectRes -> assertThat( selectRes ).isEqualTo( "Tulip" ) ) ); } @Test public void reactiveRollbackTx(VertxTestContext context) { - test( - context, - openSession() - .thenCompose( s -> s - .withTransaction( t -> s - .persist( new GuineaPig( 10, "Tulip" ) ) - .thenCompose( v -> s.flush() ) - .thenAccept( v -> { - throw new RuntimeException( "No Panic: This is just a test" ); - } ) - ) - .thenCompose( v -> s.close() ) + test( context, openSession() + .thenCompose( s -> s + .withTransaction( t -> s + .persist( new GuineaPig( 10, "Tulip" ) ) + .thenCompose( v -> s.flush() ) + .thenAccept( v -> { + throw new RuntimeException( "No Panic: This is just a test" ); + } ) ) - .handle( (v, e) -> null ) - .thenCompose( vv -> selectNameFromId( 10 ) ) - .thenAccept( Assertions::assertNull ) + .thenCompose( v -> s.close() ) + ) + .handle( (v, e) -> null ) + .thenCompose( vv -> selectNameFromId( 10 ) ) + .thenAccept( result -> assertThat( result ).isNull() ) ); } @Test public void reactiveMarkedRollbackTx(VertxTestContext context) { - test( - context, openSession() - .thenCompose( s -> s - .withTransaction( t -> s - .persist( new GuineaPig( 10, "Tulip" ) ) - .thenCompose( vv -> s.flush() ) - .thenAccept( vv -> t.markForRollback() ) - ) - .thenCompose( v -> s.close() ) + test( context, openSession() + .thenCompose( s -> s + .withTransaction( t -> s + .persist( new GuineaPig( 10, "Tulip" ) ) + .thenCompose( vv -> s.flush() ) + .thenAccept( vv -> t.markForRollback() ) ) - .thenCompose( vv -> selectNameFromId( 10 ) ) - .thenAccept( Assertions::assertNull ) + .thenCompose( v -> s.close() ) + ) + .thenCompose( vv -> selectNameFromId( 10 ) ) + .thenAccept( result -> assertThat( result ).isNull() ) ); } @Test public void reactiveRemoveTransientEntity(VertxTestContext context) { - test( - context, - populateDB() - .thenCompose( v -> selectNameFromId( 5 ) ) - .thenAccept( Assertions::assertNotNull ) - .thenCompose( v -> openSession() ) - .thenCompose( session -> session.remove( new GuineaPig( 5, "Aloi" ) ) - .thenCompose( v -> session.flush() ) - .thenCompose( v -> session.close() ) - ) - .handle( (r, e) -> { - assertNotNull( e ); - return r; - } ) - + test( context, populateDB() + .thenCompose( v -> selectNameFromId( 5 ) ) + .thenAccept( result -> assertThat( result ).isNotNull() ) + .thenCompose( v -> openSession() ) + .thenCompose( session -> assertThrown( HibernateException.class, session.remove( new GuineaPig( 5, "Aloi" ) ) ) + ) + .thenAccept( t -> assertThat( t ) + .hasCauseInstanceOf( IllegalArgumentException.class ) + .hasMessageContaining( "unmanaged instance" ) + ) ); } @Test public void reactiveRemoveManagedEntity(VertxTestContext context) { - test( - context, - populateDB() + test( context, populateDB() .thenCompose( v -> openSession() ) - .thenCompose( session -> - session.find( GuineaPig.class, 5 ) - .thenCompose( session::remove ) - .thenCompose( v -> session.flush() ) - .thenCompose( v -> selectNameFromId( 5 ) ) - .thenAccept( Assertions::assertNull ) ) + .thenCompose( session -> session + .find( GuineaPig.class, 5 ) + .thenCompose( session::remove ) + .thenCompose( v -> session.flush() ) + .thenCompose( v -> selectNameFromId( 5 ) ) + .thenAccept( result -> assertThat( result ).isNull() ) ) ); } @@ -690,16 +756,16 @@ public void reactiveUpdate(VertxTestContext context) { .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, 5 ) .thenAccept( pig -> { - assertNotNull( pig ); + assertThat( pig ).isNotNull(); // Checking we are actually changing the name - assertNotEquals( NEW_NAME, pig.getName() ); + assertThat( pig.getName() ).isNotEqualTo( NEW_NAME ); pig.setName( NEW_NAME ); } ) .thenCompose( v -> session.flush() ) .thenCompose( v -> session.close() ) ) .thenCompose( v -> selectNameFromId( 5 ) ) - .thenAccept( name -> assertEquals( NEW_NAME, name ) ) + .thenAccept( name -> assertThat( name ).isEqualTo( NEW_NAME ) ) ); } @@ -712,10 +778,10 @@ public void reactiveUpdateVersion(VertxTestContext context) { .thenCompose( v -> openSession() ) .thenCompose( session -> session.find( GuineaPig.class, 5 ) .thenAccept( pig -> { - assertNotNull( pig ); + assertThat( pig ).isNotNull(); // Checking we are actually changing the name - assertNotEquals( NEW_NAME, pig.getName() ); - assertEquals( 0, pig.version ); + assertThat( pig.getName() ).isNotEqualTo( NEW_NAME ); + assertThat( pig.version ).isEqualTo( 0 ); pig.setName( NEW_NAME ); pig.version = 10; //ignored by Hibernate } ) @@ -724,7 +790,7 @@ public void reactiveUpdateVersion(VertxTestContext context) { ) .thenCompose( v -> openSession() ) .thenCompose( s -> s.find( GuineaPig.class, 5 ) - .thenAccept( pig -> assertEquals( 1, pig.version ) ) ) + .thenAccept( pig -> assertThat( pig.version ).isEqualTo( 1 ) ) ) ); } @@ -733,9 +799,9 @@ public void reactiveClose(VertxTestContext context) { test( context, openSession() .thenCompose( session -> { - assertTrue( session.isOpen() ); + assertThat( session.isOpen() ).isTrue(); return session.close() - .thenAccept( v -> assertFalse( session.isOpen() ) ); + .thenAccept( v -> assertThat( session.isOpen() ).isFalse() ); } ) ); } @@ -754,8 +820,8 @@ public void testSessionWithNativeAffectedEntities(VertxTestContext context) { .setParameter( "n", pig.name ) .getResultList() ) .thenAccept( list -> { - assertFalse( list.isEmpty() ); - assertEquals( 1, list.size() ); + assertThat( list ).isNotEmpty(); + assertThat( list.size() ).isEqualTo( 1 ); assertThatPigsAreEqual( pig, list.get( 0 ) ); } ) .thenCompose( v -> s.find( GuineaPig.class, pig.id ) ) @@ -764,23 +830,23 @@ public void testSessionWithNativeAffectedEntities(VertxTestContext context) { p.name = "X"; } ) .thenCompose( v -> s.createNativeQuery( "update pig set name='Y' where name='X'", affectsPigs ).executeUpdate() ) - .thenAccept( rows -> assertEquals( 1, rows ) ) + .thenAccept( rows -> assertThat( rows ).isEqualTo( 1 ) ) .thenCompose( v -> s.refresh( pig ) ) - .thenAccept( v -> assertEquals( "Y", pig.name ) ) + .thenAccept( v -> assertThat( pig.name ).isEqualTo( "Y" ) ) .thenAccept( v -> pig.name = "Z" ) .thenCompose( v -> s.createNativeQuery( "delete from pig where name='Z'", affectsPigs ).executeUpdate() ) - .thenAccept( rows -> assertEquals( 1, rows ) ) + .thenAccept( rows -> assertThat( rows ).isEqualTo( 1 ) ) .thenCompose( v -> s.createNativeQuery( "select id from pig", affectsPigs ).getResultList() ) - .thenAccept( list -> assertTrue( list.isEmpty() ) ) ) + .thenAccept( list -> assertThat( list ).isEmpty() ) ) ); } @Test public void testMetamodel() { EntityType pig = getSessionFactory().getMetamodel().entity( GuineaPig.class ); - assertNotNull( pig ); - assertEquals( 3, pig.getAttributes().size() ); - assertEquals( "GuineaPig", pig.getName() ); + assertThat( pig ).isNotNull(); + assertThat( pig.getAttributes().size() ).isEqualTo( 3 ); + assertThat( pig.getName() ).isEqualTo( "GuineaPig" ); } @Test @@ -804,14 +870,14 @@ context, getSessionFactory().withTransaction( .createSelectionQuery( "from GuineaPig", GuineaPig.class ) .getResultList() .thenCompose( list -> { - assertNotNull( session.currentTransaction() ); - assertFalse( session.currentTransaction().isMarkedForRollback() ); + assertThat( session.currentTransaction() ).isNotNull(); + assertThat( session.currentTransaction().isMarkedForRollback() ).isFalse(); session.currentTransaction().markForRollback(); - assertTrue( session.currentTransaction().isMarkedForRollback() ); - assertTrue( transaction.isMarkedForRollback() ); + assertThat( session.currentTransaction().isMarkedForRollback() ).isTrue(); + assertThat( transaction.isMarkedForRollback() ).isTrue(); return session.withTransaction( t -> { - assertEquals( t, transaction ); - assertTrue( t.isMarkedForRollback() ); + assertThat( t ).isEqualTo( transaction ); + assertThat( t.isMarkedForRollback() ).isTrue(); return session.createSelectionQuery( "from GuineaPig", GuineaPig.class ).getResultList(); } ); } ) @@ -823,11 +889,11 @@ context, getSessionFactory().withTransaction( public void testSessionPropagation(VertxTestContext context) { test( context, getSessionFactory().withSession( session -> { - assertFalse( session.isDefaultReadOnly() ); + assertThat( session.isDefaultReadOnly() ).isFalse(); session.setDefaultReadOnly( true ); return session.createSelectionQuery( "from GuineaPig", GuineaPig.class ).getResultList() .thenCompose( list -> getSessionFactory().withSession( s -> { - assertTrue( s.isDefaultReadOnly() ); + assertThat( s.isDefaultReadOnly() ).isTrue(); return s.createSelectionQuery( "from GuineaPig", GuineaPig.class ).getResultList(); } ) ); } ) @@ -843,9 +909,9 @@ public void testDupeException(VertxTestContext context) { .thenCompose( v -> getSessionFactory() .withTransaction( (s, t) -> s.persist( new GuineaPig( 10, "Tulip" ) ) ) ).handle( (i, t) -> { - assertNotNull( t ); - assertInstanceOf( CompletionException.class, t ); - assertInstanceOf( PersistenceException.class, t.getCause() ); + assertThat( t ).isNotNull(); + assertThat( t ).isInstanceOf( CompletionException.class ); + assertThat( t.getCause() ).isInstanceOf( PersistenceException.class ); return null; } ) ); @@ -856,12 +922,12 @@ public void testExceptionInWithSession(VertxTestContext context) { final Stage.Session[] savedSession = new Stage.Session[1]; test( context, getSessionFactory().withSession( session -> { - assertTrue( session.isOpen() ); + assertThat( session.isOpen() ).isTrue(); savedSession[0] = session; throw new RuntimeException( "No Panic: This is just a test" ); } ).handle( (o, t) -> { - assertNotNull( t ); - assertFalse( savedSession[0].isOpen(), "Session should be closed" ); + assertThat( t ).isNotNull(); + assertThat( savedSession[0].isOpen() ).withFailMessage( "Session should be closed" ).isFalse(); return null; } ) ); @@ -872,12 +938,12 @@ public void testExceptionInWithTransaction(VertxTestContext context) { final Stage.Session[] savedSession = new Stage.Session[1]; test( context, getSessionFactory().withTransaction( (session, tx) -> { - assertTrue( session.isOpen() ); + assertThat( session.isOpen() ).isTrue(); savedSession[0] = session; throw new RuntimeException( "No Panic: This is just a test" ); } ).handle( (o, t) -> { - assertNotNull( t ); - assertFalse( savedSession[0].isOpen(), "Session should be closed" ); + assertThat( t ).isNotNull(); + assertThat( savedSession[0].isOpen() ).withFailMessage( "Session should be closed" ).isFalse(); return null; } ) ); @@ -888,12 +954,12 @@ public void testExceptionInWithStatelessSession(VertxTestContext context) { final Stage.StatelessSession[] savedSession = new Stage.StatelessSession[1]; test( context, getSessionFactory().withStatelessSession( session -> { - assertTrue( session.isOpen() ); + assertThat( session.isOpen() ).isTrue(); savedSession[0] = session; throw new RuntimeException( "No Panic: This is just a test" ); } ).handle( (o, t) -> { - assertNotNull( t ); - assertFalse( savedSession[0].isOpen(), "Session should be closed" ); + assertThat( t ).isNotNull(); + assertThat( savedSession[0].isOpen() ).withFailMessage( "Session should be closed" ).isFalse(); return null; } ) ); @@ -948,54 +1014,60 @@ public void testCreateSelectionQueryNull(VertxTestContext context) { context, openSession() .thenCompose( session -> session.createSelectionQuery( "from GuineaPig", GuineaPig.class ) .getSingleResultOrNull() - .thenAccept( Assertions::assertNull ) ) + .thenAccept( result -> assertThat( result ).isNull() ) ) .thenCompose( v -> openSession() ) .thenCompose( session -> session.createSelectionQuery( "from GuineaPig", GuineaPig.class ) .getSingleResultOrNull() - .thenAccept( Assertions::assertNull ) ) + .thenAccept( result -> assertThat( result ).isNull() ) ) ); } @Test public void testCurrentSession(VertxTestContext context) { - test( context, - getSessionFactory().withSession(session -> - getSessionFactory().withSession(s -> { - assertEquals(session, s); - Stage.Session currentSession = getSessionFactory().getCurrentSession(); - assertNotNull(currentSession); - assertTrue(currentSession.isOpen()); - assertEquals(session, currentSession); - return voidFuture(); - }) - .thenAccept(v -> assertNotNull(getSessionFactory().getCurrentSession())) - ) - .thenAccept(v -> assertNull(getSessionFactory().getCurrentSession())) + test( + context, getSessionFactory() + .withSession( s1 -> getSessionFactory() + .withSession( s2 -> { + assertThat( s2 ).isEqualTo( s1 ); + Stage.Session currentSession = getSessionFactory().getCurrentSession(); + assertThat( currentSession ).isNotNull(); + assertThat( currentSession.isOpen() ).isTrue(); + assertThat( currentSession ).isEqualTo( s1 ); + return voidFuture(); + } ) + // We closed s2, not s1 + .thenAccept( v -> assertThat( getSessionFactory().getCurrentSession() ).isNotNull() ) + ) + // Both sessions are closed now + .thenAccept( v -> assertThat( getSessionFactory().getCurrentSession() ).isNull() ) ); } @Test public void testCurrentStatelessSession(VertxTestContext context) { - test( context, - getSessionFactory().withStatelessSession(session -> - getSessionFactory().withStatelessSession(s -> { - assertEquals(session, s); - Stage.StatelessSession currentSession = getSessionFactory().getCurrentStatelessSession(); - assertNotNull(currentSession); - assertTrue(currentSession.isOpen()); - assertEquals(session, currentSession); - return voidFuture(); - }) - .thenAccept(v -> assertNotNull(getSessionFactory().getCurrentStatelessSession())) - ) - .thenAccept(v -> assertNull(getSessionFactory().getCurrentStatelessSession())) + test( + context, getSessionFactory() + .withStatelessSession( session -> getSessionFactory() + .withStatelessSession( s -> { + assertThat( s ).isEqualTo( session ); + Stage.StatelessSession currentSession = getSessionFactory().getCurrentStatelessSession(); + assertThat( currentSession ).isNotNull(); + assertThat( currentSession.isOpen() ).isTrue(); + assertThat( currentSession ).isEqualTo( session ); + return voidFuture(); + } ) + // We closed s2, not s1 + .thenAccept( v -> assertThat( getSessionFactory().getCurrentStatelessSession() ).isNotNull() ) + ) + // Both sessions are closed now + .thenAccept( v -> assertThat( getSessionFactory().getCurrentStatelessSession() ).isNull() ) ); } private void assertThatPigsAreEqual(GuineaPig expected, GuineaPig actual) { - assertNotNull( actual ); - assertEquals( expected.getId(), actual.getId() ); - assertEquals( expected.getName(), actual.getName() ); + assertThat( actual ).isNotNull(); + assertThat( actual.getId() ).isEqualTo( expected.getId() ); + assertThat( actual.getName() ).isEqualTo( expected.getName() ); } @Entity(name = "GuineaPig")