Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1786,6 +1786,7 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,

SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
int skippedMessages = 0;
int processedMessages = 0;
try {
for (int i = 0; i < batchSize; ++i) {
final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
Expand Down Expand Up @@ -1815,13 +1816,15 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
continue;
}
executeNotifyCallback(message);
processedMessages++;
}
if (ackBitSet != null) {
ackBitSet.recycle();
}
} catch (IllegalStateException e) {
log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
Comment thread
gosonzhang marked this conversation as resolved.
} catch (IllegalStateException | IllegalArgumentException e) {
Comment thread
gosonzhang marked this conversation as resolved.
// For IllegalArgumentException see PR: https://github.com/apache/pulsar/pull/24061
discardCorruptedBatchMessage(messageId, cnx, batchSize,
skippedMessages, processedMessages, ValidationError.BatchDeSerializeError);
}

if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
Expand Down Expand Up @@ -2153,6 +2156,38 @@ private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentC
discardMessage(messageId, currentCnx, validationError, 1);
}

/**
* When batch index ack is enabled, ack the messages that failed to deserialize by their index,
* while keeping successfully enqueued messages unacknowledged to avoid message loss.
*/
private void discardCorruptedBatchMessage(MessageIdData messageId, ClientCnx currentCnx,
int batchSize, int skipped, int processed, ValidationError validationError) {
log.error("[{}] [{}] Discarding corrupted batch messages with batch index ack at {}:{}, "
+ "batchSize={}, skipped={}, processed={}, exception={}",
subscription, consumerName, messageId.getLedgerId(), messageId.getEntryId(),
batchSize, skipped, processed, validationError);
BitSetRecyclable ackBitSet = null;
int corruptedStartIndex = skipped + processed;
if (conf.isBatchIndexAckEnabled()) {
// When batch index ack is enabled, only ack the messages that failed to deserialize.
// Messages that have been successfully enqueued remain unacknowledged,
// waiting for the user to consume and acknowledge them normally.
ackBitSet = BitSetRecyclable.create();
ackBitSet.set(0, batchSize);
for (int i = corruptedStartIndex; i < batchSize; i++) {
ackBitSet.clear(i);
}
}
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(),
ackBitSet, AckType.Individual, validationError, Collections.emptyMap(), -1);
currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
if (ackBitSet != null) {
ackBitSet.recycle();
}
increaseAvailablePermits(currentCnx, batchSize - corruptedStartIndex);
stats.incrementNumReceiveFailed();
}

private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError,
int batchMessages) {
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null,
Expand Down
Loading
Loading