diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index cea7d77452501..d1d19a054b651 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -367,9 +367,7 @@ object ConfigCommand extends Logging { return } case GroupType => - if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) && - adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get - .stream.noneMatch(_.name == name)) { + if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId == name) && listGroupConfigResources(adminClient).exists(resources => resources.stream.noneMatch(_.name == name))) { System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.") return } @@ -388,8 +386,7 @@ object ConfigCommand extends Logging { case ClientMetricsType => adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq case GroupType => - adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ - adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSet + adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ listGroupConfigResources(adminClient).map(resources => resources.asScala.map(_.name).toSet).getOrElse(Set.empty) case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType") }) @@ -537,6 +534,16 @@ object ConfigCommand extends Logging { adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS).asScala } + private def listGroupConfigResources(adminClient: Admin): Option[java.util.Collection[ConfigResource]] = { + try { + Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get) + } catch { + // (KIP-1142) 4.1+ admin client vs older broker: treat UnsupportedVersionException as None + case e: ExecutionException if e.getCause.isInstanceOf[UnsupportedVersionException] => None + case e: ExecutionException => throw e.getCause + } + } + class ConfigCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) { val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "The Kafka servers to connect to.") diff --git a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java index 1109098500d1b..4d7602c3ef5a7 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java @@ -33,10 +33,14 @@ import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.clients.admin.DescribeUserScramCredentialsOptions; import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult; +import org.apache.kafka.clients.admin.ListConfigResourcesOptions; +import org.apache.kafka.clients.admin.ListConfigResourcesResult; import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.common.Node; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.quota.ClientQuotaAlteration; import org.apache.kafka.common.quota.ClientQuotaEntity; @@ -1413,6 +1417,60 @@ public void shouldNotAlterGroupConfigWithoutEntityName() { assertEquals("An entity name must be specified with --alter of groups", exception.getMessage()); } + @Test + public void testDescribeGroupConfigOldBroker() { + ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092", + "--entity-type", "groups", + "--describe")); + + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + ListConfigResourcesResult listConfigResourcesResult = mock(ListConfigResourcesResult.class); + when(listConfigResourcesResult.all()).thenReturn(future); + + AtomicBoolean listedConfigResources = new AtomicBoolean(false); + Node node = new Node(1, "localhost", 9092); + MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + ConfigResource.Type type = configResourceTypes.iterator().next(); + assertEquals(ConfigResource.Type.GROUP, type); + future.completeExceptionally(new UnsupportedVersionException("The v0 ListConfigResources only supports CLIENT_METRICS")); + listedConfigResources.set(true); + return listConfigResourcesResult; + } + }; + + ConfigCommand.describeConfig(mockAdminClient, describeOpts); + assertTrue(listedConfigResources.get()); + } + + @Test + public void testDescribeGroupConfigOldBrokerNotAuthorized() { + ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092", + "--entity-type", "groups", + "--describe")); + + KafkaFutureImpl> future = new KafkaFutureImpl<>(); + ListConfigResourcesResult listConfigResourcesResult = mock(ListConfigResourcesResult.class); + when(listConfigResourcesResult.all()).thenReturn(future); + + AtomicBoolean listedConfigResources = new AtomicBoolean(false); + Node node = new Node(1, "localhost", 9092); + MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) { + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + ConfigResource.Type type = configResourceTypes.iterator().next(); + assertEquals(ConfigResource.Type.GROUP, type); + future.completeExceptionally(new ClusterAuthorizationException("Not authorized to the cluster")); + listedConfigResources.set(true); + return listConfigResourcesResult; + } + }; + + assertThrows(ClusterAuthorizationException.class, () -> ConfigCommand.describeConfig(mockAdminClient, describeOpts)); + assertTrue(listedConfigResources.get()); + } + public static String[] toArray(String... first) { return first; } @@ -1462,6 +1520,11 @@ public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, public AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options) { return mock(AlterClientQuotasResult.class); } + + @Override + public ListConfigResourcesResult listConfigResources(Set configResourceTypes, ListConfigResourcesOptions options) { + return mock(ListConfigResourcesResult.class); + } } private Seq seq(Collection seq) {