KAFKA-17616: Remove KafkaServer #18384

Merged
merged 2 commits on Jan 6, 2025
20 changes: 6 additions & 14 deletions core/src/main/scala/kafka/Kafka.scala
@@ -19,7 +19,7 @@ package kafka

import java.util.Properties
import joptsimple.OptionParser
-import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server}
+import kafka.server.{KafkaConfig, KafkaRaftServer, Server}
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.{Exit, Java, LoggingSignalHandler, OperatingSystem, Time, Utils}
@@ -63,18 +63,10 @@ object Kafka extends Logging {

private def buildServer(props: Properties): Server = {
val config = KafkaConfig.fromProps(props, doLog = false)
-    if (config.requiresZookeeper) {
-      new KafkaServer(
-        config,
-        Time.SYSTEM,
-        threadNamePrefix = None
-      )
-    } else {
-      new KafkaRaftServer(
-        config,
-        Time.SYSTEM,
-      )
-    }
+    new KafkaRaftServer(
+      config,
+      Time.SYSTEM,
+    )
}

def main(args: Array[String]): Unit = {
@@ -105,7 +97,7 @@ object Kafka extends Logging {
try server.startup()
catch {
case e: Throwable =>
-        // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
+        // KafkaBroker.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
fatal("Exiting Kafka due to fatal exception during startup.", e)
Exit.exit(1)
}
37 changes: 1 addition & 36 deletions core/src/main/scala/kafka/cluster/Broker.scala
@@ -17,28 +17,17 @@

package kafka.cluster

-import java.util
import kafka.common.BrokerEndPointNotAvailableException
-import kafka.server.KafkaConfig
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.feature.Features._
-import org.apache.kafka.common.{ClusterResource, Endpoint, Node}
+import org.apache.kafka.common.Node
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.metadata.{BrokerRegistration, VersionRange}
-import org.apache.kafka.server.authorizer.AuthorizerServerInfo
import org.apache.kafka.server.network.BrokerEndPoint

import scala.collection.Seq
import scala.jdk.CollectionConverters._
-import scala.jdk.OptionConverters.RichOptional

object Broker {
-  private[kafka] case class ServerInfo(clusterResource: ClusterResource,
-                                       brokerId: Int,
-                                       endpoints: util.List[Endpoint],
-                                       interBrokerEndpoint: Endpoint,
-                                       earlyStartListeners: util.Set[String]) extends AuthorizerServerInfo

def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = {
new Broker(id, endPoints, rack, emptySupportedFeatures)
@@ -47,22 +36,6 @@ object Broker {
def apply(id: Int, endPoint: EndPoint, rack: Option[String]): Broker = {
new Broker(id, Seq(endPoint), rack, emptySupportedFeatures)
}

-  private def supportedFeatures(features: java.util.Map[String, VersionRange]): java.util.Map[String, SupportedVersionRange] = {
-    features.asScala.map { case (name, range) =>
-      name -> new SupportedVersionRange(range.min(), range.max())
-    }.asJava
-  }
-
-  def fromBrokerRegistration(registration: BrokerRegistration): Broker = {
-    new Broker(
-      registration.id(),
-      registration.listeners().values().asScala.map(EndPoint.fromJava).toSeq,
-      registration.rack().toScala,
-      Features.supportedFeatures(supportedFeatures(registration.supportedFeatures()))
-    )
-  }
}

/**
@@ -111,12 +84,4 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String], featu
endPointsMap.getOrElse(listenerName,
throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found for broker $id"))
}

-  def toServerInfo(clusterId: String, config: KafkaConfig): AuthorizerServerInfo = {
-    val clusterResource: ClusterResource = new ClusterResource(clusterId)
-    val interBrokerEndpoint: Endpoint = endPoint(config.interBrokerListenerName).toJava
-    val brokerEndpoints: util.List[Endpoint] = endPoints.toList.map(_.toJava).asJava
-    Broker.ServerInfo(clusterResource, id, brokerEndpoints, interBrokerEndpoint,
-      config.earlyStartListeners.map(_.value()).asJava)
-  }
}
27 changes: 0 additions & 27 deletions core/src/main/scala/kafka/common/GenerateBrokerIdException.scala

This file was deleted.

15 changes: 1 addition & 14 deletions core/src/main/scala/kafka/controller/KafkaController.scala
@@ -114,7 +114,7 @@ class KafkaController(val config: KafkaConfig,

this.logIdent = s"[Controller id=${config.brokerId}] "

-  @volatile private var brokerInfo = initialBrokerInfo
+  private val brokerInfo = initialBrokerInfo
@volatile private var _brokerEpoch = initialBrokerEpoch

private val isAlterPartitionEnabled = config.interBrokerProtocolVersion.isAlterPartitionSupported
@@ -245,15 +245,6 @@
eventManager.put(controlledShutdownEvent)
}

-  private[kafka] def updateBrokerInfo(newBrokerInfo: BrokerInfo): Unit = {
-    this.brokerInfo = newBrokerInfo
-    zkClient.updateBrokerInfo(newBrokerInfo)
-  }
-
-  private[kafka] def enableDefaultUncleanLeaderElection(): Unit = {
-    eventManager.put(UncleanLeaderElectionEnable)
-  }

private[kafka] def enableTopicUncleanLeaderElection(topic: String): Unit = {
if (isActive) {
eventManager.put(TopicUncleanLeaderElectionEnable(topic))
@@ -2694,10 +2685,6 @@ class LogDirEventNotificationHandler(eventManager: ControllerEventManager) exten
override def handleChildChange(): Unit = eventManager.put(LogDirEventNotification)
}

-object LogDirEventNotificationHandler {
-  val Version: Long = 1L
-}

class PartitionModificationsHandler(eventManager: ControllerEventManager, topic: String) extends ZNodeChangeHandler {
override val path: String = TopicZNode.path(topic)

4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
@@ -424,15 +424,15 @@ class BrokerServer(
val fetchSessionCacheShards = (0 until NumFetchSessionCacheShards)
.map(shardNum => new FetchSessionCacheShard(
config.maxIncrementalFetchSessionCacheSlots / NumFetchSessionCacheShards,
-        KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS,
+        KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS,
sessionIdRange,
shardNum
))
val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(fetchSessionCacheShards))

val shareFetchSessionCache : ShareSessionCache = new ShareSessionCache(
config.shareGroupConfig.shareGroupMaxGroups * config.groupCoordinatorConfig.shareGroupMaxSize,
-      KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)
+      KafkaBroker.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)

sharePartitionManager = new SharePartitionManager(
replicaManager,
12 changes: 1 addition & 11 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -699,20 +699,11 @@ class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends Brok
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
-    val originalLogConfig = logManager.currentDefaultConfig
-    val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)

logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))

updateLogsConfig(newBrokerDefaults.asScala)

-    if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !originalUncleanLeaderElectionEnable) {
-      server match {
-        case kafkaServer: KafkaServer => kafkaServer.kafkaController.enableDefaultUncleanLeaderElection()
-        case _ =>
-      }
-    }
}
}

@@ -1068,8 +1059,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
listenersToMap(newConfig.effectiveAdvertisedBrokerListeners))) {
verifyListenerRegistrationAlterationSupported()
server match {
-          case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
-          case _ => throw new RuntimeException("Unable to handle non-kafkaServer")
+          case _ => throw new RuntimeException("Unable to handle reconfigure")

Member: If we always throw this exception, we probably don't need to do verifyListenerRegistrationAlterationSupported before that, and we may be able to delete that method if it's not used anywhere else.
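
A minimal, self-contained sketch of what the simplified check could look like if the now-dead call were dropped (types and names here are stand-ins, not the actual DynamicListenerConfig code):

```scala
// Hypothetical shape of the simplified validation, with stand-in types.
object ListenerReconfigSketch {
  type ListenerMap = Map[String, String] // stand-in: listener name -> endpoint

  def validate(oldListeners: ListenerMap, newListeners: ListenerMap): Unit = {
    // With KafkaServer gone, only KRaft brokers remain, and listener
    // registration updates are unsupported, so any change is an error and a
    // preceding verifyListenerRegistrationAlterationSupported() call would
    // be redundant.
    if (oldListeners != newListeners)
      throw new RuntimeException(
        "Dynamic reconfiguration of advertised listeners is not supported in KRaft mode")
  }

  def main(args: Array[String]): Unit =
    validate(Map("PLAINTEXT" -> "host:9092"), Map("PLAINTEXT" -> "host:9092")) // unchanged: OK
}
```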

Member: Also, we probably should include more context: it looks like we don't allow dynamic reconfiguration of listener registrations.

@cmccabe @jsancio Is this documented as part of the zk to kraft migration?

Member: We should remove SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG from ReconfigurableConfigs as well:

    SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,

That would stop users from configuring advertised listeners dynamically, which is no longer supported.
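
A sketch of what that removal might look like (string literals stand in for the SocketServerConfigs fields; the surrounding entries are placeholders, not the real list):

```scala
// Hypothetical: the reconfigurable-config set without the advertised
// listeners entry, so dynamic updates to it are rejected up front.
object ReconfigurableConfigsSketch {
  val ReconfigurableConfigs: Set[String] = Set(
    "listeners"
    // "advertised.listeners" removed: advertised listeners can no longer
    // be updated at runtime in KRaft mode
  )

  def main(args: Array[String]): Unit =
    println(ReconfigurableConfigs.contains("advertised.listeners")) // false
}
```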

m1a2st (Contributor), Jan 5, 2025: I opened a Jira to check all dynamic configs that can't be dynamically configured in KRaft, and ADVERTISED_LISTENERS_CONFIG is the only one I found. If we address it in this PR, I will close the Jira and #18390.

Member (Author): There's a bunch of other ZooKeeper-related logic in this class, so I was planning to address it all together in a follow-up issue. Deleting KafkaServer enables us to start removing other classes that depend on this one.

Member: > There's a bunch of other ZooKeeper-related logic in this class, so I was planning to address it all together in a follow-up issue. Deleting KafkaServer enables us to start removing other classes that depend on this one.

I'm OK with merging this and then addressing the comments in the follow-up, because many cleanups are blocked by KafkaServer.

Member: I'm ok with that, but let's file a JIRA so we don't lose track.

Member: I care particularly about the error messages and similar details, since they are not easy to find afterwards.

Member (Author): I opened https://issues.apache.org/jira/browse/KAFKA-18405 with details about this issue.

}
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -69,6 +69,8 @@ object KafkaBroker {
* you do change it, be sure to make it match that regex or the system tests will fail.
*/
val STARTED_MESSAGE = "Kafka Server started"

+  val MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS: Long = 120000

Member: It's a bit odd that we're not following the Scala convention here. Since we're rewriting this stuff in Java anyway, maybe it's ok.
}

trait KafkaBroker extends Logging {
91 changes: 0 additions & 91 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -66,18 +66,6 @@ object KafkaConfig {
Option(clientConfig.getProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName)))
}

-  private[kafka] def setZooKeeperClientProperty(clientConfig: ZKClientConfig, kafkaPropName: String, kafkaPropValue: Any): Unit = {
-    clientConfig.setProperty(ZkConfigs.ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP.get(kafkaPropName),
-      kafkaPropName match {
-        case ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG => (kafkaPropValue.toString.toUpperCase == "HTTPS").toString
-        case ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG | ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG => kafkaPropValue match {
-          case list: java.util.List[_] => list.asScala.mkString(",")
-          case _ => kafkaPropValue.toString
-        }
-        case _ => kafkaPropValue.toString
-      })
-  }

// For ZooKeeper TLS client authentication to be enabled the client must (at a minimum) configure itself as using TLS
// with both a client connection socket and a key store location explicitly set.
private[kafka] def zkTlsClientAuthEnabled(zkClientConfig: ZKClientConfig): Boolean = {
@@ -192,11 +180,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
this.currentConfig = newConfig
}

-  // The following captures any system properties impacting ZooKeeper TLS configuration
-  // and defines the default values this instance will use if no explicit config is given.
-  // We make it part of each instance rather than the object to facilitate testing.
-  private val zkClientConfigViaSystemProperties = new ZKClientConfig()

override def originals: util.Map[String, AnyRef] =
if (this eq currentConfig) super.originals else currentConfig.originals
override def values: util.Map[String, _] =
@@ -250,78 +233,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
private val _quotaConfig = new QuotaConfig(this)
def quotaConfig: QuotaConfig = _quotaConfig


-  private def zkBooleanConfigOrSystemPropertyWithDefaultValue(propKey: String): Boolean = {
-    // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
-    // Need to translate any system property value from true/false (String) to true/false (Boolean)
-    val actuallyProvided = originals.containsKey(propKey)
-    if (actuallyProvided) getBoolean(propKey) else {
-      val sysPropValue = KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
-      sysPropValue match {
-        case Some("true") => true
-        case Some(_) => false
-        case _ => getBoolean(propKey) // not specified so use the default value
-      }
-    }
-  }
-
-  private def zkStringConfigOrSystemPropertyWithDefaultValue(propKey: String): String = {
-    // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
-    val actuallyProvided = originals.containsKey(propKey)
-    if (actuallyProvided) getString(propKey) else {
-      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey) match {
-        case Some(v) => v
-        case _ => getString(propKey) // not specified so use the default value
-      }
-    }
-  }
-
-  private def zkOptionalStringConfigOrSystemProperty(propKey: String): Option[String] = {
-    Option(getString(propKey)).orElse {
-      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey)
-    }
-  }
-  private def zkPasswordConfigOrSystemProperty(propKey: String): Option[Password] = {
-    Option(getPassword(propKey)).orElse {
-      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map(new Password(_))
-    }
-  }
-  private def zkListConfigOrSystemProperty(propKey: String): Option[util.List[String]] = {
-    Option(getList(propKey)).orElse {
-      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, propKey).map { sysProp =>
-        sysProp.split("\\s*,\\s*").toBuffer.asJava
-      }
-    }
-  }
-
-  val zkSslClientEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG)
-  val zkClientCnxnSocketClassName = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG)
-  val zkSslKeyStoreLocation = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG)
-  val zkSslKeyStorePassword = zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_PASSWORD_CONFIG)
-  val zkSslKeyStoreType = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_KEY_STORE_TYPE_CONFIG)
-  val zkSslTrustStoreLocation = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_LOCATION_CONFIG)
-  val zkSslTrustStorePassword = zkPasswordConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_PASSWORD_CONFIG)
-  val zkSslTrustStoreType = zkOptionalStringConfigOrSystemProperty(ZkConfigs.ZK_SSL_TRUST_STORE_TYPE_CONFIG)
-  val ZkSslProtocol = zkStringConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_PROTOCOL_CONFIG)
-  val ZkSslEnabledProtocols = zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_ENABLED_PROTOCOLS_CONFIG)
-  val ZkSslCipherSuites = zkListConfigOrSystemProperty(ZkConfigs.ZK_SSL_CIPHER_SUITES_CONFIG)
-  val ZkSslEndpointIdentificationAlgorithm = {
-    // Use the system property if it exists and the Kafka config value was defaulted rather than actually provided
-    // Need to translate any system property value from true/false to HTTPS/<blank>
-    val kafkaProp = ZkConfigs.ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
-    val actuallyProvided = originals.containsKey(kafkaProp)
-    if (actuallyProvided)
-      getString(kafkaProp)
-    else {
-      KafkaConfig.zooKeeperClientProperty(zkClientConfigViaSystemProperties, kafkaProp) match {
-        case Some("true") => "HTTPS"
-        case Some(_) => ""
-        case None => getString(kafkaProp) // not specified so use the default value
-      }
-    }
-  }
-  val ZkSslCrlEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_CRL_ENABLE_CONFIG)
-  val ZkSslOcspEnable = zkBooleanConfigOrSystemPropertyWithDefaultValue(ZkConfigs.ZK_SSL_OCSP_ENABLE_CONFIG)
/** ********* General Configuration ***********/
val brokerIdGenerationEnable: Boolean = getBoolean(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG)
val maxReservedBrokerId: Int = getInt(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG)
@@ -526,8 +437,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}

/** ********* Controlled shutdown configuration ***********/
-  val controlledShutdownMaxRetries = getInt(ServerConfigs.CONTROLLED_SHUTDOWN_MAX_RETRIES_CONFIG)
-  val controlledShutdownRetryBackoffMs = getLong(ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG)

Member: Hmm, is it expected that this is not respected anymore? cc @mumrah @jsancio @cmccabe

Member: Or is the KRaft code using ServerConfigs.CONTROLLED_SHUTDOWN_MAX_RETRIES_CONFIG?

Member: It seems the KRaft broker relies on the event queue sending BrokerHeartbeatRequest with wantShutDown=true to complete controlled shutdown. The event queue already has a retry mechanism and backoff, so maybe we don't need extra retry and backoff configs.

I have opened https://issues.apache.org/jira/browse/KAFKA-18417 to remove both configs.
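
A rough, self-contained sketch of the heartbeat-driven shutdown flow described above (names and structure are illustrative stand-ins, not the actual BrokerLifecycleManager code):

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Illustrative only: KRaft controlled shutdown as a heartbeat loop. The
// broker keeps sending heartbeats with wantShutDown = true until the
// controller says it may shut down; "retry with backoff" is simply
// rescheduling the next heartbeat on the event queue, so no dedicated
// retry or backoff configs are required.
object ControlledShutdownSketch {
  final case class HeartbeatResponse(shouldShutDown: Boolean) // stand-in

  private val eventQueue = Executors.newSingleThreadScheduledExecutor()

  // Stand-in for sending a BrokerHeartbeatRequest with wantShutDown = true.
  private def sendHeartbeat(): HeartbeatResponse =
    HeartbeatResponse(shouldShutDown = scala.util.Random.nextInt(3) == 0)

  def beginControlledShutdown(backoffMs: Long): Unit = {
    if (sendHeartbeat().shouldShutDown) {
      println("Controller approved shutdown; proceeding.")
      eventQueue.shutdown()
    } else {
      // Not approved yet: just reschedule the heartbeat after a backoff.
      eventQueue.schedule(new Runnable {
        def run(): Unit = beginControlledShutdown(backoffMs)
      }, backoffMs, TimeUnit.MILLISECONDS)
    }
  }

  def main(args: Array[String]): Unit = beginControlledShutdown(backoffMs = 50)
}
```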

Member: We should also add a note to the zk to kraft migration guide regarding this. Said the same in the JIRA ticket.

Member: > We should also add a note to the zk to kraft migration guide regarding this.

Yes, I have added the note to KAFKA-18364.

val controlledShutdownEnable = getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)

/** ********* Feature configuration ***********/
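
The deleted KafkaConfig helpers above all followed one pattern: prefer an explicitly provided Kafka config value, fall back to the corresponding ZooKeeper system property, and finally fall back to the config default. A generic, self-contained sketch of that pattern (illustrative; the maps and keys are assumptions, not the removed code):

```scala
// Illustrative sketch of the removed "config or system property" fallback.
object ConfigFallbackSketch {
  // Stand-ins for the real config machinery.
  private val explicitlyProvided = Map("zookeeper.ssl.client.enable" -> "true")
  private val defaults = Map("zookeeper.ssl.client.enable" -> "false")

  def booleanConfigOrSystemProperty(key: String, sysProp: String): Boolean =
    explicitlyProvided.get(key)                    // 1. explicit config wins
      .orElse(Option(System.getProperty(sysProp))) // 2. then the system property
      .orElse(defaults.get(key))                   // 3. then the default value
      .exists(_.equalsIgnoreCase("true"))

  def main(args: Array[String]): Unit =
    // "zookeeper.client.secure" is ZooKeeper's client TLS system property.
    println(booleanConfigOrSystemProperty(
      "zookeeper.ssl.client.enable", "zookeeper.client.secure"))
}
```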