Skip to content

Commit

Permalink
KAFKA-18432 Remove unused code from AutoTopicCreationManager (#18438)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
LoganZhuZzz authored Jan 8, 2025
1 parent 4b1b67e commit 5efaae6
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 84 deletions.
85 changes: 3 additions & 82 deletions core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
package kafka.server

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Properties}
import kafka.controller.KafkaController
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.utils.Logging
import org.apache.kafka.clients.ClientResponse
Expand All @@ -31,7 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection}
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, RequestContext, RequestHeader}
import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, RequestHeader}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager}
Expand All @@ -49,34 +47,13 @@ trait AutoTopicCreationManager {
): Seq[MetadataResponseTopic]
}

object AutoTopicCreationManager {

def apply(
config: KafkaConfig,
channelManager: Option[NodeToControllerChannelManager],
adminManager: Option[ZkAdminManager],
controller: Option[KafkaController],
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
shareCoordinator: Option[ShareCoordinator],
): AutoTopicCreationManager = {
new DefaultAutoTopicCreationManager(config, channelManager, adminManager,
controller, groupCoordinator, txnCoordinator, shareCoordinator)
}
}

class DefaultAutoTopicCreationManager(
config: KafkaConfig,
channelManager: Option[NodeToControllerChannelManager],
adminManager: Option[ZkAdminManager],
controller: Option[KafkaController],
channelManager: NodeToControllerChannelManager,
groupCoordinator: GroupCoordinator,
txnCoordinator: TransactionCoordinator,
shareCoordinator: Option[ShareCoordinator]
) extends AutoTopicCreationManager with Logging {
if (controller.isEmpty && channelManager.isEmpty) {
throw new IllegalArgumentException("Must supply a channel manager if not supplying a controller")
}

private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())

Expand All @@ -99,65 +76,13 @@ class DefaultAutoTopicCreationManager(

val creatableTopicResponses = if (creatableTopics.isEmpty) {
Seq.empty
} else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) {
sendCreateTopicRequest(creatableTopics, metadataRequestContext)
} else {
createTopicsInZk(creatableTopics, controllerMutationQuota)
sendCreateTopicRequest(creatableTopics, metadataRequestContext)
}

uncreatableTopicResponses ++ creatableTopicResponses
}

private def createTopicsInZk(
creatableTopics: Map[String, CreatableTopic],
controllerMutationQuota: ControllerMutationQuota
): Seq[MetadataResponseTopic] = {
val topicErrors = new AtomicReference[Map[String, ApiError]]()
try {
// Note that we use timeout = 0 since we do not need to wait for metadata propagation
// and we want to get the response error immediately.
adminManager.get.createTopics(
timeout = 0,
validateOnly = false,
creatableTopics,
Map.empty,
controllerMutationQuota,
topicErrors.set
)

val creatableTopicResponses = Option(topicErrors.get) match {
case Some(errors) =>
errors.toSeq.map { case (topic, apiError) =>
val error = apiError.error match {
case Errors.TOPIC_ALREADY_EXISTS | Errors.REQUEST_TIMED_OUT =>
// The timeout error is expected because we set timeout=0. This
// nevertheless indicates that the topic metadata was created
// successfully, so we return LEADER_NOT_AVAILABLE.
Errors.LEADER_NOT_AVAILABLE
case error => error
}

new MetadataResponseTopic()
.setErrorCode(error.code)
.setName(topic)
.setIsInternal(Topic.isInternal(topic))
}

case None =>
creatableTopics.keySet.toSeq.map { topic =>
new MetadataResponseTopic()
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
.setName(topic)
.setIsInternal(Topic.isInternal(topic))
}
}

creatableTopicResponses
} finally {
clearInflightRequests(creatableTopics)
}
}

private def sendCreateTopicRequest(
creatableTopics: Map[String, CreatableTopic],
metadataRequestContext: Option[RequestContext]
Expand Down Expand Up @@ -189,10 +114,6 @@ class DefaultAutoTopicCreationManager(
}
}

val channelManager = this.channelManager.getOrElse {
throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.")
}

val request = metadataRequestContext.map { context =>
val requestVersion =
channelManager.controllerApiVersions.toScala match {
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ class BrokerServer(
producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM)

autoTopicCreationManager = new DefaultAutoTopicCreationManager(
config, Some(clientToControllerChannelManager), None, None,
groupCoordinator, transactionCoordinator, shareCoordinator)
config, clientToControllerChannelManager, groupCoordinator,
transactionCoordinator, shareCoordinator)

dynamicConfigHandlers = Map[String, ConfigHandler](
ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, quotaManagers, None),
Expand Down

0 comments on commit 5efaae6

Please sign in to comment.