From fd80e0373be8e38e102db973691e81fd1efc41eb Mon Sep 17 00:00:00 2001 From: vernedeng Date: Mon, 28 Oct 2024 20:55:33 +0800 Subject: [PATCH] [INLONG-11426][SDK] Optimize dirty data sdk (#11427) * [INLONG-11426][SDK] Optimize dirty data sdk * [INLONG-11426][SDK] Optimize dirty data sdk --- .../inlong/sdk/dirtydata/Constants.java | 56 ----- .../inlong/sdk/dirtydata/DirtyData.java | 149 ------------ .../sdk/dirtydata/DirtyDataCollector.java | 219 ------------------ .../sdk/dirtydata/DirtyMessageWrapper.java | 63 +++++ .../inlong/sdk/dirtydata/DirtyOptions.java | 93 -------- .../inlong/sdk/dirtydata/DirtySink.java | 57 ----- .../sdk/dirtydata/InlongSdkDirtySink.java | 87 +++++++ .../sdk/dirtydata/PatternReplaceUtils.java | 46 ---- .../inlong/sdk/dirtydata/sink/Configure.java | 51 ---- .../dirtydata/sink/InlongSdkDirtySink.java | 154 ------------ .../sdk/dirtydata/sink/InlongSdkOptions.java | 51 ---- .../inlong/sdk/dirtydata/sink/LabelUtils.java | 67 ------ 12 files changed, 150 insertions(+), 943 deletions(-) delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java create mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java delete mode 100644 inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java deleted file mode 100644 index 933f81a67ba..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/Constants.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata; - -/** - * Connector base option constant - */ -public final class Constants { - - public static final String DIRTY_COLLECT_ENABLE = "dirty.collect.enable"; - - public static final String DIRTY_SIDE_OUTPUT_CONNECTOR = "dirty.side-output.connector"; - - public static final String DIRTY_SIDE_OUTPUT_IGNORE_ERRORS = "dirty.side-output.ignore-errors"; - - /** - * The labels of dirty side-output, format is 'key1=value1&key2=value2' - * it supports variable replace like '${variable}' - * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] - * are currently supported, - * and the support of other variables is determined by the connector. - */ - public static final String DIRTY_SIDE_OUTPUT_LABELS = "dirty.side-output.labels"; - - /** - * The log tag of dirty side-output, it supports variable replace like '${variable}'. - * There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported, - * and the support of other variables is determined by the connector. - */ - public static final String DIRTY_SIDE_OUTPUT_LOG_TAG = "dirty.side-output.log-tag"; - - /** - * It is used for 'inlong.metric.labels' or 'sink.dirty.labels' - */ - public static final String DELIMITER = "&"; - - /** - * The delimiter of key and value, it is used for 'inlong.metric.labels' or 'sink.dirty.labels' - */ - public static final String KEY_VALUE_DELIMITER = "="; -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java deleted file mode 100644 index 93caf8b57e5..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyData.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata; - -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Map; - -/** - * Dirty data base class, it is a wrapper of dirty data - */ -public class DirtyData { - - private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE"; - - private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE"; - private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME"; - - private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - - /** - * The identifier of dirty data, it will be used for filename generation of file dirty sink, - * topic generation of mq dirty sink, tablename generation of database, etc, - * and it supports variable replace like '${variable}'. - * There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported, - * and the support of other variables is determined by the connector. - */ - private final String identifier; - /** - * The labels of the dirty data, it will be written to store system of dirty - */ - private final String labels; - /** - * The log tag of dirty data, it is only used to format log as follows: - * [${logTag}] ${labels} ${data} - */ - private final String logTag; - /** - * Dirty type - */ - private final String dirtyType; - /** - * Dirty describe message, it is the cause of dirty data - */ - private final String dirtyMessage; - /** - * The real dirty data - */ - private final byte[] data; - - public DirtyData(byte[] data, String identifier, String labels, - String logTag, String dirtyType, String dirtyMessage) { - this.data = data; - this.dirtyType = dirtyType; - this.dirtyMessage = dirtyMessage; - Map paramMap = genParamMap(); - this.labels = PatternReplaceUtils.replace(labels, paramMap); - this.logTag = PatternReplaceUtils.replace(logTag, paramMap); - this.identifier = PatternReplaceUtils.replace(identifier, paramMap); - - } - - public static Builder builder() { - return new Builder(); - } - - private Map genParamMap() { - Map paramMap = new HashMap<>(); - paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); - paramMap.put(DIRTY_TYPE_KEY, dirtyType); - paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); - return paramMap; - } - - public String getLabels() { - return labels; - } - - public String getLogTag() { - return logTag; - } - - public byte[] getData() { - return data; - } - - public String getDirtyType() { - return dirtyType; - } - - public String getIdentifier() { - return identifier; - } - - public static class Builder { - - private String identifier; - private String labels; - private String logTag; - private String dirtyType = "UNDEFINED"; - private String dirtyMessage; - private byte[] data; - - public Builder setDirtyType(String dirtyType) { - this.dirtyType = dirtyType; - return this; - } - - public Builder setLabels(String labels) { - this.labels = labels; - return this; - } - - public Builder setData(byte[] data) { - this.data = data; - return this; - } - - public Builder setLogTag(String logTag) { - this.logTag = logTag; - return this; - } - - public Builder setDirtyMessage(String dirtyMessage) { - this.dirtyMessage = dirtyMessage; - return this; - } - - public DirtyData build() { - return new DirtyData(data, identifier, labels, logTag, dirtyType, dirtyMessage); - } - } -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java deleted file mode 100644 index bd8afffe625..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyDataCollector.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata; - -import org.apache.inlong.sdk.dirtydata.DirtyData.Builder; -import org.apache.inlong.sdk.dirtydata.sink.Configure; -import org.apache.inlong.sdk.dirtydata.sink.InlongSdkDirtySink; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.Serializable; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Dirty sink helper, it helps dirty data sink for {@link DirtySink} - */ -public class DirtyDataCollector implements Serializable { - - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = LoggerFactory.getLogger(DirtyDataCollector.class); - static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE); - private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE"; - private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE"; - private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME"; - private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private DirtyOptions dirtyOptions; - private DirtySink dirtySink; - private Configure config; - - public DirtyDataCollector() { - } - - /** - * Open for dirty sink - * - * @param configuration The configuration that is used for dirty sink - */ - public void open(Configure configuration) { - config = configuration; - if (dirtyOptions == null) { - dirtyOptions = DirtyOptions.fromConfig(configuration); - } - dirtyOptions.validate(); - if (!dirtyOptions.isEnableDirtyCollect()) { - return; - } - dirtySink = createDirtySink(dirtyOptions.getSinkType()); - try { - dirtySink.open(configuration); - } catch (Exception e) { - throw new RuntimeException(e); - } - - } - - private DirtySink createDirtySink(String sinkType) { - DirtySink sink; - try { - switch (sinkType) { - case "inlong": { - sink = new InlongSdkDirtySink(); - sink.open(config); - return sink; - } - default: { - LOGGER.error("invalid dirty sink type {}", sinkType); - return null; - } - } - } catch (Exception e) { - LOGGER.error("create dirty sink error", e); - } - return null; - } - - /** - * Dirty data sink - * - * @param data The dirty data - * @param dirtyType The dirty type - * @param e The cause of dirty data - */ - public void invoke(byte[] data, String dirtyType, Throwable e) { - invoke(data, dirtyType, dirtyOptions.getLabels(), dirtyOptions.getLogTag(), e); - } - - /** - * Dirty data sink - * - * @param data The dirty data - * @param dirtyType The dirty type - * @param label The dirty label - * @param logTag The dirty logTag - * @param e The cause of dirty data - */ - public void invoke(byte[] data, String dirtyType, String label, String logTag, Throwable e) { - if (!dirtyOptions.isEnableDirtyCollect()) { - return; - } - if (dirtySink != null) { - Builder builder = DirtyData.builder(); - try { - builder.setData(data) - .setDirtyType(dirtyType) - .setLabels(label) - .setLogTag(logTag) - .setDirtyMessage(e.getMessage()); - dirtySink.invoke(builder.build()); - } catch (Exception ex) { - if (!dirtyOptions.isIgnoreSideOutputErrors()) { - throw new RuntimeException(ex); - } - LOGGER.warn("Dirty sink failed", ex); - } - } - } - - /** - * replace ${SYSTEM_TIME} with real time - * - * @param pattern - * @return - */ - public static String regexReplace(String pattern, String dirtyType, String dirtyMessage) { - if (pattern == null) { - return null; - } - - Map paramMap = new HashMap<>(6); - paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); - paramMap.put(DIRTY_TYPE_KEY, dirtyType); - paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); - - Matcher matcher = REGEX_PATTERN.matcher(pattern); - StringBuffer sb = new StringBuffer(); - while (matcher.find()) { - String keyText = matcher.group(1); - String replacement = paramMap.get(keyText); - if (replacement == null) { - continue; - } - matcher.appendReplacement(sb, replacement); - } - matcher.appendTail(sb); - return sb.toString(); - } - - /** - * replace ${database} ${table} etc. Used in cases where jsonDynamicFormat.parse() is not usable. - */ - public static String regexReplace(String pattern, String dirtyType, String dirtyMessage, String database, - String table, String schema) { - if (pattern == null) { - return null; - } - - Map paramMap = new HashMap<>(6); - paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); - paramMap.put(DIRTY_TYPE_KEY, dirtyType); - paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); - paramMap.put("source.database", database); - paramMap.put("database", database); - paramMap.put("source.table", table); - paramMap.put("table", table); - if (schema != null) { - paramMap.put("source.schema", schema); - paramMap.put("schema", schema); - } - - Matcher matcher = REGEX_PATTERN.matcher(pattern); - StringBuffer sb = new StringBuffer(); - while (matcher.find()) { - String keyText = matcher.group(1); - String replacement = paramMap.get(keyText); - if (replacement == null) { - continue; - } - matcher.appendReplacement(sb, replacement); - } - matcher.appendTail(sb); - return sb.toString(); - } - - public void setDirtyOptions(DirtyOptions dirtyOptions) { - this.dirtyOptions = dirtyOptions; - } - - public DirtyOptions getDirtyOptions() { - return dirtyOptions; - } - - @Nullable - public DirtySink getDirtySink() { - return dirtySink; - } -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java new file mode 100644 index 00000000000..984c456480d --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import lombok.Builder; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Base64; +import java.util.StringJoiner; + +@Builder +public class DirtyMessageWrapper { + + private static DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + private String delimiter; + + private String inlongGroupId; + private String inlongStreamId; + private String dataTime; + private String dataflowId; + private String serverType; + private String dirtyType; + private String ext; + private String data; + private byte[] dataBytes; + + public String format() { + String now = LocalDateTime.now().format(dateTimeFormatter); + StringJoiner joiner = new StringJoiner(delimiter); + String formatData = null; + if (data != null) { + formatData = data; + } else if (dataBytes != null) { + formatData = Base64.getEncoder().encodeToString(dataBytes); + } + + return joiner.add(inlongGroupId) + .add(inlongStreamId) + .add(now) + .add(dataTime) + .add(dataflowId) + .add(serverType) + .add(dirtyType) + .add(ext) + .add(formatData).toString(); + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java deleted file mode 100644 index d70127adf8f..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyOptions.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata; - -import org.apache.inlong.sdk.dirtydata.sink.Configure; - -import lombok.Builder; -import lombok.Data; -import lombok.Getter; - -import java.io.Serializable; - -import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_COLLECT_ENABLE; -import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_CONNECTOR; -import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; -import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LABELS; -import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_LOG_TAG; - -/** - * Dirty common options - */ -@Data -@Builder -@Getter -public class DirtyOptions implements Serializable { - - private static final long serialVersionUID = 1L; - private static final String DEFAULT_FORMAT = "csv"; - private static final String DEFAULT_CSV_FIELD_DELIMITER = ","; - private final boolean enableDirtyCollect; - private final boolean ignoreSideOutputErrors; - private final String sinkType; - private final String labels; - private final String logTag; - private final String format; - private final String csvFieldDelimiter; - - private DirtyOptions(boolean enableDirtyCollect, boolean ignoreSideOutputErrors, - String sinkType, String labels, String logTag, String format, String csvFieldDelimiter) { - this.enableDirtyCollect = enableDirtyCollect; - this.ignoreSideOutputErrors = ignoreSideOutputErrors; - this.sinkType = sinkType; - this.labels = labels; - this.logTag = logTag; - this.format = format; - this.csvFieldDelimiter = csvFieldDelimiter; - } - - /** - * Get dirty options from {@link Configure} - * - * @param config The config - * @return Dirty options - */ - public static DirtyOptions fromConfig(Configure config) { - boolean enableDirtyCollect = config.getBoolean(DIRTY_COLLECT_ENABLE, false); - boolean ignoreSinkError = config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true); - String dirtyConnector = config.get(DIRTY_SIDE_OUTPUT_CONNECTOR, null); - String labels = config.get(DIRTY_SIDE_OUTPUT_LABELS, null); - String logTag = config.get(DIRTY_SIDE_OUTPUT_LOG_TAG, "DirtyData"); - String format = DEFAULT_FORMAT; - String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER; - return new DirtyOptions(enableDirtyCollect, ignoreSinkError, - dirtyConnector, labels, logTag, format, csvFieldDelimiter); - } - - public void validate() { - if (!enableDirtyCollect) { - return; - } - if (sinkType == null || sinkType.trim().length() == 0) { - throw new RuntimeException( - "The option 'dirty.side-output.connector' is not allowed to be empty " - + "when the option 'dirty.ignore' is 'true' " - + "and the option 'dirty.side-output.enable' is 'true'"); - } - } -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java deleted file mode 100644 index 68d8cc9110a..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtySink.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata; - -import org.apache.inlong.sdk.dirtydata.sink.Configure; - -import java.io.Serializable; - -/** - * The dirty sink base inteface - * - */ -public interface DirtySink extends Serializable { - - /** - * Open for dirty sink - * - * @param configuration The configuration that is used for dirty sink - * @throws Exception The exception may be thrown when executing - */ - default void open(Configure configuration) throws Exception { - - } - - /** - * Invoke that is used to sink dirty data - * - * @param dirtyData The dirty data that will be written - * @throws Exception The exception may be thrown when executing - */ - void invoke(DirtyData dirtyData) throws Exception; - - /** - * Close for dirty sink - * - * @throws Exception The exception may be thrown when executing - */ - default void close() throws Exception { - - } - -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java new file mode 100644 index 00000000000..2240ebdb6cf --- /dev/null +++ b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/InlongSdkDirtySink.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.dirtydata; + +import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; +import org.apache.inlong.sdk.dataproxy.MessageSender; +import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; +import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; +import org.apache.inlong.sdk.dataproxy.common.SendResult; +import org.apache.inlong.sdk.dataproxy.network.ProxysdkException; + +import com.google.common.base.Preconditions; +import lombok.Builder; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +@Builder +public class InlongSdkDirtySink { + + private String inlongGroupId; + private String inlongStreamId; + private String inlongManagerAddr; + private String authId; + private String authKey; + private boolean ignoreErrors; + + private SendMessageCallback callback; + private MessageSender sender; + + public void init() throws Exception { + Preconditions.checkNotNull(inlongGroupId, "inlongGroupId cannot be null"); + Preconditions.checkNotNull(inlongStreamId, "inlongStreamId cannot be null"); + Preconditions.checkNotNull(inlongManagerAddr, "inlongManagerAddr cannot be null"); + Preconditions.checkNotNull(authId, "authId cannot be null"); + Preconditions.checkNotNull(authKey, "authKey cannot be null"); + + this.callback = new LogCallBack(); + ProxyClientConfig proxyClientConfig = + new ProxyClientConfig(inlongManagerAddr, inlongGroupId, authId, authKey); + this.sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); + log.info("init InlongSdkDirtySink successfully, target group={}, stream={}", inlongGroupId, inlongStreamId); + } + + public void sendDirtyMessage(DirtyMessageWrapper messageWrapper) + throws ProxysdkException { + sender.asyncSendMessage(inlongGroupId, inlongStreamId, messageWrapper.format().getBytes(), callback); + } + + class LogCallBack implements SendMessageCallback { + + @Override + public void onMessageAck(SendResult result) { + if (result == SendResult.OK) { + return; + } + log.error("failed to send inlong dirty message, response={}", result); + + if (!ignoreErrors) { + throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result); + } + } + + @Override + public void onException(Throwable e) { + log.error("failed to send inlong dirty message", e); + + if (!ignoreErrors) { + throw new RuntimeException("writing dirty message to inlong sdk failed", e); + } + } + } +} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java deleted file mode 100644 index 20f70c52052..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/PatternReplaceUtils.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata; - -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * The pattern replace utils - */ -public final class PatternReplaceUtils { - - private static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", - Pattern.CASE_INSENSITIVE); - - public static String replace(String pattern, Map params) { - if (pattern == null) { - return null; - } - Matcher matcher = REGEX_PATTERN.matcher(pattern); - StringBuffer sb = new StringBuffer(); - while (matcher.find()) { - String keyText = matcher.group(1); - String replacement = params.getOrDefault(keyText, keyText); - matcher.appendReplacement(sb, replacement); - } - matcher.appendTail(sb); - return sb.toString(); - } -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java deleted file mode 100644 index e2031915c61..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/Configure.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata.sink; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class Configure { - - private Map data; - - public Configure(Map data) { - this.data = new ConcurrentHashMap<>(); - this.data.putAll(data); - } - - public String get(String key, String defaultValue) { - String value = data.get(key); - if (value != null) { - return value; - } - return defaultValue; - } - - public String get(String key) { - return data.get(key); - } - - public Boolean getBoolean(String key, boolean defaultValue) { - String value = data.get(key); - if (value != null) { - return Boolean.valueOf(value); - } - return defaultValue; - } -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java deleted file mode 100644 index bef0fc31101..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkDirtySink.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata.sink; - -import org.apache.inlong.sdk.dataproxy.DefaultMessageSender; -import org.apache.inlong.sdk.dataproxy.MessageSender; -import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; -import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; -import org.apache.inlong.sdk.dataproxy.common.SendResult; -import org.apache.inlong.sdk.dirtydata.DirtyData; -import org.apache.inlong.sdk.dirtydata.DirtySink; - -import com.google.common.base.Preconditions; -import lombok.extern.slf4j.Slf4j; - -import java.nio.charset.StandardCharsets; -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Map; -import java.util.StringJoiner; - -import static org.apache.inlong.sdk.dirtydata.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS; - -@Slf4j -public class InlongSdkDirtySink implements DirtySink { - - // The inlong manager addr to init inlong sdk - private static final String DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR = - "dirty.side-output.inlong-sdk.inlong-manager-addr"; - // The inlong manager auth id to init inlong sdk - private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID = "dirty.side-output.inlong-sdk.inlong-auth-id"; - // The inlong manager auth id to init inlong sdk - private static final String DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY = "dirty.side-output.inlong-sdk.inlong-auth-key"; - // The inlong group id of dirty sink - private static final String DIRTY_SIDE_OUTPUT_INLONG_GROUP = "dirty.side-output.inlong-sdk.inlong-group-id"; - // The inlong stream id of dirty sink - private static final String DIRTY_SIDE_OUTPUT_INLONG_STREAM = "dirty.side-output.inlong-sdk.inlong-stream-id"; - - private InlongSdkOptions options; - private String inlongGroupId; - private String inlongStreamId; - private final SendMessageCallback callback; - - private transient DateTimeFormatter dateTimeFormatter; - private transient MessageSender sender; - - public InlongSdkDirtySink() { - this.callback = new LogCallBack(); - } - - @Override - public void open(Configure configuration) throws Exception { - options = getOptions(configuration); - this.inlongGroupId = options.getInlongGroupId(); - this.inlongStreamId = options.getInlongStreamId(); - dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - // init sender - ProxyClientConfig proxyClientConfig = - new ProxyClientConfig(options.getInlongManagerAddr(), options.getInlongGroupId(), - options.getInlongManagerAuthId(), options.getInlongManagerAuthKey()); - sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig); - } - - @Override - public void invoke(DirtyData dirtyData) { - try { - Map labelMap = LabelUtils.parseLabels(dirtyData.getLabels()); - String groupId = Preconditions.checkNotNull(labelMap.get("groupId")); - String streamId = Preconditions.checkNotNull(labelMap.get("streamId")); - - String message = join(groupId, streamId, - dirtyData.getDirtyType(), dirtyData.getLabels(), - new String(dirtyData.getData(), StandardCharsets.UTF_8)); - sender.asyncSendMessage(inlongGroupId, inlongStreamId, message.getBytes(), callback); - } catch (Throwable t) { - log.error("failed to send dirty message to inlong sdk", t); - } - } - - private InlongSdkOptions getOptions(Configure config) { - return InlongSdkOptions.builder() - .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER_ADDR)) - .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP)) - .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM)) - .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY)) - .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID)) - .ignoreSideOutputErrors(config.getBoolean(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS, true)) - .enableDirtyLog(true) - .build(); - } - - @Override - public void close() throws Exception { - if (sender != null) { - sender.close(); - } - } - - private String join( - String inlongGroup, - String inlongStream, - String type, - String label, - String formattedData) { - - String now = LocalDateTime.now().format(dateTimeFormatter); - - StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter()); - return joiner.add(inlongGroup + "." + inlongStream) - .add(now) - .add(type) - .add(label) - .add(formattedData).toString(); - } - - class LogCallBack implements SendMessageCallback { - - @Override - public void onMessageAck(SendResult result) { - if (result == SendResult.OK) { - return; - } - log.error("failed to send inlong dirty message, response={}", result); - - if (!options.isIgnoreSideOutputErrors()) { - throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result); - } - } - - @Override - public void onException(Throwable e) { - log.error("failed to send inlong dirty message", e); - - if (!options.isIgnoreSideOutputErrors()) { - throw new RuntimeException("writing dirty message to inlong sdk failed", e); - } - } - } -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java deleted file mode 100644 index b657a97f20d..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/InlongSdkOptions.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata.sink; - -import lombok.Builder; -import lombok.Data; -import lombok.Getter; - -import java.io.Serializable; - -@Data -@Builder -@Getter -public class InlongSdkOptions implements Serializable { - - private static final String DEFAULT_FORMAT = "csv"; - - private static final String DEFAULT_CSV_FIELD_DELIMITER = ","; - private static final String DEFAULT_CSV_LINE_DELIMITER = "\n"; - - private static final String DEFAULT_KV_FIELD_DELIMITER = "&"; - private static final String DEFAULT_KV_ENTRY_DELIMITER = "="; - - private String inlongGroupId; - private String inlongStreamId; - private String inlongManagerAddr; - private String inlongManagerAuthKey; - private String inlongManagerAuthId; - private String format = DEFAULT_FORMAT; - private boolean ignoreSideOutputErrors; - private boolean enableDirtyLog; - private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER; - private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER; - private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER; - private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER; -} diff --git a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java deleted file mode 100644 index 2ce58b134d4..00000000000 --- a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/sink/LabelUtils.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.sdk.dirtydata.sink; - -import java.util.LinkedHashMap; -import java.util.Map; - -import static org.apache.inlong.sdk.dirtydata.Constants.DELIMITER; -import static org.apache.inlong.sdk.dirtydata.Constants.KEY_VALUE_DELIMITER; - -/** - * The label utils class, is used to parse the labels to a label map - */ -public final class LabelUtils { - - private LabelUtils() { - } - - /** - * Parse the labels to label map - * - * @param labels The labels format by 'key1=value1&key2=value2...' - * @return The label map of labels - */ - public static Map parseLabels(String labels) { - return parseLabels(labels, new LinkedHashMap<>()); - } - - /** - * Parse the labels to label map - * - * @param labels The labels format by 'key1=value1&key2=value2...' - * @return The label map of labels - */ - public static Map parseLabels(String labels, Map labelMap) { - if (labelMap == null) { - labelMap = new LinkedHashMap<>(); - } - if (labels == null || labels.length() == 0) { - return labelMap; - } - String[] labelArray = labels.split(DELIMITER); - for (String label : labelArray) { - int index = label.indexOf(KEY_VALUE_DELIMITER); - if (index < 1 || index == label.length() - 1) { - throw new IllegalArgumentException("The format of labels must be like 'key1=value1&key2=value2...'"); - } - labelMap.put(label.substring(0, index), label.substring(index + 1)); - } - return labelMap; - } -}