From fd924991962190c6537480a441ae9bde8dc386c1 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Mon, 14 Oct 2024 15:04:51 +0800 Subject: [PATCH 01/10] [INLONG-11352][Agent] Add dirty data collection sdk --- inlong-sdk/dirty-data-sdk/pom.xml | 96 ++++++++ .../inlong/sdk/dirtydata/Constants.java | 59 +++++ .../inlong/sdk/dirtydata/DirtyData.java | 149 ++++++++++++ .../sdk/dirtydata/DirtyDataCollector.java | 219 ++++++++++++++++++ .../inlong/sdk/dirtydata/DirtyOptions.java | 93 ++++++++ .../inlong/sdk/dirtydata/DirtySink.java | 57 +++++ .../sdk/dirtydata/PatternReplaceUtils.java | 46 ++++ .../SchemaUpdateExceptionPolicy.java | 47 ++++ .../inlong/sdk/dirtydata/sink/Configure.java | 51 ++++ .../dirtydata/sink/InlongSdkDirtySink.java | 153 ++++++++++++ .../sdk/dirtydata/sink/InlongSdkOptions.java | 51 ++++ .../inlong/sdk/dirtydata/sink/LabelUtils.java | 67 ++++++ inlong-sdk/pom.xml | 1 + 13 files changed, 1089 insertions(+) create mode 100644 inlong-sdk/dirty-data-sdk/pom.xml create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/SchemaUpdateExceptionPolicy.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java diff --git a/inlong-sdk/dirty-data-sdk/pom.xml b/inlong-sdk/dirty-data-sdk/pom.xml new file mode 100644 index 0000000000..853e873094 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/pom.xml @@ -0,0 +1,96 @@ + + + + 4.0.0 + + org.apache.inlong + inlong-sdk + 2.1.0-SNAPSHOT + + dirty-data-sdk + Apache InLong - Dirty Data SDK + + ${project.parent.parent.basedir} + + + + + org.apache.inlong + inlong-common + ${project.version} + + + org.apache.inlong + sdk-common + ${project.version} + + + org.apache.inlong + dataproxy-sdk + ${project.version} + + + + + + + + maven-clean-plugin + 3.1.0 + + + maven-resources-plugin + 3.2.0 + + + maven-compiler-plugin + ${plugin.compile.version} + + + org.apache.maven.plugins + maven-surefire-plugin + ${plugin.surefire.version} + + + maven-jar-plugin + 3.0.2 + + + maven-install-plugin + 2.5.2 + + + maven-deploy-plugin + ${plugin.deploy.version} + + + maven-site-plugin + 3.7.1 + + + maven-project-info-reports-plugin + 3.0.0 + + + + + + diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java new file mode 100644 index 0000000000..c9d071fe84 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +/** + * connector base option constant + */ +public final class Constants { + + // The action to deal with schema update in multiple sink. + public static final String SINK_MULTIPLE_SCHEMA_UPDATE_POLICY = "sink.multiple.schema-update.policy"; + + public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable"; + + public static final String DIRTY_SIDE_OUTPUT_CONNECTOR = "dirty.side-output.connector"; + + public static final String DIRTY_SIDE_OUTPUT_IGNORE_ERRORS = "dirty.side-output.ignore-errors"; + + /** + * The labels of dirty side-output, format is 'key1=value1&key2=value2' + * it supports variable replace like '${variable}' + * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] + * are currently supported, + * and the support of other variables is determined by the connector. + */ + public static final String DIRTY_SIDE_OUTPUT_LABELS = "dirty.side-output.labels"; + + /** + * The log tag of dirty side-output, it supports variable replace like '${variable}'. + * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported, + * and the support of other variables is determined by the connector. + */ + public static final String DIRTY_SIDE_OUTPUT_LOG_TAG = "dirty.side-output.log-tag"; + + /** + * It is used for 'inlong.metric.labels' or 'sink.dirty.labels' + */ + public static final String DELIMITER = "&"; + + /** + * The delimiter of key and value, it is used for 'inlong.metric.labels' or 'sink.dirty.labels' + */ + public static final String KEY_VALUE_DELIMITER = "="; +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java new file mode 100644 index 0000000000..93caf8b57e --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; + +/** + * Dirty data base class, it is a wrapper of dirty data + */ +public class DirtyData { + + private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE"; + + private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE"; + private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME"; + + private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + /** + * The identifier of dirty data, it will be used for filename generation of file dirty sink, + * topic generation of mq dirty sink, tablename generation of database, etc, + * and it supports variable replace like '${variable}'. + * There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported, + * and the support of other variables is determined by the connector. + */ + private final String identifier; + /** + * The labels of the dirty data, it will be written to store system of dirty + */ + private final String labels; + /** + * The log tag of dirty data, it is only used to format log as follows: + * [${logTag}] ${labels} ${data} + */ + private final String logTag; + /** + * Dirty type + */ + private final String dirtyType; + /** + * Dirty describe message, it is the cause of dirty data + */ + private final String dirtyMessage; + /** + * The real dirty data + */ + private final byte[] data; + + public DirtyData(byte[] data, String identifier, String labels, + String logTag, String dirtyType, String dirtyMessage) { + this.data = data; + this.dirtyType = dirtyType; + this.dirtyMessage = dirtyMessage; + Map paramMap = genParamMap(); + this.labels = PatternReplaceUtils.replace(labels, paramMap); + this.logTag = PatternReplaceUtils.replace(logTag, paramMap); + this.identifier = PatternReplaceUtils.replace(identifier, paramMap); + + } + + public static Builder builder() { + return new Builder(); + } + + private Map genParamMap() { + Map paramMap = new HashMap<>(); + paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); + paramMap.put(DIRTY_TYPE_KEY, dirtyType); + paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); + return paramMap; + } + + public String getLabels() { + return labels; + } + + public String getLogTag() { + return logTag; + } + + public byte[] getData() { + return data; + } + + public String getDirtyType() { + return dirtyType; + } + + public String getIdentifier() { + return identifier; + } + + public static class Builder { + + private String identifier; + private String labels; + private String logTag; + private String dirtyType = "UNDEFINED"; + private String dirtyMessage; + private byte[] data; + + public Builder setDirtyType(String dirtyType) { + this.dirtyType = dirtyType; + return this; + } + + public Builder setLabels(String labels) { + this.labels = labels; + return this; + } + + public Builder setData(byte[] data) { + this.data = data; + return this; + } + + public Builder setLogTag(String logTag) { + this.logTag = logTag; + return this; + } + + public Builder setDirtyMessage(String dirtyMessage) { + this.dirtyMessage = dirtyMessage; + return this; + } + + public DirtyData build() { + return new DirtyData(data, identifier, labels, logTag, dirtyType, dirtyMessage); + } + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java new file mode 100644 index 0000000000..bd8afffe62 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import org.apache.inlong.sdk.dirtydata.DirtyData.Builder; +import org.apache.inlong.sdk.dirtydata.sink.Configure; +import org.apache.inlong.sdk.dirtydata.sink.InlongSdkDirtySink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Dirty sink helper, it helps dirty data sink for {@link DirtySink} + */ +public class DirtyDataCollector implements Serializable { + + private static final long serialVersionUID = 1L; + private static final Logger LOGGER = LoggerFactory.getLogger(DirtyDataCollector.class); + static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE); + private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE"; + private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE"; + private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME"; + private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private DirtyOptions dirtyOptions; + private DirtySink dirtySink; + private Configure config; + + public DirtyDataCollector() { + } + + /** + * Open for dirty sink + * + * @param configuration The configuration that is used for dirty sink + */ + public void open(Configure configuration) { + config = configuration; + if (dirtyOptions == null) { + dirtyOptions = DirtyOptions.fromConfig(configuration); + } + dirtyOptions.validate(); + if (!dirtyOptions.isEnableDirtyCollect()) { + return; + } + dirtySink = createDirtySink(dirtyOptions.getSinkType()); + try { + dirtySink.open(configuration); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + private DirtySink createDirtySink(String sinkType) { + DirtySink sink; + try { + switch (sinkType) { + case "inlong": { + sink = new InlongSdkDirtySink(); + sink.open(config); + return sink; + } + default: { + LOGGER.error("invalid dirty sink type {}", sinkType); + return null; + } + } + } catch (Exception e) { + LOGGER.error("create dirty sink error", e); + } + return null; + } + + /** + * Dirty data sink + * + * @param data The dirty data + * @param dirtyType The dirty type + * @param e The cause of dirty data + */ + public void invoke(byte[] data, String dirtyType, Throwable e) { + invoke(data, dirtyType, dirtyOptions.getLabels(), dirtyOptions.getLogTag(), e); + } + + /** + * Dirty data sink + * + * @param data The dirty data + * @param dirtyType The dirty type + * @param label The dirty label + * @param logTag The dirty logTag + * @param e The cause of dirty data + */ + public void invoke(byte[] data, String dirtyType, String label, String logTag, Throwable e) { + if (!dirtyOptions.isEnableDirtyCollect()) { + return; + } + if (dirtySink != null) { + Builder builder = DirtyData.builder(); + try { + builder.setData(data) + .setDirtyType(dirtyType) + .setLabels(label) + .setLogTag(logTag) + .setDirtyMessage(e.getMessage()); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.isIgnoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOGGER.warn("Dirty sink failed", ex); + } + } + } + + /** + * replace ${SYSTEM_TIME} with real time + * + * @param pattern + * @return + */ + public static String regexReplace(String pattern, String dirtyType, String dirtyMessage) { + if (pattern == null) { + return null; + } + + Map paramMap = new HashMap<>(6); + paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); + paramMap.put(DIRTY_TYPE_KEY, dirtyType); + paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); + + Matcher matcher = REGEX_PATTERN.matcher(pattern); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + String keyText = matcher.group(1); + String replacement = paramMap.get(keyText); + if (replacement == null) { + continue; + } + matcher.appendReplacement(sb, replacement); + } + matcher.appendTail(sb); + return sb.toString(); + } + + /** + * replace ${database} ${table} etc. Used in cases where jsonDynamicFormat.parse() is not usable. + */ + public static String regexReplace(String pattern, String dirtyType, String dirtyMessage, String database, + String table, String schema) { + if (pattern == null) { + return null; + } + + Map paramMap = new HashMap<>(6); + paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); + paramMap.put(DIRTY_TYPE_KEY, dirtyType); + paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); + paramMap.put("source.database", database); + paramMap.put("database", database); + paramMap.put("source.table", table); + paramMap.put("table", table); + if (schema != null) { + paramMap.put("source.schema", schema); + paramMap.put("schema", schema); + } + + Matcher matcher = REGEX_PATTERN.matcher(pattern); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + String keyText = matcher.group(1); + String replacement = paramMap.get(keyText); + if (replacement == null) { + continue; + } + matcher.appendReplacement(sb, replacement); + } + matcher.appendTail(sb); + return sb.toString(); + } + + public void setDirtyOptions(DirtyOptions dirtyOptions) { + this.dirtyOptions = dirtyOptions; + } + + public DirtyOptions getDirtyOptions() { + return dirtyOptions; + } + + @Nullable + public DirtySink getDirtySink() { + return dirtySink; + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java new file mode 100644 index 0000000000..d70127adf8 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import org.apache.inlong.sdk.dirtydata.sink.Configure; + +import lombok.Builder; +import lombok.Data; +import lombok.Getter; + +import java.io.Serializable; + +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_COLLECT_ENABLE; +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR; +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LABELS; +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG; + +/** + * Dirty common options + */ +@Data +@Builder +@Getter +public class DirtyOptions implements Serializable { + + private static final long serialVersionUID = 1L; + private static final String DEFAULT_FORMAT = "csv"; + private static final String DEFAULT_CSV_FIELD_DELIMITER = ","; + private final boolean enableDirtyCollect; + private final boolean ignoreSideOutputErrors; + private final String sinkType; + private final String labels; + private final String logTag; + private final String format; + private final String csvFieldDelimiter; + + private DirtyOptions(boolean enableDirtyCollect, boolean ignoreSideOutputErrors, + String sinkType, String labels, String logTag, String format, String csvFieldDelimiter) { + this.enableDirtyCollect = enableDirtyCollect; + this.ignoreSideOutputErrors = ignoreSideOutputErrors; + this.sinkType = sinkType; + this.labels = labels; + this.logTag = logTag; + this.format = format; + this.csvFieldDelimiter = csvFieldDelimiter; + } + + /** + * Get dirty options from {@link Configure} + * + * @param config The config + * @return Dirty options + */ + public static DirtyOptions fromConfig(Configure config) { + boolean enableDirtyCollect = config.getBoolean(DIRTY_COLLECT_ENABLE, false); + boolean ignoreSinkError = config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true); + String dirtyConnector = config.get(DIRTY_SIDE_OUTPUT_CONNECTOR, null); + String labels = config.get(DIRTY_SIDE_OUTPUT_LABELS, null); + String logTag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData"); + String format = DEFAULT_FORMAT; + String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER; + return new DirtyOptions(enableDirtyCollect, ignoreSinkError, + dirtyConnector, labels, logTag, format, csvFieldDelimiter); + } + + public void validate() { + if (!enableDirtyCollect) { + return; + } + if (sinkType == null || sinkType.trim().length() == 0) { + throw new RuntimeException( + "The option 'dirty.side-output.connector' is not allowed to be empty " + + "when the option 'dirty.ignore' is 'true' " + + "and the option 'dirty.side-output.enable' is 'true'"); + } + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java new file mode 100644 index 0000000000..68d8cc9110 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import org.apache.inlong.sdk.dirtydata.sink.Configure; + +import java.io.Serializable; + +/** + * The dirty sink base inteface + * + */ +public interface DirtySink extends Serializable { + + /** + * Open for dirty sink + * + * @param configuration The configuration that is used for dirty sink + * @throws Exception The exception may be thrown when executing + */ + default void open(Configure configuration) throws Exception { + + } + + /** + * Invoke that is used to sink dirty data + * + * @param dirtyData The dirty data that will be written + * @throws Exception The exception may be thrown when executing + */ + void invoke(DirtyData dirtyData) throws Exception; + + /** + * Close for dirty sink + * + * @throws Exception The exception may be thrown when executing + */ + default void close() throws Exception { + + } + +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java new file mode 100644 index 0000000000..5b831d7026 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * The pattern replace utils + */ +public final class PatternReplaceUtils { + + private static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", + Pattern.CASE_INSENSITIVE); + + public static String replace(String pattern, Map params) { + if (pattern == null) { + return pattern; + } + Matcher matcher = REGEX_PATTERN.matcher(pattern); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + String keyText = matcher.group(1); + String replacement = params.getOrDefault(keyText, keyText); + matcher.appendReplacement(sb, replacement); + } + matcher.appendTail(sb); + return sb.toString(); + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/SchemaUpdateExceptionPolicy.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/SchemaUpdateExceptionPolicy.java new file mode 100644 index 0000000000..24745baf6f --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/SchemaUpdateExceptionPolicy.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +/** + * Multiple sink scenes will meet different table data. + * Maybe one table data have different schema, once it's schema mismatch with catalog schema, how to handle + * this table data. For example schema mismatch: + * + *
+ * data : {a : int, b : string, c : date}
+ * catalog : {a : string, b : timestamp}
+ * 
+ */ +public enum SchemaUpdateExceptionPolicy { + + TRY_IT_BEST("Try it best to handle schema update, if can not handle it, just ignore it."), + LOG_WITH_IGNORE("Ignore schema update and log it."), + ALERT_WITH_IGNORE("Ignore schema update and alert it."), + STOP_PARTIAL("Only stop abnormal sink table, other tables writes normally."), + THROW_WITH_STOP("Throw exception to stop flink job when meet schema update."); + + private String description; + + SchemaUpdateExceptionPolicy(String description) { + this.description = description; + } + + public String getDescription() { + return description; + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java new file mode 100644 index 0000000000..e2031915c6 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata.sink; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class Configure { + + private Map data; + + public Configure(Map data) { + this.data = new ConcurrentHashMap<>(); + this.data.putAll(data); + } + + public String get(String key, String defaultValue) { + String value = data.get(key); + if (value != null) { + return value; + } + return defaultValue; + } + + public String get(String key) { + return data.get(key); + } + + public Boolean getBoolean(String key, boolean defaultValue) { + String value = data.get(key); + if (value != null) { + return Boolean.valueOf(value); + } + return defaultValue; + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java new file mode 100644 index 0000000000..576d03e53e --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata.sink; + +import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; +import org.apache.inlong.sdk.dataproxy.MessageSender; +import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; +import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; +import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.sdk.dirtydata.DirtyData; +import org.apache.inlong.sdk.dirtydata.DirtySink; + +import com.google.common.base.Preconditions; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; +import java.util.StringJoiner; + +import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; + +@Slf4j +public class InlongSdkDirtySink implements DirtySink { + + // The inlong manager addr to init inlong sdk + private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER = "dirty.side-output.inlong-sdk.inlong-manager-addr"; + // The inlong manager auth id to init inlong sdk + private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = "dirty.side-output.inlong-sdk.inlong-auth-id"; + // The inlong manager auth id to init inlong sdk + private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY = "dirty.side-output.inlong-sdk.inlong-auth-key"; + // The inlong group id of dirty sink + private static final String DIRTY_SIDE_OUTPUT_INLONG_GROUP = "dirty.side-output.inlong-sdk.inlong-group-id"; + // The inlong stream id of dirty sink + private static final String DIRTY_SIDE_OUTPUT_INLONG_STREAM = "dirty.side-output.inlong-sdk.inlong-stream-id"; + + private InlongSdkOptions options; + private String inlongGroupId; + private String inlongStreamId; + private final SendMessageCallback callback; + + private transient DateTimeFormatter dateTimeFormatter; + private transient MessageSender sender; + + public InlongSdkDirtySink() { + this.callback = new LogCallBack(); + } + + @Override + public void open(Configure configuration) throws Exception { + options = getOptions(configuration); + this.inlongGroupId = options.getInlongGroupId(); + this.inlongStreamId = options.getInlongStreamId(); + dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + // init sender + ProxyClientConfig proxyClientConfig = + new ProxyClientConfig(options.getInlongManagerAddr(), options.getInlongGroupId(), + options.getInlongManagerAuthId(), options.getInlongManagerAuthKey()); + sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); + } + + @Override + public void invoke(DirtyData dirtyData) { + try { + Map labelMap = LabelUtils.parseLabels(dirtyData.getLabels()); + String groupId = Preconditions.checkNotNull(labelMap.get("groupId")); + String streamId = Preconditions.checkNotNull(labelMap.get("streamId")); + + String message = join(groupId, streamId, + dirtyData.getDirtyType(), dirtyData.getLabels(), + new String(dirtyData.getData(), StandardCharsets.UTF_8)); + sender.asyncSendMessage(inlongGroupId, inlongStreamId, message.getBytes(), callback); + } catch (Throwable t) { + log.error("failed to send dirty message to inlong sdk", t); + } + } + + private InlongSdkOptions getOptions(Configure config) { + return InlongSdkOptions.builder() + .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER)) + .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP)) + .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM)) + .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY)) + .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID)) + .ignoreSideOutputErrors(config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true)) + .enableDirtyLog(true) + .build(); + } + + @Override + public void close() throws Exception { + if (sender != null) { + sender.close(); + } + } + + private String join( + String inlongGroup, + String inlongStream, + String type, + String label, + String formattedData) { + + String now = LocalDateTime.now().format(dateTimeFormatter); + + StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter()); + return joiner.add(inlongGroup + "." + inlongStream) + .add(now) + .add(type) + .add(label) + .add(formattedData).toString(); + } + + class LogCallBack implements SendMessageCallback { + + @Override + public void onMessageAck(SendResult result) { + if (result == SendResult.OK) { + return; + } + log.error("failed to send inlong dirty message, response={}", result); + + if (!options.isIgnoreSideOutputErrors()) { + throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result); + } + } + + @Override + public void onException(Throwable e) { + log.error("failed to send inlong dirty message", e); + + if (!options.isIgnoreSideOutputErrors()) { + throw new RuntimeException("writing dirty message to inlong sdk failed", e); + } + } + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java new file mode 100644 index 0000000000..b657a97f20 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata.sink; + +import lombok.Builder; +import lombok.Data; +import lombok.Getter; + +import java.io.Serializable; + +@Data +@Builder +@Getter +public class InlongSdkOptions implements Serializable { + + private static final String DEFAULT_FORMAT = "csv"; + + private static final String DEFAULT_CSV_FIELD_DELIMITER = ","; + private static final String DEFAULT_CSV_LINE_DELIMITER = "\n"; + + private static final String DEFAULT_KV_FIELD_DELIMITER = "&"; + private static final String DEFAULT_KV_ENTRY_DELIMITER = "="; + + private String inlongGroupId; + private String inlongStreamId; + private String inlongManagerAddr; + private String inlongManagerAuthKey; + private String inlongManagerAuthId; + private String format = DEFAULT_FORMAT; + private boolean ignoreSideOutputErrors; + private boolean enableDirtyLog; + private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER; + private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER; + private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER; + private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER; +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java new file mode 100644 index 0000000000..2f29c42d18 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata.sink; + +import java.util.LinkedHashMap; +import java.util.Map; + +import static org.apache.inlong.sdk.dirtydata.Constants.DELIMITER; +import static org.apache.inlong.sdk.dirtydata.Constants.KEY_VALUE_DELIMITER; + +/** + * The lable utils class, it is used for parse the lables to a label map + */ +public final class LabelUtils { + + private LabelUtils() { + } + + /** + * Parse the labels to label map + * + * @param labels The labels format by 'key1=value1&key2=value2...' + * @return The label map of labels + */ + public static Map parseLabels(String labels) { + return parseLabels(labels, new LinkedHashMap<>()); + } + + /** + * Parse the labels to label map + * + * @param labels The labels format by 'key1=value1&key2=value2...' + * @return The label map of labels + */ + public static Map parseLabels(String labels, Map labelMap) { + if (labelMap == null) { + labelMap = new LinkedHashMap<>(); + } + if (labels == null || labels.length() == 0) { + return labelMap; + } + String[] labelArray = labels.split(DELIMITER); + for (String label : labelArray) { + int index = label.indexOf(KEY_VALUE_DELIMITER); + if (index < 1 || index == label.length() - 1) { + throw new IllegalArgumentException("The format of labels must be like 'key1=value1&key2=value2...'"); + } + labelMap.put(label.substring(0, index), label.substring(index + 1)); + } + return labelMap; + } +} diff --git a/inlong-sdk/pom.xml b/inlong-sdk/pom.xml index 21b99e24eb..27025d06f5 100644 --- a/inlong-sdk/pom.xml +++ b/inlong-sdk/pom.xml @@ -34,6 +34,7 @@ sort-sdk dataproxy-sdk transform-sdk + dirty-data-sdk From 8eeb68408a2e18861700f51973d010389120eee6 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Mon, 14 Oct 2024 15:37:35 +0800 Subject: [PATCH 02/10] Update inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java Co-authored-by: AloysZhang --- .../main/java/org/apache/inlong/sdk/dirtydata/Constants.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java index c9d071fe84..58728bc53e 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java @@ -18,7 +18,7 @@ package org.apache.inlong.sdk.dirtydata; /** - * connector base option constant + * Connector base option constant */ public final class Constants { From a45daa6205c917ff9d79800643e8ac4e0e757788 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Wed, 16 Oct 2024 09:44:11 +0800 Subject: [PATCH 03/10] Update inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java Co-authored-by: AloysZhang --- .../java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java index 2f29c42d18..2ce58b134d 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java @@ -24,7 +24,7 @@ import static org.apache.inlong.sdk.dirtydata.Constants.KEY_VALUE_DELIMITER; /** - * The lable utils class, it is used for parse the lables to a label map + * The label utils class, is used to parse the labels to a label map */ public final class LabelUtils { From 09512fa5fe76763830bd88ba7f52a06c13a21e98 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Wed, 16 Oct 2024 09:44:55 +0800 Subject: [PATCH 04/10] Update inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java Co-authored-by: AloysZhang --- .../apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java index 576d03e53e..0a7ebc525d 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java @@ -40,7 +40,7 @@ public class InlongSdkDirtySink implements DirtySink { // The inlong manager addr to init inlong sdk - private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER = "dirty.side-output.inlong-sdk.inlong-manager-addr"; + private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR = "dirty.side-output.inlong-sdk.inlong-manager-addr"; // The inlong manager auth id to init inlong sdk private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = "dirty.side-output.inlong-sdk.inlong-auth-id"; // The inlong manager auth id to init inlong sdk From e9dba7c429b9920265609b92e290a46519a1ab94 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Wed, 16 Oct 2024 09:49:10 +0800 Subject: [PATCH 05/10] [INLONG-11352][SDK] Adjust code format --- .../apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java index 0a7ebc525d..9b58af6224 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java @@ -40,7 +40,8 @@ public class InlongSdkDirtySink implements DirtySink { // The inlong manager addr to init inlong sdk - private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR = "dirty.side-output.inlong-sdk.inlong-manager-addr"; + private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR = + "dirty.side-output.inlong-sdk.inlong-manager-addr"; // The inlong manager auth id to init inlong sdk private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = "dirty.side-output.inlong-sdk.inlong-auth-id"; // The inlong manager auth id to init inlong sdk From cf9cef45cbf14a28aa169e7dcc6ce570a1b724d6 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Wed, 16 Oct 2024 10:17:03 +0800 Subject: [PATCH 06/10] [INLONG-11352][SDK] Delete useless enumeration --- .../sdk/dirtydata/PatternReplaceUtils.java | 2 +- .../SchemaUpdateExceptionPolicy.java | 47 ------------------- .../dirtydata/sink/InlongSdkDirtySink.java | 2 +- 3 files changed, 2 insertions(+), 49 deletions(-) delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/SchemaUpdateExceptionPolicy.java diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java index 5b831d7026..20f70c5205 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java @@ -31,7 +31,7 @@ public final class PatternReplaceUtils { public static String replace(String pattern, Map params) { if (pattern == null) { - return pattern; + return null; } Matcher matcher = REGEX_PATTERN.matcher(pattern); StringBuffer sb = new StringBuffer(); diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/SchemaUpdateExceptionPolicy.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/SchemaUpdateExceptionPolicy.java deleted file mode 100644 index 24745baf6f..0000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/SchemaUpdateExceptionPolicy.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata; - -/** - * Multiple sink scenes will meet different table data. - * Maybe one table data have different schema, once it's schema mismatch with catalog schema, how to handle - * this table data. For example schema mismatch: - * - *
- * data : {a : int, b : string, c : date}
- * catalog : {a : string, b : timestamp}
- * 
- */ -public enum SchemaUpdateExceptionPolicy { - - TRY_IT_BEST("Try it best to handle schema update, if can not handle it, just ignore it."), - LOG_WITH_IGNORE("Ignore schema update and log it."), - ALERT_WITH_IGNORE("Ignore schema update and alert it."), - STOP_PARTIAL("Only stop abnormal sink table, other tables writes normally."), - THROW_WITH_STOP("Throw exception to stop flink job when meet schema update."); - - private String description; - - SchemaUpdateExceptionPolicy(String description) { - this.description = description; - } - - public String getDescription() { - return description; - } -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java index 9b58af6224..bef0fc3110 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java @@ -94,7 +94,7 @@ public void invoke(DirtyData dirtyData) { private InlongSdkOptions getOptions(Configure config) { return InlongSdkOptions.builder() - .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER)) + .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR)) .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP)) .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM)) .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY)) From ceedd91f5cbc3dd98e347687cc79874dd5572308 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Wed, 16 Oct 2024 10:23:26 +0800 Subject: [PATCH 07/10] [INLONG-11352][SDK] Delete useless constants --- .../main/java/org/apache/inlong/sdk/dirtydata/Constants.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java index 58728bc53e..933f81a67b 100644 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java @@ -22,9 +22,6 @@ */ public final class Constants { - // The action to deal with schema update in multiple sink. - public static final String SINK_MULTIPLE_SCHEMA_UPDATE_POLICY = "sink.multiple.schema-update.policy"; - public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable"; public static final String DIRTY_SIDE_OUTPUT_CONNECTOR = "dirty.side-output.connector"; From 81e93af201f84079ea45afcff6def6431489d4b5 Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 17 Oct 2024 15:33:29 +0800 Subject: [PATCH 08/10] [INLONG-11352][SDK] Add readme --- inlong-sdk/dirty-data-sdk/README.md | 47 +++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 inlong-sdk/dirty-data-sdk/README.md diff --git a/inlong-sdk/dirty-data-sdk/README.md b/inlong-sdk/dirty-data-sdk/README.md new file mode 100644 index 0000000000..e5800db856 --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/README.md @@ -0,0 +1,47 @@ +## Overview + +This SDK is used to collect dirty data and store it in a designated storage location. + +## Features + +### Independent SDK + +Independent SDK, not dependent on platform specific libraries (such as Flink), can be used by Agent, +Data Proxy, Sort modules. + +### Scalable multiple data storage options + +Dirty data can be stored in various different storage locations (currently only supports sending to +DataProxy). + +## Usage + +### Create DirtyDataCollector object + +```java + Map configMap = new ConcurrentHashMap<>(); + configMap.put(DIRTY_COLLECT_ENABLE, "true"); + configMap.put(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, "true"); + configMap.put(DIRTY_SIDE_OUTPUT_CONNECTOR, "inlong"); + configMap.put(DIRTY_SIDE_OUTPUT_LABELS, "key1=value1&key2=value2"); + configMap.put(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData"); + Configure config = new Configure(configMap); + + DirtyDataCollector collecter = new DirtyDataCollector(); + collector.open(config); +``` + +### Collect dirty data + +```java + // In fact, the dirty data we encounter is often parsed incorrectly, + // so we use byte [] as the format for dirty data. + byte[] dirtyData = "xxxxxxxxxyyyyyyyyyyyyyy".getBytes(StandardCharsets.UTF_8); + // Here, incorrect types can be marked, such as missing fields, type errors, or unknown errors, etc. + String dirtyType = "Undefined"; + // Details of errors can be passed here. + Throwable error = new Throwable(); + collector.invoke(dirtyData, dirtyType, error); +``` + + | \ No newline at end of file From c0b240c5d0435e1aac4a4a45141014c41b2b58fa Mon Sep 17 00:00:00 2001 From: wenweihuang Date: Thu, 17 Oct 2024 15:47:16 +0800 Subject: [PATCH 09/10] [INLONG-11352][SDK] Add code comments in readme --- inlong-sdk/dirty-data-sdk/README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/inlong-sdk/dirty-data-sdk/README.md b/inlong-sdk/dirty-data-sdk/README.md index e5800db856..f70a4b553c 100644 --- a/inlong-sdk/dirty-data-sdk/README.md +++ b/inlong-sdk/dirty-data-sdk/README.md @@ -20,10 +20,16 @@ DataProxy). ```java Map configMap = new ConcurrentHashMap<>(); + // Enable dirty data collection configMap.put(DIRTY_COLLECT_ENABLE, "true"); + // If ignore error messages during dirty data collection configMap.put(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, "true"); + // The storage where dirty data will be stored currently only supports 'inlong', + // which means sending the data to DataSroxy configMap.put(DIRTY_SIDE_OUTPUT_CONNECTOR, "inlong"); + // The labels of dirty side-output, format is 'key1=value1&key2=value2' configMap.put(DIRTY_SIDE_OUTPUT_LABELS, "key1=value1&key2=value2"); + // The log tag of dirty side-output, it supports variable replace like '${variable}'. configMap.put(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData"); Configure config = new Configure(configMap); From 6ad46e177e111d403a06124f5fc1a22873a2fc1b Mon Sep 17 00:00:00 2001 From: Charles Zhang Date: Mon, 21 Oct 2024 14:07:41 +0800 Subject: [PATCH 10/10] Update inlong-sdk/dirty-data-sdk/README.md --- inlong-sdk/dirty-data-sdk/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-sdk/dirty-data-sdk/README.md b/inlong-sdk/dirty-data-sdk/README.md index f70a4b553c..8c3b748f70 100644 --- a/inlong-sdk/dirty-data-sdk/README.md +++ b/inlong-sdk/dirty-data-sdk/README.md @@ -7,7 +7,7 @@ This SDK is used to collect dirty data and store it in a designated storage loca ### Independent SDK Independent SDK, not dependent on platform specific libraries (such as Flink), can be used by Agent, -Data Proxy, Sort modules. +DataProxy, Sort modules. ### Scalable multiple data storage options