43 changes: 21 additions & 22 deletions modules/cdc-ext/pom.xml
@@ -32,7 +32,7 @@
</parent>

<properties>
<kafka.version>3.4.0</kafka.version>
<kafka.version>3.9.1</kafka.version>
</properties>

<artifactId>ignite-cdc-ext</artifactId>
@@ -58,6 +58,13 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.9.13</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
@@ -121,27 +128,6 @@
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>

<dependency>
Expand All @@ -158,12 +144,25 @@
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.21.2</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
<version>2.7.18</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
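The test dependencies added above (spring-kafka-test 2.9.13 and the Testcontainers kafka artifact 1.21.2) are what the migrated tests build on. For reference, a Testcontainers-backed broker can be started roughly as in the sketch below; this is illustrative only, assumes Docker is available, and the image tag and class name are made up rather than taken from this PR.

```java
// Illustrative sketch, not part of this PR. Assumes Docker is available and an
// arbitrary confluentinc/cp-kafka image tag.
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaContainerSketch {
    public static void main(String[] args) {
        try (KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))) {
            // Starts a single-node Kafka broker in a Docker container.
            kafka.start();

            Properties props = new Properties();

            // The container maps the broker to a random host port.
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier");

            // ... build producers/consumers against 'props' here ...
        }
    }
}
```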
CdcKafkaReplicationTest.java
@@ -36,7 +36,11 @@
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.EVTS_SENT_CNT;
import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.LAST_EVT_SENT_TIME;
@@ -54,6 +58,22 @@
/**
* Tests for kafka replication.
*/

@SpringBootTest
@EmbeddedKafka(
partitions = 16,
topics = {
CdcKafkaReplicationTest.SRC_DEST_TOPIC,
CdcKafkaReplicationTest.DEST_SRC_TOPIC,
CdcKafkaReplicationTest.SRC_DEST_META_TOPIC,
CdcKafkaReplicationTest.DEST_SRC_META_TOPIC
},
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"auto.create.topics.enable=false"
}
)

public class CdcKafkaReplicationTest extends AbstractReplicationTest {
/** */
public static final String SRC_DEST_TOPIC = "source-dest";
@@ -71,32 +91,32 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
public static final int DFLT_PARTS = 16;

/** */
private static EmbeddedKafkaCluster KAFKA = null;
private static EmbeddedKafkaBroker embeddedKafka;

/** */
protected List<AbstractKafkaToIgniteCdcStreamer> kafkaStreamers;

/** */
@BeforeAll static void beforeAll(@Autowired EmbeddedKafkaBroker broker) {
embeddedKafka = broker;
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();

KAFKA = initKafka(KAFKA);
kafkaStreamers = new ArrayList<>();
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();

removeKafkaTopicsAndWait(KAFKA, getTestTimeout());
}

/** {@inheritDoc} */
@Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
try {
KAFKA.createTopic(cache, DFLT_PARTS, 1);

waitForCondition(() -> KAFKA.getAllTopicsInCluster().contains(cache), getTestTimeout());
waitForCondition(() -> true, getTestTimeout());
}
catch (Exception e) {
throw new RuntimeException(e);
@@ -336,16 +356,16 @@ protected IgniteInternalFuture<?> kafkaToIgnite(

/** */
protected Properties kafkaProperties() {
return kafkaProperties(KAFKA);
return kafkaProperties(embeddedKafka);
}

/**
* @param kafka Kafka cluster.
*/
static Properties kafkaProperties(EmbeddedKafkaCluster kafka) {
static Properties kafkaProperties(EmbeddedKafkaBroker kafka) {
Properties props = new Properties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBrokersAsString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -354,46 +374,4 @@ static Properties kafkaProperties(EmbeddedKafkaCluster kafka) {
return props;
}

/**
* Init Kafka cluster if current instance is null and create topics.
*
* @param curKafka Current kafka.
*/
static EmbeddedKafkaCluster initKafka(EmbeddedKafkaCluster curKafka) throws Exception {
EmbeddedKafkaCluster kafka = curKafka;

if (kafka == null) {
Properties props = new Properties();

props.put("auto.create.topics.enable", "false");

kafka = new EmbeddedKafkaCluster(1, props);

kafka.start();
}

kafka.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
kafka.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
kafka.createTopic(SRC_DEST_META_TOPIC, 1, 1);
kafka.createTopic(DEST_SRC_META_TOPIC, 1, 1);

return kafka;
}

/**
* @param kafka Kafka cluster.
* @param timeout Timeout.
*/
static void removeKafkaTopicsAndWait(EmbeddedKafkaCluster kafka, long timeout) throws IgniteInterruptedCheckedException {
kafka.getAllTopicsInCluster().forEach(t -> {
try {
kafka.deleteTopic(t);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

waitForCondition(() -> kafka.getAllTopicsInCluster().isEmpty(), timeout);
}
}
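The diff drops the initKafka()/removeKafkaTopicsAndWait() helpers and declares the four fixed topics on the @EmbeddedKafka annotation instead, while startActivePassiveCdc now only waits on a trivially true condition. If per-cache topic creation and cleanup is still wanted, a helper along the lines below could be layered on EmbeddedKafkaBroker. This is a hedged sketch, not code from the PR; the helper class name is invented.

```java
// Hedged sketch, not part of this PR: per-test topic management through the injected
// EmbeddedKafkaBroker, roughly replacing the removed initKafka()/removeKafkaTopicsAndWait().
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.kafka.test.EmbeddedKafkaBroker;

class EmbeddedBrokerTopicHelper {
    /** Creates a topic with the given partition count on the embedded broker. */
    static void createTopic(EmbeddedKafkaBroker broker, String topic, int parts) {
        broker.addTopics(new NewTopic(topic, parts, (short)1));
    }

    /** Deletes every topic currently known to the embedded broker. */
    static void deleteAllTopics(EmbeddedKafkaBroker broker) {
        broker.doWithAdmin(admin -> {
            try {
                Set<String> topics = admin.listTopics().names().get();

                admin.deleteTopics(topics).all().get();
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
    }
}
```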
KafkaToIgniteMetadataUpdaterTest.java
@@ -30,53 +30,76 @@
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.Test;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;

import static org.apache.ignite.cdc.AbstractReplicationTest.ACTIVE_PASSIVE_CACHE;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.DFLT_PARTS;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_META_TOPIC;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_TOPIC;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_CONSUMER_POLL_TIMEOUT;
import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.apache.logging.log4j.Level.DEBUG;


/**
*
*/

@SpringBootTest
@EmbeddedKafka(
partitions = 16,
topics = {
CdcKafkaReplicationTest.SRC_DEST_TOPIC,
CdcKafkaReplicationTest.DEST_SRC_TOPIC,
CdcKafkaReplicationTest.SRC_DEST_META_TOPIC,
CdcKafkaReplicationTest.DEST_SRC_META_TOPIC
},
brokerProperties = {
"listeners=PLAINTEXT://localhost:9092",
"auto.create.topics.enable=false"
}
)

public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
/** Markers sent messages listener. */
private static final LogListener MARKERS_LISTENER = LogListener.matches("Meta update markers sent.")
.times(1)
.build();
.times(1)
.build();

/** Polled from meta topic message listener. */
private static final LogListener POLLED_LISTENER = LogListener.matches("Polled from meta topic [rcvdEvts=1]")
.times(1)
.build();
.times(1)
.build();

/** Poll skip messages listener. */
private static final LogListener POLL_SKIP_LISTENER = LogListener.matches("Offsets unchanged, poll skipped")
.times(1)
.build();
.times(1)
.build();

/** Kafka cluster. */
private EmbeddedKafkaCluster kafka;
private static EmbeddedKafkaBroker embeddedKafka;

/** Listening logger. */
private ListeningTestLogger listeningLog;

/** */
@BeforeAll
static void beforeAll(@Autowired EmbeddedKafkaBroker broker) {
embeddedKafka = broker;
}

/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();

kafka = initKafka(kafka);

listeningLog = new ListeningTestLogger(log);

resetLog4j(DEBUG, false, IgniteToKafkaCdcStreamer.class.getName());
@@ -91,8 +114,6 @@ public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
@Override protected void afterTest() throws Exception {
super.afterTest();

removeKafkaTopicsAndWait(kafka, getTestTimeout());

MARKERS_LISTENER.reset();
POLLED_LISTENER.reset();
POLL_SKIP_LISTENER.reset();
@@ -145,12 +166,12 @@ public void testUpdateMetadata() throws Exception {
/** */
private IgniteToKafkaCdcStreamer igniteToKafkaCdcStreamer() {
IgniteToKafkaCdcStreamer streamer = new IgniteToKafkaCdcStreamer()
.setTopic(SRC_DEST_TOPIC)
.setMetadataTopic(SRC_DEST_META_TOPIC)
.setKafkaPartitions(DFLT_PARTS)
.setKafkaProperties(kafkaProperties(kafka))
.setCaches(Collections.singleton(ACTIVE_PASSIVE_CACHE))
.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
.setTopic(SRC_DEST_TOPIC)
.setMetadataTopic(SRC_DEST_META_TOPIC)
.setKafkaPartitions(DFLT_PARTS)
.setKafkaProperties(kafkaProperties(embeddedKafka))
.setCaches(Collections.singleton(ACTIVE_PASSIVE_CACHE))
.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);

GridTestUtils.setFieldValue(streamer, "log", listeningLog.getLogger(IgniteToKafkaCdcStreamer.class));

@@ -168,12 +189,12 @@ private KafkaToIgniteMetadataUpdater metadataUpdater() {
private KafkaToIgniteMetadataUpdater metadataUpdater(KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
BinaryContext noOpCtx = new BinaryContext(new IgniteConfiguration(), log) {
@Override public boolean registerUserClassName(int typeId, String clsName, boolean failIfUnregistered,
boolean onlyLocReg, byte platformId) {
boolean onlyLocReg, byte platformId) {
return true;
}
};

return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(kafka), streamerCfg);
return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(embeddedKafka), streamerCfg);
}

/** */
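Both migrated test classes follow the same wiring: @EmbeddedKafka boots an in-JVM broker and a @BeforeAll method stores the @Autowired EmbeddedKafkaBroker in a static field. For reference, a self-contained version of that pattern might look like the sketch below; it uses plain JUnit 5 parameter injection rather than @SpringBootTest so it runs without any extra Spring configuration, and the class, topic, and group names are invented for the example.

```java
// Self-contained sketch, not from this PR. With plain JUnit 5, spring-kafka-test's
// EmbeddedKafkaCondition starts the broker and injects it as a method parameter.
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@EmbeddedKafka(partitions = 1, topics = "demo-topic")
class EmbeddedKafkaRoundTripTest {
    /** Produces one record and reads it back through the embedded broker. */
    @Test
    void producesAndConsumes(EmbeddedKafkaBroker broker) {
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);

        KafkaTemplate<String, String> template = new KafkaTemplate<>(
            new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new StringSerializer()));

        template.send("demo-topic", "key", "value");
        template.flush();

        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("demo-group", "false", broker);

        try (Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(
            consumerProps, new StringDeserializer(), new StringDeserializer()).createConsumer()) {
            broker.consumeFromAnEmbeddedTopic(consumer, "demo-topic");

            ConsumerRecord<String, String> rec = KafkaTestUtils.getSingleRecord(consumer, "demo-topic");

            assertEquals("value", rec.value());
        }
    }
}
```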
10 changes: 9 additions & 1 deletion modules/kafka-ext/pom.xml
@@ -32,7 +32,7 @@
</parent>

<properties>
<kafka.version>3.4.0</kafka.version>
<kafka.version>3.9.1</kafka.version>
</properties>

<artifactId>ignite-kafka-ext</artifactId>
@@ -187,6 +187,14 @@
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>9</source>
<target>9</target>
</configuration>
</plugin>
</plugins>
</build>

TestKafkaBroker.java
@@ -38,7 +38,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

/**
* Kafka Test Broker.
@@ -78,7 +78,7 @@ public TestKafkaBroker() {
try {
zkServer = new TestingServer(ZK_PORT, true);
kafkaCfg = new KafkaConfig(getKafkaConfig());
kafkaSrv = TestUtils.createServer(kafkaCfg, new SystemTime());
kafkaSrv = TestUtils.createServer(kafkaCfg, Time.SYSTEM);

kafkaSrv.startup();
}
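The last hunk swaps new SystemTime() for Time.SYSTEM, the shared singleton instance of Kafka's clock abstraction, so behaviour is unchanged. A minimal illustration of the interface (not from this PR, names invented):

```java
// Illustrative only: Time is Kafka's pluggable clock, Time.SYSTEM the real-clock singleton.
import org.apache.kafka.common.utils.Time;

public class TimeSketch {
    public static void main(String[] args) {
        Time time = Time.SYSTEM;

        long start = time.milliseconds(); // wall-clock time, like System.currentTimeMillis()

        time.sleep(100);                  // sleeps on the real clock; a mock Time could advance instantly

        System.out.println("elapsed ms: " + (time.milliseconds() - start));
    }
}
```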