Skip to content

Commit c484e25

Browse files
Upgrade to Kafka 4.1 (#377)
Co-authored-by: Raphael <22345578+raphala@users.noreply.github.com>
1 parent dcc07a3 commit c484e25

File tree

8 files changed

+188
-151
lines changed

8 files changed

+188
-151
lines changed

gradle/libs.versions.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
junit = "6.0.0"
33
mockito = "5.20.0"
44
testcontainers = "1.21.3"
5-
kafkaUtils = "1.2.1"
5+
kafkaUtils = "1.3.0"
66

77
[libraries]
88
kafka-bom = { group = "com.bakdata.kafka", name = "kafka-bom", version.ref = "kafkaUtils" }
@@ -17,7 +17,7 @@ kafka-streams-avro-serde = { group = "io.confluent", name = "kafka-streams-avro-
1717
kafka-protobuf-provider = { group = "io.confluent", name = "kafka-protobuf-provider" }
1818
largeMessage-bom = { group = "com.bakdata.kafka", name = "large-message-bom", version = "3.1.0" }
1919
largeMessage-core = { group = "com.bakdata.kafka", name = "large-message-core" }
20-
errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.0.0" }
20+
errorHandling-bom = { group = "com.bakdata.kafka", name = "error-handling-bom", version = "2.1.0" }
2121
errorHandling-core = { group = "com.bakdata.kafka", name = "error-handling-core" }
2222
picocli = { group = "info.picocli", name = "picocli", version = "4.7.7" }
2323
slf4j = { group = "org.slf4j", name = "slf4j-api", version = "2.0.17" }
@@ -33,7 +33,7 @@ mockito-core = { group = "org.mockito", name = "mockito-core", version.ref = "mo
3333
mockito-junit = { group = "org.mockito", name = "mockito-junit-jupiter", version.ref = "mockito" }
3434
testcontainers-junit = { group = "org.testcontainers", name = "junit-jupiter", version.ref = "testcontainers" }
3535
testcontainers-kafka = { group = "org.testcontainers", name = "kafka", version.ref = "testcontainers" }
36-
fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.4.1" }
36+
fluentKafkaStreamsTests = { group = "com.bakdata.fluent-kafka-streams-tests", name = "fluent-kafka-streams-tests-junit5", version = "3.5.0" }
3737
log4j-slf4j2 = { group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = "2.25.1" }
3838
awaitility = { group = "org.awaitility", name = "awaitility", version = "4.3.0" }
3939

streams-bootstrap-cli-test/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ dependencies {
1616
testImplementation(libs.mockito.junit)
1717
testImplementation(testFixtures(project(":streams-bootstrap-test")))
1818
testImplementation(libs.kafka.streams.avro.serde) {
19-
exclude(group = "org.apache.kafka", module = "kafka-clients") // force usage of OSS kafka-clients
19+
exclude(group = "org.apache.kafka") // force usage of OSS kafka-clients
2020
}
2121
testImplementation(libs.log4j.slf4j2)
2222
}

streams-bootstrap-core/build.gradle.kts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ dependencies {
1313
api(libs.kafka.streams)
1414
api(libs.kafka.clients)
1515
implementation(libs.kafka.schema.serializer) {
16-
exclude(group = "org.apache.kafka", module = "kafka-clients") // force usage of OSS kafka-clients
16+
exclude(group = "org.apache.kafka") // force usage of OSS kafka-clients
1717
exclude(group = "org.slf4j", module = "slf4j-api") // Conflict with 2.x when used as dependency
1818
}
1919
api(libs.kafka.schema.registry.client) {
20-
exclude(group = "org.apache.kafka", module = "kafka-clients") // force usage of OSS kafka-clients
20+
exclude(group = "org.apache.kafka") // force usage of OSS kafka-clients
2121
exclude(group = "org.slf4j", module = "slf4j-api") // Conflict with 2.x when used as dependency
2222
}
2323
implementation(libs.slf4j)
@@ -35,7 +35,7 @@ dependencies {
3535

3636
testImplementation(testFixtures(project(":streams-bootstrap-test")))
3737
testImplementation(libs.kafka.streams.avro.serde) {
38-
exclude(group = "org.apache.kafka", module = "kafka-clients") // force usage of OSS kafka-clients
38+
exclude(group = "org.apache.kafka") // force usage of OSS kafka-clients
3939
}
4040
testImplementation(libs.kafka.group.coordinator)
4141
testImplementation(libs.log4j.slf4j2)

streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,7 @@ <GK, GV, RV> KStreamX<K, RV> leftJoin(GlobalKTable<GK, GV> globalTable,
850850

851851
@Override
852852
<KOut, VOut> KStreamX<KOut, VOut> process(
853-
ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
853+
ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
854854
String... stateStoreNames);
855855

856856
/**
@@ -866,7 +866,7 @@ <KOut, VOut> KStreamX<KOut, VOut> process(
866866
* @see ErrorCapturingProcessor#captureErrors(ProcessorSupplier)
867867
*/
868868
<KOut, VOut> KErrorStreamX<K, V, KOut, VOut> processCapturingErrors(
869-
ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
869+
ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
870870
String... stateStoreNames);
871871

872872
/**
@@ -883,13 +883,13 @@ <KOut, VOut> KErrorStreamX<K, V, KOut, VOut> processCapturingErrors(
883883
* @see ErrorCapturingProcessor#captureErrors(ProcessorSupplier, java.util.function.Predicate)
884884
*/
885885
<KOut, VOut> KErrorStreamX<K, V, KOut, VOut> processCapturingErrors(
886-
ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
886+
ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
887887
java.util.function.Predicate<Exception> errorFilter,
888888
String... stateStoreNames);
889889

890890
@Override
891891
<KOut, VOut> KStreamX<KOut, VOut> process(
892-
ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
892+
ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
893893
Named named, String... stateStoreNames);
894894

895895
/**
@@ -906,7 +906,7 @@ <KOut, VOut> KStreamX<KOut, VOut> process(
906906
* @see ErrorCapturingProcessor#captureErrors(ProcessorSupplier)
907907
*/
908908
<KOut, VOut> KErrorStreamX<K, V, KOut, VOut> processCapturingErrors(
909-
ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
909+
ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
910910
Named named, String... stateStoreNames);
911911

912912
/**
@@ -924,13 +924,13 @@ <KOut, VOut> KErrorStreamX<K, V, KOut, VOut> processCapturingErrors(
924924
* @see ErrorCapturingProcessor#captureErrors(ProcessorSupplier, java.util.function.Predicate)
925925
*/
926926
<KOut, VOut> KErrorStreamX<K, V, KOut, VOut> processCapturingErrors(
927-
ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
927+
ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
928928
java.util.function.Predicate<Exception> errorFilter,
929929
Named named, String... stateStoreNames);
930930

931931
@Override
932932
<VOut> KStreamX<K, VOut> processValues(
933-
FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
933+
FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
934934
String... stateStoreNames);
935935

936936
/**
@@ -947,7 +947,7 @@ <VOut> KStreamX<K, VOut> processValues(
947947
* @see ErrorCapturingValueProcessor#captureErrors(FixedKeyProcessorSupplier)
948948
*/
949949
<VOut> KErrorStreamX<K, V, K, VOut> processValuesCapturingErrors(
950-
FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
950+
FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
951951
String... stateStoreNames);
952952

953953
/**
@@ -965,13 +965,13 @@ <VOut> KErrorStreamX<K, V, K, VOut> processValuesCapturingErrors(
965965
* @see ErrorCapturingValueProcessor#captureErrors(FixedKeyProcessorSupplier, java.util.function.Predicate)
966966
*/
967967
<VOut> KErrorStreamX<K, V, K, VOut> processValuesCapturingErrors(
968-
FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
968+
FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
969969
java.util.function.Predicate<Exception> errorFilter,
970970
String... stateStoreNames);
971971

972972
@Override
973973
<VOut> KStreamX<K, VOut> processValues(
974-
FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
974+
FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
975975
Named named, String... stateStoreNames);
976976

977977
/**
@@ -989,7 +989,7 @@ <VOut> KStreamX<K, VOut> processValues(
989989
* @see ErrorCapturingValueProcessor#captureErrors(FixedKeyProcessorSupplier)
990990
*/
991991
<VOut> KErrorStreamX<K, V, K, VOut> processValuesCapturingErrors(
992-
FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
992+
FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
993993
Named named, String... stateStoreNames);
994994

995995
/**
@@ -1008,7 +1008,7 @@ <VOut> KErrorStreamX<K, V, K, VOut> processValuesCapturingErrors(
10081008
* @see ErrorCapturingValueProcessor#captureErrors(FixedKeyProcessorSupplier, java.util.function.Predicate)
10091009
*/
10101010
<VOut> KErrorStreamX<K, V, K, VOut> processValuesCapturingErrors(
1011-
FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier,
1011+
FixedKeyProcessorSupplier<? super K, ? super V, ? extends VOut> processorSupplier,
10121012
java.util.function.Predicate<Exception> errorFilter,
10131013
Named named, String... stateStoreNames);
10141014
}

0 commit comments

Comments
 (0)