Skip to content

Commit 255b740

Browse files
committed
example RecordNameStrategy for multiple event types per topic
1 parent 0169792 commit 255b740

File tree

7 files changed

+241
-0
lines changed

7 files changed

+241
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Play with record name strategy
2+
3+
```bash
4+
5+
docker-compose -f docker-compose.yml -f docker-compose-akhq.yml up -d schema-registry akhq broker
6+
7+
curl localhost:8081/subjects
8+
9+
# Produce some messages with the RecordNameStrategy: run the producer
10+
11+
# check subjects again
12+
curl localhost:8081/subjects
13+
14+
15+
docker-compose -f docker-compose.yml -f docker-compose-akhq.yml down schema-registry akhq broker
16+
```
17+
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
3+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<artifactId>record-name-strategy</artifactId>
8+
<version>1.0-SNAPSHOT</version>
9+
10+
<name>record-name-strategy</name>
11+
12+
<parent>
13+
<groupId>io.spoud.training</groupId>
14+
<artifactId>training-parent</artifactId>
15+
<version>1.0.0-SNAPSHOT</version>
16+
<relativePath>../../pom.xml</relativePath>
17+
</parent>
18+
19+
<properties>
20+
<main.basedir>${project.parent.basedir}</main.basedir>
21+
<avro.version>1.12.0</avro.version>
22+
<confluent.version>7.8.0</confluent.version>
23+
</properties>
24+
25+
<dependencies>
26+
<dependency>
27+
<groupId>io.confluent</groupId>
28+
<artifactId>kafka-avro-serializer</artifactId>
29+
<version>${confluent.version}</version>
30+
</dependency>
31+
<dependency>
32+
<groupId>org.apache.avro</groupId>
33+
<artifactId>avro</artifactId>
34+
<version>${avro.version}</version>
35+
</dependency>
36+
</dependencies>
37+
<build>
38+
<plugins>
39+
<plugin>
40+
<groupId>org.apache.avro</groupId>
41+
<artifactId>avro-maven-plugin</artifactId>
42+
<version>${avro.version}</version>
43+
<executions>
44+
<execution>
45+
<phase>generate-sources</phase>
46+
<goals>
47+
<goal>schema</goal>
48+
</goals>
49+
<configuration>
50+
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
51+
<stringType>String</stringType>
52+
</configuration>
53+
</execution>
54+
</executions>
55+
</plugin>
56+
<plugin>
57+
<groupId>org.apache.maven.plugins</groupId>
58+
<artifactId>maven-compiler-plugin</artifactId>
59+
<version>3.13.0</version>
60+
<configuration>
61+
<source>17</source>
62+
<target>17</target>
63+
</configuration>
64+
</plugin>
65+
</plugins>
66+
</build>
67+
68+
<repositories>
69+
<repository>
70+
<id>confluent</id>
71+
<url>https://packages.confluent.io/maven/</url>
72+
</repository>
73+
</repositories>
74+
</project>
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"type": "record",
3+
"name": "CreateEvent",
4+
"namespace": "io.spoud.training.recordnamestrategy",
5+
"fields": [
6+
{
7+
"name": "eventId",
8+
"type": "string"
9+
},
10+
{
11+
"name": "name",
12+
"type": "string"
13+
},
14+
{
15+
"name": "created",
16+
"type": {
17+
"type": "long",
18+
"logicalType": "timestamp-millis"
19+
}
20+
}
21+
]
22+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"type": "record",
3+
"name": "UpdateEvent",
4+
"namespace": "io.spoud.training.recordnamestrategy",
5+
"fields": [
6+
{
7+
"name": "eventId",
8+
"type": "string"
9+
},
10+
{
11+
"name": "updateCounter",
12+
"type": "long"
13+
},
14+
{
15+
"name": "updated",
16+
"type": {
17+
"type": "long",
18+
"logicalType": "timestamp-millis"
19+
}
20+
}
21+
]
22+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package io.spoud.training;
2+
3+
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
4+
import org.apache.kafka.clients.consumer.ConsumerRecord;
5+
import org.apache.kafka.clients.consumer.ConsumerRecords;
6+
import org.apache.kafka.clients.consumer.KafkaConsumer;
7+
import org.apache.kafka.clients.producer.KafkaProducer;
8+
import org.apache.kafka.clients.producer.ProducerRecord;
9+
10+
import java.time.Duration;
11+
import java.util.Arrays;
12+
import java.util.List;
13+
import java.util.Properties;
14+
15+
public class Consumer
16+
{
17+
public static void main( String[] args )
18+
{
19+
20+
Properties props = new Properties();
21+
props.put("bootstrap.servers", "localhost:9092");
22+
props.put("group.id", "test");
23+
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
24+
props.put("value.deserializer", KafkaAvroDeserializer.class);
25+
props.put("enable.auto.commit", "false");
26+
props.put("auto.offset.reset", "earliest");
27+
props.put("schema.registry.url", "http://localhost:8081");
28+
29+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
30+
31+
consumer.subscribe(List.of("record-name-strategy"));
32+
33+
while (true) {
34+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
35+
for (ConsumerRecord<String, String> record : records) {
36+
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
37+
}
38+
}
39+
}
40+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package io.spoud.training;
2+
3+
import io.confluent.kafka.serializers.KafkaAvroSerializer;
4+
import io.spoud.training.recordnamestrategy.CreateEvent;
5+
import io.spoud.training.recordnamestrategy.UpdateEvent;
6+
import org.apache.kafka.clients.producer.KafkaProducer;
7+
import org.apache.kafka.clients.producer.ProducerRecord;
8+
9+
import java.time.Instant;
10+
import java.util.*;
11+
12+
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
13+
14+
public class Producer
15+
{
16+
public static void main( String[] args )
17+
{
18+
19+
//java-batching-1 Create the producer
20+
Properties props = new Properties();
21+
props.put("bootstrap.servers", "localhost:9092");
22+
props.put("acks", "all");
23+
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
24+
props.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
25+
props.put("value.subject.name.strategy", "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
26+
props.put("schema.registry.url", "http://localhost:8081");
27+
28+
KafkaProducer<String, CreateEvent> createEventProducer = new KafkaProducer<>(props);
29+
KafkaProducer<String, UpdateEvent> updateEventProducer = new KafkaProducer<>(props);
30+
String key = "aboutSomething";
31+
String topic = "record-name-strategy";
32+
33+
// Send 1 create message and 5 update messages
34+
CreateEvent value = CreateEvent.newBuilder()
35+
.setEventId(UUID.randomUUID().toString())
36+
.setName("Create Event ")
37+
.setCreated(Instant.now())
38+
.build();
39+
createEventProducer.send(new ProducerRecord<>(topic, key, value));
40+
createEventProducer.flush();
41+
createEventProducer.close();
42+
for (int i = 0; i < 5; i++) {
43+
UpdateEvent updateValue = UpdateEvent.newBuilder()
44+
.setEventId(UUID.randomUUID().toString())
45+
.setUpdateCounter(i)
46+
.setUpdated(Instant.now())
47+
.build();
48+
updateEventProducer.send(new ProducerRecord<>(topic, key, updateValue));
49+
}
50+
updateEventProducer.flush();
51+
updateEventProducer.close();
52+
}
53+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<configuration>
2+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
3+
<encoder>
4+
<pattern>%-5level [%thread] %logger{12} - %msg%n</pattern>
5+
</encoder>
6+
</appender>
7+
8+
<logger name="org.apache" level="debug" />
9+
10+
<root level="info">
11+
<appender-ref ref="STDOUT" />
12+
</root>
13+
</configuration>

0 commit comments

Comments
 (0)