Description
Search before reporting
- I searched in the issues and found nothing similar.
Motivation
Future
I am currently doing a deep dive into Pulsar's delayed message module and want to discuss a few things with everyone.
Flexible control of delayed messages
Before this, I researched both the community and offline use cases, and found that delayed messages are very important to businesses, whose requirements demand flexible control over them. We know that in a messaging system a message is immutable once sent; for a delayed message, however, the delay itself is actually variable.
Pulsar's delayed message module currently lacks this flexibility, and to my knowledge other messaging systems do not offer similar functionality either. It is, however, achievable.
- Initially, I tried a feature for canceling delayed messages.
Currently, we can implement cancellation of a delayed message indirectly by acknowledging it.
See PR: #24370
- Then @thetumbled raised the business need for consumer-side delayed messages, which brings issues such as write amplification.
I also tried to implement this; see PR: #24372
These two PRs are implemented in the same way: only the ledgerId, entryId, and delay timestamp are needed to operate on a delayed message. But they share a common problem.
If a delayed message in the LastMutableBucket has not yet been flushed to the bookie when a failure occurs, the data in the LastMutableBucket is lost. By itself this has no impact: after the restart, messages are re-read from the MarkDelete position onward and the bucket is rebuilt. This is also why data in a bucket can be deleted as soon as it has been read (without requiring a client ack).
But if a command to add or cancel a delayed message is sent from the consumer side and fails to be persisted (sealBucketAndAsyncPersistent) from the LastMutableBucket before the broker crashes, the command is lost. We cannot simply wait for the seal-bucket condition to trigger before reporting the cancellation command as successful, because we do not know how long that will take.
So we need to solve the problem of lost commands.
- We need an operation command type for delayed messages (see the sketch after this list):
CANCEL (cancel a delayed message)
DELAY (delay a message from the consumer side)
LOOP (loop/timed delay)
LOOP_EXPONENT (exponential loop delay)
- We need to store these commands together with the ledgerId, entryId, and timestamp of the messages they operate on.
- Most importantly, we do not need to keep all commands in memory. As with delayed messages themselves, commands can be loaded into memory lazily, for example one to two tick-time windows before the target message is due. Then, when a due message is fetched and a pending command is found for it, the corresponding cancellation or loop delay can be applied.
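To make the proposal concrete, here is a minimal sketch of how such a command could be modeled. CommandType and DelayedMessageCommand are hypothetical names for illustration, not existing Pulsar classes:

```java
// Hypothetical command model for operating on an already-sent delayed
// message; nothing here exists in Pulsar today.
public enum CommandType {
    CANCEL,        // cancel a delayed message
    DELAY,         // delay a message from the consumer side
    LOOP,          // loop/timed delay
    LOOP_EXPONENT  // exponential loop delay
}

// A command only needs the target message's position plus a timestamp.
public record DelayedMessageCommand(
        CommandType type,
        long ledgerId,
        long entryId,
        long deliverAtMillis   // new delivery time; ignored for CANCEL
) {}
```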
Consider using a dedicated topic to store these commands; the commands are themselves sent with a delay, but do not go through the existing delayed-delivery feature. This mechanism is similar to a configurable dead-letter queue.
This should be the last enhancement considered.
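For illustration, a consumer-side cancellation could then look roughly like this. The command topic name and the string encoding of the command are assumptions; only the standard Pulsar producer API is used:

```java
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class CancelCommandExample {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Hypothetical dedicated command topic for "my-topic".
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/my-topic__delayed_cmd")
                .create();

        long ledgerId = 123L, entryId = 45L;          // target message position
        long targetDeliverAt = System.currentTimeMillis() + 60_000;
        long tickTimeMillis = 1_000;                  // example tick time

        // Delay the command itself so it is loaded one tick window before
        // the target message would be delivered (assumed string encoding).
        producer.newMessage()
                .value("CANCEL," + ledgerId + "," + entryId)
                .deliverAt(targetDeliverAt - tickTimeMillis)
                .send();

        producer.close();
        client.close();
    }
}
```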
One topic corresponds to one DelayedDeliveryTracker instance.
We know that each DelayedDeliveryTracker currently corresponds to one subscription. If a topic has a large number of subscriptions, every tracker loads the same information (ledger ID, entry ID, and timestamp), which consumes a lot of memory.
If we implement the functionality proposed above, the number of topics storing delayed-message operation commands will also become very large.
So we need to turn DelayedDeliveryTracker into a topic-level component; inside the tracker, we track and manage a "delay position" for each subscription.
This should be the second improvement to prioritize.
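A rough sketch of what a topic-level tracker could look like, with one shared delayed index and a lightweight per-subscription delay position. All names here are hypothetical, timestamp collisions are ignored for brevity, and index entries could only be removed once every subscription's delay position has passed them:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch of a topic-level tracker: the delayed index is
// stored once per topic, while each subscription only keeps a small
// "delay position" into it.
public class TopicLevelDelayedTracker {

    // Shared index: deliverAt -> [ledgerId, entryId], one copy per topic.
    private final ConcurrentSkipListMap<Long, long[]> delayedIndex =
            new ConcurrentSkipListMap<>();

    // Per-subscription progress: subscription -> last timestamp up to
    // which due messages were handed to that subscription's cursor.
    private final Map<String, Long> delayPositions = new ConcurrentHashMap<>();

    public void addMessage(long ledgerId, long entryId, long deliverAt) {
        delayedIndex.put(deliverAt, new long[]{ledgerId, entryId});
    }

    // Due messages for one subscription, without duplicating the index.
    public NavigableMap<Long, long[]> getScheduledMessages(String subscription, long now) {
        long from = delayPositions.getOrDefault(subscription, 0L);
        NavigableMap<Long, long[]> due =
                new TreeMap<>(delayedIndex.subMap(from, false, now, true));
        delayPositions.put(subscription, now);
        return due;
    }
}
```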
Bug
Currently, BucketDelayedDeliveryTracker has thread safety issues that need to be resolved.
More importantly, finer-grained concurrency control may be needed in BucketDelayedDeliveryTracker, for example using StampedLock or ReentrantReadWriteLock.
For example, the current getScheduledMessages should be a read operation, but it is actually a composite operation (read + write + consume), which leads to an inconsistent concurrency-control strategy and performance problems. This part of the design needs to be reworked.
The coarse-grained synchronized currently in use can lead to:
- Blocked reads: containsMessage() and nextDeliveryTime() cannot run concurrently
- Blocked writes: addMessage() is blocked by consume operations
```java
public synchronized NavigableSet<Position> getScheduledMessages(int maxMessages) {
    // It looks like a read operation, but it actually:
    // 1. Modifies queue state
    sharedBucketPriorityQueue.pop();                 // removes elements from the queue
    // 2. Modifies the index bitmap
    removeIndexBit(ledgerId, entryId);               // clears bitmap bits
    // 3. Modifies a counter
    numberDelayedMessages.decrementAndGet();         // decrements the message count
    // 4. Modifies bucket state
    snapshotSegmentLastIndexMap.remove(snapshotKey); // removes the snapshot mapping
    // 5. Triggers asynchronous loading and state changes
    bucket.asyncLoadNextBucketSnapshotEntry()...
}
```

This should be the highest priority.
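As a sketch of what finer-grained control could look like, the pure-read path can use an optimistic StampedLock read against a cached field, while the composite consume path keeps exclusive locking. This is a simplified model under assumed state, not the real BucketDelayedDeliveryTracker:

```java
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.locks.StampedLock;

// Simplified sketch of splitting read and write paths with StampedLock;
// the real BucketDelayedDeliveryTracker state is far more complex.
public class LockSplitSketch {
    private final StampedLock lock = new StampedLock();
    private final NavigableSet<Long> deliverTimes = new TreeSet<>();
    private long earliest = Long.MAX_VALUE; // cached deliverTimes.first()

    // Pure read: optimistic first, falls back to a read lock on contention.
    public long nextDeliveryTime() {
        long stamp = lock.tryOptimisticRead();
        long t = earliest;
        if (!lock.validate(stamp)) {
            stamp = lock.readLock();
            try {
                t = earliest;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return t;
    }

    public void addMessage(long deliverAt) {
        long stamp = lock.writeLock();
        try {
            deliverTimes.add(deliverAt);
            earliest = deliverTimes.first();
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Composite read+write+consume, as in getScheduledMessages: still needs
    // the write lock, but no longer serializes the pure-read path above.
    public Long pollDueMessage(long now) {
        long stamp = lock.writeLock();
        try {
            if (earliest > now) {
                return null;
            }
            Long t = deliverTimes.pollFirst();
            earliest = deliverTimes.isEmpty() ? Long.MAX_VALUE : deliverTimes.first();
            return t;
        } finally {
            lock.unlockWrite(stamp);
        }
    }
}
```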
Solution
No response
Alternatives
No response
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!