From 7bcc1305187558900c9b62453eeead66e1e6b379 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Mon, 2 Feb 2026 14:17:47 +0000 Subject: [PATCH 1/7] KAFKA-20111: Handle pre-4.1 brokers in kafka-configs.sh for groups --- .../main/scala/kafka/admin/ConfigCommand.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index cea7d77452501..813625b218a43 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -367,9 +367,7 @@ object ConfigCommand extends Logging { return } case GroupType => - if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && - adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get - .stream.noneMatch(_.name == name)) { + if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && listGroupConfigResources(adminClient).stream.noneMatch(_.name == name)) { System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") return } @@ -388,8 +386,7 @@ object ConfigCommand extends Logging { case ClientMetricsType => adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq case GroupType => - adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ - adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSet + adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ listGroupConfigResources(adminClient).asScala.map(_.name).toSet case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) @@ -537,6 +534,15 @@ object ConfigCommand extends Logging { adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS).asScala } + private def listGroupConfigResources(adminClient: Admin): java.util.Collection[ConfigResource] = { + try { + adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all().get() + } catch { + // 4.1 and later admin client connecting to an older broker will not be able to list group config resources (KIP-1142) + case _: UnsupportedVersionException => java.util.Set.of() + } + } + class ConfigCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "The Kafka servers to connect to.") From 1685a80d88cf377d15f1d4fd82a894815dc6c7a3 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Mon, 2 Feb 2026 14:45:35 +0000 Subject: [PATCH 2/7] Add test and fix exception handling --- .../scala/kafka/admin/ConfigCommand.scala | 8 ++++- .../apache/kafka/tools/ConfigCommandTest.java | 32 +++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 813625b218a43..4b0999f8ae68c 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -539,7 +539,13 @@ object ConfigCommand extends Logging { adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all().get() } catch { // 4.1 and later admin client connecting to an older broker will not be able to list group config resources (KIP-1142) - case _: UnsupportedVersionException => java.util.Set.of() + case e: ExecutionException => + e.getCause match { + case _: UnsupportedVersionException => + java.util.Set.of() + case _ => throw e + } + case e: Throwable => throw e } } diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java index 1109098500d1b..7bab992553cb8 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java @@ -33,10 +33,13 @@ import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.DescribeUserScramCredentialsOptions; import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult; +import org.apache.kafka.clients.admin.ListConfigResourcesOptions; +import org.apache.kafka.clients.admin.ListConfigResourcesResult; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; @@ -1413,6 +1416,30 @@ public void shouldNotAlterGroupConfigWithoutEntityName() { assertEquals("An entity name must be specified with --alter of groups", exception.getMessage()); } + @Test + public void testDescribeGroupConfigOldBroker() { + ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092", + "--entity-type", "groups", + "--describe")); + + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + ListConfigResourcesResult listConfigResourcesResult = mock(ListConfigResourcesResult.class); + when(listConfigResourcesResult.all()).thenReturn(future); + + Node node = new Node(1, "localhost", 9092); + MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + ConfigResource.Type type = configResourceTypes.iterator().next(); + assertEquals(ConfigResource.Type.GROUP, type); + future.completeExceptionally(new UnsupportedVersionException("The v0 ListConfigResources only supports CLIENT_METRICS")); + return listConfigResourcesResult; + } + }; + + ConfigCommand.describeConfig(mockAdminClient, describeOpts); + } + public static String[] toArray(String... first) { return first; } @@ -1462,6 +1489,11 @@ public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, public AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options) { return mock(AlterClientQuotasResult.class); } + + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + return mock(ListConfigResourcesResult.class); + } } private Seq seq(Collection seq) { From c7b5c649c480b103cb3f7c22a3a73b3a5aa61d18 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Mon, 2 Feb 2026 21:51:33 +0000 Subject: [PATCH 3/7] Optional result from listing resources --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 4b0999f8ae68c..423a7b73af88a 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -367,7 +367,7 @@ object ConfigCommand extends Logging { return } case GroupType => - if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && listGroupConfigResources(adminClient).stream.noneMatch(_.name == name)) { + if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && listGroupConfigResources(adminClient).forall(resources => resources.stream.noneMatch(_.name == name))) { System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") return } @@ -386,7 +386,7 @@ object ConfigCommand extends Logging { case ClientMetricsType => adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq case GroupType => - adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ listGroupConfigResources(adminClient).asScala.map(_.name).toSet + adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ listGroupConfigResources(adminClient).map(resources => resources.asScala.map(_.name).toSet).getOrElse(Set() ++ entityName) case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) @@ -534,15 +534,15 @@ object ConfigCommand extends Logging { adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS).asScala } - private def listGroupConfigResources(adminClient: Admin): java.util.Collection[ConfigResource] = { + private def listGroupConfigResources(adminClient: Admin): Option[java.util.Collection[ConfigResource]] = { try { - adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all().get() + Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get) } catch { - // 4.1 and later admin client connecting to an older broker will not be able to list group config resources (KIP-1142) + // (KIP-1142) 4.1 and later admin client connecting to an older broker will not be able to list group config resources case e: ExecutionException => e.getCause match { case _: UnsupportedVersionException => - java.util.Set.of() + Option.empty case _ => throw e } case e: Throwable => throw e From cb54832224199c54b2a708aa0503b6d50e93c8af Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Mon, 2 Feb 2026 22:14:21 +0000 Subject: [PATCH 4/7] Improve checking --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 423a7b73af88a..c69143ee42b6e 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -367,7 +367,7 @@ object ConfigCommand extends Logging { return } case GroupType => - if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && listGroupConfigResources(adminClient).forall(resources => resources.stream.noneMatch(_.name == name))) { + if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && listGroupConfigResources(adminClient).exists(resources => resources.stream.noneMatch(_.name == name))) { System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") return } From c9029eef937011635b84ab1f3f92d825b865df7c Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Tue, 3 Feb 2026 20:33:50 +0000 Subject: [PATCH 5/7] Assertion in test --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 2 +- .../test/java/org/apache/kafka/tools/ConfigCommandTest.java | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index c69143ee42b6e..07cc95bef905e 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -367,7 +367,7 @@ object ConfigCommand extends Logging { return } case GroupType => - if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && listGroupConfigResources(adminClient).exists(resources => resources.stream.noneMatch(_.name == name))) { + if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId == name) && listGroupConfigResources(adminClient).exists(resources => resources.stream.noneMatch(_.name == name))) { System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") return } diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java index 7bab992553cb8..d588034f9d0d2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java @@ -1426,6 +1426,7 @@ public void testDescribeGroupConfigOldBroker() { ListConfigResourcesResult listConfigResourcesResult = mock(ListConfigResourcesResult.class); when(listConfigResourcesResult.all()).thenReturn(future); + AtomicBoolean listedConfigResources = new AtomicBoolean(false); Node node = new Node(1, "localhost", 9092); MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { @Override @@ -1433,11 +1434,13 @@ public ListConfigResourcesResult listConfigResources(Set co ConfigResource.Type type = configResourceTypes.iterator().next(); assertEquals(ConfigResource.Type.GROUP, type); future.completeExceptionally(new UnsupportedVersionException("The v0 ListConfigResources only supports CLIENT_METRICS")); + listedConfigResources.set(true); return listConfigResourcesResult; } }; ConfigCommand.describeConfig(mockAdminClient, describeOpts); + assertTrue(listedConfigResources.get()); } public static String[] toArray(String... first) { From d917d47b6c0fa5ebed207625f8a846d7c32c3f62 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 5 Feb 2026 14:38:09 +0000 Subject: [PATCH 6/7] Review comments --- .../scala/kafka/admin/ConfigCommand.scala | 6 ++-- .../apache/kafka/tools/ConfigCommandTest.java | 28 +++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 07cc95bef905e..bf2c005f721e3 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -386,7 +386,7 @@ object ConfigCommand extends Logging { case ClientMetricsType => adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq case GroupType => - adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ listGroupConfigResources(adminClient).map(resources => resources.asScala.map(_.name).toSet).getOrElse(Set() ++ entityName) + adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ listGroupConfigResources(adminClient).map(resources => resources.asScala.map(_.name).toSet).getOrElse(Set.empty) case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) @@ -538,12 +538,12 @@ object ConfigCommand extends Logging { try { Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get) } catch { - // (KIP-1142) 4.1 and later admin client connecting to an older broker will not be able to list group config resources + // (KIP-1142) 4.1+ admin client vs older broker: treat UnsupportedVersionException as None case e: ExecutionException => e.getCause match { case _: UnsupportedVersionException => Option.empty - case _ => throw e + case cause => throw cause } case e: Throwable => throw e } diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java index d588034f9d0d2..4d7602c3ef5a7 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; @@ -1443,6 +1444,33 @@ public ListConfigResourcesResult listConfigResources(Set co assertTrue(listedConfigResources.get()); } + @Test + public void testDescribeGroupConfigOldBrokerNotAuthorized() { + ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092", + "--entity-type", "groups", + "--describe")); + + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + ListConfigResourcesResult listConfigResourcesResult = mock(ListConfigResourcesResult.class); + when(listConfigResourcesResult.all()).thenReturn(future); + + AtomicBoolean listedConfigResources = new AtomicBoolean(false); + Node node = new Node(1, "localhost", 9092); + MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + ConfigResource.Type type = configResourceTypes.iterator().next(); + assertEquals(ConfigResource.Type.GROUP, type); + future.completeExceptionally(new ClusterAuthorizationException("Not authorized to the cluster")); + listedConfigResources.set(true); + return listConfigResourcesResult; + } + }; + + assertThrows(ClusterAuthorizationException.class, () -> ConfigCommand.describeConfig(mockAdminClient, describeOpts)); + assertTrue(listedConfigResources.get()); + } + public static String[] toArray(String... first) { return first; } From 4dae57938442a6270d7804e658f1d93de9e41c82 Mon Sep 17 00:00:00 2001 From: Andrew Schofield Date: Thu, 5 Feb 2026 21:50:09 +0000 Subject: [PATCH 7/7] Review comments --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index bf2c005f721e3..d1d19a054b651 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -539,13 +539,8 @@ object ConfigCommand extends Logging { Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get) } catch { // (KIP-1142) 4.1+ admin client vs older broker: treat UnsupportedVersionException as None - case e: ExecutionException => - e.getCause match { - case _: UnsupportedVersionException => - Option.empty - case cause => throw cause - } - case e: Throwable => throw e + case e: ExecutionException if e.getCause.isInstanceOf[UnsupportedVersionException] => None + case e: ExecutionException => throw e.getCause } }