From f6f455d9d1ba8b08d00ee327962b76717eb7dbb1 Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Fri, 30 Aug 2024 14:27:43 +0300 Subject: [PATCH 1/5] IGNITE-22530 CDC: Add regex filters for cache names --- .../test/java/org/apache/ignite/util/CdcCommandTest.java | 3 ++- .../java/org/apache/ignite/util/CdcResendCommandTest.java | 3 ++- .../src/main/java/org/apache/ignite/cdc/CdcConsumer.java | 8 +++++--- .../main/java/org/apache/ignite/internal/cdc/CdcMain.java | 2 +- .../apache/ignite/internal/cdc/WalRecordsConsumer.java | 7 ++++--- .../test/java/org/apache/ignite/cdc/AbstractCdcTest.java | 4 ++-- .../org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java | 5 +++-- .../org/apache/ignite/cdc/CdcPushMetricsExporterTest.java | 5 +++-- .../src/test/java/org/apache/ignite/cdc/CdcSelfTest.java | 5 +++-- .../internal/ducktest/tests/cdc/CountingCdcConsumer.java | 3 ++- .../apache/ignite/internal/cdc/CdcIndexRebuildTest.java | 3 ++- .../java/org/apache/ignite/cdc/CdcConfigurationTest.java | 3 ++- 12 files changed, 31 insertions(+), 20 deletions(-) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index 241b3adc51eeb..6c9d2eae10c10 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.Serializable; +import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -194,7 +195,7 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception { CdcConfiguration cfg = new CdcConfiguration(); cfg.setConsumer(new UserCdcConsumer() { - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { appStarted.countDown(); } }); diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java index c333fd3ed0dac..cb17d9ba7bb94 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.util; +import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -217,7 +218,7 @@ synchronized List events() { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op } diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java index 592bc71ef545b..a5a508b687b62 100644 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java @@ -17,6 +17,7 @@ package org.apache.ignite.cdc; +import java.nio.file.Path; import java.util.Iterator; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteBinary; @@ -33,7 +34,7 @@ * This consumer will receive data change events during ignite-cdc process invocation. * The lifecycle of the consumer is the following: * @@ -66,8 +67,9 @@ public interface CdcConsumer { /** * Starts the consumer. * @param mreg Metric registry for consumer specific metrics. + * @param cdcDir Path to Change Data Capture Directory. */ - public void start(MetricRegistry mreg); + public void start(MetricRegistry mreg, Path cdcDir); /** * Handles entry changes events. @@ -131,7 +133,7 @@ public interface CdcConsumer { /** * Stops the consumer. - * This method can be invoked only after {@link #start(MetricRegistry)}. + * This method can be invoked only after {@link #start(MetricRegistry, Path)}. */ public void stop(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index d7f22658fdd7a..7110b6ca8ffaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -339,7 +339,7 @@ public void runX() throws Exception { committedSegmentOffset.value(walState.get1().fileOffset()); } - consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer"))); + consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), cdcDir); started = true; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 3b111d50a197e..94ce645f1dbd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.cdc; +import java.nio.file.Path; import java.util.EnumSet; import java.util.Iterator; import java.util.NoSuchElementException; @@ -188,8 +189,8 @@ public void onCacheDestroyEvents(Iterator caches) { * @param cdcConsumerReg CDC consumer metric registry. * @throws IgniteCheckedException If failed. */ - public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg) throws IgniteCheckedException { - consumer.start(cdcConsumerReg); + public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException { + consumer.start(cdcConsumerReg, cdcDir); evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer"); lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process"); @@ -200,7 +201,7 @@ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg) /** * Stops the consumer. - * This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl)}. + * This methods can be invoked only after {@link #start(MetricRegistryImpl, MetricRegistryImpl, Path)}. */ public void stop() { consumer.stop(); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index 6f1180389558d..c4ed4b61f9621 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -308,7 +308,7 @@ public abstract static class TestCdcConsumer implements CdcConsumer { private volatile boolean stopped; /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { stopped = false; } @@ -462,7 +462,7 @@ public static class TrackCacheEventsConsumer implements CdcConsumer { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op. } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java index cc6f520187d24..9988ccf905603 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.cdc; import java.io.File; +import java.nio.file.Path; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -92,8 +93,8 @@ public void testCdcStartWithNonDefaultWorkDir() throws Exception { CdcConfiguration cdcCfg = new CdcConfiguration(); cdcCfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() { - @Override public void start(MetricRegistry mreg) { - super.start(mreg); + @Override public void start(MetricRegistry mreg, Path cdcDir) { + super.start(mreg, cdcDir); started.countDown(); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java index 4b4d49790d951..feeecacf976d5 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.cdc; +import java.nio.file.Path; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -201,10 +202,10 @@ public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { Ignite ignite = Ignition.start(destClusterCliCfg); - super.start(mreg); + super.start(mreg, cdcDir); ignite.log().info("TestIgniteToIgniteConsumer started."); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index 1c2132c497bb9..f55a6e51adec2 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.Serializable; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -418,7 +419,7 @@ public void testReadOneByOneForBackup() throws Exception { // No-op. } - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op. } }; @@ -512,7 +513,7 @@ public void testReadFromNextEntry() throws Exception { // No-op. } - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op. } }, cfg)); diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java index 8af23d0c36185..d43da27eb6fa0 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.ducktest.tests.cdc; +import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteLogger; @@ -42,7 +43,7 @@ public class CountingCdcConsumer implements CdcConsumer { private final AtomicLong objectsConsumed = new AtomicLong(); /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { log.info("CountingCdcConsumer started"); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java index e45cdaae45bc9..4613fd0e1b887 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.cdc; +import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -140,7 +141,7 @@ public void reset() { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { // No-op. } diff --git a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java index 7ace260bcfc00..4675fb310fc87 100644 --- a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.cdc; +import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; @@ -123,7 +124,7 @@ public static class TestCdcConsumer implements CdcConsumer { public CountDownLatch startLatch = new CountDownLatch(1); /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg) { + @Override public void start(MetricRegistry mreg, Path cdcDir) { springString2 = ctx.getBean("springString2", String.class); startLatch.countDown(); From a18deb60beaacbca363986213d6f1149dbf95d9d Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Sun, 18 May 2025 21:58:51 +0300 Subject: [PATCH 2/5] IGNITE-22530 Add CdcConsumerEx interface --- .../apache/ignite/util/CdcCommandTest.java | 3 +- .../ignite/util/CdcResendCommandTest.java | 3 +- .../org/apache/ignite/cdc/CdcConsumer.java | 8 ++--- .../org/apache/ignite/cdc/CdcConsumerEx.java | 34 +++++++++++++++++++ .../apache/ignite/internal/cdc/CdcMain.java | 2 +- .../internal/cdc/WalRecordsConsumer.java | 7 +++- .../apache/ignite/cdc/AbstractCdcTest.java | 4 +-- .../ignite/cdc/CdcNonDefaultWorkDirTest.java | 5 ++- .../cdc/CdcPushMetricsExporterTest.java | 5 ++- .../org/apache/ignite/cdc/CdcSelfTest.java | 5 ++- .../tests/cdc/CountingCdcConsumer.java | 3 +- .../internal/cdc/CdcIndexRebuildTest.java | 3 +- .../ignite/cdc/CdcConfigurationTest.java | 3 +- 13 files changed, 57 insertions(+), 28 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java index 6c9d2eae10c10..241b3adc51eeb 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcCommandTest.java @@ -19,7 +19,6 @@ import java.io.File; import java.io.Serializable; -import java.nio.file.Path; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -195,7 +194,7 @@ public void testDeleteLostSegmentLinksApplicationNotClosed() throws Exception { CdcConfiguration cfg = new CdcConfiguration(); cfg.setConsumer(new UserCdcConsumer() { - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { appStarted.countDown(); } }); diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java index cb17d9ba7bb94..c333fd3ed0dac 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/CdcResendCommandTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.util; -import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -218,7 +217,7 @@ synchronized List events() { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op } diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java index a5a508b687b62..592bc71ef545b 100644 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumer.java @@ -17,7 +17,6 @@ package org.apache.ignite.cdc; -import java.nio.file.Path; import java.util.Iterator; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteBinary; @@ -34,7 +33,7 @@ * This consumer will receive data change events during ignite-cdc process invocation. * The lifecycle of the consumer is the following: *
    - *
  • Start of the consumer {@link #start(MetricRegistry, Path)}.
  • + *
  • Start of the consumer {@link #start(MetricRegistry)}.
  • *
  • Notification of the consumer by the {@link #onEvents(Iterator)} call.
  • *
  • Stop of the consumer {@link #stop()}.
  • *
@@ -67,9 +66,8 @@ public interface CdcConsumer { /** * Starts the consumer. * @param mreg Metric registry for consumer specific metrics. - * @param cdcDir Path to Change Data Capture Directory. */ - public void start(MetricRegistry mreg, Path cdcDir); + public void start(MetricRegistry mreg); /** * Handles entry changes events. @@ -133,7 +131,7 @@ public interface CdcConsumer { /** * Stops the consumer. - * This method can be invoked only after {@link #start(MetricRegistry, Path)}. + * This method can be invoked only after {@link #start(MetricRegistry)}. */ public void stop(); diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java new file mode 100644 index 0000000000000..035fb566ed30e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +import java.nio.file.Path; +import org.apache.ignite.metric.MetricRegistry; + +/** + * Extended CdcConsumer interface which provides overloaded {@link CdcConsumerEx#start(MetricRegistry, Path)} method + * required for CDC regex filters. + */ +public interface CdcConsumerEx extends CdcConsumer { + /** + * Starts the consumer. + * @param mreg Metric registry for consumer specific metrics. + * @param cdcDir Path to Change Data Capture Directory. + */ + void start(MetricRegistry mreg, Path cdcDir); +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java index 7110b6ca8ffaa..b7cbbc8c4ece9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java @@ -339,7 +339,7 @@ public void runX() throws Exception { committedSegmentOffset.value(walState.get1().fileOffset()); } - consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), cdcDir); + consumer.start(mreg, kctx.metric().registry(metricName("cdc", "consumer")), ft.walCdc().toPath()); started = true; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 94ce645f1dbd1..3295ae5589d44 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -27,6 +27,7 @@ import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cdc.CdcCacheEvent; import org.apache.ignite.cdc.CdcConsumer; +import org.apache.ignite.cdc.CdcConsumerEx; import org.apache.ignite.cdc.CdcEvent; import org.apache.ignite.cdc.TypeMapping; import org.apache.ignite.internal.pagemem.wal.WALIterator; @@ -187,10 +188,14 @@ public void onCacheDestroyEvents(Iterator caches) { * * @param cdcReg CDC metric registry. * @param cdcConsumerReg CDC consumer metric registry. + * @param cdcDir Path to Change Data Capture Directory. * @throws IgniteCheckedException If failed. */ public void start(MetricRegistryImpl cdcReg, MetricRegistryImpl cdcConsumerReg, Path cdcDir) throws IgniteCheckedException { - consumer.start(cdcConsumerReg, cdcDir); + if (consumer instanceof CdcConsumerEx) + ((CdcConsumerEx) consumer).start(cdcConsumerReg, cdcDir); + else + consumer.start(cdcConsumerReg); evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer"); lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process"); diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java index c4ed4b61f9621..6f1180389558d 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/AbstractCdcTest.java @@ -308,7 +308,7 @@ public abstract static class TestCdcConsumer implements CdcConsumer { private volatile boolean stopped; /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { stopped = false; } @@ -462,7 +462,7 @@ public static class TrackCacheEventsConsumer implements CdcConsumer { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op. } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java index 9988ccf905603..cc6f520187d24 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcNonDefaultWorkDirTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.cdc; import java.io.File; -import java.nio.file.Path; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.configuration.DataStorageConfiguration; @@ -93,8 +92,8 @@ public void testCdcStartWithNonDefaultWorkDir() throws Exception { CdcConfiguration cdcCfg = new CdcConfiguration(); cdcCfg.setConsumer(new AbstractCdcTest.UserCdcConsumer() { - @Override public void start(MetricRegistry mreg, Path cdcDir) { - super.start(mreg, cdcDir); + @Override public void start(MetricRegistry mreg) { + super.start(mreg); started.countDown(); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java index feeecacf976d5..4b4d49790d951 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcPushMetricsExporterTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.cdc; -import java.nio.file.Path; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -202,10 +201,10 @@ public TestIgniteToIgniteConsumer(IgniteConfiguration destClusterCliCfg) { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { Ignite ignite = Ignition.start(destClusterCliCfg); - super.start(mreg, cdcDir); + super.start(mreg); ignite.log().info("TestIgniteToIgniteConsumer started."); } diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java index f55a6e51adec2..1c2132c497bb9 100644 --- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java @@ -19,7 +19,6 @@ import java.io.File; import java.io.Serializable; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -419,7 +418,7 @@ public void testReadOneByOneForBackup() throws Exception { // No-op. } - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op. } }; @@ -513,7 +512,7 @@ public void testReadFromNextEntry() throws Exception { // No-op. } - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op. } }, cfg)); diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java index d43da27eb6fa0..8af23d0c36185 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.ducktest.tests.cdc; -import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.IgniteLogger; @@ -43,7 +42,7 @@ public class CountingCdcConsumer implements CdcConsumer { private final AtomicLong objectsConsumed = new AtomicLong(); /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { log.info("CountingCdcConsumer started"); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java index 4613fd0e1b887..e45cdaae45bc9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/cdc/CdcIndexRebuildTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.internal.cdc; -import java.nio.file.Path; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -141,7 +140,7 @@ public void reset() { } /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { // No-op. } diff --git a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java index 4675fb310fc87..7ace260bcfc00 100644 --- a/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java +++ b/modules/spring/src/test/java/org/apache/ignite/cdc/CdcConfigurationTest.java @@ -17,7 +17,6 @@ package org.apache.ignite.cdc; -import java.nio.file.Path; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import org.apache.ignite.IgniteCheckedException; @@ -124,7 +123,7 @@ public static class TestCdcConsumer implements CdcConsumer { public CountDownLatch startLatch = new CountDownLatch(1); /** {@inheritDoc} */ - @Override public void start(MetricRegistry mreg, Path cdcDir) { + @Override public void start(MetricRegistry mreg) { springString2 = ctx.getBean("springString2", String.class); startLatch.countDown(); From 572bc0e1d65d215f1d83997f906a42ca2e0fb838 Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Sun, 8 Jun 2025 21:57:32 +0300 Subject: [PATCH 3/5] IGNITE-22530 Add CdcRegexMatcher interface --- .../apache/ignite/cdc/CdcRegexMatcher.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java new file mode 100644 index 0000000000000..08aade859db1e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cdc; + +/** + * Regexp matcher that processes user's regexp patterns for CDC caches names. + */ +public interface CdcRegexMatcher { + /** + * Finds and processes match between cache name and user's regexp patterns. + * + * @param cacheName Cache name. + * @return True if cache name matches user's regexp patterns. + */ + boolean match(String cacheName); +} From 77525a6d716df574dfab47b63eb02ef303c224db Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Wed, 9 Jul 2025 16:49:46 +0300 Subject: [PATCH 4/5] IGNITE-22530 Move CdcConsumerEx to org.apache.ignite.internal --- .../org/apache/ignite/{ => internal}/cdc/CdcConsumerEx.java | 4 +++- .../org/apache/ignite/internal/cdc/WalRecordsConsumer.java | 1 - 2 files changed, 3 insertions(+), 2 deletions(-) rename modules/core/src/main/java/org/apache/ignite/{ => internal}/cdc/CdcConsumerEx.java (94%) diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java similarity index 94% rename from modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java rename to modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java index 035fb566ed30e..57153b7018c56 100644 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcConsumerEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerEx.java @@ -15,9 +15,11 @@ * limitations under the License. */ -package org.apache.ignite.cdc; +package org.apache.ignite.internal.cdc; import java.nio.file.Path; + +import org.apache.ignite.cdc.CdcConsumer; import org.apache.ignite.metric.MetricRegistry; /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java index 3295ae5589d44..18a21ef84ce96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/WalRecordsConsumer.java @@ -27,7 +27,6 @@ import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cdc.CdcCacheEvent; import org.apache.ignite.cdc.CdcConsumer; -import org.apache.ignite.cdc.CdcConsumerEx; import org.apache.ignite.cdc.CdcEvent; import org.apache.ignite.cdc.TypeMapping; import org.apache.ignite.internal.pagemem.wal.WALIterator; From 58d2fe68d192e0544e1c0211c40343a9a85dbd57 Mon Sep 17 00:00:00 2001 From: Andrei Nadyktov Date: Mon, 3 Nov 2025 20:39:21 +0300 Subject: [PATCH 5/5] IGNITE-22530 Remove CdcRegexMatcher interface --- .../apache/ignite/cdc/CdcRegexMatcher.java | 31 ------------------- 1 file changed, 31 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java diff --git a/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java b/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java deleted file mode 100644 index 08aade859db1e..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/cdc/CdcRegexMatcher.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cdc; - -/** - * Regexp matcher that processes user's regexp patterns for CDC caches names. - */ -public interface CdcRegexMatcher { - /** - * Finds and processes match between cache name and user's regexp patterns. - * - * @param cacheName Cache name. - * @return True if cache name matches user's regexp patterns. - */ - boolean match(String cacheName); -}