diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 78a7f905319c8..9a50f52fb44ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -3814,7 +3814,7 @@ public AlterShareGroupOffsetsResult alterShareGroupOffsets(final String groupId, @Override public ListShareGroupOffsetsResult listShareGroupOffsets(final Map groupSpecs, final ListShareGroupOffsetsOptions options) { - SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); + SimpleAdminApiFuture> future = ListShareGroupOffsetsHandler.newFuture(groupSpecs.keySet()); ListShareGroupOffsetsHandler handler = new ListShareGroupOffsetsHandler(groupSpecs, logContext); invokeDriver(handler, future, options.timeoutMs); return new ListShareGroupOffsetsResult(future.all()); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java index 1a2c8869c6caf..e75735ba2604f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java @@ -18,7 +18,6 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.admin.internals.CoordinatorKey; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability; @@ -36,9 +35,9 @@ @InterfaceStability.Evolving public class ListShareGroupOffsetsResult { - private final Map>> futures; + private final Map>> futures; - ListShareGroupOffsetsResult(final Map>> futures) { + ListShareGroupOffsetsResult(final Map>> futures) { this.futures = futures.entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue)); } @@ -46,12 +45,12 @@ public class ListShareGroupOffsetsResult { /** * Return the future when the requests for all groups succeed. * - * @return Future which yields all {@code Map>} objects, if requests for all the groups succeed. + * @return Future which yields all {@code Map>} objects, if requests for all the groups succeed. */ - public KafkaFuture>> all() { + public KafkaFuture>> all() { return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])).thenApply( nil -> { - Map> offsets = new HashMap<>(futures.size()); + Map> offsets = new HashMap<>(futures.size()); futures.forEach((groupId, future) -> { try { offsets.put(groupId, future.get()); @@ -67,9 +66,9 @@ public KafkaFuture>> all() { /** * Return a future which yields a map of topic partitions to offsets for the specified group. If the group doesn't - * have a committed offset for a specific partition, the corresponding value in the returned map will be null. + * have offset information for a specific partition, the corresponding value in the returned map will be null. */ - public KafkaFuture> partitionsToOffsetAndMetadata(String groupId) { + public KafkaFuture> partitionsToOffsetInfo(String groupId) { if (!futures.containsKey(groupId)) { throw new IllegalArgumentException("Group ID not found: " + groupId); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java new file mode 100644 index 0000000000000..5948103dc17db --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/SharePartitionOffsetInfo.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Objects; +import java.util.Optional; + +/** + * This class is used to contain the offset and lag information for a share-partition. + */ +@InterfaceStability.Evolving +public class SharePartitionOffsetInfo { + private final long startOffset; + private final Optional leaderEpoch; + private final Optional lag; + + /** + * Construct a new SharePartitionOffsetInfo. + * + * @param startOffset The share-partition start offset + * @param leaderEpoch The optional leader epoch of the share-partition + * @param lag The optional lag for the share-partition + */ + public SharePartitionOffsetInfo(long startOffset, Optional leaderEpoch, Optional lag) { + this.startOffset = startOffset; + this.leaderEpoch = leaderEpoch; + this.lag = lag; + } + + public long startOffset() { + return startOffset; + } + + public Optional leaderEpoch() { + return leaderEpoch; + } + + public Optional lag() { + return lag; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SharePartitionOffsetInfo that = (SharePartitionOffsetInfo) o; + return startOffset == that.startOffset && + Objects.equals(leaderEpoch, that.leaderEpoch) && + Objects.equals(lag, that.lag); + } + + @Override + public int hashCode() { + return Objects.hash(startOffset, leaderEpoch, lag); + } + + @Override + public String toString() { + return "SharePartitionOffsetInfo{" + + "startOffset=" + startOffset + + ", leaderEpoch=" + leaderEpoch.orElse(null) + + ", lag=" + lag.orElse(null) + + '}'; + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java index f9b9e987930bd..5b75a482aaeae 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/ListShareGroupOffsetsHandler.java @@ -19,7 +19,7 @@ import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.admin.SharePartitionOffsetInfo; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.message.DescribeShareGroupOffsetsRequestData; @@ -47,7 +47,7 @@ /** * This class is the handler for {@link KafkaAdminClient#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call */ -public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> { +public class ListShareGroupOffsetsHandler extends AdminApiHandler.Batched> { private final Map groupSpecs; private final Logger log; @@ -60,7 +60,7 @@ public ListShareGroupOffsetsHandler(Map group this.lookupStrategy = new CoordinatorStrategy(CoordinatorType.GROUP, logContext); } - public static AdminApiFuture.SimpleAdminApiFuture> newFuture(Collection groupIds) { + public static AdminApiFuture.SimpleAdminApiFuture> newFuture(Collection groupIds) { return AdminApiFuture.forKeys(coordinatorKeys(groupIds)); } @@ -110,13 +110,13 @@ public DescribeShareGroupOffsetsRequest.Builder buildBatchedRequest(int coordina } @Override - public ApiResult> handleResponse(Node coordinator, - Set groupIds, - AbstractResponse abstractResponse) { + public ApiResult> handleResponse(Node coordinator, + Set groupIds, + AbstractResponse abstractResponse) { validateKeys(groupIds); final DescribeShareGroupOffsetsResponse response = (DescribeShareGroupOffsetsResponse) abstractResponse; - final Map> completed = new HashMap<>(); + final Map> completed = new HashMap<>(); final Map failed = new HashMap<>(); final List unmapped = new ArrayList<>(); @@ -125,7 +125,7 @@ public ApiResult> handleR if (response.hasGroupError(groupId)) { handleGroupError(coordinatorKey, response.groupError(groupId), failed, unmapped); } else { - Map groupOffsetsListing = new HashMap<>(); + Map groupOffsetsListing = new HashMap<>(); response.data().groups().stream().filter(g -> g.groupId().equals(groupId)).forEach(groupResponse -> { for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponseTopic topicResponse : groupResponse.topics()) { for (DescribeShareGroupOffsetsResponseData.DescribeShareGroupOffsetsResponsePartition partitionResponse : topicResponse.partitions()) { @@ -137,7 +137,7 @@ public ApiResult> handleR if (partitionResponse.startOffset() < 0) { groupOffsetsListing.put(tp, null); } else { - groupOffsetsListing.put(tp, new OffsetAndMetadata(startOffset, leaderEpoch, "")); + groupOffsetsListing.put(tp, new SharePartitionOffsetInfo(startOffset, leaderEpoch, Optional.empty())); } } else { log.warn("Skipping return offset for {} due to error {}: {}.", tp, partitionResponse.errorCode(), partitionResponse.errorMessage()); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java index 02c094433703f..a272bce672e20 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java @@ -186,8 +186,8 @@ public static ListConfigResourcesResult listConfigResourcesResult(KafkaException return new ListConfigResourcesResult(future); } - public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { - Map>> coordinatorFutures = groupOffsets.entrySet().stream() + public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map>> groupOffsets) { + Map>> coordinatorFutures = groupOffsets.entrySet().stream() .collect(Collectors.toMap( entry -> CoordinatorKey.byGroupId(entry.getKey()), Map.Entry::getValue diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index d01cde83bb312..55b4119f0ce48 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -11186,15 +11186,15 @@ public void testListShareGroupOffsets() throws Exception { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); + final Map partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get(); - assertEquals(6, partitionToOffsetAndMetadata.size()); - assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0)); - assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition1)); - assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition2)); - assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition3)); - assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition4)); - assertEquals(new OffsetAndMetadata(500, Optional.of(3), ""), partitionToOffsetAndMetadata.get(myTopicPartition5)); + assertEquals(6, partitionToOffsetInfo.size()); + assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0)); + assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1)); + assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition2)); + assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3)); + assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition4)); + assertEquals(new SharePartitionOffsetInfo(500, Optional.of(3), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition5)); } } @@ -11257,17 +11257,17 @@ public void testListShareGroupOffsetsMultipleGroups() throws Exception { final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); assertEquals(2, result.all().get().size()); - final Map partitionToOffsetAndMetadataGroup0 = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); - assertEquals(4, partitionToOffsetAndMetadataGroup0.size()); - assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition0)); - assertEquals(new OffsetAndMetadata(11, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition1)); - assertEquals(new OffsetAndMetadata(40, Optional.of(0), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition2)); - assertEquals(new OffsetAndMetadata(50, Optional.of(1), ""), partitionToOffsetAndMetadataGroup0.get(myTopicPartition3)); + final Map partitionToOffsetInfoGroup0 = result.partitionsToOffsetInfo(GROUP_ID).get(); + assertEquals(4, partitionToOffsetInfoGroup0.size()); + assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition0)); + assertEquals(new SharePartitionOffsetInfo(11, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition1)); + assertEquals(new SharePartitionOffsetInfo(40, Optional.of(0), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition2)); + assertEquals(new SharePartitionOffsetInfo(50, Optional.of(1), Optional.empty()), partitionToOffsetInfoGroup0.get(myTopicPartition3)); - final Map partitionToOffsetAndMetadataGroup1 = result.partitionsToOffsetAndMetadata("group-1").get(); - assertEquals(2, partitionToOffsetAndMetadataGroup1.size()); - assertEquals(new OffsetAndMetadata(100, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition4)); - assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadataGroup1.get(myTopicPartition5)); + final Map partitionToOffsetInfoGroup1 = result.partitionsToOffsetInfo("group-1").get(); + assertEquals(2, partitionToOffsetInfoGroup1.size()); + assertEquals(new SharePartitionOffsetInfo(100, Optional.of(2), Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition4)); + assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), Optional.empty()), partitionToOffsetInfoGroup1.get(myTopicPartition5)); } } @@ -11290,9 +11290,9 @@ public void testListShareGroupOffsetsEmpty() throws Exception { env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); + final Map partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get(); - assertEquals(0, partitionToOffsetAndMetadata.size()); + assertEquals(0, partitionToOffsetInfo.size()); } } @@ -11342,13 +11342,13 @@ public void testListShareGroupOffsetsWithErrorInOnePartition() throws Exception env.kafkaClient().prepareResponse(new DescribeShareGroupOffsetsResponse(data)); final ListShareGroupOffsetsResult result = env.adminClient().listShareGroupOffsets(groupSpecs); - final Map partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata(GROUP_ID).get(); + final Map partitionToOffsetInfo = result.partitionsToOffsetInfo(GROUP_ID).get(); // For myTopicPartition2 we have set an error as the response. Thus, it should be skipped from the final result - assertEquals(3, partitionToOffsetAndMetadata.size()); - assertEquals(new OffsetAndMetadata(10, Optional.of(0), ""), partitionToOffsetAndMetadata.get(myTopicPartition0)); - assertEquals(new OffsetAndMetadata(11, Optional.of(1), ""), partitionToOffsetAndMetadata.get(myTopicPartition1)); - assertEquals(new OffsetAndMetadata(500, Optional.of(2), ""), partitionToOffsetAndMetadata.get(myTopicPartition3)); + assertEquals(3, partitionToOffsetInfo.size()); + assertEquals(new SharePartitionOffsetInfo(10, Optional.of(0), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition0)); + assertEquals(new SharePartitionOffsetInfo(11, Optional.of(1), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition1)); + assertEquals(new SharePartitionOffsetInfo(500, Optional.of(2), Optional.empty()), partitionToOffsetInfo.get(myTopicPartition3)); } } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index c25ed9e9bfba9..d057ce99d7111 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -2925,7 +2925,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val tp1 = new TopicPartition(testTopicName, 0) val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))) - .partitionsToOffsetAndMetadata(testGroupId) + .partitionsToOffsetInfo(testGroupId) .get() assertTrue(parts.containsKey(tp1)) assertNull(parts.get(tp1)) @@ -2991,10 +2991,10 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetAlterResult.partitionResult(tp2)) val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1)))) - .partitionsToOffsetAndMetadata(testGroupId) + .partitionsToOffsetInfo(testGroupId) .get() assertTrue(parts.containsKey(tp1)) - assertEquals(0, parts.get(tp1).offset()) + assertEquals(0, parts.get(tp1).startOffset()) } finally { Utils.closeQuietly(client, "adminClient") } @@ -3031,14 +3031,14 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Test listShareGroupOffsets TestUtils.waitUntilTrue(() => { val parts = client.listShareGroupOffsets(util.Map.of(testGroupId, new ListShareGroupOffsetsSpec())) - .partitionsToOffsetAndMetadata(testGroupId) + .partitionsToOffsetInfo(testGroupId) .get() parts.containsKey(tp1) && parts.containsKey(tp2) }, "Expected the result contains all partitions.") // Test listShareGroupOffsets with listShareGroupOffsetsSpec val groupSpecs = util.Map.of(testGroupId, new ListShareGroupOffsetsSpec().topicPartitions(util.List.of(tp1))) - val parts = client.listShareGroupOffsets(groupSpecs).partitionsToOffsetAndMetadata(testGroupId).get() + val parts = client.listShareGroupOffsets(groupSpecs).partitionsToOffsetInfo(testGroupId).get() assertTrue(parts.containsKey(tp1)) assertFalse(parts.containsKey(tp2)) } finally { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 2c3e21fedb1d7..6d36e49dcfd34 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.ShareGroupDescription; import org.apache.kafka.clients.admin.ShareMemberAssignment; import org.apache.kafka.clients.admin.ShareMemberDescription; +import org.apache.kafka.clients.admin.SharePartitionOffsetInfo; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupState; import org.apache.kafka.common.GroupType; @@ -437,7 +438,7 @@ private Collection getPartitionsToReset(String groupId) throws E partitionsToReset = offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt)); } else { Map groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec()); - Map offsetsByTopicPartitions = adminClient.listShareGroupOffsets( + Map offsetsByTopicPartitions = adminClient.listShareGroupOffsets( groupSpecs, withTimeoutMs(new ListShareGroupOffsetsOptions()) ).all().get().get(groupId); @@ -494,11 +495,11 @@ TreeMap groupSpecs = Map.of(groupId, new ListShareGroupOffsetsSpec()); try { - Map startOffsets = adminClient.listShareGroupOffsets( + Map offsetInfoMap = adminClient.listShareGroupOffsets( groupSpecs, withTimeoutMs(new ListShareGroupOffsetsOptions()) ).all().get().get(groupId); - Set partitionOffsets = mapOffsetsToSharePartitionInformation(groupId, startOffsets); + Set partitionOffsets = mapOffsetInfoToSharePartitionInformation(groupId, offsetInfoMap); groupOffsets.put(groupId, new SimpleImmutableEntry<>(shareGroup, partitionOffsets)); } catch (InterruptedException | ExecutionException e) { @@ -509,17 +510,18 @@ TreeMap mapOffsetsToSharePartitionInformation(String groupId, Map startOffsets) { + private static Set mapOffsetInfoToSharePartitionInformation(String groupId, Map offsetInfoMap) { Set partitionOffsets = new HashSet<>(); - startOffsets.forEach((tp, offsetAndMetadata) -> { - if (offsetAndMetadata != null) { + offsetInfoMap.forEach((tp, offsetInfo) -> { + if (offsetInfo != null) { partitionOffsets.add(new SharePartitionOffsetInformation( groupId, tp.topic(), tp.partition(), - Optional.of(offsetAndMetadata.offset()), - offsetAndMetadata.leaderEpoch() + Optional.of(offsetInfo.startOffset()), + offsetInfo.leaderEpoch(), + offsetInfo.lag() )); } else { partitionOffsets.add(new SharePartitionOffsetInformation( @@ -527,6 +529,7 @@ private static Set mapOffsetsToSharePartitionIn tp.topic(), tp.partition(), Optional.empty(), + Optional.empty(), Optional.empty() )); } @@ -545,9 +548,9 @@ private void printOffsets(TreeMap configOverrides) throws IO } record SharePartitionOffsetInformation(String group, String topic, int partition, Optional offset, - Optional leaderEpoch) { + Optional leaderEpoch, Optional lag) { } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index 28e51e8c53036..fb5d459febb50 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -40,6 +40,7 @@ import org.apache.kafka.clients.admin.ShareGroupDescription; import org.apache.kafka.clients.admin.ShareMemberAssignment; import org.apache.kafka.clients.admin.ShareMemberDescription; +import org.apache.kafka.clients.admin.SharePartitionOffsetInfo; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.GroupState; @@ -207,7 +208,8 @@ public void testDescribeOffsetsOfExistingGroup() throws Exception { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0L, Optional.of(1), ""))) + KafkaFuture.completedFuture( + Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0L, Optional.of(1), Optional.of(0L)))) ) ); @@ -224,9 +226,9 @@ public void testDescribeOffsetsOfExistingGroup() throws Exception { List expectedValues; if (describeType.contains("--verbose")) { - expectedValues = List.of(firstGroup, "topic1", "0", "1", "0"); + expectedValues = List.of(firstGroup, "topic1", "0", "1", "0", "0"); } else { - expectedValues = List.of(firstGroup, "topic1", "0", "0"); + expectedValues = List.of(firstGroup, "topic1", "0", "0", "0"); } return checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues); @@ -273,9 +275,9 @@ public void testDescribeOffsetsOfExistingGroupWithNulls() throws Exception { List expectedValues; if (describeType.contains("--verbose")) { - expectedValues = List.of(firstGroup, "topic1", "0", "-", "-"); + expectedValues = List.of(firstGroup, "topic1", "0", "-", "-", "-"); } else { - expectedValues = List.of(firstGroup, "topic1", "0", "-"); + expectedValues = List.of(firstGroup, "topic1", "0", "-", "-"); } return checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues); @@ -315,13 +317,13 @@ public void testDescribeOffsetsOfAllExistingGroups() throws Exception { ListShareGroupOffsetsResult listShareGroupOffsetsResult1 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( firstGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), ""))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1), Optional.empty()))) ) ); ListShareGroupOffsetsResult listShareGroupOffsetsResult2 = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( secondGroup, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new OffsetAndMetadata(0, Optional.of(1), ""))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic1", 0), new SharePartitionOffsetInfo(0, Optional.of(1), Optional.of(0L)))) ) ); @@ -349,11 +351,11 @@ public void testDescribeOffsetsOfAllExistingGroups() throws Exception { List expectedValues1, expectedValues2; if (describeType.contains("--verbose")) { - expectedValues1 = List.of(firstGroup, "topic1", "0", "1", "0"); - expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0"); + expectedValues1 = List.of(firstGroup, "topic1", "0", "1", "0", "-"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "1", "0", "0"); } else { - expectedValues1 = List.of(firstGroup, "topic1", "0", "0"); - expectedValues2 = List.of(secondGroup, "topic1", "0", "0"); + expectedValues1 = List.of(firstGroup, "topic1", "0", "0", "-"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "0", "0"); } return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) && @@ -1080,8 +1082,10 @@ public void testAlterShareGroupMultipleTopicsSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1, 0), new OffsetAndMetadata(10L), new TopicPartition(topic1, 1), new OffsetAndMetadata(10L), - new TopicPartition(topic2, 0), new OffsetAndMetadata(0L))) + KafkaFuture.completedFuture(Map.of( + new TopicPartition(topic1, 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + new TopicPartition(topic1, 1), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + new TopicPartition(topic2, 0), new SharePartitionOffsetInfo(0L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any(), any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult); @@ -1136,7 +1140,9 @@ public void testAlterShareGroupToLatestSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(t1, new OffsetAndMetadata(10L), t2, new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of( + t1, new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + t2, new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); Map descriptions = Map.of( @@ -1188,8 +1194,11 @@ public void testAlterShareGroupAllTopicsToDatetimeSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition(topic1, 0), new OffsetAndMetadata(5L), new TopicPartition(topic1, 1), new OffsetAndMetadata(10L), - new TopicPartition(topic2, 0), new OffsetAndMetadata(10L), new TopicPartition(topic3, 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of( + new TopicPartition(topic1, 0), new SharePartitionOffsetInfo(5L, Optional.empty(), Optional.empty()), + new TopicPartition(topic1, 1), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + new TopicPartition(topic2, 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()), + new TopicPartition(topic3, 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any(), any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult); @@ -1255,7 +1264,7 @@ public void testResetOffsetsDryRunSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition(topic, 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of(new TopicPartition(topic, 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any(), any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult); @@ -1322,7 +1331,7 @@ public void testAlterShareGroupOffsetsFailureWithNonEmptyGroup() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any(), any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult); @@ -1381,7 +1390,7 @@ public void testAlterShareGroupUnsubscribedTopicSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any(), any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult); @@ -1428,7 +1437,7 @@ public void testAlterShareGroupNonExistentGroupSuccess() { ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( Map.of( group, - KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new OffsetAndMetadata(10L))) + KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 0), new SharePartitionOffsetInfo(10L, Optional.empty(), Optional.empty()))) ) ); when(adminClient.listShareGroupOffsets(any(), any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult); @@ -1515,8 +1524,8 @@ private boolean checkArgsHeaderOutput(List args, String output) { private boolean checkOffsetsArgsHeaderOutput(String output, boolean verbose) { List expectedKeys = verbose ? - List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET") : - List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET"); + List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET", "LAG") : + List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET", "LAG"); return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys); }