diff --git a/extension/DataX/doriswriter/pom.xml b/extension/DataX/doriswriter/pom.xml
index aa1e6ff080a9fc..52374d30c203ef 100644
--- a/extension/DataX/doriswriter/pom.xml
+++ b/extension/DataX/doriswriter/pom.xml
@@ -63,6 +63,21 @@ under the License.
             <artifactId>httpclient</artifactId>
             <version>4.5.13</version>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>2.13.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.13.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.13.3</version>
+        </dependency>
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java
new file mode 100644
index 00000000000000..15fe26bf262236
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/BaseResponse.java
@@ -0,0 +1,23 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class BaseResponse<T> {
+    private int code;
+    private String msg;
+    private T data;
+    private int count;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public T getData(){
+        return data;
+    }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java
new file mode 100644
index 00000000000000..83ca128dfe3850
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopyIntoResp.java
@@ -0,0 +1,26 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import java.util.Map;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CopyIntoResp extends BaseResponse {
+    private String code;
+    private String exception;
+
+    private Map<String, String> result;
+
+    public String getDataCode() {
+        return code;
+    }
+
+    public String getException() {
+        return exception;
+    }
+
+    public Map<String, String> getResult() {
+        return result;
+    }
+
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java
new file mode 100644
index 00000000000000..abbb7d196a16f7
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/CopySQLBuilder.java
@@ -0,0 +1,83 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.StringJoiner;
+
+public class CopySQLBuilder {
+    private final static String COPY_SYNC = "copy.async";
+    private final static String FIELD_DELIMITER_KEY = "column_separator";
+    private final static String COPY_FIELD_DELIMITER_KEY = "file.column_separator";
+    private final static String FIELD_DELIMITER_DEFAULT = "\t";
+    private final static String LINE_DELIMITER_KEY = "line_delimiter";
+    private final static String COPY_LINE_DELIMITER_KEY = "file.line_delimiter";
+    private final static String LINE_DELIMITER_DEFAULT = "\n";
+    private final static String COLUMNS = "columns";
+    private static final String STRIP_OUT_ARRAY = "strip_outer_array";
+    private final String fileName;
+    private final Keys options;
+    private Map<String, Object> properties;
+
+
+
+    public CopySQLBuilder(Keys options, String fileName) {
+        this.options = options;
+        this.fileName = fileName;
+        this.properties = options.getLoadProps();
+    }
+
+    public String buildCopySQL(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("COPY INTO ")
+                .append(options.getDatabase() + "." + options.getTable());
+
+        if (properties.get(COLUMNS) != null && !properties.get(COLUMNS).equals("")) {
+            sb.append(" FROM ( SELECT ").append(properties.get(COLUMNS))
+                    .append(" FROM @~('").append(fileName).append("') ) ")
+                    .append("PROPERTIES (");
+        } else {
+            sb.append(" FROM @~('").append(fileName).append("') ")
+                    .append("PROPERTIES (");
+        }
+
+        //copy into must be sync
+        properties.put(COPY_SYNC, false);
+        StringJoiner props = new StringJoiner(",");
+        for (Map.Entry<String, Object> entry : properties.entrySet()) {
+            String key = concatPropPrefix(String.valueOf(entry.getKey()));
+            String value = "";
+            switch (key) {
+                case COPY_FIELD_DELIMITER_KEY:
+                    value = DelimiterParser.parse(String.valueOf(entry.getValue()), FIELD_DELIMITER_DEFAULT);
+                    break;
+                case COPY_LINE_DELIMITER_KEY:
+                    value = DelimiterParser.parse(String.valueOf(entry.getValue()), LINE_DELIMITER_DEFAULT);
+                    break;
+                default:
+                    value = String.valueOf(entry.getValue());
+                    break;
+            }
+            if (!key.equals(COLUMNS)) {
+                String prop = String.format("'%s'='%s'", key, value);
+                props.add(prop);
+            }
+        }
+        sb.append(props).append(" )");
+        return sb.toString();
+    }
+
+    static final List<String> PREFIX_LIST =
+            Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, STRIP_OUT_ARRAY);
+
+    private String concatPropPrefix(String key) {
+        if (PREFIX_LIST.contains(key)) {
+            return "file." + key;
+        }
+        if ("format".equalsIgnoreCase(key)) {
+            return "file.type";
+        }
+        return key;
+    }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java
new file mode 100644
index 00000000000000..95685260cac117
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCopyIntoObserver.java
@@ -0,0 +1,172 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class DorisCopyIntoObserver {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisCopyIntoObserver.class);
+
+    private Keys options;
+    private long pos;
+    public static final int SUCCESS = 0;
+    public static final String FAIL = "1";
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private final HttpClientBuilder httpClientBuilder = HttpClients
+            .custom()
+            .disableRedirectHandling();
+    private CloseableHttpClient httpClient;
+    private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
+    private static final String COMMIT_PATTERN = "%s/copy/query";
+    private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");
+
+    public DorisCopyIntoObserver(Keys options) {
+        this.options = options;
+        this.httpClient = httpClientBuilder.build();
+
+    }
+
+    public void streamLoad(WriterTuple data) throws Exception {
+        String host = DorisUtil.getLoadHost(options);
+        String loadUrl = String.format(UPLOAD_URL_PATTERN, host);
+        String uploadAddress = getUploadAddress(loadUrl, data.getLabel());
+        put(uploadAddress, data.getLabel(), DorisUtil.addRows(options, data.getRows(), data.getBytes().intValue()));
+        executeCopy(host, data.getLabel());
+
+    }
+
+    private String getUploadAddress(String loadUrl, String fileName) throws IOException {
+        HttpPutBuilder putBuilder = new HttpPutBuilder();
+        putBuilder.setUrl(loadUrl)
+                .addFileName(fileName)
+                .addCommonHeader()
+                .setEmptyEntity()
+                .baseAuth(options.getUsername(), options.getPassword());
+        CloseableHttpResponse execute = httpClientBuilder.build().execute(putBuilder.build());
+        int statusCode = execute.getStatusLine().getStatusCode();
+        String reason = execute.getStatusLine().getReasonPhrase();
+        if (statusCode == 307) {
+            Header location = execute.getFirstHeader("location");
+            String uploadAddress = location.getValue();
+            LOG.info("redirect to s3:{}", uploadAddress);
+            return uploadAddress;
+        } else {
+            HttpEntity entity = execute.getEntity();
+            String result = entity == null ? null : EntityUtils.toString(entity);
+            LOG.error("Failed get the redirected address, status {}, reason {}, response {}", statusCode, reason, result);
+            throw new RuntimeException("Could not get the redirected address.");
+        }
+
+    }
+
+
+    public void put(String loadUrl, String fileName, byte[] data) throws IOException {
+        LOG.info(String.format("Executing upload file to: '%s', size: '%s'", loadUrl, data.length));
+        HttpPutBuilder putBuilder = new HttpPutBuilder();
+        putBuilder.setUrl(loadUrl)
+                .addCommonHeader()
+                .setEntity(new ByteArrayEntity(data));
+        CloseableHttpResponse response = httpClient.execute(putBuilder.build());
+        final int statusCode = response.getStatusLine().getStatusCode();
+        if (statusCode != 200) {
+            String result = response.getEntity() == null ? null : EntityUtils.toString(response.getEntity());
+            LOG.error("upload file {} error, response {}", fileName, result);
+            throw new DorisWriterExcetion("upload file error: " + fileName, true);
+        }
+    }
+
+
+    /**
+     * execute copy into
+     */
+    public void executeCopy(String hostPort, String fileName) throws IOException {
+        long start = System.currentTimeMillis();
+        CopySQLBuilder copySQLBuilder = new CopySQLBuilder(options, fileName);
+        String copySQL = copySQLBuilder.buildCopySQL();
+        LOG.info("build copy SQL is {}", copySQL);
+        Map<String, String> params = new HashMap<>();
+        params.put("sql", copySQL);
+        if (StringUtils.isNotBlank(options.getClusterName())) {
+            params.put("cluster", options.getClusterName());
+        }
+        HttpPostBuilder postBuilder = new HttpPostBuilder();
+        postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
+                .baseAuth(options.getUsername(), options.getPassword())
+                .setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(params)));
+
+        CloseableHttpResponse response = httpClient.execute(postBuilder.build());
+        final int statusCode = response.getStatusLine().getStatusCode();
+        final String reasonPhrase = response.getStatusLine().getReasonPhrase();
+        String loadResult = "";
+        if (statusCode != 200) {
+            LOG.warn("commit failed with status {} {}, reason {}", statusCode, hostPort, reasonPhrase);
+            throw new DorisWriterExcetion("commit error with file: " + fileName, true);
+        } else if (response.getEntity() != null) {
+            loadResult = EntityUtils.toString(response.getEntity());
+            boolean success = handleCommitResponse(loadResult);
+            if (success) {
+                LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
+            } else {
+                LOG.error("commit error with status {}, reason {}, response {}", statusCode, reasonPhrase, loadResult);
+                String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s",
+                        statusCode, reasonPhrase, loadResult, copySQL);
+                throw new DorisWriterExcetion(copyErrMsg, true);
+            }
+        }
+    }
+
+    public boolean handleCommitResponse(String loadResult) throws IOException {
+        BaseResponse baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse>() {
+        });
+        if (baseResponse.getCode() == SUCCESS) {
+            CopyIntoResp dataResp = OBJECT_MAPPER.convertValue(baseResponse.getData(), CopyIntoResp.class);
+            if (FAIL.equals(dataResp.getDataCode())) {
+                LOG.error("copy into execute failed, reason:{}", loadResult);
+                return false;
+            } else {
+                Map<String, String> result = dataResp.getResult();
+                if (DorisUtil.isNullOrEmpty(result) || !result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))) {
+                    LOG.error("copy into load failed, reason:{}", loadResult);
+                    return false;
+                } else {
+                    return true;
+                }
+            }
+        } else {
+            LOG.error("commit failed, reason:{}", loadResult);
+            return false;
+        }
+    }
+
+    public static boolean isCommitted(String msg) {
+        return COMMITTED_PATTERN.matcher(msg).matches();
+    }
+
+
+    public void close() throws IOException {
+        if (null != httpClient) {
+            try {
+                httpClient.close();
+            } catch (IOException e) {
+                LOG.error("Closing httpClient failed.", e);
+                throw new RuntimeException("Closing httpClient failed.", e);
+            }
+        }
+    }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
index 2a23a07fac4c77..27887dc3b2e01a 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadObserver.java
@@ -17,7 +17,7 @@
 package com.alibaba.datax.plugin.writer.doriswriter;
 
-import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSON;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHeaders;
@@ -37,6 +37,7 @@
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
@@ -77,10 +78,7 @@ public String urlDecode(String outBuffer) {
     }
 
     public void streamLoad(WriterTuple data) throws Exception {
-        String host = getLoadHost();
-        if(host == null){
-            throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
-        }
+        String host = DorisUtil.getLoadHost(options);
         String loadUrl = new StringBuilder(host)
                 .append("/api/")
                 .append(options.getDatabase())
@@ -90,8 +88,8 @@ public void streamLoad(WriterTuple data) throws Exception {
                 .toString();
         LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
         loadUrl = urlDecode(loadUrl);
-        Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
-        LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult));
+        Map<String, Object> loadResult = put(loadUrl, data.getLabel(), DorisUtil.addRows(options,data.getRows(), data.getBytes().intValue()));
+        LOG.info("StreamLoad response :{}", JSON.toJSONString(loadResult));
         final String keyStatus = "Status";
         if (null == loadResult || !loadResult.containsKey(keyStatus)) {
             throw new IOException("Unable to flush data to Doris: unknown result status.");
@@ -152,35 +150,6 @@ private void checkStreamLoadState(String host, String label) throws IOException
         }
     }
 
-    private byte[] addRows(List<byte[]> rows, int totalBytes) {
-        if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
-            Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
-            byte[] lineDelimiter = DelimiterParser.parse((String)props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
-            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
-            for (byte[] row : rows) {
-                bos.put(row);
-                bos.put(lineDelimiter);
-            }
-            return bos.array();
-        }
-
-        if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
-            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
-            bos.put("[".getBytes(StandardCharsets.UTF_8));
-            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
-            boolean isFirstElement = true;
-            for (byte[] row : rows) {
-                if (!isFirstElement) {
-                    bos.put(jsonDelimiter);
-                }
-                bos.put(row);
-                isFirstElement = false;
-            }
-            bos.put("]".getBytes(StandardCharsets.UTF_8));
-            return bos.array();
-        }
-        throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
-    }
     private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
         LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
         final HttpClientBuilder httpClientBuilder = HttpClients.custom()
@@ -237,28 +206,4 @@ private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
         }
         return respEntity;
     }
-
-    private String getLoadHost() {
-        List<String> hostList = options.getLoadUrlList();
-        Collections.shuffle(hostList);
-        String host = new StringBuilder("http://").append(hostList.get((0))).toString();
-        if (checkConnection(host)){
-            return host;
-        }
-        return null;
-    }
-
-    private boolean checkConnection(String host) {
-        try {
-            URL url = new URL(host);
-            HttpURLConnection co = (HttpURLConnection) url.openConnection();
-            co.setConnectTimeout(5000);
-            co.connect();
-            co.disconnect();
-            return true;
-        } catch (Exception e1) {
-            e1.printStackTrace();
-            return false;
-        }
-    }
 }
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
index 53411945e44562..14c82448db025e 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java
@@ -22,16 +22,26 @@
 import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
 import com.alibaba.datax.plugin.rdbms.writer.Constant;
 import com.alibaba.druid.sql.parser.ParserException;
+import com.alibaba.fastjson2.JSON;
 import com.google.common.base.Strings;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 
 /**
  * jdbc util
@@ -39,11 +49,12 @@ public class DorisUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisUtil.class);
 
-    private DorisUtil() {}
+    private DorisUtil() {
+    }
 
-    public static List<String> getDorisTableColumns( Connection conn, String databaseName, String tableName) {
+    public static List<String> getDorisTableColumns(Connection conn, String databaseName, String tableName) {
         String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
-        List<String> columns = new ArrayList<> ();
+        List<String> columns = new ArrayList<>();
         ResultSet rs = null;
         try {
             rs = DBUtil.query(conn, currentSql);
@@ -65,7 +76,7 @@ public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, Strin
         }
         List<String> renderedSqls = new ArrayList<>();
         for (String sql : preOrPostSqls) {
-            if (! Strings.isNullOrEmpty(sql)) {
+            if (!Strings.isNullOrEmpty(sql)) {
                 renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
             }
         }
@@ -88,7 +99,7 @@ public static void executeSqls(Connection conn, List<String> sqls) {
         }
     }
 
-    public static void preCheckPrePareSQL( Keys options) {
+    public static void preCheckPrePareSQL(Keys options) {
         String table = options.getTable();
         List<String> preSqls = options.getPreSqlList();
         List<String> renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table);
@@ -97,26 +108,118 @@ public static void preCheckPrePareSQL( Keys options) {
             for (String sql : renderedPreSqls) {
                 try {
                     DBUtil.sqlValid(sql, DataBaseType.MySql);
-                } catch ( ParserException e) {
-                    throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
+                } catch (ParserException e) {
+                    throw RdbmsException.asPreSQLParserException(DataBaseType.MySql, e, sql);
                 }
             }
         }
     }
 
-    public static void preCheckPostSQL( Keys options) {
+    public static void preCheckPostSQL(Keys options) {
         String table = options.getTable();
         List<String> postSqls = options.getPostSqlList();
         List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table);
         if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
             LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
-            for(String sql : renderedPostSqls) {
+            for (String sql : renderedPostSqls) {
                 try {
                     DBUtil.sqlValid(sql, DataBaseType.MySql);
-                } catch (ParserException e){
-                    throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
+                } catch (ParserException e) {
+                    throw RdbmsException.asPostSQLParserException(DataBaseType.MySql, e, sql);
+                }
+            }
+        }
+    }
+
+    public static <T> T checkNotNull(T reference) {
+        if (reference == null) {
+            throw new NullPointerException();
+        } else {
+            return reference;
+        }
+    }
+
+    public static String getLoadHost(Keys options) throws IOException {
+        List<String> hostList = options.getLoadUrlList();
+        for (int i = 0; i < hostList.size(); i++) {
+            String host = new StringBuilder("http://").append(hostList.get((i))).toString();
+            if (checkConnection(host)) {
+                return host;
+            }
+            continue;
+        }
+        throw new IOException("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
+    }
+
+
+    private static boolean checkConnection(String host) {
+        try {
+            URL url = new URL(host);
+            HttpURLConnection co = (HttpURLConnection) url.openConnection();
+            co.setConnectTimeout(5000);
+            co.connect();
+            co.disconnect();
+            return true;
+        } catch (Exception e1) {
+            LOG.error("The connection failed, host is {}", host);
+            return false;
+        }
+    }
+
+
+    public static boolean checkIsStreamLoad(Keys options) {
+        final HttpClientBuilder httpClientBuilder = HttpClients
+                .custom()
+                .disableRedirectHandling();
+        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
+            String url = getLoadHost(options) + "/copy/query";
+            HttpPost httpPost = new HttpPost(url);
+            try (CloseableHttpResponse resp = httpclient.execute(httpPost)) {
+                if (resp.getStatusLine().getStatusCode() == 200) {
+                    Map result = (Map) JSON.parse(EntityUtils.toString(resp.getEntity()));
+                    if (result != null && (int) result.get("code") == 401) {
+                        return false;
+                    }
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return true;
+    }
+
+
+    public static byte[] addRows(Keys options, List<byte[]> rows, int totalBytes) {
+        if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
+            Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<>() : options.getLoadProps());
+            byte[] lineDelimiter = DelimiterParser.parse((String) props.get("line_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
+            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
+            for (byte[] row : rows) {
+                bos.put(row);
+                bos.put(lineDelimiter);
+            }
+            return bos.array();
+        }
+
+        if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
+            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
+            bos.put("[".getBytes(StandardCharsets.UTF_8));
+            byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
+            boolean isFirstElement = true;
+            for (byte[] row : rows) {
+                if (!isFirstElement) {
+                    bos.put(jsonDelimiter);
+                }
+                bos.put(row);
+                isFirstElement = false;
+            }
+            bos.put("]".getBytes(StandardCharsets.UTF_8));
+            return bos.array();
+        }
+        throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
+    }
+
+    public static boolean isNullOrEmpty(Map map) {
+        return map == null || map.isEmpty();
     }
 }
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
index baacc9d2dd27b9..be098065ecc42f 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterExcetion.java
@@ -22,15 +22,20 @@
 
 public class DorisWriterExcetion extends IOException {
 
-    private final Map<String, Object> response;
+    private Map<String, Object> response;
     private boolean reCreateLabel;
 
-    public DorisWriterExcetion ( String message, Map<String, Object> response) {
+    public DorisWriterExcetion(String message, Map<String, Object> response) {
         super(message);
         this.response = response;
     }
 
-    public DorisWriterExcetion ( String message, Map<String, Object> response, boolean reCreateLabel) {
+    public DorisWriterExcetion(String message, boolean reCreateLabel) {
+        super(message);
+        this.reCreateLabel = reCreateLabel;
+    }
+
+    public DorisWriterExcetion(String message, Map<String, Object> response, boolean reCreateLabel) {
         super(message);
         this.response = response;
         this.reCreateLabel = reCreateLabel;
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
index 4f1569e1762aec..a55ea2faa6c812 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java
@@ -37,20 +37,22 @@ public class DorisWriterManager {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
 
-    private final DorisStreamLoadObserver visitor;
+    private final DorisStreamLoadObserver streamLoadObserver;
+    private final DorisCopyIntoObserver copyIntoObserver;
     private final Keys options;
-    private final List<WriterTuple> buffer = new ArrayList<> ();
+    private final List<WriterTuple> buffer = new ArrayList<>();
     private int batchCount = 0;
     private long batchSize = 0;
     private volatile boolean closed = false;
     private volatile Exception flushException;
-    private final LinkedBlockingDeque< WriterTuple > flushQueue;
+    private final LinkedBlockingDeque<WriterTuple> flushQueue;
     private ScheduledExecutorService scheduler;
     private ScheduledFuture scheduledFuture;
 
-    public DorisWriterManager( Keys options) {
+    public DorisWriterManager(Keys options) {
         this.options = options;
-        this.visitor = new DorisStreamLoadObserver (options);
+        this.streamLoadObserver = new DorisStreamLoadObserver(options);
+        this.copyIntoObserver = new DorisCopyIntoObserver(options);
         flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
         this.startScheduler();
         this.startAsyncFlushing();
@@ -109,7 +111,7 @@ public synchronized void flush(String label, boolean waitUtilDone) throws Except
             }
             return;
         }
-        flushQueue.put(new WriterTuple (label, batchSize, new ArrayList<>(buffer)));
+        flushQueue.put(new WriterTuple(label, batchSize, new ArrayList<>(buffer)));
         if (waitUtilDone) {
             // wait the last flush
             waitAsyncFlushingDone();
@@ -135,18 +137,18 @@ public synchronized void close() {
     }
 
     public String createBatchLabel() {
         StringBuilder sb = new StringBuilder();
-        if (! Strings.isNullOrEmpty(options.getLabelPrefix())) {
+        if (!Strings.isNullOrEmpty(options.getLabelPrefix())) {
             sb.append(options.getLabelPrefix());
         }
         return sb.append(UUID.randomUUID().toString())
-            .toString();
+                .toString();
     }
 
     private void startAsyncFlushing() {
         // start flush thread
-        Thread flushThread = new Thread(new Runnable(){
+        Thread flushThread = new Thread(new Runnable() {
             public void run() {
-                while(true) {
+                while (true) {
                     try {
                         asyncFlush();
                     } catch (Exception e) {
@@ -162,7 +164,7 @@ public void run() {
     private void waitAsyncFlushingDone() throws InterruptedException {
         // wait previous flushings
         for (int i = 0; i <= options.getFlushQueueLength(); i++) {
-            flushQueue.put(new WriterTuple ("", 0l, null));
+            flushQueue.put(new WriterTuple("", 0l, null));
         }
         checkFlushException();
     }
@@ -177,7 +179,11 @@ private void asyncFlush() throws Exception {
         for (int i = 0; i <= options.getMaxRetries(); i++) {
             try {
                 // flush to Doris with stream load
-                visitor.streamLoad(flushData);
+                if (DorisUtil.checkIsStreamLoad(options)) {
+                    streamLoadObserver.streamLoad(flushData);
+                } else {
+                    copyIntoObserver.streamLoad(flushData);
+                }
                 LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
                 startScheduler();
                 break;
@@ -186,7 +192,7 @@
                 if (i >= options.getMaxRetries()) {
                     throw new IOException(e);
                 }
-                if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {
+                if (e instanceof DorisWriterExcetion && ((DorisWriterExcetion) e).needReCreateLabel()) {
                     String newLabel = createBatchLabel();
                     LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
                     flushData.setLabel(newLabel);
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java
new file mode 100644
index 00000000000000..10e32f1a3a9379
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPostBuilder.java
@@ -0,0 +1,51 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPost;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class HttpPostBuilder {
+    String url;
+    Map<String, String> header;
+    HttpEntity httpEntity;
+    public HttpPostBuilder() {
+        header = new HashMap<>();
+    }
+
+    public HttpPostBuilder setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    public HttpPostBuilder addCommonHeader() {
+        header.put(HttpHeaders.EXPECT, "100-continue");
+        return this;
+    }
+
+    public HttpPostBuilder baseAuth(String user, String password) {
+        final String authInfo = user + ":" + password;
+        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+        header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
+        return this;
+    }
+
+    public HttpPostBuilder setEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        return this;
+    }
+
+    public HttpPost build() {
+        DorisUtil.checkNotNull(url);
+        DorisUtil.checkNotNull(httpEntity);
+        HttpPost put = new HttpPost(url);
+        header.forEach(put::setHeader);
+        put.setEntity(httpEntity);
+        return put;
+    }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java
new file mode 100644
index 00000000000000..546429bccde356
--- /dev/null
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/HttpPutBuilder.java
@@ -0,0 +1,65 @@
+package com.alibaba.datax.plugin.writer.doriswriter;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HttpPutBuilder {
+    String url;
+    Map<String, String> header;
+    HttpEntity httpEntity;
+    public HttpPutBuilder() {
+        header = new HashMap<>();
+    }
+
+    public HttpPutBuilder setUrl(String url) {
+        this.url = url;
+        return this;
+    }
+
+    public HttpPutBuilder addFileName(String fileName){
+        header.put("fileName", fileName);
+        return this;
+    }
+
+    public HttpPutBuilder setEmptyEntity() {
+        try {
+            this.httpEntity = new StringEntity("");
+        } catch (Exception e) {
+            throw new IllegalArgumentException(e);
+        }
+        return this;
+    }
+
+    public HttpPutBuilder addCommonHeader() {
+        header.put(HttpHeaders.EXPECT, "100-continue");
+        return this;
+    }
+
+    public HttpPutBuilder baseAuth(String user, String password) {
+        final String authInfo = user + ":" + password;
+        byte[] encoded = Base64.encodeBase64(authInfo.getBytes(StandardCharsets.UTF_8));
+        header.put(HttpHeaders.AUTHORIZATION, "Basic " + new String(encoded));
+        return this;
+    }
+
+    public HttpPutBuilder setEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        return this;
+    }
+
+    public HttpPut build() {
+        DorisUtil.checkNotNull(url);
+        DorisUtil.checkNotNull(httpEntity);
+        HttpPut put = new HttpPut(url);
+        header.forEach(put::setHeader);
+        put.setEntity(httpEntity);
+        return put;
+    }
+}
diff --git a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
index b520f6b06c9ce8..6bf2ae38a12673 100644
--- a/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
+++ b/extension/DataX/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Keys.java
@@ -34,6 +34,7 @@ public class Keys implements Serializable {
 
     private static final long DEFAULT_FLUSH_INTERVAL = 30000;
     private static final String LOAD_PROPS_FORMAT = "format";
+
     public enum StreamLoadFormat {
         CSV, JSON;
     }
@@ -53,6 +54,7 @@ public enum StreamLoadFormat {
     private static final String LOAD_URL = "loadUrl";
     private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
     private static final String LOAD_PROPS = "loadProps";
+    private static final String CLUSTER_NAME = "clusterName";
 
     private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
 
@@ -64,7 +66,7 @@ public enum StreamLoadFormat {
     private List<String> userSetColumns;
     private boolean isWildcardColumn;
 
-    public Keys ( Configuration options) {
+    public Keys(Configuration options) {
         this.options = options;
         this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
         if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) {
@@ -106,6 +108,10 @@ public List<String> getLoadUrlList() {
         return options.getList(LOAD_URL, String.class);
     }
 
+    public String getClusterName() {
+        return options.getString(CLUSTER_NAME);
+    }
+
     public List<String> getColumns() {
         if (isWildcardColumn) {
             return this.infoSchemaColumns;
@@ -163,7 +169,7 @@ public StreamLoadFormat getStreamLoadFormat() {
             return StreamLoadFormat.CSV;
         }
         if (loadProps.containsKey(LOAD_PROPS_FORMAT)
-            && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
+                && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
             return StreamLoadFormat.JSON;
         }
         return StreamLoadFormat.CSV;
@@ -174,18 +180,18 @@ private void validateStreamLoadUrl() {
         for (String host : urlList) {
             if (host.split(":").length < 2) {
                 throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
-                    "The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`].");
+                        "The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`].");
             }
         }
     }
 
     private void validateRequired() {
         final String[] requiredOptionKeys = new String[]{
-            USERNAME,
-            DATABASE,
-            TABLE,
-            COLUMN,
-            LOAD_URL
+                USERNAME,
+                DATABASE,
+                TABLE,
+                COLUMN,
+                LOAD_URL
         };
         for (String optionKey : requiredOptionKeys) {
             options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);