From 8c7d31dceb75b2db40e4eee6650b32b355cbfed2 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Mon, 21 Oct 2024 14:15:45 +0800 Subject: [PATCH] [INLONG-11352][SDK] Add dirty data collection sdk (#11354) Co-authored-by: AloysZhang Co-authored-by: Charles Zhang --- inlong-sdk/dirty-data-sdk/README.md | 53 +++++ inlong-sdk/dirty-data-sdk/pom.xml | 96 ++++++++ .../inlong/sdk/dirtydata/Constants.java | 56 +++++ .../inlong/sdk/dirtydata/DirtyData.java | 149 ++++++++++++ .../sdk/dirtydata/DirtyDataCollector.java | 219 ++++++++++++++++++ .../inlong/sdk/dirtydata/DirtyOptions.java | 93 ++++++++ .../inlong/sdk/dirtydata/DirtySink.java | 57 +++++ .../sdk/dirtydata/PatternReplaceUtils.java | 46 ++++ .../inlong/sdk/dirtydata/sink/Configure.java | 51 ++++ .../dirtydata/sink/InlongSdkDirtySink.java | 154 ++++++++++++ .../sdk/dirtydata/sink/InlongSdkOptions.java | 51 ++++ .../inlong/sdk/dirtydata/sink/LabelUtils.java | 67 ++++++ inlong-sdk/pom.xml | 1 + 13 files changed, 1093 insertions(+) create mode 100644 inlong-sdk/dirty-data-sdk/README.md create mode 100644 inlong-sdk/dirty-data-sdk/pom.xml create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java diff --git a/inlong-sdk/dirty-data-sdk/README.md b/inlong-sdk/dirty-data-sdk/README.md new file mode 100644 index 0000000000..8c3b748f70 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/README.md @@ -0,0 +1,53 @@ +## Overview + +This SDK is used to collect dirty data and store it in a designated storage location. + +## Features + +### Independent SDK + +Independent SDK, not dependent on platform specific libraries (such as Flink), can be used by Agent, +DataProxy, Sort modules. + +### Scalable multiple data storage options + +Dirty data can be stored in various different storage locations (currently only supports sending to +DataProxy). + +## Usage + +### Create DirtyDataCollector object + +```java + Map configMap = new ConcurrentHashMap<>(); + // Enable dirty data collection + configMap.put(DIRTY_COLLECT_ENABLE, "true"); + // If ignore error messages during dirty data collection + configMap.put(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, "true"); + // The storage where dirty data will be stored currently only supports 'inlong', + // which means sending the data to DataSroxy + configMap.put(DIRTY_SIDE_OUTPUT_CONNECTOR, "inlong"); + // The labels of dirty side-output, format is 'key1=value1&key2=value2' + configMap.put(DIRTY_SIDE_OUTPUT_LABELS, "key1=value1&key2=value2"); + // The log tag of dirty side-output, it supports variable replace like '${variable}'. + configMap.put(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData"); + Configure config = new Configure(configMap); + + DirtyDataCollector collecter = new DirtyDataCollector(); + collector.open(config); +``` + +### Collect dirty data + +```java + // In fact, the dirty data we encounter is often parsed incorrectly, + // so we use byte [] as the format for dirty data. + byte[] dirtyData = "xxxxxxxxxyyyyyyyyyyyyyy".getBytes(StandardCharsets.UTF_8); + // Here, incorrect types can be marked, such as missing fields, type errors, or unknown errors, etc. + String dirtyType = "Undefined"; + // Details of errors can be passed here. + Throwable error = new Throwable(); + collector.invoke(dirtyData, dirtyType, error); +``` + + | \ No newline at end of file diff --git a/inlong-sdk/dirty-data-sdk/pom.xml b/inlong-sdk/dirty-data-sdk/pom.xml new file mode 100644 index 0000000000..853e873094 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/pom.xml @@ -0,0 +1,96 @@ + + + + 4.0.0 + + org.apache.inlong + inlong-sdk + 2.1.0-SNAPSHOT + + dirty-data-sdk + Apache InLong - Dirty Data SDK + + ${project.parent.parent.basedir} + + + + + org.apache.inlong + inlong-common + ${project.version} + + + org.apache.inlong + sdk-common + ${project.version} + + + org.apache.inlong + dataproxy-sdk + ${project.version} + + + + + + + + maven-clean-plugin + 3.1.0 + + + maven-resources-plugin + 3.2.0 + + + maven-compiler-plugin + ${plugin.compile.version} + + + org.apache.maven.plugins + maven-surefire-plugin + ${plugin.surefire.version} + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + ${plugin.deploy.version} + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + + + diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java new file mode 100644 index 0000000000..933f81a67b --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +/** + * Connector base option constant + */ +public final class Constants { + + public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable"; + + public static final String DIRTY_SIDE_OUTPUT_CONNECTOR = "dirty.side-output.connector"; + + public static final String DIRTY_SIDE_OUTPUT_IGNORE_ERRORS = "dirty.side-output.ignore-errors"; + + /** + * The labels of dirty side-output, format is 'key1=value1&key2=value2' + * it supports variable replace like '${variable}' + * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] + * are currently supported, + * and the support of other variables is determined by the connector. + */ + public static final String DIRTY_SIDE_OUTPUT_LABELS = "dirty.side-output.labels"; + + /** + * The log tag of dirty side-output, it supports variable replace like '${variable}'. + * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported, + * and the support of other variables is determined by the connector. + */ + public static final String DIRTY_SIDE_OUTPUT_LOG_TAG = "dirty.side-output.log-tag"; + + /** + * It is used for 'inlong.metric.labels' or 'sink.dirty.labels' + */ + public static final String DELIMITER = "&"; + + /** + * The delimiter of key and value, it is used for 'inlong.metric.labels' or 'sink.dirty.labels' + */ + public static final String KEY_VALUE_DELIMITER = "="; +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java new file mode 100644 index 0000000000..93caf8b57e --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; + +/** + * Dirty data base class, it is a wrapper of dirty data + */ +public class DirtyData { + + private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE"; + + private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE"; + private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME"; + + private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + /** + * The identifier of dirty data, it will be used for filename generation of file dirty sink, + * topic generation of mq dirty sink, tablename generation of database, etc, + * and it supports variable replace like '${variable}'. + * There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported, + * and the support of other variables is determined by the connector. + */ + private final String identifier; + /** + * The labels of the dirty data, it will be written to store system of dirty + */ + private final String labels; + /** + * The log tag of dirty data, it is only used to format log as follows: + * [${logTag}] ${labels} ${data} + */ + private final String logTag; + /** + * Dirty type + */ + private final String dirtyType; + /** + * Dirty describe message, it is the cause of dirty data + */ + private final String dirtyMessage; + /** + * The real dirty data + */ + private final byte[] data; + + public DirtyData(byte[] data, String identifier, String labels, + String logTag, String dirtyType, String dirtyMessage) { + this.data = data; + this.dirtyType = dirtyType; + this.dirtyMessage = dirtyMessage; + Map paramMap = genParamMap(); + this.labels = PatternReplaceUtils.replace(labels, paramMap); + this.logTag = PatternReplaceUtils.replace(logTag, paramMap); + this.identifier = PatternReplaceUtils.replace(identifier, paramMap); + + } + + public static Builder builder() { + return new Builder(); + } + + private Map genParamMap() { + Map paramMap = new HashMap<>(); + paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); + paramMap.put(DIRTY_TYPE_KEY, dirtyType); + paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); + return paramMap; + } + + public String getLabels() { + return labels; + } + + public String getLogTag() { + return logTag; + } + + public byte[] getData() { + return data; + } + + public String getDirtyType() { + return dirtyType; + } + + public String getIdentifier() { + return identifier; + } + + public static class Builder { + + private String identifier; + private String labels; + private String logTag; + private String dirtyType = "UNDEFINED"; + private String dirtyMessage; + private byte[] data; + + public Builder setDirtyType(String dirtyType) { + this.dirtyType = dirtyType; + return this; + } + + public Builder setLabels(String labels) { + this.labels = labels; + return this; + } + + public Builder setData(byte[] data) { + this.data = data; + return this; + } + + public Builder setLogTag(String logTag) { + this.logTag = logTag; + return this; + } + + public Builder setDirtyMessage(String dirtyMessage) { + this.dirtyMessage = dirtyMessage; + return this; + } + + public DirtyData build() { + return new DirtyData(data, identifier, labels, logTag, dirtyType, dirtyMessage); + } + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java new file mode 100644 index 0000000000..bd8afffe62 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import org.apache.inlong.sdk.dirtydata.DirtyData.Builder; +import org.apache.inlong.sdk.dirtydata.sink.Configure; +import org.apache.inlong.sdk.dirtydata.sink.InlongSdkDirtySink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Dirty sink helper, it helps dirty data sink for {@link DirtySink} + */ +public class DirtyDataCollector implements Serializable { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LoggerFactory.getLogger(DirtyDataCollector.class); + static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE); + private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE"; + private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE"; + private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME"; + private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private DirtyOptions dirtyOptions; + private DirtySink dirtySink; + private Configure config; + + public DirtyDataCollector() { + } + + /** + * Open for dirty sink + * + * @param configuration The configuration that is used for dirty sink + */ + public void open(Configure configuration) { + config = configuration; + if (dirtyOptions == null) { + dirtyOptions = DirtyOptions.fromConfig(configuration); + } + dirtyOptions.validate(); + if (!dirtyOptions.isEnableDirtyCollect()) { + return; + } + dirtySink = createDirtySink(dirtyOptions.getSinkType()); + try { + dirtySink.open(configuration); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + private DirtySink createDirtySink(String sinkType) { + DirtySink sink; + try { + switch (sinkType) { + case "inlong": { + sink = new InlongSdkDirtySink(); + sink.open(config); + return sink; + } + default: { + LOGGER.error("invalid dirty sink type {}", sinkType); + return null; + } + } + } catch (Exception e) { + LOGGER.error("create dirty sink error", e); + } + return null; + } + + /** + * Dirty data sink + * + * @param data The dirty data + * @param dirtyType The dirty type + * @param e The cause of dirty data + */ + public void invoke(byte[] data, String dirtyType, Throwable e) { + invoke(data, dirtyType, dirtyOptions.getLabels(), dirtyOptions.getLogTag(), e); + } + + /** + * Dirty data sink + * + * @param data The dirty data + * @param dirtyType The dirty type + * @param label The dirty label + * @param logTag The dirty logTag + * @param e The cause of dirty data + */ + public void invoke(byte[] data, String dirtyType, String label, String logTag, Throwable e) { + if (!dirtyOptions.isEnableDirtyCollect()) { + return; + } + if (dirtySink != null) { + Builder builder = DirtyData.builder(); + try { + builder.setData(data) + .setDirtyType(dirtyType) + .setLabels(label) + .setLogTag(logTag) + .setDirtyMessage(e.getMessage()); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.isIgnoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOGGER.warn("Dirty sink failed", ex); + } + } + } + + /** + * replace ${SYSTEM_TIME} with real time + * + * @param pattern + * @return + */ + public static String regexReplace(String pattern, String dirtyType, String dirtyMessage) { + if (pattern == null) { + return null; + } + + Map paramMap = new HashMap<>(6); + paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); + paramMap.put(DIRTY_TYPE_KEY, dirtyType); + paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); + + Matcher matcher = REGEX_PATTERN.matcher(pattern); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + String keyText = matcher.group(1); + String replacement = paramMap.get(keyText); + if (replacement == null) { + continue; + } + matcher.appendReplacement(sb, replacement); + } + matcher.appendTail(sb); + return sb.toString(); + } + + /** + * replace ${database} ${table} etc. Used in cases where jsonDynamicFormat.parse() is not usable. + */ + public static String regexReplace(String pattern, String dirtyType, String dirtyMessage, String database, + String table, String schema) { + if (pattern == null) { + return null; + } + + Map paramMap = new HashMap<>(6); + paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); + paramMap.put(DIRTY_TYPE_KEY, dirtyType); + paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); + paramMap.put("source.database", database); + paramMap.put("database", database); + paramMap.put("source.table", table); + paramMap.put("table", table); + if (schema != null) { + paramMap.put("source.schema", schema); + paramMap.put("schema", schema); + } + + Matcher matcher = REGEX_PATTERN.matcher(pattern); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + String keyText = matcher.group(1); + String replacement = paramMap.get(keyText); + if (replacement == null) { + continue; + } + matcher.appendReplacement(sb, replacement); + } + matcher.appendTail(sb); + return sb.toString(); + } + + public void setDirtyOptions(DirtyOptions dirtyOptions) { + this.dirtyOptions = dirtyOptions; + } + + public DirtyOptions getDirtyOptions() { + return dirtyOptions; + } + + @Nullable + public DirtySink getDirtySink() { + return dirtySink; + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java new file mode 100644 index 0000000000..d70127adf8 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import org.apache.inlong.sdk.dirtydata.sink.Configure; + +import lombok.Builder; +import lombok.Data; +import lombok.Getter; + +import java.io.Serializable; + +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_COLLECT_ENABLE; +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR; +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LABELS; +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG; + +/** + * Dirty common options + */ +@Data +@Builder +@Getter +public class DirtyOptions implements Serializable { + + private static final long serialVersionUID = 1L; + private static final String DEFAULT_FORMAT = "csv"; + private static final String DEFAULT_CSV_FIELD_DELIMITER = ","; + private final boolean enableDirtyCollect; + private final boolean ignoreSideOutputErrors; + private final String sinkType; + private final String labels; + private final String logTag; + private final String format; + private final String csvFieldDelimiter; + + private DirtyOptions(boolean enableDirtyCollect, boolean ignoreSideOutputErrors, + String sinkType, String labels, String logTag, String format, String csvFieldDelimiter) { + this.enableDirtyCollect = enableDirtyCollect; + this.ignoreSideOutputErrors = ignoreSideOutputErrors; + this.sinkType = sinkType; + this.labels = labels; + this.logTag = logTag; + this.format = format; + this.csvFieldDelimiter = csvFieldDelimiter; + } + + /** + * Get dirty options from {@link Configure} + * + * @param config The config + * @return Dirty options + */ + public static DirtyOptions fromConfig(Configure config) { + boolean enableDirtyCollect = config.getBoolean(DIRTY_COLLECT_ENABLE, false); + boolean ignoreSinkError = config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true); + String dirtyConnector = config.get(DIRTY_SIDE_OUTPUT_CONNECTOR, null); + String labels = config.get(DIRTY_SIDE_OUTPUT_LABELS, null); + String logTag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData"); + String format = DEFAULT_FORMAT; + String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER; + return new DirtyOptions(enableDirtyCollect, ignoreSinkError, + dirtyConnector, labels, logTag, format, csvFieldDelimiter); + } + + public void validate() { + if (!enableDirtyCollect) { + return; + } + if (sinkType == null || sinkType.trim().length() == 0) { + throw new RuntimeException( + "The option 'dirty.side-output.connector' is not allowed to be empty " + + "when the option 'dirty.ignore' is 'true' " + + "and the option 'dirty.side-output.enable' is 'true'"); + } + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java new file mode 100644 index 0000000000..68d8cc9110 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import org.apache.inlong.sdk.dirtydata.sink.Configure; + +import java.io.Serializable; + +/** + * The dirty sink base inteface + * + */ +public interface DirtySink extends Serializable { + + /** + * Open for dirty sink + * + * @param configuration The configuration that is used for dirty sink + * @throws Exception The exception may be thrown when executing + */ + default void open(Configure configuration) throws Exception { + + } + + /** + * Invoke that is used to sink dirty data + * + * @param dirtyData The dirty data that will be written + * @throws Exception The exception may be thrown when executing + */ + void invoke(DirtyData dirtyData) throws Exception; + + /** + * Close for dirty sink + * + * @throws Exception The exception may be thrown when executing + */ + default void close() throws Exception { + + } + +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java new file mode 100644 index 0000000000..20f70c5205 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * The pattern replace utils + */ +public final class PatternReplaceUtils { + + private static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", + Pattern.CASE_INSENSITIVE); + + public static String replace(String pattern, Map params) { + if (pattern == null) { + return null; + } + Matcher matcher = REGEX_PATTERN.matcher(pattern); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + String keyText = matcher.group(1); + String replacement = params.getOrDefault(keyText, keyText); + matcher.appendReplacement(sb, replacement); + } + matcher.appendTail(sb); + return sb.toString(); + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java new file mode 100644 index 0000000000..e2031915c6 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata.sink; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class Configure { + + private Map data; + + public Configure(Map data) { + this.data = new ConcurrentHashMap<>(); + this.data.putAll(data); + } + + public String get(String key, String defaultValue) { + String value = data.get(key); + if (value != null) { + return value; + } + return defaultValue; + } + + public String get(String key) { + return data.get(key); + } + + public Boolean getBoolean(String key, boolean defaultValue) { + String value = data.get(key); + if (value != null) { + return Boolean.valueOf(value); + } + return defaultValue; + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java new file mode 100644 index 0000000000..bef0fc3110 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata.sink; + +import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; +import org.apache.inlong.sdk.dataproxy.MessageSender; +import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; +import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; +import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.sdk.dirtydata.DirtyData; +import org.apache.inlong.sdk.dirtydata.DirtySink; + +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.StringJoiner; + +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; + +@Slf4j +public class InlongSdkDirtySink implements DirtySink { + + // The inlong manager addr to init inlong sdk + private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR = + "dirty.side-output.inlong-sdk.inlong-manager-addr"; + // The inlong manager auth id to init inlong sdk + private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = "dirty.side-output.inlong-sdk.inlong-auth-id"; + // The inlong manager auth id to init inlong sdk + private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY = "dirty.side-output.inlong-sdk.inlong-auth-key"; + // The inlong group id of dirty sink + private static final String DIRTY_SIDE_OUTPUT_INLONG_GROUP = "dirty.side-output.inlong-sdk.inlong-group-id"; + // The inlong stream id of dirty sink + private static final String DIRTY_SIDE_OUTPUT_INLONG_STREAM = "dirty.side-output.inlong-sdk.inlong-stream-id"; + + private InlongSdkOptions options; + private String inlongGroupId; + private String inlongStreamId; + private final SendMessageCallback callback; + + private transient DateTimeFormatter dateTimeFormatter; + private transient MessageSender sender; + + public InlongSdkDirtySink() { + this.callback = new LogCallBack(); + } + + @Override + public void open(Configure configuration) throws Exception { + options = getOptions(configuration); + this.inlongGroupId = options.getInlongGroupId(); + this.inlongStreamId = options.getInlongStreamId(); + dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + // init sender + ProxyClientConfig proxyClientConfig = + new ProxyClientConfig(options.getInlongManagerAddr(), options.getInlongGroupId(), + options.getInlongManagerAuthId(), options.getInlongManagerAuthKey()); + sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); + } + + @Override + public void invoke(DirtyData dirtyData) { + try { + Map labelMap = LabelUtils.parseLabels(dirtyData.getLabels()); + String groupId = Preconditions.checkNotNull(labelMap.get("groupId")); + String streamId = Preconditions.checkNotNull(labelMap.get("streamId")); + + String message = join(groupId, streamId, + dirtyData.getDirtyType(), dirtyData.getLabels(), + new String(dirtyData.getData(), StandardCharsets.UTF_8)); + sender.asyncSendMessage(inlongGroupId, inlongStreamId, message.getBytes(), callback); + } catch (Throwable t) { + log.error("failed to send dirty message to inlong sdk", t); + } + } + + private InlongSdkOptions getOptions(Configure config) { + return InlongSdkOptions.builder() + .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR)) + .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP)) + .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM)) + .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY)) + .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID)) + .ignoreSideOutputErrors(config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true)) + .enableDirtyLog(true) + .build(); + } + + @Override + public void close() throws Exception { + if (sender != null) { + sender.close(); + } + } + + private String join( + String inlongGroup, + String inlongStream, + String type, + String label, + String formattedData) { + + String now = LocalDateTime.now().format(dateTimeFormatter); + + StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter()); + return joiner.add(inlongGroup + "." + inlongStream) + .add(now) + .add(type) + .add(label) + .add(formattedData).toString(); + } + + class LogCallBack implements SendMessageCallback { + + @Override + public void onMessageAck(SendResult result) { + if (result == SendResult.OK) { + return; + } + log.error("failed to send inlong dirty message, response={}", result); + + if (!options.isIgnoreSideOutputErrors()) { + throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result); + } + } + + @Override + public void onException(Throwable e) { + log.error("failed to send inlong dirty message", e); + + if (!options.isIgnoreSideOutputErrors()) { + throw new RuntimeException("writing dirty message to inlong sdk failed", e); + } + } + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java new file mode 100644 index 0000000000..b657a97f20 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata.sink; + +import lombok.Builder; +import lombok.Data; +import lombok.Getter; + +import java.io.Serializable; + +@Data +@Builder +@Getter +public class InlongSdkOptions implements Serializable { + + private static final String DEFAULT_FORMAT = "csv"; + + private static final String DEFAULT_CSV_FIELD_DELIMITER = ","; + private static final String DEFAULT_CSV_LINE_DELIMITER = "\n"; + + private static final String DEFAULT_KV_FIELD_DELIMITER = "&"; + private static final String DEFAULT_KV_ENTRY_DELIMITER = "="; + + private String inlongGroupId; + private String inlongStreamId; + private String inlongManagerAddr; + private String inlongManagerAuthKey; + private String inlongManagerAuthId; + private String format = DEFAULT_FORMAT; + private boolean ignoreSideOutputErrors; + private boolean enableDirtyLog; + private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER; + private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER; + private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER; + private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER; +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java new file mode 100644 index 0000000000..2ce58b134d --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata.sink; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.inlong.sdk.dirtydata.Constants.DELIMITER; +import static org.apache.inlong.sdk.dirtydata.Constants.KEY_VALUE_DELIMITER; + +/** + * The label utils class, is used to parse the labels to a label map + */ +public final class LabelUtils { + + private LabelUtils() { + } + + /** + * Parse the labels to label map + * + * @param labels The labels format by 'key1=value1&key2=value2...' + * @return The label map of labels + */ + public static Map parseLabels(String labels) { + return parseLabels(labels, new LinkedHashMap<>()); + } + + /** + * Parse the labels to label map + * + * @param labels The labels format by 'key1=value1&key2=value2...' + * @return The label map of labels + */ + public static Map parseLabels(String labels, Map labelMap) { + if (labelMap == null) { + labelMap = new LinkedHashMap<>(); + } + if (labels == null || labels.length() == 0) { + return labelMap; + } + String[] labelArray = labels.split(DELIMITER); + for (String label : labelArray) { + int index = label.indexOf(KEY_VALUE_DELIMITER); + if (index < 1 || index == label.length() - 1) { + throw new IllegalArgumentException("The format of labels must be like 'key1=value1&key2=value2...'"); + } + labelMap.put(label.substring(0, index), label.substring(index + 1)); + } + return labelMap; + } +} diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml index 21b99e24eb..27025d06f5 100644 --- a/inlong-sdk/pom.xml +++ b/inlong-sdk/pom.xml @@ -34,6 +34,7 @@ sort-sdk dataproxy-sdk transform-sdk + dirty-data-sdk