Merge pull request #225 from InterestingLab/garyelephant.api.input
reimplemented BaseStreamingInput API
RickyHuo authored Jan 26, 2019
2 parents fc2cb89 + e56c2fb commit 779161f
Showing 22 changed files with 296 additions and 150 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,5 +1,5 @@
name := "Waterdrop"
version := "1.1.5"
version := "1.2.0"
organization := "io.github.interestinglab.waterdrop"

scalaVersion := "2.11.8"
33 changes: 30 additions & 3 deletions docs/zh-cn/configuration/base.md
@@ -59,27 +59,54 @@ The multiple plugins in filter form a data-processing pipeline in the configured order; the previous

An example is shown below:

> In the configuration, lines beginning with `#` are comments.
```
spark {
  # You can set spark configuration here
  # Waterdrop-defined streaming batch duration in seconds
  spark.streaming.batchDuration = 5

  # see available properties defined by spark: https://spark.apache.org/docs/latest/configuration.html#available-properties
  spark.app.name = "Waterdrop"
  spark.ui.port = 13000
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  socket {}

  # This is an example input plugin, **used only to test and demonstrate the input plugin feature**
  fakestream {
    content = ["Hello World, InterestingLab"]
    rate = 1
  }

  # If you would like to get more information about how to configure waterdrop and see the full list of input plugins,
  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base
}

filter {
  split {
    fields = ["msg", "name"]
    delimiter = ","
  }

  # If you would like to get more information about how to configure waterdrop and see the full list of filter plugins,
  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base
}

output {
  stdout {}

  # If you would like to get more information about how to configure waterdrop and see the full list of output plugins,
  # please go to https://interestinglab.github.io/waterdrop/#/zh-cn/configuration/base
}
```

Other configuration examples can be found at:

[Configuration example 1: Streaming computation](https://github.com/InterestingLab/waterdrop/blob/master/config/streaming.conf.template)

[Configuration example 2: Batch (offline) processing](https://github.com/InterestingLab/waterdrop/blob/master/config/batch.conf.template)
2 changes: 1 addition & 1 deletion docs/zh-cn/configuration/input-plugins/File.md
@@ -20,7 +20,7 @@

##### format [string]

The file format; currently `csv`, `json`, `parquet`, `xml`, and `text` are supported.
The file format; currently `csv`, `json`, `parquet`, `xml`, `orc`, and `text` are supported.


##### options.* [object]
23 changes: 23 additions & 0 deletions docs/zh-cn/configuration/input-plugins/FileStream.md
@@ -12,16 +12,39 @@

| name | type | required | default value |
| --- | --- | --- | --- |
| [format](#format-string) | string | no | text |
| [path](#path-string) | string | yes | - |
| [rowTag](#rowtag-string) | string | no | - |


##### format [string]

The file format.


##### path [string]

The file directory path.


##### rowTag [string]

Used only when `format` is `xml`; specifies the row tag of the XML-formatted data.

### Example

```
fileStream {
  path = "file:///var/log/"
}
```

Or specify the `format`:

```
fileStream {
  path = "file:///var/log/"
  format = "xml"
  rowTag = "book"
}
```
2 changes: 1 addition & 1 deletion docs/zh-cn/configuration/input-plugins/Hdfs.md
@@ -21,7 +21,7 @@

##### format [string]

The format of files read from HDFS; currently `csv`, `json`, `parquet`, `xml`, and `text` are supported.
The format of files read from HDFS; currently `csv`, `json`, `parquet`, `xml`, `orc`, and `text` are supported.


##### options [object]
29 changes: 26 additions & 3 deletions docs/zh-cn/configuration/input-plugins/HdfsStream.md
@@ -12,24 +12,47 @@

| name | type | required | default value |
| --- | --- | --- | --- |
| [format](#format-string) | string | no | text |
| [path](#path-string) | string | yes | - |
| [rowTag](#rowtag-string) | string | no | - |


##### format [string]

The file format.


##### path [string]

The file path on the Hadoop cluster.
The file directory path.


##### rowTag [string]

Used only when `format` is `xml`; specifies the row tag of the XML-formatted data.

### Example

```
hdfsStream {
  path = "hdfs:///access.log"
  path = "hdfs:///access/log/"
}
```

Or you can specify the HDFS name service:

```
hdfsStream {
  path = "hdfs://m2:8022/access.log"
  path = "hdfs://m2:8022/access/log/"
}
```

Or specify the `format`:

```
hdfsStream {
  path = "hdfs://m2:8022/access/log/"
  format = "xml"
  rowTag = "book"
}
```
2 changes: 1 addition & 1 deletion docs/zh-cn/configuration/output-plugins/File.md
@@ -52,7 +52,7 @@

##### serializer [string]

The serialization method; currently csv, json, parquet, and text are supported.
The serialization method; currently csv, json, parquet, orc, and text are supported.


### Example
2 changes: 1 addition & 1 deletion docs/zh-cn/configuration/output-plugins/Hdfs.md
@@ -52,7 +52,7 @@ The file path on the Hadoop cluster, beginning with hdfs://

##### serializer [string]

The serialization method; currently csv, json, parquet, and text are supported.
The serialization method; currently csv, json, parquet, orc, and text are supported.


### Example
9 changes: 9 additions & 0 deletions docs/zh-cn/quick-start.md
@@ -93,3 +93,12 @@ The Waterdrop log prints:

Waterdrop is simple and easy to use, and richer data-processing features are still waiting to be discovered. The data-processing case shown in this article requires no code, compilation, or packaging,
and is simpler than the official [Quick Example](https://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example).


---

For more Waterdrop configuration examples, see:

[Configuration example 1: Streaming computation](https://github.com/InterestingLab/waterdrop/blob/master/config/streaming.conf.template)

[Configuration example 2: Batch (offline) processing](https://github.com/InterestingLab/waterdrop/blob/master/config/batch.conf.template)
2 changes: 1 addition & 1 deletion waterdrop-apis/build.sbt
@@ -1,5 +1,5 @@
name := "Waterdrop-apis"
version := "1.1.0"
version := "1.2.0"
organization := "io.github.interestinglab.waterdrop"

scalaVersion := "2.11.8"
@@ -1,17 +1,14 @@
package io.github.interestinglab.waterdrop.apis

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

/**
* Superclass of all streaming inputs; extend this abstract class to implement a streaming input.
* */
abstract class BaseStreamingInput extends Plugin {

/**
* No matter what kind of Input it is, all you have to do is create a DStream to be used later
* */
def getDStream(ssc: StreamingContext): DStream[(String, String)]
abstract class BaseStreamingInput[T] extends Plugin {

/**
* Things to do after filter and before output
@@ -23,4 +20,24 @@ abstract class BaseStreamingInput extends Plugin {
* */
def afterOutput: Unit = {}

/**
* This must be implemented to convert RDD[T] to Dataset[Row] for later processing
* */
def rdd2dataset(spark: SparkSession, rdd: RDD[T]): Dataset[Row]

/**
* start should be invoked when data is ready.
* */
def start(spark: SparkSession, ssc: StreamingContext, handler: Dataset[Row] => Unit): Unit = {

getDStream(ssc).foreachRDD(rdd => {
val dataset = rdd2dataset(spark, rdd)
handler(dataset)
})
}

/**
* Create a Spark DStream from the data source; you can specify the type parameter.
* */
def getDStream(ssc: StreamingContext): DStream[T]
}
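
To make the new contract concrete, here is a minimal sketch of a streaming input written against this shape of API. The abstract class is a simplified stand-in (the real `BaseStreamingInput[T]` also extends `Plugin`, whose configuration members are omitted here), and `SocketSketchInput`, its host, and its port are purely illustrative.

```
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Simplified stand-in mirroring the new BaseStreamingInput[T] contract shown above
// (getDStream, rdd2dataset, beforeOutput/afterOutput, and the default start()).
abstract class SketchStreamingInput[T] {

  def getDStream(ssc: StreamingContext): DStream[T]

  def rdd2dataset(spark: SparkSession, rdd: RDD[T]): Dataset[Row]

  def beforeOutput: Unit = {}

  def afterOutput: Unit = {}

  // Turn every micro-batch into a Dataset[Row] and hand it to the caller-supplied handler.
  def start(spark: SparkSession, ssc: StreamingContext, handler: Dataset[Row] => Unit): Unit = {
    getDStream(ssc).foreachRDD { rdd =>
      handler(rdd2dataset(spark, rdd))
    }
  }
}

// Hypothetical concrete input: reads text lines from a socket and exposes each line
// as a single-column row, the same shape the old driver code built by hand.
class SocketSketchInput(host: String, port: Int) extends SketchStreamingInput[String] {

  override def getDStream(ssc: StreamingContext): DStream[String] =
    ssc.socketTextStream(host, port)

  override def rdd2dataset(spark: SparkSession, rdd: RDD[String]): Dataset[Row] = {
    val schema = StructType(Array(StructField("raw_message", StringType)))
    spark.createDataset(rdd.map(Row(_)))(RowEncoder(schema))
  }
}
```

With this shape, the driver only has to call `start` with a handler that runs filters and outputs on each batch, which is what the reworked streaming branch of the `Waterdrop` driver object below does.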
2 changes: 1 addition & 1 deletion waterdrop-core/build.sbt
@@ -1,5 +1,5 @@
name := "Waterdrop-core"
version := "1.1.3"
version := "1.2.0"
organization := "io.github.interestinglab.waterdrop"

scalaVersion := "2.11.8"
@@ -161,7 +161,7 @@ object Waterdrop extends Logging {
private def process(
configBuilder: ConfigBuilder,
staticInputs: List[BaseStaticInput],
streamingInputs: List[BaseStreamingInput],
streamingInputs: List[BaseStreamingInput[Any]],
filters: List[BaseFilter],
outputs: List[BaseOutput]): Unit = {

@@ -194,7 +194,7 @@
sparkSession: SparkSession,
configBuilder: ConfigBuilder,
staticInputs: List[BaseStaticInput],
streamingInputs: List[BaseStreamingInput],
streamingInputs: List[BaseStreamingInput[Any]],
filters: List[BaseFilter],
outputs: List[BaseOutput]): Unit = {

@@ -236,54 +236,30 @@
// when you see this ASCII logo, waterdrop is really started.
showWaterdropAsciiLogo()

val dstreamList = streamingInputs.map(p => {
p.getDStream(ssc)
})

val unionedDStream = dstreamList.reduce((d1, d2) => {
d1.union(d2)
})
streamingInputs(0).start(
sparkSession,
ssc,
dataset => {

val dStream = unionedDStream.mapPartitions { partitions =>
val strIterator = partitions.map(r => r._2)
val strList = strIterator.toList
strList.iterator
}
var ds = dataset

dStream.foreachRDD { strRDD =>
val rowsRDD = strRDD.mapPartitions { partitions =>
val row = partitions.map(Row(_))
val rows = row.toList
rows.iterator
}
// Ignore empty schema dataset
for (f <- filters) {
if (ds.take(1).length > 0) {
ds = f.process(sparkSession, ds)
}
}

// For implicit conversions like converting RDDs to DataFrames
import sparkSession.implicits._
streamingInputs(0).beforeOutput

val schema = StructType(Array(StructField("raw_message", StringType)))
val encoder = RowEncoder(schema)
var ds = sparkSession.createDataset(rowsRDD)(encoder)
outputs.foreach(p => {
p.process(ds)
})

// Ignore empty schema dataset
streamingInputs(0).afterOutput

for (f <- filters) {
if (ds.take(1).length > 0) {
ds = f.process(sparkSession, ds)
}
}

streamingInputs.foreach(p => {
p.beforeOutput
})

outputs.foreach(p => {
p.process(ds)
})

streamingInputs.foreach(p => {
p.afterOutput
})
}
)

ssc.start()
ssc.awaitTermination()
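
Because removed and added lines are interleaved in the hunk above without markers, here is one reading of how the new streaming branch fits together, written as a self-contained helper over the names visible in this diff (`streamingInputs`, `filters`, `outputs`). It is a sketch, not the verbatim committed method, and it assumes `BaseFilter` and `BaseOutput` live in the same `apis` package as `BaseStreamingInput`.

```
import io.github.interestinglab.waterdrop.apis.{BaseFilter, BaseOutput, BaseStreamingInput}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext

object StreamingFlowSketch {

  def run(sparkSession: SparkSession,
          ssc: StreamingContext,
          streamingInputs: List[BaseStreamingInput[Any]],
          filters: List[BaseFilter],
          outputs: List[BaseOutput]): Unit = {

    // The first streaming input drives the whole pipeline: for every micro-batch it
    // converts RDD[T] to Dataset[Row] via rdd2dataset and invokes this handler.
    streamingInputs(0).start(
      sparkSession,
      ssc,
      dataset => {
        var ds = dataset

        // Skip filters when the batch is empty.
        for (f <- filters) {
          if (ds.take(1).length > 0) {
            ds = f.process(sparkSession, ds)
          }
        }

        streamingInputs(0).beforeOutput
        outputs.foreach(p => p.process(ds))
        streamingInputs(0).afterOutput
      }
    )

    ssc.start()
    ssc.awaitTermination()
  }
}
```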
@@ -301,7 +277,6 @@

basePrepare(sparkSession, staticInputs, filters, outputs)


// let static input register as table for later use if needed
var datasetMap = Map[String, Dataset[Row]]()
for (input <- staticInputs) {
@@ -353,7 +328,7 @@
private def basePrepare(
sparkSession: SparkSession,
staticInputs: List[BaseStaticInput],
streamingInputs: List[BaseStreamingInput],
streamingInputs: List[BaseStreamingInput[Any]],
filters: List[BaseFilter],
outputs: List[BaseOutput]): Unit = {
for (i <- streamingInputs) {

