From cf016d48511afc2637d01c2749efb84354783652 Mon Sep 17 00:00:00 2001
From: caoliang <245623257@qq.com>
Date: Tue, 19 Mar 2024 15:57:14 +0800
Subject: [PATCH] doris writer compatible with copy into
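
This change adds a DorisCopyIntoObserver so the writer can load data
through Doris "copy into": each batch is uploaded as a file via
/copy/upload (following the 307 redirect to the backing object storage)
and then committed by posting a COPY INTO statement to /copy/query. The
existing stream load path is kept; DorisWriterManager now probes the FE
via DorisUtil.checkIsStreamLoad() and switches to copy into when the
/copy/query endpoint answers, otherwise it keeps using stream load.
Shared helpers (addRows, getLoadHost) move to DorisUtil, and a new
optional `clusterName` option is sent as the `cluster` parameter of the
commit request.

For illustration only (the database, table and loadProps values below
are made up and not part of this patch), CopySQLBuilder would turn a
batch label plus loadProps such as format=csv into a statement roughly
like:

    COPY INTO example_db.example_tbl FROM @~('<label>')
    PROPERTIES ('file.type'='csv', 'file.column_separator'='\t',
                'file.line_delimiter'='\n', 'copy.async'='false')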
---
extension/DataX/doriswriter/pom.xml | 15 ++
.../writer/doriswriter/BaseResponse.java | 23 +++
.../writer/doriswriter/CopyIntoResp.java | 26 +++
.../writer/doriswriter/CopySQLBuilder.java | 83 +++++++++
.../doriswriter/DorisCopyIntoObserver.java | 172 ++++++++++++++++++
.../doriswriter/DorisStreamLoadObserver.java | 65 +------
.../plugin/writer/doriswriter/DorisUtil.java | 131 +++++++++++--
.../doriswriter/DorisWriterExcetion.java | 11 +-
.../doriswriter/DorisWriterManager.java | 32 ++--
.../writer/doriswriter/HttpPostBuilder.java | 51 ++++++
.../writer/doriswriter/HttpPutBuilder.java | 65 +++++++
.../datax/plugin/writer/doriswriter/Keys.java | 22 ++-
12 files changed, 598 insertions(+), 98 deletions(-)
create mode 100644 extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java
create mode 100644 extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java
create mode 100644 extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java
create mode 100644 extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java
create mode 100644 extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java
create mode 100644 extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java
diff --git a/extension/DataX/doriswriter/pom.xml b/extension/DataX/doriswriter/pom.xml
index aa1e6ff080a9fc..52374d30c203ef 100644
--- a/extension/DataX/doriswriter/pom.xml
+++ b/extension/DataX/doriswriter/pom.xml
@@ -63,6 +63,21 @@ under the License.
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.13.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.13.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.13.3</version>
+        </dependency>
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java
new file mode 100644
index 00000000000000..15fe26bf262236
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java
@@ -0,0 +1,23 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BaseResponse<T> {
+ private int code;
+ private String msg;
+ private T data;
+ private int count;
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getMsg() {
+ return msg;
+ }
+
+ public T getData(){
+ return data;
+ }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java
new file mode 100644
index 00000000000000..83ca128dfe3850
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java
@@ -0,0 +1,26 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.util.Map;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyIntoResp extends BaseResponse {
+ private String code;
+ private String exception;
+
+ private Map<String, String> result;
+
+ public String getDataCode() {
+ return code;
+ }
+
+ public String getException() {
+ return exception;
+ }
+
+ public Map<String, String> getResult() {
+ return result;
+ }
+
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java
new file mode 100644
index 00000000000000..abbb7d196a16f7
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java
@@ -0,0 +1,83 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+public class CopySQLBuilder {
+ private final static String COPY_SYNC = "copy.async";
+ private final static String FIELD_DELIMITER_KEY = "column_separator";
+ private final static String COPY_FIELD_DELIMITER_KEY = "file.column_separator";
+ private final static String FIELD_DELIMITER_DEFAULT = "\t";
+ private final static String LINE_DELIMITER_KEY = "line_delimiter";
+ private final static String COPY_LINE_DELIMITER_KEY = "file.line_delimiter";
+ private final static String LINE_DELIMITER_DEFAULT = "\n";
+ private final static String COLUMNS = "columns";
+ private static final String STRIP_OUT_ARRAY = "strip_outer_array";
+ private final String fileName;
+ private final Keys options;
+ private Map<String, Object> properties;
+
+
+
+ public CopySQLBuilder(Keys options, String fileName) {
+ this.options=options;
+ this.fileName=fileName;
+ this.properties=options.getLoadProps();
+ }
+
+ public String buildCopySQL(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("COPY INTO ")
+ .append(options.getDatabase() + "." + options.getTable());
+
+ if (properties.get(COLUMNS) != null && !properties.get(COLUMNS).equals("")) {
+ sb.append(" FROM ( SELECT ").append(properties.get(COLUMNS))
+ .append(" FROM @~('").append(fileName).append("') ) ")
+ .append("PROPERTIES (");
+ } else {
+ sb.append(" FROM @~('").append(fileName).append("') ")
+ .append("PROPERTIES (");
+ }
+
+ // copy into must be executed synchronously, so force copy.async to false
+ properties.put(COPY_SYNC,false);
+ StringJoiner props = new StringJoiner(",");
+ for (Map.Entry<String, Object> entry : properties.entrySet()) {
+ String key = concatPropPrefix(String.valueOf(entry.getKey()));
+ String value = "";
+ switch (key){
+ case COPY_FIELD_DELIMITER_KEY:
+ value = DelimiterParser.parse(String.valueOf(entry.getValue()),FIELD_DELIMITER_DEFAULT);
+ break;
+ case COPY_LINE_DELIMITER_KEY:
+ value = DelimiterParser.parse(String.valueOf(entry.getValue()),LINE_DELIMITER_DEFAULT);
+ break;
+ default:
+ value = String.valueOf(entry.getValue());
+ break;
+ }
+ if(!key.equals(COLUMNS)){
+ String prop = String.format("'%s'='%s'", key, value);
+ props.add(prop);
+ }
+ }
+ sb.append(props).append(" )");
+ return sb.toString();
+ }
+
+ static final List<String> PREFIX_LIST =
+ Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, STRIP_OUT_ARRAY);
+
+ private String concatPropPrefix(String key) {
+ if (PREFIX_LIST.contains(key)) {
+ return "file." + key;
+ }
+ if ("format".equalsIgnoreCase(key)) {
+ return "file.type";
+ }
+ return key;
+ }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java
new file mode 100644
index 00000000000000..95685260cac117
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java
@@ -0,0 +1,172 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class DorisCopyIntoObserver {
+ private static final Logger LOG = LoggerFactory.getLogger(DorisCopyIntoObserver.class);
+
+ private Keys options;
+ private long pos;
+ public static final int SUCCESS = 0;
+ public static final String FAIL = "1";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private final HttpClientBuilder httpClientBuilder = HttpClients
+ .custom()
+ .disableRedirectHandling();
+ private CloseableHttpClient httpClient;
+ private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
+ private static final String COMMIT_PATTERN = "%s/copy/query";
+ private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");
+
+ public DorisCopyIntoObserver(Keys options) {
+ this.options = options;
+ this.httpClient = httpClientBuilder.build();
+
+ }
+
+ public void streamLoad(WriterTuple data) throws Exception {
+ String host = DorisUtil.getLoadHost(options);
+ String loadUrl = String.format(UPLOAD_URL_PATTERN, host);
+ String uploadAddress = getUploadAddress(loadUrl, data.getLabel());
+ put(uploadAddress, data.getLabel(), DorisUtil.addRows(options, data.getRows(), data.getBytes().intValue()));
+ executeCopy(host, data.getLabel());
+
+ }
+
+ private String getUploadAddress(String loadUrl, String fileName) throws IOException {
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder.setUrl(loadUrl)
+ .addFileName(fileName)
+ .addCommonHeader()
+ .setEmptyEntity()
+ .baseAuth(options.getUsername(), options.getPassword());
+ CloseableHttpResponse execute = httpClientBuilder.build().execute(putBuilder.build());
+ int statusCode = execute.getStatusLine().getStatusCode();
+ String reason = execute.getStatusLine().getReasonPhrase();
+ if (statusCode == 307) {
+ Header location = execute.getFirstHeader("location");
+ String uploadAddress = location.getValue();
+ LOG.info("redirect to s3:{}", uploadAddress);
+ return uploadAddress;
+ } else {
+ HttpEntity entity = execute.getEntity();
+ String result = entity == null ? null : EntityUtils.toString(entity);
+ LOG.error("Failed get the redirected address, status {}, reason {}, response {}", statusCode, reason, result);
+ throw new RuntimeException("Could not get the redirected address.");
+ }
+
+ }
+
+
+ public void put(String loadUrl, String fileName, byte[] data) throws IOException {
+ LOG.info(String.format("Executing upload file to: '%s', size: '%s'", loadUrl, data.length));
+ HttpPutBuilder putBuilder = new HttpPutBuilder();
+ putBuilder.setUrl(loadUrl)
+ .addCommonHeader()
+ .setEntity(new ByteArrayEntity(data));
+ CloseableHttpResponse response = httpClient.execute(putBuilder.build());
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ String result = response.getEntity() == null ? null : EntityUtils.toString(response.getEntity());
+ LOG.error("upload file {} error, response {}", fileName, result);
+ throw new DorisWriterExcetion("upload file error: " + fileName, true);
+ }
+ }
+
+
+ /**
+ * execute copy into
+ */
+ public void executeCopy(String hostPort, String fileName) throws IOException {
+ long start = System.currentTimeMillis();
+ CopySQLBuilder copySQLBuilder = new CopySQLBuilder(options, fileName);
+ String copySQL = copySQLBuilder.buildCopySQL();
+ LOG.info("build copy SQL is {}", copySQL);
+ Map<String, String> params = new HashMap<>();
+ params.put("sql", copySQL);
+ if (StringUtils.isNotBlank(options.getClusterName())) {
+ params.put("cluster", options.getClusterName());
+ }
+ HttpPostBuilder postBuilder = new HttpPostBuilder();
+ postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
+ .baseAuth(options.getUsername(), options.getPassword())
+ .setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(params)));
+
+ CloseableHttpResponse response = httpClient.execute(postBuilder.build());
+ final int statusCode = response.getStatusLine().getStatusCode();
+ final String reasonPhrase = response.getStatusLine().getReasonPhrase();
+ String loadResult = "";
+ if (statusCode != 200) {
+ LOG.warn("commit failed with status {} {}, reason {}", statusCode, hostPort, reasonPhrase);
+ throw new DorisWriterExcetion("commit error with file: " + fileName, true);
+ } else if (response.getEntity() != null) {
+ loadResult = EntityUtils.toString(response.getEntity());
+ boolean success = handleCommitResponse(loadResult);
+ if (success) {
+ LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
+ } else {
+ LOG.error("commit error with status {}, reason {}, response {}", statusCode, reasonPhrase, loadResult);
+ String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s",
+ statusCode, reasonPhrase, loadResult, copySQL);
+ throw new DorisWriterExcetion(copyErrMsg, true);
+ }
+ }
+ }
+
+ public boolean handleCommitResponse(String loadResult) throws IOException {
+ BaseResponse<HashMap<String, Object>> baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse<HashMap<String, Object>>>() {
+ });
+ if (baseResponse.getCode() == SUCCESS) {
+ CopyIntoResp dataResp = OBJECT_MAPPER.convertValue(baseResponse.getData(), CopyIntoResp.class);
+ if (FAIL.equals(dataResp.getDataCode())) {
+ LOG.error("copy into execute failed, reason:{}", loadResult);
+ return false;
+ } else {
+ Map<String, String> result = dataResp.getResult();
+ if (DorisUtil.isNullOrEmpty(result) || !result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))) {
+ LOG.error("copy into load failed, reason:{}", loadResult);
+ return false;
+ } else {
+ return true;
+ }
+ }
+ } else {
+ LOG.error("commit failed, reason:{}", loadResult);
+ return false;
+ }
+ }
+
+ public static boolean isCommitted(String msg) {
+ return COMMITTED_PATTERN.matcher(msg).matches();
+ }
+
+
+ public void close() throws IOException {
+ if (null != httpClient) {
+ try {
+ httpClient.close();
+ } catch (IOException e) {
+ LOG.error("Closing httpClient failed.", e);
+ throw new RuntimeException("Closing httpClient failed.", e);
+ }
+ }
+ }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
index 2a23a07fac4c77..27887dc3b2e01a 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
@@ -17,7 +17,7 @@
package com.alibaba.datax.plugin.writer.doriswriter;
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -37,6 +37,7 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
+import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -77,10 +78,7 @@ public String urlDecode(String outBuffer) {
}
public void streamLoad(WriterTuple data) throws Exception {
- String host = getLoadHost();
- if(host == null){
- throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
- }
+ String host = DorisUtil.getLoadHost(options);
String loadUrl = new StringBuilder(host)
.append("/api/")
.append(options.getDatabase())
@@ -90,8 +88,8 @@ public void streamLoad(WriterTuple data) throws Exception {
.toString();
LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
loadUrl = urlDecode(loadUrl);
- Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
- LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult));
+ Map<String, Object> loadResult = put(loadUrl, data.getLabel(), DorisUtil.addRows(options, data.getRows(), data.getBytes().intValue()));
+ LOG.info("StreamLoad response :{}", JSON.toJSONString(loadResult));
final String keyStatus = "Status";
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
throw new IOException("Unable to flush data to Doris: unknown result status.");
@@ -152,35 +150,6 @@ private void checkStreamLoadState(String host, String label) throws IOException
}
}
- private byte[] addRows(List<byte[]> rows, int totalBytes) {
- if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
- Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
- byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
- ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
- for (byte[] row : rows) {
- bos.put(row);
- bos.put(lineDelimiter);
- }
- return bos.array();
- }
-
- if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
- ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
- bos.put("[".getBytes(StandardCharsets.UTF_8));
- byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
- boolean isFirstElement = true;
- for (byte[] row : rows) {
- if (!isFirstElement) {
- bos.put(jsonDelimiter);
- }
- bos.put(row);
- isFirstElement = false;
- }
- bos.put("]".getBytes(StandardCharsets.UTF_8));
- return bos.array();
- }
- throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
- }
private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
@@ -237,28 +206,4 @@ private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
}
return respEntity;
}
-
- private String getLoadHost() {
- List<String> hostList = options.getLoadUrlList();
- Collections.shuffle(hostList);
- String host = new StringBuilder("http://").append(hostList.get((0))).toString();
- if (checkConnection(host)){
- return host;
- }
- return null;
- }
-
- private boolean checkConnection(String host) {
- try {
- URL url = new URL(host);
- HttpURLConnection co = (HttpURLConnection) url.openConnection();
- co.setConnectTimeout(5000);
- co.connect();
- co.disconnect();
- return true;
- } catch (Exception e1) {
- e1.printStackTrace();
- return false;
- }
- }
}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
index 53411945e44562..14c82448db025e 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
@@ -22,16 +22,26 @@
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.druid.sql.parser.ParserException;
+import com.alibaba.fastjson2.JSON;
import com.google.common.base.Strings;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
/**
* jdbc util
@@ -39,11 +49,12 @@
public class DorisUtil {
private static final Logger LOG = LoggerFactory.getLogger(DorisUtil.class);
- private DorisUtil() {}
+ private DorisUtil() {
+ }
- public static List<String> getDorisTableColumns( Connection conn, String databaseName, String tableName) {
+ public static List<String> getDorisTableColumns(Connection conn, String databaseName, String tableName) {
String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
- List<String> columns = new ArrayList<> ();
+ List<String> columns = new ArrayList<>();
ResultSet rs = null;
try {
rs = DBUtil.query(conn, currentSql);
@@ -65,7 +76,7 @@ public static List renderPreOrPostSqls(List preOrPostSqls, Strin
}
List<String> renderedSqls = new ArrayList<>();
for (String sql : preOrPostSqls) {
- if (! Strings.isNullOrEmpty(sql)) {
+ if (!Strings.isNullOrEmpty(sql)) {
renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
}
}
@@ -88,7 +99,7 @@ public static void executeSqls(Connection conn, List sqls) {
}
}
- public static void preCheckPrePareSQL( Keys options) {
+ public static void preCheckPrePareSQL(Keys options) {
String table = options.getTable();
List<String> preSqls = options.getPreSqlList();
List<String> renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table);
@@ -97,26 +108,118 @@ public static void preCheckPrePareSQL( Keys options) {
for (String sql : renderedPreSqls) {
try {
DBUtil.sqlValid(sql, DataBaseType.MySql);
- } catch ( ParserException e) {
- throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
+ } catch (ParserException e) {
+ throw RdbmsException.asPreSQLParserException(DataBaseType.MySql, e, sql);
}
}
}
}
- public static void preCheckPostSQL( Keys options) {
+ public static void preCheckPostSQL(Keys options) {
String table = options.getTable();
List<String> postSqls = options.getPostSqlList();
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table);
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
- for(String sql : renderedPostSqls) {
+ for (String sql : renderedPostSqls) {
try {
DBUtil.sqlValid(sql, DataBaseType.MySql);
- } catch (ParserException e){
- throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
+ } catch (ParserException e) {
+ throw RdbmsException.asPostSQLParserException(DataBaseType.MySql, e, sql);
+ }
+ }
+ }
+ }
+
+ public static <T> T checkNotNull(T reference) {
+ if (reference == null) {
+ throw new NullPointerException();
+ } else {
+ return reference;
+ }
+ }
+
+ public static String getLoadHost(Keys options) throws IOException {
+ List<String> hostList = options.getLoadUrlList();
+ for (int i = 0; i < hostList.size(); i++) {
+ String host = new StringBuilder("http://").append(hostList.get((i))).toString();
+ if (checkConnection(host)) {
+ return host;
+ }
+ continue;
+ }
+ throw new IOException("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
+ }
+
+
+ private static boolean checkConnection(String host) {
+ try {
+ URL url = new URL(host);
+ HttpURLConnection co = (HttpURLConnection) url.openConnection();
+ co.setConnectTimeout(5000);
+ co.connect();
+ co.disconnect();
+ return true;
+ } catch (Exception e1) {
+ LOG.error("The connection failed, host is {}", host);
+ return false;
+ }
+ }
+
+
+ public static boolean checkIsStreamLoad(Keys options) {
+ final HttpClientBuilder httpClientBuilder = HttpClients
+ .custom()
+ .disableRedirectHandling();
+ try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+ String url = getLoadHost(options) + "/copy/query";
+ HttpPost httpPost = new HttpPost(url);
+ try (CloseableHttpResponse resp = httpclient.execute(httpPost)) {
+ if (resp.getStatusLine().getStatusCode() == 200) {
+ Map result = (Map) JSON.parse(EntityUtils.toString(resp.getEntity()));
+ if (result != null && (int) result.get("code") == 401) {
+ return false;
+ }
}
}
+ } catch (IOException e) {
+ e.printStackTrace();
}
+ return true;
+ }
+
+
+ public static byte[] addRows(Keys options, List<byte[]> rows, int totalBytes) {
+ if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
+ Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
+ byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
+ for (byte[] row : rows) {
+ bos.put(row);
+ bos.put(lineDelimiter);
+ }
+ return bos.array();
+ }
+
+ if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
+ ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
+ bos.put("[".getBytes(StandardCharsets.UTF_8));
+ byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
+ boolean isFirstElement = true;
+ for (byte[] row : rows) {
+ if (!isFirstElement) {
+ bos.put(jsonDelimiter);
+ }
+ bos.put(row);
+ isFirstElement = false;
+ }
+ bos.put("]".getBytes(StandardCharsets.UTF_8));
+ return bos.array();
+ }
+ throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
+ }
+
+ public static boolean isNullOrEmpty(Map<?, ?> map) {
+ return map == null || map.isEmpty();
}
}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
index baacc9d2dd27b9..be098065ecc42f 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
@@ -22,15 +22,20 @@
public class DorisWriterExcetion extends IOException {
- private final Map<String, Object> response;
+ private Map<String, Object> response;
private boolean reCreateLabel;
- public DorisWriterExcetion ( String message, Map<String, Object> response) {
+ public DorisWriterExcetion(String message, Map<String, Object> response) {
super(message);
this.response = response;
}
- public DorisWriterExcetion ( String message, Map<String, Object> response, boolean reCreateLabel) {
+ public DorisWriterExcetion(String message, boolean reCreateLabel) {
+ super(message);
+ this.reCreateLabel = reCreateLabel;
+ }
+
+ public DorisWriterExcetion(String message, Map<String, Object> response, boolean reCreateLabel) {
super(message);
this.response = response;
this.reCreateLabel = reCreateLabel;
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
index 4f1569e1762aec..a55ea2faa6c812 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
@@ -37,20 +37,22 @@ public class DorisWriterManager {
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
- private final DorisStreamLoadObserver visitor;
+ private final DorisStreamLoadObserver streamLoadObserver;
+ private final DorisCopyIntoObserver copyIntoObserver;
private final Keys options;
- private final List<byte[]> buffer = new ArrayList<> ();
+ private final List<byte[]> buffer = new ArrayList<>();
private int batchCount = 0;
private long batchSize = 0;
private volatile boolean closed = false;
private volatile Exception flushException;
- private final LinkedBlockingDeque< WriterTuple > flushQueue;
+ private final LinkedBlockingDeque<WriterTuple> flushQueue;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
- public DorisWriterManager( Keys options) {
+ public DorisWriterManager(Keys options) {
this.options = options;
- this.visitor = new DorisStreamLoadObserver (options);
+ this.streamLoadObserver = new DorisStreamLoadObserver(options);
+ this.copyIntoObserver = new DorisCopyIntoObserver(options);
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
this.startScheduler();
this.startAsyncFlushing();
@@ -109,7 +111,7 @@ public synchronized void flush(String label, boolean waitUtilDone) throws Except
}
return;
}
- flushQueue.put(new WriterTuple (label, batchSize, new ArrayList<>(buffer)));
+ flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer)));
if (waitUtilDone) {
// wait the last flush
waitAsyncFlushingDone();
@@ -135,18 +137,18 @@ public synchronized void close() {
public String createBatchLabel() {
StringBuilder sb = new StringBuilder();
- if (! Strings.isNullOrEmpty(options.getLabelPrefix())) {
+ if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {
sb.append(options.getLabelPrefix());
}
return sb.append(UUID.randomUUID().toString())
- .toString();
+ .toString();
}
private void startAsyncFlushing() {
// start flush thread
- Thread flushThread = new Thread(new Runnable(){
+ Thread flushThread = new Thread(new Runnable() {
public void run() {
- while(true) {
+ while (true) {
try {
asyncFlush();
} catch (Exception e) {
@@ -162,7 +164,7 @@ public void run() {
private void waitAsyncFlushingDone() throws InterruptedException {
// wait previous flushings
for (int i = 0; i <= options.getFlushQueueLength(); i++) {
- flushQueue.put(new WriterTuple ("", 0l, null));
+ flushQueue.put(new WriterTuple("", 0l, null));
}
checkFlushException();
}
@@ -177,7 +179,11 @@ private void asyncFlush() throws Exception {
for (int i = 0; i <= options.getMaxRetries(); i++) {
try {
// flush to Doris with stream load
- visitor.streamLoad(flushData);
+ if (DorisUtil.checkIsStreamLoad(options)) {
+ streamLoadObserver.streamLoad(flushData);
+ } else {
+ copyIntoObserver.streamLoad(flushData);
+ }
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
startScheduler();
break;
@@ -186,7 +192,7 @@ private void asyncFlush() throws Exception {
if (i >= options.getMaxRetries()) {
throw new IOException(e);
}
- if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {
+ if (e instanceof DorisWriterExcetion && ((DorisWriterExcetion) e).needReCreateLabel()) {
String newLabel = createBatchLabel();
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
flushData.setLabel(newLabel);
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java
new file mode 100644
index 00000000000000..10e32f1a3a9379
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java
@@ -0,0 +1,51 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPost;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class HttpPostBuilder {
+ String url;
+ Map<String, String> header;
+ HttpEntity httpEntity;
+ public HttpPostBuilder() {
+ header = new HashMap<>();
+ }
+
+ public HttpPostBuilder setUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public HttpPostBuilder addCommonHeader() {
+ header.put(HttpHeaders.EXPECT, "100-continue");
+ return this;
+ }
+
+ public HttpPostBuilder baseAuth(String user, String password) {
+ final String authInfo = user + ":" + password;
+ byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+ header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
+ return this;
+ }
+
+ public HttpPostBuilder setEntity(HttpEntity httpEntity) {
+ this.httpEntity = httpEntity;
+ return this;
+ }
+
+ public HttpPost build() {
+ DorisUtil.checkNotNull(url);
+ DorisUtil.checkNotNull(httpEntity);
+ HttpPost put = new HttpPost(url);
+ header.forEach(put::setHeader);
+ put.setEntity(httpEntity);
+ return put;
+ }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java
new file mode 100644
index 00000000000000..546429bccde356
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java
@@ -0,0 +1,65 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPutBuilder {
+ String url;
+ Map<String, String> header;
+ HttpEntity httpEntity;
+ public HttpPutBuilder() {
+ header = new HashMap<>();
+ }
+
+ public HttpPutBuilder setUrl(String url) {
+ this.url = url;
+ return this;
+ }
+
+ public HttpPutBuilder addFileName(String fileName){
+ header.put("fileName", fileName);
+ return this;
+ }
+
+ public HttpPutBuilder setEmptyEntity() {
+ try {
+ this.httpEntity = new StringEntity("");
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e);
+ }
+ return this;
+ }
+
+ public HttpPutBuilder addCommonHeader() {
+ header.put(HttpHeaders.EXPECT, "100-continue");
+ return this;
+ }
+
+ public HttpPutBuilder baseAuth(String user, String password) {
+ final String authInfo = user + ":" + password;
+ byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+ header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
+ return this;
+ }
+
+ public HttpPutBuilder setEntity(HttpEntity httpEntity) {
+ this.httpEntity = httpEntity;
+ return this;
+ }
+
+ public HttpPut build() {
+ DorisUtil.checkNotNull(url);
+ DorisUtil.checkNotNull(httpEntity);
+ HttpPut put = new HttpPut(url);
+ header.forEach(put::setHeader);
+ put.setEntity(httpEntity);
+ return put;
+ }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
index b520f6b06c9ce8..6bf2ae38a12673 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
@@ -34,6 +34,7 @@ public class Keys implements Serializable {
private static final long DEFAULT_FLUSH_INTERVAL = 30000;
private static final String LOAD_PROPS_FORMAT = "format";
+
public enum StreamLoadFormat {
CSV, JSON;
}
@@ -53,6 +54,7 @@ public enum StreamLoadFormat {
private static final String LOAD_URL = "loadUrl";
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
private static final String LOAD_PROPS = "loadProps";
+ private static final String CLUSTER_NAME = "clusterName";
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
@@ -64,7 +66,7 @@ public enum StreamLoadFormat {
private List<String> userSetColumns;
private boolean isWildcardColumn;
- public Keys ( Configuration options) {
+ public Keys(Configuration options) {
this.options = options;
this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) {
@@ -106,6 +108,10 @@ public List getLoadUrlList() {
return options.getList(LOAD_URL, String.class);
}
+ public String getClusterName() {
+ return options.getString(CLUSTER_NAME);
+ }
+
public List<String> getColumns() {
if (isWildcardColumn) {
return this.infoSchemaColumns;
@@ -163,7 +169,7 @@ public StreamLoadFormat getStreamLoadFormat() {
return StreamLoadFormat.CSV;
}
if (loadProps.containsKey(LOAD_PROPS_FORMAT)
- && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
+ && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
return StreamLoadFormat.JSON;
}
return StreamLoadFormat.CSV;
@@ -174,18 +180,18 @@ private void validateStreamLoadUrl() {
for (String host : urlList) {
if (host.split(":").length < 2) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
- "The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`].");
+ "The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`].");
}
}
}
private void validateRequired() {
final String[] requiredOptionKeys = new String[]{
- USERNAME,
- DATABASE,
- TABLE,
- COLUMN,
- LOAD_URL
+ USERNAME,
+ DATABASE,
+ TABLE,
+ COLUMN,
+ LOAD_URL
};
for (String optionKey : requiredOptionKeys) {
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);