diff --git a/inlong-sort/sort-flink/base/pom.xml b/inlong-sort/sort-flink/base/pom.xml
index a98220f178e..12b10fe19d7 100644
--- a/inlong-sort/sort-flink/base/pom.xml
+++ b/inlong-sort/sort-flink/base/pom.xml
@@ -56,6 +56,13 @@
${project.version}
provided
+
+
+ org.apache.inlong
+ dataproxy-sdk
+ ${project.version}
+ compile
+
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
new file mode 100644
index 00000000000..27f6fea73ff
--- /dev/null
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.sdk;
+
+import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
+import org.apache.inlong.sdk.dataproxy.MessageSender;
+import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
+import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
+import org.apache.inlong.sdk.dataproxy.common.SendResult;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
+import org.apache.inlong.sort.base.util.LabelUtils;
+
+import com.google.common.base.Preconditions;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Map;
+import java.util.StringJoiner;
+
+@Slf4j
+public class InlongSdkDirtySink implements DirtySink {
+
+ private final InlongSdkOptions options;
+ private final DataType physicalRowDataType;
+ private final String inlongGroupId;
+ private final String inlongStreamId;
+ private final SendMessageCallback callback;
+
+ private transient DateTimeFormatter dateTimeFormatter;
+ private transient RowData.FieldGetter[] fieldGetters;
+ private transient RowDataToJsonConverters.RowDataToJsonConverter converter;
+ private transient MessageSender sender;
+
+ public InlongSdkDirtySink(InlongSdkOptions options, DataType physicalRowDataType) {
+ this.options = options;
+ this.physicalRowDataType = physicalRowDataType;
+ this.inlongGroupId = options.getInlongGroupId();
+ this.inlongStreamId = options.getInlongStreamId();
+ this.callback = new LogCallBack();
+ }
+
+ @Override
+ public void invoke(DirtyData dirtyData) throws Exception {
+ try {
+ Map labelMap = LabelUtils.parseLabels(dirtyData.getLabels());
+ String groupId = Preconditions.checkNotNull(labelMap.get("groupId"));
+ String streamId = Preconditions.checkNotNull(labelMap.get("streamId"));
+
+ String message = join(groupId, streamId,
+ dirtyData.getDirtyType(), dirtyData.getLabels(), formatData(dirtyData, labelMap));
+ sender.asyncSendMessage(inlongGroupId, inlongStreamId, message.getBytes(), callback);
+ } catch (Throwable t) {
+ log.error("failed to send dirty message to inlong sdk", t);
+ }
+ }
+
+ @Override
+ public void open(Configuration configuration) throws Exception {
+ converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
+ fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
+ dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+ // init sender
+ ProxyClientConfig proxyClientConfig =
+ new ProxyClientConfig(options.getInlongManagerAddr(), options.getInlongGroupId(),
+ options.getInlongManagerAuthId(), options.getInlongManagerAuthKey());
+ sender = DefaultMessageSender.generateSenderByClusterId(proxyClientConfig);
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (sender != null) {
+ sender.close();
+ }
+ }
+
+ private String join(
+ String inlongGroup,
+ String inlongStream,
+ DirtyType type,
+ String label,
+ String formattedData) {
+
+ String now = LocalDateTime.now().format(dateTimeFormatter);
+
+ StringJoiner joiner = new StringJoiner(options.getCsvFieldDelimiter());
+ return joiner.add(inlongGroup + "." + inlongStream)
+ .add(now)
+ .add(type.name())
+ .add(label)
+ .add(formattedData).toString();
+ }
+
+ private String formatData(DirtyData dirtyData, Map labels) throws JsonProcessingException {
+ String value;
+ T data = dirtyData.getData();
+ if (data instanceof RowData) {
+ value = formatData((RowData) data, dirtyData.getRowType(), labels);
+ } else {
+ value = data.toString();
+ }
+ return value;
+ }
+
+ private String formatData(RowData data, LogicalType rowType,
+ Map labels) throws JsonProcessingException {
+ String value;
+ switch (options.getFormat()) {
+ case "csv":
+ RowData.FieldGetter[] getters = fieldGetters;
+ if (rowType != null) {
+ getters = FormatUtils.parseFieldGetters(rowType);
+ }
+ value = FormatUtils.csvFormat(data, getters, labels, options.getCsvFieldDelimiter());
+ break;
+ case "json":
+ RowDataToJsonConverters.RowDataToJsonConverter jsonConverter = converter;
+ if (rowType != null) {
+ jsonConverter = FormatUtils.parseRowDataToJsonConverter(rowType);
+ }
+ value = FormatUtils.jsonFormat(data, jsonConverter, labels);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported format for: %s", options.getFormat()));
+ }
+ return value;
+ }
+
+ class LogCallBack implements SendMessageCallback {
+
+ @Override
+ public void onMessageAck(SendResult result) {
+ if (result == SendResult.OK) {
+ return;
+ }
+ log.error("failed to send inlong dirty message, response={}", result);
+
+ if (!options.isIgnoreSideOutputErrors()) {
+ throw new RuntimeException("writing dirty message to inlong sdk failed, response=" + result);
+ }
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ log.error("failed to send inlong dirty message", e);
+
+ if (!options.isIgnoreSideOutputErrors()) {
+ throw new RuntimeException("writing dirty message to inlong sdk failed", e);
+ }
+ }
+ }
+}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
new file mode 100644
index 00000000000..000836b6675
--- /dev/null
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySinkFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.sdk;
+
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.inlong.sort.base.Constants.DIRTY_IDENTIFIER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_FORMAT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_IGNORE_ERRORS;
+import static org.apache.inlong.sort.base.Constants.DIRTY_SIDE_OUTPUT_LOG_ENABLE;
+
+public class InlongSdkDirtySinkFactory implements DirtySinkFactory {
+
+ private static final String IDENTIFIER = "inlong-sdk";
+
+ private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_MANAGER =
+ ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-manager-addr")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The inlong manager addr to init inlong sdk");
+
+ private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID =
+ ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The inlong manager auth id to init inlong sdk");
+
+ private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY =
+ ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-auth-key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The inlong manager auth id to init inlong sdk");
+
+ private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_GROUP =
+ ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-group-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The inlong group id of dirty sink");
+
+ private static final ConfigOption DIRTY_SIDE_OUTPUT_INLONG_STREAM =
+ ConfigOptions.key("dirty.side-output.inlong-sdk.inlong-stream-id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The inlong stream id of dirty sink");
+
+ @Override
+ public DirtySink createDirtySink(DynamicTableFactory.Context context) {
+ ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
+ FactoryUtil.validateFactoryOptions(this, config);
+ validate(config);
+ return new InlongSdkDirtySink<>(getOptions(config),
+ context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
+ }
+
+ private void validate(ReadableConfig config) {
+ String identifier = config.getOptional(DIRTY_IDENTIFIER).orElse(null);
+ if (identifier == null || identifier.trim().isEmpty()) {
+ throw new ValidationException(
+ "The option 'dirty.identifier' is not allowed to be empty.");
+ }
+ }
+
+ private InlongSdkOptions getOptions(ReadableConfig config) {
+ return InlongSdkOptions.builder()
+ .inlongManagerAddr(config.get(DIRTY_SIDE_OUTPUT_INLONG_MANAGER))
+ .inlongGroupId(config.get(DIRTY_SIDE_OUTPUT_INLONG_GROUP))
+ .inlongStreamId(config.get(DIRTY_SIDE_OUTPUT_INLONG_STREAM))
+ .inlongManagerAuthKey(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY))
+ .inlongManagerAuthId(config.get(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID))
+ .ignoreSideOutputErrors(config.getOptional(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS).orElse(true))
+ .enableDirtyLog(true)
+ .build();
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Set> requiredOptions() {
+ final Set> options = new HashSet<>();
+ options.add(DIRTY_SIDE_OUTPUT_INLONG_MANAGER);
+ options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_ID);
+ options.add(DIRTY_SIDE_OUTPUT_INLONG_AUTH_KEY);
+ options.add(DIRTY_SIDE_OUTPUT_INLONG_GROUP);
+ options.add(DIRTY_SIDE_OUTPUT_INLONG_STREAM);
+ return options;
+ }
+
+ @Override
+ public Set> optionalOptions() {
+ final Set> options = new HashSet<>();
+ options.add(DIRTY_SIDE_OUTPUT_FORMAT);
+ options.add(DIRTY_SIDE_OUTPUT_IGNORE_ERRORS);
+ options.add(DIRTY_SIDE_OUTPUT_LOG_ENABLE);
+ return options;
+ }
+}
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
new file mode 100644
index 00000000000..0692d78580b
--- /dev/null
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.sink.sdk;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Getter;
+
+import java.io.Serializable;
+
+@Data
+@Builder
+@Getter
+public class InlongSdkOptions implements Serializable {
+
+ private static final String DEFAULT_FORMAT = "csv";
+
+ private static final String DEFAULT_CSV_FIELD_DELIMITER = ",";
+ private static final String DEFAULT_CSV_LINE_DELIMITER = "\n";
+
+ private static final String DEFAULT_KV_FIELD_DELIMITER = "&";
+ private static final String DEFAULT_KV_ENTRY_DELIMITER = "=";
+
+ private String inlongGroupId;
+ private String inlongStreamId;
+ private String inlongManagerAddr;
+ private String inlongManagerAuthKey;
+ private String inlongManagerAuthId;
+ private String format = DEFAULT_FORMAT;
+ private boolean ignoreSideOutputErrors;
+ private boolean enableDirtyLog;
+ private String csvFieldDelimiter = DEFAULT_CSV_FIELD_DELIMITER;
+ private String csvLineDelimiter = DEFAULT_CSV_LINE_DELIMITER;
+ private String kvFieldDelimiter = DEFAULT_KV_FIELD_DELIMITER;
+ private String kvEntryDelimiter = DEFAULT_KV_ENTRY_DELIMITER;
+}
diff --git a/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index a83e4d025c3..f4effff2534 100644
--- a/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/inlong-sort/sort-flink/base/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,4 +14,5 @@
# limitations under the License.
org.apache.inlong.sort.base.dirty.sink.log.LogDirtySinkFactory
-org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
\ No newline at end of file
+org.apache.inlong.sort.base.dirty.sink.s3.S3DirtySinkFactory
+org.apache.inlong.sort.base.dirty.sink.sdk.InlongSdkDirtySinkFactory
\ No newline at end of file