diff --git a/docs/zh-cn/configuration/output-plugins/Clickhouse.md b/docs/zh-cn/configuration/output-plugins/Clickhouse.md index 7b05e97db2c..2443a3bc597 100644 --- a/docs/zh-cn/configuration/output-plugins/Clickhouse.md +++ b/docs/zh-cn/configuration/output-plugins/Clickhouse.md @@ -18,6 +18,8 @@ | [fields](#fields-array) | array | yes |-| | [host](#host-string) | string | yes |-| | [password](#password-string) | string | no |-| +| [retry](#retry-number) | number| no |1| +| [retry_codes](#password-array) | array | no |[ ]| | [table](#table-string) | string | yes |-| | [username](#username-string) | string | no |-| @@ -28,27 +30,37 @@ ##### database [string] -Clickhouse database +ClickHouse database ##### fields [array] -需要输出到Clickhouose的数据字段。 +需要输出到ClickHouse的数据字段。 ##### host [string] -Clickhouse集群地址,格式为host:port,允许指定多个host。如"host1:8123,host2:8123"。 +ClickHouse集群地址,格式为host:port,允许指定多个host。如"host1:8123,host2:8123"。 ##### password [string] -Clickhouse用户密码,仅当Clickhouse中开启权限时需要此字段。 +ClickHouse用户密码,仅当ClickHouse中开启权限时需要此字段。 + +#### retry [number] + +重试次数,默认为1次 + +##### retry_codes [array] + +出现异常时,会重试操作的ClickHouse异常错误码。详细错误码列表参考 [ClickHouseErrorCode](https://github.com/yandex/clickhouse-jdbc/blob/master/src/main/java/ru/yandex/clickhouse/except/ClickHouseErrorCode.java) + +如果多次重试都失败,将会丢弃这个批次的数据,慎用!! ##### table [string] -Clickhouse 表名 +ClickHouse 表名 ##### username [string] -Clickhouse用户用户名,仅当Clickhouse中开启权限时需要此字段 +ClickHouse用户用户名,仅当ClickHouse中开启权限时需要此字段 ##### clickhouse [string] @@ -94,3 +106,18 @@ clickhouse { } ``` +``` +ClickHouse { + host = "localhost:8123" + database = "nginx" + table = "access_msg" + fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"] + username = "username" + password = "password" + bulk_size = 20000 + retry_codes = [209, 210] + retry = 3 +} +``` + +> 当出现网络超时或者网络异常的情况下,重试写入3次 \ No newline at end of file diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala index f2e83e8fdb5..55263eacdb8 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala @@ -8,12 +8,14 @@ import com.typesafe.config.{Config, ConfigFactory} import io.github.interestinglab.waterdrop.apis.BaseOutput import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils import org.apache.spark.sql.{Dataset, Row, SparkSession} +import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException} import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatement} import scala.collection.JavaConversions._ import scala.collection.immutable.HashMap import scala.collection.mutable.WrappedArray import scala.util.matching.Regex +import scala.util.{Failure, Success, Try} class Clickhouse extends BaseOutput { @@ -22,6 +24,7 @@ class Clickhouse extends BaseOutput { var initSQL: String = _ var table: String = _ var fields: java.util.List[String] = _ + var retryCodes: java.util.List[Integer] = _ var config: Config = ConfigFactory.empty() val clickhousePrefix = "clickhouse." val properties: Properties = new Properties() @@ -100,16 +103,21 @@ class Clickhouse extends BaseOutput { val defaultConfig = ConfigFactory.parseMap( Map( - "bulk_size" -> 20000 + "bulk_size" -> 20000, + // "retry_codes" -> util.Arrays.asList(ClickHouseErrorCode.NETWORK_ERROR.code), + "retry_codes" -> util.Arrays.asList(), + "retry" -> 1 ) ) config = config.withFallback(defaultConfig) + retryCodes = config.getIntList("retry_codes") super.prepare(spark) } override def process(df: Dataset[Row]): Unit = { val dfFields = df.schema.fieldNames val bulkSize = config.getInt("bulk_size") + val retry = config.getInt("retry") df.foreachPartition { iter => val executorBalanced = new BalancedClickhouseDataSource(this.jdbcLink, this.properties) val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl] @@ -122,12 +130,38 @@ class Clickhouse extends BaseOutput { statement.addBatch() if (length >= bulkSize) { - statement.executeBatch() + execute(statement, retry) length = 0 } } - statement.executeBatch() + execute(statement, retry) + } + } + + private def execute(statement: ClickHousePreparedStatement, retry: Int): Unit = { + val res = Try(statement.executeBatch()) + res match { + case Success(_) => logInfo("Insert into ClickHouse succeed") + case Failure(e: ClickHouseException) => { + val errorCode = e.getErrorCode + if (retryCodes.contains(errorCode)) { + logError("Insert into ClickHouse failed. Reason: ", e) + if (retry > 0) { + execute(statement, retry - 1) + } else { + logError("Insert into ClickHouse failed and retry failed, drop this bulk.") + } + } else { + throw e + } + } + case Failure(e: ClickHouseUnknownException) => { + throw e + } + case Failure(e: Exception) => { + throw e + } } }