Merge pull request #299 from InterestingLab/rickyhuo.fea.es.input
Add static input of Elasticsearch
garyelephant authored Apr 24, 2019
2 parents fdbe313 + 629489f commit 1d916df
Showing 6 changed files with 183 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/zh-cn/configuration/_sidebar.md
@@ -10,6 +10,7 @@
- [General configuration](/zh-cn/configuration/base)
- [Input plugins](/zh-cn/configuration/input-plugin)
- [AmazonKinesis [Commercial]](/zh-cn/configuration/input-plugins/AmazonKinesisStream)
- [Elasticsearch](/zh-cn/configuration/input-plugins/Elasticsearch)
- [FakeStream](/zh-cn/configuration/input-plugins/FakeStream)
- [File](/zh-cn/configuration/input-plugins/File)
- [FileStream](/zh-cn/configuration/input-plugins/FileStream)
70 changes: 70 additions & 0 deletions docs/zh-cn/configuration/input-plugins/Elasticsearch.md
@@ -0,0 +1,70 @@
## Input plugin: Elasticsearch [Static]

* Author: InterestingLab
* Homepage: https://interestinglab.github.io/waterdrop
* Version: 1.3.2

### Description

Reads data from Elasticsearch.

### Options

| name | type | required | default value |
| --- | --- | --- | --- |
| [hosts](#hosts-array) | array | yes | - |
| [index](#index-string) | string | yes | - |
| [query_string](#query_string-string) | string | no | * |
| [source_type](#source_type-string) | string | no | nested |
| [es.*](#es-string) | string | no | - |
| [table_name](#table_name-string) | string | yes | - |

##### hosts [array]

Elasticsearch cluster addresses, in `host:port` format; multiple hosts may be specified, e.g. `["host1:9200", "host2:9200"]`.


##### index [string]

Elasticsearch index name; `*` wildcard matching is supported.


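##### query_string [string]

The query string applied when reading from Elasticsearch; the plugin passes it to the connector as `?q=<query_string>`. Defaults to `*`, which matches all documents.
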
##### source_type [string]

How the `_source` field of each Elasticsearch document is handled. Currently **string**, **nested** and **flatten** are supported:

* string: keep `_source` as a single JSON string
* nested: parse `_source` into a nested structure; values can be accessed as `_source.field`
* flatten: flatten `_source`, promoting every top-level field to a separate column (see the sketch below)

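As an illustration (hypothetical document, not taken from this commit), suppose the index holds `{"name": "ricky", "age": 24}`. The three modes then register roughly the following table shapes; note that this implementation keeps the raw `_source` string column in flatten mode as well:

```
# source_type = "string"   ->  columns: _id, _source        (_source is the raw JSON string)
# source_type = "nested"   ->  columns: _id, _source        (_source.name, _source.age addressable)
# source_type = "flatten"  ->  columns: _id, _source, name, age
```
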
##### es.* [string]

Any number of additional optional parameters can also be supplied; for the full list see [the configuration parameters supported by Elasticsearch](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html#cfg-mapping).

For example, `es.read.metadata` is specified as `es.read.metadata = true`. Optional parameters that are not specified take the default values given in the official documentation.

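A sketch of passing several connector options at once; `es.scroll.size` (the scroll batch size) is a documented elasticsearch-hadoop setting, used here purely for illustration:

```
elasticsearch {
    hosts = ["localhost:9200"]
    index = "waterdrop-*"
    es.read.metadata = true
    es.scroll.size = 1000
    table_name = "my_dataset"
}
```
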
##### table_name [string]

The table name under which this dataset is registered in Spark.

### Examples

```
elasticsearch {
    hosts = ["localhost:9200"]
    index = "waterdrop-20190424"
    table_name = "my_dataset"
}
```
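
> Reads every document in the `waterdrop-20190424` index and registers it as the table `my_dataset`, with `_source` parsed as nested fields (the default `source_type`).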


```
elasticsearch {
    hosts = ["localhost:9200"]
    index = "waterdrop-*"
    es.read.metadata = true
    source_type = "flatten"
    table_name = "my_dataset"
}
```

> Matches every index whose name starts with `waterdrop-`, parses the JSON content of `_source`, and flattens its top-level fields into columns.
2 changes: 1 addition & 1 deletion waterdrop-core/build.sbt
@@ -46,7 +46,7 @@ libraryDependencies ++= Seq(
"commons-lang" % "commons-lang" % "2.6",
"io.thekraken" % "grok" % "0.1.5",
"mysql" % "mysql-connector-java" % "5.1.6",
"org.elasticsearch" % "elasticsearch-spark-20_2.11" % "6.6.1",
"org.elasticsearch" % "elasticsearch-spark-20_2.11" % "7.0.0",
"com.github.scopt" %% "scopt" % "3.7.0",
"org.apache.commons" % "commons-compress" % "1.15",
"com.pingcap.tispark" % "tispark-core" % "1.1"
@@ -1,3 +1,4 @@
io.github.interestinglab.waterdrop.input.batch.Elasticsearch
io.github.interestinglab.waterdrop.input.batch.Fake
io.github.interestinglab.waterdrop.input.batch.File
io.github.interestinglab.waterdrop.input.batch.Hdfs
@@ -0,0 +1,104 @@
package io.github.interestinglab.waterdrop.input.batch

import io.github.interestinglab.waterdrop.apis.BaseStaticInput
import org.apache.spark.sql.functions._
import org.elasticsearch.spark._
import com.typesafe.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils
import io.github.interestinglab.waterdrop.core.RowConstant
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.collection.JavaConversions._

class Elasticsearch extends BaseStaticInput {

  var esCfg: Map[String, String] = Map()
  val esPrefix = "es."
  var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  override def getConfig(): Config = {
    this.config
  }

  override def prepare(spark: SparkSession): Unit = {

    // Fill in defaults for the optional options.
    val defaultConfig = ConfigFactory.parseMap(
      Map(
        "query_string" -> "*",
        "source_type" -> "nested"
      )
    )
    config = config.withFallback(defaultConfig)

    // Copy any user-supplied "es.*" options into the connector configuration.
    if (TypesafeConfigUtils.hasSubConfig(config, esPrefix)) {
      val esConfig = TypesafeConfigUtils.extractSubConfig(config, esPrefix, false)
      esConfig
        .entrySet()
        .foreach(entry => {
          val key = entry.getKey
          val value = String.valueOf(entry.getValue.unwrapped())
          esCfg += (esPrefix + key -> value)
        })
    }

    // The connector expects a comma-separated node list under "es.nodes".
    esCfg += ("es.nodes" -> config.getStringList("hosts").mkString(","))

    println("[INFO] Input ElasticSearch Params:")
    for (entry <- esCfg) {
      val (key, value) = entry
      println("[INFO] \t" + key + " = " + value)
    }
  }

  override def checkConfig(): (Boolean, String) = {
    if (config.hasPath("hosts") && config.hasPath("index") && config.getStringList("hosts").size() > 0) {
      // TODO: validate the host:port format of each [hosts] entry
      (true, "")
    } else {
      (false, "please specify [hosts] as a non-empty string list and [index] as a string")
    }
  }

  override def getDataset(spark: SparkSession): Dataset[Row] = {

    import spark.implicits._

    val index = config.getString("index")
    val queryString = config.getString("query_string")
    val sourceType = config.getString("source_type")

    // esJsonRDD yields (document id, _source JSON string) pairs.
    val rdd = spark.sparkContext.esJsonRDD(index, "?q=" + queryString, esCfg)

    val df = rdd
      .toDF()
      .withColumnRenamed("_1", "_id")
      .withColumnRenamed("_2", "_source")

    if (sourceType == "string") {
      df
    } else {
      // Infer a schema by parsing the _source JSON strings.
      val jsonRDD = df.select("_source").as[String].rdd
      val schema = spark.read.json(jsonRDD).schema

      sourceType match {
        case "nested" =>
          // Replace the JSON string with a parsed struct column.
          df.withColumn(RowConstant.TMP, from_json(col("_source"), schema))
            .drop("_source")
            .withColumnRenamed(RowConstant.TMP, "_source")
        case "flatten" =>
          // Promote every top-level field of _source to its own column.
          val tmpDf = df.withColumn(RowConstant.TMP, from_json(col("_source"), schema))
          schema
            .foldLeft(tmpDf) { (acc, field) =>
              acc.withColumn(field.name, col(RowConstant.TMP)(field.name))
            }
            .drop(RowConstant.TMP)
      }
    }
  }
}
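
A minimal sketch of driving this input by hand, assuming a local Spark session and the config from the docs example (hypothetical driver code; in Waterdrop the framework performs these steps itself, including the `table_name` registration):

```
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import io.github.interestinglab.waterdrop.input.batch.Elasticsearch

object EsInputDemo extends App {
  val spark = SparkSession.builder().appName("es-input-demo").master("local[*]").getOrCreate()

  val input = new Elasticsearch()
  input.setConfig(ConfigFactory.parseString(
    """
      |hosts = ["localhost:9200"]
      |index = "waterdrop-*"
      |source_type = "flatten"
      |table_name = "my_dataset"
    """.stripMargin))

  val (ok, msg) = input.checkConfig()
  require(ok, msg)

  input.prepare(spark)                     // merges defaults, builds the es.* connector config
  val df = input.getDataset(spark)         // reads via esJsonRDD and applies source_type handling
  df.createOrReplaceTempView("my_dataset") // normally done by the framework using [table_name]
  df.show()
}
```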
@@ -65,6 +65,12 @@ class Elasticsearch extends BaseOutput {
      })

    esCfg += ("es.nodes" -> config.getStringList("hosts").mkString(","))

    println("[INFO] Output ElasticSearch Params:")
    for (entry <- esCfg) {
      val (key, value) = entry
      println("[INFO] \t" + key + " = " + value)
    }
  }

  override def process(df: Dataset[Row]): Unit = {
