Skip to content

Commit 0dee152

Browse files
committed
[#2518] Test id generation with connection opened lazily
1 parent 58ecd46 commit 0dee152

File tree

1 file changed

+276
-0
lines changed

1 file changed

+276
-0
lines changed
Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/* Hibernate, Relational Persistence for Idiomatic Java
2+
*
3+
* SPDX-License-Identifier: Apache-2.0
4+
* Copyright: Red Hat Inc. and Hibernate Authors
5+
*/
6+
package org.hibernate.reactive;
7+
8+
import java.util.concurrent.CompletionStage;
9+
import java.util.concurrent.CountDownLatch;
10+
import java.util.concurrent.TimeUnit;
11+
12+
import org.hibernate.SessionFactory;
13+
import org.hibernate.boot.registry.StandardServiceRegistry;
14+
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
15+
import org.hibernate.cfg.Configuration;
16+
import org.hibernate.reactive.provider.ReactiveServiceRegistryBuilder;
17+
import org.hibernate.reactive.stage.Stage;
18+
import org.hibernate.reactive.util.impl.CompletionStages;
19+
import org.hibernate.reactive.vertx.VertxInstance;
20+
21+
import org.junit.jupiter.api.AfterAll;
22+
import org.junit.jupiter.api.BeforeAll;
23+
import org.junit.jupiter.api.Test;
24+
import org.junit.jupiter.api.TestInstance;
25+
import org.junit.jupiter.api.extension.ExtendWith;
26+
27+
import io.vertx.core.AbstractVerticle;
28+
import io.vertx.core.DeploymentOptions;
29+
import io.vertx.core.Promise;
30+
import io.vertx.core.Vertx;
31+
import io.vertx.core.VertxOptions;
32+
import io.vertx.junit5.Timeout;
33+
import io.vertx.junit5.VertxExtension;
34+
import io.vertx.junit5.VertxTestContext;
35+
import jakarta.persistence.Entity;
36+
import jakarta.persistence.GeneratedValue;
37+
import jakarta.persistence.Id;
38+
import jakarta.persistence.Table;
39+
40+
import static java.util.concurrent.TimeUnit.MINUTES;
41+
import static org.assertj.core.api.Assertions.fail;
42+
import static org.hibernate.cfg.AvailableSettings.SHOW_SQL;
43+
import static org.hibernate.reactive.BaseReactiveTest.setDefaultProperties;
44+
import static org.hibernate.reactive.provider.Settings.POOL_CONNECT_TIMEOUT;
45+
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
46+
import static org.hibernate.reactive.util.impl.CompletionStages.loop;
47+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
48+
49+
/**
50+
* This is a multi-threaded stress test, intentionally consuming some time
51+
* that also opens the connection lazily.
52+
* The purpose is to verify that the sequence optimizer used by Hibernate Reactive
53+
* is indeed able to generate unique IDs backed by the database sequences, while
54+
* running multiple operations in different threads and on multiple Vert.x eventloops.
55+
* This is very similar to MultithreadedIdentityGenerationTest except it models
56+
* the full operations including the insert statements, while the latter focuses
57+
* on the generated IDs to be unique; it's useful to maintain both tests as:
58+
* - ID generation needs to be unique so it's good to stress that aspect
59+
* in isolation
60+
* - insert operations are downstream events, so this allows us to test that
61+
* such downstream events are not being unintentionally duplicated/dropped,
62+
* which could actually happen when the id generator triggers unintended
63+
* threading behaviours.
64+
*
65+
* N.B. We actually had a case in which the IDs were uniquely generated but the
66+
* downstream event was being processed twice (or more) concurrently, so it's
67+
* useful to have both integration tests.
68+
*
69+
* A typical reactive application will not require multiple threads, but we
70+
* specifically want to test for the case in which the single ID source is being
71+
* shared across multiple threads and also multiple eventloops.
72+
* @see MultithreadedInsertionTest
73+
*/
74+
@ExtendWith(VertxExtension.class)
75+
@TestInstance(TestInstance.Lifecycle.PER_METHOD)
76+
@Timeout(value = MultithreadedInsertionWithLazyConnectionTest.TIMEOUT_MINUTES, timeUnit = MINUTES)
77+
public class MultithreadedInsertionWithLazyConnectionTest {
78+
79+
/**
80+
* The number of threads should be higher than the default size of the connection pool so that
81+
* this test is also effective in detecting problems with resource starvation.
82+
*/
83+
private static final int N_THREADS = 12;
84+
private static final int ENTITIES_STORED_PER_THREAD = 2000;
85+
86+
//Should finish much sooner, but generating this amount of IDs could be slow on some CIs
87+
public static final int TIMEOUT_MINUTES = 10;
88+
89+
// Keeping this disabled because it generates a lot of queries
90+
private static final boolean LOG_SQL = false;
91+
92+
/**
93+
* If true, it will print info about the threads
94+
*/
95+
private static final boolean THREAD_PRETTY_MSG = true;
96+
97+
private static final Latch startLatch = new Latch( "start", N_THREADS );
98+
private static final Latch endLatch = new Latch( "end", N_THREADS );
99+
100+
private static Stage.SessionFactory stageSessionFactory;
101+
private static Vertx vertx;
102+
private static SessionFactory sessionFactory;
103+
104+
@BeforeAll
105+
public static void setupSessionFactory() {
106+
vertx = Vertx.vertx( getVertxOptions() );
107+
Configuration configuration = new Configuration();
108+
setDefaultProperties( configuration );
109+
configuration.addAnnotatedClass( EntityWithGeneratedId.class );
110+
configuration.setProperty( SHOW_SQL, String.valueOf( LOG_SQL ) );
111+
configuration.setProperty( POOL_CONNECT_TIMEOUT, String.valueOf( TIMEOUT_MINUTES * 60 * 1000 ) );
112+
StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder()
113+
.applySettings( configuration.getProperties() )
114+
//Inject our custom vert.x instance:
115+
.addService( VertxInstance.class, () -> vertx );
116+
StandardServiceRegistry registry = builder.build();
117+
sessionFactory = configuration.buildSessionFactory( registry );
118+
stageSessionFactory = sessionFactory.unwrap( Stage.SessionFactory.class );
119+
}
120+
121+
private static VertxOptions getVertxOptions() {
122+
final VertxOptions vertxOptions = new VertxOptions();
123+
vertxOptions.setEventLoopPoolSize( N_THREADS );
124+
//We relax the blocked thread checks as we'll actually use latches to block them
125+
//intentionally for the purpose of the test; functionally this isn't required
126+
//but it's useful as self-test in the design of this, to ensure that the way
127+
//things are setup are indeed being run in multiple, separate threads.
128+
vertxOptions.setBlockedThreadCheckInterval( TIMEOUT_MINUTES );
129+
vertxOptions.setBlockedThreadCheckIntervalUnit( TimeUnit.MINUTES );
130+
return vertxOptions;
131+
}
132+
133+
@AfterAll
134+
public static void closeSessionFactory() {
135+
stageSessionFactory.close();
136+
}
137+
138+
@Test
139+
public void testIdentityGenerator(VertxTestContext context) {
140+
final DeploymentOptions deploymentOptions = new DeploymentOptions();
141+
deploymentOptions.setInstances( N_THREADS );
142+
143+
vertx
144+
.deployVerticle( InsertEntitiesVerticle::new, deploymentOptions )
145+
.onSuccess( res -> {
146+
endLatch.waitForEveryone();
147+
context.completeNow();
148+
} )
149+
.onFailure( context::failNow )
150+
.eventually( () -> vertx.close() );
151+
}
152+
153+
private static class InsertEntitiesVerticle extends AbstractVerticle {
154+
155+
int sequentialOperation = 0;
156+
157+
public InsertEntitiesVerticle() {
158+
}
159+
160+
@Override
161+
public void start(Promise<Void> startPromise) {
162+
startLatch.reached();
163+
startLatch.waitForEveryone();//Not essential, but to ensure a good level of parallelism
164+
final String initialThreadName = Thread.currentThread().getName();
165+
final Stage.Session session = stageSessionFactory.createSession();
166+
storeMultipleEntities( session )
167+
.handle( CompletionStages::handle )
168+
.thenCompose( handler -> session
169+
.close()
170+
.thenCompose( handler::getResultAsCompletionStage )
171+
)
172+
.whenComplete( (o, throwable) -> {
173+
endLatch.reached();
174+
if ( throwable != null ) {
175+
startPromise.fail( throwable );
176+
}
177+
else {
178+
if ( !initialThreadName.equals( Thread.currentThread().getName() ) ) {
179+
startPromise.fail( "Thread switch detected!" );
180+
}
181+
else {
182+
startPromise.complete();
183+
}
184+
}
185+
} );
186+
}
187+
188+
private CompletionStage<Void> storeMultipleEntities(Stage.Session s) {
189+
return loop( 0, ENTITIES_STORED_PER_THREAD, index -> storeEntity( s ) );
190+
}
191+
192+
private CompletionStage<Void> storeEntity(Stage.Session s) {
193+
final Thread beforeOperationThread = Thread.currentThread();
194+
final int localVerticleOperationSequence = sequentialOperation++;
195+
final EntityWithGeneratedId entity = new EntityWithGeneratedId();
196+
entity.name = beforeOperationThread + "__" + localVerticleOperationSequence;
197+
198+
return s
199+
.withTransaction( t -> s.persist( entity ) )
200+
.thenCompose( v -> beforeOperationThread != Thread.currentThread()
201+
? failedFuture( new IllegalStateException( "Detected an unexpected switch of carrier threads!" ) )
202+
: voidFuture() );
203+
}
204+
205+
@Override
206+
public void stop() {
207+
prettyOut( "Verticle stopped " + super.toString() );
208+
}
209+
}
210+
211+
/**
212+
* Trivial entity using a Sequence for Id generation
213+
*/
214+
@Entity
215+
@Table(name="Entity")
216+
private static class EntityWithGeneratedId {
217+
@Id
218+
@GeneratedValue
219+
Long id;
220+
221+
String name;
222+
223+
public EntityWithGeneratedId() {
224+
}
225+
}
226+
227+
/**
228+
* Custom latch which is rather verbose about threads reaching the milestones, to help verifying the design
229+
*/
230+
private static final class Latch {
231+
private final String label;
232+
private final CountDownLatch countDownLatch;
233+
234+
public Latch(String label, int membersCount) {
235+
this.label = label;
236+
this.countDownLatch = new CountDownLatch( membersCount );
237+
}
238+
239+
public void reached() {
240+
final long count = countDownLatch.getCount();
241+
countDownLatch.countDown();
242+
prettyOut( "Reached latch '" + label + "', current countdown is " + ( count - 1 ) );
243+
}
244+
245+
public void waitForEveryone() {
246+
try {
247+
boolean reachedZero = countDownLatch.await( TIMEOUT_MINUTES, MINUTES );
248+
if ( reachedZero ) {
249+
prettyOut( "Everyone has now breached '" + label + "'" );
250+
}
251+
else {
252+
fail( "Time out reached" );
253+
}
254+
}
255+
catch ( InterruptedException e ) {
256+
fail( e );
257+
}
258+
}
259+
}
260+
261+
private static void prettyOut(final String message) {
262+
if ( THREAD_PRETTY_MSG ) {
263+
final String threadName = Thread.currentThread().getName();
264+
final long l = System.currentTimeMillis();
265+
final long seconds = ( l / 1000 ) - initialSecond;
266+
//We prefix log messages by seconds since bootstrap; I'm preferring this over millisecond precision
267+
//as it's not very relevant to see exactly how long each stage took (it's actually distracting)
268+
//but it's more useful to group things coarsely when some lock or timeout introduces a significant
269+
//divide between some operations (when a starvation or timeout happens it takes some seconds).
270+
System.out.println( seconds + " - " + threadName + ": " + message );
271+
}
272+
}
273+
274+
private static final long initialSecond = ( System.currentTimeMillis() / 1000 );
275+
276+
}

0 commit comments

Comments
 (0)