package streaming

import org.apache.spark.streaming.scheduler._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.language.postfixOps

// This more complex listener monitors the full range of batch and receiver events.
// For a simpler example, see Monitoring.scala.
private class FullListener extends StreamingListener {
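  // Print a one-line summary of the records in a batch.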
  private def showBatchInfo(action: String, info: BatchInfo): Unit = {
    println("=== " + action + " batch with " + info.numRecords + " records")
  }
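
  // Batch lifecycle callbacks, invoked by the streaming scheduler: each batch
  // is submitted, then started when processing begins, then completed.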
  override def onBatchCompleted(
      batchCompleted: StreamingListenerBatchCompleted): Unit = synchronized {
    showBatchInfo("completed", batchCompleted.batchInfo)
  }

  override def onBatchStarted(
      batchStarted: StreamingListenerBatchStarted): Unit = synchronized {
    showBatchInfo("started", batchStarted.batchInfo)
  }

  override def onBatchSubmitted(
      batchSubmitted: StreamingListenerBatchSubmitted): Unit = synchronized {
    showBatchInfo("submitted", batchSubmitted.batchInfo)
  }
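
  // Receiver lifecycle callbacks, invoked as each custom receiver starts and stops.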
  override def onReceiverStarted(
      receiverStarted: StreamingListenerReceiverStarted): Unit = synchronized {
    println("=== LISTENER: Started receiver '" +
      receiverStarted.receiverInfo.name + "' on stream '" +
      receiverStarted.receiverInfo.streamId + "'")
  }

  override def onReceiverStopped(
      receiverStopped: StreamingListenerReceiverStopped): Unit = synchronized {
    println("=== LISTENER: Stopped receiver '" +
      receiverStopped.receiverInfo.name + "' on stream '" +
      receiverStopped.receiverInfo.streamId + "'")
  }
}
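
// Sets up two custom receiver streams, attaches the listener above, lets the
// streams run for about ten seconds, and then shuts down.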
object FullMonitoring {
  def main(args: Array[String]) {
    val conf =
      new SparkConf().setAppName("MonitoringStreaming").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // batches will be produced every second
    val ssc = new StreamingContext(sc, Seconds(1))

    // create two streams, each backed by its own custom receiver
    val stream = ssc.receiverStream(new CustomReceiver)
    val stream2 = ssc.receiverStream(new CustomReceiver)

    // register a listener to monitor all the receivers
    val listener = new FullListener
    ssc.addStreamingListener(listener)

    // register output operations so the streams are actually computed:
    // print each batch's record count
    stream.foreachRDD(r => {
      println(r.count())
    })
    stream2.foreachRDD(r => {
      println(r.count())
    })
println("*** starting streaming")
ssc.start()
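
    // run awaitTermination() on a separate thread so the main thread stays free
    // to sleep for a while and then stop the context itself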
println("*** starting termination monitor")
new Thread("Streaming Termination Monitor") {
override def run() {
try {
ssc.awaitTermination()
} catch {
case e: Exception => {
println("*** streaming exception caught in monitor thread")
e.printStackTrace()
}
}
println("*** streaming terminated")
}
}.start()
println("*** started termination monitor")
    Thread.sleep(10000)

    println("*** stopping streaming")
    ssc.stop()

    // wait a bit longer for the call to awaitTermination() to return
    Thread.sleep(5000)
  }
}