From 798addcfb3ccdd8985f56781363f4f52a75df517 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Sat, 25 Oct 2025 12:08:21 +0100 Subject: [PATCH 1/4] KAFKA-19802: Admin client changes for KIP-1226 --- .../kafka/clients/admin/KafkaAdminClient.java | 2 +- .../admin/ListShareGroupOffsetsResult.java | 14 ++-- .../admin/SharePartitionOffsetInfo.java | 83 +++++++++++++++++++ .../ListShareGroupOffsetsHandler.java | 18 ++-- .../clients/admin/AdminClientTestUtils.java | 4 +- .../clients/admin/KafkaAdminClientTest.java | 50 +++++------ .../api/PlaintextAdminIntegrationTest.scala | 10 +-- .../consumer/group/ShareGroupCommand.java | 37 +++++---- .../consumer/group/ShareGroupCommandTest.java | 48 ++++++----- 9 files changed, 181 insertions(+), 85 deletions(-) create mode 100644 clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 78a7f905319c8..9a50f52fb44ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3814,7 +3814,7 @@ public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId, @Override public ListShareGroupOffsetsResult listShareGroupOffsets(final Map groupSpecs, final ListShareGroupOffsetsOptions options) { - SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); + SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext); invokeDriver(handler, future, options.timeoutMs); return new ListShareGroupOffsetsResult(future.all()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java index 1a2c8869c6caf..7add7393101b0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java @@ -36,9 +36,9 @@ @InterfaceStability.Evolving public class ListShareGroupOffsetsResult { - private final Map>> futures; + private final Map>> futures; - ListShareGroupOffsetsResult(final Map>> futures) { + ListShareGroupOffsetsResult(final Map>> futures) { this.futures = futures.entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue)); } @@ -46,12 +46,12 @@ public class ListShareGroupOffsetsResult { /** * Return the future when the requests for all groups succeed. * - * @return Future which yields all {@code Map>} objects, if requests for all the groups succeed. + * @return Future which yields all {@code Map>} objects, if requests for all the groups succeed. */ - public KafkaFuture>> all() { + public KafkaFuture>> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( nil -> { - Map> offsets = new HashMap<>(futures.size()); + Map> offsets = new HashMap<>(futures.size()); futures.forEach((groupId, future) -> { try { offsets.put(groupId, future.get()); @@ -67,9 +67,9 @@ public KafkaFuture>> all() { /** * Return a future which yields a map of topic partitions to offsets for the specified group. If the group doesn't - * have a committed offset for a specific partition, the corresponding value in the returned map will be null. + * have offset information for a specific partition, the corresponding value in the returned map will be null. */ - public KafkaFuture> partitionsToOffsetAndMetadata(String groupId) { + public KafkaFuture> partitionsToOffsetInfo(String groupId) { if (!futures.containsKey(groupId)) { throw new IllegalArgumentException("Group ID not found: " + groupId); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java new file mode 100644 index 0000000000000..2fd54012bc679 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Objects; +import java.util.Optional; + +/** + * This class is used to contain the offset and lag information for a share-partition. + */ +@InterfaceStability.Evolving +public class SharePartitionOffsetInfo { + private final long startOffset; + private final Optional leaderEpoch; + private final Optional lag; + + /** + * Construct a new StartPartitionOffsetInfo. + * + * @param startOffset The share-partition start offset + * @param leaderEpoch The optional leader epoch of the start offset + * @param lag The optional lag for the share-partition + */ + public SharePartitionOffsetInfo(long startOffset, Optional leaderEpoch, Optional lag) + { + this.startOffset = startOffset; + this.leaderEpoch = leaderEpoch; + this.lag = lag; + } + + public long startOffset() { + return startOffset; + } + + public Optional leaderEpoch() { + return leaderEpoch; + } + + public Optional lag() { + return lag; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SharePartitionOffsetInfo that = (SharePartitionOffsetInfo) o; + return startOffset == that.startOffset && + Objects.equals(leaderEpoch, that.leaderEpoch) && + Objects.equals(lag, that.lag); + } + + @Override + public int hashCode() { + return Objects.hash(startOffset, leaderEpoch, lag); + } + + @Override + public String toString() { + return "SharePartitionOffsetInfo{" + + "startOffset=" + startOffset + + ", leaderEpoch=" + leaderEpoch.orElse(null) + + ", lag=" + lag.orElse(null) + + '}'; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java index f9b9e987930bd..5b75a482aaeae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.admin.SharePartitionOffsetInfo; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; @@ -47,7 +47,7 @@ /** * This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call */ -public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> { +public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> { private final Map groupSpecs; private final Logger log; @@ -60,7 +60,7 @@ public ListShareGroupOffsetsHandler(Map group this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); } - public static AdminApiFuture.SimpleAdminApiFuture> newFuture(Collection groupIds) { + public static AdminApiFuture.SimpleAdminApiFuture> newFuture(Collection groupIds) { return AdminApiFuture.forKeys(coordinatorKeys(groupIds)); } @@ -110,13 +110,13 @@ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordina } @Override - public ApiResult> handleResponse(Node coordinator, - Set groupIds, - AbstractResponse abstractResponse) { + public ApiResult> handleResponse(Node coordinator, + Set groupIds, + AbstractResponse abstractResponse) { validateKeys(groupIds); final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse; - final Map> completed = new HashMap<>(); + final Map> completed = new HashMap<>(); final Map failed = new HashMap<>(); final List unmapped = new ArrayList<>(); @@ -125,7 +125,7 @@ public ApiResult> handleR if (response.hasGroupError(groupId)) { handleGroupError(coordinatorKey, response.groupError(groupId), failed, unmapped); } else { - Map groupOffsetsListing = new HashMap<>(); + Map groupOffsetsListing = new HashMap<>(); response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> { for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) { for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) { @@ -137,7 +137,7 @@ public ApiResult> handleR if (partitionResponse.startOffset() < 0) { groupOffsetsListing.put(tp, null); } else { - groupOffsetsListing.put(tp, new OffsetAndMetadata(startOffset, leaderEpoch, "")); + groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty())); } } else { log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage()); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 02c094433703f..a272bce672e20 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -186,8 +186,8 @@ public static ListConfigResourcesResult listConfigResourcesResult(KafkaException return new ListConfigResourcesResult(future); } - public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { - Map>> coordinatorFutures = groupOffsets.entrySet().stream() + public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { + Map>> coordinatorFutures = groupOffsets.entrySet().stream() .collect(Collectors.toMap( entry -> CoordinatorKey.byGroupId(entry.getKey()), Map.Entry::getValue diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 9084a25836efc..e155d4d51e8ac 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -11182,15 +11182,15 @@ public void testListShareGroupOffsets() throws Exception { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); + final Map partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get(); - assertEquals(6, partitionToOffsetAndMetadata.size()); - assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0)); - assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition1)); - assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition2)); - assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition3)); - assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition4)); - assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), partitionToOffsetAndMetadata.get(myTopicPartition5)); + assertEquals(6, partitionToOffsetInfo.size()); + assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0)); + assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1)); + assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition2)); + assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3)); + assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition4)); + assertEquals(new SharePartitionOffsetInfo(500, Optional.of(3), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition5)); } } @@ -11253,17 +11253,17 @@ public void testListShareGroupOffsetsMultipleGroups() throws Exception { final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); assertEquals(2, result.all().get().size()); - final Map partitionToOffsetAndMetadataGroup0 = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); - assertEquals(4, partitionToOffsetAndMetadataGroup0.size()); - assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition0)); - assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition1)); - assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition2)); - assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition3)); + final Map partitionToOffsetInfoGroup0 = result.partitionsToOffsetInfo(GROUP_ID).get(); + assertEquals(4, partitionToOffsetInfoGroup0.size()); + assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition0)); + assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition1)); + assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition2)); + assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition3)); - final Map partitionToOffsetAndMetadataGroup1 = result.partitionsToOffsetAndMetadata("group-1").get(); - assertEquals(2, partitionToOffsetAndMetadataGroup1.size()); - assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition4)); - assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition5)); + final Map partitionToOffsetInfoGroup1 = result.partitionsToOffsetInfo("group-1").get(); + assertEquals(2, partitionToOffsetInfoGroup1.size()); + assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition4)); + assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition5)); } } @@ -11286,9 +11286,9 @@ public void testListShareGroupOffsetsEmpty() throws Exception { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); + final Map partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get(); - assertEquals(0, partitionToOffsetAndMetadata.size()); + assertEquals(0, partitionToOffsetInfo.size()); } } @@ -11338,13 +11338,13 @@ public void testListShareGroupOffsetsWithErrorInOnePartition() throws Exception env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); + final Map partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get(); // For myTopicPartition2 we have set an error as the response. Thus, it should be skipped from the final result - assertEquals(3, partitionToOffsetAndMetadata.size()); - assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0)); - assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition1)); - assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition3)); + assertEquals(3, partitionToOffsetInfo.size()); + assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0)); + assertEquals(new SharePartitionOffsetInfo(11, Optional.of(1), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1)); + assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3)); } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index a480a3b5ffbad..0281a37a3455f 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2918,7 +2918,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val tp1 = new TopicPartition(testTopicName, 0) val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))) - .partitionsToOffsetAndMetadata(testGroupId) + .partitionsToOffsetInfo(testGroupId) .get() assertTrue(parts.containsKey(tp1)) assertNull(parts.get(tp1)) @@ -2984,10 +2984,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetAlterResult.partitionResult(tp2)) val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))) - .partitionsToOffsetAndMetadata(testGroupId) + .partitionsToOffsetInfo(testGroupId) .get() assertTrue(parts.containsKey(tp1)) - assertEquals(0, parts.get(tp1).offset()) + assertEquals(0, parts.get(tp1).startOffset()) } finally { Utils.closeQuietly(client, "adminClient") } @@ -3024,14 +3024,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Test listShareGroupOffsets TestUtils.waitUntilTrue(() => { val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec())) - .partitionsToOffsetAndMetadata(testGroupId) + .partitionsToOffsetInfo(testGroupId) .get() parts.containsKey(tp1) && parts.containsKey(tp2) }, "Expected the result contains all partitions.") // Test listShareGroupOffsets with listShareGroupOffsetsSpec val groupSpecs = util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))) - val parts = client.listShareGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata(testGroupId).get() + val parts = client.listShareGroupOffsets(groupSpecs).partitionsToOffsetInfo(testGroupId).get() assertTrue(parts.containsKey(tp1)) assertFalse(parts.containsKey(tp2)) } finally { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 87cf0f1e83742..3a1f57ce68c9b 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.ShareGroupDescription; import org.apache.kafka.clients.admin.ShareMemberAssignment; import org.apache.kafka.clients.admin.ShareMemberDescription; +import org.apache.kafka.clients.admin.SharePartitionOffsetInfo; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; @@ -434,8 +435,8 @@ private Collection getPartitionsToReset(String groupId) throws E partitionsToReset = offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt)); } else { Map groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec()); - Map offsetsByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); - partitionsToReset = offsetsByTopicPartitions.keySet(); + Map offsetInfoByTopicPartitions = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); + partitionsToReset = offsetInfoByTopicPartitions.keySet(); } return partitionsToReset; @@ -488,8 +489,8 @@ TreeMap groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec()); try { - Map startOffsets = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); - Set partitionOffsets = mapOffsetsToSharePartitionInformation(groupId, startOffsets); + Map offsetInfoMap = adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId); + Set partitionOffsets = mapOffsetInfoToSharePartitionInformation(groupId, offsetInfoMap); groupOffsets.put(groupId, new SimpleImmutableEntry<>(shareGroup, partitionOffsets)); } catch (InterruptedException | ExecutionException e) { @@ -500,17 +501,18 @@ TreeMap mapOffsetsToSharePartitionInformation(String groupId, Map startOffsets) { + private static Set mapOffsetInfoToSharePartitionInformation(String groupId, Map offsetInfoMap) { Set partitionOffsets = new HashSet<>(); - startOffsets.forEach((tp, offsetAndMetadata) -> { - if (offsetAndMetadata != null) { + offsetInfoMap.forEach((tp, offsetInfo) -> { + if (offsetInfo != null) { partitionOffsets.add(new SharePartitionOffsetInformation( groupId, tp.topic(), tp.partition(), - Optional.of(offsetAndMetadata.offset()), - offsetAndMetadata.leaderEpoch() + Optional.of(offsetInfo.startOffset()), + offsetInfo.leaderEpoch(), + offsetInfo.lag() )); } else { partitionOffsets.add(new SharePartitionOffsetInformation( @@ -518,6 +520,7 @@ private static Set mapOffsetsToSharePartitionIn tp.topic(), tp.partition(), Optional.empty(), + Optional.empty(), Optional.empty() )); } @@ -536,9 +539,9 @@ private void printOffsets(TreeMap configOverrides) throws IO } record SharePartitionOffsetInformation(String group, String topic, int partition, Optional offset, - Optional leaderEpoch) { + Optional leaderEpoch, Optional lag) { } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index 29fe151ba2b7b..e90852db1bc39 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.clients.admin.ShareGroupDescription; import org.apache.kafka.clients.admin.ShareMemberAssignment; import org.apache.kafka.clients.admin.ShareMemberDescription; +import org.apache.kafka.clients.admin.SharePartitionOffsetInfo; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupState; @@ -205,7 +206,7 @@ public void testDescribeOffsetsOfExistingGroup() throws Exception { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), ""))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0L, Optional.of(1), Optional.empty()))) ) ); @@ -222,9 +223,9 @@ public void testDescribeOffsetsOfExistingGroup() throws Exception { List expectedValues; if (describeType.contains("--verbose")) { - expectedValues = List.of(firstGroup, "topic1", "0", "1", "0"); + expectedValues = List.of(firstGroup, "topic1", "0", "1", "0", "-"); } else { - expectedValues = List.of(firstGroup, "topic1", "0", "0"); + expectedValues = List.of(firstGroup, "topic1", "0", "0", "-"); } return checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues); @@ -271,9 +272,9 @@ public void testDescribeOffsetsOfExistingGroupWithNulls() throws Exception { List expectedValues; if (describeType.contains("--verbose")) { - expectedValues = List.of(firstGroup, "topic1", "0", "-", "-"); + expectedValues = List.of(firstGroup, "topic1", "0", "-", "-", "-"); } else { - expectedValues = List.of(firstGroup, "topic1", "0", "-"); + expectedValues = List.of(firstGroup, "topic1", "0", "-", "-"); } return checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues); @@ -313,13 +314,13 @@ public void testDescribeOffsetsOfAllExistingGroups() throws Exception { ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), ""))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1), Optional.empty()))) ) ); ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( secondGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), ""))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1), Optional.empty()))) ) ); @@ -347,11 +348,11 @@ public void testDescribeOffsetsOfAllExistingGroups() throws Exception { List expectedValues1, expectedValues2; if (describeType.contains("--verbose")) { - expectedValues1 = List.of(firstGroup, "topic1", "0", "1", "0"); - expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0"); + expectedValues1 = List.of(firstGroup, "topic1", "0", "1", "0", "-"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0", "-"); } else { - expectedValues1 = List.of(firstGroup, "topic1", "0", "0"); - expectedValues2 = List.of(secondGroup, "topic1", "0", "0"); + expectedValues1 = List.of(firstGroup, "topic1", "0", "0", "-"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "0", "-"); } return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) && @@ -1078,8 +1079,10 @@ public void testAlterShareGroupMultipleTopicsSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1, 0), new OffsetAndMetadata(10L), new TopicPartition(topic1, 1), new OffsetAndMetadata(10L), - new TopicPartition(topic2, 0), new OffsetAndMetadata(0L))) + KafkaFuture.completedFuture(Map.of( + new TopicPartition(topic1, 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + new TopicPartition(topic1, 1), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + new TopicPartition(topic2, 0), new SharePartitionOffsetInfo(0L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); @@ -1137,7 +1140,9 @@ public void testAlterShareGroupToLatestSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(t1, new OffsetAndMetadata(10L), t2, new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of( + t1, new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + t2, new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); Map descriptions = Map.of( @@ -1189,8 +1194,11 @@ public void testAlterShareGroupAllTopicsToDatetimeSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1, 0), new OffsetAndMetadata(5L), new TopicPartition(topic1, 1), new OffsetAndMetadata(10L), - new TopicPartition(topic2, 0), new OffsetAndMetadata(10L), new TopicPartition(topic3, 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of( + new TopicPartition(topic1, 0), new SharePartitionOffsetInfo(5L, Optional.empty(), Optional.empty()), + new TopicPartition(topic1, 1), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + new TopicPartition(topic2, 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + new TopicPartition(topic3, 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); @@ -1256,7 +1264,7 @@ public void testResetOffsetsDryRunSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition(topic, 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of(new TopicPartition(topic, 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); @@ -1323,7 +1331,7 @@ public void testAlterShareGroupOffsetsFailureWithNonEmptyGroup() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); @@ -1382,7 +1390,7 @@ public void testAlterShareGroupUnsubscribedTopicSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); @@ -1429,7 +1437,7 @@ public void testAlterShareGroupNonExistentGroupSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult); From 16f4b80bbb4c12795c1fcf198296f62cad4d9c91 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 29 Oct 2025 14:26:31 -0500 Subject: [PATCH 2/4] Fix ShareGroupCommandTest --- .../consumer/group/ShareGroupCommandTest.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index c417bd38e6583..fb5d459febb50 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -208,7 +208,8 @@ public void testDescribeOffsetsOfExistingGroup() throws Exception { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0L, Optional.of(1), Optional.empty()))) + KafkaFuture.completedFuture( + Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0L, Optional.of(1), Optional.of(0L)))) ) ); @@ -225,9 +226,9 @@ public void testDescribeOffsetsOfExistingGroup() throws Exception { List expectedValues; if (describeType.contains("--verbose")) { - expectedValues = List.of(firstGroup, "topic1", "0", "1", "0", "-"); + expectedValues = List.of(firstGroup, "topic1", "0", "1", "0", "0"); } else { - expectedValues = List.of(firstGroup, "topic1", "0", "0", "-"); + expectedValues = List.of(firstGroup, "topic1", "0", "0", "0"); } return checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues); @@ -322,7 +323,7 @@ public void testDescribeOffsetsOfAllExistingGroups() throws Exception { ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( secondGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1), Optional.empty()))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1), Optional.of(0L)))) ) ); @@ -351,10 +352,10 @@ public void testDescribeOffsetsOfAllExistingGroups() throws Exception { List expectedValues1, expectedValues2; if (describeType.contains("--verbose")) { expectedValues1 = List.of(firstGroup, "topic1", "0", "1", "0", "-"); - expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0", "-"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0", "0"); } else { expectedValues1 = List.of(firstGroup, "topic1", "0", "0", "-"); - expectedValues2 = List.of(secondGroup, "topic1", "0", "0", "-"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "0", "0"); } return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) && @@ -1523,8 +1524,8 @@ private boolean checkArgsHeaderOutput(List args, String output) { private boolean checkOffsetsArgsHeaderOutput(String output, boolean verbose) { List expectedKeys = verbose ? - List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET") : - List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET"); + List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET", "LAG") : + List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET", "LAG"); return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys); } From 20715dc73472ead7a5ecf3fb0e30f9f530ae52d0 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Wed, 29 Oct 2025 14:46:17 -0500 Subject: [PATCH 3/4] Checkstyle --- .../kafka/clients/admin/ListShareGroupOffsetsResult.java | 1 - .../apache/kafka/clients/admin/SharePartitionOffsetInfo.java | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java index 7add7393101b0..e75735ba2604f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java @@ -18,7 +18,6 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.admin.internals.CoordinatorKey; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java index 2fd54012bc679..262fc8913e76d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java @@ -38,8 +38,7 @@ public class SharePartitionOffsetInfo { * @param leaderEpoch The optional leader epoch of the start offset * @param lag The optional lag for the share-partition */ - public SharePartitionOffsetInfo(long startOffset, Optional leaderEpoch, Optional lag) - { + public SharePartitionOffsetInfo(long startOffset, Optional leaderEpoch, Optional lag) { this.startOffset = startOffset; this.leaderEpoch = leaderEpoch; this.lag = lag; From 4a0e5821aebf2a52df665970627c990f5f0e4877 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Mon, 3 Nov 2025 19:34:47 +0000 Subject: [PATCH 4/4] Review comments --- .../apache/kafka/clients/admin/SharePartitionOffsetInfo.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java index 262fc8913e76d..5948103dc17db 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java @@ -32,10 +32,10 @@ public class SharePartitionOffsetInfo { private final Optional lag; /** - * Construct a new StartPartitionOffsetInfo. + * Construct a new SharePartitionOffsetInfo. * * @param startOffset The share-partition start offset - * @param leaderEpoch The optional leader epoch of the start offset + * @param leaderEpoch The optional leader epoch of the share-partition * @param lag The optional lag for the share-partition */ public SharePartitionOffsetInfo(long startOffset, Optional leaderEpoch, Optional lag) {