Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 54 additions & 32 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -413,39 +413,61 @@ class ControllerApis(
getCreatableTopics.apply(allowedTopicNames)
}
val describableTopicNames = getDescribableTopics.apply(allowedTopicNames).asJava
val effectiveRequest = request.duplicate()
val iterator = effectiveRequest.topics().iterator()
while (iterator.hasNext) {
val creatableTopic = iterator.next()
if (duplicateTopicNames.contains(creatableTopic.name()) ||
!authorizedTopicNames.contains(creatableTopic.name())) {
iterator.remove()
}
}
controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response =>
duplicateTopicNames.forEach { name =>
response.topics().add(new CreatableTopicResult().
setName(name).
setErrorCode(INVALID_REQUEST.code).
setErrorMessage("Duplicate topic name."))
}
topicNames.forEach { name =>
if (name == Topic.CLUSTER_METADATA_TOPIC_NAME) {
response.topics().add(new CreatableTopicResult().
setName(name).
setErrorCode(INVALID_REQUEST.code).
setErrorMessage(s"Creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited."))
} else if (!authorizedTopicNames.contains(name)) {
response.topics().add(new CreatableTopicResult().
setName(name).
setErrorCode(TOPIC_AUTHORIZATION_FAILED.code).
setErrorMessage("Authorization failed."))
}
}
response
}
// Create a map to collect validation errors upfront
val validationErrors = new util.ArrayList[CreatableTopicResult]()

// Check for duplicates
duplicateTopicNames.forEach { name =>
validationErrors.add(new CreatableTopicResult()
.setName(name)
.setErrorCode(INVALID_REQUEST.code)
.setErrorMessage("Duplicate topic name."))
}

// Check for cluster metadata topic
topicNames.forEach { name =>
if (name == Topic.CLUSTER_METADATA_TOPIC_NAME) {
validationErrors.add(new CreatableTopicResult()
.setName(name)
.setErrorCode(INVALID_REQUEST.code)
.setErrorMessage(
s"Creation of internal topic ${Topic.CLUSTER_METADATA_TOPIC_NAME} is prohibited."
))
}
}

// Check for authorization failures
topicNames.forEach { name =>
if (!authorizedTopicNames.contains(name) && name != Topic.CLUSTER_METADATA_TOPIC_NAME) {
validationErrors.add(new CreatableTopicResult()
.setName(name)
.setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)
.setErrorMessage("Authorization failed."))
}
}

// Build effective request with only valid topics
val effectiveRequest = request.duplicate()
val iterator = effectiveRequest.topics().iterator()
while (iterator.hasNext) {
val creatableTopic = iterator.next()
if (duplicateTopicNames.contains(creatableTopic.name()) ||
creatableTopic.name() == Topic.CLUSTER_METADATA_TOPIC_NAME ||
!authorizedTopicNames.contains(creatableTopic.name())) {
iterator.remove()
}
}

controller.createTopics(context, effectiveRequest, describableTopicNames).thenApply { response =>

// Add pre-validation errors to response
validationErrors.forEach { error =>
response.topics().add(error)
}

response
}
}
def handleApiVersionsRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
// Note that controller returns its full list of supported ApiKeys and versions regardless of current
// authentication state (e.g., before SASL authentication on an SASL listener, do note that no
Expand Down Expand Up @@ -1102,4 +1124,4 @@ class ControllerApis(
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new UpdateRaftVoterResponse(response.asInstanceOf[UpdateRaftVoterResponseData]))
}
}
}
Loading