Skip to content

Conversation

@lhotari
Copy link
Member

@lhotari lhotari commented Dec 12, 2025

PIP-442

Motivation

See PIP-442.

This is part 2 for the PIP-442 implementation. This covers the topic list watcher operations.
The first part of PIP-442 implementation was #24833.

Modifications

  • use memory limits in TopicListService / PulsarCommandSenderImpl so that topic list watcher related operations are covered

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.2.0 milestone Dec 12, 2025
@lhotari lhotari self-assigned this Dec 12, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Dec 12, 2025
@codecov-commenter
Copy link

Codecov Report

❌ Patch coverage is 71.96970% with 37 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.40%. Comparing base (c8dbc3c) to head (56ee92c).

Files with missing lines Patch % Lines
...apache/pulsar/broker/service/TopicListService.java 74.60% 17 Missing and 15 partials ⚠️
...pulsar/broker/service/PulsarCommandSenderImpl.java 33.33% 0 Missing and 2 partials ⚠️
...r/common/semaphore/AsyncDualMemoryLimiterImpl.java 0.00% 2 Missing ⚠️
...he/pulsar/common/semaphore/AsyncSemaphoreImpl.java 0.00% 1 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #25070      +/-   ##
============================================
- Coverage     74.40%   74.40%   -0.01%     
+ Complexity    34204    34196       -8     
============================================
  Files          1921     1921              
  Lines        150526   150614      +88     
  Branches      17495    17504       +9     
============================================
+ Hits         112002   112064      +62     
+ Misses        29645    29615      -30     
- Partials       8879     8935      +56     
Flag Coverage Δ
inttests 26.31% <1.51%> (-0.24%) ⬇️
systests 22.82% <1.51%> (+<0.01%) ⬆️
unittests 73.94% <71.96%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...he/pulsar/common/semaphore/AsyncSemaphoreImpl.java 91.21% <0.00%> (-0.63%) ⬇️
...pulsar/broker/service/PulsarCommandSenderImpl.java 85.71% <33.33%> (-1.34%) ⬇️
...r/common/semaphore/AsyncDualMemoryLimiterImpl.java 91.11% <0.00%> (-4.24%) ⬇️
...apache/pulsar/broker/service/TopicListService.java 73.89% <74.60%> (+8.93%) ⬆️

... and 94 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link
Contributor

@Denovo1998 Denovo1998 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments.

AsyncDualMemoryLimiterImpl limiter =
pulsar.getBrokerService().getMaxTopicListInFlightLimiter();
assertThat(limiter.getLimiter(AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY).getAvailablePermits())
.isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024 * 1024);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If getMaxTopicListInFlightHeapMemSizeMB() or getMaxTopicListInFlightDirectMemSizeMB() returns a value of 2048 or more, the expression * 1024 * 1024 will overflow a 32-bit integer. To prevent this, use 1024L to ensure the computation is performed with long integers.

-        .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024 * 1024);
+        .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightHeapMemSizeMB() * 1024L * 1024);
-        .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 1024 * 1024);
+        .isEqualTo(pulsar.getConfiguration().getMaxTopicListInFlightDirectMemSizeMB() * 1024L * 1024);

Comment on lines +461 to +462
log.info("[{}] Cannot acquire direct memory tokens for sending {}. Retry {} in {} ms. {}", connection,
operationName, retryCount.get(), retryDelay, t.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The retryCount variable is created but never incremented.

private void sendTopicListSuccessWithPermitAcquiringRetries(long watcherId, long requestId, List<String> topicList,
String hash,
Runnable successfulCompletionCallback) {
performOperationWithPermitAcquiringRetries(watcherId, "topic list success", permitAcquireErrorHandler ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an earlier update A is removed from the semaphore queue due to timeout/queuefull and retried, and a later update B gets the permits first and sends successfully, it is possible for B to arrive first and A to arrive later.

For the pattern consumer, out-of-order updates may lead to inconsistent final subscription sets (typical example: A=Create topic2, B=Delete topic2; if B arrives first, the client may mistakenly retain topic2).

Perhaps we need to maintain a "send chain" or queue for each watcher, ensuring that only one update/success is trying to acquire + write at any moment; if it fails, retry this "head of the queue," and subsequent updates cannot bypass it?

private final Executor executor;
private volatile boolean closed = false;
private boolean sendTopicListSuccessCompleted = false;
private BlockingDeque<Runnable> sendTopicListUpdateTasksBeforeInit = new LinkedBlockingDeque<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No capacity limit. Will there be any issues here? When a watcher has registered a listener (to "not miss events"), but sendWatchTopicListSuccess cannot be obtained for a long time due to pressure on direct permits (or repeatedly times out and retries), all updates will enter this queue.

Additionally, since both sendTopicListUpdate(...) and sendTopicListSuccessCompleted() are synchronized here, maybe the concurrency capability of LinkedBlockingDeque is redundant?

@lhotari lhotari requested a review from Copilot December 15, 2025 17:40
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements memory limits for topic list watcher operations as part of PIP-442. It introduces backpressure mechanisms to prevent broker memory exhaustion when handling concurrent pattern consumer topic list requests by applying memory limiting to topic list watcher creation and update operations.

Key Changes

  • Added memory limit acquisition with retry logic for topic list watcher initialization and updates
  • Modified topic list watcher to queue updates during initialization to avoid race conditions
  • Enhanced test coverage for memory limiting scenarios and concurrent consumer patterns

Reviewed changes

Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncSemaphoreImpl.java Added toString method to SemaphorePermit for debugging
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/AsyncDualMemoryLimiterImpl.java Added toString method to DualMemoryLimiterPermit for debugging
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PatternConsumerTopicWatcherBackPressureMultipleConsumersTest.java New test validating backpressure with 100 concurrent pattern consumers and 300 topics
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java Updated test setup to use direct executor and verify watcher initialization completion
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java Added comprehensive tests for permit acquisition retries and memory limiting scenarios
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java Implemented memory limiting with retry backoff for watcher operations and queued updates during initialization
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java Modified methods to acquire direct memory permits before sending topic list messages
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java Updated interface to return CompletableFuture and accept error handler for permit acquisition

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.


/***
* @param topics topic names which are matching, the topic name contains the partition suffix.
* @return
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @return tag in the Javadoc comment is empty and does not describe what the method returns. It should describe that the method returns a CompletableFuture that completes when the operation finishes.

Suggested change
* @return
* @return a CompletableFuture&lt;Void&gt; that completes when the operation finishes

Copilot uses AI. Check for mistakes.

/***
* {@inheritDoc}
* @return
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The @return tag in the Javadoc comment is empty and does not describe what the method returns. It should describe that the method returns a CompletableFuture that completes when the operation finishes.

Suggested change
* @return
* @return a CompletableFuture that completes when the watch topic list update operation finishes

Copilot uses AI. Check for mistakes.
String hash = TopicList.calculateHash(topics);
topicListFuture.complete(topics);
// wait for acquisition to timeout a few times
Thread.sleep(2000);
Copy link

Copilot AI Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using Thread.sleep in tests is a code smell that makes tests brittle and slower. Consider using Awaitility or similar mechanisms to wait for specific conditions instead of arbitrary sleep durations.

Suggested change
Thread.sleep(2000);
Awaitility.await().atMost(Duration.ofSeconds(2))
.untilAsserted(() -> Assert.assertEquals(0, lookupSemaphore.availablePermits()));

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs ready-to-test

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants