@@ -26,7 +26,9 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -35,9 +37,13 @@
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorService;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.test.TestUtils;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
@@ -46,7 +52,9 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

@@ -335,6 +343,50 @@ public void testRackAwareAssignment(ClusterInstance clusterInstance) throws Exec
        }
    }

    @SuppressWarnings("unchecked")
    @ClusterTest(
        brokers = 2,
        types = {Type.KRAFT},
        serverProperties = {
            @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
            @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, value = "3000")
        }
    )
    public void testSingleCoordinatorOwnershipAfterPartitionReassignment(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
[Review comment — Member]
@brandboat could you open a PR against trunk to improve the test coverage?

[Review comment — @brandboat (Member, Author)]
Here you go: #21403
        try (var producer = clusterInstance.<byte[], byte[]>producer()) {
            producer.send(new ProducerRecord<>("topic", "value".getBytes(StandardCharsets.UTF_8)));
        }

        try (var admin = clusterInstance.admin()) {
            admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
        }

        try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
             var admin = clusterInstance.admin()) {
            consumer.subscribe(List.of("topic"));
            TestUtils.waitForCondition(() -> !consumer.poll(Duration.ofMillis(100)).isEmpty(), "polling to join group");
            // append records to coordinator
            consumer.commitSync();

            // unload the coordinator by changing leader (0 -> 1)
            admin.alterPartitionReassignments(Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
                Optional.of(new NewPartitionReassignment(List.of(1))))).all().get();
        }

        Function<GroupCoordinator, List<TopicPartition>> partitionsInGroupMetrics = service -> assertDoesNotThrow(() -> {
            var f0 = GroupCoordinatorService.class.getDeclaredField("groupCoordinatorMetrics");
            f0.setAccessible(true);
            var f1 = GroupCoordinatorMetrics.class.getDeclaredField("shards");
            f1.setAccessible(true);
            return List.copyOf(((Map<TopicPartition, ?>) f1.get(f0.get(service))).keySet());
        });

        // the offset partition should NOT be hosted by multiple coordinators
        var tps = clusterInstance.brokers().values().stream()
            .flatMap(b -> partitionsInGroupMetrics.apply(b.groupCoordinator()).stream()).toList();
        assertEquals(1, tps.size());
    }
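One possible hardening, not part of this PR: alterPartitionReassignments(...).all().get() only confirms that the reassignment request was accepted, while the leadership move itself completes asynchronously, so the metrics probe could in principle run before broker 1 takes over. A hypothetical extra guard, reusing the test's own assertDoesNotThrow pattern for checked exceptions, might wait for the new leader first:

    try (var admin = clusterInstance.admin()) {
        TestUtils.waitForCondition(() -> assertDoesNotThrow(() -> {
            // describeTopics reports the current leader of the offsets partition
            var description = admin.describeTopics(List.of(Topic.GROUP_METADATA_TOPIC_NAME))
                .allTopicNames().get().get(Topic.GROUP_METADATA_TOPIC_NAME);
            var leader = description.partitions().get(0).leader();
            return leader != null && leader.id() == 1;
        }), "leadership of the offsets partition never moved to broker 1");
    }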

    private void sendMsg(ClusterInstance clusterInstance, String topic, int sendMsgNum) {
        try (var producer = clusterInstance.producer(Map.of(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
@@ -754,7 +754,8 @@ private void unload() {
            timer.cancelAll();
            executor.cancelAll();
            deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception());
            failCurrentBatch(Errors.NOT_COORDINATOR.exception());
            // There is no need to free the current batch, as we will be closing all related resources anyway.
            failCurrentBatchWithoutRelease(Errors.NOT_COORDINATOR.exception());
            if (coordinator != null) {
                try {
                    coordinator.onUnloaded();
@@ -879,15 +880,23 @@ private void maybeFlushCurrentBatch(long currentTimeMs) {
            }
        }

        private void failCurrentBatch(Throwable t) {
            failCurrentBatch(t, true);
        }

        private void failCurrentBatchWithoutRelease(Throwable t) {
            failCurrentBatch(t, false);
        }

        /**
         * Fails the current batch: reverts the snapshot to the base/start offset of the
         * batch and fails all the associated events.
         */
        private void failCurrentBatch(Throwable t) {
        private void failCurrentBatch(Throwable t, boolean freeCurrentBatch) {
            if (currentBatch != null) {
                coordinator.revertLastWrittenOffset(currentBatch.baseOffset);
                currentBatch.deferredEvents.complete(t);
                freeCurrentBatch();
                if (freeCurrentBatch) freeCurrentBatch();
            }
        }
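For context, the release step that unload() now skips is exactly the step that can throw: as the unit test below spells out, freeCurrentBatch() calls partitionWriter.config(tp).maxMessageSize() to decide how to recycle the batch buffer, and that config lookup fails with NOT_LEADER_OR_FOLLOWER once the broker has lost leadership of the partition. The following self-contained sketch (hypothetical names, not Kafka's API) models why skipping the config-dependent release lets unloading complete:

    import java.util.function.IntSupplier;

    public class UnloadSketch {
        // Stands in for partitionWriter.config(tp).maxMessageSize(): once
        // leadership has moved away, the underlying log is gone and the
        // lookup throws NOT_LEADER_OR_FOLLOWER.
        static final IntSupplier maxMessageSize = () -> {
            throw new RuntimeException("NOT_LEADER_OR_FOLLOWER");
        };

        static Object currentBatch = new Object();

        // Old behaviour: releasing the batch sizes its buffer against the
        // partition config, so the exception escapes and the unload aborts
        // before the coordinator shard is deregistered.
        static void unloadWithRelease() {
            int maxBatchSize = maxMessageSize.getAsInt(); // throws here
            // ... recycle the buffer if its capacity <= maxBatchSize ...
            currentBatch = null;
        }

        // Fixed behaviour: drop the batch reference without consulting the
        // partition config; the buffers go away with the other resources.
        static void unloadWithoutRelease() {
            currentBatch = null;
        }

        public static void main(String[] args) {
            try {
                unloadWithRelease();
            } catch (RuntimeException e) {
                System.out.println("unload aborted: " + e.getMessage());
            }
            unloadWithoutRelease();
            System.out.println("unload completed, batch dropped: " + (currentBatch == null));
        }
    }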

@@ -710,6 +710,87 @@ public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionE
        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }

    @Test
    public void testScheduleUnloadingWithPendingBatchWhenPartitionWriterConfigThrows() {
[Review comment — Member]
I'm thinking about adding integration tests. For example:

    @ClusterTest(
            brokers = 2,
            types = {Type.KRAFT},
            serverProperties = {
                @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
                @ClusterConfigProperty(key = "group.coordinator.append.linger.ms", value = "3000")
            }
    )
    public void test(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException, TimeoutException {
        try (var producer = clusterInstance.<byte[], byte[]>producer()) {
            producer.send(new ProducerRecord<>("topic", "value".getBytes(StandardCharsets.UTF_8)));
        }

        try (var admin = clusterInstance.admin()) {
            admin.createTopics(List.of(new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME, Map.of(0, List.of(0))))).all().get();
        }

        try (var consumer = clusterInstance.consumer(Map.of(ConsumerConfig.GROUP_ID_CONFIG, "test-group"));
             var admin = clusterInstance.admin()) {
            consumer.subscribe(List.of("topic"));
            while (consumer.poll(Duration.ofMillis(100)).isEmpty()) {
                // empty body
            }
            // append records to coordinator
            consumer.commitSync();

            // unload the coordinator by changing leader (0 -> 1)
            admin.alterPartitionReassignments(Map.of(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0),
                    Optional.of(new NewPartitionReassignment(List.of(1))))).all().get();
        }

        Function<GroupCoordinator, List<TopicPartition>> partitionsInGroupMetrics = service -> assertDoesNotThrow(() -> {
            var f0 = GroupCoordinatorService.class.getDeclaredField("groupCoordinatorMetrics");
            f0.setAccessible(true);
            var f1 = GroupCoordinatorMetrics.class.getDeclaredField("shards");
            f1.setAccessible(true);
            return List.copyOf(((Map<TopicPartition, ?>) f1.get(f0.get(service))).keySet());
        });

        // the offset partition should NOT be hosted by multiple coordinators
        var tps = clusterInstance.brokers().values().stream()
                .flatMap(b -> partitionsInGroupMetrics.apply(b.groupCoordinator()).stream()).toList();
        assertEquals(1, tps.size());
    }

WDYT?

[Review comment — @brandboat (Member, Author), Feb 4, 2026]
Sure, why not? Thanks for the thorough integration test!
        MockTimer timer = new MockTimer();
        MockPartitionWriter writer = mock(MockPartitionWriter.class);
        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);

        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
                .withTime(timer.time())
                .withTimer(timer)
                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
                .withLoader(new MockCoordinatorLoader())
                .withEventProcessor(new DirectEventProcessor())
                .withPartitionWriter(writer)
                .withCoordinatorShardBuilderSupplier(supplier)
                .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
                .withCoordinatorMetrics(mock(CoordinatorMetrics.class))
                .withSerializer(new StringSerializer())
                .withAppendLingerMs(OptionalInt.of(10))
                .withExecutorService(mock(ExecutorService.class))
                .build();

        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
        when(builder.withLogContext(any())).thenReturn(builder);
        when(builder.withTime(any())).thenReturn(builder);
        when(builder.withTimer(any())).thenReturn(builder);
        when(builder.withCoordinatorMetrics(any())).thenReturn(builder);
        when(builder.withTopicPartition(any())).thenReturn(builder);
        when(builder.withExecutor(any())).thenReturn(builder);
        when(builder.build()).thenReturn(coordinator);
        when(supplier.get()).thenReturn(builder);

        // Configure the partition writer with a normal config initially.
        LogConfig initialLogConfig = new LogConfig(
            Map.of(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(1024 * 1024)) // 1MB
        );
        when(writer.config(TP)).thenReturn(initialLogConfig);
        when(writer.append(eq(TP), any(), any(), anyShort())).thenReturn(1L);

        // Load the coordinator.
        runtime.scheduleLoadOperation(TP, 10);
        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);

        // Schedule a write operation to create a pending batch.
        CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
            state -> new CoordinatorResult<>(List.of("record1"), "response1")
        );

        // Verify that the write is not committed yet and a batch exists.
        assertFalse(write1.isDone());
        assertNotNull(ctx.currentBatch);

        // Simulate the broker losing leadership: partitionWriter.config() now throws NOT_LEADER_OR_FOLLOWER.
        // This is the scenario described in KAFKA-20115.
        when(writer.config(TP)).thenThrow(Errors.NOT_LEADER_OR_FOLLOWER.exception());

        // Schedule the unloading. This should trigger the bug where freeCurrentBatch()
        // tries to call partitionWriter.config(tp).maxMessageSize() and throws an exception.
        // Without the fix, this would prevent the coordinator from unloading properly.
        runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1));

        // The unload should complete despite the NOT_LEADER_OR_FOLLOWER exception
        // when trying to access partition writer config during buffer cleanup.
        assertEquals(CLOSED, ctx.state);

        // Verify that onUnloaded is called.
        verify(coordinator, times(1)).onUnloaded();

        // Verify that the listener is deregistered.
        verify(writer, times(1)).deregisterListener(
            eq(TP),
            any(PartitionWriter.Listener.class)
        );

        // Getting the coordinator context fails because it no longer exists.
        assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP));
    }


    @Test
    public void testScheduleWriteOp() throws ExecutionException, InterruptedException, TimeoutException {
        MockTimer timer = new MockTimer();