From b4ec1fd94e02a5502c9a165ebd4f33c5a92878b3 Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Tue, 23 Apr 2019 22:27:00 +0800 Subject: [PATCH 1/9] Add param of retry --- .../waterdrop/output/batch/Clickhouse.scala | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala index f2e83e8fdb5..533435564e0 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala @@ -6,8 +6,9 @@ import java.util.Properties import com.typesafe.config.{Config, ConfigFactory} import io.github.interestinglab.waterdrop.apis.BaseOutput -import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils +import io.github.interestinglab.waterdrop.config.{ConfigRuntimeException, TypesafeConfigUtils} import org.apache.spark.sql.{Dataset, Row, SparkSession} +import ru.yandex.clickhouse.except.ClickHouseException import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatement} import scala.collection.JavaConversions._ @@ -100,7 +101,8 @@ class Clickhouse extends BaseOutput { val defaultConfig = ConfigFactory.parseMap( Map( - "bulk_size" -> 20000 + "bulk_size" -> 20000, + "retry" -> 1 ) ) config = config.withFallback(defaultConfig) @@ -110,6 +112,7 @@ class Clickhouse extends BaseOutput { override def process(df: Dataset[Row]): Unit = { val dfFields = df.schema.fieldNames val bulkSize = config.getInt("bulk_size") + val retry = config.getInt("retry") df.foreachPartition { iter => val executorBalanced = new BalancedClickhouseDataSource(this.jdbcLink, this.properties) val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl] @@ -122,12 +125,31 @@ class Clickhouse extends BaseOutput { statement.addBatch() if (length >= bulkSize) { - statement.executeBatch() + execute(statement, retry) length = 0 } } + execute(statement, retry) + } + } + + private def execute(statement: ClickHousePreparedStatement, retry: Int): Unit = { + try { statement.executeBatch() + logInfo("Insert into clickhouse succeed") + } catch { + case e: ClickHouseException => { + logError(e.getMessage) + } + case e: Throwable => { + logError("Insert into clickhouse failed. Reason: ", e) + if (retry > 0) { + execute(statement, retry - 1) + } else { + logError("Insert into clickhouse and retry failed") + } + } } } From 27674802da371f18f6c62152573ba1b041e3daf0 Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Wed, 24 Apr 2019 23:11:21 +0800 Subject: [PATCH 2/9] Update clickhouse retry func --- .../waterdrop/output/batch/Clickhouse.scala | 36 ++++++++++++++----- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala index 533435564e0..b7716014282 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala @@ -4,11 +4,13 @@ import java.text.SimpleDateFormat import java.util import java.util.Properties +import com.google.common.base.Throwables import com.typesafe.config.{Config, ConfigFactory} import io.github.interestinglab.waterdrop.apis.BaseOutput -import io.github.interestinglab.waterdrop.config.{ConfigRuntimeException, TypesafeConfigUtils} +import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils import org.apache.spark.sql.{Dataset, Row, SparkSession} import ru.yandex.clickhouse.except.ClickHouseException +import ru.yandex.clickhouse.except.ClickHouseErrorCode import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatement} import scala.collection.JavaConversions._ @@ -140,14 +142,19 @@ class Clickhouse extends BaseOutput { logInfo("Insert into clickhouse succeed") } catch { case e: ClickHouseException => { - logError(e.getMessage) - } - case e: Throwable => { - logError("Insert into clickhouse failed. Reason: ", e) - if (retry > 0) { - execute(statement, retry - 1) - } else { - logError("Insert into clickhouse and retry failed") + val exception = Clickhouse.getClickhouseException(e) + val errorCode = exception.getErrorCode + + errorCode match { + case ClickHouseErrorCode.NETWORK_ERROR.code.toInt | ClickHouseErrorCode.TIMEOUT_EXCEEDED.code.toInt | + ClickHouseErrorCode.SOCKET_TIMEOUT.code.toInt => { + logError("Insert into clickhouse failed. Reason: ", e) + if (retry > 0) { + execute(statement, retry - 1) + } else { + logError("Insert into clickhouse and retry failed") + } + } } } } @@ -315,4 +322,15 @@ object Clickhouse { "" } } + + private[waterdrop] def getClickhouseException(e: Exception): ClickHouseException = { + val causalChain = Throwables.getCausalChain(e) + import scala.collection.JavaConversions._ + for (throwable <- causalChain) { + throwable match { + case ClickHouseException => throwable.asInstanceOf[ClickHouseException] + } + } + throw new IllegalArgumentException("no ClickHouseException found") + } } From cbca682adfee06d449aa76f270a4d31172b66cc3 Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Thu, 25 Apr 2019 18:40:02 +0800 Subject: [PATCH 3/9] Fix coding style --- .../waterdrop/output/batch/Clickhouse.scala | 23 ++++--------------- .../waterdrop/filter/caseTest.scala | 14 +++++++++++ 2 files changed, 19 insertions(+), 18 deletions(-) create mode 100644 waterdrop-core/src/test/scala/io/github/interestinglab/waterdrop/filter/caseTest.scala diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala index b7716014282..cc6c93d76cc 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala @@ -4,13 +4,11 @@ import java.text.SimpleDateFormat import java.util import java.util.Properties -import com.google.common.base.Throwables import com.typesafe.config.{Config, ConfigFactory} import io.github.interestinglab.waterdrop.apis.BaseOutput import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils import org.apache.spark.sql.{Dataset, Row, SparkSession} -import ru.yandex.clickhouse.except.ClickHouseException -import ru.yandex.clickhouse.except.ClickHouseErrorCode +import ru.yandex.clickhouse.except.{ClickHouseErrorCode, ClickHouseException} import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatement} import scala.collection.JavaConversions._ @@ -142,12 +140,12 @@ class Clickhouse extends BaseOutput { logInfo("Insert into clickhouse succeed") } catch { case e: ClickHouseException => { - val exception = Clickhouse.getClickhouseException(e) - val errorCode = exception.getErrorCode + val errorCode = new ClickHouseErrorCode(e.getErrorCode) errorCode match { - case ClickHouseErrorCode.NETWORK_ERROR.code.toInt | ClickHouseErrorCode.TIMEOUT_EXCEEDED.code.toInt | - ClickHouseErrorCode.SOCKET_TIMEOUT.code.toInt => { + case ClickHouseErrorCode.NETWORK_ERROR | ClickHouseErrorCode.TIMEOUT_EXCEEDED | + ClickHouseErrorCode.SOCKET_TIMEOUT => { + logError("Insert into clickhouse failed. Reason: ", e) if (retry > 0) { execute(statement, retry - 1) @@ -322,15 +320,4 @@ object Clickhouse { "" } } - - private[waterdrop] def getClickhouseException(e: Exception): ClickHouseException = { - val causalChain = Throwables.getCausalChain(e) - import scala.collection.JavaConversions._ - for (throwable <- causalChain) { - throwable match { - case ClickHouseException => throwable.asInstanceOf[ClickHouseException] - } - } - throw new IllegalArgumentException("no ClickHouseException found") - } } diff --git a/waterdrop-core/src/test/scala/io/github/interestinglab/waterdrop/filter/caseTest.scala b/waterdrop-core/src/test/scala/io/github/interestinglab/waterdrop/filter/caseTest.scala new file mode 100644 index 00000000000..ea371b33fa9 --- /dev/null +++ b/waterdrop-core/src/test/scala/io/github/interestinglab/waterdrop/filter/caseTest.scala @@ -0,0 +1,14 @@ +package io.github.interestinglab.waterdrop.filter + +import org.scalatest.FunSuite + +class caseTest extends FunSuite { + test("An empty Set should have size 0") { + val a = 3 + + a match { + case 1 + } + assert(Set.empty.size == 0) + } +} From fac0e4075d28db8a760cc4d3fffa28fa7d5c4f3b Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Thu, 25 Apr 2019 19:56:27 +0800 Subject: [PATCH 4/9] Fix coding style --- .../interestinglab/waterdrop/output/batch/Clickhouse.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala index cc6c93d76cc..a3abe64a9ec 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala @@ -144,7 +144,7 @@ class Clickhouse extends BaseOutput { errorCode match { case ClickHouseErrorCode.NETWORK_ERROR | ClickHouseErrorCode.TIMEOUT_EXCEEDED | - ClickHouseErrorCode.SOCKET_TIMEOUT => { + ClickHouseErrorCode.SOCKET_TIMEOUT => { logError("Insert into clickhouse failed. Reason: ", e) if (retry > 0) { From ab9668c26346ce8caab224dff716be31267d286d Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Thu, 25 Apr 2019 22:10:48 +0800 Subject: [PATCH 5/9] Support retry with specificed ClickHouseErrorCode --- .../waterdrop/output/batch/Clickhouse.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala index a3abe64a9ec..031409eb2b2 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala @@ -23,6 +23,7 @@ class Clickhouse extends BaseOutput { var initSQL: String = _ var table: String = _ var fields: java.util.List[String] = _ + var retryCodes: java.util.List[Integer] = _ var config: Config = ConfigFactory.empty() val clickhousePrefix = "clickhouse." val properties: Properties = new Properties() @@ -102,10 +103,13 @@ class Clickhouse extends BaseOutput { val defaultConfig = ConfigFactory.parseMap( Map( "bulk_size" -> 20000, + // "retry_codes" -> util.Arrays.asList(ClickHouseErrorCode.NETWORK_ERROR.code), + "retry_codes" -> util.Arrays.asList(), "retry" -> 1 ) ) config = config.withFallback(defaultConfig) + retryCodes = config.getIntList("retry_codes") super.prepare(spark) } @@ -140,19 +144,16 @@ class Clickhouse extends BaseOutput { logInfo("Insert into clickhouse succeed") } catch { case e: ClickHouseException => { - val errorCode = new ClickHouseErrorCode(e.getErrorCode) - - errorCode match { - case ClickHouseErrorCode.NETWORK_ERROR | ClickHouseErrorCode.TIMEOUT_EXCEEDED | - ClickHouseErrorCode.SOCKET_TIMEOUT => { - - logError("Insert into clickhouse failed. Reason: ", e) - if (retry > 0) { - execute(statement, retry - 1) - } else { - logError("Insert into clickhouse and retry failed") - } + val errorCode = e.getErrorCode + if (retryCodes.contains(errorCode)) { + logError("Insert into clickhouse failed. Reason: ", e) + if (retry > 0) { + execute(statement, retry - 1) + } else { + logError("Insert into clickhouse and retry failed") } + } else { + throw e } } } From 81956c1c95a7080ca27e3f1f15a64fef1c6efd30 Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Thu, 25 Apr 2019 22:26:09 +0800 Subject: [PATCH 6/9] Remove castTest.scala --- .../interestinglab/waterdrop/filter/caseTest.scala | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 waterdrop-core/src/test/scala/io/github/interestinglab/waterdrop/filter/caseTest.scala diff --git a/waterdrop-core/src/test/scala/io/github/interestinglab/waterdrop/filter/caseTest.scala b/waterdrop-core/src/test/scala/io/github/interestinglab/waterdrop/filter/caseTest.scala deleted file mode 100644 index ea371b33fa9..00000000000 --- a/waterdrop-core/src/test/scala/io/github/interestinglab/waterdrop/filter/caseTest.scala +++ /dev/null @@ -1,14 +0,0 @@ -package io.github.interestinglab.waterdrop.filter - -import org.scalatest.FunSuite - -class caseTest extends FunSuite { - test("An empty Set should have size 0") { - val a = 3 - - a match { - case 1 - } - assert(Set.empty.size == 0) - } -} From 243d4b4399350d97fe7ea6e221ff767cfe26c3ff Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Thu, 25 Apr 2019 22:35:35 +0800 Subject: [PATCH 7/9] Modify clickhouse.md --- .../output-plugins/Clickhouse.md | 39 ++++++++++++++++--- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/docs/zh-cn/configuration/output-plugins/Clickhouse.md b/docs/zh-cn/configuration/output-plugins/Clickhouse.md index 7b05e97db2c..2443a3bc597 100644 --- a/docs/zh-cn/configuration/output-plugins/Clickhouse.md +++ b/docs/zh-cn/configuration/output-plugins/Clickhouse.md @@ -18,6 +18,8 @@ | [fields](#fields-array) | array | yes |-| | [host](#host-string) | string | yes |-| | [password](#password-string) | string | no |-| +| [retry](#retry-number) | number| no |1| +| [retry_codes](#password-array) | array | no |[ ]| | [table](#table-string) | string | yes |-| | [username](#username-string) | string | no |-| @@ -28,27 +30,37 @@ ##### database [string] -Clickhouse database +ClickHouse database ##### fields [array] -需要输出到Clickhouose的数据字段。 +需要输出到ClickHouse的数据字段。 ##### host [string] -Clickhouse集群地址,格式为host:port,允许指定多个host。如"host1:8123,host2:8123"。 +ClickHouse集群地址,格式为host:port,允许指定多个host。如"host1:8123,host2:8123"。 ##### password [string] -Clickhouse用户密码,仅当Clickhouse中开启权限时需要此字段。 +ClickHouse用户密码,仅当ClickHouse中开启权限时需要此字段。 + +#### retry [number] + +重试次数,默认为1次 + +##### retry_codes [array] + +出现异常时,会重试操作的ClickHouse异常错误码。详细错误码列表参考 [ClickHouseErrorCode](https://github.com/yandex/clickhouse-jdbc/blob/master/src/main/java/ru/yandex/clickhouse/except/ClickHouseErrorCode.java) + +如果多次重试都失败,将会丢弃这个批次的数据,慎用!! ##### table [string] -Clickhouse 表名 +ClickHouse 表名 ##### username [string] -Clickhouse用户用户名,仅当Clickhouse中开启权限时需要此字段 +ClickHouse用户用户名,仅当ClickHouse中开启权限时需要此字段 ##### clickhouse [string] @@ -94,3 +106,18 @@ clickhouse { } ``` +``` +ClickHouse { + host = "localhost:8123" + database = "nginx" + table = "access_msg" + fields = ["date", "datetime", "hostname", "http_code", "data_size", "ua", "request_time"] + username = "username" + password = "password" + bulk_size = 20000 + retry_codes = [209, 210] + retry = 3 +} +``` + +> 当出现网络超时或者网络异常的情况下,重试写入3次 \ No newline at end of file From be14d7b032d3dcc77b8fd1ce1c394b0e1000462b Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Fri, 26 Apr 2019 09:30:38 +0800 Subject: [PATCH 8/9] Fix coding style --- .../waterdrop/output/batch/Clickhouse.scala | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala index 031409eb2b2..d7f158d087f 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala @@ -8,13 +8,14 @@ import com.typesafe.config.{Config, ConfigFactory} import io.github.interestinglab.waterdrop.apis.BaseOutput import io.github.interestinglab.waterdrop.config.TypesafeConfigUtils import org.apache.spark.sql.{Dataset, Row, SparkSession} -import ru.yandex.clickhouse.except.{ClickHouseErrorCode, ClickHouseException} +import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException} import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatement} import scala.collection.JavaConversions._ import scala.collection.immutable.HashMap import scala.collection.mutable.WrappedArray import scala.util.matching.Regex +import scala.util.{Failure, Success, Try} class Clickhouse extends BaseOutput { @@ -139,23 +140,25 @@ class Clickhouse extends BaseOutput { } private def execute(statement: ClickHousePreparedStatement, retry: Int): Unit = { - try { - statement.executeBatch() - logInfo("Insert into clickhouse succeed") - } catch { - case e: ClickHouseException => { + val res = Try(statement.executeBatch()) + res match { + case Success(_) => logInfo("Insert into ClickHouse succeed") + case Failure(e: ClickHouseException) => { val errorCode = e.getErrorCode if (retryCodes.contains(errorCode)) { - logError("Insert into clickhouse failed. Reason: ", e) + logError("Insert into ClickHouse failed. Reason: ", e) if (retry > 0) { execute(statement, retry - 1) } else { - logError("Insert into clickhouse and retry failed") + logError("Insert into ClickHouse failed and retry failed, drop this bulk.") } } else { throw e } } + case Failure(e: ClickHouseUnknownException) => { + throw e + } } } From c3e0f2aba96c674e6352d3fc2c0211bab2354967 Mon Sep 17 00:00:00 2001 From: rickyhuo Date: Fri, 26 Apr 2019 13:39:31 +0800 Subject: [PATCH 9/9] Enhancement Try catch --- .../interestinglab/waterdrop/output/batch/Clickhouse.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala index d7f158d087f..55263eacdb8 100644 --- a/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala +++ b/waterdrop-core/src/main/scala/io/github/interestinglab/waterdrop/output/batch/Clickhouse.scala @@ -159,6 +159,9 @@ class Clickhouse extends BaseOutput { case Failure(e: ClickHouseUnknownException) => { throw e } + case Failure(e: Exception) => { + throw e + } } }