Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,11 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
int filteredMessageCount = 0;
int filteredEntryCount = 0;
long filteredBytesCount = 0;
List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
List<Position> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
List<Position> entriesToRedeliver = new ArrayList<>(entries.size());
int i;
int entriesSize = entries.size();
for (i = 0; i < entriesSize; i++) {
final Entry entry = entries.get(i);
if (entry == null) {
continue;
Expand All @@ -160,30 +159,35 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
this.filterProcessedMsgs.add(entryMsgCnt);
}

boolean filtered = false;
EntryFilter.FilterResult filterResult = runFiltersForEntry(entry, msgMetadata, consumer);
if (filterResult == EntryFilter.FilterResult.REJECT) {
entriesToFiltered.add(entry.getPosition());
entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRejectedMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
filtered = true;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
entriesToRedeliver.add(entry.getPosition());
entries.set(i, null);
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRescheduledMsgs.add(entryMsgCnt);
filteredEntryCount++;
filteredMessageCount += entryMsgCnt;
filteredBytesCount += metadataAndPayload.readableBytes();
filtered = true;
}

if (filtered) {
int readableBytes = metadataAndPayload.readableBytes();
entries.set(i, null);
entry.release();
if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
if (!tryAcquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, entryMsgCnt,
readableBytes)) {
break; // do not process further entries
}
}
continue;
}

if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
Expand Down Expand Up @@ -283,6 +287,11 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
}
}

if (!tryAcquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, entryMsgCnt,
metadataAndPayload.readableBytes())) {
break; // do not process further entries
}

totalEntries++;
totalMessages += batchSize;
totalBytes += metadataAndPayload.readableBytes();
Expand All @@ -296,6 +305,19 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
}
}

if (i < entriesSize) {
// throttled, release the unprocessed entries
for (int j = i; j < entriesSize; j++) {
Entry entry = entries.get(j);
if (entry != null) {
entriesToRedeliver.add(entry.getPosition());
entries.set(j, null);
entry.release();
}
}
}

if (CollectionUtils.isNotEmpty(entriesToFiltered)) {
individualAcknowledgeMessageIfNeeded(entriesToFiltered, Collections.emptyMap());

Expand All @@ -315,11 +337,6 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i

}

if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
filteredMessageCount, filteredBytesCount);
}

sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
Expand All @@ -332,17 +349,23 @@ private void individualAcknowledgeMessageIfNeeded(List<Position> positions, Map<
}
}

protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
long totalMessagesSent, long totalBytesSent) {
private boolean tryAcquirePermitsForDeliveredMessages(
Topic topic, ManagedCursor cursor, long totalMessagesSent, long totalBytesSent) {
Comment on lines +352 to +353
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the previous parameter, totalBytesSent is misleading since this method is now called per-entry rather than once after processing all entries. Consider renaming to byteCount or numBytes to better reflect that it represents the count for a single entry, not a cumulative total.

Copilot uses AI. Check for mistakes.
Comment on lines +352 to +353
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter name totalMessagesSent is misleading since this method is now called per-entry rather than once after processing all entries. Consider renaming to messageCount or numMessages to better reflect that it represents the count for a single entry, not a cumulative total.

Copilot uses AI. Check for mistakes.
boolean permitsAcquired = true;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
|| (cursor != null && !cursor.isActive())) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
rateLimiter.consumeDispatchQuota(permits, totalBytesSent));
topic.getDispatchRateLimiter().ifPresent(rateLimter ->
rateLimter.consumeDispatchQuota(permits, totalBytesSent));
getRateLimiter().ifPresent(rateLimiter -> rateLimiter.consumeDispatchQuota(permits, totalBytesSent));
long permits = dispatchThrottlingOnBatchMessageEnabled ? 1 : totalMessagesSent;
permitsAcquired &= topic.getBrokerDispatchRateLimiter()
.map(l -> l.tryConsumeDispatchQuota(permits, totalBytesSent))
.orElse(true);
permitsAcquired &= topic.getDispatchRateLimiter()
.map(l -> l.tryConsumeDispatchQuota(permits, totalBytesSent))
.orElse(true);
permitsAcquired &= getRateLimiter()
.map(l -> l.tryConsumeDispatchQuota(permits, totalBytesSent))
.orElse(true);
}
return permitsAcquired;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ protected DispatchRateLimiter(BrokerService brokerService) {
*/
public abstract void consumeDispatchQuota(long numberOfMessages, long byteSize);

/**
* It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
*
* @param numberOfMessages
* @param byteSize
Comment on lines +90 to +91
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc is missing the @return tag to document the return value.

Suggested addition:

/**
 * It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
 *
 * @param numberOfMessages the number of messages to acquire permits for
 * @param byteSize the number of bytes to acquire permits for
 * @return true if permits were successfully acquired, false otherwise
 */
Suggested change
* @param numberOfMessages
* @param byteSize
* @param numberOfMessages the number of messages to acquire permits for
* @param byteSize the number of bytes to acquire permits for
* @return true if permits were successfully acquired, false otherwise

Copilot uses AI. Check for mistakes.
*/
public abstract boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize);
Comment on lines +87 to +93
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc is incomplete and missing parameter descriptions. The @param tags should have descriptions for what numberOfMessages and byteSize represent.

Copilot uses AI. Check for mistakes.

/**
* Checks if dispatch-rate limiting is enabled.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,21 @@ public long getAvailableDispatchRateLimitOnByte() {
*/
@Override
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
tryConsumeDispatchQuota(numberOfMessages, byteSize);
}

Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc is missing a description and the @return tag. The method should document its return value.

Suggested addition:

/**
 * It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
 * @param numberOfMessages the number of messages to acquire permits for
 * @param byteSize the number of bytes to acquire permits for
 * @return true if permits were successfully acquired, false otherwise
 */
Suggested change
/**
* Tries to acquire message and byte permits from the rate-limiter and returns whether the acquisition succeeded.
*
* @param numberOfMessages the number of message permits to acquire
* @param byteSize the number of byte permits to acquire
* @return true if permits were successfully acquired, false otherwise
*/

Copilot uses AI. Check for mistakes.
@Override
public boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize) {
boolean res = true;
AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
res &= localDispatchRateLimiterOnMessage.consumeTokensAndCheckIfContainsTokens(numberOfMessages);
}
AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
localDispatchRateLimiterOnByte.consumeTokens(byteSize);
res &= localDispatchRateLimiterOnByte.consumeTokensAndCheckIfContainsTokens(byteSize);
}
return res;
}
Comment on lines +80 to 91
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method consumes tokens from multiple limiters sequentially, which means if the message limiter succeeds but the byte limiter fails, tokens have already been consumed from the message limiter. This creates a conservative behavior where some quota is consumed without dispatching messages. Consider documenting this behavior in the method's Javadoc, or alternatively, check all limiters' availability before consuming any tokens to avoid unnecessary quota consumption.

Copilot uses AI. Check for mistakes.

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,26 @@ public long getAvailableDispatchRateLimitOnByte() {
*/
@Override
public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
tryConsumeDispatchQuota(numberOfMessages, byteSize);
}

/**
* It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
* @param numberOfMessages
* @param byteSize
Comment on lines +82 to +83
Copy link

Copilot AI Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Javadoc is missing the @return tag to document the return value. The method returns a boolean indicating whether the quota was successfully acquired.

Suggested addition:

/**
 * It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
 * @param numberOfMessages the number of messages to acquire permits for
 * @param byteSize the number of bytes to acquire permits for
 * @return true if permits were successfully acquired, false otherwise
 */
Suggested change
* @param numberOfMessages
* @param byteSize
* @param numberOfMessages the number of messages to acquire permits for
* @param byteSize the number of bytes to acquire permits for
* @return true if permits were successfully acquired, false otherwise

Copilot uses AI. Check for mistakes.
*/
@Override
public boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize) {
boolean res = true;
RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
res &= localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
}
RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
localDispatchRateLimiterOnByte.tryAcquire(byteSize);
res &= localDispatchRateLimiterOnByte.tryAcquire(byteSize);
}
return res;
Comment on lines +86 to +96
Copy link

Copilot AI Dec 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method consumes tokens from multiple limiters sequentially, which means if the message limiter succeeds but the byte limiter fails, tokens have already been consumed from the message limiter. This creates a conservative behavior where some quota is consumed without dispatching messages. Consider documenting this behavior in the method's Javadoc, or alternatively, check all limiters' availability before consuming any tokens to avoid unnecessary quota consumption.

Copilot uses AI. Check for mistakes.
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,9 +816,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}

int start = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;

Expand Down Expand Up @@ -861,7 +858,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());

totalEntries += filterEntriesForConsumer(metadataArray, start,
filterEntriesForConsumer(metadataArray, start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay, c);
totalEntriesProcessed += entriesForThisConsumer.size();
Expand All @@ -880,12 +877,9 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
+ "PersistentDispatcherMultipleConsumers",
name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
}
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -938,9 +932,6 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,

final Map<Consumer, List<EntryAndMetadata>> assignResult =
assignor.assign(originalEntryAndMetadataList, consumerList.size());
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
final AtomicInteger numConsumers = new AtomicInteger(assignResult.size());
boolean notifyAddedToReplay = false;
Expand Down Expand Up @@ -971,7 +962,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
final EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
final EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);

totalEntries += filterEntriesForConsumer(entryAndMetadataList, batchSizes, sendMessageInfo,
filterEntriesForConsumer(entryAndMetadataList, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);
totalEntriesProcessed += entryAndMetadataList.size();
consumer.sendMessages(entryAndMetadataList, batchSizes, batchIndexesAcks,
Expand All @@ -985,12 +976,9 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,

TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
-(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}

lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

// trigger a new readMoreEntries() call
return numConsumers.get() == 0 || notifyAddedToReplay;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,9 +705,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}

int start = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;

// If the dispatcher is closed, firstAvailableConsumerPermits will be 0, which skips dispatching the
Expand Down Expand Up @@ -753,7 +750,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
totalEntries += filterEntriesForConsumer(metadataArray, start,
filterEntriesForConsumer(metadataArray, start,
entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor,
readType == ReadType.Replay, c);

Expand All @@ -771,12 +768,8 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
+ "PersistentDispatcherMultipleConsumers",
name, msgSent, batchIndexesAcks.getTotalAckedIndexCount());
}
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}

acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (entriesToDispatch > 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name,
Expand All @@ -801,9 +794,6 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,

final Map<Consumer, List<EntryAndMetadata>> assignResult =
assignor.assign(originalEntryAndMetadataList, consumerList.size());
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
final AtomicInteger numConsumers = new AtomicInteger(assignResult.size());
for (Map.Entry<Consumer, List<EntryAndMetadata>> current : assignResult.entrySet()) {
final Consumer consumer = current.getKey();
Expand Down Expand Up @@ -832,7 +822,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,
final EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
final EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);

totalEntries += filterEntriesForConsumer(entryAndMetadataList, batchSizes, sendMessageInfo,
filterEntriesForConsumer(entryAndMetadataList, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);
consumer.sendMessages(entryAndMetadataList, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes(),
Expand All @@ -845,12 +835,8 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType,

TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
-(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}

acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

return numConsumers.get() == 0; // trigger a new readMoreEntries() call
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,6 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());

// Schedule a new read batch operation only after the previous batch has been written to the socket.
executor.execute(() -> readMoreEntries(getActiveConsumer()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ protected synchronized void clearComponentsAfterRemovedAllConsumers() {
@Override
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
lastNumberOfEntriesProcessed = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
long totalEntriesProcessed = 0;
int entriesCount = entries.size();
Expand Down Expand Up @@ -304,16 +302,11 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis

TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this,
-(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
}


lastNumberOfEntriesProcessed = (int) totalEntriesProcessed;

// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

// trigger read more messages if necessary
if (triggerLookAhead.booleanValue()) {
// When all messages get filtered and no messages are sent, we should read more entries, "look ahead"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,9 +399,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
}
}

// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);

if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
Expand Down
Loading