2727import org .apache .flink .connector .base .sink .writer .TestSinkInitContext ;
2828import org .apache .flink .metrics .Gauge ;
2929
30+ import co .elastic .clients .elasticsearch .core .bulk .IndexOperation ;
3031import co .elastic .clients .elasticsearch .core .bulk .UpdateOperation ;
3132import org .apache .http .HttpHost ;
3233import org .junit .jupiter .api .BeforeEach ;
3738import java .util .Collections ;
3839import java .util .List ;
3940import java .util .Optional ;
41+ import java .util .concurrent .atomic .AtomicBoolean ;
4042import java .util .concurrent .locks .Condition ;
4143import java .util .concurrent .locks .Lock ;
4244import java .util .concurrent .locks .ReentrantLock ;
@@ -50,6 +52,7 @@ public class Elasticsearch8AsyncWriterITCase extends ElasticsearchSinkBaseITCase
5052 private final Lock lock = new ReentrantLock ();
5153
5254 private final Condition completed = lock .newCondition ();
55+ private final AtomicBoolean completedExceptionally = new AtomicBoolean (false );
5356
5457 @ BeforeEach
5558 void setUp () {
@@ -171,8 +174,59 @@ public void testSendTimeMetric() throws Exception {
171174 @ Timeout (5 )
172175 public void testHandlePartiallyFailedBulk () throws Exception {
173176 String index = "test-partially-failed-bulk" ;
177+ int maxBatchSize = 3 ;
178+
179+ // First create a document to enable version conflict
180+ try (final Elasticsearch8AsyncWriter <DummyData > setupWriter = createWriter (index , 1 )) {
181+ setupWriter .write (new DummyData ("test-3" , "test-3" ), null );
182+ await ();
183+ }
184+
185+ // Create converter that triggers 409 version conflict for test-3
186+ Elasticsearch8AsyncSinkBuilder .OperationConverter <DummyData > conflictConverter =
187+ new Elasticsearch8AsyncSinkBuilder .OperationConverter <>(
188+ (element , ctx ) -> {
189+ if (element .getId ().equals ("test-3" )) {
190+ // Use wrong version to trigger 409 conflict (retryable)
191+ return new IndexOperation .Builder <>()
192+ .id (element .getId ())
193+ .index (index )
194+ .document (element )
195+ .ifSeqNo (999L ) // Wrong sequence number
196+ .ifPrimaryTerm (1L )
197+ .build ();
198+ } else {
199+ return new IndexOperation .Builder <>()
200+ .id (element .getId ())
201+ .index (index )
202+ .document (element )
203+ .build ();
204+ }
205+ });
206+
207+ try (final Elasticsearch8AsyncWriter <DummyData > writer =
208+ createWriter (maxBatchSize , conflictConverter )) {
209+ writer .write (new DummyData ("test-1" , "test-1" ), null );
210+ writer .write (new DummyData ("test-2" , "test-2" ), null );
211+ writer .write (new DummyData ("test-3" , "version-conflict" ), null );
212+ }
213+
214+ await ();
215+
216+ // 409 is retryable, so test-3 should have not completed the rest handler exceptionally
217+ assertThat (context .metricGroup ().getNumRecordsOutErrorsCounter ().getCount ()).isEqualTo (1 );
218+ assertThat (completedExceptionally .get ()).isFalse ();
219+ assertIdsAreWritten (index , new String [] {"test-1" , "test-2" });
220+ }
221+
222+ @ TestTemplate
223+ @ Timeout (5 )
224+ public void testFailFastUponPartiallyFailedBulk () throws Exception {
225+ String index = "test-fail-fast-partially-failed-bulk" ;
174226 int maxBatchSize = 2 ;
175227
228+ // This simulates a scenario where some operations fail with non-retryable errors.
229+ // test-1 gets docAsUpsert=false on non-existing doc (404 error).
176230 Elasticsearch8AsyncSinkBuilder .OperationConverter <DummyData > elementConverter =
177231 new Elasticsearch8AsyncSinkBuilder .OperationConverter <>(
178232 (element , ctx ) ->
@@ -195,7 +249,9 @@ public void testHandlePartiallyFailedBulk() throws Exception {
195249
196250 await ();
197251
252+ // Verify that non-retryable error (404) increments error counter and fails fast
198253 assertThat (context .metricGroup ().getNumRecordsOutErrorsCounter ().getCount ()).isEqualTo (1 );
254+ assertThat (completedExceptionally .get ()).isTrue ();
199255 assertIdsAreWritten (index , new String [] {"test-2" });
200256 assertIdsAreNotWritten (index , new String [] {"test-1" });
201257 }
@@ -264,6 +320,7 @@ public void complete() {
264320 @ Override
265321 public void completeExceptionally (Exception e ) {
266322 resultHandler .completeExceptionally (e );
323+ completedExceptionally .set (true );
267324 signal ();
268325 }
269326
0 commit comments