
Commit 985294b

feat: a config flag to send a whole failed batch to DLQ (#34)

* more logging
* feat: a config flag to send a whole failed batch to DLQ

1 parent 994d4f3 commit 985294b
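
For context, a minimal sketch of connector properties that enable the new flag, written in the same style as the embedded test below. Everything except dlq.send.batch.on.error is a standard Kafka Connect error-handling setting; the topic name and values are illustrative, not mandated by this commit.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative only: standard Kafka Connect DLQ settings plus the new flag.
    public class DlqConfigExample {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("errors.tolerance", "all");                              // route failures to the DLQ instead of failing the task
            props.put("errors.deadletterqueue.topic.name", "dlq");             // DLQ topic name (example)
            props.put("errors.deadletterqueue.topic.replication.factor", "1");
            props.put("dlq.send.batch.on.error", "true");                      // new flag from this commit; defaults to false
            System.out.println(props);
        }
    }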

File tree

3 files changed: +80 -13 lines changed


connector/src/main/java/io/questdb/kafka/QuestDBSinkConnectorConfig.java

Lines changed: 12 additions & 1 deletion

@@ -79,6 +79,12 @@ public final class QuestDBSinkConnectorConfig extends AbstractConfig {
     public static final String TIMESTAMP_FORMAT = "timestamp.string.format";
     private static final String TIMESTAMP_FORMAT_DOC = "Timestamp format. Used when parsing timestamp string fields";
 
+    public static final String DLQ_SEND_BATCH_ON_ERROR_CONFIG = "dlq.send.batch.on.error";
+    private static final String DLQ_SEND_BATCH_ON_ERROR_DOC = "When true and a Dead Letter Queue (DLQ) is configured, " +
+            "send the entire batch to DLQ on parsing errors instead of trying to send records one-by-one to the database first. " +
+            "This can be useful to avoid additional database load when errors are expected to affect multiple records. " +
+            "Default is false (try one-by-one).";
+
     private static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-ddTHH:mm:ss.SSSUUUZ";
 
     public QuestDBSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
@@ -111,7 +117,8 @@ public static ConfigDef conf() {
                 .define(DESIGNATED_TIMESTAMP_KAFKA_NATIVE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, DESIGNATED_TIMESTAMP_KAFKA_NATIVE_DOC)
                 .define(TLS_VALIDATION_MODE_CONFIG, Type.STRING, "default", ConfigDef.ValidString.in("default", "insecure"), Importance.LOW, TLS_VALIDATION_MODE_DOC)
                 .define(CONFIGURATION_STRING_CONFIG, Type.PASSWORD, null, Importance.HIGH, CONFIGURATION_STRING_DOC)
-                .define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC);
+                .define(ALLOWED_LAG_CONFIG, Type.INT, 1000, ConfigDef.Range.between(1, Integer.MAX_VALUE), Importance.LOW, ALLOWED_LAG_DOC)
+                .define(DLQ_SEND_BATCH_ON_ERROR_CONFIG, Type.BOOLEAN, false, Importance.LOW, DLQ_SEND_BATCH_ON_ERROR_DOC);
     }
 
     public Password getConfigurationString() {
@@ -212,6 +219,10 @@ public int getMaxRetries() {
         return getInt(MAX_RETRIES);
     }
 
+    public boolean isDlqSendBatchOnError() {
+        return getBoolean(DLQ_SEND_BATCH_ON_ERROR_CONFIG);
+    }
+
     private static class TimestampUnitsRecommender implements ConfigDef.Recommender {
         private static final TimestampUnitsRecommender INSTANCE = new TimestampUnitsRecommender();
         private static final List<Object> VALID_UNITS = Arrays.asList("auto", "millis", "micros", "nanos");
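
To make the define()/getBoolean() pattern above concrete, here is a small self-contained sketch using the plain Kafka ConfigDef API (requires kafka-clients on the classpath); the class name is made up for illustration:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.config.AbstractConfig;
    import org.apache.kafka.common.config.ConfigDef;

    // Mirrors how the connector registers and reads the new boolean flag.
    public class DlqFlagDefineSketch {
        public static void main(String[] args) {
            ConfigDef def = new ConfigDef()
                    .define("dlq.send.batch.on.error", ConfigDef.Type.BOOLEAN, false,
                            ConfigDef.Importance.LOW, "Send the entire batch to the DLQ on parsing errors");
            Map<String, String> props = new HashMap<>();
            props.put("dlq.send.batch.on.error", "true");
            AbstractConfig config = new AbstractConfig(def, props);
            System.out.println(config.getBoolean("dlq.send.batch.on.error")); // prints: true
        }
    }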

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 26 additions & 12 deletions

@@ -52,6 +52,7 @@ public final class QuestDBSinkTask extends SinkTask {
     private final FlushConfig flushConfig = new FlushConfig();
     private final ObjList<SinkRecord> inflightSinkRecords = new ObjList<>();
     private ErrantRecordReporter reporter;
+    private boolean dlqSendBatchOnError;
 
     @Override
     public String version() {
@@ -96,6 +97,7 @@ public void start(Map<String, String> map) {
             // Kafka older than 2.6
             reporter = null;
         }
+        this.dlqSendBatchOnError = config.isDlqSendBatchOnError();
     }
 
     private Sender createRawSender() {
@@ -275,18 +277,30 @@ private void onHttpSenderException(Exception e) {
                 (reporter != null && e.getMessage() != null) // hack to detect data parsing errors originating at server-side
                         && (e.getMessage().contains("error in line") || e.getMessage().contains("failed to parse line protocol"))
         ) {
-            // ok, we have a parsing error, let's try to send records one by one to find the problematic record
-            // and we will report it to the error handler. the rest of the records will make it to QuestDB
-            sender = createSender();
-            for (int i = 0; i < inflightSinkRecords.size(); i++) {
-                SinkRecord sinkRecord = inflightSinkRecords.get(i);
-                try {
-                    handleSingleRecord(sinkRecord);
-                    sender.flush();
-                } catch (Exception ex) {
-                    context.errantRecordReporter().report(sinkRecord, ex);
-                    closeSenderSilently();
-                    sender = createSender();
+            if (dlqSendBatchOnError) {
+                // Send all records directly to the DLQ without trying to send them to the database
+                log.warn("Sender exception, sending entire batch to DLQ. Inflight record count = {}", inflightSinkRecords.size(), e);
+                for (int i = 0; i < inflightSinkRecords.size(); i++) {
+                    SinkRecord sinkRecord = inflightSinkRecords.get(i);
+                    log.debug("Reporting record to Kafka Connect error handler (DLQ)...");
+                    context.errantRecordReporter().report(sinkRecord, e);
+                }
+            } else {
+                // ok, we have a parsing error, let's try to send records one by one to find the problematic record
+                // and we will report it to the error handler. the rest of the records will make it to QuestDB
+                log.warn("Sender exception, trying to send records one by one. Inflight record count = {}", inflightSinkRecords.size(), e);
+                sender = createSender();
+                for (int i = 0; i < inflightSinkRecords.size(); i++) {
+                    SinkRecord sinkRecord = inflightSinkRecords.get(i);
+                    try {
+                        handleSingleRecord(sinkRecord);
+                        sender.flush();
+                    } catch (Exception ex) {
+                        log.warn("Failed to send problematic record to QuestDB. Reporting to Kafka Connect error handler (DLQ)...", ex);
+                        context.errantRecordReporter().report(sinkRecord, ex);
+                        closeSenderSilently();
+                        sender = createSender();
+                    }
                 }
             }
             nextFlushNanos = System.nanoTime() + flushConfig.autoFlushNanos;
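
With the flag enabled, every in-flight record of a failed batch is handed to context.errantRecordReporter(), so the whole batch lands on the DLQ topic. A hedged sketch of inspecting those records with a plain Kafka consumer follows; the broker address, group id, and the topic name "dlq" are assumptions that merely match the test setup below:

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Reads raw records from the DLQ topic so failed batches can be examined or replayed.
    public class DlqInspector {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
            props.put("group.id", "dlq-inspector");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", ByteArrayDeserializer.class.getName());
            props.put("value.deserializer", ByteArrayDeserializer.class.getName());
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("dlq"));
                for (ConsumerRecord<byte[], byte[]> rec : consumer.poll(Duration.ofSeconds(5))) {
                    System.out.printf("offset=%d value=%s%n", rec.offset(), new String(rec.value()));
                }
            }
        }
    }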

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 42 additions & 0 deletions

@@ -585,6 +585,48 @@ public void testDeadLetterQueue_badColumnType() {
 
     }
 
+    @Test
+    public void testDeadLetterQueue_sendBatchOnError() {
+        connect.kafka().createTopic(topicName, 1);
+        Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
+        props.put("value.converter.schemas.enable", "false");
+        props.put("errors.deadletterqueue.topic.name", "dlq");
+        props.put("errors.deadletterqueue.topic.replication.factor", "1");
+        props.put("errors.tolerance", "all");
+        props.put("dlq.send.batch.on.error", "true");
+        connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
+        ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
+
+        QuestDBUtils.assertSql(
+                "{\"ddl\":\"OK\"}",
+                "create table " + topicName + " (firstname string, lastname string, age int, id uuid, ts timestamp) timestamp(ts) partition by day wal",
+                httpPort,
+                QuestDBUtils.Endpoint.EXEC);
+
+        String goodRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+        String goodRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d042\"}";
+        String goodRecordC = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d043\"}";
+        String badRecordA = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42,\"id\":\"Invalid UUID\"}";
+        String badRecordB = "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":\"not a number\",\"id\":\"ad956a45-a55b-441e-b80d-023a2bf5d041\"}";
+
+        // interleave good and bad records
+        connect.kafka().produce(topicName, "key", goodRecordA);
+        connect.kafka().produce(topicName, "key", badRecordA);
+        connect.kafka().produce(topicName, "key", goodRecordB);
+        connect.kafka().produce(topicName, "key", badRecordB);
+        connect.kafka().produce(topicName, "key", goodRecordC);
+
+        // When dlq.send.batch.on.error is true, ALL records in a batch that contains errors should go to the DLQ.
+        // This means all 5 records should be in the DLQ, not just the 2 bad ones.
+        ConsumerRecords<byte[], byte[]> fetchedRecords = connect.kafka().consume(5, 120_000, "dlq");
+        Assertions.assertEquals(5, fetchedRecords.count());
+
+        // Verify that NO records made it to QuestDB
+        QuestDBUtils.assertSqlEventually("\"firstname\",\"lastname\",\"age\",\"id\"\r\n",
+                "select firstname, lastname, age, id from " + topicName,
+                httpPort);
+    }
+
     @Test
     public void testbadColumnType_noDLQ() {
         connect.kafka().createTopic(topicName, 1);
