diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 75c60e2687942..ff4fc2d372c0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -67,6 +67,7 @@ import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; +import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.ServiceUnitId; import org.apache.pulsar.common.policies.data.ResourceQuota; @@ -368,6 +369,16 @@ public CompletableFuture> getAvailableBrokersAsync() { return future; } + private boolean checkBundleDataExistInNamespaceBundles(NamespaceBundles namespaceBundles, + NamespaceBundle bundleRange) { + try { + namespaceBundles.validateBundle(bundleRange); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + // Attempt to local the data for the given bundle in metadata store // If it cannot be found, return the default bundle data. @Override @@ -762,8 +773,14 @@ public void checkNamespaceBundleSplit() { try { final String namespaceName = LoadManagerShared.getNamespaceNameFromBundleName(bundleName); final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundleName); - if (!namespaceBundleFactory - .canSplitBundle(namespaceBundleFactory.getBundle(namespaceName, bundleRange))) { + NamespaceBundle bundle = namespaceBundleFactory.getBundle(namespaceName, bundleRange); + if (!namespaceBundleFactory.canSplitBundle(bundle)) { + continue; + } + + NamespaceBundles bundles = namespaceBundleFactory.getBundles(NamespaceName.get(namespaceName)); + if (!checkBundleDataExistInNamespaceBundles(bundles, bundle)) { + log.warn("Bundle {} has been removed, skip split this bundle ", bundleName); continue; } @@ -1113,7 +1130,7 @@ public void writeBrokerDataOnZooKeeper() { @Override public void writeBrokerDataOnZooKeeper(boolean force) { - lock.lock(); +// lock.lock(); try { updateLocalBrokerData(); @@ -1137,7 +1154,7 @@ public void writeBrokerDataOnZooKeeper(boolean force) { throw (ConcurrentModificationException) e; } } finally { - lock.unlock(); +// lock.unlock(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java index 27c73edc6b597..c298eb8aa365b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundles.java @@ -106,7 +106,7 @@ public int size() { return bundles.size(); } - public void validateBundle(NamespaceBundle nsBundle) throws Exception { + public void validateBundle(NamespaceBundle nsBundle) throws IllegalArgumentException { int idx = Arrays.binarySearch(partitions, nsBundle.getLowerEndpoint()); checkArgument(idx >= 0, "Cannot find bundle %s in the bundles list", nsBundle); NamespaceBundle foundBundle = bundles.get(idx); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java index ad07dbfa21758..4d2bca8b22e61 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImplTest.java @@ -1130,4 +1130,50 @@ public void testRemoveNonExistBundleData() assertFalse(bundlesAfterSplit.contains(bundleWillBeSplit.getBundleRange())); } + @Test + public void testRepeatSplitBundle() throws Exception { + final String cluster = "use"; + final String tenant = "my-tenant"; + final String namespace = "repeat-split-bundle"; + final String topicName = tenant + "/" + namespace + "/" + "topic"; + int bundleNumbers = 8; + + admin1.clusters().createCluster(cluster, ClusterData.builder() + .serviceUrl(pulsar1.getWebServiceAddress()).build()); + admin1.tenants().createTenant(tenant, + new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet(cluster))); + admin1.namespaces().createNamespace(tenant + "/" + namespace, bundleNumbers); + + LoadData loadData = (LoadData) getField(primaryLoadManager, "loadData"); + LocalBrokerData localData = (LocalBrokerData) getField(primaryLoadManager, "localData"); + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getBrokerServiceUrl()).build(); + + // create a lot of topic to fully distributed among bundles. + for (int i = 0; i < 10; i++) { + String topicNameI = topicName + i; + admin1.topics().createPartitionedTopic(topicNameI, 20); + // trigger bundle assignment + + pulsarClient.newConsumer().topic(topicNameI) + .subscriptionName("my-subscriber-name2").subscribe(); + } + + String topicToFindBundle = topicName + 0; + NamespaceBundle realBundle = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle)); + String bundleKey = realBundle.toString(); + log.info("Before bundle={}", bundleKey); + + NamespaceBundleStats stats = new NamespaceBundleStats(); + stats.msgRateIn = 100000.0; + localData.getLastStats().put(bundleKey, stats); + pulsar1.getBrokerService().updateRates(); + + primaryLoadManager.updateAll(); + + primaryLoadManager.updateAll(); + Assert.assertFalse(loadData.getBundleData().containsKey(bundleKey)); + } + }