Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListConfigResourcesOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
Expand Down Expand Up @@ -367,9 +367,7 @@ object ConfigCommand extends Logging {
return
}
case GroupType =>
if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) &&
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get
.stream.noneMatch(_.name == name)) {
if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId == name) && listGroupConfigResources(adminClient).exists(resources => resources.stream.noneMatch(_.name == name))) {
System.out.println(s"The ${entityType.dropRight(1)} '$name' doesn't exist and doesn't have dynamic config.")
return
}
Expand All @@ -388,8 +386,7 @@ object ConfigCommand extends Logging {
case ClientMetricsType =>
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSeq
case GroupType =>
adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all().get().asScala.map(_.name).toSet
adminClient.listGroups().all.get.asScala.map(_.groupId).toSet ++ listGroupConfigResources(adminClient).map(resources => resources.asScala.map(_.name).toSet).getOrElse(Set.empty)
case entityType => throw new IllegalArgumentException(s"Invalid entity type: $entityType")
})

Expand Down Expand Up @@ -537,6 +534,17 @@ object ConfigCommand extends Logging {
adminClient.describeClientQuotas(ClientQuotaFilter.containsOnly(components.asJava)).entities.get(30, TimeUnit.SECONDS).asScala
}

private def listGroupConfigResources(adminClient: Admin): Option[java.util.Collection[ConfigResource]] = {
try {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    try {
      Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get)
    } catch {
      // (KIP-1142) 4.1+ admin client vs older broker: treat UnsupportedVersion as None
      case e: ExecutionException if e.getCause.isInstanceOf[UnsupportedVersionException] => None
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewJSchofield are you happy with this comment ? 😄

the latest commit d917d47 seems to not handle this comment

Copy link
Member Author

@AndrewJSchofield AndrewJSchofield Feb 5, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used slightly different words for the comment text. But, I see. I didn't quite get the second part of this. I'll take a look a bit later today.

Some(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get)
} catch {
// (KIP-1142) 4.1+ admin client vs older broker: treat UnsupportedVersionException and ClusterAuthorizationException as None
case e: ExecutionException if e.getCause.isInstanceOf[UnsupportedVersionException] => None
case e: ExecutionException if e.getCause.isInstanceOf[ClusterAuthorizationException] => None
case e: ExecutionException => throw e.getCause
}
}


class ConfigCommandOptions(args: Array[String]) extends CommandDefaultOptions(args) {
val bootstrapServerOpt: OptionSpec[String] = parser.accepts("bootstrap-server", "The Kafka servers to connect to.")
Expand Down
90 changes: 90 additions & 0 deletions tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeUserScramCredentialsOptions;
import org.apache.kafka.clients.admin.DescribeUserScramCredentialsResult;
import org.apache.kafka.clients.admin.ListConfigResourcesOptions;
import org.apache.kafka.clients.admin.ListConfigResourcesResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
Expand Down Expand Up @@ -1413,6 +1417,87 @@ public void shouldNotAlterGroupConfigWithoutEntityName() {
assertEquals("An entity name must be specified with --alter of groups", exception.getMessage());
}

@Test
public void testDescribeGroupConfigOldBroker() {
ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
"--entity-type", "groups",
"--describe"));

KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
ListConfigResourcesResult listConfigResourcesResult = mock(ListConfigResourcesResult.class);
when(listConfigResourcesResult.all()).thenReturn(future);

AtomicBoolean listedConfigResources = new AtomicBoolean(false);
Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
ConfigResource.Type type = configResourceTypes.iterator().next();
assertEquals(ConfigResource.Type.GROUP, type);
future.completeExceptionally(new UnsupportedVersionException("The v0 ListConfigResources only supports CLIENT_METRICS"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test case for other exceptions to ensure they are propagated correctly and not swallowed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

listedConfigResources.set(true);
return listConfigResourcesResult;
}
};

ConfigCommand.describeConfig(mockAdminClient, describeOpts);
assertTrue(listedConfigResources.get());
}

@Test
public void testDescribeGroupConfigOldBrokerNotAuthorized() {
ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
"--entity-type", "groups",
"--describe"));

KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
ListConfigResourcesResult listConfigResourcesResult = mock(ListConfigResourcesResult.class);
when(listConfigResourcesResult.all()).thenReturn(future);

AtomicBoolean listedConfigResources = new AtomicBoolean(false);
Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
ConfigResource.Type type = configResourceTypes.iterator().next();
assertEquals(ConfigResource.Type.GROUP, type);
future.completeExceptionally(new ClusterAuthorizationException("Not authorized to the cluster"));
listedConfigResources.set(true);
return listConfigResourcesResult;
}
};

ConfigCommand.describeConfig(mockAdminClient, describeOpts);
assertTrue(listedConfigResources.get());
}

@Test
public void testDescribeGroupConfigOldBrokerUnexpectedException() {
ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
"--entity-type", "groups",
"--describe"));

KafkaFutureImpl<Collection<ConfigResource>> future = new KafkaFutureImpl<>();
ListConfigResourcesResult listConfigResourcesResult = mock(ListConfigResourcesResult.class);
when(listConfigResourcesResult.all()).thenReturn(future);

AtomicBoolean listedConfigResources = new AtomicBoolean(false);
Node node = new Node(1, "localhost", 9092);
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
@Override
public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
ConfigResource.Type type = configResourceTypes.iterator().next();
assertEquals(ConfigResource.Type.GROUP, type);
future.completeExceptionally(new InvalidConfigurationException("That was unexpected"));
listedConfigResources.set(true);
return listConfigResourcesResult;
}
};

assertThrows(InvalidConfigurationException.class, () -> ConfigCommand.describeConfig(mockAdminClient, describeOpts));
assertTrue(listedConfigResources.get());
}

public static String[] toArray(String... first) {
return first;
}
Expand Down Expand Up @@ -1462,6 +1547,11 @@ public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter,
public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options) {
return mock(AlterClientQuotasResult.class);
}

@Override
public ListConfigResourcesResult listConfigResources(Set<ConfigResource.Type> configResourceTypes, ListConfigResourcesOptions options) {
return mock(ListConfigResourcesResult.class);
}
}

private <T> Seq<T> seq(Collection<T> seq) {
Expand Down