Skip to content
This repository has been archived by the owner on Oct 6, 2018. It is now read-only.

use play 2.3.8 #63

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions app/kafka/consumer/async/ZookeeperConsumerConnector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ import kafka.message.MessageAndMetadata
*
* Directories:
* 1. Consumer id registry:
* /consumers/[group_id]/ids[consumer_id] -> topic1,...topicN
* /consumers/[group_id]/ids/[consumer_id] -> topic1,...topicN
* (consumer_id is of the form hostname:uuid.)
* A consumer has a unique consumer id within a consumer group. A consumer registers its id as an ephemeral znode
* and puts all topics that it subscribes to as the value of the znode. The znode is deleted when the client is gone.
* A consumer subscribes to event changes of the consumer id registry within its group.
Expand Down Expand Up @@ -432,8 +433,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
}

private def rebalance(cluster: Cluster): Boolean = {
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient, true).getConsumerThreadIdsPerTopic
val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, true)
val brokers = getAllBrokersInCluster(zkClient)
if (brokers.size == 0) {
// This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
Expand Down Expand Up @@ -489,9 +490,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
for (i <- startPart until startPart + nParts) {
val partition = curPartitions(i)
info(consumerThreadId + " attempting to claim partition " + partition)
addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic)
// record the partition ownership decision
partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
partitionOwnershipDecision += ((topic, partition) -> consumerThreadId.toString())
}
}
}
Expand Down Expand Up @@ -581,7 +582,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,

private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo[_,_]]],
topicDirs: ZKGroupTopicDirs, partition: Int,
topic: String, consumerThreadId: String) {
topic: String) {
val partTopicInfoMap = currentTopicRegistry.get(topic)

val znode = topicDirs.consumerOffsetDir + "/" + partition
Expand Down Expand Up @@ -629,7 +630,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
topicPartitionChangeListenner = new ZKTopicPartitionChangeListener(loadBalancerListener, keyDecoder, valueDecoder)

// map of {topic -> Set(thread-1, thread-2, ...)}
val consumerThreadIdsPerTopic: Map[String, Set[String]] = topicCount.getConsumerThreadIdsPerTopic
val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] = topicCount.getConsumerThreadIdsPerTopic

val topicThreadIds = consumerThreadIdsPerTopic.map {
case(topic, threadIds) => threadIds.map((topic, _))
Expand Down Expand Up @@ -660,9 +661,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
throw new RuntimeException("Each consumer connector can create " + "message streams by filter at most once.")

// bootstrap with existing topics
private var wildcardTopics = getChildrenParentMayNotExist(zkClient, BrokerTopicsPath).filter(topicFilter.isTopicAllowed)
private var wildcardTopics = getChildrenParentMayNotExist(zkClient, BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_, true))

private val wildcardTopicCount = TopicCount.constructTopicCount(consumerIdString, topicFilter, numStreams, zkClient)
private val wildcardTopicCount = TopicCount.constructTopicCount(consumerIdString, topicFilter, numStreams, zkClient, true)

val dirs = new ZKGroupDirs(config.groupId)
registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
Expand All @@ -678,7 +679,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
def handleTopicEvent(allTopics: Seq[String]) {
debug("Handling topic event")

val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed)
val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed(_, true))

val addedTopics = updatedTopics filterNot (wildcardTopics contains)
if (addedTopics.nonEmpty)
Expand Down
22 changes: 17 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,29 @@ name := "kafka-web-console"

version := "2.1.0-SNAPSHOT"

resolvers ++= {
Seq(
"Local Maven Repository" at "file:///Users/cfchou/.m2/repository"
)
}

libraryDependencies ++= Seq(
jdbc,
cache,
"org.squeryl" % "squeryl_2.10" % "0.9.5-6",
"com.twitter" % "util-zk_2.10" % "6.11.0",
"com.twitter" % "finagle-core_2.10" % "6.15.0",
"com.twitter" % "util-zk_2.10" % "6.23.0",
"com.101tec" % "zkclient" % "0.3",
// "com.twitter" % "finagle-core_2.10" % "6.15.0",
"org.quartz-scheduler" % "quartz" % "2.2.1",
"org.apache.kafka" % "kafka_2.10" % "0.8.1.1"
"org.apache.kafka" % "kafka_2.10" % "0.8.2.1"
exclude("javax.jms", "jms")
exclude("com.sun.jdmk", "jmxtools")
exclude("com.sun.jmx", "jmxri")
exclude("com.sun.jmx", "jmxri"),
"com.github.okapies" % "finagle-kafka_2.10" % "0.1.5"
)

play.Project.playScalaSettings
lazy val root = (project in file(".")).enablePlugins(PlayScala)

includeFilter in (Assets, LessKeys.less) := "*.less"

excludeFilter in (Assets, LessKeys.less) := "_*.less"
Binary file removed lib/finagle-kafka_2.10-0.1.2-SNAPSHOT.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.0
sbt.version=0.13.5
4 changes: 3 additions & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ logLevel := Level.Warn
resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"

// Use the Play sbt plugin for Play projects
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.2.1")
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.3.8")

addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.0.0")