From c08ff26294ecf3d3357662eeea04e9b422df1ba7 Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 2 Dec 2025 22:02:26 +0800 Subject: [PATCH 1/4] [fix][load] Avoid get old loadData during split bundle --- .../impl/ModularLoadManagerImpl.java | 1 + .../impl/ModularLoadManagerImplTest.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 75c60e2687942..a919c0c319557 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -794,6 +794,7 @@ && shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { } } + writeBrokerDataOnZooKeeper(true); updateBundleSplitMetrics(splitCount); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index ad07dbfa21758..d4f0ed72ea215 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -1130,4 +1130,50 @@ public void testRemoveNonExistBundleData() assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange())); } + @Test + public void testRepeatSplitBundle() throws Exception { + final String cluster = "use"; + final String tenant = "my-tenant"; + final String namespace = "repeat-split-bundle"; + final String topicName = tenant + "/" + namespace + "/" + "topic"; + int bundleNumbers = 8; + + admin1.clusters().createCluster(cluster, ClusterData.builder() + .serviceUrl(pulsar1.getWebServiceAddress()).build()); + admin1.tenants().createTenant(tenant, + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster))); + admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers); + + LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData"); + LocalBrokerData localData = (LocalBrokerData) getField(primaryLoadManager, "localData"); + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build(); + + // create a lot of topic to fully distributed among bundles. + for (int i = 0; i < 10; i++) { + String topicNameI = topicName + i; + admin1.topics().createPartitionedTopic(topicNameI, 20); + // trigger bundle assignment + + pulsarClient.newConsumer().topic(topicNameI) + .subscriptionName("my-subscriber-name2").subscribe(); + } + + String topicToFindBundle = topicName + 0; + NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle)); + String bundleKey = realBundle.toString(); + log.info("bundle={}", bundleKey); + + NamespaceBundleStats stats = new NamespaceBundleStats(); + stats.msgRateIn = 100000.0; + localData.getLastStats().put(bundleKey, stats); + pulsar1.getBrokerService().updateRates(); + + primaryLoadManager.updateAll(); + + primaryLoadManager.updateAll(); + Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey)); + } + } From d582d6b0db6336b84206e908d25af9f52f12dfb9 Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 9 Dec 2025 19:13:46 +0800 Subject: [PATCH 2/4] Only skip split the bundles that do not exist in Metadata --- .../loadbalance/impl/ModularLoadManagerImpl.java | 14 +++++++++++++- .../impl/ModularLoadManagerImplTest.java | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index a919c0c319557..df0d2584c4278 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -368,6 +368,13 @@ public CompletableFuture> getAvailableBrokersAsync() { return future; } + public boolean checkBundleDataExistInMetadataStore(String bundle) { + Optional optBundleData = + pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join(); + + return optBundleData.isPresent(); + } + // Attempt to local the data for the given bundle in metadata store // If it cannot be found, return the default bundle data. @Override @@ -767,6 +774,12 @@ public void checkNamespaceBundleSplit() { continue; } + if (!checkBundleDataExistInMetadataStore(bundleName)) { + log.warn("Bundle {} has been removed on the metadata store, skip split this bundle ", + bundleName); + continue; + } + // Make sure the same bundle is not selected again. loadData.getBundleData().remove(bundleName); localData.getLastStats().remove(bundleName); @@ -794,7 +807,6 @@ && shouldAntiAffinityNamespaceUnload(namespaceName, bundleRange, broker)) { } } - writeBrokerDataOnZooKeeper(true); updateBundleSplitMetrics(splitCount); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index d4f0ed72ea215..4d2bca8b22e61 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -1163,7 +1163,7 @@ public void testRepeatSplitBundle() throws Exception { String topicToFindBundle = topicName + 0; NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle)); String bundleKey = realBundle.toString(); - log.info("bundle={}", bundleKey); + log.info("Before bundle={}", bundleKey); NamespaceBundleStats stats = new NamespaceBundleStats(); stats.msgRateIn = 100000.0; From eeaaf730a47b2779050e7845dd08741d664c547b Mon Sep 17 00:00:00 2001 From: coderzc Date: Tue, 9 Dec 2025 21:09:26 +0800 Subject: [PATCH 3/4] Update bundle existence check to use NamespaceBundles instead of metadata store --- .../loadbalance/impl/ModularLoadManagerImpl.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index df0d2584c4278..5857331029054 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -67,6 +67,7 @@ import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -368,11 +369,9 @@ public CompletableFuture> getAvailableBrokersAsync() { return future; } - public boolean checkBundleDataExistInMetadataStore(String bundle) { - Optional optBundleData = - pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join(); - - return optBundleData.isPresent(); + public boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles namespaceBundles, String bundleRange) { + List bundles = namespaceBundles.getBundles(); + return bundles.stream().anyMatch(b -> b.getBundleRange().equals(bundleRange)); } // Attempt to local the data for the given bundle in metadata store @@ -774,8 +773,9 @@ public void checkNamespaceBundleSplit() { continue; } - if (!checkBundleDataExistInMetadataStore(bundleName)) { - log.warn("Bundle {} has been removed on the metadata store, skip split this bundle ", + NamespaceBundles bundles = namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName)); + if (!checkBundleDataExistInNamespaceBundles(bundles, bundleRange)) { + log.warn("Bundle {} has been removed, skip split this bundle ", bundleName); continue; } From ef616231886b32a8032669c5135fce944f88504f Mon Sep 17 00:00:00 2001 From: coderzc Date: Wed, 17 Dec 2025 11:21:06 +0800 Subject: [PATCH 4/4] Improve bundle validation and logging during split operations --- .../impl/ModularLoadManagerImpl.java | 24 +++++++++++-------- .../common/naming/NamespaceBundles.java | 2 +- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 5857331029054..ff4fc2d372c0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -369,9 +369,14 @@ public CompletableFuture> getAvailableBrokersAsync() { return future; } - public boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles namespaceBundles, String bundleRange) { - List bundles = namespaceBundles.getBundles(); - return bundles.stream().anyMatch(b -> b.getBundleRange().equals(bundleRange)); + private boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles namespaceBundles, + NamespaceBundle bundleRange) { + try { + namespaceBundles.validateBundle(bundleRange); + return true; + } catch (IllegalArgumentException e) { + return false; + } } // Attempt to local the data for the given bundle in metadata store @@ -768,15 +773,14 @@ public void checkNamespaceBundleSplit() { try { final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName); final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName); - if (!namespaceBundleFactory - .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) { + NamespaceBundle bundle = namespaceBundleFactory.getBundle(namespaceName, bundleRange); + if (!namespaceBundleFactory.canSplitBundle(bundle)) { continue; } NamespaceBundles bundles = namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName)); - if (!checkBundleDataExistInNamespaceBundles(bundles, bundleRange)) { - log.warn("Bundle {} has been removed, skip split this bundle ", - bundleName); + if (!checkBundleDataExistInNamespaceBundles(bundles, bundle)) { + log.warn("Bundle {} has been removed, skip split this bundle ", bundleName); continue; } @@ -1126,7 +1130,7 @@ public void writeBrokerDataOnZooKeeper() { @Override public void writeBrokerDataOnZooKeeper(boolean force) { - lock.lock(); +// lock.lock(); try { updateLocalBrokerData(); @@ -1150,7 +1154,7 @@ public void writeBrokerDataOnZooKeeper(boolean force) { throw (ConcurrentModificationException) e; } } finally { - lock.unlock(); +// lock.unlock(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index 27c73edc6b597..c298eb8aa365b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -106,7 +106,7 @@ public int size() { return bundles.size(); } - public void validateBundle(NamespaceBundle nsBundle) throws Exception { + public void validateBundle(NamespaceBundle nsBundle) throws IllegalArgumentException { int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint()); checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle); NamespaceBundle foundBundle = bundles.get(idx);