From 31f5988385e08652b058880016a473903467dc5a Mon Sep 17 00:00:00 2001
From: Kuan-Po Tseng
Date: Wed, 4 Feb 2026 00:27:07 +0800
Subject: [PATCH 1/4] KAFKA-20115: Group coordinator fails to unload metadata
 when no longer leader or follower

Signed-off-by: Kuan-Po Tseng
---
 .../common/runtime/CoordinatorRuntime.java |  15 +++-
 .../runtime/CoordinatorRuntimeTest.java    |  80 +++++++++++++++++++
 2 files changed, 92 insertions(+), 3 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 1d3fb49ad3075..4e81c1ad152f8 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -754,7 +754,8 @@ private void unload() {
             timer.cancelAll();
             executor.cancelAll();
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
-            failCurrentBatch(Errors.NOT_COORDINATOR.exception());
+            // There is no need to free the current batch, as we will be closing all related resources anyway.
+            failCurrentBatch(Errors.NOT_COORDINATOR.exception(), false);
             if (coordinator != null) {
                 try {
                     coordinator.onUnloaded();
@@ -879,15 +880,23 @@ private void maybeFlushCurrentBatch(long currentTimeMs) {
             }
         }
 
+        /**
+         * Convenience method to fail the current batch.
+         * See {@link #failCurrentBatch(Throwable, boolean)} for details.
+         */
+        private void failCurrentBatch(Throwable t) {
+            failCurrentBatch(t, true);
+        }
+
         /**
         * Fails the current batch, reverts to the snapshot to the base/start offset of the
         * batch, fails all the associated events.
         */
-        private void failCurrentBatch(Throwable t) {
+        private void failCurrentBatch(Throwable t, boolean freeCurrentBatch) {
             if (currentBatch != null) {
                 coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
                 currentBatch.deferredEvents.complete(t);
-                freeCurrentBatch();
+                if (freeCurrentBatch) freeCurrentBatch();
             }
         }
 
diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index a359832756d3d..bce6e0db984c7 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -5695,6 +5695,86 @@ public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception
         assertEquals(0, schedulerTimer.size());
     }
 
+    @Test
+    public void testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(Duration.ofMillis(20))
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(10))
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.withExecutor(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Configure the partition writer with a normal config initially.
+        LogConfig initialLogConfig = new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+        ));
+        when(writer.config(TP)).thenReturn(initialLogConfig);
+        when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);
+
+        // Load the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+
+        // Schedule a write operation to create a pending batch.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+
+        // Verify that the write is not committed yet and a batch exists.
+        assertFalse(write1.isDone());
+        assertNotNull(ctx.currentBatch);
+
+        // Simulate the broker losing leadership: partitionWriter.config() now throws NOT_LEADER_OR_FOLLOWER.
+        // This is the scenario described in KAFKA-20115.
+        when(writer.config(TP)).thenThrow(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+
+        // Schedule the unloading. This should trigger the bug where freeCurrentBatch()
+        // tries to call partitionWriter.config(tp).maxMessageSize() and throws an exception.
+        // Without the fix, this would prevent the coordinator from unloading properly.
+        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+
+        // The unload should complete despite the NOT_LEADER_OR_FOLLOWER exception
+        // when trying to access partition writer config during buffer cleanup.
+        assertEquals(CLOSED, ctx.state);
+
+        // Verify that onUnloaded is called.
+        verify(coordinator, times(1)).onUnloaded();
+
+        // Verify that the listener is deregistered.
+        verify(writer, times(1)).deregisterListener(
+            eq(TP),
+            any(PartitionWriter.Listener.class)
+        );
+
+        // Getting the coordinator context fails because it no longer exists.
+        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
+    }
+
     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp

From 83e16f7a76d13ae94e504c667f6c52b76c5dbb63 Mon Sep 17 00:00:00 2001
From: Kuan-Po Tseng
Date: Wed, 4 Feb 2026 08:03:04 +0800
Subject: [PATCH 2/4] Address comments

Signed-off-by: Kuan-Po Tseng
---
 .../runtime/CoordinatorRuntimeTest.java | 161 +++++++++---------
 1 file changed, 81 insertions(+), 80 deletions(-)

diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
index bce6e0db984c7..32a79765f09b3 100644
--- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
+++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java
@@ -710,6 +710,87 @@ public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionE
         assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
     }
 
+    @Test
+    public void testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
+                .withSerializer(new StringSerializer())
+                .withAppendLingerMs(OptionalInt.of(10))
+                .withExecutorService(mock(ExecutorService.class))
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.withExecutor(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+
+        // Configure the partition writer with a normal config initially.
+        LogConfig initialLogConfig = new LogConfig(
+            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
+        ));
+        when(writer.config(TP)).thenReturn(initialLogConfig);
+        when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);
+
+        // Load the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+
+        // Schedule a write operation to create a pending batch.
+        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(List.of("record1"), "response1")
+        );
+
+        // Verify that the write is not committed yet and a batch exists.
+        assertFalse(write1.isDone());
+        assertNotNull(ctx.currentBatch);
+
+        // Simulate the broker losing leadership: partitionWriter.config() now throws NOT_LEADER_OR_FOLLOWER.
+        // This is the scenario described in KAFKA-20115.
+        when(writer.config(TP)).thenThrow(Errors.NOT_LEADER_OR_FOLLOWER.exception());
+
+        // Schedule the unloading. This should trigger the bug where freeCurrentBatch()
+        // tries to call partitionWriter.config(tp).maxMessageSize() and throws an exception.
+        // Without the fix, this would prevent the coordinator from unloading properly.
+        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
+
+        // The unload should complete despite the NOT_LEADER_OR_FOLLOWER exception
+        // when trying to access partition writer config during buffer cleanup.
+        assertEquals(CLOSED, ctx.state);
+
+        // Verify that onUnloaded is called.
+        verify(coordinator, times(1)).onUnloaded();
+
+        // Verify that the listener is deregistered.
+        verify(writer, times(1)).deregisterListener(
+            eq(TP),
+            any(PartitionWriter.Listener.class)
+        );
+
+        // Getting the coordinator context fails because it no longer exists.
+        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
+    }
+
+
     @Test
     public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
         MockTimer timer = new MockTimer();
@@ -5695,86 +5776,6 @@ public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception
         assertEquals(0, schedulerTimer.size());
     }
 
-    @Test
-    public void testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
-        MockTimer timer = new MockTimer();
-        MockPartitionWriter writer = mock(MockPartitionWriter.class);
-        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
-        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
-        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
-
-        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
-            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
-                .withTime(timer.time())
-                .withTimer(timer)
-                .withDefaultWriteTimeOut(Duration.ofMillis(20))
-                .withLoader(new MockCoordinatorLoader())
-                .withEventProcessor(new DirectEventProcessor())
-                .withPartitionWriter(writer)
-                .withCoordinatorShardBuilderSupplier(supplier)
-                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
-                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
-                .withSerializer(new StringSerializer())
-                .withAppendLingerMs(OptionalInt.of(10))
-                .withExecutorService(mock(ExecutorService.class))
-                .build();
-
-        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
-        when(builder.withLogContext(any())).thenReturn(builder);
-        when(builder.withTime(any())).thenReturn(builder);
-        when(builder.withTimer(any())).thenReturn(builder);
-        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
-        when(builder.withTopicPartition(any())).thenReturn(builder);
-        when(builder.withExecutor(any())).thenReturn(builder);
-        when(builder.build()).thenReturn(coordinator);
-        when(supplier.get()).thenReturn(builder);
-
-        // Configure the partition writer with a normal config initially.
-        LogConfig initialLogConfig = new LogConfig(
-            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024) // 1MB
-        ));
-        when(writer.config(TP)).thenReturn(initialLogConfig);
-        when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);
-
-        // Load the coordinator.
-        runtime.scheduleLoadOperation(TP, 10);
-        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
-
-        // Schedule a write operation to create a pending batch.
-        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
-            state -> new CoordinatorResult<>(List.of("record1"), "response1")
-        );
-
-        // Verify that the write is not committed yet and a batch exists.
-        assertFalse(write1.isDone());
-        assertNotNull(ctx.currentBatch);
-
-        // Simulate the broker losing leadership: partitionWriter.config() now throws NOT_LEADER_OR_FOLLOWER.
-        // This is the scenario described in KAFKA-20115.
-        when(writer.config(TP)).thenThrow(Errors.NOT_LEADER_OR_FOLLOWER.exception());
-
-        // Schedule the unloading. This should trigger the bug where freeCurrentBatch()
-        // tries to call partitionWriter.config(tp).maxMessageSize() and throws an exception.
-        // Without the fix, this would prevent the coordinator from unloading properly.
-        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));
-
-        // The unload should complete despite the NOT_LEADER_OR_FOLLOWER exception
-        // when trying to access partition writer config during buffer cleanup.
-        assertEquals(CLOSED, ctx.state);
-
-        // Verify that onUnloaded is called.
-        verify(coordinator, times(1)).onUnloaded();
-
-        // Verify that the listener is deregistered.
-        verify(writer, times(1)).deregisterListener(
-            eq(TP),
-            any(PartitionWriter.Listener.class)
-        );
-
-        // Getting the coordinator context fails because it no longer exists.
-        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
-    }
-
     private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
         CoordinatorRuntime<S, U> runtime,
         TopicPartition tp

From 99d48553d8e11543859032772c4c6f13f62edb5c Mon Sep 17 00:00:00 2001
From: Kuan-Po Tseng
Date: Wed, 4 Feb 2026 09:47:46 +0800
Subject: [PATCH 3/4] Add integration test

Signed-off-by: Kuan-Po Tseng
---
 .../consumer/ConsumerIntegrationTest.java | 52 +++++++++++++++++++
 1 file changed, 52 insertions(+)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index 220866c240f4a..8580893b7d6ff 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -26,7 +26,9 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -35,9 +37,13 @@
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTests;
 import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.coordinator.group.GroupCoordinatorService;
+import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
 import org.apache.kafka.test.TestUtils;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
@@ -46,7 +52,9 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -335,6 +343,50 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @ClusterTest(
+        brokers = 2,
+        types = {Type.KRAFT},
+        serverProperties = {
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = "3000")
+        }
+    )
+    public void testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
+        try (var producer = clusterInstance.producer()) {
+            producer.send(new ProducerRecord<>("topic", "value".getBytes(StandardCharsets.UTF_8)));
+        }
+
+        try (var admin = clusterInstance.admin()) {
+            admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
+        }
+
+        try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
+             var admin = clusterInstance.admin()) {
+            consumer.subscribe(List.of("topic"));
+            TestUtils.waitForCondition(() -> consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
+            // append records to coordinator
+            consumer.commitSync();
+
+            // unload the coordinator by changing leader (0 -> 1)
+            admin.alterPartitionReassignments(Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
+                Optional.of(new NewPartitionReassignment(List.of(1))))).all().get();
+        }
+
+        Function<GroupCoordinator, List<TopicPartition>> partitionsInGroupMetrics = service -> assertDoesNotThrow(() -> {
+            var f0 = GroupCoordinatorService.class.getDeclaredField("groupCoordinatorMetrics");
+            f0.setAccessible(true);
+            var f1 = GroupCoordinatorMetrics.class.getDeclaredField("shards");
+            f1.setAccessible(true);
+            return List.copyOf(((Map) f1.get(f0.get(service))).keySet());
+        });
+
+        // the offset partition should NOT be hosted by multiple coordinators
+        var tps = clusterInstance.brokers().values().stream()
+            .flatMap(b -> partitionsInGroupMetrics.apply(b.groupCoordinator()).stream()).toList();
+        assertEquals(1, tps.size());
+    }
+
     private void sendMsg(ClusterInstance clusterInstance, String topic, int sendMsgNum) {
         try (var producer = clusterInstance.producer(Map.of(
             ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,

From 9d3874a0afdf8b0fd503b95fba9734c632d27176 Mon Sep 17 00:00:00 2001
From: Kuan-Po Tseng
Date: Thu, 5 Feb 2026 09:50:40 +0800
Subject: [PATCH 4/4] Address comments

Signed-off-by: Kuan-Po Tseng
---
 .../coordinator/common/runtime/CoordinatorRuntime.java | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
index 4e81c1ad152f8..e85c223681856 100644
--- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
+++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java
@@ -755,7 +755,7 @@ private void unload() {
             executor.cancelAll();
             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
             // There is no need to free the current batch, as we will be closing all related resources anyway.
-            failCurrentBatch(Errors.NOT_COORDINATOR.exception(), false);
+            failCurrentBatchWithoutRelease(Errors.NOT_COORDINATOR.exception());
             if (coordinator != null) {
                 try {
                     coordinator.onUnloaded();
@@ -880,14 +880,23 @@ private void maybeFlushCurrentBatch(long currentTimeMs) {
             }
         }
 
-        /**
-         * Convenience method to fail the current batch.
-         * See {@link #failCurrentBatch(Throwable, boolean)} for details.
-         */
         private void failCurrentBatch(Throwable t) {
             failCurrentBatch(t, true);
         }
 
+        private void failCurrentBatchWithoutRelease(Throwable t) {
+            failCurrentBatch(t, false);
+        }
+
         /**
         * Fails the current batch, reverts to the snapshot to the base/start offset of the
        * batch, fails all the associated events.