Skip to content

Commit

Permalink
doris writer compatible with copy into
Browse files Browse the repository at this point in the history
  • Loading branch information
caoliang-web committed Mar 19, 2024
1 parent af4b422 commit cf016d4
Show file tree
Hide file tree
Showing 12 changed files with 598 additions and 98 deletions.
15 changes: 15 additions & 0 deletions extension/DataX/doriswriter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,21 @@ under the License.
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<!-- Jackson (annotations, core, databind): used by the copy-into classes of this module
     to serialize the request body and to deserialize the JSON response. -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.alibaba.datax.plugin.writer.doriswriter;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

/**
 * Generic envelope of a JSON response: a numeric code, a message and a typed payload.
 *
 * <p>JSON properties that are not declared here are ignored when the response is bound
 * to this class ({@code ignoreUnknown = true}). The class declares no setters and no
 * constructor; presumably the fields are populated by the JSON mapper - confirm before
 * changing field or getter names.
 *
 * @param <T> type of the {@code data} payload
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class BaseResponse<T> {
// Envelope-level status code.
private int code;
// Message that accompanies the status code.
private String msg;
// Payload of the response.
private T data;
// Declared without an accessor in this class.
private int count;

/** Returns the envelope-level status code. */
public int getCode() {
return code;
}

/** Returns the message of the response, or {@code null} if none was set. */
public String getMsg() {
return msg;
}

/** Returns the payload of the response, or {@code null} if none was set. */
public T getData(){
return data;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.alibaba.datax.plugin.writer.doriswriter;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.util.Map;

/**
 * Result section of a copy-into response.
 *
 * <p>Unknown JSON properties are ignored ({@code ignoreUnknown = true}). The class
 * declares no setters; presumably the fields are populated by the JSON mapper - confirm
 * before renaming fields or getters.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
public class CopyIntoResp extends BaseResponse{
// String-typed code of this section. It is a separate field from the int code declared
// in BaseResponse and is exposed through getDataCode(), not getCode().
private String code;
// Exception text of this section, if any.
private String exception;

// Key/value details of the copy-into result.
private Map<String,String> result;

/** Returns the String code declared by this class (not the inherited int code). */
public String getDataCode() {
return code;
}

/** Returns the exception text, or {@code null} if none was set. */
public String getException() {
return exception;
}

/** Returns the result details, or {@code null} if none were set. */
public Map<String, String> getResult() {
return result;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.alibaba.datax.plugin.writer.doriswriter;


import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Builds the {@code COPY INTO} statement that loads an uploaded file into the table
 * configured in {@link Keys}.
 *
 * <p>The load properties of the writer are rendered into the {@code PROPERTIES (...)}
 * clause. The keys {@code column_separator}, {@code line_delimiter} and
 * {@code strip_outer_array} are emitted with a {@code file.} prefix, {@code format}
 * is emitted as {@code file.type}, and {@code columns} is not emitted as a property
 * but used as the select list of the statement.
 */
public class CopySQLBuilder {
    private final static String COPY_SYNC = "copy.async";
    private final static String FIELD_DELIMITER_KEY = "column_separator";
    private final static String COPY_FIELD_DELIMITER_KEY = "file.column_separator";
    private final static String FIELD_DELIMITER_DEFAULT = "\t";
    private final static String LINE_DELIMITER_KEY = "line_delimiter";
    private final static String COPY_LINE_DELIMITER_KEY = "file.line_delimiter";
    private final static String LINE_DELIMITER_DEFAULT = "\n";
    private final static String COLUMNS = "columns";
    private static final String STRIP_OUT_ARRAY = "strip_outer_array";

    /** Load-property keys that are emitted with a {@code file.} prefix. */
    static final List<String> PREFIX_LIST =
            Arrays.asList(FIELD_DELIMITER_KEY, LINE_DELIMITER_KEY, STRIP_OUT_ARRAY);

    private final String fileName;
    private final Keys options;
    /** Private copy of the load properties; never the map instance owned by {@link Keys}. */
    private final Map<String, Object> properties;

    /**
     * @param options  writer configuration providing database, table and load properties
     * @param fileName name of the uploaded file the statement copies from
     */
    public CopySQLBuilder(Keys options, String fileName) {
        this.options = options;
        this.fileName = fileName;
        // Copy the load properties: building the statement adds "copy.async" to the map,
        // which must not leak into the shared writer configuration. A missing (null)
        // load-property map is treated as empty instead of failing later with an NPE.
        this.properties = new LinkedHashMap<>();
        Map<String, Object> loadProps = options.getLoadProps();
        if (loadProps != null) {
            this.properties.putAll(loadProps);
        }
    }

    /**
     * Renders the {@code COPY INTO} statement.
     *
     * @return the SQL text; {@code copy.async} is always emitted as {@code false},
     *         overriding any configured value
     */
    public String buildCopySQL() {
        StringBuilder sb = new StringBuilder();
        sb.append("COPY INTO ")
                .append(options.getDatabase()).append('.').append(options.getTable());

        Object columns = properties.get(COLUMNS);
        if (columns != null && !columns.equals("")) {
            sb.append(" FROM ( SELECT ").append(columns)
                    .append(" FROM @~('").append(fileName).append("') ) ")
                    .append("PROPERTIES (");
        } else {
            sb.append(" FROM @~('").append(fileName).append("') ")
                    .append("PROPERTIES (");
        }

        // copy into must be sync
        properties.put(COPY_SYNC, false);
        StringJoiner props = new StringJoiner(",");
        for (Map.Entry<String, Object> entry : properties.entrySet()) {
            String key = concatPropPrefix(String.valueOf(entry.getKey()));
            if (key.equals(COLUMNS)) {
                // Already used as the select list above; not a copy property.
                continue;
            }
            String rawValue = String.valueOf(entry.getValue());
            String value;
            switch (key) {
                case COPY_FIELD_DELIMITER_KEY:
                    value = DelimiterParser.parse(rawValue, FIELD_DELIMITER_DEFAULT);
                    break;
                case COPY_LINE_DELIMITER_KEY:
                    value = DelimiterParser.parse(rawValue, LINE_DELIMITER_DEFAULT);
                    break;
                default:
                    value = rawValue;
                    break;
            }
            props.add(String.format("'%s'='%s'", key, value));
        }
        sb.append(props).append(" )");
        return sb.toString();
    }

    /** Maps a load-property key to the key used in the {@code PROPERTIES} clause. */
    private String concatPropPrefix(String key) {
        if (PREFIX_LIST.contains(key)) {
            return "file." + key;
        }
        if ("format".equalsIgnoreCase(key)) {
            return "file.type";
        }
        return key;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package com.alibaba.datax.plugin.writer.doriswriter;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Loads one batch through the copy-into HTTP flow:
 * <ol>
 *   <li>PUT to {@code <host>/copy/upload} and read the upload address from the
 *       {@code location} header of the 307 response (redirects are not followed
 *       automatically),</li>
 *   <li>PUT the batch bytes to that address,</li>
 *   <li>POST the {@code COPY INTO} statement to {@code <host>/copy/query} and check the
 *       JSON answer.</li>
 * </ol>
 *
 * <p>All requests share one {@link CloseableHttpClient}; call {@link #close()} when done.
 */
public class DorisCopyIntoObserver {
    private static final Logger LOG = LoggerFactory.getLogger(DorisCopyIntoObserver.class);

    public static final int SUCCESS = 0;
    public static final String FAIL = "1";
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
    private static final String COMMIT_PATTERN = "%s/copy/query";
    private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");

    private final Keys options;
    private final HttpClientBuilder httpClientBuilder = HttpClients
            .custom()
            .disableRedirectHandling();
    private final CloseableHttpClient httpClient;

    /**
     * @param options writer configuration (credentials, target table, load properties)
     */
    public DorisCopyIntoObserver(Keys options) {
        this.options = options;
        this.httpClient = httpClientBuilder.build();
    }

    /**
     * Uploads the rows of {@code data} and runs copy into for the uploaded file.
     * The label of the batch is used as the file name.
     *
     * @param data batch to load
     * @throws Exception if any of the three HTTP steps fails
     */
    public void streamLoad(WriterTuple data) throws Exception {
        String host = DorisUtil.getLoadHost(options);
        String loadUrl = String.format(UPLOAD_URL_PATTERN, host);
        String uploadAddress = getUploadAddress(loadUrl, data.getLabel());
        put(uploadAddress, data.getLabel(), DorisUtil.addRows(options, data.getRows(), data.getBytes().intValue()));
        executeCopy(host, data.getLabel());
    }

    /**
     * Asks the server where the file has to be uploaded.
     *
     * @return value of the {@code location} header of the 307 response
     * @throws RuntimeException if the response is not a 307 carrying a {@code location} header
     */
    private String getUploadAddress(String loadUrl, String fileName) throws IOException {
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        putBuilder.setUrl(loadUrl)
                .addFileName(fileName)
                .addCommonHeader()
                .setEmptyEntity()
                .baseAuth(options.getUsername(), options.getPassword());
        // Use the shared client (built from the same redirect-disabled builder) instead of
        // building a new, never-closed client per call, and always release the response.
        try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
            int statusCode = response.getStatusLine().getStatusCode();
            String reason = response.getStatusLine().getReasonPhrase();
            Header location = response.getFirstHeader("location");
            if (statusCode == 307 && location != null) {
                String uploadAddress = location.getValue();
                LOG.info("redirect to s3:{}", uploadAddress);
                return uploadAddress;
            }
            HttpEntity entity = response.getEntity();
            String result = entity == null ? null : EntityUtils.toString(entity);
            LOG.error("Failed get the redirected address, status {}, reason {}, response {}", statusCode, reason, result);
            throw new RuntimeException("Could not get the redirected address.");
        }
    }

    /**
     * Uploads {@code data} to {@code loadUrl}.
     *
     * @param loadUrl  upload address
     * @param fileName file name, used for error reporting only
     * @param data     bytes to upload
     * @throws IOException if the request fails or the status is not 200
     */
    public void put(String loadUrl, String fileName, byte[] data) throws IOException {
        LOG.info("Executing upload file to: '{}', size: '{}'", loadUrl, data.length);
        HttpPutBuilder putBuilder = new HttpPutBuilder();
        putBuilder.setUrl(loadUrl)
                .addCommonHeader()
                .setEntity(new ByteArrayEntity(data));
        try (CloseableHttpResponse response = httpClient.execute(putBuilder.build())) {
            final int statusCode = response.getStatusLine().getStatusCode();
            if (statusCode != 200) {
                String result = response.getEntity() == null ? null : EntityUtils.toString(response.getEntity());
                LOG.error("upload file {} error, response {}", fileName, result);
                throw new DorisWriterExcetion("upload file error: " + fileName, true);
            }
            // Drain the body so the pooled connection can be reused.
            EntityUtils.consumeQuietly(response.getEntity());
        }
    }

    /**
     * execute copy into
     *
     * @param hostPort base address of the server that receives the statement
     * @param fileName name of the uploaded file to copy from
     * @throws IOException if the request fails, the status is not 200, or the response
     *                     reports a failed copy
     */
    public void executeCopy(String hostPort, String fileName) throws IOException {
        long start = System.currentTimeMillis();
        CopySQLBuilder copySQLBuilder = new CopySQLBuilder(options, fileName);
        String copySQL = copySQLBuilder.buildCopySQL();
        LOG.info("build copy SQL is {}", copySQL);
        Map<String, String> params = new HashMap<>();
        params.put("sql", copySQL);
        if (StringUtils.isNotBlank(options.getClusterName())) {
            params.put("cluster", options.getClusterName());
        }
        HttpPostBuilder postBuilder = new HttpPostBuilder();
        // StringEntity(String) encodes as ISO-8859-1, which corrupts non-Latin-1 characters
        // in the SQL text; send the JSON body as UTF-8.
        postBuilder.setUrl(String.format(COMMIT_PATTERN, hostPort))
                .baseAuth(options.getUsername(), options.getPassword())
                .setEntity(new StringEntity(OBJECT_MAPPER.writeValueAsString(params), "UTF-8"));

        try (CloseableHttpResponse response = httpClient.execute(postBuilder.build())) {
            final int statusCode = response.getStatusLine().getStatusCode();
            final String reasonPhrase = response.getStatusLine().getReasonPhrase();
            if (statusCode != 200) {
                LOG.warn("commit failed with status {} {}, reason {}", statusCode, hostPort, reasonPhrase);
                throw new DorisWriterExcetion("commit error with file: " + fileName, true);
            }
            if (response.getEntity() == null) {
                // Kept as non-fatal, as before; the result cannot be verified without a body.
                LOG.warn("commit returned status 200 without a response body, file {}", fileName);
                return;
            }
            String loadResult = EntityUtils.toString(response.getEntity());
            if (handleCommitResponse(loadResult)) {
                LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
            } else {
                LOG.error("commit error with status {}, reason {}, response {}", statusCode, reasonPhrase, loadResult);
                String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s",
                        statusCode, reasonPhrase, loadResult, copySQL);
                throw new DorisWriterExcetion(copyErrMsg, true);
            }
        }
    }

    /**
     * Decides whether a copy-into response reports success.
     *
     * <p>Success requires: envelope code {@link #SUCCESS}, a data section whose code is not
     * {@link #FAIL}, and a non-empty result whose {@code state} is {@code FINISHED} or whose
     * {@code msg} says that no files can be copied (see {@link #isCommitted(String)}).
     * Missing sections or fields are treated as failure.
     *
     * @param loadResult JSON body of the response
     * @return {@code true} if the copy succeeded
     * @throws IOException if {@code loadResult} cannot be parsed
     */
    public boolean handleCommitResponse(String loadResult) throws IOException {
        BaseResponse<Object> baseResponse =
                OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse<Object>>() {
                });
        if (baseResponse == null || baseResponse.getCode() != SUCCESS) {
            LOG.error("commit failed, reason:{}", loadResult);
            return false;
        }
        CopyIntoResp dataResp = OBJECT_MAPPER.convertValue(baseResponse.getData(), CopyIntoResp.class);
        if (dataResp == null || FAIL.equals(dataResp.getDataCode())) {
            LOG.error("copy into execute failed, reason:{}", loadResult);
            return false;
        }
        Map<String, String> result = dataResp.getResult();
        if (DorisUtil.isNullOrEmpty(result)
                || !"FINISHED".equals(result.get("state")) && !isCommitted(result.get("msg"))) {
            LOG.error("copy into load failed, reason:{}", loadResult);
            return false;
        }
        return true;
    }

    /**
     * @param msg message of a copy-into result, may be {@code null}
     * @return {@code true} if {@code msg} starts with the "No files can be copied" error text;
     *         {@code false} for {@code null}
     */
    public static boolean isCommitted(String msg) {
        return msg != null && COMMITTED_PATTERN.matcher(msg).matches();
    }

    /**
     * Closes the shared HTTP client.
     *
     * @throws RuntimeException if closing fails (the {@link IOException} is kept as cause)
     */
    public void close() throws IOException {
        if (null != httpClient) {
            try {
                httpClient.close();
            } catch (IOException e) {
                LOG.error("Closing httpClient failed.", e);
                throw new RuntimeException("Closing httpClient failed.", e);
            }
        }
    }
}
Loading

0 comments on commit cf016d4

Please sign in to comment.