diff --git a/inlong-tubemq/tubemq-connectors/pom.xml b/inlong-tubemq/tubemq-connectors/pom.xml
index e9c4b1b1472..1fc71477c34 100644
--- a/inlong-tubemq/tubemq-connectors/pom.xml
+++ b/inlong-tubemq/tubemq-connectors/pom.xml
@@ -29,7 +29,6 @@
     <name>Apache InLong - TubeMQ Connectors</name>
-        <module>tubemq-connector-flink</module>
         <module>tubemq-connector-flume</module>
         <module>tubemq-connector-spark</module>
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml
deleted file mode 100644
index 47bb6ccce70..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml
+++ /dev/null
@@ -1,106 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements. See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.inlong</groupId>
-        <artifactId>tubemq-connectors</artifactId>
-        <version>2.1.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>tubemq-connector-flink</artifactId>
-    <name>Apache InLong - TubeMQ Connectors-flink</name>
-
-    <properties>
-        <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
-        <flink.version>1.13.5</flink.version>
-        <scala.binary.version>2.11</scala.binary.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>tubemq-client</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>tubemq-core</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-runtime_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
-            <version>${flink.version}</version>
-            <scope>provided</scope>
-            <optional>true</optional>
-        </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-common</artifactId>
-            <version>${flink.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>${junit.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-</project>
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
deleted file mode 100644
index 2e34c133300..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.table.descriptors.ConnectorDescriptor;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_GROUP;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_MASTER;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_PROPERTIES;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_STREAMIDS;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TOPIC;
-import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The {@link ConnectorDescriptor} for tubemq sources and sinks.
- */
-public class Tubemq extends ConnectorDescriptor {
-
- @Nullable
- private boolean consumerRole = true;
-
- @Nullable
- private String topic;
-
- @Nullable
- private String master;
-
- @Nullable
- private String group;
-
- @Nullable
- private String streamIds;
-
- @Nonnull
- private Map<String, String> properties;
-
- public Tubemq() {
- super(CONNECTOR_TYPE_VALUE_TUBEMQ, 1, true);
-
- this.properties = new HashMap<>();
- }
-
- /**
- * Sets the tubemq topic to be used.
- *
- * @param topic The topic name.
- */
- public Tubemq topic(String topic) {
- checkNotNull(topic);
-
- this.topic = topic;
- return this;
- }
-
- /**
- * Sets the client role to be used.
- *
- * @param isConsumer The client role if consumer.
- */
- public Tubemq asConsumer(boolean isConsumer) {
- this.consumerRole = isConsumer;
- return this;
- }
-
- /**
- * Sets the address of tubemq master to connect.
- *
- * @param master The address of tubemq master.
- */
- public Tubemq master(String master) {
- checkNotNull(master);
-
- this.master = master;
- return this;
- }
-
- /**
- * Sets the tubemq (consumer or producer) group to be used.
- *
- * @param group The group name.
- */
- public Tubemq group(String group) {
- checkNotNull(group);
-
- this.group = group;
- return this;
- }
-
- /**
- * The tubemq consumers use these streamIds to filter records reading from server.
- *
- * @param streamIds The filter for consume record from server.
- */
- public Tubemq streamIds(String streamIds) {
-
- this.streamIds = streamIds;
- return this;
- }
-
- /**
- * Sets the tubemq property.
- *
- * @param key The key of the property.
- * @param value The value of the property.
- */
- public Tubemq property(String key, String value) {
- checkNotNull(key);
- checkNotNull(value);
-
- properties.put(key, value);
- return this;
- }
-
- @Override
- protected Map<String, String> toConnectorProperties() {
- DescriptorProperties descriptorProperties = new DescriptorProperties();
-
- if (topic != null) {
- descriptorProperties.putString(CONNECTOR_TOPIC, topic);
- }
-
- if (master != null) {
- descriptorProperties.putString(CONNECTOR_MASTER, master);
- }
- if (consumerRole) {
- if (group != null) {
- descriptorProperties.putString(CONNECTOR_GROUP, group);
- }
-
- if (streamIds != null) {
- descriptorProperties.putString(CONNECTOR_STREAMIDS, streamIds);
- }
- }
-
- descriptorProperties.putPropertiesWithPrefix(CONNECTOR_PROPERTIES, properties);
-
- return descriptorProperties.asMap();
- }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
deleted file mode 100644
index e94fbf86c5f..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ConfigOptions;
-
-/**
- * The configuration options for tubemq sources and sink.
- */
-public class TubemqOptions {
-
- public static final ConfigOption<String> SESSION_KEY =
- ConfigOptions.key("session.key")
- .noDefaultValue()
- .withDescription("The session key for this consumer group at startup.");
-
- public static final ConfigOption<String> STREAM_ID =
- ConfigOptions.key("topic.streamId")
- .noDefaultValue()
- .withDescription("The streamId owned this topic.");
-
- public static final ConfigOption<Integer> MAX_RETRIES =
- ConfigOptions.key("max.retries")
- .defaultValue(5)
- .withDescription("The maximum number of retries when an "
- + "exception is caught.");
-
- public static final ConfigOption<Boolean> BOOTSTRAP_FROM_MAX =
- ConfigOptions.key("bootstrap.from.max")
- .defaultValue(false)
- .withDescription("True if consuming from the most recent "
- + "position when the tubemq source starts. It only takes "
- + "effect when the tubemq source does not recover from "
- + "checkpoints.");
-
- public static final ConfigOption<String> SOURCE_MAX_IDLE_TIME =
- ConfigOptions.key("source.task.max.idle.time")
- .defaultValue("5min")
- .withDescription("The max time of the source marked as temporarily idle.");
-
- public static final ConfigOption<String> MESSAGE_NOT_FOUND_WAIT_PERIOD =
- ConfigOptions.key("message.not.found.wait.period")
- .defaultValue("350ms")
- .withDescription("The time of waiting period if tubemq broker return message not found.");
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
deleted file mode 100644
index fabd43af6e1..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.inlong.tubemq.client.config.TubeClientConfig;
-import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.inlong.tubemq.client.producer.MessageProducer;
-import org.apache.inlong.tubemq.client.producer.MessageSentResult;
-import org.apache.inlong.tubemq.corebase.Message;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashSet;
-
-import static org.apache.flink.connectors.tubemq.TubemqOptions.MAX_RETRIES;
-
-public class TubemqSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
-
- private static final Logger LOG = LoggerFactory.getLogger(TubemqSinkFunction.class);
-
- private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm";
-
- /**
- * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
- */
- private final String masterAddress;
-
- /**
- * The topic name.
- */
- private final String topic;
-
- /**
- * The streamId of this topic
- */
- private final String streamId;
- /**
- * The serializer for the records sent to TubeMQ.
- */
- private final SerializationSchema<T> serializationSchema;
-
- /**
- * The tubemq producer.
- */
- private transient MessageProducer producer;
-
- /**
- * The tubemq session factory.
- */
- private transient MessageSessionFactory sessionFactory;
-
- /**
- * The maximum number of retries.
- */
- private final int maxRetries;
-
- public TubemqSinkFunction(String topic,
- String masterAddress,
- SerializationSchema<T> serializationSchema,
- Configuration configuration) {
- Preconditions.checkNotNull(topic,
- "The topic must not be null.");
- Preconditions.checkNotNull(masterAddress,
- "The master address must not be null.");
- Preconditions.checkNotNull(serializationSchema,
- "The serialization schema must not be null.");
- Preconditions.checkNotNull(configuration,
- "The configuration must not be null.");
-
- this.topic = topic;
- this.masterAddress = masterAddress;
- this.serializationSchema = serializationSchema;
- this.streamId = configuration.getString(TubemqOptions.STREAM_ID);
- this.maxRetries = configuration.getInteger(MAX_RETRIES);
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
- // Nothing to do.
- }
-
- @Override
- public void initializeState(FunctionInitializationContext functionInitializationContext) {
- // Nothing to do.
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- TubeClientConfig tubeClientConfig = new TubeClientConfig(masterAddress);
- this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig);
- this.producer = sessionFactory.createProducer();
- HashSet<String> hashSet = new HashSet<>();
- hashSet.add(topic);
- producer.publish(hashSet);
- }
-
- @Override
- public void invoke(T in, Context context) throws Exception {
-
- int retries = 0;
- Exception exception = null;
-
- while (maxRetries <= 0 || retries < maxRetries) {
-
- try {
- byte[] body = serializationSchema.serialize(in);
- Message message = new Message(topic, body);
- if (StringUtils.isNotBlank(streamId)) {
- SimpleDateFormat sdf = new SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT);
- long currTimeMillis = System.currentTimeMillis();
- message.putSystemHeader(streamId, sdf.format(new Date(currTimeMillis)));
- }
-
- MessageSentResult sendResult = producer.sendMessage(message);
- if (sendResult.isSuccess()) {
- return;
- } else {
- LOG.warn("Send msg fail, error code: {}, error message: {}",
- sendResult.getErrCode(), sendResult.getErrMsg());
- }
- } catch (Exception e) {
- LOG.warn("Could not properly send the message to TubeMQ "
- + "(retries: {}).", retries, e);
-
- retries++;
- exception = ExceptionUtils.firstOrSuppressed(e, exception);
- }
- }
-
- throw new IOException("Could not properly send the message to TubeMQ.", exception);
- }
-
- @Override
- public void close() throws Exception {
-
- try {
- if (producer != null) {
- producer.shutdown();
- producer = null;
- }
- if (sessionFactory != null) {
- sessionFactory.shutdown();
- sessionFactory = null;
- }
- } catch (Throwable e) {
- LOG.error("Shutdown producer error", e);
- } finally {
- super.close();
- }
- }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
deleted file mode 100644
index 4f9fc050a93..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.inlong.tubemq.client.config.ConsumerConfig;
-import org.apache.inlong.tubemq.client.consumer.ConsumePosition;
-import org.apache.inlong.tubemq.client.consumer.ConsumerResult;
-import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer;
-import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
-import org.apache.inlong.tubemq.corebase.Message;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.time.Duration;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.TimeUtils.parseDuration;
-
-/**
- * The Flink TubeMQ Consumer.
- *
- * @param <T> The type of records produced by this data source
- */
-public class TubemqSourceFunction<T>
- extends
- RichParallelSourceFunction<T>
- implements
- CheckpointedFunction {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TubemqSourceFunction.class);
-
- private static final String TUBE_OFFSET_STATE = "tube-offset-state";
-
- private static final String SPLIT_COMMA = ",";
- private static final String SPLIT_COLON = ":";
-
- /**
- * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081.
- */
- private final String masterAddress;
-
- /**
- * The topic name.
- */
- private final String topic;
-
- /**
- * The tubemq consumers use this streamId set to filter records reading from server.
- */
- private final TreeSet<String> streamIdSet;
-
- /**
- * The consumer group name.
- */
- private final String consumerGroup;
-
- /**
- * The deserializer for records.
- */
- private final DeserializationSchema<T> deserializationSchema;
-
- /**
- * The random key for TubeMQ consumer group when startup.
- */
- private final String sessionKey;
-
- /**
- * True if consuming message from max offset.
- */
- private final boolean consumeFromMax;
-
- /**
- * The time to wait if tubemq broker returns message not found.
- */
- private final Duration messageNotFoundWaitPeriod;
-
- /**
- * The max time to marked source idle.
- */
- private final Duration maxIdleTime;
-
- /**
- * Flag indicating whether the consumer is still running.
- **/
- private volatile boolean running;
-
- /**
- * The state for the offsets of queues.
- */
- private transient ListState<Tuple2<String, Long>> offsetsState;
-
- /**
- * The current offsets of partitions which are stored in {@link #offsetsState}
- * once a checkpoint is triggered.
- *
- * <p>NOTE: The offsets are populated in the main thread and saved in the
- * checkpoint thread. Its usage must be guarded by the checkpoint lock.
- */
- private transient Map<String, Long> currentOffsets;
-
- /**
- * The TubeMQ session factory.
- */
- private transient TubeSingleSessionFactory messageSessionFactory;
-
- /**
- * The TubeMQ pull consumer.
- */
- private transient PullMessageConsumer messagePullConsumer;
-
- /**
- * Build a TubeMQ source function
- *
- * @param masterAddress the master address of TubeMQ
- * @param topic the topic name
- * @param streamIdSet the topic's filter condition items
- * @param consumerGroup the consumer group name
- * @param deserializationSchema the deserialize schema
- * @param configuration the configure
- */
- public TubemqSourceFunction(
- String masterAddress,
- String topic,
- TreeSet<String> streamIdSet,
- String consumerGroup,
- DeserializationSchema<T> deserializationSchema,
- Configuration configuration) {
- checkNotNull(masterAddress,
- "The master address must not be null.");
- checkNotNull(topic,
- "The topic must not be null.");
- checkNotNull(streamIdSet,
- "The streamId set must not be null.");
- checkNotNull(consumerGroup,
- "The consumer group must not be null.");
- checkNotNull(deserializationSchema,
- "The deserialization schema must not be null.");
- checkNotNull(configuration,
- "The configuration must not be null.");
-
- this.masterAddress = masterAddress;
- this.topic = topic;
- this.streamIdSet = streamIdSet;
- this.consumerGroup = consumerGroup;
- this.deserializationSchema = deserializationSchema;
-
- this.sessionKey =
- configuration.getString(TubemqOptions.SESSION_KEY);
- this.consumeFromMax =
- configuration.getBoolean(TubemqOptions.BOOTSTRAP_FROM_MAX);
- this.messageNotFoundWaitPeriod =
- parseDuration(
- configuration.getString(
- TubemqOptions.MESSAGE_NOT_FOUND_WAIT_PERIOD));
- this.maxIdleTime =
- parseDuration(
- configuration.getString(
- TubemqOptions.SOURCE_MAX_IDLE_TIME));
- }
-
- @Override
- public void initializeState(
- FunctionInitializationContext context) throws Exception {
-
- TypeInformation<Tuple2<String, Long>> typeInformation =
- new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO);
- ListStateDescriptor<Tuple2<String, Long>> stateDescriptor =
- new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation);
-
- OperatorStateStore stateStore = context.getOperatorStateStore();
- offsetsState = stateStore.getListState(stateDescriptor);
-
- currentOffsets = new HashMap<>();
- if (context.isRestored()) {
- for (Tuple2<String, Long> tubeOffset : offsetsState.get()) {
- currentOffsets.put(tubeOffset.f0, tubeOffset.f1);
- }
-
- LOG.info("Successfully restore the offsets {}.", currentOffsets);
- } else {
- LOG.info("No restore offsets.");
- }
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- ConsumerConfig consumerConfig =
- new ConsumerConfig(masterAddress, consumerGroup);
- consumerConfig.setConsumePosition(consumeFromMax
- ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS
- : ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
- consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis());
-
- final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
- messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
-
- messagePullConsumer =
- messageSessionFactory.createPullConsumer(consumerConfig);
- messagePullConsumer
- .subscribe(topic, streamIdSet);
- messagePullConsumer
- .completeSubscribe(sessionKey, numTasks, true, currentOffsets);
-
- running = true;
- }
-
- @Override
- public void run(SourceContext<T> ctx) throws Exception {
-
- Instant lastConsumeInstant = Instant.now();
-
- while (running) {
-
- ConsumerResult consumeResult = messagePullConsumer.getMessage();
- if (!consumeResult.isSuccess()) {
- if (!(consumeResult.getErrCode() == 400
- || consumeResult.getErrCode() == 404
- || consumeResult.getErrCode() == 405
- || consumeResult.getErrCode() == 406
- || consumeResult.getErrCode() == 407
- || consumeResult.getErrCode() == 408)) {
- LOG.info("Could not consume messages from tubemq (errcode: {}, "
- + "errmsg: {}).", consumeResult.getErrCode(),
- consumeResult.getErrMsg());
- }
-
- Duration idleTime =
- Duration.between(lastConsumeInstant, Instant.now());
- if (idleTime.compareTo(maxIdleTime) > 0) {
- ctx.markAsTemporarilyIdle();
- }
-
- continue;
- }
-
- List<Message> messageList = consumeResult.getMessageList();
-
- List<T> records = new ArrayList<>();
- if (messageList != null) {
- lastConsumeInstant = Instant.now();
-
- for (Message message : messageList) {
- T record =
- deserializationSchema.deserialize(message.getData());
- records.add(record);
- }
- }
-
- synchronized (ctx.getCheckpointLock()) {
-
- for (T record : records) {
- ctx.collect(record);
- }
-
- currentOffsets.put(
- consumeResult.getPartitionKey(),
- consumeResult.getCurrOffset());
- }
-
- ConsumerResult confirmResult =
- messagePullConsumer
- .confirmConsume(consumeResult.getConfirmContext(), true);
- if (!confirmResult.isSuccess()) {
- if (!(confirmResult.getErrCode() == 400
- || confirmResult.getErrCode() == 404
- || confirmResult.getErrCode() == 405
- || confirmResult.getErrCode() == 406
- || confirmResult.getErrCode() == 407
- || confirmResult.getErrCode() == 408)) {
- LOG.warn("Could not confirm messages to tubemq (errcode: {}, "
- + "errmsg: {}).", confirmResult.getErrCode(),
- confirmResult.getErrMsg());
- }
- }
- }
- }
-
- @Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
-
- offsetsState.clear();
- for (Map.Entry<String, Long> entry : currentOffsets.entrySet()) {
- offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue()));
- }
-
- LOG.info("Successfully save the offsets in checkpoint {}: {}.",
- context.getCheckpointId(), currentOffsets);
- }
-
- @Override
- public void cancel() {
- running = false;
- }
-
- @Override
- public void close() throws Exception {
-
- cancel();
-
- if (messagePullConsumer != null) {
- try {
- messagePullConsumer.shutdown();
- } catch (Throwable t) {
- LOG.warn("Could not properly shutdown the tubemq pull consumer.", t);
- }
- }
-
- if (messageSessionFactory != null) {
- try {
- messageSessionFactory.shutdown();
- } catch (Throwable t) {
- LOG.warn("Could not properly shutdown the tubemq session factory.", t);
- }
- }
-
- super.close();
-
- LOG.info("Closed the tubemq source.");
- }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java
deleted file mode 100644
index b9f7e5f86db..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.sinks.AppendStreamTableSink;
-import org.apache.flink.table.utils.TableConnectorUtils;
-import org.apache.flink.types.Row;
-
-import java.util.Arrays;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Tubemq {@link org.apache.flink.table.sinks.StreamTableSink}.
- */
-public class TubemqTableSink implements AppendStreamTableSink<Row> {
-
- /**
- * Serialization schema for records to tubemq.
- */
- private final SerializationSchema<Row> serializationSchema;
-
- /**
- * The schema of the table.
- */
- private final TableSchema schema;
-
- /**
- * The tubemq topic name.
- */
- private final String topic;
-
- /**
- * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
- */
- private final String masterAddress;
-
- /**
- * The parameters collection for tubemq producer.
- */
- private final Configuration configuration;
-
- public TubemqTableSink(
- SerializationSchema<Row> serializationSchema,
- TableSchema schema,
- String topic,
- String masterAddress,
- Configuration configuration) {
- this.serializationSchema = checkNotNull(serializationSchema,
- "The serialization schema must not be null.");
- this.schema = checkNotNull(schema,
- "The schema must not be null.");
- this.topic = checkNotNull(topic,
- "Topic must not be null.");
- this.masterAddress = checkNotNull(masterAddress,
- "Master address must not be null.");
- this.configuration = checkNotNull(configuration,
- "The configuration must not be null.");
- }
-
- @Override
- public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
-
- final SinkFunction<Row> tubemqSinkFunction =
- new TubemqSinkFunction<>(
- topic,
- masterAddress,
- serializationSchema,
- configuration);
-
- return dataStream
- .addSink(tubemqSinkFunction)
- .name(
- TableConnectorUtils.generateRuntimeName(
- getClass(),
- getFieldNames()));
- }
-
- @Override
- public TypeInformation<Row> getOutputType() {
- return schema.toRowType();
- }
-
- @Override
- public String[] getFieldNames() {
- return schema.getFieldNames();
- }
-
- @Override
- public TypeInformation<?>[] getFieldTypes() {
- return schema.getFieldTypes();
- }
-
- @Override
- public TubemqTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- if (!Arrays.equals(getFieldNames(), fieldNames)
- || !Arrays.equals(getFieldTypes(), fieldTypes)) {
- throw new ValidationException("Reconfiguration with different fields is not allowed. "
- + "Expected: " + Arrays.toString(getFieldNames())
- + " / " + Arrays.toString(getFieldTypes()) + ". "
- + "But was: " + Arrays.toString(fieldNames) + " / "
- + Arrays.toString(fieldTypes));
- }
-
- return this;
- }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
deleted file mode 100644
index 934a4416876..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.Types;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.sources.DefinedFieldMapping;
-import org.apache.flink.table.sources.DefinedProctimeAttribute;
-import org.apache.flink.table.sources.DefinedRowtimeAttributes;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeSet;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * TubeMQ {@link StreamTableSource}.
- */
-public class TubemqTableSource
- implements
- StreamTableSource<Row>,
- DefinedProctimeAttribute,
- DefinedRowtimeAttributes,
- DefinedFieldMapping {
-
- /**
- * Deserialization schema for records from TubeMQ.
- */
- private final DeserializationSchema<Row> deserializationSchema;
-
- /**
- * The schema of the table.
- */
- private final TableSchema schema;
-
- /**
- * Field name of the processing time attribute, null if no processing time
- * field is defined.
- */
- private final Optional<String> proctimeAttribute;
-
- /**
- * Descriptors for rowtime attributes.
- */
- private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
-
- /**
- * Mapping for the fields of the table schema to fields of the physical
- * returned type.
- */
- private final Map<String, String> fieldMapping;
-
- /**
- * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081 .
- */
- private final String masterAddress;
-
- /**
- * The TubeMQ topic name.
- */
- private final String topic;
-
- /**
- * The TubeMQ streamId filter collection.
- */
- private final TreeSet<String> streamIdSet;
-
- /**
- * The TubeMQ consumer group name.
- */
- private final String consumerGroup;
-
- /**
- * The parameters collection for TubeMQ consumer.
- */
- private final Configuration configuration;
-
- /**
- * Build TubeMQ table source
- *
- * @param deserializationSchema the deserialize schema
- * @param schema the data schema
- * @param proctimeAttribute the proc time
- * @param rowtimeAttributeDescriptors the row time attribute descriptor
- * @param fieldMapping the field map information
- * @param masterAddress the master address
- * @param topic the topic name
- * @param streamIdSet the topic's filter condition items
- * @param consumerGroup the consumer group
- * @param configuration the configure
- */
- public TubemqTableSource(
- DeserializationSchema<Row> deserializationSchema,
- TableSchema schema,
- Optional<String> proctimeAttribute,
- List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors,
- Map<String, String> fieldMapping,
- String masterAddress,
- String topic,
- TreeSet<String> streamIdSet,
- String consumerGroup,
- Configuration configuration) {
- checkNotNull(deserializationSchema,
- "The deserialization schema must not be null.");
- checkNotNull(schema,
- "The schema must not be null.");
- checkNotNull(fieldMapping,
- "The field mapping must not be null.");
- checkNotNull(masterAddress,
- "The master address must not be null.");
- checkNotNull(topic,
- "The topic must not be null.");
- checkNotNull(streamIdSet,
- "The streamId set must not be null.");
- checkNotNull(consumerGroup,
- "The consumer group must not be null.");
- checkNotNull(configuration,
- "The configuration must not be null.");
-
- this.deserializationSchema = deserializationSchema;
- this.schema = schema;
- this.fieldMapping = fieldMapping;
- this.masterAddress = masterAddress;
- this.topic = topic;
- this.streamIdSet = streamIdSet;
- this.consumerGroup = consumerGroup;
- this.configuration = configuration;
-
- this.proctimeAttribute =
- validateProcTimeAttribute(proctimeAttribute);
- this.rowtimeAttributeDescriptors =
- validateRowTimeAttributeDescriptors(rowtimeAttributeDescriptors);
- }
-
- @Override
- public TableSchema getTableSchema() {
- return schema;
- }
-
- @Nullable
- @Override
- public String getProctimeAttribute() {
- return proctimeAttribute.orElse(null);
- }
-
- @Override
- public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
- return rowtimeAttributeDescriptors;
- }
-
- @Override
- public Map<String, String> getFieldMapping() {
- return fieldMapping;
- }
-
- @Override
- public DataStream<Row> getDataStream(
- StreamExecutionEnvironment streamExecutionEnvironment) {
- SourceFunction<Row> sourceFunction =
- new TubemqSourceFunction<>(
- masterAddress,
- topic,
- streamIdSet,
- consumerGroup,
- deserializationSchema,
- configuration);
-
- return streamExecutionEnvironment
- .addSource(sourceFunction)
- .name(explainSource());
- }
-
- private Optional<String> validateProcTimeAttribute(
- Optional<String> proctimeAttribute) {
- return proctimeAttribute.map((attribute) -> {
- Optional<TypeInformation<?>> tpe = schema.getFieldType(attribute);
- if (!tpe.isPresent()) {
- throw new ValidationException("Proc time attribute '"
- + attribute + "' isn't present in TableSchema.");
- } else if (tpe.get() != Types.SQL_TIMESTAMP()) {
- throw new ValidationException("Proc time attribute '"
- + attribute + "' isn't of type SQL_TIMESTAMP.");
- }
- return attribute;
- });
- }
-
- private List<RowtimeAttributeDescriptor> validateRowTimeAttributeDescriptors(
- List<RowtimeAttributeDescriptor> attributeDescriptors) {
- checkNotNull(attributeDescriptors);
-
- for (RowtimeAttributeDescriptor desc : attributeDescriptors) {
- String name = desc.getAttributeName();
- Optional<TypeInformation<?>> tpe = schema.getFieldType(name);
- if (!tpe.isPresent()) {
- throw new ValidationException("Row time attribute '"
- + name + "' is not present.");
- } else if (tpe.get() != Types.SQL_TIMESTAMP()) {
- throw new ValidationException("Row time attribute '"
- + name + "' is not of type SQL_TIMESTAMP.");
- }
- }
-
- return attributeDescriptors;
- }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
deleted file mode 100644
index e28e85c1b76..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.SchemaValidator;
-import org.apache.flink.table.factories.DeserializationSchemaFactory;
-import org.apache.flink.table.factories.SerializationSchemaFactory;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
-import org.apache.flink.table.factories.TableFactoryService;
-import org.apache.flink.table.sinks.StreamTableSink;
-import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
-import org.apache.flink.table.sources.StreamTableSource;
-import org.apache.flink.types.Row;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeSet;
-
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION;
-import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
-import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
-import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
-import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND;
-
-/**
- * Factory for creating configured instances of {@link TubemqTableSource}.
- */
-public class TubemqTableSourceSinkFactory
- implements
- StreamTableSourceFactory<Row>,
- StreamTableSinkFactory<Row> {
-
- private static final String SPLIT_COMMA = ",";
-
- public TubemqTableSourceSinkFactory() {
- }
-
- @Override
- public Map<String, String> requiredContext() {
-
- Map<String, String> context = new HashMap<>();
- context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND);
- context.put(CONNECTOR_TYPE, TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ);
- context.put(CONNECTOR_PROPERTY_VERSION, "1");
-
- return context;
- }
-
- @Override
- public List<String> supportedProperties() {
- List<String> properties = new ArrayList<>();
-
- // tubemq
- properties.add(TubemqValidator.CONNECTOR_TOPIC);
- properties.add(TubemqValidator.CONNECTOR_MASTER);
- properties.add(TubemqValidator.CONNECTOR_GROUP);
- properties.add(TubemqValidator.CONNECTOR_STREAMIDS);
- properties.add(TubemqValidator.CONNECTOR_PROPERTIES + ".*");
-
- // schema
- properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
- properties.add(SCHEMA + ".#." + SCHEMA_NAME);
- properties.add(SCHEMA + ".#." + SCHEMA_FROM);
-
- // time attributes
- properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
- properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
- properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
- properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
- properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
- properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
- properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
- properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
- properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);
-
- // format wildcard
- properties.add(FORMAT + ".*");
-
- return properties;
- }
-
- @Override
- public StreamTableSource<Row> createStreamTableSource(
- Map<String, String> properties) {
- final DeserializationSchema<Row> deserializationSchema =
- getDeserializationSchema(properties);
-
- final DescriptorProperties descriptorProperties =
- new DescriptorProperties(true);
- descriptorProperties.putProperties(properties);
-
- validateProperties(descriptorProperties);
-
- final TableSchema schema =
- descriptorProperties.getTableSchema(SCHEMA);
- final Optional<String> proctimeAttribute =
- SchemaValidator.deriveProctimeAttribute(descriptorProperties);
- final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors =
- SchemaValidator.deriveRowtimeAttributes(descriptorProperties);
- final Map<String, String> fieldMapping =
- SchemaValidator.deriveFieldMapping(
- descriptorProperties,
- Optional.of(deserializationSchema.getProducedType()));
- final String topic =
- descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC);
- final String masterAddress =
- descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER);
- final String consumerGroup =
- descriptorProperties.getString(TubemqValidator.CONNECTOR_GROUP);
- final String streamIds =
- descriptorProperties
- .getOptionalString(TubemqValidator.CONNECTOR_STREAMIDS)
- .orElse(null);
- final Configuration configuration =
- getConfiguration(descriptorProperties);
-
- TreeSet<String> streamIdSet = new TreeSet<>();
- if (streamIds != null) {
- streamIdSet.addAll(Arrays.asList(streamIds.split(SPLIT_COMMA)));
- }
-
- return new TubemqTableSource(
- deserializationSchema,
- schema,
- proctimeAttribute,
- rowtimeAttributeDescriptors,
- fieldMapping,
- masterAddress,
- topic,
- streamIdSet,
- consumerGroup,
- configuration);
- }
-
- @Override
- public StreamTableSink<Row> createStreamTableSink(
- Map<String, String> properties) {
- final SerializationSchema<Row> serializationSchema =
- getSerializationSchema(properties);
-
- final DescriptorProperties descriptorProperties =
- new DescriptorProperties(true);
- descriptorProperties.putProperties(properties);
-
- validateProperties(descriptorProperties);
-
- final TableSchema tableSchema =
- descriptorProperties.getTableSchema(SCHEMA);
- final String topic =
- descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC);
- final String masterAddress =
- descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER);
-
- final Configuration configuration =
- getConfiguration(descriptorProperties);
-
- return new TubemqTableSink(
- serializationSchema,
- tableSchema,
- topic,
- masterAddress,
- configuration);
- }
-
- private SerializationSchema<Row> getSerializationSchema(
- Map<String, String> properties) {
- @SuppressWarnings("unchecked")
- final SerializationSchemaFactory<Row> formatFactory =
- TableFactoryService.find(
- SerializationSchemaFactory.class,
- properties,
- this.getClass().getClassLoader());
-
- return formatFactory.createSerializationSchema(properties);
- }
-
- private void validateProperties(DescriptorProperties descriptorProperties) {
- new SchemaValidator(true, false, false).validate(descriptorProperties);
- new TubemqValidator().validate(descriptorProperties);
- }
-
- private DeserializationSchema<Row> getDeserializationSchema(
- Map<String, String> properties) {
- @SuppressWarnings("unchecked")
- final DeserializationSchemaFactory<Row> formatFactory =
- TableFactoryService.find(
- DeserializationSchemaFactory.class,
- properties,
- this.getClass().getClassLoader());
-
- return formatFactory.createDeserializationSchema(properties);
- }
-
- private Configuration getConfiguration(
- DescriptorProperties descriptorProperties) {
- Map<String, String> properties =
- descriptorProperties.getPropertiesWithPrefix(TubemqValidator.CONNECTOR_PROPERTIES);
-
- Configuration configuration = new Configuration();
- for (Map.Entry<String, String> property : properties.entrySet()) {
- configuration.setString(property.getKey(), property.getValue());
- }
-
- return configuration;
- }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
deleted file mode 100644
index bddaec4eea9..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-
-/**
- * The validator for {@link Tubemq}.
- */
-public class TubemqValidator extends ConnectorDescriptorValidator {
-
- /**
- * The type of connector.
- */
- public static final String CONNECTOR_TYPE_VALUE_TUBEMQ = "tubemq";
-
- /**
- * The address of tubemq master.
- */
- public static final String CONNECTOR_MASTER = "connector.master";
-
- /**
- * The tubemq topic name.
- */
- public static final String CONNECTOR_TOPIC = "connector.topic";
-
- /**
- * The tubemq (consumer or producer) group name.
- */
- public static final String CONNECTOR_GROUP = "connector.group";
-
- /**
- * The tubemq consumers use these streamIds to filter records reading from server.
- */
- public static final String CONNECTOR_STREAMIDS = "connector.stream-ids";
-
- /**
- * The prefix of tubemq properties (optional).
- */
- public static final String CONNECTOR_PROPERTIES = "connector.properties";
-
- @Override
- public void validate(DescriptorProperties properties) {
- super.validate(properties);
-
- // Validates that the connector type is tubemq.
- properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TUBEMQ, false);
-
- // Validate that the topic name is set.
- properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE);
-
- // Validate that the master address is set.
- properties.validateString(CONNECTOR_MASTER, false, 1, Integer.MAX_VALUE);
-
- // Validate that the group name is set.
- properties.validateString(CONNECTOR_GROUP, false, 1, Integer.MAX_VALUE);
-
- // Validate that the streamIds is set.
- properties.validateString(CONNECTOR_STREAMIDS, true, 1, Integer.MAX_VALUE);
- }
-}
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
deleted file mode 100644
index 30831743b76..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.flink.connectors.tubemq.TubemqTableSourceSinkFactory
diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
deleted file mode 100644
index 7f0e68def3c..00000000000
--- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connectors.tubemq;
-
-import org.apache.flink.table.descriptors.Descriptor;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.DescriptorTestBase;
-import org.apache.flink.table.descriptors.DescriptorValidator;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Unit tests for {@link Tubemq}.
- */
-public class TubemqTest extends DescriptorTestBase {
-
- @Override
- protected List<Descriptor> descriptors() {
- final Descriptor descriptor1 =
- new Tubemq()
- .topic("test-topic-1")
- .master("localhost:9001")
- .group("test-group-1");
-
- final Descriptor descriptor2 =
- new Tubemq()
- .topic("test-topic-2")
- .master("localhost:9001")
- .group("test-group-2")
- .property("bootstrap.from.max", "true");
-
- final Descriptor descriptor3 =
- new Tubemq()
- .topic("test-topic-3")
- .master("localhost:9001")
- .group("test-group-3")
- .streamIds("test-streamId-1,test-streamId-2");
-
- return Arrays.asList(descriptor1, descriptor2, descriptor3);
- }
-
- @Override
- protected List