diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index d928bbac01b07..920e70127d4fe 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -413,39 +413,61 @@ class ControllerApis( getCreatableTopics.apply(allowedTopicNames) } val describableTopicNames = getDescribableTopics.apply(allowedTopicNames).asJava - val effectiveRequest = request.duplicate() - val iterator = effectiveRequest.topics().iterator() - while (iterator.hasNext) { - val creatableTopic = iterator.next() - if (duplicateTopicNames.contains(creatableTopic.name()) || - !authorizedTopicNames.contains(creatableTopic.name())) { - iterator.remove() - } - } - controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response => - duplicateTopicNames.forEach { name => - response.topics().add(new CreatableTopicResult(). - setName(name). - setErrorCode(INVALID_REQUEST.code). - setErrorMessage("Duplicate topic name.")) - } - topicNames.forEach { name => - if (name == Topic.CLUSTER_METADATA_TOPIC_NAME) { - response.topics().add(new CreatableTopicResult(). - setName(name). - setErrorCode(INVALID_REQUEST.code). - setErrorMessage(s"Creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited.")) - } else if (!authorizedTopicNames.contains(name)) { - response.topics().add(new CreatableTopicResult(). - setName(name). - setErrorCode(TOPIC_AUTHORIZATION_FAILED.code). - setErrorMessage("Authorization failed.")) - } - } - response - } + // Create a map to collect validation errors upfront +val validationErrors = new util.ArrayList[CreatableTopicResult]() + +// Check for duplicates +duplicateTopicNames.forEach { name => + validationErrors.add(new CreatableTopicResult() + .setName(name) + .setErrorCode(INVALID_REQUEST.code) + .setErrorMessage("Duplicate topic name.")) +} + +// Check for cluster metadata topic +topicNames.forEach { name => + if (name == Topic.CLUSTER_METADATA_TOPIC_NAME) { + validationErrors.add(new CreatableTopicResult() + .setName(name) + .setErrorCode(INVALID_REQUEST.code) + .setErrorMessage( + s"Creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited." + )) } +} +// Check for authorization failures +topicNames.forEach { name => + if (!authorizedTopicNames.contains(name) && name != Topic.CLUSTER_METADATA_TOPIC_NAME) { + validationErrors.add(new CreatableTopicResult() + .setName(name) + .setErrorCode(TOPIC_AUTHORIZATION_FAILED.code) + .setErrorMessage("Authorization failed.")) + } +} + +// Build effective request with only valid topics +val effectiveRequest = request.duplicate() +val iterator = effectiveRequest.topics().iterator() +while (iterator.hasNext) { + val creatableTopic = iterator.next() + if (duplicateTopicNames.contains(creatableTopic.name()) || + creatableTopic.name() == Topic.CLUSTER_METADATA_TOPIC_NAME || + !authorizedTopicNames.contains(creatableTopic.name())) { + iterator.remove() + } +} + +controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response => + + // Add pre-validation errors to response + validationErrors.forEach { error => + response.topics().add(error) + } + + response +} +} def handleApiVersionsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = { // Note that controller returns its full list of supported ApiKeys and versions regardless of current // authentication state (e.g., before SASL authentication on an SASL listener, do note that no @@ -1102,4 +1124,4 @@ class ControllerApis( authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) handleRaftRequest(request, response => new UpdateRaftVoterResponse(response.asInstanceOf[UpdateRaftVoterResponseData])) } -} +} \ No newline at end of file diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 63c3bb78765cb..19853d20c18f2 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -141,11 +141,13 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER; import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE; - /** - * The ReplicationControlManager is the part of the controller which deals with topics - * and partitions. It is responsible for managing the in-sync replica set and leader - * of each partition, as well as administrative tasks like creating or deleting topics. + * The ReplicationControlManager is the part of the controller which deals with + * topics + * and partitions. It is responsible for managing the in-sync replica set and + * leader + * of each partition, as well as administrative tasks like creating or deleting + * topics. */ public class ReplicationControlManager { static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000; @@ -214,20 +216,22 @@ ReplicationControlManager build() { } else if (clusterControl == null) { throw new IllegalStateException("Cluster control must be set before building"); } - if (logContext == null) logContext = new LogContext(); - if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry(); + if (logContext == null) + logContext = new LogContext(); + if (snapshotRegistry == null) + snapshotRegistry = configurationControl.snapshotRegistry(); if (featureControl == null) { throw new IllegalStateException("FeatureControlManager must not be null"); } return new ReplicationControlManager(snapshotRegistry, - logContext, - defaultReplicationFactor, - defaultNumPartitions, - maxElectionsPerImbalance, - configurationControl, - clusterControl, - createTopicPolicy, - featureControl); + logContext, + defaultReplicationFactor, + defaultNumPartitions, + maxElectionsPerImbalance, + configurationControl, + clusterControl, + createTopicPolicy, + featureControl); } } @@ -284,19 +288,22 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti private final Logger log; /** - * The KIP-464 default replication factor that is used if a CreateTopics request does + * The KIP-464 default replication factor that is used if a CreateTopics request + * does * not specify one. */ private final short defaultReplicationFactor; /** - * The KIP-464 default number of partitions that is used if a CreateTopics request does + * The KIP-464 default number of partitions that is used if a CreateTopics + * request does * not specify a number of partitions. */ private final int defaultNumPartitions; /** - * Maximum number of leader elections to perform during one partition leader balancing operation. + * Maximum number of leader elections to perform during one partition leader + * balancing operation. */ private final int maxElectionsPerImbalance; @@ -311,7 +318,8 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti private final ClusterControlManager clusterControl; /** - * The policy to use to validate that topic assignments are valid, if one is present. + * The policy to use to validate that topic assignments are valid, if one is + * present. */ private final Optional createTopicPolicy; @@ -331,11 +339,13 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti * The reason for this is that some per-topic metrics do replace periods with * underscores, and would therefore be ambiguous otherwise. * - * This map is from normalized topic name to a set of topic names. So if we had two + * This map is from normalized topic name to a set of topic names. So if we had + * two * topics named foo.bar and foo_bar this map would contain * a mapping from foo_bar to a set containing foo.bar and foo_bar. * - * Since we reject topic creations that would collide, under normal conditions the + * Since we reject topic creations that would collide, under normal conditions + * the * sets in this map should only have a size of 1. However, if the cluster was * upgraded from a version prior to KAFKA-13743, it may be possible to have more * values here, since colliding topic names will be "grandfathered in." @@ -343,7 +353,8 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti private final TimelineHashMap> topicsWithCollisionChars; /** - * Maps topic UUIDs to structures containing topic information, including partitions. + * Maps topic UUIDs to structures containing topic information, including + * partitions. */ private final TimelineHashMap topics; @@ -369,7 +380,8 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti private final TimelineHashSet imbalancedPartitions; /** - * A map from registered directory IDs to the partitions that are stored in that directory. + * A map from registered directory IDs to the partitions that are stored in that + * directory. */ private final TimelineHashMap> directoriesToPartitions; @@ -379,16 +391,15 @@ static Map translateCreationConfigs(CreatableTopicConfigCollecti final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber(); private ReplicationControlManager( - SnapshotRegistry snapshotRegistry, - LogContext logContext, - short defaultReplicationFactor, - int defaultNumPartitions, - int maxElectionsPerImbalance, - ConfigurationControlManager configurationControl, - ClusterControlManager clusterControl, - Optional createTopicPolicy, - FeatureControlManager featureControl - ) { + SnapshotRegistry snapshotRegistry, + LogContext logContext, + short defaultReplicationFactor, + int defaultNumPartitions, + int maxElectionsPerImbalance, + ConfigurationControlManager configurationControl, + ClusterControlManager clusterControl, + Optional createTopicPolicy, + FeatureControlManager featureControl) { this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(ReplicationControlManager.class); this.defaultReplicationFactor = defaultReplicationFactor; @@ -411,7 +422,8 @@ private ReplicationControlManager( public void replay(TopicRecord record) { Uuid existingUuid = topicsByName.put(record.name(), record.topicId()); if (existingUuid != null) { - // We don't currently support sending a second TopicRecord for the same topic name... + // We don't currently support sending a second TopicRecord for the same topic + // name... // unless, of course, there is a RemoveTopicRecord in between. if (existingUuid.equals(record.topicId())) { throw new RuntimeException("Found duplicate TopicRecord for " + record.name() + @@ -432,7 +444,7 @@ public void replay(TopicRecord record) { topicNames.add(record.name()); } topics.put(record.topicId(), - new TopicControlInfo(record.name(), snapshotRegistry, record.topicId())); + new TopicControlInfo(record.name(), snapshotRegistry, record.topicId())); log.info("Replayed TopicRecord for topic {} with topic ID {}.", record.name(), record.topicId()); } @@ -440,12 +452,12 @@ public void replay(PartitionRecord record) { TopicControlInfo topicInfo = topics.get(record.topicId()); if (topicInfo == null) { throw new RuntimeException("Tried to create partition " + record.topicId() + - ":" + record.partitionId() + ", but no topic with that ID was found."); + ":" + record.partitionId() + ", but no topic with that ID was found."); } PartitionRegistration newPartInfo = new PartitionRegistration(record); PartitionRegistration prevPartInfo = topicInfo.parts.get(record.partitionId()); String description = topicInfo.name + "-" + record.partitionId() + - " with topic ID " + record.topicId(); + " with topic ID " + record.topicId(); if (prevPartInfo == null) { log.info("Replayed PartitionRecord for new partition {} and {}.", description, newPartInfo); @@ -453,14 +465,15 @@ public void replay(PartitionRecord record) { updatePartitionInfo(record.topicId(), record.partitionId(), null, newPartInfo); updatePartitionDirectories(record.topicId(), record.partitionId(), null, newPartInfo.directories); updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(), - false, isReassignmentInProgress(newPartInfo)); + false, isReassignmentInProgress(newPartInfo)); } else if (!newPartInfo.equals(prevPartInfo)) { log.info("Replayed PartitionRecord for existing partition {} and {}.", description, newPartInfo); newPartInfo.maybeLogPartitionChange(log, description, prevPartInfo); topicInfo.parts.put(record.partitionId(), newPartInfo); updatePartitionInfo(record.topicId(), record.partitionId(), prevPartInfo, newPartInfo); - updatePartitionDirectories(record.topicId(), record.partitionId(), prevPartInfo.directories, newPartInfo.directories); + updatePartitionDirectories(record.topicId(), record.partitionId(), prevPartInfo.directories, + newPartInfo.directories); updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(), isReassignmentInProgress(prevPartInfo), isReassignmentInProgress(newPartInfo)); } @@ -473,7 +486,7 @@ public void replay(PartitionRecord record) { } private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId, - boolean wasReassigning, boolean isReassigning) { + boolean wasReassigning, boolean isReassigning) { if (!wasReassigning) { if (isReassigning) { int[] prevReassigningParts = reassigningTopics.getOrDefault(topicId, Replicas.NONE); @@ -494,21 +507,22 @@ public void replay(PartitionChangeRecord record) { TopicControlInfo topicInfo = topics.get(record.topicId()); if (topicInfo == null) { throw new RuntimeException("Tried to create partition " + record.topicId() + - ":" + record.partitionId() + ", but no topic with that ID was found."); + ":" + record.partitionId() + ", but no topic with that ID was found."); } PartitionRegistration prevPartitionInfo = topicInfo.parts.get(record.partitionId()); if (prevPartitionInfo == null) { throw new RuntimeException("Tried to create partition " + record.topicId() + - ":" + record.partitionId() + ", but no partition with that id was found."); + ":" + record.partitionId() + ", but no partition with that id was found."); } PartitionRegistration newPartitionInfo = prevPartitionInfo.merge(record); updateReassigningTopicsIfNeeded(record.topicId(), record.partitionId(), isReassignmentInProgress(prevPartitionInfo), isReassignmentInProgress(newPartitionInfo)); topicInfo.parts.put(record.partitionId(), newPartitionInfo); updatePartitionInfo(record.topicId(), record.partitionId(), prevPartitionInfo, newPartitionInfo); - updatePartitionDirectories(record.topicId(), record.partitionId(), prevPartitionInfo.directories, newPartitionInfo.directories); + updatePartitionDirectories(record.topicId(), record.partitionId(), prevPartitionInfo.directories, + newPartitionInfo.directories); String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " + - record.topicId(); + record.topicId(); newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo); if (newPartitionInfo.hasPreferredLeader()) { @@ -520,13 +534,13 @@ public void replay(PartitionChangeRecord record) { if (record.removingReplicas() != null || record.addingReplicas() != null) { log.info("Replayed partition assignment change {} for topic {}. " + "(isr: {} -> {}, replicaSet: {} -> {}, partitionEpoch: {}, leaderEpoch: {})", - record, topicInfo.name, prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.replicas, - newPartitionInfo.replicas, newPartitionInfo.partitionEpoch, newPartitionInfo.leaderEpoch); + record, topicInfo.name, prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.replicas, + newPartitionInfo.replicas, newPartitionInfo.partitionEpoch, newPartitionInfo.leaderEpoch); } else if (log.isDebugEnabled()) { log.debug("Replayed partition change {} for topic {}. " + "(isr: {} -> {}, replicaSet: {} -> {}, partitionEpoch: {}, leaderEpoch: {})", - record, topicInfo.name, prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.replicas, - newPartitionInfo.replicas, newPartitionInfo.partitionEpoch, newPartitionInfo.leaderEpoch); + record, topicInfo.name, prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.replicas, + newPartitionInfo.replicas, newPartitionInfo.partitionEpoch, newPartitionInfo.leaderEpoch); } } @@ -535,7 +549,7 @@ public void replay(RemoveTopicRecord record) { TopicControlInfo topic = topics.remove(record.topicId()); if (topic == null) { throw new UnknownTopicIdException("Can't find topic with ID " + record.topicId() + - " to remove."); + " to remove."); } topicsByName.remove(topic.name); if (Topic.hasCollisionChars(topic.name)) { @@ -613,11 +627,8 @@ int removeTopicElrs(TopicControlInfo topic) { PartitionRegistration partition = topic.parts.get(partitionId); if (partition.elr.length != 0 || partition.lastKnownElr.length != 0) { topic.parts.put(partitionId, partition.merge( - new PartitionChangeRecord(). - setPartitionId(partitionId). - setTopicId(topic.id). - setEligibleLeaderReplicas(List.of()). - setLastKnownElr(List.of()))); + new PartitionChangeRecord().setPartitionId(partitionId).setTopicId(topic.id) + .setEligibleLeaderReplicas(List.of()).setLastKnownElr(List.of()))); numRemoved++; } } @@ -625,10 +636,9 @@ int removeTopicElrs(TopicControlInfo topic) { } ControllerResult createTopics( - ControllerRequestContext context, - CreateTopicsRequestData request, - Set describable - ) { + ControllerRequestContext context, + CreateTopicsRequestData request, + Set describable) { Map topicErrors = new HashMap<>(); List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); @@ -640,24 +650,25 @@ ControllerResult createTopics( // Identify topics that already exist and mark them with the appropriate error request.topics().stream().filter(creatableTopic -> topicsByName.containsKey(creatableTopic.name())) .forEach(t -> topicErrors.put(t.name(), new ApiError(Errors.TOPIC_ALREADY_EXISTS, - "Topic '" + t.name() + "' already exists."))); + "Topic '" + t.name() + "' already exists."))); // Verify that the configurations for the new topics are OK, and figure out what // configurations should be created. - Map>> configChanges = - computeConfigChanges(topicErrors, request.topics()); + Map>> configChanges = computeConfigChanges(topicErrors, + request.topics()); // Try to create whatever topics are needed. Map successes = new HashMap<>(); for (CreatableTopic topic : request.topics()) { - if (topicErrors.containsKey(topic.name())) continue; + if (topicErrors.containsKey(topic.name())) + continue; // Figure out what ConfigRecords should be created, if any. ConfigResource configResource = new ConfigResource(TOPIC, topic.name()); Map> keyToOps = configChanges.get(configResource); List configRecords; if (keyToOps != null) { - ControllerResult configResult = - configurationControl.incrementalAlterConfig(configResource, keyToOps, true); + ControllerResult configResult = configurationControl.incrementalAlterConfig(configResource, + keyToOps, true); if (configResult.response().isFailure()) { topicErrors.put(topic.name(), configResult.response()); continue; @@ -669,7 +680,8 @@ ControllerResult createTopics( } ApiError error; try { - error = createTopic(context, topic, records, successes, configRecords, describable.contains(topic.name())); + error = createTopic(context, topic, records, successes, configRecords, + describable.contains(topic.name())); } catch (ApiException e) { error = ApiError.fromThrowable(e); } @@ -685,19 +697,28 @@ ControllerResult createTopics( for (CreatableTopic topic : request.topics()) { ApiError error = topicErrors.get(topic.name()); if (error != null) { - data.topics().add(new CreatableTopicResult(). - setName(topic.name()). - setErrorCode(error.error().code()). - setErrorMessage(error.message())); - resultsBuilder.append(resultsPrefix).append(topic).append(": "). - append(error.error()).append(" (").append(error.message()).append(")"); + CreatableTopicResult result = new CreatableTopicResult() + .setName(topic.name()) + .setErrorCode(error.error().code()) + .setErrorMessage(error.message()); + + // KAFKA-20097: For existing topics, populate the topic ID + if (error.error() == Errors.TOPIC_ALREADY_EXISTS) { + Uuid existingTopicId = topicsByName.get(topic.name()); + if (existingTopicId != null) { + result.setTopicId(existingTopicId); + } + } + + data.topics().add(result); + resultsBuilder.append(resultsPrefix).append(topic).append(": ").append(error.error()).append(" (") + .append(error.message()).append(")"); resultsPrefix = ", "; continue; } CreatableTopicResult result = successes.get(topic.name()); data.topics().add(result); - resultsBuilder.append(resultsPrefix).append(topic).append(": "). - append("SUCCESS"); + resultsBuilder.append(resultsPrefix).append(topic).append(": ").append("SUCCESS"); resultsPrefix = ", "; } if (request.validateOnly()) { @@ -710,45 +731,45 @@ ControllerResult createTopics( } private ApiError createTopic(ControllerRequestContext context, - CreatableTopic topic, - List records, - Map successes, - List configRecords, - boolean authorizedToReturnConfigs) { + CreatableTopic topic, + List records, + Map successes, + List configRecords, + boolean authorizedToReturnConfigs) { Map creationConfigs = translateCreationConfigs(topic.configs()); Map newParts = new HashMap<>(); if (!topic.assignments().isEmpty()) { if (topic.replicationFactor() != -1) { return new ApiError(INVALID_REQUEST, - "A manual partition assignment was specified, but replication " + - "factor was not set to -1."); + "A manual partition assignment was specified, but replication " + + "factor was not set to -1."); } if (topic.numPartitions() != -1) { return new ApiError(INVALID_REQUEST, - "A manual partition assignment was specified, but numPartitions " + - "was not set to -1."); + "A manual partition assignment was specified, but numPartitions " + + "was not set to -1."); } OptionalInt replicationFactor = OptionalInt.empty(); for (CreatableReplicaAssignment assignment : topic.assignments()) { if (newParts.containsKey(assignment.partitionIndex())) { return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, - "Found multiple manual partition assignments for partition " + - assignment.partitionIndex()); + "Found multiple manual partition assignments for partition " + + assignment.partitionIndex()); } - PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), clusterDescriber); + PartitionAssignment partitionAssignment = new PartitionAssignment(assignment.brokerIds(), + clusterDescriber); validateManualPartitionAssignment(partitionAssignment, replicationFactor); replicationFactor = OptionalInt.of(assignment.brokerIds().size()); - List isr = assignment.brokerIds().stream(). - filter(clusterControl::isActive).toList(); + List isr = assignment.brokerIds().stream().filter(clusterControl::isActive).toList(); if (isr.isEmpty()) { return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT, - "All brokers specified in the manual partition assignment for " + - "partition " + assignment.partitionIndex() + " are fenced or in controlled shutdown."); + "All brokers specified in the manual partition assignment for " + + "partition " + assignment.partitionIndex() + + " are fenced or in controlled shutdown."); } newParts.put( - assignment.partitionIndex(), - buildPartitionRegistration(partitionAssignment, isr) - ); + assignment.partitionIndex(), + buildPartitionRegistration(partitionAssignment, isr)); } for (int i = 0; i < newParts.size(); i++) { if (!newParts.containsKey(i)) { @@ -760,79 +781,74 @@ private ApiError createTopic(ControllerRequestContext context, Map> assignments = new HashMap<>(); newParts.forEach((key, value) -> assignments.put(key, Replicas.toList(value.replicas))); return new CreateTopicPolicy.RequestMetadata( - topic.name(), null, null, assignments, creationConfigs); + topic.name(), null, null, assignments, creationConfigs); }); - if (error.isFailure()) return error; + if (error.isFailure()) + return error; } else if (topic.replicationFactor() < -1 || topic.replicationFactor() == 0) { return new ApiError(Errors.INVALID_REPLICATION_FACTOR, - "Replication factor must be larger than 0, or -1 to use the default value."); + "Replication factor must be larger than 0, or -1 to use the default value."); } else if (topic.numPartitions() < -1 || topic.numPartitions() == 0) { return new ApiError(Errors.INVALID_PARTITIONS, - "Number of partitions was set to an invalid non-positive value."); + "Number of partitions was set to an invalid non-positive value."); } else { - int numPartitions = topic.numPartitions() == -1 ? - defaultNumPartitions : topic.numPartitions(); - short replicationFactor = topic.replicationFactor() == -1 ? - defaultReplicationFactor : topic.replicationFactor(); + int numPartitions = topic.numPartitions() == -1 ? defaultNumPartitions : topic.numPartitions(); + short replicationFactor = topic.replicationFactor() == -1 ? defaultReplicationFactor + : topic.replicationFactor(); try { TopicAssignment topicAssignment = clusterControl.replicaPlacer().place(new PlacementSpec( - 0, - numPartitions, - replicationFactor - ), clusterDescriber); + 0, + numPartitions, + replicationFactor), clusterDescriber); for (int partitionId = 0; partitionId < topicAssignment.assignments().size(); partitionId++) { PartitionAssignment partitionAssignment = topicAssignment.assignments().get(partitionId); - List isr = partitionAssignment.replicas().stream(). - filter(clusterControl::isActive).toList(); + List isr = partitionAssignment.replicas().stream().filter(clusterControl::isActive) + .toList(); // If the ISR is empty, it means that all brokers are fenced or // in controlled shutdown. To be consistent with the replica placer, // we reject the create topic request with INVALID_REPLICATION_FACTOR. if (isr.isEmpty()) { return new ApiError(Errors.INVALID_REPLICATION_FACTOR, - "Unable to replicate the partition " + replicationFactor + - " time(s): All brokers are currently fenced or in controlled shutdown."); + "Unable to replicate the partition " + replicationFactor + + " time(s): All brokers are currently fenced or in controlled shutdown."); } newParts.put( - partitionId, - buildPartitionRegistration(partitionAssignment, isr) - ); + partitionId, + buildPartitionRegistration(partitionAssignment, isr)); } } catch (InvalidReplicationFactorException e) { return new ApiError(Errors.INVALID_REPLICATION_FACTOR, - "Unable to replicate the partition " + replicationFactor + - " time(s): " + e.getMessage()); + "Unable to replicate the partition " + replicationFactor + + " time(s): " + e.getMessage()); } ApiError error = maybeCheckCreateTopicPolicy(() -> new CreateTopicPolicy.RequestMetadata( - topic.name(), numPartitions, replicationFactor, null, creationConfigs)); - if (error.isFailure()) return error; + topic.name(), numPartitions, replicationFactor, null, creationConfigs)); + if (error.isFailure()) + return error; } int numPartitions = newParts.size(); try { context.applyPartitionChangeQuota(numPartitions); // check controller mutation quota } catch (ThrottlingQuotaExceededException e) { log.debug("Topic creation of {} partitions not allowed because quota is violated. Delay time: {}", - numPartitions, e.throttleTimeMs()); + numPartitions, e.throttleTimeMs()); return ApiError.fromThrowable(e); } Uuid topicId = Uuid.randomUuid(); - CreatableTopicResult result = new CreatableTopicResult(). - setName(topic.name()). - setTopicId(topicId). - setErrorCode(NONE.code()). - setErrorMessage(null); + CreatableTopicResult result = new CreatableTopicResult().setName(topic.name()).setTopicId(topicId) + .setErrorCode(NONE.code()).setErrorMessage(null); if (authorizedToReturnConfigs) { - Map effectiveConfig = configurationControl. - computeEffectiveTopicConfigs(creationConfigs); + Map effectiveConfig = configurationControl + .computeEffectiveTopicConfigs(creationConfigs); List configNames = new ArrayList<>(effectiveConfig.keySet()); configNames.sort(String::compareTo); for (String configName : configNames) { ConfigEntry entry = effectiveConfig.get(configName); - result.configs().add(new CreateTopicsResponseData.CreatableTopicConfigs(). - setName(entry.name()). - setValue(entry.isSensitive() ? null : entry.value()). - setReadOnly(entry.isReadOnly()). - setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()). - setIsSensitive(entry.isSensitive())); + result.configs() + .add(new CreateTopicsResponseData.CreatableTopicConfigs().setName(entry.name()) + .setValue(entry.isSensitive() ? null : entry.value()).setReadOnly(entry.isReadOnly()) + .setConfigSource(KafkaConfigSchema.translateConfigSource(entry.source()).id()) + .setIsSensitive(entry.isSensitive())); } result.setNumPartitions(numPartitions); result.setReplicationFactor((short) newParts.values().iterator().next().replicas.length); @@ -841,34 +857,26 @@ private ApiError createTopic(ControllerRequestContext context, result.setTopicConfigErrorCode(TOPIC_AUTHORIZATION_FAILED.code()); } successes.put(topic.name(), result); - records.add(new ApiMessageAndVersion(new TopicRecord(). - setName(topic.name()). - setTopicId(topicId), (short) 0)); + records.add(new ApiMessageAndVersion(new TopicRecord().setName(topic.name()).setTopicId(topicId), (short) 0)); // ConfigRecords go after TopicRecord but before PartitionRecord(s). records.addAll(configRecords); for (Entry partEntry : newParts.entrySet()) { int partitionIndex = partEntry.getKey(); PartitionRegistration info = partEntry.getValue(); - records.add(info.toRecord(topicId, partitionIndex, new ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()). - setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()). - build())); + records.add(info.toRecord(topicId, partitionIndex, + new ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()) + .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).build())); } return ApiError.NONE; } private static PartitionRegistration buildPartitionRegistration( - PartitionAssignment partitionAssignment, - List isr - ) { - return new PartitionRegistration.Builder(). - setReplicas(Replicas.toArray(partitionAssignment.replicas())). - setDirectories(Uuid.toArray(partitionAssignment.directories())). - setIsr(Replicas.toArray(isr)). - setLeader(isr.get(0)). - setLeaderRecoveryState(LeaderRecoveryState.RECOVERED). - setLeaderEpoch(0). - setPartitionEpoch(0). - build(); + PartitionAssignment partitionAssignment, + List isr) { + return new PartitionRegistration.Builder().setReplicas(Replicas.toArray(partitionAssignment.replicas())) + .setDirectories(Uuid.toArray(partitionAssignment.directories())).setIsr(Replicas.toArray(isr)) + .setLeader(isr.get(0)).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0) + .setPartitionEpoch(0).build(); } private ApiError maybeCheckCreateTopicPolicy(Supplier supplier) { @@ -878,7 +886,8 @@ private ApiError maybeCheckCreateTopicPolicy(Supplier topicErrors, - CreatableTopicCollection topics, - Map> topicsWithCollisionChars) { + CreatableTopicCollection topics, + Map> topicsWithCollisionChars) { for (CreatableTopic topic : topics) { - if (topicErrors.containsKey(topic.name())) continue; + if (topicErrors.containsKey(topic.name())) + continue; try { Topic.validate(topic.name()); } catch (InvalidTopicException e) { topicErrors.put(topic.name(), - new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage())); + new ApiError(Errors.INVALID_TOPIC_EXCEPTION, e.getMessage())); } if (Topic.hasCollisionChars(topic.name())) { String normalizedName = Topic.unifyCollisionChars(topic.name()); Set colliding = topicsWithCollisionChars.get(normalizedName); if (colliding != null) { topicErrors.put(topic.name(), new ApiError(Errors.INVALID_TOPIC_EXCEPTION, - "Topic '" + topic.name() + "' collides with existing topic: " + - colliding.iterator().next())); + "Topic '" + topic.name() + "' collides with existing topic: " + + colliding.iterator().next())); } } } } - static Map>> - computeConfigChanges(Map topicErrors, - CreatableTopicCollection topics) { + static Map>> computeConfigChanges( + Map topicErrors, + CreatableTopicCollection topics) { Map>> configChanges = new HashMap<>(); for (CreatableTopic topic : topics) { - if (topicErrors.containsKey(topic.name())) continue; + if (topicErrors.containsKey(topic.name())) + continue; Map> topicConfigs = new HashMap<>(); List nullConfigs = new ArrayList<>(); for (CreateTopicsRequestData.CreatableTopicConfig config : topic.configs()) { @@ -929,7 +940,7 @@ static void validateNewTopicNames(Map topicErrors, } if (!nullConfigs.isEmpty()) { topicErrors.put(topic.name(), new ApiError(Errors.INVALID_CONFIG, - "Null value not supported for topic configs: " + String.join(",", nullConfigs))); + "Null value not supported for topic configs: " + String.join(",", nullConfigs))); } else if (!topicConfigs.isEmpty()) { configChanges.put(new ConfigResource(TOPIC, topic.name()), topicConfigs); } @@ -946,7 +957,7 @@ Map> findTopicIds(long offset, Collection na Uuid id = topicsByName.get(name, offset); if (id == null) { results.put(name, new ResultOrError<>( - new ApiError(UNKNOWN_TOPIC_OR_PARTITION))); + new ApiError(UNKNOWN_TOPIC_OR_PARTITION))); } else { results.put(name, new ResultOrError<>(id)); } @@ -968,7 +979,7 @@ Map> findTopicNames(long offset, Collection id for (Uuid id : ids) { if (id == null || id.equals(Uuid.ZERO_UUID)) { results.put(id, new ResultOrError<>(new ApiError(INVALID_REQUEST, - "Attempt to find topic with invalid topicId " + id))); + "Attempt to find topic with invalid topicId " + id))); } else { TopicControlInfo topic = topics.get(id, offset); if (topic == null) { @@ -983,8 +994,7 @@ Map> findTopicNames(long offset, Collection id ControllerResult> deleteTopics(ControllerRequestContext context, Collection ids) { Map results = new HashMap<>(ids.size()); - List records = - BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP, ids.size()); + List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP, ids.size()); StringBuilder resultsBuilder = new StringBuilder(); String resultsPrefix = ""; @@ -1034,11 +1044,10 @@ void deleteTopic(ControllerRequestContext context, Uuid id, List imbalancedPartitions() { } ControllerResult alterPartition( - ControllerRequestContext context, - AlterPartitionRequestData request - ) { + ControllerRequestContext context, + AlterPartitionRequestData request) { short requestVersion = context.requestHeader().requestApiVersion(); clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch()); AlterPartitionResponseData response = new AlterPartitionResponseData(); List records = new ArrayList<>(); for (AlterPartitionRequestData.TopicData topicData : request.topics()) { - AlterPartitionResponseData.TopicData responseTopicData = - new AlterPartitionResponseData.TopicData(). - setTopicId(topicData.topicId()); + AlterPartitionResponseData.TopicData responseTopicData = new AlterPartitionResponseData.TopicData() + .setTopicId(topicData.topicId()); response.topics().add(responseTopicData); Uuid topicId = topicData.topicId(); if (topicId == null || topicId.equals(Uuid.ZERO_UUID) || !topics.containsKey(topicId)) { for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) { - responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData(). - setPartitionIndex(partitionData.partitionIndex()). - setErrorCode(UNKNOWN_TOPIC_ID.code())); + responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData() + .setPartitionIndex(partitionData.partitionIndex()).setErrorCode(UNKNOWN_TOPIC_ID.code())); } log.info("Rejecting AlterPartition request for unknown topic ID {}.", topicData.topicId()); continue; @@ -1103,47 +1109,44 @@ ControllerResult alterPartition( for (AlterPartitionRequestData.PartitionData partitionData : topicData.partitions()) { if (requestVersion < 3) { partitionData.setNewIsrWithEpochs( - AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(partitionData.newIsr()) - ); + AlterPartitionRequest.newIsrToSimpleNewIsrWithBrokerEpochs(partitionData.newIsr())); } int partitionId = partitionData.partitionIndex(); PartitionRegistration partition = topic.parts.get(partitionId); Errors validationError = validateAlterPartitionData( - request.brokerId(), - topic, - partitionId, - partition, - partitionData); + request.brokerId(), + topic, + partitionId, + partition, + partitionData); if (validationError != Errors.NONE) { responseTopicData.partitions().add( - new AlterPartitionResponseData.PartitionData() - .setPartitionIndex(partitionId) - .setErrorCode(validationError.code()) - ); + new AlterPartitionResponseData.PartitionData() + .setPartitionIndex(partitionId) + .setErrorCode(validationError.code())); continue; } PartitionChangeBuilder builder = new PartitionChangeBuilder( - partition, - topic.id, - partitionId, - new LeaderAcceptor(clusterControl, partition), - featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topic.name) - ) - .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); + partition, + topic.id, + partitionId, + new LeaderAcceptor(clusterControl, partition), + featureControl.metadataVersionOrThrow(), + getTopicEffectiveMinIsr(topic.name)) + .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); } Optional record = builder - .setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs()) - .setTargetLeaderRecoveryState(LeaderRecoveryState.of(partitionData.leaderRecoveryState())) - .setDefaultDirProvider(clusterDescriber) - .build(); + .setTargetIsrWithBrokerStates(partitionData.newIsrWithEpochs()) + .setTargetLeaderRecoveryState(LeaderRecoveryState.of(partitionData.leaderRecoveryState())) + .setDefaultDirProvider(clusterDescriber) + .build(); if (record.isPresent()) { records.add(record.get()); PartitionChangeRecord change = (PartitionChangeRecord) record.get().message(); @@ -1151,7 +1154,8 @@ ControllerResult alterPartition( partition = originalPartition.merge(change); if (log.isDebugEnabled()) { log.debug("Node {} has altered ISR for {}-{}. {}", - request.brokerId(), topic.name, partitionId, logPartitionChangeInfo(originalPartition, partition)); + request.brokerId(), topic.name, partitionId, + logPartitionChangeInfo(originalPartition, partition)); } if (change.leader() != request.brokerId() && change.leader() != NO_LEADER_CHANGE) { @@ -1168,59 +1172,67 @@ ControllerResult alterPartition( // metadata record. We usually only do one or the other. Errors error = NEW_LEADER_ELECTED; log.info("AlterPartition request from node {} for {}-{} completed " + - "the ongoing partition reassignment and triggered a leadership change {}. Returning {}.", - request.brokerId(), topic.name, partitionId, - logPartitionChangeInfo(originalPartition, partition), error); - responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData(). - setPartitionIndex(partitionId). - setErrorCode(error.code())); + "the ongoing partition reassignment and triggered a leadership change {}. Returning {}.", + request.brokerId(), topic.name, partitionId, + logPartitionChangeInfo(originalPartition, partition), error); + responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData() + .setPartitionIndex(partitionId).setErrorCode(error.code())); continue; } else if (isReassignmentInProgress(partition)) { log.info("AlterPartition request from node {} for {}-{} completed " + - "the ongoing partition reassignment. {}", - request.brokerId(), topic.name, partitionId, logPartitionChangeInfo(originalPartition, partition)); + "the ongoing partition reassignment. {}", + request.brokerId(), topic.name, partitionId, + logPartitionChangeInfo(originalPartition, partition)); } } - /* Setting the LeaderRecoveryState field is always safe because it will always be the - * same as the value set in the request. For version 0, that is always the default - * RECOVERED which is ignored when serializing to version 0. For any other version, the + /* + * Setting the LeaderRecoveryState field is always safe because it will always + * be the + * same as the value set in the request. For version 0, that is always the + * default + * RECOVERED which is ignored when serializing to version 0. For any other + * version, the * LeaderRecoveryState field is supported. */ - responseTopicData.partitions().add(new AlterPartitionResponseData.PartitionData(). - setPartitionIndex(partitionId). - setErrorCode(Errors.NONE.code()). - setLeaderId(partition.leader). - setIsr(Replicas.toList(partition.isr)). - setLeaderRecoveryState(partition.leaderRecoveryState.value()). - setLeaderEpoch(partition.leaderEpoch). - setPartitionEpoch(partition.partitionEpoch)); + responseTopicData.partitions() + .add(new AlterPartitionResponseData.PartitionData().setPartitionIndex(partitionId) + .setErrorCode(Errors.NONE.code()).setLeaderId(partition.leader) + .setIsr(Replicas.toList(partition.isr)) + .setLeaderRecoveryState(partition.leaderRecoveryState.value()) + .setLeaderEpoch(partition.leaderEpoch).setPartitionEpoch(partition.partitionEpoch)); } } return ControllerResult.of(records, response); } - private static String logPartitionChangeInfo(PartitionRegistration oldRegistration, PartitionRegistration newRegistration) { + private static String logPartitionChangeInfo(PartitionRegistration oldRegistration, + PartitionRegistration newRegistration) { return String.format("isr: %s -> %s, replicaSet: %s -> %s, partitionEpoch: %d -> %d, leaderEpoch: %d -> %d", - Arrays.toString(oldRegistration.isr), Arrays.toString(newRegistration.isr), - Arrays.toString(oldRegistration.replicas), Arrays.toString(newRegistration.replicas), - oldRegistration.partitionEpoch, newRegistration.partitionEpoch, - oldRegistration.leaderEpoch, newRegistration.leaderEpoch); + Arrays.toString(oldRegistration.isr), Arrays.toString(newRegistration.isr), + Arrays.toString(oldRegistration.replicas), Arrays.toString(newRegistration.replicas), + oldRegistration.partitionEpoch, newRegistration.partitionEpoch, + oldRegistration.leaderEpoch, newRegistration.leaderEpoch); } /** - * Validates that a batch of topics will create less than {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch - * has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory. - * Validates an upper bound number of partitions. The actual number may be smaller if some topics are misconfigured. + * Validates that a batch of topics will create less than + * {@value MAX_PARTITIONS_PER_BATCH}. Exceeding this number of topics per batch + * has led to out-of-memory exceptions. We use this validation to fail earlier + * to avoid allocating the memory. + * Validates an upper bound number of partitions. The actual number may be + * smaller if some topics are misconfigured. * - * @param request a batch of topics to create. - * @param defaultNumPartitions default number of partitions to assign if unspecified. - * @throws PolicyViolationException if total number of partitions exceeds {@value MAX_PARTITIONS_PER_BATCH}. + * @param request a batch of topics to create. + * @param defaultNumPartitions default number of partitions to assign if + * unspecified. + * @throws PolicyViolationException if total number of partitions exceeds + * {@value MAX_PARTITIONS_PER_BATCH}. */ static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int defaultNumPartitions) { int totalPartitions = 0; - for (CreatableTopic topic: request.topics()) { + for (CreatableTopic topic : request.topics()) { if (topic.assignments().isEmpty()) { if (topic.numPartitions() == -1) { totalPartitions += defaultNumPartitions; @@ -1240,21 +1252,23 @@ static void validateTotalNumberOfPartitions(CreateTopicsRequestData request, int /** * Validate the partition information included in the alter partition request. * - * @param brokerId id of the broker requesting the alter partition - * @param topic current topic information store by the replication manager - * @param partitionId partition id being altered - * @param partition current partition registration for the partition being altered + * @param brokerId id of the broker requesting the alter partition + * @param topic current topic information store by the replication + * manager + * @param partitionId partition id being altered + * @param partition current partition registration for the partition being + * altered * @param partitionData partition data from the alter partition request * - * @return Errors.NONE for valid alter partition data; otherwise the validation error + * @return Errors.NONE for valid alter partition data; otherwise the validation + * error */ private Errors validateAlterPartitionData( - int brokerId, - TopicControlInfo topic, - int partitionId, - PartitionRegistration partition, - AlterPartitionRequestData.PartitionData partitionData - ) { + int brokerId, + TopicControlInfo topic, + int partitionId, + PartitionRegistration partition, + AlterPartitionRequestData.PartitionData partitionData) { if (partition == null) { log.info("Rejecting AlterPartition request for unknown partition {}-{}.", topic.name, partitionId); @@ -1262,21 +1276,23 @@ private Errors validateAlterPartitionData( return UNKNOWN_TOPIC_OR_PARTITION; } - // If the partition leader has a higher leader/partition epoch, then it is likely - // that this node is no longer the active controller. We return NOT_CONTROLLER in + // If the partition leader has a higher leader/partition epoch, then it is + // likely + // that this node is no longer the active controller. We return NOT_CONTROLLER + // in // this case to give the leader an opportunity to find the new controller. if (partitionData.leaderEpoch() > partition.leaderEpoch) { log.debug("Rejecting AlterPartition request from node {} for {}-{} because " + "the current leader epoch is {}, which is greater than the local value {}. {}", - brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch(), - logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs())); + brokerId, topic.name, partitionId, partition.leaderEpoch, partitionData.leaderEpoch(), + logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs())); return NOT_CONTROLLER; } if (partitionData.partitionEpoch() > partition.partitionEpoch) { log.debug("Rejecting AlterPartition request from node {} for {}-{} because " + "the current partition epoch is {}, which is greater than the local value {}. {}", - brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch(), - logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs())); + brokerId, topic.name, partitionId, partition.partitionEpoch, partitionData.partitionEpoch(), + logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs())); return NOT_CONTROLLER; } if (partitionData.leaderEpoch() < partition.leaderEpoch) { @@ -1298,13 +1314,14 @@ private Errors validateAlterPartitionData( log.info("Rejecting AlterPartition request from node {} for {}-{} because " + "the current partition epoch is {}, not {}. {}", brokerId, topic.name, partitionId, partition.partitionEpoch, - partitionData.partitionEpoch(), logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs())); + partitionData.partitionEpoch(), + logPartitionChangeInfo(partition, partitionData.newIsrWithEpochs())); return INVALID_UPDATE_VERSION; } int[] newIsr = partitionData.newIsrWithEpochs().stream() - .mapToInt(BrokerState::brokerId).toArray(); + .mapToInt(BrokerState::brokerId).toArray(); if (!Replicas.validateIsr(partition.replicas, newIsr)) { log.error("Rejecting AlterPartition request from node {} for {}-{} because " + @@ -1355,13 +1372,12 @@ private Errors validateAlterPartitionData( } private static String logPartitionChangeInfo( - PartitionRegistration partition, - List requestedIsr - ) { + PartitionRegistration partition, + List requestedIsr) { return String.format("Proposed ISR was %s and current ISR is %s. " + "Current replica set is %s. Current partitionEpoch is %d. Current leaderEpoch is %d.", - requestedIsr, Arrays.toString(partition.isr), Arrays.toString(partition.replicas), - partition.partitionEpoch, partition.leaderEpoch); + requestedIsr, Arrays.toString(partition.isr), Arrays.toString(partition.replicas), + partition.partitionEpoch, partition.leaderEpoch); } private List ineligibleReplicasForIsr(List brokerStates) { @@ -1376,11 +1392,12 @@ private List ineligibleReplicasForIsr(List broke } else if (registration.fenced()) { ineligibleReplicas.add(new IneligibleReplica(brokerId, "fenced")); } else if (brokerState.brokerEpoch() != -1 && registration.epoch() != brokerState.brokerEpoch()) { - // The given broker epoch should match with the broker epoch in the broker registration, except the + // The given broker epoch should match with the broker epoch in the broker + // registration, except the // given broker epoch is -1 which means skipping the broker epoch verification. ineligibleReplicas.add(new IneligibleReplica(brokerId, - "broker epoch mismatch: requested=" + brokerState.brokerEpoch() - + " VS expected=" + registration.epoch())); + "broker epoch mismatch: requested=" + brokerState.brokerEpoch() + + " VS expected=" + registration.epoch())); } } return ineligibleReplicas; @@ -1392,8 +1409,8 @@ private List ineligibleReplicasForIsr(List broke * First, we remove this broker from any ISR. Then we generate a * BrokerRegistrationChangeRecord. * - * @param brokerId The broker id. - * @param records The record list to append to. + * @param brokerId The broker id. + * @param records The record list to append to. */ void handleBrokerFenced(int brokerId, List records) { BrokerRegistration brokerRegistration = clusterControl.brokerRegistrations().get(brokerId); @@ -1401,11 +1418,11 @@ void handleBrokerFenced(int brokerId, List records) { throw new RuntimeException("Can't find broker registration for broker " + brokerId); } generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, NO_LEADER, records, - brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); - records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). - setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()). - setFenced(BrokerRegistrationFencingChange.FENCE.value()), - (short) 0)); + brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); + records.add(new ApiMessageAndVersion( + new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()) + .setFenced(BrokerRegistrationFencingChange.FENCE.value()), + (short) 0)); } /** @@ -1414,90 +1431,100 @@ void handleBrokerFenced(int brokerId, List records) { * First, we remove this broker from any ISR or ELR. Then we generate an * UnregisterBrokerRecord. * - * @param brokerId The broker id. - * @param brokerEpoch The broker epoch. - * @param records The record list to append to. + * @param brokerId The broker id. + * @param brokerEpoch The broker epoch. + * @param records The record list to append to. */ void handleBrokerUnregistered(int brokerId, long brokerEpoch, - List records) { + List records) { generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, NO_LEADER, records, - brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); + brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, NO_LEADER, records, - brokersToElrs.partitionsWithBrokerInElr(brokerId)); - records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord(). - setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), - (short) 0)); + brokersToElrs.partitionsWithBrokerInElr(brokerId)); + records.add( + new ApiMessageAndVersion(new UnregisterBrokerRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch), + (short) 0)); } /** * Generate the appropriate records to handle a broker becoming unfenced. * - * First, we create a BrokerRegistrationChangeRecord. Then, we check if there are any + * First, we create a BrokerRegistrationChangeRecord. Then, we check if there + * are any * partitions that don't currently have a leader that should be led by the newly * unfenced broker. * - * @param brokerId The broker id. - * @param brokerEpoch The broker epoch. - * @param records The record list to append to. + * @param brokerId The broker id. + * @param brokerEpoch The broker epoch. + * @param records The record list to append to. */ void handleBrokerUnfenced(int brokerId, long brokerEpoch, List records) { - records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). - setBrokerId(brokerId).setBrokerEpoch(brokerEpoch). - setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), - (short) 0)); + records.add(new ApiMessageAndVersion( + new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch) + .setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), + (short) 0)); generateLeaderAndIsrUpdates("handleBrokerUnfenced", NO_LEADER, brokerId, NO_LEADER, records, - brokersToIsrs.partitionsWithNoLeader()); + brokersToIsrs.partitionsWithNoLeader()); } /** - * Generate the appropriate records to handle a broker starting a controlled shutdown. + * Generate the appropriate records to handle a broker starting a controlled + * shutdown. * - * First, we create an BrokerRegistrationChangeRecord. Then, we remove this broker + * First, we create an BrokerRegistrationChangeRecord. Then, we remove this + * broker * from any ISR and elect new leaders for partitions led by this * broker. * - * @param brokerId The broker id. - * @param brokerEpoch The broker epoch. - * @param records The record list to append to. + * @param brokerId The broker id. + * @param brokerEpoch The broker epoch. + * @param records The record list to append to. */ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List records) { if (!clusterControl.inControlledShutdown(brokerId)) { - records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). - setBrokerId(brokerId).setBrokerEpoch(brokerEpoch). - setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()), - (short) 1)); + records.add(new ApiMessageAndVersion( + new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch) + .setInControlledShutdown( + BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value()), + (short) 1)); } generateLeaderAndIsrUpdates("enterControlledShutdown[" + brokerId + "]", - brokerId, NO_LEADER, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); + brokerId, NO_LEADER, NO_LEADER, records, brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); } /** - * Create partition change records to remove replicas from any ISR or ELR for brokers when the shutdown is detected. + * Create partition change records to remove replicas from any ISR or ELR for + * brokers when the shutdown is detected. * - * @param brokerId The broker id to be shut down. - * @param isCleanShutdown Whether the broker has a clean shutdown. - * @param records The record list to append to. + * @param brokerId The broker id to be shut down. + * @param isCleanShutdown Whether the broker has a clean shutdown. + * @param records The record list to append to. */ void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List records) { if (featureControl.isElrFeatureEnabled() && !isCleanShutdown) { // ELR is enabled, generate unclean shutdown partition change records generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records, - brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); + brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records, - brokersToElrs.partitionsWithBrokerInElr(brokerId)); + brokersToElrs.partitionsWithBrokerInElr(brokerId)); } else { - // ELR is not enabled or if it is a clean shutdown, handle the shutdown as if the broker was fenced + // ELR is not enabled or if it is a clean shutdown, handle the shutdown as if + // the broker was fenced generateLeaderAndIsrUpdates("handleBrokerShutdown", brokerId, NO_LEADER, NO_LEADER, records, - brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); + brokersToIsrs.partitionsWithBrokerInIsr(brokerId)); } } /** - * Generates the appropriate records to handle a list of directories being reported offline. + * Generates the appropriate records to handle a list of directories being + * reported offline. * - * If the reported directories include directories that were previously online, this includes - * a BrokerRegistrationChangeRecord and any number of PartitionChangeRecord to update - * leadership and ISR for partitions in those directories that were previously online. + * If the reported directories include directories that were previously online, + * this includes + * a BrokerRegistrationChangeRecord and any number of PartitionChangeRecord to + * update + * leadership and ISR for partitions in those directories that were previously + * online. * * @param brokerId The broker id. * @param brokerEpoch The broker epoch. @@ -1505,26 +1532,24 @@ void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List offlineDirs, - List records - ) { + int brokerId, + long brokerEpoch, + List offlineDirs, + List records) { BrokerRegistration registration = clusterControl.registration(brokerId); List newOfflineDirs = registration.directoryIntersection(offlineDirs); if (!newOfflineDirs.isEmpty()) { for (Uuid newOfflineDir : newOfflineDirs) { TimelineHashSet parts = directoriesToPartitions.get(newOfflineDir); - Iterator iterator = (parts == null) ? - Collections.emptyIterator() : parts.iterator(); + Iterator iterator = (parts == null) ? Collections.emptyIterator() : parts.iterator(); generateLeaderAndIsrUpdates( "handleDirectoriesOffline[" + brokerId + ":" + newOfflineDir + "]", brokerId, NO_LEADER, NO_LEADER, records, iterator); } List newOnlineDirs = registration.directoryDifference(offlineDirs); - records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord(). - setBrokerId(brokerId).setBrokerEpoch(brokerEpoch). - setLogDirs(newOnlineDirs), + records.add(new ApiMessageAndVersion( + new BrokerRegistrationChangeRecord().setBrokerId(brokerId).setBrokerEpoch(brokerEpoch) + .setLogDirs(newOnlineDirs), (short) 2)); log.warn("Directories {} in broker {} marked offline, remaining directories: {}", newOfflineDirs, brokerId, newOnlineDirs); @@ -1536,15 +1561,17 @@ ControllerResult electLeaders(ElectLeadersRequestData List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); ElectLeadersResponseData response = new ElectLeadersResponseData(); if (request.topicPartitions() == null) { - // If topicPartitions is null, we try to elect a new leader for every partition. There - // are some obvious issues with this wire protocol. For example, what if we have too - // many partitions to fit the results in a single RPC? This behavior should probably be - // removed from the protocol. For now, however, we have to implement this for + // If topicPartitions is null, we try to elect a new leader for every partition. + // There + // are some obvious issues with this wire protocol. For example, what if we have + // too + // many partitions to fit the results in a single RPC? This behavior should + // probably be + // removed from the protocol. For now, however, we have to implement this for // compatibility with the old controller. for (Entry topicEntry : topicsByName.entrySet()) { String topicName = topicEntry.getKey(); - ReplicaElectionResult topicResults = - new ReplicaElectionResult().setTopic(topicName); + ReplicaElectionResult topicResults = new ReplicaElectionResult().setTopic(topicName); response.replicaElectionResults().add(topicResults); TopicControlInfo topic = topics.get(topicEntry.getValue()); if (topic != null) { @@ -1554,25 +1581,20 @@ ControllerResult electLeaders(ElectLeadersRequestData // When electing leaders for all partitions, we do not return // partitions which already have the desired leader. if (error.error() != Errors.ELECTION_NOT_NEEDED) { - topicResults.partitionResult().add(new PartitionResult(). - setPartitionId(partitionId). - setErrorCode(error.error().code()). - setErrorMessage(error.message())); + topicResults.partitionResult().add(new PartitionResult().setPartitionId(partitionId) + .setErrorCode(error.error().code()).setErrorMessage(error.message())); } } } } } else { for (TopicPartitions topic : request.topicPartitions()) { - ReplicaElectionResult topicResults = - new ReplicaElectionResult().setTopic(topic.topic()); + ReplicaElectionResult topicResults = new ReplicaElectionResult().setTopic(topic.topic()); response.replicaElectionResults().add(topicResults); for (int partitionId : topic.partitions()) { ApiError error = electLeader(topic.topic(), partitionId, electionType, records); - topicResults.partitionResult().add(new PartitionResult(). - setPartitionId(partitionId). - setErrorCode(error.error().code()). - setErrorMessage(error.message())); + topicResults.partitionResult().add(new PartitionResult().setPartitionId(partitionId) + .setErrorCode(error.error().code()).setErrorMessage(error.message())); } } } @@ -1588,24 +1610,24 @@ private static ElectionType electionType(byte electionType) { } ApiError electLeader(String topic, int partitionId, ElectionType electionType, - List records) { + List records) { Uuid topicId = topicsByName.get(topic); if (topicId == null) { return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, - "No such topic as " + topic); + "No such topic as " + topic); } TopicControlInfo topicInfo = topics.get(topicId); if (topicInfo == null) { return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, - "No such topic id as " + topicId); + "No such topic id as " + topicId); } PartitionRegistration partition = topicInfo.parts.get(partitionId); if (partition == null) { return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, - "No such partition as " + topic + "-" + partitionId); + "No such partition as " + topic + "-" + partitionId); } if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader()) - || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) { + || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) { return new ApiError(Errors.ELECTION_NOT_NEEDED); } @@ -1614,17 +1636,16 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType, election = PartitionChangeBuilder.Election.UNCLEAN; } Optional record = new PartitionChangeBuilder( - partition, - topicId, - partitionId, - new LeaderAcceptor(clusterControl, partition), - featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topic) - ) - .setElection(election) - .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()) - .setDefaultDirProvider(clusterDescriber) - .build(); + partition, + topicId, + partitionId, + new LeaderAcceptor(clusterControl, partition), + featureControl.metadataVersionOrThrow(), + getTopicEffectiveMinIsr(topic)) + .setElection(election) + .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()) + .setDefaultDirProvider(clusterDescriber) + .build(); if (record.isEmpty()) { if (electionType == ElectionType.PREFERRED) { return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE); @@ -1637,15 +1658,14 @@ ApiError electLeader(String topic, int partitionId, ElectionType electionType, } ControllerResult processBrokerHeartbeat( - BrokerHeartbeatRequestData request, - long registerBrokerRecordOffset - ) { + BrokerHeartbeatRequestData request, + long registerBrokerRecordOffset) { int brokerId = request.brokerId(); long brokerEpoch = request.brokerEpoch(); clusterControl.checkBrokerEpoch(brokerId, brokerEpoch); BrokerHeartbeatManager heartbeatManager = clusterControl.heartbeatManager(); BrokerControlStates states = heartbeatManager.calculateNextBrokerState(brokerId, - request, registerBrokerRecordOffset, () -> brokersToIsrs.hasLeaderships(brokerId)); + request, registerBrokerRecordOffset, () -> brokersToIsrs.hasLeaderships(brokerId)); List records = new ArrayList<>(); if (states.current() != states.next()) { switch (states.next()) { @@ -1662,8 +1682,8 @@ ControllerResult processBrokerHeartbeat( } } heartbeatManager.touch(brokerId, - states.next().fenced(), - request.currentMetadataOffset()); + states.next().fenced(), + request.currentMetadataOffset()); if (featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) { handleDirectoriesOffline(brokerId, brokerEpoch, request.offlineLogDirs(), records); } @@ -1676,10 +1696,14 @@ ControllerResult processBrokerHeartbeat( } /** - * Process a broker heartbeat which has been sitting on the queue for too long, and has - * expired. With default settings, this would happen after 1 second. We process expired - * heartbeats by updating the lastSeenNs of the broker, so that the broker won't get fenced - * incorrectly. However, we don't perform any state changes that we normally would, such as + * Process a broker heartbeat which has been sitting on the queue for too long, + * and has + * expired. With default settings, this would happen after 1 second. We process + * expired + * heartbeats by updating the lastSeenNs of the broker, so that the broker won't + * get fenced + * incorrectly. However, we don't perform any state changes that we normally + * would, such as * unfencing a fenced broker, etc. */ void processExpiredBrokerHeartbeat(BrokerHeartbeatRequestData request) { @@ -1696,7 +1720,7 @@ public ControllerResult unregisterBroker(int brokerId) { BrokerRegistration registration = clusterControl.brokerRegistrations().get(brokerId); if (registration == null) { throw new BrokerIdNotRegisteredException("Broker ID " + brokerId + - " is not currently registered"); + " is not currently registered"); } List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); handleBrokerUnregistered(brokerId, registration.epoch(), records); @@ -1719,8 +1743,8 @@ ControllerResult maybeFenceOneStaleBroker() { return ControllerResult.of(List.of(), true); } else if (clusterControl.brokerRegistrations().get(id).epoch() != epoch) { log.info("Removing heartbeat tracker entry for broker {} at previous epoch {}. " + - "Current epoch is {}", id, epoch, - clusterControl.brokerRegistrations().get(id).epoch()); + "Current epoch is {}", id, epoch, + clusterControl.brokerRegistrations().get(id).epoch()); return ControllerResult.of(List.of(), true); } // Even though multiple brokers can go stale at a time, we will process @@ -1742,12 +1766,16 @@ boolean areSomePartitionsLeaderless() { } /** - * Attempt to elect a preferred leader for all topic partitions which have a leader that is not the preferred replica. + * Attempt to elect a preferred leader for all topic partitions which have a + * leader that is not the preferred replica. * - * The response() method in the return object is true if this method returned without electing all possible preferred replicas. - * The quorum controller should reschedule this operation immediately if it is true. + * The response() method in the return object is true if this method returned + * without electing all possible preferred replicas. + * The quorum controller should reschedule this operation immediately if it is + * true. * - * @return All of the election records and if there may be more available preferred replicas to elect as leader + * @return All of the election records and if there may be more available + * preferred replicas to elect as leader */ ControllerResult maybeBalancePartitionLeaders() { List records = new ArrayList<>(); @@ -1756,9 +1784,8 @@ ControllerResult maybeBalancePartitionLeaders() { } void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader( - List records, - int maxElections - ) { + List records, + int maxElections) { for (TopicIdPartition topicPartition : imbalancedPartitions) { if (records.size() >= maxElections) { return; @@ -1778,27 +1805,29 @@ void maybeTriggerLeaderChangeForPartitionsWithoutPreferredLeader( // Attempt to perform a preferred leader election new PartitionChangeBuilder( - partition, - topicPartition.topicId(), - topicPartition.partitionId(), - new LeaderAcceptor(clusterControl, partition), - featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topic.name) - ) - .setElection(PartitionChangeBuilder.Election.PREFERRED) - .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()) - .setDefaultDirProvider(clusterDescriber) - .build().ifPresent(records::add); + partition, + topicPartition.topicId(), + topicPartition.partitionId(), + new LeaderAcceptor(clusterControl, partition), + featureControl.metadataVersionOrThrow(), + getTopicEffectiveMinIsr(topic.name)) + .setElection(PartitionChangeBuilder.Election.PREFERRED) + .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()) + .setDefaultDirProvider(clusterDescriber) + .build().ifPresent(records::add); } } /** * Check if we can do an unclean election for partitions with no leader. * - * The response() method in the return object is true if this method returned without electing all possible preferred replicas. - * The quorum controller should reschedule this operation immediately if it is true. + * The response() method in the return object is true if this method returned + * without electing all possible preferred replicas. + * The quorum controller should reschedule this operation immediately if it is + * true. * - * @return All of the election records and true if there may be more elections to be done. + * @return All of the election records and true if there may be more elections + * to be done. */ ControllerResult maybeElectUncleanLeaders() { List records = new ArrayList<>(); @@ -1807,15 +1836,15 @@ ControllerResult maybeElectUncleanLeaders() { } /** - * Trigger unclean leader election for partitions without leader (visible for testing) + * Trigger unclean leader election for partitions without leader (visible for + * testing) * - * @param records The record list to append to. - * @param maxElections The maximum number of elections to perform. + * @param records The record list to append to. + * @param maxElections The maximum number of elections to perform. */ void maybeTriggerUncleanLeaderElectionForLeaderlessPartitions( List records, - int maxElections - ) { + int maxElections) { Iterator iterator = brokersToIsrs.partitionsWithNoLeader(); while (iterator.hasNext() && records.size() < maxElections) { TopicIdPartition topicIdPartition = iterator.next(); @@ -1834,7 +1863,7 @@ void maybeTriggerUncleanLeaderElectionForLeaderlessPartitions( } } else if (log.isDebugEnabled()) { log.debug("Cannot trigger unclean leader election for offline partition {}-{} " + - "because unclean leader election is disabled for this topic. {}", + "because unclean leader election is disabled for this topic. {}", topic.name, partitionId, logPartitionInfo(topic.parts.get(partitionId))); } } @@ -1842,14 +1871,13 @@ void maybeTriggerUncleanLeaderElectionForLeaderlessPartitions( private static String logPartitionInfo(PartitionRegistration partition) { return String.format("(isr: %s, replicaSet: %s, partitionEpoch: %d, leaderEpoch: %d)", - Arrays.toString(partition.isr), Arrays.toString(partition.replicas), partition.partitionEpoch, - partition.leaderEpoch); + Arrays.toString(partition.isr), Arrays.toString(partition.replicas), partition.partitionEpoch, + partition.leaderEpoch); } ControllerResult> createPartitions( - ControllerRequestContext context, - List topics - ) { + ControllerRequestContext context, + List topics) { List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); List results = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); for (CreatePartitionsTopic topic : topics) { @@ -1862,17 +1890,15 @@ ControllerResult> createPartitions( log.error("Unexpected createPartitions error for {}", topic, e); apiError = ApiError.fromThrowable(e); } - results.add(new CreatePartitionsTopicResult(). - setName(topic.name()). - setErrorCode(apiError.error().code()). - setErrorMessage(apiError.message())); + results.add(new CreatePartitionsTopicResult().setName(topic.name()).setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message())); } return ControllerResult.atomicOf(records, results); } void createPartitions(ControllerRequestContext context, - CreatePartitionsTopic topic, - List records) { + CreatePartitionsTopic topic, + List records) { Uuid topicId = topicsByName.get(topic.name()); if (topicId == null) { throw new UnknownTopicOrPartitionException(); @@ -1883,18 +1909,18 @@ void createPartitions(ControllerRequestContext context, } if (topic.count() == topicInfo.parts.size()) { throw new InvalidPartitionsException("Topic already has " + - topicInfo.parts.size() + " partition(s)."); + topicInfo.parts.size() + " partition(s)."); } else if (topic.count() < topicInfo.parts.size()) { throw new InvalidPartitionsException("The topic " + topic.name() + " currently " + - "has " + topicInfo.parts.size() + " partition(s); " + topic.count() + - " would not be an increase."); + "has " + topicInfo.parts.size() + " partition(s); " + topic.count() + + " would not be an increase."); } int additional = topic.count() - topicInfo.parts.size(); if (topic.assignments() != null) { if (topic.assignments().size() != additional) { throw new InvalidReplicaAssignmentException("Attempted to add " + additional + - " additional partition(s), but only " + topic.assignments().size() + - " assignment(s) were specified."); + " additional partition(s), but only " + topic.assignments().size() + + " assignment(s) were specified."); } } try { @@ -1902,19 +1928,19 @@ void createPartitions(ControllerRequestContext context, } catch (ThrottlingQuotaExceededException e) { // log a message and rethrow the exception log.debug("Partition creation of {} partitions not allowed because quota is violated. Delay time: {}", - additional, e.throttleTimeMs()); + additional, e.throttleTimeMs()); throw e; } Iterator iterator = topicInfo.parts.values().iterator(); if (!iterator.hasNext()) { throw new UnknownServerException("Invalid state: topic " + topic.name() + - " appears to have no partitions."); + " appears to have no partitions."); } PartitionRegistration partitionInfo = iterator.next(); if (partitionInfo.replicas.length > Short.MAX_VALUE) { throw new UnknownServerException("Invalid replication factor " + - partitionInfo.replicas.length + ": expected a number equal to less than " + - Short.MAX_VALUE); + partitionInfo.replicas.length + ": expected a number equal to less than " + + Short.MAX_VALUE); } short replicationFactor = (short) partitionInfo.replicas.length; int startPartitionId = topicInfo.parts.size(); @@ -1929,50 +1955,46 @@ void createPartitions(ControllerRequestContext context, PartitionAssignment partitionAssignment = new PartitionAssignment(replicas, clusterDescriber); validateManualPartitionAssignment(partitionAssignment, OptionalInt.of(replicationFactor)); partitionAssignments.add(partitionAssignment); - List isr = partitionAssignment.replicas().stream(). - filter(clusterControl::isActive).toList(); + List isr = partitionAssignment.replicas().stream().filter(clusterControl::isActive).toList(); if (isr.isEmpty()) { throw new InvalidReplicaAssignmentException( - "All brokers specified in the manual partition assignment for " + - "partition " + (startPartitionId + i) + " are fenced or in controlled shutdown."); + "All brokers specified in the manual partition assignment for " + + "partition " + (startPartitionId + i) + " are fenced or in controlled shutdown."); } isrs.add(isr); } } else { partitionAssignments = clusterControl.replicaPlacer().place( - new PlacementSpec(startPartitionId, additional, replicationFactor), - clusterDescriber - ).assignments(); + new PlacementSpec(startPartitionId, additional, replicationFactor), + clusterDescriber).assignments(); isrs = partitionAssignments.stream().map(PartitionAssignment::replicas).toList(); } int partitionId = startPartitionId; for (int i = 0; i < partitionAssignments.size(); i++) { PartitionAssignment partitionAssignment = partitionAssignments.get(i); - List isr = isrs.get(i).stream(). - filter(clusterControl::isActive).toList(); + List isr = isrs.get(i).stream().filter(clusterControl::isActive).toList(); // If the ISR is empty, it means that all brokers are fenced or // in controlled shutdown. To be consistent with the replica placer, // we reject the create topic request with INVALID_REPLICATION_FACTOR. if (isr.isEmpty()) { throw new InvalidReplicationFactorException( - "Unable to replicate the partition " + replicationFactor + - " time(s): All brokers are currently fenced or in controlled shutdown."); + "Unable to replicate the partition " + replicationFactor + + " time(s): All brokers are currently fenced or in controlled shutdown."); } records.add(buildPartitionRegistration(partitionAssignment, isr) - .toRecord(topicId, partitionId, new ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()). - setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()). - build())); + .toRecord(topicId, partitionId, + new ImageWriterOptions.Builder(featureControl.metadataVersionOrThrow()) + .setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()).build())); partitionId++; } } void validateManualPartitionAssignment( - PartitionAssignment assignment, - OptionalInt replicationFactor - ) { + PartitionAssignment assignment, + OptionalInt replicationFactor) { if (assignment.replicas().isEmpty()) { throw new InvalidReplicaAssignmentException("The manual partition " + - "assignment includes an empty replica list."); + "assignment includes an empty replica list."); } List sortedBrokerIds = new ArrayList<>(assignment.replicas()); sortedBrokerIds.sort(Integer::compare); @@ -1980,67 +2002,80 @@ void validateManualPartitionAssignment( for (Integer brokerId : sortedBrokerIds) { if (!clusterControl.brokerRegistrations().containsKey(brokerId)) { throw new InvalidReplicaAssignmentException("The manual partition " + - "assignment includes broker " + brokerId + ", but no such broker is " + - "registered."); + "assignment includes broker " + brokerId + ", but no such broker is " + + "registered."); } if (brokerId.equals(prevBrokerId)) { throw new InvalidReplicaAssignmentException("The manual partition " + - "assignment includes the broker " + prevBrokerId + " more than " + - "once."); + "assignment includes the broker " + prevBrokerId + " more than " + + "once."); } prevBrokerId = brokerId; } if (replicationFactor.isPresent() && sortedBrokerIds.size() != replicationFactor.getAsInt()) { throw new InvalidReplicaAssignmentException("The manual partition " + - "assignment includes a partition with " + sortedBrokerIds.size() + - " replica(s), but this is not consistent with previous " + - "partitions, which have " + replicationFactor.getAsInt() + " replica(s)."); + "assignment includes a partition with " + sortedBrokerIds.size() + + " replica(s), but this is not consistent with previous " + + "partitions, which have " + replicationFactor.getAsInt() + " replica(s)."); } } /** - * Iterate over a sequence of partitions and generate ISR/ELR changes and/or leader + * Iterate over a sequence of partitions and generate ISR/ELR changes and/or + * leader * changes if necessary. * - * @param context A human-readable context string used in log4j logging. - * @param brokerToRemove NO_LEADER if no broker is being removed; the ID of the - * broker to remove from the ISR and leadership, otherwise. - * @param brokerToAdd NO_LEADER if no broker is being added; the ID of the - * broker which is now eligible to be a leader, otherwise. + * @param context A human-readable context string used in + * log4j logging. + * @param brokerToRemove NO_LEADER if no broker is being removed; the + * ID of the + * broker to remove from the ISR and + * leadership, otherwise. + * @param brokerToAdd NO_LEADER if no broker is being added; the + * ID of the + * broker which is now eligible to be a leader, + * otherwise. * @param brokerWithUncleanShutdown - * NO_LEADER if no broker has unclean shutdown; the ID of the - * broker which is now removed from the ISR, ELR and - * leadership, otherwise. - * @param records A list of records which we will append to. - * @param iterator The iterator containing the partitions to examine. + * NO_LEADER if no broker has unclean shutdown; + * the ID of the + * broker which is now removed from the ISR, + * ELR and + * leadership, otherwise. + * @param records A list of records which we will append to. + * @param iterator The iterator containing the partitions to + * examine. */ void generateLeaderAndIsrUpdates(String context, - int brokerToRemove, - int brokerToAdd, - int brokerWithUncleanShutdown, - List records, - Iterator iterator) { + int brokerToRemove, + int brokerToAdd, + int brokerWithUncleanShutdown, + List records, + Iterator iterator) { int oldSize = records.size(); // If the caller passed a valid broker ID for brokerToAdd, rather than passing // NO_LEADER, that node will be considered an acceptable leader even if it is // currently fenced. This is useful when handling unfencing. The reason is that - // while we're generating the records to handle unfencing, the ClusterControlManager + // while we're generating the records to handle unfencing, the + // ClusterControlManager // still shows the node as fenced. // // Similarly, if the caller passed a valid broker ID for brokerToRemove, rather - // than passing NO_LEADER, that node will never be considered an acceptable leader. - // This is useful when handling a newly fenced node. We also exclude brokerToRemove + // than passing NO_LEADER, that node will never be considered an acceptable + // leader. + // This is useful when handling a newly fenced node. We also exclude + // brokerToRemove // from the target ISR, but we need to exclude it here too, to handle the case // where there is an unclean leader election which chooses a leader from outside // the ISR. // - // If the caller passed a valid broker ID for brokerWithUncleanShutdown, rather than - // passing NO_LEADER, this node should not be an acceptable leader. We also exclude + // If the caller passed a valid broker ID for brokerWithUncleanShutdown, rather + // than + // passing NO_LEADER, this node should not be an acceptable leader. We also + // exclude // brokerWithUncleanShutdown from ELR and ISR. - IntPredicate isAcceptableLeader = - r -> (r != brokerToRemove && r != brokerWithUncleanShutdown) + IntPredicate isAcceptableLeader = r -> (r != brokerToRemove && r != brokerWithUncleanShutdown) && (r == brokerToAdd || clusterControl.isActive(r)); while (iterator.hasNext()) { @@ -2048,21 +2083,20 @@ void generateLeaderAndIsrUpdates(String context, TopicControlInfo topic = topics.get(topicIdPart.topicId()); if (topic == null) { throw new RuntimeException("Topic ID " + topicIdPart.topicId() + - " existed in isrMembers, but not in the topics map."); + " existed in isrMembers, but not in the topics map."); } PartitionRegistration partition = topic.parts.get(topicIdPart.partitionId()); if (partition == null) { throw new RuntimeException("Partition " + topicIdPart + - " existed in isrMembers, but not in the partitions map."); + " existed in isrMembers, but not in the partitions map."); } PartitionChangeBuilder builder = new PartitionChangeBuilder( - partition, - topicIdPart.topicId(), - topicIdPart.partitionId(), - new LeaderAcceptor(clusterControl, partition, isAcceptableLeader), - featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topic.name) - ); + partition, + topicIdPart.topicId(), + topicIdPart.partitionId(), + new LeaderAcceptor(clusterControl, partition, isAcceptableLeader), + featureControl.metadataVersionOrThrow(), + getTopicEffectiveMinIsr(topic.name)); builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); @@ -2071,10 +2105,11 @@ void generateLeaderAndIsrUpdates(String context, builder.setUncleanShutdownReplicas(List.of(brokerWithUncleanShutdown)); } - // Note: if brokerToRemove and brokerWithUncleanShutdown were passed as NO_LEADER, this is a no-op (the new + // Note: if brokerToRemove and brokerWithUncleanShutdown were passed as + // NO_LEADER, this is a no-op (the new // target ISR will be the same as the old one). builder.setTargetIsr(Replicas.toList( - Replicas.copyWithout(partition.isr, new int[] {brokerToRemove, brokerWithUncleanShutdown}))); + Replicas.copyWithout(partition.isr, new int[]{brokerToRemove, brokerWithUncleanShutdown }))); builder.setDefaultDirProvider(clusterDescriber) .build().ifPresent(records::add); @@ -2083,12 +2118,11 @@ void generateLeaderAndIsrUpdates(String context, if (log.isDebugEnabled()) { StringBuilder bld = new StringBuilder(); String prefix = ""; - for (ListIterator iter = records.listIterator(oldSize); - iter.hasNext(); ) { + for (ListIterator iter = records.listIterator(oldSize); iter.hasNext();) { ApiMessageAndVersion apiMessageAndVersion = iter.next(); PartitionChangeRecord record = (PartitionChangeRecord) apiMessageAndVersion.message(); - bld.append(prefix).append(topics.get(record.topicId()).name).append("-"). - append(record.partitionId()); + bld.append(prefix).append(topics.get(record.topicId()).name).append("-") + .append(record.partitionId()); prefix = ", "; } log.debug("{}: changing partition(s): {}", context, bld); @@ -2098,17 +2132,16 @@ void generateLeaderAndIsrUpdates(String context, } } - ControllerResult - alterPartitionReassignments(AlterPartitionReassignmentsRequestData request) { + ControllerResult alterPartitionReassignments( + AlterPartitionReassignmentsRequestData request) { List records = BoundedList.newArrayBacked(MAX_RECORDS_PER_USER_OP); boolean allowRFChange = request.allowReplicationFactorChange(); - AlterPartitionReassignmentsResponseData result = - new AlterPartitionReassignmentsResponseData().setErrorMessage(null) - .setAllowReplicationFactorChange(allowRFChange); + AlterPartitionReassignmentsResponseData result = new AlterPartitionReassignmentsResponseData() + .setErrorMessage(null) + .setAllowReplicationFactorChange(allowRFChange); int successfulAlterations = 0, totalAlterations = 0; for (ReassignableTopic topic : request.topics()) { - ReassignableTopicResponse topicResponse = new ReassignableTopicResponse(). - setName(topic.name()); + ReassignableTopicResponse topicResponse = new ReassignableTopicResponse().setName(topic.name()); for (ReassignablePartition partition : topic.partitions()) { ApiError error = ApiError.NONE; try { @@ -2120,37 +2153,36 @@ void generateLeaderAndIsrUpdates(String context, error = ApiError.fromThrowable(e); } totalAlterations++; - topicResponse.partitions().add(new ReassignablePartitionResponse(). - setPartitionIndex(partition.partitionIndex()). - setErrorCode(error.error().code()). - setErrorMessage(error.message())); + topicResponse.partitions() + .add(new ReassignablePartitionResponse().setPartitionIndex(partition.partitionIndex()) + .setErrorCode(error.error().code()).setErrorMessage(error.message())); } result.responses().add(topicResponse); } log.info("Successfully altered {} out of {} partition reassignment(s).", - successfulAlterations, totalAlterations); + successfulAlterations, totalAlterations); return ControllerResult.atomicOf(records, result); } void alterPartitionReassignment(String topicName, - ReassignablePartition target, - List records, - boolean allowRFChange) { + ReassignablePartition target, + List records, + boolean allowRFChange) { Uuid topicId = topicsByName.get(topicName); if (topicId == null) { throw new UnknownTopicOrPartitionException("Unable to find a topic " + - "named " + topicName + "."); + "named " + topicName + "."); } TopicControlInfo topicInfo = topics.get(topicId); if (topicInfo == null) { throw new UnknownTopicOrPartitionException("Unable to find a topic " + - "with ID " + topicId + "."); + "with ID " + topicId + "."); } TopicIdPartition tp = new TopicIdPartition(topicId, target.partitionIndex()); PartitionRegistration part = topicInfo.parts.get(target.partitionIndex()); if (part == null) { throw new UnknownTopicOrPartitionException("Unable to find partition " + - topicName + ":" + target.partitionIndex() + "."); + topicName + ":" + target.partitionIndex() + "."); } Optional record; if (target.replicas() == null) { @@ -2162,8 +2194,8 @@ record = changePartitionReassignment(tp, part, target, allowRFChange); } Optional cancelPartitionReassignment(String topicName, - TopicIdPartition tp, - PartitionRegistration part) { + TopicIdPartition tp, + PartitionRegistration part) { if (!isReassignmentInProgress(part)) { throw new NoReassignmentInProgressException(NO_REASSIGNMENT_IN_PROGRESS.message()); } @@ -2171,66 +2203,72 @@ Optional cancelPartitionReassignment(String topicName, if (revert.unclean()) { if (!configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) { throw new InvalidReplicaAssignmentException("Unable to revert partition " + - "assignment for " + topicName + ":" + tp.partitionId() + " because " + - "it would require an unclean leader election."); + "assignment for " + topicName + ":" + tp.partitionId() + " because " + + "it would require an unclean leader election."); } } PartitionChangeBuilder builder = new PartitionChangeBuilder( - part, - tp.topicId(), - tp.partitionId(), - new LeaderAcceptor(clusterControl, part), - featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topicName) - ); + part, + tp.topicId(), + tp.partitionId(), + new LeaderAcceptor(clusterControl, part), + featureControl.metadataVersionOrThrow(), + getTopicEffectiveMinIsr(topicName)); builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) { builder.setElection(PartitionChangeBuilder.Election.UNCLEAN); } return builder - .setTargetIsr(revert.isr()). - setTargetReplicas(revert.replicas()). - setTargetRemoving(List.of()). - setTargetAdding(List.of()). - setDefaultDirProvider(clusterDescriber). - build(); + .setTargetIsr(revert.isr()).setTargetReplicas(revert.replicas()).setTargetRemoving(List.of()) + .setTargetAdding(List.of()).setDefaultDirProvider(clusterDescriber).build(); } /** - * Apply a given partition reassignment. In general a partition reassignment goes + * Apply a given partition reassignment. In general a partition reassignment + * goes * through several stages: * - * 1. Issue a PartitionChangeRecord adding all the new replicas to the partition's + * 1. Issue a PartitionChangeRecord adding all the new replicas to the + * partition's * main replica list, and setting removingReplicas and addingReplicas. * - * 2. Wait for the partition to have an ISR that contains all the new replicas. Or - * if there are no new replicas, wait until we have an ISR that contains at least one + * 2. Wait for the partition to have an ISR that contains all the new replicas. + * Or + * if there are no new replicas, wait until we have an ISR that contains at + * least one * replica that we are not removing. * - * 3. Issue a second PartitionChangeRecord removing all removingReplicas from the - * partitions' main replica list, and clearing removingReplicas and addingReplicas. + * 3. Issue a second PartitionChangeRecord removing all removingReplicas from + * the + * partitions' main replica list, and clearing removingReplicas and + * addingReplicas. * * After stage 3, the reassignment is done. * - * Under some conditions, steps #1 and #2 can be skipped entirely since the ISR is - * already suitable to progress to stage #3. For example, a partition reassignment - * that merely rearranges existing replicas in the list can bypass step #1 and #2 and + * Under some conditions, steps #1 and #2 can be skipped entirely since the ISR + * is + * already suitable to progress to stage #3. For example, a partition + * reassignment + * that merely rearranges existing replicas in the list can bypass step #1 and + * #2 and * complete immediately. * - * @param tp The topic id and partition id. - * @param part The existing partition info. - * @param target The target partition info. - * @param allowRFChange Validate if partition replication factor can change. KIP-860 + * @param tp The topic id and partition id. + * @param part The existing partition info. + * @param target The target partition info. + * @param allowRFChange Validate if partition replication factor can change. + * KIP-860 * - * @return The ChangePartitionRecord for the new partition assignment, - * or empty if no change is needed. + * @return The ChangePartitionRecord for the new partition assignment, + * or empty if no change is needed. */ Optional changePartitionReassignment(TopicIdPartition tp, - PartitionRegistration part, - ReassignablePartition target, - boolean allowRFChange) { + PartitionRegistration part, + ReassignablePartition target, + boolean allowRFChange) { // Check that the requested partition assignment is valid. - PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas), part::directory); + PartitionAssignment currentAssignment = new PartitionAssignment(Replicas.toList(part.replicas), + part::directory); PartitionAssignment targetAssignment = new PartitionAssignment(target.replicas(), clusterDescriber); validateManualPartitionAssignment(targetAssignment, OptionalInt.empty()); @@ -2239,16 +2277,15 @@ Optional changePartitionReassignment(TopicIdPartition tp, } List currentReplicas = Replicas.toList(part.replicas); - PartitionReassignmentReplicas reassignment = - new PartitionReassignmentReplicas(currentAssignment, targetAssignment); + PartitionReassignmentReplicas reassignment = new PartitionReassignmentReplicas(currentAssignment, + targetAssignment); PartitionChangeBuilder builder = new PartitionChangeBuilder( - part, - tp.topicId(), - tp.partitionId(), - new LeaderAcceptor(clusterControl, part), - featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topics.get(tp.topicId()).name) - ); + part, + tp.topicId(), + tp.partitionId(), + new LeaderAcceptor(clusterControl, part), + featureControl.metadataVersionOrThrow(), + getTopicEffectiveMinIsr(topics.get(tp.topicId()).name)); builder.setEligibleLeaderReplicasEnabled(featureControl.isElrFeatureEnabled()); if (!reassignment.replicas().equals(currentReplicas)) { builder.setTargetReplicas(reassignment.replicas()); @@ -2263,11 +2300,10 @@ Optional changePartitionReassignment(TopicIdPartition tp, } ListPartitionReassignmentsResponseData listPartitionReassignments( - List topicList, - long epoch - ) { - ListPartitionReassignmentsResponseData response = - new ListPartitionReassignmentsResponseData().setErrorMessage(null); + List topicList, + long epoch) { + ListPartitionReassignmentsResponseData response = new ListPartitionReassignmentsResponseData() + .setErrorMessage(null); if (topicList == null) { // List all reassigning topics. for (Entry entry : reassigningTopics.entrySet(epoch)) { @@ -2285,7 +2321,8 @@ ListPartitionReassignmentsResponseData listPartitionReassignments( return response; } - ControllerResult handleAssignReplicasToDirs(AssignReplicasToDirsRequestData request) { + ControllerResult handleAssignReplicasToDirs( + AssignReplicasToDirsRequestData request) { if (!featureControl.metadataVersionOrThrow().isDirectoryAssignmentSupported()) { throw new UnsupportedVersionException("Directory assignment is not supported yet."); } @@ -2301,16 +2338,19 @@ ControllerResult handleAssignReplicasToDirs(As for (AssignReplicasToDirsRequestData.DirectoryData reqDir : request.directories()) { Uuid dirId = reqDir.id(); boolean directoryIsOffline = !brokerRegistration.hasOnlineDir(dirId); - AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData().setId(dirId); + AssignReplicasToDirsResponseData.DirectoryData resDir = new AssignReplicasToDirsResponseData.DirectoryData() + .setId(dirId); for (AssignReplicasToDirsRequestData.TopicData reqTopic : reqDir.topics()) { Uuid topicId = reqTopic.topicId(); Errors topicError = Errors.NONE; TopicControlInfo topicInfo = this.topics.get(topicId); if (topicInfo == null) { - log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, topicId); + log.warn("AssignReplicasToDirsRequest from broker {} references unknown topic ID {}", brokerId, + topicId); topicError = Errors.UNKNOWN_TOPIC_ID; } - AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData().setTopicId(topicId); + AssignReplicasToDirsResponseData.TopicData resTopic = new AssignReplicasToDirsResponseData.TopicData() + .setTopicId(topicId); for (AssignReplicasToDirsRequestData.PartitionData reqPartition : reqTopic.partitions()) { int partitionIndex = reqPartition.partitionIndex(); Errors partitionError = topicError; @@ -2318,10 +2358,13 @@ ControllerResult handleAssignReplicasToDirs(As String topicName = topicInfo.name; PartitionRegistration partitionRegistration = topicInfo.parts.get(partitionIndex); if (partitionRegistration == null) { - log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", brokerId, topicName, partitionIndex); + log.warn("AssignReplicasToDirsRequest from broker {} references unknown partition {}-{}", + brokerId, topicName, partitionIndex); partitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION; } else if (!Replicas.contains(partitionRegistration.replicas, brokerId)) { - log.warn("AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", brokerId, topicName, partitionIndex); + log.warn( + "AssignReplicasToDirsRequest from broker {} references non assigned partition {}-{}", + brokerId, topicName, partitionIndex); partitionError = Errors.NOT_LEADER_OR_FOLLOWER; } else { Optional partitionChangeRecord = new PartitionChangeBuilder( @@ -2330,8 +2373,7 @@ ControllerResult handleAssignReplicasToDirs(As partitionIndex, new LeaderAcceptor(clusterControl, partitionRegistration), featureControl.metadataVersionOrThrow(), - getTopicEffectiveMinIsr(topicName) - ) + getTopicEffectiveMinIsr(topicName)) .setDirectory(brokerId, dirId) .setDefaultDirProvider(clusterDescriber) .build(); @@ -2341,53 +2383,52 @@ ControllerResult handleAssignReplicasToDirs(As } if (log.isDebugEnabled()) { log.debug("Broker {} assigned partition {}:{} to {} dir {}", - brokerId, topics.get(topicId).name(), partitionIndex, - directoryIsOffline ? "OFFLINE" : "ONLINE", dirId); + brokerId, topics.get(topicId).name(), partitionIndex, + directoryIsOffline ? "OFFLINE" : "ONLINE", dirId); } } } - resTopic.partitions().add(new AssignReplicasToDirsResponseData.PartitionData(). - setPartitionIndex(partitionIndex). - setErrorCode(partitionError.code())); + resTopic.partitions().add(new AssignReplicasToDirsResponseData.PartitionData() + .setPartitionIndex(partitionIndex).setErrorCode(partitionError.code())); } resDir.topics().add(resTopic); } response.directories().add(resDir); } if (!leaderAndIsrUpdates.isEmpty()) { - generateLeaderAndIsrUpdates("offline-dir-assignment", brokerId, NO_LEADER, NO_LEADER, records, leaderAndIsrUpdates.iterator()); + generateLeaderAndIsrUpdates("offline-dir-assignment", brokerId, NO_LEADER, NO_LEADER, records, + leaderAndIsrUpdates.iterator()); } return ControllerResult.of(records, response); } private void listReassigningTopic(ListPartitionReassignmentsResponseData response, - Uuid topicId, - List partitionIds) { + Uuid topicId, + List partitionIds) { TopicControlInfo topicInfo = topics.get(topicId); - if (topicInfo == null) return; - OngoingTopicReassignment ongoingTopic = new OngoingTopicReassignment(). - setName(topicInfo.name); + if (topicInfo == null) + return; + OngoingTopicReassignment ongoingTopic = new OngoingTopicReassignment().setName(topicInfo.name); for (int partitionId : partitionIds) { - Optional ongoing = - getOngoingPartitionReassignment(topicInfo, partitionId); - ongoing.ifPresent(ongoingPartitionReassignment -> ongoingTopic.partitions().add(ongoingPartitionReassignment)); + Optional ongoing = getOngoingPartitionReassignment(topicInfo, partitionId); + ongoing.ifPresent( + ongoingPartitionReassignment -> ongoingTopic.partitions().add(ongoingPartitionReassignment)); } if (!ongoingTopic.partitions().isEmpty()) { response.topics().add(ongoingTopic); } } - private Optional - getOngoingPartitionReassignment(TopicControlInfo topicInfo, int partitionId) { + private Optional getOngoingPartitionReassignment(TopicControlInfo topicInfo, + int partitionId) { PartitionRegistration partition = topicInfo.parts.get(partitionId); if (partition == null || !isReassignmentInProgress(partition)) { return Optional.empty(); } - return Optional.of(new OngoingPartitionReassignment(). - setAddingReplicas(Replicas.toList(partition.addingReplicas)). - setRemovingReplicas(Replicas.toList(partition.removingReplicas)). - setPartitionIndex(partitionId). - setReplicas(Replicas.toList(partition.replicas))); + return Optional + .of(new OngoingPartitionReassignment().setAddingReplicas(Replicas.toList(partition.addingReplicas)) + .setRemovingReplicas(Replicas.toList(partition.removingReplicas)).setPartitionIndex(partitionId) + .setReplicas(Replicas.toList(partition.replicas))); } // Visible to test. @@ -2406,11 +2447,10 @@ int getTopicEffectiveMinIsr(String topicName) { * partitions assigned to them. */ private void updatePartitionDirectories( - Uuid topicId, - int partitionId, - Uuid[] previousDirectoryIds, - Uuid[] newDirectoryIds - ) { + Uuid topicId, + int partitionId, + Uuid[] previousDirectoryIds, + Uuid[] newDirectoryIds) { Objects.requireNonNull(topicId, "topicId cannot be null"); TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partitionId); if (previousDirectoryIds != null) { @@ -2430,7 +2470,7 @@ private void updatePartitionDirectories( for (Uuid dir : newDirectoryIds) { if (!DirectoryId.reserved(dir)) { Set partitions = directoriesToPartitions.computeIfAbsent(dir, - __ -> new TimelineHashSet<>(snapshotRegistry, 0)); + __ -> new TimelineHashSet<>(snapshotRegistry, 0)); partitions.add(topicIdPartition); } } @@ -2438,26 +2478,25 @@ private void updatePartitionDirectories( } private void updatePartitionInfo( - Uuid topicId, - Integer partitionId, - PartitionRegistration prevPartInfo, - PartitionRegistration newPartInfo - ) { + Uuid topicId, + Integer partitionId, + PartitionRegistration prevPartInfo, + PartitionRegistration newPartInfo) { HashSet validationSet = new HashSet<>(); Arrays.stream(newPartInfo.isr).forEach(validationSet::add); Arrays.stream(newPartInfo.elr).forEach(validationSet::add); if (validationSet.size() != newPartInfo.isr.length + newPartInfo.elr.length) { log.error("{}-{} has overlapping ISR={} and ELR={}", topics.get(topicId).name, partitionId, - Arrays.toString(newPartInfo.isr), Arrays.toString(newPartInfo.elr)); + Arrays.toString(newPartInfo.isr), Arrays.toString(newPartInfo.elr)); } brokersToIsrs.update(topicId, partitionId, prevPartInfo == null ? null : prevPartInfo.isr, - newPartInfo.isr, prevPartInfo == null ? NO_LEADER : prevPartInfo.leader, newPartInfo.leader); + newPartInfo.isr, prevPartInfo == null ? NO_LEADER : prevPartInfo.leader, newPartInfo.leader); brokersToElrs.update(topicId, partitionId, prevPartInfo == null ? null : prevPartInfo.elr, - newPartInfo.elr); + newPartInfo.elr); } private void validatePartitionReplicationFactorUnchanged(PartitionRegistration part, - ReassignablePartition target) { + ReassignablePartition target) { int currentReassignmentSetSize; if (isReassignmentInProgress(part)) { Set set = new HashSet<>(); @@ -2497,7 +2536,8 @@ private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistrati this(clusterControl, partition, clusterControl::isActive); } - private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistration partition, IntPredicate isAcceptableLeader) { + private LeaderAcceptor(ClusterControlManager clusterControl, PartitionRegistration partition, + IntPredicate isAcceptableLeader) { this.clusterControl = clusterControl; this.partition = partition; this.isAcceptableLeader = isAcceptableLeader; diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java index 7024d0de3a23a..895ec0f3e3657 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java @@ -673,11 +673,14 @@ public void testCreateTopics() { setIsr(new int[] {1, 2, 0}).setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(0).setPartitionEpoch(0).build(), replicationControl.getPartition( ((TopicRecord) result3.records().get(0).message()).topicId(), 0)); + Uuid fooId = ((TopicRecord) result3.records().get(0).message()).topicId(); ControllerResult result4 = replicationControl.createTopics(requestContext, request, Set.of("foo")); CreateTopicsResponseData expectedResponse4 = new CreateTopicsResponseData(); - expectedResponse4.topics().add(new CreatableTopicResult().setName("foo"). - setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). + expectedResponse4.topics().add(new CreatableTopicResult() + .setName("foo") + .setTopicId(fooId) + .setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()). setErrorMessage("Topic 'foo' already exists.")); assertEquals(expectedResponse4, result4.response()); } diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py index c691cbc12b072..cd52b2029fccc 100644 --- a/tests/kafkatest/tests/produce_consume_validate.py +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -17,6 +17,9 @@ from ducktape.utils.util import wait_until from kafkatest.utils import validate_delivery +from kafkatest.services.verifiable_consumer import VerifiableConsumer +from kafkatest.services.verifiable_producer import VerifiableProducer + class ProduceConsumeValidateTest(Test): """This class provides a shared template for tests which follow the common pattern of: @@ -130,3 +133,69 @@ def check_lost_data(missing_records): self.mark_for_collect(s) assert succeeded, error_msg + + def test_simple_consumer_commit(self): + """ + Verify that a simple consumer commits offsets and does not re-consume + messages after a restart. + """ + + # Topic configuration + self.topic = "simple-consumer-commit-topic" + self.num_partitions = 1 + self.replication_factor = 1 + + self.kafka.create_topic( + self.topic, + num_partitions=self.num_partitions, + replication_factor=self.replication_factor + ) + + # Number of messages to produce + self.num_messages = 100 + + # Create producer + self.producer = VerifiableProducer( + self.test_context, + num_nodes=1, + kafka=self.kafka, + topic=self.topic, + throughput=100, + message_validator=None + ) + + # Create consumer with manual commit (important!) + self.consumer = VerifiableConsumer( + self.test_context, + num_nodes=1, + kafka=self.kafka, + topic=self.topic, + group_id="simple-consumer-commit-group", + enable_autocommit=False + ) + # Run produce-consume, restart consumer in between + self.run_produce_consume_validate(self._restart_consumer) + + def _restart_consumer(self): + """ + Stop the consumer after offsets are committed and restart it + to ensure consumption resumes from committed offsets. + """ + # Ensure consumer has consumed some messages + wait_until( + lambda: len(self.consumer.messages_consumed[1]) > 0, + timeout_sec=30, + err_msg="Consumer did not consume messages before restart" + ) + + # Stop consumer (commit should have happened) + self.consumer.stop() + self.consumer.wait() + + # Restart consumer + self.consumer.start() + + + + +