Merge pull request #302 from InterestingLab/rickyhuo.enhance.clickhouse.retry

Retry when insert into clickhouse failed
garyelephant authored Apr 27, 2019
2 parents 1d916df + c3e0f2a commit 3f23dc6
Showing 2 changed files with 70 additions and 9 deletions.
39 changes: 33 additions & 6 deletions docs/zh-cn/configuration/output-plugins/Clickhouse.md
@@ -18,6 +18,8 @@
| [fields](#fields-array) | array | yes |-|
| [host](#host-string) | string | yes |-|
| [password](#password-string) | string | no |-|
| [retry](#retry-number) | number| no |1|
| [retry_codes](#retry_codes-array) | array | no |[ ]|
| [table](#table-string) | string | yes |-|
| [username](#username-string) | string | no |-|

@@ -28,27 +30,37 @@

##### database [string]

Clickhouse database
ClickHouse database

##### fields [array]

Data fields to write to Clickhouose
Data fields to write to ClickHouse

##### host [string]

Clickhouse cluster address in host:port format. Multiple hosts may be specified, e.g. "host1:8123,host2:8123".
ClickHouse cluster address in host:port format. Multiple hosts may be specified, e.g. "host1:8123,host2:8123".

##### password [string]

Clickhouse user password. Required only when access control is enabled in Clickhouse.
ClickHouse user password. Required only when access control is enabled in ClickHouse.

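##### retry [number]

Number of retries. Defaults to 1.

##### retry_codes [array]

ClickHouse error codes that trigger a retry when an insert fails with an exception. For the full list of error codes, see [ClickHouseErrorCode](https://github.com/yandex/clickhouse-jdbc/blob/master/src/main/java/ru/yandex/clickhouse/except/ClickHouseErrorCode.java)

If all retries fail, this batch of data is discarded, so use this option with caution!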

##### table [string]

Clickhouse table name
ClickHouse table name

##### username [string]

Clickhouse username. Required only when access control is enabled in Clickhouse.
ClickHouse username. Required only when access control is enabled in ClickHouse.

##### clickhouse [string]

@@ -94,3 +106,18 @@ clickhouse {
}
```

```
ClickHouse {
host = "localhost:8123"
database = "nginx"
table = "access_msg"
fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"]
username = "username"
password = "password"
bulk_size = 20000
retry_codes = [209, 210]
retry = 3
}
```

> Retry the write 3 times when a network timeout or network error occurs
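
To make the interaction between `retry` and `retry_codes` concrete, here is a minimal Scala sketch of the decision taken after a failed insert. The names `RetryPolicy` and `decide` are hypothetical and do not appear in this commit. The branches follow the `execute` method added by this change: an error code outside `retry_codes` is rethrown, and a retryable failure with no retries left drops the batch.

```scala
// Hypothetical helper, not part of Waterdrop: models the retry decision only.
object RetryPolicy {
  sealed trait Decision
  case object RetryAgain extends Decision // code is listed in retry_codes and retries remain
  case object DropBatch extends Decision  // code is listed, but retries are exhausted
  case object Rethrow extends Decision    // code is not listed in retry_codes

  def decide(errorCode: Int, retryCodes: Set[Int], retriesLeft: Int): Decision =
    if (!retryCodes.contains(errorCode)) Rethrow
    else if (retriesLeft > 0) RetryAgain
    else DropBatch
}
```

With `retry_codes = [209, 210]` and `retry = 3`, `RetryPolicy.decide(209, Set(209, 210), 3)` yields `RetryAgain`. With the default empty `retry_codes`, every error code yields `Rethrow`, so `retry` has no effect unless at least one code is configured.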
@@ -8,12 +8,14 @@ import com.typesafe.config.{Config, ConfigFactory}
import io.github.interestinglab.waterdrop.apis.BaseOutput
import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException}
import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatement}

import scala.collection.JavaConversions._
import scala.collection.immutable.HashMap
import scala.collection.mutable.WrappedArray
import scala.util.matching.Regex
import scala.util.{Failure, Success, Try}

class Clickhouse extends BaseOutput {

@@ -22,6 +24,7 @@ class Clickhouse extends BaseOutput {
var initSQL: String = _
var table: String = _
var fields: java.util.List[String] = _
var retryCodes: java.util.List[Integer] = _
var config: Config = ConfigFactory.empty()
val clickhousePrefix = "clickhouse."
val properties: Properties = new Properties()
@@ -100,16 +103,21 @@ class Clickhouse extends BaseOutput {

val defaultConfig = ConfigFactory.parseMap(
Map(
"bulk_size" -> 20000
"bulk_size" -> 20000,
// "retry_codes" -> util.Arrays.asList(ClickHouseErrorCode.NETWORK_ERROR.code),
"retry_codes" -> util.Arrays.asList(),
"retry" -> 1
)
)
config = config.withFallback(defaultConfig)
retryCodes = config.getIntList("retry_codes")
super.prepare(spark)
}

override def process(df: Dataset[Row]): Unit = {
val dfFields = df.schema.fieldNames
val bulkSize = config.getInt("bulk_size")
val retry = config.getInt("retry")
df.foreachPartition { iter =>
val executorBalanced = new BalancedClickhouseDataSource(this.jdbcLink, this.properties)
val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
@@ -122,12 +130,38 @@ class Clickhouse extends BaseOutput {
statement.addBatch()

if (length >= bulkSize) {
statement.executeBatch()
execute(statement, retry)
length = 0
}
}

statement.executeBatch()
execute(statement, retry)
}
}
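
The flushing pattern in `process` (add rows to a batch, flush once `bulk_size` rows are pending, then flush the remainder) can be sketched without JDBC as follows. `BulkSketch`, `flushInBulks` and the `flush` callback are hypothetical names used only for illustration. The sketch issues no flush at all for an empty partition, which is an assumption about desirable behavior rather than something this commit does.

```scala
// Hypothetical sketch of bulk flushing, independent of ClickHouse and Spark.
object BulkSketch {
  // Calls `flush` once per group of at most `bulkSize` rows and returns the number of flushes.
  def flushInBulks[A](rows: Iterator[A], bulkSize: Int)(flush: Seq[A] => Unit): Int = {
    require(bulkSize > 0, "bulkSize must be positive")
    var flushes = 0
    // Iterator.grouped yields full groups, then one shorter remainder, and nothing for empty input.
    rows.grouped(bulkSize).foreach { group =>
      flush(group)
      flushes += 1
    }
    flushes
  }
}
```

For five rows and a bulk size of 2, `flush` is called with groups of 2, 2 and 1 rows.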

// Executes the pending batch. If it fails with a ClickHouseException whose error code is
// listed in retry_codes, retries up to `retry` more times and then drops the batch.
private def execute(statement: ClickHousePreparedStatement, retry: Int): Unit = {
  Try(statement.executeBatch()) match {
    case Success(_) => logInfo("Insert into ClickHouse succeeded")
    case Failure(e: ClickHouseException) if retryCodes.contains(e.getErrorCode) =>
      logError("Insert into ClickHouse failed. Reason: ", e)
      if (retry > 0) {
        execute(statement, retry - 1)
      } else {
        logError("Insert into ClickHouse failed and retries are exhausted, dropping this batch.")
      }
    // Any other failure, including a ClickHouseException with a non-retryable code, is rethrown.
    case Failure(e) => throw e
  }
}
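
As a rough illustration of the retry loop in `execute`, the sketch below reproduces the same control flow with the Scala standard library only. `RetrySketch`, `runWithRetry` and `CodedException` are hypothetical names, and `CodedException` merely stands in for an exception that carries a ClickHouse error code. Unlike the commit, the sketch returns a Boolean so that a caller could count dropped batches instead of relying on the log alone. That is a design variation, not the author's method.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Hypothetical stand-in for an exception that carries a ClickHouse error code.
  final case class CodedException(code: Int) extends Exception(s"error code $code")

  // Returns true if `action` eventually succeeded, false if the batch was dropped
  // after `retry` additional attempts. Non-retryable failures are rethrown.
  @tailrec
  def runWithRetry(retryCodes: Set[Int], retry: Int)(action: () => Unit): Boolean =
    Try(action()) match {
      case Success(_) => true
      case Failure(e: CodedException) if retryCodes.contains(e.code) =>
        if (retry > 0) runWithRetry(retryCodes, retry - 1)(action)
        else false
      case Failure(e) => throw e
    }
}
```

Note that `retry` counts additional attempts: with `retry = 2`, a persistently failing action runs three times in total before the batch is dropped.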
