Skip to content

Commit

Permalink
Merge pull request #296 from InterestingLab/kid-xiong.docs.structured…
Browse files Browse the repository at this point in the history
…_streaming_use_case

Kid xiong.docs.structured streaming use case
  • Loading branch information
RickyHuo authored Apr 15, 2019
2 parents bca2de9 + 9420a07 commit f4b2cae
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 1 deletion.
Binary file added docs/images/wechat-qrcode/garyelephant.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/wechat-qrcode/kid-xiong.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/images/wechat-qrcode/rickyhuo.jpeg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion docs/zh-cn/case_study/4.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,5 @@ RickyHuo 微信: chodomatte1994



-- Power by [InterestingLab](https://github.com/InterestingLab)
-- Power by [InterestingLab](https://github.com/InterestingLab)

286 changes: 286 additions & 0 deletions docs/zh-cn/case_study/5.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
## Waterdrop最近支持的StructuredStreaming怎么用?

### 前言

StructuredStreaming是Spark 2.0以后新开放的一个模块,相比SparkStreaming,它有一些比较突出的优点:<br/> &emsp;&emsp;一、它能做到更低的延迟;<br/>
&emsp;&emsp;二、可以做实时的聚合,例如实时计算每天每个商品的销售总额;<br/>
&emsp;&emsp;三、可以做流与流之间的关联,例如计算广告的点击率,需要将广告的曝光记录和点击记录关联。<br/>
以上几点如果使用SparkStreaming来实现可能会比较麻烦或者说是很难实现,但是使用StructuredStreaming实现起来会比较轻松。
### 如何使用StructuredStreaming
可能你没有详细研究过StructuredStreaming,但是发现StructuredStreaming能很好的解决你的需求,如何快速利用StructuredStreaming来解决你的需求?目前社区有一款工具**Waterdrop**,项目地址:https://github.com/InterestingLab/waterdrop ,
可以高效低成本的帮助你利用StructuredStreaming来完成你的需求。

### waterdrop

Waterdrop是一个非常易用,高性能,能够应对海量数据的实时数据处理产品,它构建在Spark之上。Waterdrop拥有着非常丰富的插件,支持从Kafka、HDFS、Kudu中读取数据,进行各种各样的数据处理,并将结果写入ClickHouse、Elasticsearch或者Kafka中

### 准备工作

首先我们需要安装Waterdrop,安装十分简单,无需配置系统环境变量

1. 准备Spark环境
2. 安装Waterdrop
3. 配置Waterdrop
以下是简易步骤,具体安装可以参照[Quick Start](https://interestinglab.github.io/waterdrop/#/zh-cn/quick-start)

```
cd /usr/local
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar -xvf https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
wget https://github.com/InterestingLab/waterdrop/releases/download/v1.3.0/waterdrop-1.3.0.zip
unzip waterdrop-1.3.0.zip
cd waterdrop-1.3.0
vim config/waterdrop-env.sh
# 指定Spark安装路径
SPARK_HOME=${SPARK_HOME:-/usr/local/spark-2.2.0-bin-hadoop2.7}
```

### Waterdrop Pipeline

我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入。

配置文件包括四个部分,分别是Spark、Input、filter和Output。

#### Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

```
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
```

#### Input

下面是一个从kafka读取数据的例子

```
kafkaStream {
topics = "waterdrop"
consumer.bootstrap.servers = "localhost:9092"
schema = "{\"name\":\"string\",\"age\":\"integer\",\"addrs\":{\"country\":\"string\",\"city\":\"string\"}}"
}
```

通过上面的配置就可以读取kafka里的数据了 ,topics是要订阅的kafka的topic,同时订阅多个topic可以以逗号隔开,consumer.bootstrap.servers就是Kafka的服务器列表,schema是可选项,因为StructuredStreaming从kafka读取到的值(官方固定字段value)是binary类型的,详见http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
但是如果你确定你kafka里的数据是json字符串的话,你可以指定schema,input插件将按照你指定的schema解析

#### Filter

下面是一个简单的filter例子

```
filter{
sql{
table_name = "student"
sql = "select name,age from student"
}
}
```
`table_name`是注册成的临时表名,以便于在下面的sql使用

#### Output

处理好的数据往外输出,假设我们的输出也是kafka

```
output{
kafka {
topic = "waterdrop"
producer.bootstrap.servers = "localhost:9092"
streaming_output_mode = "update"
checkpointLocation = "/your/path"
}
}
```

`topic` 是你要输出的topic,` producer.bootstrap.servers`是kafka集群列表,`streaming_output_mode`是StructuredStreaming的一个输出模式参数,有三种类型`append|update|complete`,具体使用参见文档http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes

`checkpointLocation`是StructuredStreaming的checkpoint路径,如果配置了的话,这个目录会存储程序的运行信息,比如程序退出再启动的话会接着上次的offset进行消费。

### 场景分析

以上就是一个简单的例子,接下来我们就来介绍的稍微复杂一些的业务场景

#### 场景一:实时聚合场景

假设现在有一个商城,上面有10种商品,现在需要实时求每天每种商品的销售额,甚至是求每种商品的购买人数(不要求十分精确)。
这么做的巨大的优势就是海量数据可以在实时处理的时候,完成聚合,再也不需要先将数据写入数据仓库,再跑离线的定时任务进行聚合,
操作起来还是很方便的。

kafka的数据如下

```
{"good_id":"abc","price":300,"user_id":123456,"time":1553216320}
```

那我们该怎么利用waterdrop来完成这个需求呢,当然还是只需要配置就好了。

```
#spark里的配置根据业务需求配置
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
#配置input
input {
kafkaStream {
topics = "good_topic"
consumer.bootstrap.servers = "localhost:9092"
schema = "{\"good_id\":\"string\",\"price\":\"integer\",\"user_id\":\"Long\",\"time\":\"Long\"}"
}
}
#配置filter
filter {
#在程序做聚合的时候,内部会去存储程序从启动开始的聚合状态,久而久之会导致OOM,如果设置了watermark,程序自动的会去清理watermark之外的状态
#这里表示使用ts字段设置watermark,界限为1天
Watermark {
time_field = "time"
time_type = "UNIX" #UNIX表示时间字段为10为的时间戳,还有其他的类型详细可以查看插件文档
time_pattern = "yyyy-MM-dd" #这里之所以要把ts对其到天是因为求每天的销售额,如果是求每小时的销售额可以对其到小时`yyyy-MM-dd HH`
delay_threshold = "1 day"
watermark_field = "ts" #设置watermark之后会新增一个字段,`ts`就是这个字段的名字
}
#之所以要group by ts是要让watermark生效,approx_count_distinct是一个估值,并不是精确的count_distinct
sql {
table_name = "good_table_2"
sql = "select good_id,sum(price) total, approx_count_distinct(user_id) person from good_table_2 group by ts,good_id"
}
}
#接下来我们选择将结果实时输出到Kafka
output{
kafka {
topic = "waterdrop"
producer.bootstrap.servers = "localhost:9092"
streaming_output_mode = "update"
checkpointLocation = "/your/path"
}
}
```
如上配置完成,启动waterdrop,就可以获取你想要的结果了。

#### 场景二:多个流关联场景

假设你在某个平台投放了广告,现在要实时计算出每个广告的CTR(点击率),数据分别来自两个topic,一个是广告曝光日志,一个是广告点击日志,
此时我们就需要把两个流数据关联到一起做计算,而Waterdrop 最近也支持了此功能,让我们一起看一下该怎么做:


点击topic数据格式

```
{"ad_id":"abc","click_time":1553216320,"user_id":12345}
```

曝光topic数据格式

```
{"ad_id":"abc","show_time":1553216220,"user_id":12345}
```

```
#spark里的配置根据业务需求配置
spark {
spark.app.name = "Waterdrop"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
}
#配置input
input {
kafkaStream {
topics = "click_topic"
consumer.bootstrap.servers = "localhost:9092"
schema = "{\"ad_id\":\"string\",\"user_id\":\"Long\",\"click_time\":\"Long\"}"
table_name = "click_table"
}
kafkaStream {
topics = "show_topic"
consumer.bootstrap.servers = "localhost:9092"
schema = "{\"ad_id\":\"string\",\"user_id\":\"Long\",\"show_time\":\"Long\"}"
table_name = "show_table"
}
}
filter {
#左关联右表必须设置watermark
#右关左右表必须设置watermark
#http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#inner-joins-with-optional-watermarking
Watermark {
source_table_name = "click_table" #这里可以指定为某个临时表添加watermark,不指定的话就是为input中的第一个
time_field = "time"
time_type = "UNIX"
delay_threshold = "3 hours"
watermark_field = "ts"
result_table_name = "click_table_watermark" #添加完watermark之后可以注册成临时表,方便后续在sql中使用
}
Watermark {
source_table_name = "show_table"
time_field = "time"
time_type = "UNIX"
delay_threshold = "2 hours"
watermark_field = "ts"
result_table_name = "show_table_watermark"
}
sql {
table_name = "show_table_watermark"
sql = "select a.ad_id,count(b.user_id)/count(a.user_id) ctr from show_table_watermark as a left join click_table_watermark as b on a.ad_id = b.ad_id and a.user_id = b.user_id "
}
}
#接下来我们选择将结果实时输出到Kafka
output {
kafka {
topic = "waterdrop"
producer.bootstrap.servers = "localhost:9092"
streaming_output_mode = "append" #流关联只支持append模式
checkpointLocation = "/your/path"
}
}
```

通过配置,到这里流关联的案例也完成了。

### 结语
通过配置能很快的利用StructuredStreaming做实时数据处理,但是还是需要对StructuredStreaming的一些概念了解,比如其中的watermark机制,还有程序的输出模式。

最后,waterdrop当然还支持spark streaming和spark 批处理。
如果你对这两个也感兴趣的话,可以阅读我们以前发布的文章《[如何快速地将Hive中的数据导入ClickHouse](https://interestinglab.github.io/waterdrop/#/zh-cn/case_study/2)》、
[优秀的数据工程师,怎么用Spark在TiDB上做OLAP分析](https://interestinglab.github.io/waterdrop/#/zh-cn/case_study/4)》、
[如何使用Spark快速将数据写入Elasticsearch](https://interestinglab.github.io/waterdrop/#/zh-cn/case_study/3)

希望了解 Waterdrop 和 HBase, ClickHouse、Elasticsearch、Kafka、MySQL 等数据源结合使用的更多功能和案例,可以直接进入项目主页 [https://github.com/InterestingLab/waterdrop](https://github.com/InterestingLab/waterdrop)或者联系项目负责人:

> Garyelephan 微信: garyelephant
> kid-xiong 微信: why20131019
> 扫微信二维码添加项目负责人为好友,拉你入Waterdrop 开发者/用户交流群:
<table><tr>
<td><img src="../../images/wechat-qrcode/garyelephant.jpeg" border=0></td>
<td><img src="../../images/wechat-qrcode/kid-xiong.jpeg" border=0></td>
</tr></table>
1 change: 1 addition & 0 deletions docs/zh-cn/case_study/_sidebar.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [如何快速地将Hive中的数据导入ClickHouse](/zh-cn/case_study/2.md)
- [如何使用Spark快速将数据写入Elasticsearch](/zh-cn/case_study/3.md)
- [优秀的数据工程师,怎么用Spark在TiDB上做OLAP分析](/zh-cn/case_study/4.md)
- [新功能StructuredStreaming怎么用?](/zh-cn/case_study/5.md)

- [配置](/zh-cn/configuration/base)

Expand Down

0 comments on commit f4b2cae

Please sign in to comment.