diff --git a/conf/broker.conf b/conf/broker.conf index fac9a59098e4f..d20b65f6c7d2d 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -35,6 +35,10 @@ metadataSyncEventTopic= # Event topic to sync configuration-metadata between separate pulsar clusters on different cloud platforms. configurationMetadataSyncEventTopic= +# Exclusions for metadata sync between separate clusters on different cloud platforms +# multiple configurations are separated by comma eg. /admin/clusters/.*,/admin/test/.* +metadataSyncEventExclusions= + # The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl configurationMetadataStoreUrl= diff --git a/pip/pip-449.md b/pip/pip-449.md new file mode 100644 index 0000000000000..66c176dba961c --- /dev/null +++ b/pip/pip-449.md @@ -0,0 +1,52 @@ +# PIP-449: Improve metadata sync to have exclusions in syncing config across clusters in different environments (on-prem and cloud) + +Implementation: https://github.com/apache/pulsar/pull/25035 + +# Background knowledge + +Apache Pulsar is a cloud-native, distributed messaging framework which natively provides geo-replication. Many organizations deploy pulsar instances on-prem and on multiple different cloud providers and at the same time they would like to enable replication between multiple clusters deployed in different cloud providers. Pulsar already provides various proxy options (Pulsar proxy/ enterprise proxy solutions on SNI) to fulfill security requirements when brokers are deployed on different security zones connected with each other. + +However, sometimes it's not possible to share metadata-store (global zookeeper) between pulsar clusters deployed on separate cloud provider platforms, and synchronizing configuration metadata (policies) can be a critical path to share tenant/namespace/topic policies between clusters and administrate pulsar policies uniformly across all clusters. Pulsar has metadata sync feature that syncs configurations across clusters in different environments. + +The metadata sync feature syncs all the configuration metadata cluster/tenant/namespace/topic policies and has no way to exclude specific config sync. This is needed as a cluster can have different config in on-prem vs cloud and synchronizing such config breaks the geo-replication. This PIP is to enhance metadata sync to exclude cluster config from syncing across clusters in on-prem and cloud. + + +# Motivation +The pulsar metadata sync feature syncs all the configuration metadata cluster/tenant/namespace/topic policies and has no way to exclude specific config (eg. cluster configuration) from synchronization across clusters. This is crucial for pulsar multi-environment cluster set up where a cluster can have different configuration in on-prem and cloud and synchronizing this configuration can break geo replication across clusters in different environments on-prem and cloud. Also in pulsar set up where there is separate cluster for a high value tenant in on-prem vs a common cluster for all other tenants, this exclusion in metadata sync becomes crucial in synchronizing tenant/namespace/topic policies across onprem and cloud clusters. + +![diagram](https://github-production-user-asset-6210df.s3.amazonaws.com/95091480/523150696-462180ad-2dbf-4e80-929f-26134c087691.png?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20251205%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20251205T202822Z&X-Amz-Expires=300&X-Amz-Signature=3b3ec1d227d1ca5f1283a18488aa21d3c227ad782507ba87e6f821a5e8f5d7ff&X-Amz-SignedHeaders=host) +# Goals + +## In Scope + +Pulsar metadata sync should exclude synchronization of configuration metadata cluster/tenant/namespace/topic policies across clusters based on the exclusion regex pattern set in the broker config. This helps the cluster admin to set up desired cluster configuration to achieve geo replication across multi-environment cluster in on-prem and cloud while still achieving synchronization of tenant/namespace/topic policies across clusters. + +## Out of Scope +None as part of the metadata sync feature. + +# High Level Design + +Configure regex pattern in `metadataSyncEventExclusions` as part of the broker config on cluster to exclude metadata sync events from synchronizing on the cluster. The destination cluster will receive the event from the source cluster on the metadata sync topic but would not update/delete config from the metadata store if the metadata sync event matches the regex pattern configured as exclusion. A destination cluster would receive all the metadata sync events but ignore them if they match the regex pattern configured as exclusion in the broker config. Multiple comma separated regex patterns can be provided and sync event would be excluded if it matches any of the pattern. IF no exclusions are provided, configuration metadata will sync as before without any exclusions. + +# Detailed Design + +## Design & Implementation Details + +- Add a config `metadataSyncEventExclusions` in the broker config to capture exclusion patterns for the cluster. +- Add a method `private boolean isMetadataEventExcluded(MetadataEvent metadataEvent)` to PulsarMetadataEventSynchronizer.java to match the regex pattern. Return true if the pattern matches the path in the MetadataEvent, false otherwise. +- Update the message listener for the consumer on metadata sync topic to acknowledge the event without processing it if the event is excluded. + +### Configuration +- Add a broker config `metadataSyncEventExclusions` +``` +# Exclusions for metadata sync between separate clusters on different cloud platforms +# multiple configurations are separated by comma eg. /admin/clusters/.*,/admin/test/.* +metadataSyncEventExclusions= +``` + +# General Notes + +# Links +- Mailing List Discussion Thread - https://lists.apache.org/thread/kbkgy0q6fgt2hfyrqync2gvxsordzv0d +- Mailing List Voting Thread - + diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 99682bfbb3349..db2f623ca1a65 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -588,6 +588,13 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private String configurationMetadataSyncEventTopic = null; + @FieldContext( + category = CATEGORY_SERVER, + doc = "Metadata events to be excluded from synchronization between Pulsar " + + "clusters deployed on different cloud platforms." + ) + private String metadataSyncEventExclusions = null; + @FieldContext( dynamic = true, doc = "Factory class-name to create topic with custom workflow" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java index a9564642c1a1e..5474218bd83f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.java @@ -21,12 +21,15 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import com.google.common.annotations.VisibleForTesting; import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Function; import java.util.function.Supplier; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; import lombok.Getter; import org.apache.commons.lang3.StringUtils; @@ -56,6 +59,7 @@ public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronize protected volatile PulsarClientImpl client; protected volatile Producer producer; protected volatile Consumer consumer; + private final List exclusionsPatterns; private final CopyOnWriteArrayList>> listeners = new CopyOnWriteArrayList<>(); @@ -86,6 +90,35 @@ public PulsarMetadataEventSynchronizer(PulsarService pulsar, String topicName) { if (!StringUtils.isNotBlank(topicName)) { log.info("Metadata synchronizer is disabled"); } + String exclusions = pulsar.getConfig().getMetadataSyncEventExclusions(); + if (!StringUtils.isBlank(exclusions)) { + this.exclusionsPatterns = List.of(); + } else { + this.exclusionsPatterns = Arrays.stream(exclusions.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .map(exclusion -> compileExclusionPattern(exclusion)) + .filter(pattern -> pattern != null) + .collect(Collectors.toUnmodifiableList()); + if (!this.exclusionsPatterns.isEmpty()) { + log.info("Metadata synchronizer configured with exclusions: {}", this.exclusionsPatterns); + } + } + } + + /* + * Compiles an exclusion string into a regex pattern + * @param exclusion - The exclusion string to compile + * @return Compiled Pattern object + */ + private Pattern compileExclusionPattern(String exclusion) { + try { + return Pattern.compile(exclusion); + } catch (PatternSyntaxException pse) { + log.warn("Invalid exclusion pattern: '{}', error: {}. Pattern will be ignored", + exclusion, pse.getMessage()); + } + return null; } public void start() throws PulsarServerException { @@ -199,7 +232,11 @@ private void startConsumer() { if (listeners.size() == 0) { c.acknowledgeAsync(msg); return; - + } + // Applying exclusion at consumer + if (isMetadataEventExcluded(msg.getValue())) { + c.acknowledgeAsync(msg); + return; } if (listeners.size() == 1) { listeners.get(0).apply(msg.getValue()).thenApply(__ -> c.acknowledgeAsync(msg)) @@ -323,4 +360,29 @@ private void closeResource(final Supplier> asyncCloseabl TimeUnit.MILLISECONDS); }); } + + /** + * Checks if a metadata event should be excluded from synchronization based on the configured exclusion patterns. + * Events are excluded if their path matches any of the configured regex patterns. + * Patterns are standard Java regular expressions evaluated against the full event path + * + * @param metadataEvent The metadata event to check + * @return true if the event should be excluded, false otherwise + */ + private boolean isMetadataEventExcluded(MetadataEvent metadataEvent) { + String path = metadataEvent.getPath(); + if (path == null || exclusionsPatterns.isEmpty()){ + return false; + } + for (Pattern pattern : exclusionsPatterns) { + if (pattern.matcher(path).matches()) { + if (log.isDebugEnabled()) { + log.debug("Excluding metadata event for path: {} (matched pattern: {})", + path, pattern.pattern()); + } + return true; + } + } + return false; + } } diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java index caca16ff538a4..f4d1bed147ab2 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStoreTest.java @@ -23,18 +23,20 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import com.google.common.collect.Sets; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.function.Function; - +import java.util.regex.Pattern; +import lombok.Cleanup; import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataEvent; import org.apache.pulsar.metadata.api.MetadataEventSynchronizer; @@ -47,8 +49,6 @@ import org.awaitility.Awaitility; import org.testng.annotations.Test; -import lombok.Cleanup; - public class LocalMemoryMetadataStoreTest { HashSet EMPTY_SET = new HashSet<>(); @@ -184,13 +184,324 @@ public void testSyncListener() throws Exception { assertEquals(store1.get(path).get().get().getValue(), value2); } + @Test + public void testMetadataSyncWithSimpleRegexExclusions() throws Exception { + // Create synchronizer with exclusions + TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer(); + sync.addRegexExclusions("/admin/.*"); + sync.addRegexExclusions("/temp/session.*"); + + + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create("memory:local", + MetadataStoreConfig.builder().synchronizer(sync).build()); + + byte[] value = "value".getBytes(StandardCharsets.UTF_8); + + // Test 1: Path should be excluded (/admin/) + String excludedPath1 = "/admin/schemas"; + store1.put(excludedPath1, value, Optional.empty()).join(); + + // Wait a bit and verify that the event was not notified + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath1), + "Event for /admin/schemas should be excluded"); + + // Test 2: Another excluded path (/temp/session123) + String excludedPath2 = "/temp/session123"; + store1.put(excludedPath2, value, Optional.empty()).join(); + + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath2), + "Event for /temp/session123 should be excluded"); + + // Test 3: Path that should not be excluded + String normalPath = "/data/test"; + store1.put(normalPath, value, Optional.empty()).join(); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(normalPath) != null); + assertNotNull(sync.notifiedEvents.get(normalPath), + "Event for /data/test path should be notified"); + } + + @Test + public void testMetadataSyncWithNegativeLookaheadRegex() throws Exception { + TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer(); + // Exclude all path EXCEPT those starting with /important + sync.addRegexExclusions("^(?!/important/).*"); + + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create("memory:local", + MetadataStoreConfig.builder().synchronizer(sync).build()); + + byte[] value = "value".getBytes(StandardCharsets.UTF_8); + + // Test 1: Path starting with /important/ - should be notified + String importantPath = "/important/config"; + store1.put(importantPath, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(importantPath) != null); + assertNotNull(sync.notifiedEvents.get(importantPath), + "Event for /important/config path should be notified (negative lookahead doesn't match)"); + + // Test 2: Other path - should be excluded + String excludedPath1 = "/temp/data"; + store1.put(excludedPath1, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath1), + "Event for /temp/data should be excluded (matches negative lookahead)"); + + // Test 3: Another non-important path - should be excluded + String excludedPath2 = "/admin/schemas"; + store1.put(excludedPath2, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath2), + "Event for /admin/schemas should be excluded (matches negative lookahead)"); + } + + @Test + public void testMetadataSyncWithComplexCharacterClassRegex() throws Exception { + TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer(); + // Exclude /namespace/{lowercase-only}/temp* + sync.addRegexExclusions("/namespace/[a-z]+/temp.*"); + // Exclude admin test resources for schemas or topics + sync.addRegexExclusions("/admin/(schemas|topics)/test.*"); + + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create("memory:local", + MetadataStoreConfig.builder().synchronizer(sync).build()); + + byte[] value = "value".getBytes(StandardCharsets.UTF_8); + + // Test 1: Matches character class pattern + String excludedPath1 = "/namespace/public/temp123"; + store1.put(excludedPath1, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath1), + "Event for /namespace/public/temp123 should be excluded"); + + // Test 2: Doesn't match character class (has numbers) + String normalPath1 = "/namespace/public123/temp"; + store1.put(normalPath1, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(normalPath1) != null); + assertNotNull(sync.notifiedEvents.get(normalPath1), + "Event for /namespace/public123/temp path should be notified"); + + // Test 3: Matches alternation (schemas) + String excludedPath2 = "/admin/schemas/test-schema"; + store1.put(excludedPath2, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath2), + "Event for /admin/schemas/test-schema should be excluded"); + + // Test 4: Matches alternation (topics) + String excludedPath3 = "/admin/topics/test-topic"; + store1.put(excludedPath3, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath3), + "Event for /admin/topics/test-topic should be excluded"); + + // Test 5: Doesn't match alternation + String normalPath2 = "/admin/functions/test-function"; + store1.put(normalPath2, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(normalPath2) != null); + assertNotNull(sync.notifiedEvents.get(normalPath2), + "Event for /admin/functions/test-function path should be notified"); + } + + @Test + public void testMetadataSyncWithMultipleNegativeLookaheadRegex() throws Exception { + TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer(); + // Exclude all path EXCEPT those starting with /production/ OR /critical/ + // Using negative lookahead with alternation + sync.addRegexExclusions("^(?!/(production|critical)/).*"); + + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create("memory:local", + MetadataStoreConfig.builder().synchronizer(sync).build()); + + byte[] value = "value".getBytes(StandardCharsets.UTF_8); + + // Test 1: Path starting with /production/ - should be notified + String productionPath = "/production/config"; + store1.put(productionPath, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(productionPath) != null); + assertNotNull(sync.notifiedEvents.get(productionPath), + "Event for /production/config path should be notified"); + + // Test 2: Path starting with /critical/ - should be notified + String criticalPath = "/critical/data"; + store1.put(criticalPath, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(criticalPath) != null); + assertNotNull(sync.notifiedEvents.get(criticalPath), + "Event for /critical/data path should be notified"); + + // Test 3: Other path - should be excluded + String excludedPath = "/data/temp"; + store1.put(excludedPath, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath), + "Event for /data/temp should be excluded"); + } + + @Test + public void testMetadataSyncWithAnchoredRegex() throws Exception { + TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer(); + sync.addRegexExclusions("^/admin/.*"); // Anchored at start + sync.addRegexExclusions(".*/tmp$"); //Anchored at end + + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create("memory:local", + MetadataStoreConfig.builder().synchronizer(sync).build()); + + byte[] value = "value".getBytes(StandardCharsets.UTF_8); + + // Test 1: Matches start anchor + String excludedPath1 = "/admin/schemas"; + store1.put(excludedPath1, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath1), + "Event for /admin/schemas should be excluded"); + + // Test 2: Matches end anchor + String excludedPath2 = "/data/tmp"; + store1.put(excludedPath2, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath2), + "Event for /data/tmp should be excluded"); + + // Test 3: Doesn't match end anchor + String normalPath = "/data/tmp/file"; + store1.put(normalPath, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(normalPath) != null); + assertNotNull(sync.notifiedEvents.get(normalPath), + "Event for /data/tmp/file path should be notified"); + + } + + @Test + public void testMetadataSyncWithCombinedComplexRegex() throws Exception { + TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer(); + // Complex pattern: Exclude /cache/* OR /temp/* but NOT if the path ends with important + sync.addRegexExclusions("^/(cache|temp)/(?!.*important$).*"); + + + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create("memory:local", + MetadataStoreConfig.builder().synchronizer(sync).build()); + + byte[] value = "value".getBytes(StandardCharsets.UTF_8); + + // Test 1: /cache path without important - should be excluded + String excludedPath1 = "/cache/data"; + store1.put(excludedPath1, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath1), + "Event for /cache/data should be excluded"); + + // Test 2: /temp path without important - should be excluded + String excludedPath2 = "/temp/session"; + store1.put(excludedPath2, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath2), + "Event for /temp/session should be excluded"); + + // Test 3: /cache path ending with important - should be notified + String normalPath1 = "/cache/data/important"; + store1.put(normalPath1, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(normalPath1) != null); + assertNotNull(sync.notifiedEvents.get(normalPath1), + "Event for /cache/data/important path should be notified"); + + // Test 4: Different path - should be notified + String normalPath2 = "/data/test"; + store1.put(normalPath2, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(normalPath2) != null); + assertNotNull(sync.notifiedEvents.get(normalPath2), + "Event for /data/test path should be notified"); + } + + @Test + public void testMetadataSyncWithDeleteAndComplexRegex() throws Exception { + TestMetadataEventSynchronizer sync = new TestMetadataEventSynchronizer(); + // Exclude temporary or cache paths + sync.addRegexExclusions("^/(cache|temp)/.*"); + + + @Cleanup + MetadataStore store1 = MetadataStoreFactory.create("memory:local", + MetadataStoreConfig.builder().synchronizer(sync).build()); + + byte[] value = "value".getBytes(StandardCharsets.UTF_8); + + // Test 1: Put and delete on excluded path + String excludedPath = "/temp/session"; + store1.put(excludedPath, value, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath), + "Put event for /temp/session should be excluded"); + store1.delete(excludedPath, Optional.empty()).join(); + Thread.sleep(100); + assertNull(sync.notifiedEvents.get(excludedPath), + "Delete event for /temp/session should be excluded"); + + // Test 2: Put and delete on normal path + String normalPath = "/data/test"; + store1.put(normalPath, value, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(normalPath) != null); + assertNotNull(sync.notifiedEvents.get(normalPath), + "Put event for /data/test should be notified"); + assertEquals(sync.notifiedEvents.get(normalPath).getType(), NotificationType.Modified); + store1.delete(normalPath, Optional.empty()).join(); + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> + sync.notifiedEvents.get(normalPath) != null); + assertNotNull(sync.notifiedEvents.get(normalPath), + "Delete event for /data/test should be notified"); + assertEquals(sync.notifiedEvents.get(normalPath).getType(), NotificationType.Deleted); + + } + static class TestMetadataEventSynchronizer implements MetadataEventSynchronizer { public Map notifiedEvents = new ConcurrentHashMap<>(); public String clusterName = "test"; public volatile Function> listener; + private final List exclusionPatterns = new CopyOnWriteArrayList<>(); + + public void addRegexExclusions(String regexPattern) { + Pattern pattern = Pattern.compile(regexPattern); + exclusionPatterns.add(pattern); + } + + private boolean isExcluded(String path) { + if (path == null) { + return false; + } + // Check regex pattern exclusions + if (!exclusionPatterns.isEmpty()){ + for (Pattern pattern : exclusionPatterns) { + if(pattern.matcher(path).matches()) { + return true; + } + } + } + return false; + } @Override public CompletableFuture notify(MetadataEvent event) { + // Check if the event path is excluded + if (isExcluded(event.getPath())){ + return CompletableFuture.completedFuture(null); + } notifiedEvents.put(event.getPath(), event); return CompletableFuture.completedFuture(null); }