
Conversation

@nodece
Member

@nodece nodece commented Dec 10, 2025

Motivation

When multiple topics share a global consumer dispatch limiter, the old implementation could overshoot the configured dispatch rate.

Previously, the dispatcher followed this sequence:

  1. Get the available tokens from the limiter
  2. Read entries from BookKeeper based on the available tokens
  3. Send messages to the client
  4. Deduct tokens only after dispatch completed

This design fails whenever a rate limiter is shared by multiple dispatchers. In Pulsar, a single topic or subscription may have multiple consumers, and dispatchers may use different levels of limiters (broker-level, topic-level, subscription-level). Because token deduction happened after dispatch, all dispatchers could read the same available tokens concurrently and assume the quota was exclusively theirs. Each dispatcher would then send messages independently and subtract tokens afterward, causing all of them to consume the same quota and overshoot the configured limit.
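
A minimal, self-contained sketch of that race (the classes below are illustrative toys, not Pulsar's actual limiter or dispatcher code):

import java.util.concurrent.atomic.AtomicLong;

// Toy model of the old flow: read the available tokens, dispatch, deduct afterwards.
class SharedLimiter {
    final AtomicLong tokens = new AtomicLong(100); // configured limit: 100 messages per period

    long available() {
        return tokens.get();
    }

    void deduct(long n) {
        tokens.addAndGet(-n);
    }
}

public class OvershootDemo {
    public static void main(String[] args) throws InterruptedException {
        SharedLimiter limiter = new SharedLimiter();
        AtomicLong dispatched = new AtomicLong();

        Runnable dispatcher = () -> {
            long quota = limiter.available(); // both dispatchers can read 100 here
            dispatched.addAndGet(quota);      // both "send" 100 messages
            limiter.deduct(quota);            // the deduction happens too late
        };

        Thread d1 = new Thread(dispatcher);
        Thread d2 = new Thread(dispatcher);
        d1.start();
        d2.start();
        d1.join();
        d2.join();

        // Typically prints dispatched=200 against a limit of 100.
        System.out.println("dispatched=" + dispatched.get() + ", limit=100");
    }
}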

The most dangerous part is the first round of dispatch. During this initial cycle, multiple dispatchers may all read the same available token value and simultaneously push a large burst of traffic to clients. Although subsequent rounds may stabilize because the limiter starts enforcing the average rate correctly, the initial spike can exceed the intended limit by a large margin, creating unpredictable load and potential service impact.

To prevent overshoot, this patch changes the workflow so that tokens are consumed before dispatch. This ensures that all dispatchers competing for a shared limiter acquire tokens atomically, preserving accurate rate enforcement under concurrency.

Modifications

  • Refactored consumer dispatch to consume rate-limiting tokens before sending messages, ensuring that the total messages and bytes do not exceed available tokens.
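
For reference, the new per-entry flow roughly follows this sketch. Only the tryConsumeDispatchQuota call is the method added by this PR; the entry, send, and redelivery pieces are simplified stand-ins, not the actual AbstractBaseDispatcher code:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: quota is consumed per entry *before* it is sent, and the
// remaining entries are redelivered once the shared limiter runs out of tokens.
public class PerEntryDispatchSketch {

    interface DispatchRateLimiter {
        // Added by this PR: atomically consume permits, returning false when exhausted.
        boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize);
    }

    record Entry(byte[] payload) { }

    static void dispatch(List<Entry> entries, DispatchRateLimiter limiter) {
        List<Entry> toRedeliver = new ArrayList<>();
        for (int i = 0; i < entries.size(); i++) {
            Entry entry = entries.get(i);
            if (limiter.tryConsumeDispatchQuota(1, entry.payload().length)) {
                System.out.println("send entry " + i);   // stand-in for the real send path
            } else {
                toRedeliver.addAll(entries.subList(i, entries.size()));
                break;                                   // stop dispatching once tokens run out
            }
        }
        if (!toRedeliver.isEmpty()) {
            System.out.println("redeliver " + toRedeliver.size() + " entries later");
        }
    }
}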

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions bot added the doc-not-needed (Your PR changes do not impact docs) label Dec 10, 2025
Copilot AI left a comment

Pull request overview

This PR refactors the consumer dispatch rate limiting mechanism to prevent rate limit overshooting when multiple topics share a global limiter. Instead of consuming rate limit tokens in bulk after dispatching messages, the new implementation consumes tokens before dispatching each entry, stopping dispatch when tokens are exhausted. This ensures strict enforcement of dispatch rate limits and prevents burst traffic spikes.

Key Changes:

  • Rate limiting tokens are now consumed before message dispatch rather than after, using a new tryConsumeDispatchQuota method
  • Unprocessed entries due to rate limit throttling are now tracked and redelivered
  • Accumulation of total messages/bytes sent for bulk token consumption has been removed across all dispatcher implementations

Reviewed changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.

Show a summary per file
File | Description
AbstractBaseDispatcher.java | Core logic change: consumes rate limit tokens per-entry before dispatch; tracks and redelivers throttled entries
DispatchRateLimiter.java | Adds abstract tryConsumeDispatchQuota method to interface
DispatchRateLimiterClassicImpl.java | Implements tryConsumeDispatchQuota using RateLimiter.tryAcquire() with return value
DispatchRateLimiterAsyncTokenBucketImpl.java | Implements tryConsumeDispatchQuota using AsyncTokenBucket.consumeTokensAndCheckIfContainsTokens()
PersistentDispatcherSingleActiveConsumer.java | Removes post-dispatch acquirePermitsForDeliveredMessages call
PersistentStickyKeyDispatcherMultipleConsumers.java | Removes post-dispatch token consumption and related variable tracking
PersistentStickyKeyDispatcherMultipleConsumersClassic.java | Removes post-dispatch token consumption
PersistentDispatcherMultipleConsumers.java | Removes post-dispatch token consumption, variable tracking, and unused filterEntriesForConsumer return values
PersistentDispatcherMultipleConsumersClassic.java | Removes post-dispatch token consumption, variable tracking, and unused filterEntriesForConsumer return values


Comment on lines +82 to +83
* @param numberOfMessages
* @param byteSize
Copilot AI Dec 10, 2025

The Javadoc is missing the @return tag to document the return value. The method returns a boolean indicating whether the quota was successfully acquired.

Suggested addition:

/**
 * It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
 * @param numberOfMessages the number of messages to acquire permits for
 * @param byteSize the number of bytes to acquire permits for
 * @return true if permits were successfully acquired, false otherwise
 */
Suggested change
-  * @param numberOfMessages
-  * @param byteSize
+  * @param numberOfMessages the number of messages to acquire permits for
+  * @param byteSize the number of bytes to acquire permits for
+  * @return true if permits were successfully acquired, false otherwise

public void consumeDispatchQuota(long numberOfMessages, long byteSize) {
    tryConsumeDispatchQuota(numberOfMessages, byteSize);
}

Copilot AI Dec 10, 2025

The Javadoc is missing a description and the @return tag. The method should document its return value.

Suggested addition:

/**
 * It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
 * @param numberOfMessages the number of messages to acquire permits for
 * @param byteSize the number of bytes to acquire permits for
 * @return true if permits were successfully acquired, false otherwise
 */
Suggested change
/**
 * Tries to acquire message and byte permits from the rate-limiter and returns whether the acquisition succeeded.
 *
 * @param numberOfMessages the number of message permits to acquire
 * @param byteSize the number of byte permits to acquire
 * @return true if permits were successfully acquired, false otherwise
 */

Comment on lines +90 to +91
* @param numberOfMessages
* @param byteSize
Copilot AI Dec 10, 2025

The Javadoc is missing the @return tag to document the return value.

Suggested addition:

/**
 * It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
 *
 * @param numberOfMessages the number of messages to acquire permits for
 * @param byteSize the number of bytes to acquire permits for
 * @return true if permits were successfully acquired, false otherwise
 */
Suggested change
-  * @param numberOfMessages
-  * @param byteSize
+  * @param numberOfMessages the number of messages to acquire permits for
+  * @param byteSize the number of bytes to acquire permits for
+  * @return true if permits were successfully acquired, false otherwise

@lhotari
Member

lhotari commented Dec 10, 2025

When multiple topics share a global consumer dispatch limiter, the old implementation could overshoot the rate limit.

This is only temporary. The average will remain consistent with the new AsyncTokenBucket-based rate limiters.

For older versions, such as 3.0.x, what you explain applies.

Member

@lhotari lhotari left a comment

We want to strictly enforce the dispatch rate to avoid bursts and maintain predictable throughput.

I'm curious to know which Pulsar version has this problem.

This PR shouldn't be merged without first going through the PIP process.

@lhotari
Member

lhotari commented Dec 10, 2025

Refactored consumer dispatch to consume rate-limiting tokens before sending messages, ensuring that the total messages and bytes do not exceed available tokens.

In the AsyncTokenBucket implementation, it's not a problem if the tokens are exceeded at a point in time, since the algorithm will smooth out the average.
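
As a toy illustration of that smoothing (this is not Pulsar's AsyncTokenBucket, just a bucket whose balance may go negative, so an early burst is paid back in later periods):

// Toy bucket: 100 tokens/sec, balance allowed to go negative after an overshoot.
public class SmoothingDemo {
    static long balance = 0;
    static final long RATE_PER_SEC = 100;

    static void consume(long n) {
        balance -= n; // consume first; a negative balance pauses later dispatches
    }

    public static void main(String[] args) {
        long sent = 0;
        for (int second = 1; second <= 4; second++) {
            balance = Math.min(balance + RATE_PER_SEC, RATE_PER_SEC); // refill, capped at one period's worth
            long attempt = (second == 1) ? 300 : 100;                 // the first round overshoots
            if (balance > 0) {                                        // dispatch only while tokens remain
                consume(attempt);
                sent += attempt;
            }
            System.out.printf("second %d: sent=%d, balance=%d%n", second, sent, balance);
        }
        // Total after 4 seconds is 400: the burst in second 1 is paid back in
        // seconds 2-3, so the long-run average stays at the configured 100 msg/s.
    }
}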

One possible improvement to the existing solution could be to acquire tokens based on some estimate before reading the entries and adjust the tokens later, after sending (like it's currently performed).

@nodece
Member Author

nodece commented Dec 11, 2025

@lhotari I believe this issue exists in all versions. The AsyncTokenBucket implementation itself is correct, but the dispatcher was not using the limiter correctly.

One possible improvement to the existing solution could be to acquire tokens based on some estimate before reading the entries and adjust the tokens later, after sending (like it's currently performed).

This approach is workable but not ideal. It still relies on estimating tokens before dispatching, which can lead to inaccurate enforcement when multiple dispatchers share the same limiter. Instead, we can iterate through the entries after they are read and check the rate limit per entry, consuming tokens before dispatching each message. This avoids relying on estimates and prevents overshoot under concurrency.

The PR description has been updated accordingly.

Copilot AI left a comment

Pull request overview

Copilot reviewed 9 out of 9 changed files in this pull request and generated 5 comments.



Comment on lines +352 to +353
private boolean tryAcquirePermitsForDeliveredMessages(
        Topic topic, ManagedCursor cursor, long totalMessagesSent, long totalBytesSent) {
Copilot AI Dec 11, 2025

Similar to the previous parameter, totalBytesSent is misleading since this method is now called per-entry rather than once after processing all entries. Consider renaming to byteCount or numBytes to better reflect that it represents the count for a single entry, not a cumulative total.

Comment on lines +87 to +93
/**
 * It tries to acquire msg and bytes permits from rate-limiter and returns if acquired permits succeed.
 *
 * @param numberOfMessages
 * @param byteSize
 */
public abstract boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize);
Copilot AI Dec 11, 2025

The Javadoc is incomplete and missing parameter descriptions. The @param tags should have descriptions for what numberOfMessages and byteSize represent.

Comment on lines +86 to +96
public boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize) {
    boolean res = true;
    RateLimiter localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
    if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
-       localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
+       res &= localDispatchRateLimiterOnMessage.tryAcquire(numberOfMessages);
    }
    RateLimiter localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
    if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
-       localDispatchRateLimiterOnByte.tryAcquire(byteSize);
+       res &= localDispatchRateLimiterOnByte.tryAcquire(byteSize);
    }
    return res;
Copilot AI Dec 11, 2025

The method consumes tokens from multiple limiters sequentially, which means if the message limiter succeeds but the byte limiter fails, tokens have already been consumed from the message limiter. This creates a conservative behavior where some quota is consumed without dispatching messages. Consider documenting this behavior in the method's Javadoc, or alternatively, check all limiters' availability before consuming any tokens to avoid unnecessary quota consumption.
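
A sketch of that "check first, then consume" variant (illustrative only; the hasPermits/acquire pair is a hypothetical interface, not the real RateLimiter API):

// Hypothetical limiter interface used only for this sketch.
interface SimplePermitLimiter {
    boolean hasPermits(long n); // peek without consuming
    void acquire(long n);       // consume unconditionally
}

final class CheckThenConsumeQuota {
    private final SimplePermitLimiter onMessage;
    private final SimplePermitLimiter onByte;

    CheckThenConsumeQuota(SimplePermitLimiter onMessage, SimplePermitLimiter onByte) {
        this.onMessage = onMessage;
        this.onByte = onByte;
    }

    boolean tryConsume(long numberOfMessages, long byteSize) {
        // Peek both limiters first so the message limiter is not charged when
        // the byte limiter would reject the request anyway.
        if (onMessage != null && !onMessage.hasPermits(numberOfMessages)) {
            return false;
        }
        if (onByte != null && !onByte.hasPermits(byteSize)) {
            return false;
        }
        if (onMessage != null) {
            onMessage.acquire(numberOfMessages);
        }
        if (onByte != null) {
            onByte.acquire(byteSize);
        }
        return true;
    }
}

Note that the peek-then-acquire sequence is still not atomic across the two limiters under concurrency, which is probably why simply documenting the conservative behavior is the lighter option.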

Comment on lines +80 to +91
public boolean tryConsumeDispatchQuota(long numberOfMessages, long byteSize) {
    boolean res = true;
    AsyncTokenBucket localDispatchRateLimiterOnMessage = dispatchRateLimiterOnMessage;
    if (numberOfMessages > 0 && localDispatchRateLimiterOnMessage != null) {
-       localDispatchRateLimiterOnMessage.consumeTokens(numberOfMessages);
+       res &= localDispatchRateLimiterOnMessage.consumeTokensAndCheckIfContainsTokens(numberOfMessages);
    }
    AsyncTokenBucket localDispatchRateLimiterOnByte = dispatchRateLimiterOnByte;
    if (byteSize > 0 && localDispatchRateLimiterOnByte != null) {
-       localDispatchRateLimiterOnByte.consumeTokens(byteSize);
+       res &= localDispatchRateLimiterOnByte.consumeTokensAndCheckIfContainsTokens(byteSize);
    }
    return res;
}
Copilot AI Dec 11, 2025

The method consumes tokens from multiple limiters sequentially, which means if the message limiter succeeds but the byte limiter fails, tokens have already been consumed from the message limiter. This creates a conservative behavior where some quota is consumed without dispatching messages. Consider documenting this behavior in the method's Javadoc, or alternatively, check all limiters' availability before consuming any tokens to avoid unnecessary quota consumption.

Comment on lines +352 to +353
private boolean tryAcquirePermitsForDeliveredMessages(
        Topic topic, ManagedCursor cursor, long totalMessagesSent, long totalBytesSent) {
Copilot AI Dec 11, 2025

The parameter name totalMessagesSent is misleading since this method is now called per-entry rather than once after processing all entries. Consider renaming to messageCount or numMessages to better reflect that it represents the count for a single entry, not a cumulative total.

@lhotari
Member

lhotari commented Dec 11, 2025

@lhotari I believe this issue exists in all versions. The AsyncTokenBucket implementation itself is correct, but the dispatcher was not using the limiter correctly.

Which version did you observe the behavior in?

After PIP-322 there were a few issues and the last one was fixed in #24012.
Please validate that the problem of imbalance really exists in practice. The rate limiter will smoothen the traffic and possible imbalance is not noticeable in most cases. Is your experience different?

In Pulsar, since there isn't really a scheduler that would ensure all dispatchers get a "fair share" of a shared rate limiter, it's possible that this causes imbalance. However, that problem wouldn't be solved by the solution in this PR in any case.

One possible improvement to the existing solution could be to acquire tokens based on some estimate before reading the entries and adjust the tokens later, after sending (like it's currently performed).

This approach is workable but not ideal. It still relies on estimating tokens before dispatching, which can lead to inaccurate enforcement when multiple dispatchers share the same limiter. Instead, we can iterate through the entries after they are read and check the rate limit per entry, consuming tokens before dispatching each message. This avoids relying on estimates and prevents overshoot under concurrency.

It's part of the design of rate limiters that they can go over the limit temporarily. The AsyncTokenBucket implementation guarantees that the average rate is limited to the desired rate.

Dropping entries just before sending them to a consumer is not a great idea for shared subscriptions. It's not a great idea for other subscription types either, since the entries have already been unnecessarily read from BookKeeper. Rate limiters are usually used for capacity management reasons, so if the rate limiter itself adds performance overhead to the system, it would have negative effects.

Therefore, I think that consuming tokens based on an estimate and making an adjustment later would be the correct path forward for improving the accuracy in cases where there's a "thundering herd" type of load increase, where a solution for the described problem could possibly matter. If there's a need to improve the accuracy, the "fair sharing" problem should also be addressed in the solution, since currently there's nothing that ensures each dispatcher gets about the same "share" of the total limit in a shared limiter. This problem has been solved for publish rate limiters with a queue.

@nodece
Member Author

nodece commented Dec 11, 2025

I'm using a forked 3.0.x version, but this behavior is not tied to any specific branch. The issue is not related to the limiter algorithm itself. The problem occurs in the dispatcher, which reads entries from BookKeeper and sends them to consumers.

When multiple topics share a global dispatch limiter, such as brokerDispatchRateLimiter, each dispatcher can observe the same available token value concurrently and proceed in parallel. This causes deterministic overshoot because multiple dispatchers effectively consume the same tokens. This differs from the normal temporary smoothing overshoot allowed by AsyncTokenBucket. The root cause is simply how dispatchers interact with the shared limiter.

I agree that competing for a shared limiter is inherently first-come, first-served, and this PR does not try to change that. I also agree that AsyncTokenBucket allows temporary overshoot while guaranteeing that the average rate is limited.

Regarding dropping entries before sending to consumers, if entries are not dropped, they will still be delivered to the client, which would effectively bypass the limiter. In that case, the limiter becomes meaningless.

I understand that rate limiters are primarily for capacity management and that enforcing them may introduce some performance overhead. However, since a limiter is explicitly configured, the broker should enforce the dispatch rate, even if this temporarily blocks or delays some dispatches. My approach is similar in spirit to the suggested pre-estimate and adjust method, but instead of estimating, it consumes tokens before dispatch. This ensures that concurrent dispatchers cannot overshoot the configured limit while preserving the intended behavior of the rate limiter.

@lhotari
Member

lhotari commented Dec 11, 2025

I'm using a forked 3.0.x version, but this behavior is not tied to any specific branch. The issue is not related to the limiter algorithm itself. The problem occurs in the dispatcher, which reads entries from BookKeeper and sends them to consumers.

When multiple topics share a global dispatch limiter, such as brokerDispatchRateLimiter, each dispatcher can observe the same available token value concurrently and proceed in parallel. This causes deterministic overshoot because multiple dispatchers effectively consume the same tokens. This differs from the normal temporary smoothing overshoot allowed by AsyncTokenBucket. The root cause is simply how dispatchers interact with the shared limiter.

Yes, this is a weakness in the current solution. However, it would be useful to share what the current impact of temporarily going over the limit is. It seems that also in 3.0.x, the dispatch rate limiter implementation will smooth out the acquired tokens over time, so the behavior is similar to the PIP-322 changes.

Regarding dropping entries before sending to consumers, if entries are not dropped, they will still be delivered to the client, which would effectively bypass the limiter. In that case, the limiter becomes meaningless.

I understand that rate limiters are primarily for capacity management and that enforcing them may introduce some performance overhead. However, since a limiter is explicitly configured, the broker should enforce the dispatch rate, even if this temporarily blocks or delays some dispatches. My approach is similar in spirit to the suggested pre-estimate and adjust method, but instead of estimating, it consumes tokens before dispatch. This ensures that concurrent dispatchers cannot overshoot the configured limit while preserving the intended behavior of the rate limiter.

The optimal solution would be to acquire tokens based on what consumers would be available to read and do an adjustment after sending the entries. In the adjustment phase, it would be possible to consume or return tokens. This would address the concerns that I have about performance impacts when entries are read and then dropped before sending them out to consumers. Just wondering if you are interested in adjusting the solution in this direction?
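
A rough sketch of that estimate-and-adjust flow (assumed shape only; AdjustableLimiter, returnTokens, and the read/send callback are illustrative names, not existing Pulsar APIs):

// Sketch: charge the shared limiter *before* the BookKeeper read, based on an
// estimate, then settle the difference after the entries have actually been sent.
final class EstimateAndAdjustDispatch {

    interface AdjustableLimiter {
        void consumeTokens(long n); // may drive the balance negative
        void returnTokens(long n);  // hand back what was over-charged
    }

    private final AdjustableLimiter messageLimiter;

    EstimateAndAdjustDispatch(AdjustableLimiter messageLimiter) {
        this.messageLimiter = messageLimiter;
    }

    void dispatchRound(long estimatedMessages, java.util.function.LongSupplier readAndSend) {
        // 1. Reserve the estimate up front so concurrent dispatchers cannot reuse it.
        messageLimiter.consumeTokens(estimatedMessages);

        // 2. Read from BookKeeper and send; the callback reports how many messages went out.
        long actuallySent = readAndSend.getAsLong();

        // 3. Settle the difference: return unused tokens, or charge the extra.
        long diff = estimatedMessages - actuallySent;
        if (diff > 0) {
            messageLimiter.returnTokens(diff);
        } else if (diff < 0) {
            messageLimiter.consumeTokens(-diff);
        }
    }
}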

@nodece
Member Author

nodece commented Dec 12, 2025

The optimal solution would be to acquire tokens based on what consumers would be available to read and do an adjustment after sending the entries. In the adjustment phase, it would be possible to consume or return tokens.

The previous implementation already followed your approach:
https://github.com/apache/pulsar/blob/v4.1.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L228-L231

This PR changes that behavior by consuming tokens before sending entries, so that multiple dispatchers cannot use the same quota concurrently.

This would address the concerns that I have about performance impacts when entries are read and then dropped before sending out to consumers.

Some performance impact is expected once a dispatch rate limit is configured. Similar to the producer rate limiter, the broker may need to pause processing messages. From the broker's perspective, various settings such as read batch sizes can mask the overshoot, so Prometheus may still show values that appear aligned with the configured rate. However, this alignment is influenced by several layers of constraints, not strictly by the limiter.
My goal is to ensure the code path itself behaves rigorously so that the limiter remains accurate even under concurrency.

@nodece nodece requested a review from lhotari December 12, 2025 04:14
@lhotari
Member

lhotari commented Dec 12, 2025

The previous implementation already followed your approach: https://github.com/apache/pulsar/blob/v4.1.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L228-L231

This is not what I mean. The permits should be consumed from rate limiters before retrieving the entries. After sending, the difference should be adjusted to the actual amount.

This PR changes that behavior by consuming tokens before sending entries, so that multiple dispatchers cannot use the same quota concurrently.

This is not optimal due to extra work added to the system.

This would address the concerns that I have about performance impacts when entries are read and then dropped before sending out to consumers.

Some performance impact is expected once a dispatch rate limit is configured. Similar to the producer rate limiter, the broker may need to pause processing messages. From the broker's perspective, various settings such as read batch sizes can mask the overshoot, so Prometheus may still show values that appear aligned with the configured rate. However, this alignment is influenced by several layers of constraints, not strictly by the limiter. My goal is to ensure the code path itself behaves rigorously so that the limiter remains accurate even under concurrency.

With the performance impact, I mean that this solution would require the system to do more work, since it performs unnecessary work when the rate limit is exceeded. That's something that is not optimal at all. It's possible to eliminate the extra work by acquiring the tokens before the entries are retrieved and adjusting the difference later.

Adding extra work to an already highly loaded system often leads to cascading failures. That's why it should be avoided at all costs.

@lhotari
Member

lhotari commented Dec 12, 2025

The most dangerous part is the first round of dispatch. During this initial cycle, multiple dispatchers may all read the same available token value and simultaneously push a large burst of traffic to clients. Although subsequent rounds may stabilize because the limiter starts enforcing the average rate correctly, the initial spike can exceed the intended limit by a large margin, creating unpredictable load and potential service impact.

Dispatchers have a slow start. They start by dispatching one entry (dispatcherMinReadBatchSize=1) at a time. Does the problem you are describing really exist? What kind of impact is really observed?

@nodece
Member Author

nodece commented Dec 12, 2025

I understand that pre-estimating tokens and adjusting afterwards is an ideal model to reduce extra work. However, the issue addressed here is a concurrency problem: multiple dispatchers sharing a global limiter can observe the same available tokens and cause deterministic overshoot. This PR consumes tokens before sending entries, preventing multiple dispatchers from using the same quota concurrently. While this may introduce minor delays, enforcing the configured rate is necessary, and this approach ensures correctness under concurrency. Pre-estimating tokens could be explored as a future optimization.

Dispatchers have a slow start. They start by dispatching one entry (dispatcherMinReadBatchSize=1) at a time.

Regarding the dispatch batch size: it should be dispatcherMaxReadBatchSize = 100, not dispatcherMinReadBatchSize = 1.

Does the problem really exist that you are describing? What kind of impact is really observed?

I am using the ResourceGroup limiter, which is a global limiter. When multiple topics share this limiter, deterministic overshoot is observable. The traffic may return to normal in the next second, but I don't want to cause too much fluctuation.

I will discuss with my team further, as there is indeed a trade-off between strict rate enforcement and performance.

@lhotari
Member

lhotari commented Dec 12, 2025

I understand that pre-estimating tokens and adjusting afterwards is an ideal model to reduce extra work. However, the issue addressed here is a concurrency problem: multiple dispatchers sharing a global limiter can observe the same available tokens and cause deterministic overshoot.

The concurrency problem would be addressed if the tokens are acquired before retrieving the entries and adjusted later.

In the current implementation, it would also be possible to change the current token acquisition so that tokens are consumed immediately after starting to dispatch the entries to a consumer and not after the operation completes.
https://github.com/apache/pulsar/blob/v4.1.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java#L228-L231
It's currently possible that entries get queued up in the broker-side buffer for a consumer. The issue is #24926.

This PR consumes tokens before sending entries, preventing multiple dispatchers from using the same quota concurrently. While this may introduce minor delays, enforcing the configured rate is necessary, and this approach ensures correctness under concurrency. Pre-estimating tokens could be explored as a future optimization.

I disagree with this approach since it would add more overhead to the system. It would be a breaking change to modify the default dispatch rate limiting in the way that you are suggesting, since it adds overhead. A system with a certain capacity would have less capacity with this change.

I am using the ResourceGroup limiter, which is a global limiter. When multiple topics share this limiter, deterministic overshoot is observable. The traffic may return to normal in the next second, but I don't want to cause too much fluctuation.

I'd assume that the ResourceGroup limiter (PIP-82) isn't able to react to sudden changes in any case when limiters span multiple brokers.

@nodece nodece marked this pull request as draft December 15, 2025 02:53
@nodece nodece closed this Dec 25, 2025