diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java index f56a9954e..8b32b24cd 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java @@ -17,9 +17,11 @@ package org.apache.ignite.cdc; +import java.nio.file.Path; import java.util.Iterator; import java.util.Set; import java.util.stream.Collectors; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -27,6 +29,7 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryMetadata; import org.apache.ignite.internal.binary.BinaryTypeImpl; +import org.apache.ignite.internal.cdc.CdcConsumerEx; import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.util.typedef.F; @@ -42,7 +45,7 @@ * * @see AbstractCdcEventsApplier */ -public abstract class AbstractIgniteCdcStreamer implements CdcConsumer { +public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx { /** */ public static final String EVTS_SENT_CNT = "EventsCount"; @@ -67,12 +70,24 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer { /** */ public static final String LAST_EVT_SENT_TIME_DESC = "Timestamp of last applied event to destination cluster"; + /** */ + public static final String DFLT_REGEXP = ""; + /** Handle only primary entry flag. */ private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY; /** Cache names. */ private Set caches; + /** Regexp manager. */ + private CdcRegexManager regexManager; + + /** Include regex template for cache names. */ + private String includeTemplate = DFLT_REGEXP; + + /** Exclude regex template for cache names. */ + private String excludeTemplate = DFLT_REGEXP; + /** Cache IDs. */ protected Set cachesIds; @@ -100,13 +115,26 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer { /** {@inheritDoc} */ @Override public void start(MetricRegistry reg) { + //No-op + } + + /** {@inheritDoc} */ + @Override public void start(MetricRegistry reg, Path cdcDir) { A.notEmpty(caches, "caches"); + regexManager = new CdcRegexManager(cdcDir, log); + cachesIds = caches.stream() .mapToInt(CU::cacheId) .boxed() .collect(Collectors.toSet()); + regexManager.compileRegexp(includeTemplate, excludeTemplate); + + regexManager.getSavedCaches().stream() + .map(CU::cacheId) + .forEach(cachesIds::add); + MetricRegistryImpl mreg = (MetricRegistryImpl)reg; this.evtsCnt = mreg.longMetric(EVTS_SENT_CNT, EVTS_SENT_CNT_DESC); @@ -144,15 +172,26 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumer { /** {@inheritDoc} */ @Override public void onCacheChange(Iterator cacheEvents) { cacheEvents.forEachRemaining(e -> { - // Just skip. Handle of cache events not supported. + matchWithRegex(e.configuration().getName()); }); } + /** + * Finds match between cache name and user's regex templates. + * If match is found, adds this cache's id to id's list. + * + * @param cacheName Cache name. + */ + private void matchWithRegex(String cacheName) { + int cacheId = CU.cacheId(cacheName); + + if (!cachesIds.contains(cacheId) && regexManager.match(cacheName)) + cachesIds.add(cacheId); + } + /** {@inheritDoc} */ @Override public void onCacheDestroy(Iterator caches) { - caches.forEachRemaining(e -> { - // Just skip. Handle of cache events not supported. - }); + caches.forEachRemaining(regexManager::deleteRegexpCacheIfPresent); } /** {@inheritDoc} */ @@ -238,6 +277,30 @@ public AbstractIgniteCdcStreamer setCaches(Set caches) { return this; } + /** + * Sets include regex pattern that participates in CDC. + * + * @param includeTemplate Include regex template + * @return {@code this} for chaining. + */ + public AbstractIgniteCdcStreamer setIncludeTemplate(String includeTemplate) { + this.includeTemplate = includeTemplate; + + return this; + } + + /** + * Sets exclude regex pattern that participates in CDC. + * + * @param excludeTemplate Exclude regex template + * @return {@code this} for chaining. + */ + public AbstractIgniteCdcStreamer setExcludeTemplate(String excludeTemplate) { + this.excludeTemplate = excludeTemplate; + + return this; + } + /** * Sets maximum batch size that will be applied to destination cluster. * diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcRegexManager.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcRegexManager.java new file mode 100644 index 000000000..a099d08ab --- /dev/null +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcRegexManager.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Optional; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import java.util.stream.Collectors; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.util.typedef.internal.CU; + +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; + +/** + * Contains logic to process user's regexp patterns for CDC. + */ +public class CdcRegexManager { + + /** File with saved names of caches added by cache masks. */ + private static final String SAVED_CACHES_FILE = "caches"; + + /** Temporary file with saved names of caches added by cache masks. */ + private static final String SAVED_CACHES_TMP_FILE = "caches_tmp"; + + /** CDC directory path. */ + private final Path cdcDir; + + /** Include regex pattern for cache names. */ + private Pattern includeFilter; + + /** Exclude regex pattern for cache names. */ + private Pattern excludeFilter; + + /** Logger. */ + private IgniteLogger log; + + /** + * + * @param cdcDir Path to Change Data Capture Directory. + * @param log Logger. + */ + public CdcRegexManager(Path cdcDir, IgniteLogger log) { + this.cdcDir = cdcDir; + this.log = log; + } + + /** + * Finds and processes match between cache name and user's regexp patterns. + * + * @param cacheName Cache name. + * @return True if cache name matches user's regexp patterns. + */ + public boolean match(String cacheName) { + return matchAndSave(cacheName); + } + + /** + * Get actual list of names of caches added by regex templates from cache list file. + * Caches that added to replication through regex templates during the work of CDC application, + * are saved to file so they can be restored after application restart. + * + * @return Caches names list. + */ + public List getSavedCaches() { + try { + return loadCaches().stream() + .filter(this::matchesFilters) + .collect(Collectors.toList()); + } + catch (IOException e) { + throw new IgniteException(e); + } + } + + /** + * Finds match between cache name and user's regex templates. + * If match is found, saves cache name to file. + * + * @param cacheName Cache name. + * @return True if cache name matches user's regexp patterns. + */ + private boolean matchAndSave(String cacheName) { + if (matchesFilters(cacheName)) { + try { + List caches = loadCaches(); + + caches.add(cacheName); + + save(caches); + } + catch (IOException e) { + throw new IgniteException(e); + } + + if (log.isInfoEnabled()) + log.info("Cache has been added to replication [cacheName=" + cacheName + "]"); + + return true; + } + return false; + } + + /** + * Matches cache name with compiled regex patterns. + * + * @param cacheName Cache name. + * @return True if cache name matches include pattern and doesn't match exclude pattern. + */ + private boolean matchesFilters(String cacheName) { + return includeFilter.matcher(cacheName).matches() && !excludeFilter.matcher(cacheName).matches(); + } + + /** + * Compiles regex patterns from user templates. + * + * @param includeTemplate Include regex template. + * @param excludeTemplate Exclude regex template. + * @throws IgniteException If the template's syntax is invalid + */ + public void compileRegexp(String includeTemplate, String excludeTemplate) { + try { + includeFilter = Pattern.compile(includeTemplate); + + excludeFilter = Pattern.compile(excludeTemplate); + } + catch (PatternSyntaxException e) { + throw new IgniteException("Invalid cache regexp template.", e); + } + } + + /** + * Loads saved CDC caches from file. If file not found, creates a new one containing empty list. + * + * @return List of saved caches names. + */ + private List loadCaches() throws IOException { + if (cdcDir == null) { + throw new IgniteException("Can't load '" + SAVED_CACHES_FILE + "' file. Cdc directory is null"); + } + Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE); + + if (Files.notExists(savedCachesPath)) { + Files.createFile(savedCachesPath); + + if (log.isInfoEnabled()) + log.info("Cache list created: " + savedCachesPath); + } + + return Files.readAllLines(savedCachesPath); + } + + /** + * Writes caches list to file. + * + * @param caches Caches list. + */ + private void save(List caches) throws IOException { + if (cdcDir == null) { + throw new IgniteException("Can't write to '" + SAVED_CACHES_FILE + "' file. Cdc directory is null"); + } + Path savedCachesPath = cdcDir.resolve(SAVED_CACHES_FILE); + Path tmpSavedCachesPath = cdcDir.resolve(SAVED_CACHES_TMP_FILE); + + StringBuilder cacheList = new StringBuilder(); + + for (String cache : caches) { + cacheList.append(cache); + + cacheList.append('\n'); + } + + Files.write(tmpSavedCachesPath, cacheList.toString().getBytes()); + + Files.move(tmpSavedCachesPath, savedCachesPath, ATOMIC_MOVE, REPLACE_EXISTING); + } + + /** + * Removes cache added by regexp from cache list if such cache is present in file to prevent disk space overflow. + * + * @param cacheId Cache id. + */ + public void deleteRegexpCacheIfPresent(Integer cacheId) { + try { + List caches = loadCaches(); + + Optional cacheName = caches.stream() + .filter(name -> CU.cacheId(name) == cacheId) + .findAny(); + + if (cacheName.isPresent()) { + String name = cacheName.get(); + + caches.remove(name); + + save(caches); + + if (log.isInfoEnabled()) + log.info("Cache has been removed from replication [cacheName=" + name + ']'); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } +} diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java index d38f45883..597993719 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java @@ -17,6 +17,8 @@ package org.apache.ignite.cdc; +import java.nio.file.Path; + import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl; @@ -59,8 +61,8 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer impleme private volatile boolean alive = true; /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { - super.start(mreg); + @Override public void start(MetricRegistry mreg, Path cdcDir) { + super.start(mreg, cdcDir); if (log.isInfoEnabled()) log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']'); diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java index 0083f136a..b87157705 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java @@ -20,6 +20,10 @@ import java.io.Serializable; import java.util.Set; import java.util.UUID; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteEx; @@ -41,6 +45,9 @@ * @see CacheVersionConflictResolver */ public class CacheVersionConflictResolverPluginProvider implements PluginProvider { + /** */ + public static final String DFLT_REGEXP = ""; + /** Cluster id. */ private byte clusterId; @@ -65,6 +72,12 @@ public class CacheVersionConflictResolverPluginProvider caches = null; + Set caches = new HashSet<>(); if (!F.isEmpty(streamerCfg.getCaches())) { checkCaches(streamerCfg.getCaches()); - caches = streamerCfg.getCaches().stream() - .map(CU::cacheId).collect(Collectors.toSet()); + streamerCfg.getCaches().stream() + .map(CU::cacheId).forEach(caches::add); } KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater( @@ -181,7 +181,8 @@ protected void runAppliers() { caches, metaUpdr, stopped, - metrics + metrics, + this ); addAndStart("applier-thread-" + cntr++, applier); @@ -252,6 +253,13 @@ private void addAndStart(String threadName, /** Checks that configured caches exist in a destination cluster. */ protected abstract void checkCaches(Collection caches); + /** + * Get cache names from client. + * + * @return Cache names. + * */ + protected abstract Collection getCaches(); + /** */ private void ackAsciiLogo(IgniteLogger log) { String ver = "ver. " + ACK_VER_STR; diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java index 08fa25a07..b60ec1431 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java @@ -17,6 +17,7 @@ package org.apache.ignite.cdc.kafka; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -30,14 +31,16 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; + import org.apache.ignite.IgniteLogger; import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cdc.CdcCacheEvent; -import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.cdc.CdcEvent; +import org.apache.ignite.cdc.CdcRegexManager; import org.apache.ignite.cdc.TypeMapping; import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl; import org.apache.ignite.internal.binary.BinaryTypeImpl; +import org.apache.ignite.internal.cdc.CdcConsumerEx; import org.apache.ignite.internal.cdc.CdcMain; import org.apache.ignite.internal.processors.metric.MetricRegistryImpl; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; @@ -76,7 +79,7 @@ * @see KafkaToIgniteClientCdcStreamer * @see CacheVersionConflictResolverImpl */ -public class IgniteToKafkaCdcStreamer implements CdcConsumer { +public class IgniteToKafkaCdcStreamer implements CdcConsumerEx { /** */ public static final String EVTS_SENT_CNT = "EventsCount"; @@ -113,6 +116,9 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer { /** Count of metadata markers sent description. */ public static final String MARKERS_SENT_CNT_DESC = "Count of metadata markers sent to Kafka"; + /** */ + public static final String DFLT_REGEXP = ""; + /** Default value for the flag that indicates whether entries only from primary nodes should be handled. */ public static final boolean DFLT_IS_ONLY_PRIMARY = false; @@ -147,6 +153,15 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer { /** Cache names. */ private Collection caches; + /** Regexp manager. */ + private CdcRegexManager regexManager; + + /** Include regex template for cache names. */ + private String includeTemplate = DFLT_REGEXP; + + /** Exclude regex template for cache names. */ + private String excludeTemplate = DFLT_REGEXP; + /** Max batch size. */ private int maxBatchSz = DFLT_MAX_BATCH_SIZE; @@ -246,15 +261,13 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer { /** {@inheritDoc} */ @Override public void onCacheChange(Iterator cacheEvents) { cacheEvents.forEachRemaining(e -> { - // Just skip. Handle of cache events not supported. + matchWithRegex(e.configuration().getName()); }); } /** {@inheritDoc} */ @Override public void onCacheDestroy(Iterator caches) { - caches.forEachRemaining(e -> { - // Just skip. Handle of cache events not supported. - }); + caches.forEachRemaining(regexManager::deleteRegexpCacheIfPresent); } /** Send marker(meta need to be updated) record to each partition of events topic. */ @@ -319,6 +332,11 @@ private void sendOneBatch( /** {@inheritDoc} */ @Override public void start(MetricRegistry reg) { + //No-op + } + + /** {@inheritDoc} */ + @Override public void start(MetricRegistry reg, Path cdcDir) { A.notNull(kafkaProps, "Kafka properties"); A.notNull(evtTopic, "Kafka topic"); A.notNull(metadataTopic, "Kafka metadata topic"); @@ -329,10 +347,18 @@ private void sendOneBatch( kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); + regexManager = new CdcRegexManager(cdcDir, log); + cachesIds = caches.stream() .map(CU::cacheId) .collect(Collectors.toSet()); + regexManager.compileRegexp(includeTemplate, excludeTemplate); + + regexManager.getSavedCaches().stream() + .map(CU::cacheId) + .forEach(cachesIds::add); + try { producer = new KafkaProducer<>(kafkaProps); @@ -378,6 +404,19 @@ public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) { return this; } + /** + * Finds a match between the cache name and user regex templates. + * If a match is found, adds the cache to replication. + * + * @param cacheName Cache name. + */ + private void matchWithRegex(String cacheName) { + int cacheId = CU.cacheId(cacheName); + + if (!cachesIds.contains(cacheId) && regexManager.match(cacheName)) + cachesIds.add(cacheId); + } + /** * Sets topic that is used to send data to Kafka. * @@ -426,6 +465,30 @@ public IgniteToKafkaCdcStreamer setCaches(Collection caches) { return this; } + /** + * Sets include regex pattern that participates in CDC. + * + * @param includeTemplate Include regex template. + * @return {@code this} for chaining. + */ + public IgniteToKafkaCdcStreamer setIncludeTemplate(String includeTemplate) { + this.includeTemplate = includeTemplate; + + return this; + } + + /** + * Sets exclude regex pattern that participates in CDC. + * + * @param excludeTemplate Exclude regex template. + * @return {@code this} for chaining. + */ + public IgniteToKafkaCdcStreamer setExcludeTemplate(String excludeTemplate) { + this.excludeTemplate = excludeTemplate; + + return this; + } + /** * Sets maximum batch size. * diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java index 0e2ada723..6e9336747 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java @@ -125,4 +125,9 @@ public KafkaToIgniteCdcStreamer( @Override protected void checkCaches(Collection caches) { caches.forEach(name -> Objects.requireNonNull(ign.cache(name), name + " not exists!")); } + + /** {@inheritDoc} */ + @Override protected Collection getCaches() { + return ign.cacheNames(); + } } diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java index c28e18ba9..200ae59c2 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java @@ -27,11 +27,15 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -42,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -104,6 +109,12 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable { /** Caches ids to read. */ private final Set caches; + /** Include regex template for cache names. */ + private final String includeTemplate; + + /** Exclude regex template for cache names. */ + private final String excludeTemplate; + /** The maximum time to complete Kafka related requests, in milliseconds. */ private final long kafkaReqTimeout; @@ -128,6 +139,9 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable { /** CDC kafka to ignite metrics */ private final KafkaToIgniteMetrics metrics; + /** Instance of KafkaToIgniteCdcStreamer */ + private final AbstractKafkaToIgniteCdcStreamer streamer; + /** * @param applierSupplier Cdc events applier supplier. * @param log Logger. @@ -139,6 +153,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable { * @param metaUpdr Metadata updater. * @param stopped Stopped flag. * @param metrics CDC Kafka to Ignite metrics. + * @param streamer Instance of KafkaToIgniteCdcStreamer */ public KafkaToIgniteCdcStreamerApplier( Supplier applierSupplier, @@ -150,7 +165,8 @@ public KafkaToIgniteCdcStreamerApplier( Set caches, KafkaToIgniteMetadataUpdater metaUpdr, AtomicBoolean stopped, - KafkaToIgniteMetrics metrics + KafkaToIgniteMetrics metrics, + AbstractKafkaToIgniteCdcStreamer streamer ) { this.applierSupplier = applierSupplier; this.kafkaProps = kafkaProps; @@ -164,6 +180,9 @@ public KafkaToIgniteCdcStreamerApplier( this.stopped = stopped; this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class); this.metrics = metrics; + this.streamer = streamer; + this.includeTemplate = streamerCfg.getIncludeTemplate(); + this.excludeTemplate = streamerCfg.getExcludeTemplate(); } /** {@inheritDoc} */ @@ -260,7 +279,46 @@ private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord metrics.incrementReceivedEvents(); - return F.isEmpty(caches) || caches.contains(rec.key()); + return F.isEmpty(caches) || caches.contains(rec.key()) || matchesRegexTemplates(rec.key()); + } + + /** + * Gets caches names from CDC client and finds match + * between cache id and user's regex templates. + * + * @param key Cache id. + * @return True if match is found. + */ + private boolean matchesRegexTemplates(Integer key) { + Optional cache = streamer.getCaches().stream() + .filter(name -> CU.cacheId(name) == key) + .findAny(); + + Optional matchedCache = cache.filter(this::matchesFilters); + + matchedCache.ifPresent(c -> caches.add(CU.cacheId(c))); + + return matchedCache.isPresent(); + } + + /** + * Matches cache name with compiled regex patterns. + * + * @param cacheName Cache name. + * @return True if cache name matches include pattern and doesn't match exclude pattern. + * @throws IgniteException If the template's syntax is invalid + */ + private boolean matchesFilters(String cacheName) { + try { + boolean matchesInclude = Pattern.compile(includeTemplate).matcher(cacheName).matches(); + + boolean matchesExclude = Pattern.compile(excludeTemplate).matcher(cacheName).matches(); + + return matchesInclude && !matchesExclude; + } + catch (PatternSyntaxException e) { + throw new IgniteException("Invalid cache regexp template.", e); + } } /** diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java index 571afac8d..683d6d461 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java @@ -19,6 +19,7 @@ import java.util.Collection; import java.util.Map; + import org.apache.ignite.cdc.CdcConfiguration; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.spi.metric.MetricExporterSpi; @@ -83,6 +84,12 @@ public class KafkaToIgniteCdcStreamerConfiguration { */ private Collection caches; + /** Include regex template for cache names. */ + private String includeTemplate; + + /** Exclude regex template for cache names. */ + private String excludeTemplate; + /** Metric exporter SPI. */ private MetricExporterSpi[] metricExporterSpi; @@ -173,6 +180,34 @@ public void setCaches(Collection caches) { this.caches = caches; } + /** + * @return Include regex template. + */ + public String getIncludeTemplate() { + return includeTemplate; + } + + /** + * @param includeTemplate Include regex template. + */ + public void setIncludeTemplate(String includeTemplate) { + this.includeTemplate = includeTemplate; + } + + /** + * @return Exclude regex template + */ + public String getExcludeTemplate() { + return excludeTemplate; + } + + /** + * @param excludeTemplate Exclude regex template. + */ + public void setExcludeTemplate(String excludeTemplate) { + this.excludeTemplate = excludeTemplate; + } + /** @return The maximum time to complete Kafka related requests, in milliseconds. */ public long getKafkaRequestTimeout() { return kafkaReqTimeout; diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java index 706d53ad0..14ef7d229 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java @@ -125,4 +125,9 @@ public KafkaToIgniteClientCdcStreamer( caches.forEach(name -> A.ensure(clusterCaches.contains(name), name + " not exists!")); } + + /** {@inheritDoc} */ + @Override protected Collection getCaches() { + return client.cacheNames(); + } } diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java index bc7af745b..607ca19c6 100644 --- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java +++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java @@ -17,6 +17,8 @@ package org.apache.ignite.cdc.thin; +import java.nio.file.Path; + import org.apache.ignite.Ignition; import org.apache.ignite.cdc.AbstractIgniteCdcStreamer; import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl; @@ -66,8 +68,8 @@ public class IgniteToIgniteClientCdcStreamer extends AbstractIgniteCdcStreamer { private long aliveCheckTimeout = DFLT_ALIVE_CHECK_TIMEOUT; /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { - super.start(mreg); + @Override public void start(MetricRegistry mreg, Path cdcDir) { + super.start(mreg, cdcDir); if (log.isInfoEnabled()) log.info("Ignite To Ignite Client Streamer [cacheIds=" + cachesIds + ']'); diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java index 9a5e18c7f..529e1ebcb 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java @@ -146,6 +146,18 @@ public static Collection parameters() { /** */ public static final String IGNORED_CACHE = "ignored-cache"; + /** */ + public static final String REGEX_INCLUDE_TEMPLATE_CACHE = "cdc_on_cache"; + + /** */ + public static final String REGEX_EXCLUDE_TEMPLATE_CACHE = "cdc_on_excluded_cache"; + + /** */ + public static final String REGEX_INCLUDE_PATTERN = "cdc_on.*"; + + /** */ + public static final String REGEX_EXCLUDE_PATTERN = "cdc_on_excluded.*"; + /** */ public static final byte SRC_CLUSTER_ID = 1; @@ -200,6 +212,8 @@ private enum WaitDataMode { cfgPlugin1.setClusterId(clusterId); cfgPlugin1.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE))); + cfgPlugin1.setIncludeTemplate(REGEX_INCLUDE_PATTERN); + cfgPlugin1.setExcludeTemplate(REGEX_EXCLUDE_PATTERN); cfgPlugin1.setConflictResolveField("reqId"); cfg.setPluginProviders(cfgPlugin1); @@ -562,6 +576,92 @@ public void testWithExpiryPolicy() throws Exception { } } + /** Check that caches matching regex filters in config, are added to CDC after its creation. + * Active/Active mode means changes made in both clusters. */ + @Test + public void testActiveActiveReplicationWithRegexFilters() throws Exception { + createCache(srcCluster[0], ACTIVE_ACTIVE_CACHE); + createCache(destCluster[0], ACTIVE_ACTIVE_CACHE); + + IgniteCache srcCache = createCache(srcCluster[0], REGEX_INCLUDE_TEMPLATE_CACHE); + IgniteCache destCache = createCache(destCluster[0], REGEX_INCLUDE_TEMPLATE_CACHE); + + // Even keys goes to src cluster. + runAsync(generateData(REGEX_INCLUDE_TEMPLATE_CACHE, srcCluster[srcCluster.length - 1], + IntStream.range(0, KEYS_CNT).filter(i -> i % 2 == 0))); + + // Odd keys goes to dest cluster. + runAsync(generateData(REGEX_INCLUDE_TEMPLATE_CACHE, destCluster[destCluster.length - 1], + IntStream.range(0, KEYS_CNT).filter(i -> i % 2 != 0))); + + //Start CDC with only 'active-active-cache' in 'caches' property of CDC config + List> futs = startActiveActiveCdcWithFilters(REGEX_INCLUDE_PATTERN, REGEX_EXCLUDE_PATTERN); + + try { + waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs); + + runAsync(() -> IntStream.range(0, KEYS_CNT).filter(j -> j % 2 == 0).forEach(srcCache::remove)); + runAsync(() -> IntStream.range(0, KEYS_CNT).filter(j -> j % 2 != 0).forEach(destCache::remove)); + + waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs); + + //Shouldn't add to the replication, otherwise CDC will throw an error + runAsync(generateData(REGEX_EXCLUDE_TEMPLATE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT))); + + assertFalse(destCluster[0].cacheNames().contains(REGEX_EXCLUDE_TEMPLATE_CACHE)); + } + finally { + for (IgniteInternalFuture fut : futs) + fut.cancel(); + } + } + + /** Check that caches matching regex filters in config, are added to CDC after its creation. + * Active/Passive mode means changes made only in one cluster. */ + @Test + public void testActivePassiveReplicationWithRegexFilters() throws Exception { + //Start CDC with only 'active-active-cache' in 'caches' property of CDC config + List> futs = startActivePassiveCdcWithFilters(ACTIVE_PASSIVE_CACHE, REGEX_INCLUDE_PATTERN, + REGEX_EXCLUDE_PATTERN); + + try { + createCache(destCluster[0], ACTIVE_PASSIVE_CACHE); + + IgniteCache destCache = createCache(destCluster[0], REGEX_INCLUDE_TEMPLATE_CACHE); + + // Updates for "ignored-cache" should be ignored because of CDC consume configuration. + runAsync(generateData(IGNORED_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT))); + runAsync(generateData(REGEX_INCLUDE_TEMPLATE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT))); + + IgniteCache srcCache = + createCache(srcCluster[srcCluster.length - 1], REGEX_INCLUDE_TEMPLATE_CACHE); + + waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs); + + checkMetricsCount(KEYS_CNT); + checkMetrics(); + + IntStream.range(0, KEYS_CNT).forEach(srcCache::remove); + + waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs); + + checkMetrics(); + + assertFalse(destCluster[0].cacheNames().contains(IGNORED_CACHE)); + + checkMetricsCount(2 * KEYS_CNT); + + //Shouldn't add to the replication, otherwise CDC will throw an error + runAsync(generateData(REGEX_EXCLUDE_TEMPLATE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT))); + + assertFalse(destCluster[0].cacheNames().contains(REGEX_EXCLUDE_TEMPLATE_CACHE)); + } + finally { + for (IgniteInternalFuture fut : futs) + fut.cancel(); + } + } + /** */ public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) { return () -> { @@ -688,9 +788,18 @@ protected String[] hostAddresses(IgniteEx[] dest) { /** */ protected abstract List> startActivePassiveCdc(String cache); + /** */ + protected abstract List> startActivePassiveCdcWithFilters(String cache, + String includeTemplate, + String excludeTemplate); + /** */ protected abstract List> startActiveActiveCdc(); + /** */ + protected abstract List> startActiveActiveCdcWithFilters(String includeTemplate, + String excludeTemplate); + /** */ protected abstract void checkConsumerMetrics(Function longMetric); diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java index b6d42e240..bab982936 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java @@ -45,26 +45,40 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { /** {@inheritDoc} */ @Override protected List> startActivePassiveCdc(String cache) { + return startActivePassiveCdcWithFilters(cache, "", ""); + } + + /** {@inheritDoc} */ + @Override protected List> startActivePassiveCdcWithFilters(String cache, + String includeTemplate, + String excludeTemplate) { List> futs = new ArrayList<>(); for (int i = 0; i < srcCluster.length; i++) - futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache, "ignite-to-ignite-src-" + i)); + futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache, + includeTemplate, excludeTemplate, "ignite-to-ignite-src-" + i)); return futs; } /** {@inheritDoc} */ @Override protected List> startActiveActiveCdc() { + return startActiveActiveCdcWithFilters("", ""); + } + + /** {@inheritDoc} */ + @Override protected List> startActiveActiveCdcWithFilters(String includeTemplate, + String excludeTemplate) { List> futs = new ArrayList<>(); for (int i = 0; i < srcCluster.length; i++) { - futs.add(igniteToIgnite( - srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-src-" + i)); + futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, + ACTIVE_ACTIVE_CACHE, includeTemplate, excludeTemplate, "ignite-to-ignite-src-" + i)); } for (int i = 0; i < destCluster.length; i++) { - futs.add(igniteToIgnite( - destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE, "ignite-to-ignite-dest-" + i)); + futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, + ACTIVE_ACTIVE_CACHE, includeTemplate, excludeTemplate, "ignite-to-ignite-dest-" + i)); } return futs; @@ -86,6 +100,8 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest { * @param destCfg Ignite destination cluster configuration. * @param dest Ignite destination cluster. * @param cache Cache name to stream to kafka. + * @param includeTemplate Include regex template for cache names. + * @param excludeTemplate Exclude regex template for cache names. * @param threadName Thread to run CDC instance. * @return Future for Change Data Capture application. */ @@ -94,6 +110,8 @@ protected IgniteInternalFuture igniteToIgnite( IgniteConfiguration destCfg, IgniteEx[] dest, String cache, + String includeTemplate, + String excludeTemplate, @Nullable String threadName ) { return runAsync(() -> { @@ -115,6 +133,8 @@ protected IgniteInternalFuture igniteToIgnite( streamer.setMaxBatchSize(KEYS_CNT); streamer.setCaches(Collections.singleton(cache)); + streamer.setIncludeTemplate(includeTemplate); + streamer.setExcludeTemplate(excludeTemplate); cdcCfg.setConsumer(streamer); cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi()); diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/RegexFiltersSelfTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/RegexFiltersSelfTest.java new file mode 100644 index 000000000..1d31dfc9a --- /dev/null +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/RegexFiltersSelfTest.java @@ -0,0 +1,222 @@ +package org.apache.ignite.cdc; + +import java.util.Collections; +import java.util.stream.IntStream; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cdc.thin.IgniteToIgniteClientCdcStreamer; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.ClientConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cdc.CdcMain; +import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** */ +public class RegexFiltersSelfTest extends GridCommonAbstractTest { + + /** */ + private IgniteEx src; + + /** */ + private IgniteEx dest; + + /** */ + private int discoPort = TcpDiscoverySpi.DFLT_PORT; + + /** */ + private enum WaitDataMode { + /** */ + EXISTS, + + /** */ + REMOVED + } + + /** */ + private static final String TEST_CACHE = "test-cache"; + + /** */ + private static final String REGEX_MATCHING_CACHE = "regex-cache"; + + /** */ + private static final String REGEX_PATTERN = "regex.*"; + + /** */ + private static final int KEYS_CNT = 1000; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder() + .setAddresses(Collections.singleton("127.0.0.1:" + discoPort + ".." + (discoPort + DFLT_PORT_RANGE))); + + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName) + .setDiscoverySpi(new TcpDiscoverySpi() + .setLocalPort(discoPort) + .setIpFinder(finder)); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setCdcEnabled(true))); + + cfg.getDataStorageConfiguration() + .setWalForceArchiveTimeout(5_000); + + cfg.setConsistentId(igniteInstanceName); + + return cfg; + } + + /** + * + * @param srcCfg Ignite source node configuration. + * @param cache Cache name to stream to Ignite2Ignite. + * @param includeTemplate Include cache template. + * @return Future for Change Data Capture application. + */ + private IgniteInternalFuture startCdc(IgniteConfiguration srcCfg, + String cache, + String includeTemplate) { + return runAsync(() -> { + CdcConfiguration cdcCfg = new CdcConfiguration(); + + AbstractIgniteCdcStreamer streamer = new IgniteToIgniteClientCdcStreamer() + .setDestinationClientConfiguration(new ClientConfiguration() + .setAddresses(F.first(dest.localNode().addresses()) + ":" + + dest.localNode().attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT))); + + streamer.setMaxBatchSize(KEYS_CNT); + streamer.setCaches(Collections.singleton(cache)); + streamer.setIncludeTemplate(includeTemplate); + + cdcCfg.setConsumer(streamer); + cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi()); + + CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg); + + cdc.run(); + }); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + + src = startGrid(getConfiguration("source-cluster")); + + discoPort += DFLT_PORT_RANGE + 1; + + dest = startGrid(getConfiguration("dest-cluster")); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** */ + public void waitForSameData( + IgniteCache src, + IgniteCache dest, + int keysCnt, + WaitDataMode mode, + IgniteInternalFuture fut + ) throws IgniteInterruptedCheckedException { + assertTrue(waitForCondition(() -> { + for (int i = 0; i < keysCnt; i++) { + if (mode == WaitDataMode.EXISTS) { + if (!src.containsKey(i) || !dest.containsKey(i)) + return checkFut(false, fut); + } + else if (mode == WaitDataMode.REMOVED) { + if (src.containsKey(i) || dest.containsKey(i)) + return checkFut(false, fut); + + continue; + } + else + throw new IllegalArgumentException(mode + " not supported."); + + Integer data = dest.get(i); + + if (!data.equals(src.get(i))) + return checkFut(false, fut); + } + + return checkFut(true, fut); + }, getTestTimeout())); + } + + /** */ + private boolean checkFut(boolean res, IgniteInternalFuture fut) { + assertFalse("Fut error: " + X.getFullStackTrace(fut.error()), fut.isDone()); + + return res; + } + + /** */ + public Runnable generateData(IgniteCache cache, IntStream keys) { + return () -> { + keys.forEach(i -> cache.put(i, i * 2)); + }; + } + + /** + * Test checks whether caches added by regex filters are saved to and read from file after CDC restart. + */ + @Test + public void testRegexFiltersOnCdcRestart() throws Exception { + + src.cluster().state(ClusterState.ACTIVE); + + dest.cluster().state(ClusterState.ACTIVE); + + //Start CDC only with 'test-cache' in config and cache masks (regex filters) + IgniteInternalFuture cdc = startCdc(src.configuration(), TEST_CACHE, REGEX_PATTERN); + + IgniteCache srcCache = src.getOrCreateCache(new CacheConfiguration() + .setName(REGEX_MATCHING_CACHE) + .setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); + + IgniteCache destCache = dest.getOrCreateCache(new CacheConfiguration() + .setName(REGEX_MATCHING_CACHE) + .setCacheMode(CacheMode.PARTITIONED) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); + + cdc.cancel(); + + //Restart CDC + IgniteInternalFuture cdc2 = startCdc(src.configuration(), TEST_CACHE, REGEX_PATTERN); + + try { + runAsync(generateData(srcCache, IntStream.range(0, KEYS_CNT))); + + waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, cdc2); + } + finally { + cdc2.cancel(); + } + } +} diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java index ef4e14004..a1af003bb 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java @@ -113,6 +113,8 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest { String topic, String metadataTopic, String cache, + String includeTemplate, + String excludeTemplate, String threadName ) { Map params = new HashMap<>(); @@ -141,6 +143,8 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest { IgniteEx[] dest, int partFrom, int partTo, + String includeTemplate, + String excludeTemplate, String threadName ) { Map params = new HashMap<>(); diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java index a56b2941b..ea4e8cf0c 100644 --- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java +++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java @@ -93,6 +93,13 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { /** {@inheritDoc} */ @Override protected List> startActivePassiveCdc(String cache) { + return startActivePassiveCdcWithFilters(cache, "", ""); + } + + /** {@inheritDoc} */ + @Override protected List> startActivePassiveCdcWithFilters(String cache, + String includeTemplate, + String excludeTemplate) { try { KAFKA.createTopic(cache, DFLT_PARTS, 1); @@ -107,7 +114,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { for (IgniteEx ex : srcCluster) { int idx = getTestIgniteInstanceIndex(ex.name()); - futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, "ignite-src-to-kafka-" + idx)); + futs.add(igniteToKafka(ex.configuration(), cache, SRC_DEST_META_TOPIC, cache, includeTemplate, + excludeTemplate, "ignite-src-to-kafka-" + idx)); } for (int i = 0; i < destCluster.length; i++) { @@ -119,7 +127,9 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { destCluster, i * (DFLT_PARTS / 2), (i + 1) * (DFLT_PARTS / 2), - "kafka-to-ignite-dest-" + i + includeTemplate, + excludeTemplate, + "kafka-to-ignite-dest-" + i )); } @@ -128,20 +138,26 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { /** {@inheritDoc} */ @Override protected List> startActiveActiveCdc() { + return startActiveActiveCdcWithFilters("", ""); + } + + /** {@inheritDoc} */ + @Override protected List> startActiveActiveCdcWithFilters(String includeTemplate, + String excludeTemplate) { List> futs = new ArrayList<>(); for (IgniteEx ex : srcCluster) { int idx = getTestIgniteInstanceIndex(ex.name()); - - futs.add(igniteToKafka( - ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE, "ignite-src-to-kafka-" + idx)); + + futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, SRC_DEST_META_TOPIC, ACTIVE_ACTIVE_CACHE, + includeTemplate, excludeTemplate, "ignite-src-to-kafka-" + idx)); } for (IgniteEx ex : destCluster) { int idx = getTestIgniteInstanceIndex(ex.name()); - - futs.add(igniteToKafka( - ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE, "ignite-dest-to-kafka-" + idx)); + + futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, DEST_SRC_META_TOPIC, ACTIVE_ACTIVE_CACHE, + includeTemplate, excludeTemplate, "ignite-dest-to-kafka-" + idx)); } futs.add(kafkaToIgnite( @@ -152,6 +168,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { destCluster, 0, DFLT_PARTS, + includeTemplate, + excludeTemplate, "kafka-to-ignite-src" )); @@ -163,6 +181,8 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest { srcCluster, 0, DFLT_PARTS, + includeTemplate, + excludeTemplate, "kafka-to-ignite-dest" )); @@ -255,6 +275,8 @@ private void checkK2IMetrics(Function longMetric) { * @param topic Kafka topic name. * @param metadataTopic Metadata topic name. * @param cache Cache name to stream to kafka. + * @param includeTemplate Include regex template for cache names. + * @param excludeTemplate Exclude regex template for cache names. * @return Future for Change Data Capture application. */ protected IgniteInternalFuture igniteToKafka( @@ -262,6 +284,8 @@ protected IgniteInternalFuture igniteToKafka( String topic, String metadataTopic, String cache, + String includeTemplate, + String excludeTemplate, String threadName ) { return runAsync(() -> { @@ -270,6 +294,8 @@ protected IgniteInternalFuture igniteToKafka( .setMetadataTopic(metadataTopic) .setKafkaPartitions(DFLT_PARTS) .setCaches(Collections.singleton(cache)) + .setIncludeTemplate(includeTemplate) + .setExcludeTemplate(excludeTemplate) .setMaxBatchSize(KEYS_CNT) .setOnlyPrimary(false) .setKafkaProperties(kafkaProperties()) @@ -292,6 +318,8 @@ protected IgniteInternalFuture igniteToKafka( * @param cacheName Cache name. * @param igniteCfg Ignite configuration. * @param dest Destination Ignite cluster. + * @param includeTemplate Include regex template for cache names. + * @param excludeTemplate Exclude regex template for cache names. * @return Future for runed {@link KafkaToIgniteCdcStreamer}. */ protected IgniteInternalFuture kafkaToIgnite( @@ -302,6 +330,8 @@ protected IgniteInternalFuture kafkaToIgnite( IgniteEx[] dest, int fromPart, int toPart, + String includeTemplate, + String excludeTemplate, String threadName ) { KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration(); @@ -311,6 +341,8 @@ protected IgniteInternalFuture kafkaToIgnite( cfg.setThreadCount((toPart - fromPart) / 2); cfg.setCaches(Collections.singletonList(cacheName)); + cfg.setIncludeTemplate(includeTemplate); + cfg.setExcludeTemplate(excludeTemplate); cfg.setTopic(topic); cfg.setMetadataTopic(metadataTopic); cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);