Skip to content

Commit a95c175

Browse files
committed
[improve][broker] Remove enableReplicatedSubscriptions config
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
1 parent 5a59ab7 commit a95c175

File tree

14 files changed

+5
-143
lines changed

14 files changed

+5
-143
lines changed

conf/broker.conf

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -661,9 +661,6 @@ delayedDeliveryMaxDelayInMillis=0
661661
# Whether to enable acknowledge of batch local index.
662662
acknowledgmentAtBatchIndexLevelEnabled=false
663663

664-
# Enable tracking of replicated subscriptions state across clusters.
665-
enableReplicatedSubscriptions=true
666-
667664
# Frequency of snapshots for replicated subscriptions tracking.
668665
replicatedSubscriptionsSnapshotFrequencyMillis=1000
669666

deployment/terraform-ansible/templates/broker.conf

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,9 +390,6 @@ delayedDeliveryTickTimeMillis=1000
390390
# Whether to enable acknowledge of batch local index.
391391
acknowledgmentAtBatchIndexLevelEnabled=false
392392

393-
# Enable tracking of replicated subscriptions state across clusters.
394-
enableReplicatedSubscriptions=true
395-
396393
# Frequency of snapshots for replicated subscriptions tracking.
397394
replicatedSubscriptionsSnapshotFrequencyMillis=1000
398395

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1426,11 +1426,6 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
14261426
maxValue = Integer.MAX_VALUE - Commands.MESSAGE_SIZE_FRAME_PADDING)
14271427
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
14281428

1429-
@FieldContext(
1430-
category = CATEGORY_SERVER,
1431-
doc = "Enable tracking of replicated subscriptions state across clusters.")
1432-
private boolean enableReplicatedSubscriptions = true;
1433-
14341429
@FieldContext(
14351430
category = CATEGORY_SERVER,
14361431
doc = "Frequency of snapshots for replicated subscriptions tracking.")

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,6 @@ protected CompletableFuture<Void> prepareCreateProducer() {
8282
@Override
8383
protected boolean replicateEntries(List<Entry> entries) {
8484
boolean atLeastOneMessageSentForReplication = false;
85-
boolean isEnableReplicatedSubscriptions =
86-
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
87-
8885
try {
8986
// This flag is set to true when we skip at least one local message,
9087
// in order to skip remaining local messages.
@@ -128,9 +125,7 @@ protected boolean replicateEntries(List<Entry> entries) {
128125
}
129126
}
130127

131-
if (isEnableReplicatedSubscriptions) {
132-
checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
133-
}
128+
checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload);
134129

135130
if (msg.isReplicated()) {
136131
// Discard messages that were already replicated into this region

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public boolean isReplicated() {
206206
public boolean setReplicated(boolean replicated) {
207207
replicatedControlled = replicated;
208208

209-
if (!replicated || !config.isEnableReplicatedSubscriptions()) {
209+
if (!replicated) {
210210
this.replicatedSubscriptionSnapshotCache = null;
211211
} else if (this.replicatedSubscriptionSnapshotCache == null) {
212212
this.replicatedSubscriptionSnapshotCache = new ReplicatedSubscriptionSnapshotCache(subName,
@@ -215,12 +215,7 @@ public boolean setReplicated(boolean replicated) {
215215

216216
if (this.cursor != null) {
217217
if (replicated) {
218-
if (!config.isEnableReplicatedSubscriptions()) {
219-
log.warn("[{}][{}] Failed set replicated subscription status to {}, please enable the "
220-
+ "configuration enableReplicatedSubscriptions", topicName, subName, replicated);
221-
} else {
222-
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
223-
}
218+
return this.cursor.putProperty(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
224219
} else {
225220
return this.cursor.removeProperty(REPLICATED_SUBSCRIPTION_PROPERTY);
226221
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -925,12 +925,6 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
925925

926926
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
927927
Boolean replicatedSubscriptionState = replicatedSubscriptionStateArg;
928-
if (replicatedSubscriptionState != null && replicatedSubscriptionState
929-
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
930-
log.warn("[{}] Replicated Subscription is disabled by broker.", getName());
931-
replicatedSubscriptionState = false;
932-
}
933-
934928
if (subType == SubType.Key_Shared
935929
&& !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
936930
return FutureUtil.failedFuture(
@@ -4066,16 +4060,13 @@ public synchronized void checkReplicatedSubscriptionControllerState() {
40664060

40674061
private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
40684062
boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent();
4069-
boolean isEnableReplicatedSubscriptions =
4070-
brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
40714063
boolean replicationEnabled = this.topicPolicies.getReplicationClusters().get().size() > 1;
40724064

4073-
if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions && replicationEnabled) {
4065+
if (shouldBeEnabled && !isCurrentlyEnabled && replicationEnabled) {
40744066
log.info("[{}] Enabling replicated subscriptions controller", topic);
40754067
replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
40764068
brokerService.pulsar().getConfiguration().getClusterName()));
4077-
} else if (isCurrentlyEnabled && (!shouldBeEnabled || !isEnableReplicatedSubscriptions
4078-
|| !replicationEnabled)) {
4069+
} else if (isCurrentlyEnabled && (!shouldBeEnabled || !replicationEnabled)) {
40794070
log.info("[{}] Disabled replicated subscriptions controller", topic);
40804071
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
40814072
replicatedSubscriptionsController = Optional.empty();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CanReconnectZKClientPulsarServiceBaseTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,6 @@ private void setConfigDefaults(ServiceConfiguration config, String clusterName,
193193
config.setBacklogQuotaCheckIntervalInSeconds(5);
194194
config.setDefaultNumberOfNamespaceBundles(1);
195195
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
196-
config.setEnableReplicatedSubscriptions(true);
197196
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
198197
config.setTlsTrustCertsFilePath(caCertPath);
199198
config.setTlsCertificateFilePath(brokerCertPath);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/GeoReplicationWithConfigurationSyncTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
179179
config.setBacklogQuotaCheckIntervalInSeconds(5);
180180
config.setDefaultNumberOfNamespaceBundles(1);
181181
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
182-
config.setEnableReplicatedSubscriptions(true);
183182
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
184183
config.setLoadBalancerSheddingEnabled(false);
185184
config.setTlsTrustCertsFilePath(caCertPath);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NetworkErrorTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
177177
config.setBacklogQuotaCheckIntervalInSeconds(5);
178178
config.setDefaultNumberOfNamespaceBundles(1);
179179
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
180-
config.setEnableReplicatedSubscriptions(true);
181180
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
182181
config.setLoadBalancerSheddingEnabled(false);
183182
config.setForceDeleteNamespaceAllowed(true);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,6 @@ protected void setConfigDefaults(ServiceConfiguration config, String clusterName
266266
config.setBacklogQuotaCheckIntervalInSeconds(5);
267267
config.setDefaultNumberOfNamespaceBundles(1);
268268
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
269-
config.setEnableReplicatedSubscriptions(true);
270269
config.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
271270
config.setLoadBalancerSheddingEnabled(false);
272271
config.setForceDeleteNamespaceAllowed(true);

0 commit comments

Comments
 (0)