From 5610bc298b55d453154f1634e18aac9f2b7988b9 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Fri, 5 Dec 2025 17:49:15 +0800 Subject: [PATCH] fix process in updatePartition --- .../admin/impl/PersistentTopicsBase.java | 61 ++++++++++--------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 85387ccc26731..b3de089383be3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -418,37 +418,31 @@ protected CompletableFuture internalCreateNonPartitionedTopicAsync(boolean } else { checkFuture = CompletableFuture.completedFuture(null); } - return checkFuture.thenCompose(topics -> { - final CompletableFuture updateMetadataFuture = (expectPartitions == currentMetadataPartitions) - // current metadata partitions is equals to expect partitions - ? CompletableFuture.completedFuture(null) - // update current cluster topic metadata - : namespaceResources().getPartitionedTopicResources() - .updatePartitionedTopicAsync(topicName, m -> - new PartitionedTopicMetadata(expectPartitions, m.properties)); - return updateMetadataFuture + + return checkFuture // create missing partitions - .thenCompose(__ -> tryCreatePartitionsAsync(expectPartitions)) + .thenCompose(topics -> tryCreatePartitionsAsync(expectPartitions)) // because we should consider the compatibility. // Copy subscriptions from partition 0 instead of being created by the customer .thenCompose(__ -> admin.topics().getStatsAsync(topicName.getPartition(0).toString()) - .thenCompose(stats -> { - List> futures = stats.getSubscriptions().entrySet() - // We must not re-create non-durable subscriptions on the new partitions + .thenCompose(stats -> { + List> futures = + stats.getSubscriptions().entrySet() + // We must not re-create non-durable subscriptions on the new partitions .stream().filter(entry -> entry.getValue().isDurable()) .map(entry -> { final List> innerFutures = new ArrayList<>(expectPartitions); for (int i = 0; i < expectPartitions; i++) { innerFutures.add(admin.topics().createSubscriptionAsync( - topicName.getPartition(i).toString(), + topicName.getPartition(i).toString(), entry.getKey(), MessageId.earliest, entry.getValue().isReplicated(), entry.getValue().getSubscriptionProperties()) .exceptionally(ex -> { - Throwable rc = - FutureUtil.unwrapCompletionException(ex); + Throwable rc = FutureUtil + .unwrapCompletionException(ex); if (!(rc instanceof PulsarAdminException .ConflictException)) { log.warn("[{}] got an error while copying" @@ -463,15 +457,21 @@ protected CompletableFuture internalCreateNonPartitionedTopicAsync(boolean } return FutureUtil.waitForAll(innerFutures); }).collect(Collectors.toList()); - return FutureUtil.waitForAll(futures); - }) - ); - }).thenCompose(__ -> { - if (updateLocal || !topicName.isGlobal()) { - return CompletableFuture.completedFuture(null); - } - // update remote cluster - return getReplicationClusters() + return FutureUtil.waitForAll(futures); + }) + ).thenCompose(__ -> (expectPartitions == currentMetadataPartitions) + // current metadata partitions is equals to expect partitions + ? CompletableFuture.completedFuture(null) + // update current cluster topic metadata + : namespaceResources().getPartitionedTopicResources() + .updatePartitionedTopicAsync(topicName, m -> + new PartitionedTopicMetadata(expectPartitions, m.properties)) + ).thenCompose(__ -> { + if (updateLocal || !topicName.isGlobal()) { + return CompletableFuture.completedFuture(null); + } + // update remote cluster + return getReplicationClusters() .thenCompose(replicationClusters -> { if (replicationClusters == null || replicationClusters.isEmpty()) { return CompletableFuture.completedFuture(null); @@ -492,19 +492,20 @@ protected CompletableFuture internalCreateNonPartitionedTopicAsync(boolean List> futures = replicationClusters.stream() .map(replicationCluster -> admin.clusters().getClusterAsync(replicationCluster) .thenCompose(clusterData -> pulsarService.getBrokerService() - .getClusterPulsarAdmin(replicationCluster, Optional.of(clusterData)) + .getClusterPulsarAdmin(replicationCluster, + Optional.of(clusterData)) .topics().updatePartitionedTopicAsync(topicName.toString(), - expectPartitions, true, force) + expectPartitions, true, force) .exceptionally(ex -> { log.warn("[{}][{}] Update remote cluster partition fail.", topicName, replicationCluster, ex); - throw FutureUtil.wrapToCompletionException(ex); + throw FutureUtil.wrapToCompletionException(ex); }) ) ).collect(Collectors.toList()); - return FutureUtil.waitForAll(futures); + return FutureUtil.waitForAll(futures); }); - }); + }); }); }