From 7b3f42f21685a8053dd92f4e9dd472a621b1e2f9 Mon Sep 17 00:00:00 2001 From: AloysZhang Date: Fri, 1 Nov 2024 16:43:42 +0800 Subject: [PATCH] [INLONG-11446][TubeMQ] Remove legacy codes for flink-tube connector (#11448) --- inlong-tubemq/tubemq-connectors/pom.xml | 1 - .../tubemq-connector-flink/pom.xml | 106 ------ .../flink/connectors/tubemq/Tubemq.java | 162 -------- .../connectors/tubemq/TubemqOptions.java | 61 --- .../connectors/tubemq/TubemqSinkFunction.java | 182 --------- .../tubemq/TubemqSourceFunction.java | 358 ------------------ .../connectors/tubemq/TubemqTableSink.java | 130 ------- .../connectors/tubemq/TubemqTableSource.java | 235 ------------ .../tubemq/TubemqTableSourceSinkFactory.java | 244 ------------ .../connectors/tubemq/TubemqValidator.java | 77 ---- ....apache.flink.table.factories.TableFactory | 16 - .../flink/connectors/tubemq/TubemqTest.java | 101 ----- .../src/test/resources/log4j2.properties | 21 - 13 files changed, 1694 deletions(-) delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java delete mode 100644 inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties diff --git a/inlong-tubemq/tubemq-connectors/pom.xml b/inlong-tubemq/tubemq-connectors/pom.xml index e9c4b1b1472..1fc71477c34 100644 --- a/inlong-tubemq/tubemq-connectors/pom.xml +++ b/inlong-tubemq/tubemq-connectors/pom.xml @@ -29,7 +29,6 @@ Apache InLong - TubeMQ Connectors - tubemq-connector-flink tubemq-connector-flume tubemq-connector-spark diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml deleted file mode 100644 index 47bb6ccce70..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/pom.xml +++ /dev/null @@ -1,106 +0,0 @@ - - - - 4.0.0 - - org.apache.inlong - tubemq-connectors - 2.1.0-SNAPSHOT - - - tubemq-connector-flink - Apache InLong - TubeMQ Connectors-flink - - - ${project.parent.parent.parent.basedir} - 1.13.5 - 2.11 - - - - - org.apache.inlong - tubemq-client - ${project.version} - - - - org.apache.inlong - tubemq-core - ${project.version} - - - - org.apache.flink - flink-core - provided - - - - org.apache.flink - flink-runtime_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-streaming-java_${scala.binary.version} - ${flink.version} - provided - - - - org.apache.flink - flink-table-common - ${flink.version} - provided - true - - - - org.apache.flink - flink-table-api-java-bridge_${scala.binary.version} - ${flink.version} - provided - true - - - - org.slf4j - slf4j-api - provided - - - - org.apache.flink - flink-table-common - ${flink.version} - test-jar - test - - - - junit - junit - ${junit.version} - test - - - diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java deleted file mode 100644 index 2e34c133300..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.table.descriptors.ConnectorDescriptor; -import org.apache.flink.table.descriptors.DescriptorProperties; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; - -import java.util.HashMap; -import java.util.Map; - -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_GROUP; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_MASTER; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_PROPERTIES; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_STREAMIDS; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TOPIC; -import static org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * The {@link ConnectorDescriptor} for tubemq sources and sinks. - */ -public class Tubemq extends ConnectorDescriptor { - - @Nullable - private boolean consumerRole = true; - - @Nullable - private String topic; - - @Nullable - private String master; - - @Nullable - private String group; - - @Nullable - private String streamIds; - - @Nonnull - private Map properties; - - public Tubemq() { - super(CONNECTOR_TYPE_VALUE_TUBEMQ, 1, true); - - this.properties = new HashMap<>(); - } - - /** - * Sets the tubemq topic to be used. - * - * @param topic The topic name. - */ - public Tubemq topic(String topic) { - checkNotNull(topic); - - this.topic = topic; - return this; - } - - /** - * Sets the client role to be used. - * - * @param isConsumer The client role if consumer. - */ - public Tubemq asConsumer(boolean isConsumer) { - this.consumerRole = isConsumer; - return this; - } - - /** - * Sets the address of tubemq master to connect. - * - * @param master The address of tubemq master. - */ - public Tubemq master(String master) { - checkNotNull(master); - - this.master = master; - return this; - } - - /** - * Sets the tubemq (consumer or producer) group to be used. - * - * @param group The group name. - */ - public Tubemq group(String group) { - checkNotNull(group); - - this.group = group; - return this; - } - - /** - * The tubemq consumers use these streamIds to filter records reading from server. - * - * @param streamIds The filter for consume record from server. - */ - public Tubemq streamIds(String streamIds) { - - this.streamIds = streamIds; - return this; - } - - /** - * Sets the tubemq property. - * - * @param key The key of the property. - * @param value The value of the property. - */ - public Tubemq property(String key, String value) { - checkNotNull(key); - checkNotNull(value); - - properties.put(key, value); - return this; - } - - @Override - protected Map toConnectorProperties() { - DescriptorProperties descriptorProperties = new DescriptorProperties(); - - if (topic != null) { - descriptorProperties.putString(CONNECTOR_TOPIC, topic); - } - - if (master != null) { - descriptorProperties.putString(CONNECTOR_MASTER, master); - } - if (consumerRole) { - if (group != null) { - descriptorProperties.putString(CONNECTOR_GROUP, group); - } - - if (streamIds != null) { - descriptorProperties.putString(CONNECTOR_STREAMIDS, streamIds); - } - } - - descriptorProperties.putPropertiesWithPrefix(CONNECTOR_PROPERTIES, properties); - - return descriptorProperties.asMap(); - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java deleted file mode 100644 index e94fbf86c5f..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.configuration.ConfigOptions; - -/** - * The configuration options for tubemq sources and sink. - */ -public class TubemqOptions { - - public static final ConfigOption SESSION_KEY = - ConfigOptions.key("session.key") - .noDefaultValue() - .withDescription("The session key for this consumer group at startup."); - - public static final ConfigOption STREAM_ID = - ConfigOptions.key("topic.streamId") - .noDefaultValue() - .withDescription("The streamId owned this topic."); - - public static final ConfigOption MAX_RETRIES = - ConfigOptions.key("max.retries") - .defaultValue(5) - .withDescription("The maximum number of retries when an " - + "exception is caught."); - - public static final ConfigOption BOOTSTRAP_FROM_MAX = - ConfigOptions.key("bootstrap.from.max") - .defaultValue(false) - .withDescription("True if consuming from the most recent " - + "position when the tubemq source starts.. It only takes " - + "effect when the tubemq source does not recover from " - + "checkpoints."); - - public static final ConfigOption SOURCE_MAX_IDLE_TIME = - ConfigOptions.key("source.task.max.idle.time") - .defaultValue("5min") - .withDescription("The max time of the source marked as temporarily idle."); - - public static final ConfigOption MESSAGE_NOT_FOUND_WAIT_PERIOD = - ConfigOptions.key("message.not.found.wait.period") - .defaultValue("350ms") - .withDescription("The time of waiting period if tubemq broker return message not found."); -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java deleted file mode 100644 index fabd43af6e1..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.inlong.tubemq.client.config.TubeClientConfig; -import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; -import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; -import org.apache.inlong.tubemq.client.producer.MessageProducer; -import org.apache.inlong.tubemq.client.producer.MessageSentResult; -import org.apache.inlong.tubemq.corebase.Message; - -import org.apache.commons.lang3.StringUtils; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashSet; - -import static org.apache.flink.connectors.tubemq.TubemqOptions.MAX_RETRIES; - -public class TubemqSinkFunction extends RichSinkFunction implements CheckpointedFunction { - - private static final Logger LOG = LoggerFactory.getLogger(TubemqSinkFunction.class); - - private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm"; - - /** - * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081. - */ - private final String masterAddress; - - /** - * The topic name. - */ - private final String topic; - - /** - * The streamId of this topic - */ - private final String streamId; - /** - * The serializer for the records sent to pulsar. - */ - private final SerializationSchema serializationSchema; - - /** - * The tubemq producer. - */ - private transient MessageProducer producer; - - /** - * The tubemq session factory. - */ - private transient MessageSessionFactory sessionFactory; - - /** - * The maximum number of retries. - */ - private final int maxRetries; - - public TubemqSinkFunction(String topic, - String masterAddress, - SerializationSchema serializationSchema, - Configuration configuration) { - Preconditions.checkNotNull(topic, - "The topic must not be null."); - Preconditions.checkNotNull(masterAddress, - "The master address must not be null."); - Preconditions.checkNotNull(serializationSchema, - "The serialization schema must not be null."); - Preconditions.checkNotNull(configuration, - "The configuration must not be null."); - - this.topic = topic; - this.masterAddress = masterAddress; - this.serializationSchema = serializationSchema; - this.streamId = configuration.getString(TubemqOptions.STREAM_ID); - this.maxRetries = configuration.getInteger(MAX_RETRIES); - } - - @Override - public void snapshotState(FunctionSnapshotContext functionSnapshotContext) { - // Nothing to do. - } - - @Override - public void initializeState(FunctionInitializationContext functionInitializationContext) { - // Nothing to do. - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - - TubeClientConfig tubeClientConfig = new TubeClientConfig(masterAddress); - this.sessionFactory = new TubeSingleSessionFactory(tubeClientConfig); - this.producer = sessionFactory.createProducer(); - HashSet hashSet = new HashSet<>(); - hashSet.add(topic); - producer.publish(hashSet); - } - - @Override - public void invoke(T in, Context context) throws Exception { - - int retries = 0; - Exception exception = null; - - while (maxRetries <= 0 || retries < maxRetries) { - - try { - byte[] body = serializationSchema.serialize(in); - Message message = new Message(topic, body); - if (StringUtils.isNotBlank(streamId)) { - SimpleDateFormat sdf = new SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT); - long currTimeMillis = System.currentTimeMillis(); - message.putSystemHeader(streamId, sdf.format(new Date(currTimeMillis))); - } - - MessageSentResult sendResult = producer.sendMessage(message); - if (sendResult.isSuccess()) { - return; - } else { - LOG.warn("Send msg fail, error code: {}, error message: {}", - sendResult.getErrCode(), sendResult.getErrMsg()); - } - } catch (Exception e) { - LOG.warn("Could not properly send the message to hippo " - + "(retries: {}).", retries, e); - - retries++; - exception = ExceptionUtils.firstOrSuppressed(e, exception); - } - } - - throw new IOException("Could not properly send the message to hippo.", exception); - } - - @Override - public void close() throws Exception { - - try { - if (producer != null) { - producer.shutdown(); - producer = null; - } - if (sessionFactory != null) { - sessionFactory.shutdown(); - sessionFactory = null; - } - } catch (Throwable e) { - LOG.error("Shutdown producer error", e); - } finally { - super.close(); - } - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java deleted file mode 100644 index 4f9fc050a93..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java +++ /dev/null @@ -1,358 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.inlong.tubemq.client.config.ConsumerConfig; -import org.apache.inlong.tubemq.client.consumer.ConsumePosition; -import org.apache.inlong.tubemq.client.consumer.ConsumerResult; -import org.apache.inlong.tubemq.client.consumer.PullMessageConsumer; -import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; -import org.apache.inlong.tubemq.corebase.Message; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.OperatorStateStore; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.typeutils.TupleTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.FunctionInitializationContext; -import org.apache.flink.runtime.state.FunctionSnapshotContext; -import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; - -import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO; -import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; -import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.TimeUtils.parseDuration; - -/** - * The Flink TubeMQ Consumer. - * - * @param The type of records produced by this data source - */ -public class TubemqSourceFunction - extends - RichParallelSourceFunction - implements - CheckpointedFunction { - - private static final Logger LOG = - LoggerFactory.getLogger(TubemqSourceFunction.class); - - private static final String TUBE_OFFSET_STATE = "tube-offset-state"; - - private static final String SPLIT_COMMA = ","; - private static final String SPLIT_COLON = ":"; - - /** - * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081. - */ - private final String masterAddress; - - /** - * The topic name. - */ - private final String topic; - - /** - * The tubemq consumers use this streamId set to filter records reading from server. - */ - private final TreeSet streamIdSet; - - /** - * The consumer group name. - */ - private final String consumerGroup; - - /** - * The deserializer for records. - */ - private final DeserializationSchema deserializationSchema; - - /** - * The random key for TubeMQ consumer group when startup. - */ - private final String sessionKey; - - /** - * True if consuming message from max offset. - */ - private final boolean consumeFromMax; - - /** - * The time to wait if tubemq broker returns message not found. - */ - private final Duration messageNotFoundWaitPeriod; - - /** - * The max time to marked source idle. - */ - private final Duration maxIdleTime; - - /** - * Flag indicating whether the consumer is still running. - **/ - private volatile boolean running; - - /** - * The state for the offsets of queues. - */ - private transient ListState> offsetsState; - - /** - * The current offsets of partitions which are stored in {@link #offsetsState} - * once a checkpoint is triggered. - * - *NOTE: The offsets are populated in the main thread and saved in the - * checkpoint thread. Its usage must be guarded by the checkpoint lock.

- */ - private transient Map currentOffsets; - - /** - * The TubeMQ session factory. - */ - private transient TubeSingleSessionFactory messageSessionFactory; - - /** - * The TubeMQ pull consumer. - */ - private transient PullMessageConsumer messagePullConsumer; - - /** - * Build a TubeMQ source function - * - * @param masterAddress the master address of TubeMQ - * @param topic the topic name - * @param streamIdSet the topic's filter condition items - * @param consumerGroup the consumer group name - * @param deserializationSchema the deserialize schema - * @param configuration the configure - */ - public TubemqSourceFunction( - String masterAddress, - String topic, - TreeSet streamIdSet, - String consumerGroup, - DeserializationSchema deserializationSchema, - Configuration configuration) { - checkNotNull(masterAddress, - "The master address must not be null."); - checkNotNull(topic, - "The topic must not be null."); - checkNotNull(streamIdSet, - "The streamId set must not be null."); - checkNotNull(consumerGroup, - "The consumer group must not be null."); - checkNotNull(deserializationSchema, - "The deserialization schema must not be null."); - checkNotNull(configuration, - "The configuration must not be null."); - - this.masterAddress = masterAddress; - this.topic = topic; - this.streamIdSet = streamIdSet; - this.consumerGroup = consumerGroup; - this.deserializationSchema = deserializationSchema; - - this.sessionKey = - configuration.getString(TubemqOptions.SESSION_KEY); - this.consumeFromMax = - configuration.getBoolean(TubemqOptions.BOOTSTRAP_FROM_MAX); - this.messageNotFoundWaitPeriod = - parseDuration( - configuration.getString( - TubemqOptions.MESSAGE_NOT_FOUND_WAIT_PERIOD)); - this.maxIdleTime = - parseDuration( - configuration.getString( - TubemqOptions.SOURCE_MAX_IDLE_TIME)); - } - - @Override - public void initializeState( - FunctionInitializationContext context) throws Exception { - - TypeInformation> typeInformation = - new TupleTypeInfo<>(STRING_TYPE_INFO, LONG_TYPE_INFO); - ListStateDescriptor> stateDescriptor = - new ListStateDescriptor<>(TUBE_OFFSET_STATE, typeInformation); - - OperatorStateStore stateStore = context.getOperatorStateStore(); - offsetsState = stateStore.getListState(stateDescriptor); - - currentOffsets = new HashMap<>(); - if (context.isRestored()) { - for (Tuple2 tubeOffset : offsetsState.get()) { - currentOffsets.put(tubeOffset.f0, tubeOffset.f1); - } - - LOG.info("Successfully restore the offsets {}.", currentOffsets); - } else { - LOG.info("No restore offsets."); - } - } - - @Override - public void open(Configuration parameters) throws Exception { - ConsumerConfig consumerConfig = - new ConsumerConfig(masterAddress, consumerGroup); - consumerConfig.setConsumePosition(consumeFromMax - ? ConsumePosition.CONSUMER_FROM_MAX_OFFSET_ALWAYS - : ConsumePosition.CONSUMER_FROM_LATEST_OFFSET); - consumerConfig.setMsgNotFoundWaitPeriodMs(messageNotFoundWaitPeriod.toMillis()); - - final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks(); - - messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); - - messagePullConsumer = - messageSessionFactory.createPullConsumer(consumerConfig); - messagePullConsumer - .subscribe(topic, streamIdSet); - messagePullConsumer - .completeSubscribe(sessionKey, numTasks, true, currentOffsets); - - running = true; - } - - @Override - public void run(SourceContext ctx) throws Exception { - - Instant lastConsumeInstant = Instant.now(); - - while (running) { - - ConsumerResult consumeResult = messagePullConsumer.getMessage(); - if (!consumeResult.isSuccess()) { - if (!(consumeResult.getErrCode() == 400 - || consumeResult.getErrCode() == 404 - || consumeResult.getErrCode() == 405 - || consumeResult.getErrCode() == 406 - || consumeResult.getErrCode() == 407 - || consumeResult.getErrCode() == 408)) { - LOG.info("Could not consume messages from tubemq (errcode: {}, " - + "errmsg: {}).", consumeResult.getErrCode(), - consumeResult.getErrMsg()); - } - - Duration idleTime = - Duration.between(lastConsumeInstant, Instant.now()); - if (idleTime.compareTo(maxIdleTime) > 0) { - ctx.markAsTemporarilyIdle(); - } - - continue; - } - - List messageList = consumeResult.getMessageList(); - - List records = new ArrayList<>(); - if (messageList != null) { - lastConsumeInstant = Instant.now(); - - for (Message message : messageList) { - T record = - deserializationSchema.deserialize(message.getData()); - records.add(record); - } - } - - synchronized (ctx.getCheckpointLock()) { - - for (T record : records) { - ctx.collect(record); - } - - currentOffsets.put( - consumeResult.getPartitionKey(), - consumeResult.getCurrOffset()); - } - - ConsumerResult confirmResult = - messagePullConsumer - .confirmConsume(consumeResult.getConfirmContext(), true); - if (!confirmResult.isSuccess()) { - if (!(confirmResult.getErrCode() == 400 - || confirmResult.getErrCode() == 404 - || confirmResult.getErrCode() == 405 - || confirmResult.getErrCode() == 406 - || confirmResult.getErrCode() == 407 - || confirmResult.getErrCode() == 408)) { - LOG.warn("Could not confirm messages to tubemq (errcode: {}, " - + "errmsg: {}).", confirmResult.getErrCode(), - confirmResult.getErrMsg()); - } - } - } - } - - @Override - public void snapshotState(FunctionSnapshotContext context) throws Exception { - - offsetsState.clear(); - for (Map.Entry entry : currentOffsets.entrySet()) { - offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue())); - } - - LOG.info("Successfully save the offsets in checkpoint {}: {}.", - context.getCheckpointId(), currentOffsets); - } - - @Override - public void cancel() { - running = false; - } - - @Override - public void close() throws Exception { - - cancel(); - - if (messagePullConsumer != null) { - try { - messagePullConsumer.shutdown(); - } catch (Throwable t) { - LOG.warn("Could not properly shutdown the tubemq pull consumer.", t); - } - } - - if (messageSessionFactory != null) { - try { - messageSessionFactory.shutdown(); - } catch (Throwable t) { - LOG.warn("Could not properly shutdown the tubemq session factory.", t); - } - } - - super.close(); - - LOG.info("Closed the tubemq source."); - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java deleted file mode 100644 index b9f7e5f86db..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSink.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.sinks.AppendStreamTableSink; -import org.apache.flink.table.utils.TableConnectorUtils; -import org.apache.flink.types.Row; - -import java.util.Arrays; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Tubemq {@link org.apache.flink.table.sinks.StreamTableSink}. - */ -public class TubemqTableSink implements AppendStreamTableSink { - - /** - * Serialization schema for records to tubemq. - */ - private final SerializationSchema serializationSchema; - - /** - * The schema of the table. - */ - private final TableSchema schema; - - /** - * The tubemq topic name. - */ - private final String topic; - - /** - * The address of tubemq master, format eg: 127.0.0.1:8080,127.0.0.2:8081 . - */ - private final String masterAddress; - - /** - * The parameters collection for tubemq producer. - */ - private final Configuration configuration; - - public TubemqTableSink( - SerializationSchema serializationSchema, - TableSchema schema, - String topic, - String masterAddress, - Configuration configuration) { - this.serializationSchema = checkNotNull(serializationSchema, - "The deserialization schema must not be null."); - this.schema = checkNotNull(schema, - "The schema must not be null."); - this.topic = checkNotNull(topic, - "Topic must not be null."); - this.masterAddress = checkNotNull(masterAddress, - "Master address must not be null."); - this.configuration = checkNotNull(configuration, - "The configuration must not be null."); - } - - @Override - public DataStreamSink consumeDataStream(DataStream dataStream) { - - final SinkFunction tubemqSinkFunction = - new TubemqSinkFunction<>( - topic, - masterAddress, - serializationSchema, - configuration); - - return dataStream - .addSink(tubemqSinkFunction) - .name( - TableConnectorUtils.generateRuntimeName( - getClass(), - getFieldNames())); - } - - @Override - public TypeInformation getOutputType() { - return schema.toRowType(); - } - - @Override - public String[] getFieldNames() { - return schema.getFieldNames(); - } - - @Override - public TypeInformation[] getFieldTypes() { - return schema.getFieldTypes(); - } - - @Override - public TubemqTableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { - if (!Arrays.equals(getFieldNames(), fieldNames) - || !Arrays.equals(getFieldTypes(), fieldTypes)) { - throw new ValidationException("Reconfiguration with different fields is not allowed. " - + "Expected: " + Arrays.toString(getFieldNames()) - + " / " + Arrays.toString(getFieldTypes()) + ". " - + "But was: " + Arrays.toString(fieldNames) + " / " - + Arrays.toString(fieldTypes)); - } - - return this; - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java deleted file mode 100644 index 934a4416876..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.Types; -import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.sources.DefinedFieldMapping; -import org.apache.flink.table.sources.DefinedProctimeAttribute; -import org.apache.flink.table.sources.DefinedRowtimeAttributes; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.types.Row; - -import javax.annotation.Nullable; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TreeSet; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * TubeMQ {@link StreamTableSource}. - */ -public class TubemqTableSource - implements - StreamTableSource, - DefinedProctimeAttribute, - DefinedRowtimeAttributes, - DefinedFieldMapping { - - /** - * Deserialization schema for records from TubeMQ. - */ - private final DeserializationSchema deserializationSchema; - - /** - * The schema of the table. - */ - private final TableSchema schema; - - /** - * Field name of the processing time attribute, null if no processing time - * field is defined. - */ - private final Optional proctimeAttribute; - - /** - * Descriptors for rowtime attributes. - */ - private final List rowtimeAttributeDescriptors; - - /** - * Mapping for the fields of the table schema to fields of the physical - * returned type. - */ - private final Map fieldMapping; - - /** - * The address of TubeMQ master, format eg: 127.0.0.1:8080,127.0.0.2:8081 . - */ - private final String masterAddress; - - /** - * The TubeMQ topic name. - */ - private final String topic; - - /** - * The TubeMQ streamId filter collection. - */ - private final TreeSet streamIdSet; - - /** - * The TubeMQ consumer group name. - */ - private final String consumerGroup; - - /** - * The parameters collection for TubeMQ consumer. - */ - private final Configuration configuration; - - /** - * Build TubeMQ table source - * - * @param deserializationSchema the deserialize schema - * @param schema the data schema - * @param proctimeAttribute the proc time - * @param rowtimeAttributeDescriptors the row time attribute descriptor - * @param fieldMapping the field map information - * @param masterAddress the master address - * @param topic the topic name - * @param streamIdSet the topic's filter condition items - * @param consumerGroup the consumer group - * @param configuration the configure - */ - public TubemqTableSource( - DeserializationSchema deserializationSchema, - TableSchema schema, - Optional proctimeAttribute, - List rowtimeAttributeDescriptors, - Map fieldMapping, - String masterAddress, - String topic, - TreeSet streamIdSet, - String consumerGroup, - Configuration configuration) { - checkNotNull(deserializationSchema, - "The deserialization schema must not be null."); - checkNotNull(schema, - "The schema must not be null."); - checkNotNull(fieldMapping, - "The field mapping must not be null."); - checkNotNull(masterAddress, - "The master address must not be null."); - checkNotNull(topic, - "The topic must not be null."); - checkNotNull(streamIdSet, - "The streamId set must not be null."); - checkNotNull(consumerGroup, - "The consumer group must not be null."); - checkNotNull(configuration, - "The configuration must not be null."); - - this.deserializationSchema = deserializationSchema; - this.schema = schema; - this.fieldMapping = fieldMapping; - this.masterAddress = masterAddress; - this.topic = topic; - this.streamIdSet = streamIdSet; - this.consumerGroup = consumerGroup; - this.configuration = configuration; - - this.proctimeAttribute = - validateProcTimeAttribute(proctimeAttribute); - this.rowtimeAttributeDescriptors = - validateRowTimeAttributeDescriptors(rowtimeAttributeDescriptors); - } - - @Override - public TableSchema getTableSchema() { - return schema; - } - - @Nullable - @Override - public String getProctimeAttribute() { - return proctimeAttribute.orElse(null); - } - - @Override - public List getRowtimeAttributeDescriptors() { - return rowtimeAttributeDescriptors; - } - - @Override - public Map getFieldMapping() { - return fieldMapping; - } - - @Override - public DataStream getDataStream( - StreamExecutionEnvironment streamExecutionEnvironment) { - SourceFunction sourceFunction = - new TubemqSourceFunction<>( - masterAddress, - topic, - streamIdSet, - consumerGroup, - deserializationSchema, - configuration); - - return streamExecutionEnvironment - .addSource(sourceFunction) - .name(explainSource()); - } - - private Optional validateProcTimeAttribute( - Optional proctimeAttribute) { - return proctimeAttribute.map((attribute) -> { - Optional> tpe = schema.getFieldType(attribute); - if (!tpe.isPresent()) { - throw new ValidationException("Proc time attribute '" - + attribute + "' isn't present in TableSchema."); - } else if (tpe.get() != Types.SQL_TIMESTAMP()) { - throw new ValidationException("Proc time attribute '" - + attribute + "' isn't of type SQL_TIMESTAMP."); - } - return attribute; - }); - } - - private List validateRowTimeAttributeDescriptors( - List attributeDescriptors) { - checkNotNull(attributeDescriptors); - - for (RowtimeAttributeDescriptor desc : attributeDescriptors) { - String name = desc.getAttributeName(); - Optional> tpe = schema.getFieldType(name); - if (!tpe.isPresent()) { - throw new ValidationException("Row time attribute '" - + name + "' is not present."); - } else if (tpe.get() != Types.SQL_TIMESTAMP()) { - throw new ValidationException("Row time attribute '" - + name + "' is not of type SQL_TIMESTAMP."); - } - } - - return attributeDescriptors; - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java deleted file mode 100644 index e28e85c1b76..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java +++ /dev/null @@ -1,244 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.SchemaValidator; -import org.apache.flink.table.factories.DeserializationSchemaFactory; -import org.apache.flink.table.factories.SerializationSchemaFactory; -import org.apache.flink.table.factories.StreamTableSinkFactory; -import org.apache.flink.table.factories.StreamTableSourceFactory; -import org.apache.flink.table.factories.TableFactoryService; -import org.apache.flink.table.sinks.StreamTableSink; -import org.apache.flink.table.sources.RowtimeAttributeDescriptor; -import org.apache.flink.table.sources.StreamTableSource; -import org.apache.flink.types.Row; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.TreeSet; - -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_PROPERTY_VERSION; -import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE; -import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED; -import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME; -import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE; -import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE_VALUE_APPEND; - -/** - * Factory for creating configured instances of {@link TubemqTableSource}. - */ -public class TubemqTableSourceSinkFactory - implements - StreamTableSourceFactory, - StreamTableSinkFactory { - - private static final String SPLIT_COMMA = ","; - - public TubemqTableSourceSinkFactory() { - } - - @Override - public Map requiredContext() { - - Map context = new HashMap<>(); - context.put(UPDATE_MODE, UPDATE_MODE_VALUE_APPEND); - context.put(CONNECTOR_TYPE, TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ); - context.put(CONNECTOR_PROPERTY_VERSION, "1"); - - return context; - } - - @Override - public List supportedProperties() { - List properties = new ArrayList<>(); - - // tubemq - properties.add(TubemqValidator.CONNECTOR_TOPIC); - properties.add(TubemqValidator.CONNECTOR_MASTER); - properties.add(TubemqValidator.CONNECTOR_GROUP); - properties.add(TubemqValidator.CONNECTOR_STREAMIDS); - properties.add(TubemqValidator.CONNECTOR_PROPERTIES + ".*"); - - // schema - properties.add(SCHEMA + ".#." + SCHEMA_TYPE); - properties.add(SCHEMA + ".#." + SCHEMA_NAME); - properties.add(SCHEMA + ".#." + SCHEMA_FROM); - - // time attributes - properties.add(SCHEMA + ".#." + SCHEMA_PROCTIME); - properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE); - properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM); - properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS); - properties.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED); - properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE); - properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS); - properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED); - properties.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY); - - // format wildcard - properties.add(FORMAT + ".*"); - - return properties; - } - - @Override - public StreamTableSource createStreamTableSource( - Map properties) { - final DeserializationSchema deserializationSchema = - getDeserializationSchema(properties); - - final DescriptorProperties descriptorProperties = - new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - validateProperties(descriptorProperties); - - final TableSchema schema = - descriptorProperties.getTableSchema(SCHEMA); - final Optional proctimeAttribute = - SchemaValidator.deriveProctimeAttribute(descriptorProperties); - final List rowtimeAttributeDescriptors = - SchemaValidator.deriveRowtimeAttributes(descriptorProperties); - final Map fieldMapping = - SchemaValidator.deriveFieldMapping( - descriptorProperties, - Optional.of(deserializationSchema.getProducedType())); - final String topic = - descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC); - final String masterAddress = - descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER); - final String consumerGroup = - descriptorProperties.getString(TubemqValidator.CONNECTOR_GROUP); - final String streamIds = - descriptorProperties - .getOptionalString(TubemqValidator.CONNECTOR_STREAMIDS) - .orElse(null); - final Configuration configuration = - getConfiguration(descriptorProperties); - - TreeSet streamIdSet = new TreeSet<>(); - if (streamIds != null) { - streamIdSet.addAll(Arrays.asList(streamIds.split(SPLIT_COMMA))); - } - - return new TubemqTableSource( - deserializationSchema, - schema, - proctimeAttribute, - rowtimeAttributeDescriptors, - fieldMapping, - masterAddress, - topic, - streamIdSet, - consumerGroup, - configuration); - } - - @Override - public StreamTableSink createStreamTableSink( - Map properties) { - final SerializationSchema serializationSchema = - getSerializationSchema(properties); - - final DescriptorProperties descriptorProperties = - new DescriptorProperties(true); - descriptorProperties.putProperties(properties); - - validateProperties(descriptorProperties); - - final TableSchema tableSchema = - descriptorProperties.getTableSchema(SCHEMA); - final String topic = - descriptorProperties.getString(TubemqValidator.CONNECTOR_TOPIC); - final String masterAddress = - descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER); - - final Configuration configuration = - getConfiguration(descriptorProperties); - - return new TubemqTableSink( - serializationSchema, - tableSchema, - topic, - masterAddress, - configuration); - } - - private SerializationSchema getSerializationSchema( - Map properties) { - @SuppressWarnings("unchecked") - final SerializationSchemaFactory formatFactory = - TableFactoryService.find( - SerializationSchemaFactory.class, - properties, - this.getClass().getClassLoader()); - - return formatFactory.createSerializationSchema(properties); - } - - private void validateProperties(DescriptorProperties descriptorProperties) { - new SchemaValidator(true, false, false).validate(descriptorProperties); - new TubemqValidator().validate(descriptorProperties); - } - - private DeserializationSchema getDeserializationSchema( - Map properties) { - @SuppressWarnings("unchecked") - final DeserializationSchemaFactory formatFactory = - TableFactoryService.find( - DeserializationSchemaFactory.class, - properties, - this.getClass().getClassLoader()); - - return formatFactory.createDeserializationSchema(properties); - } - - private Configuration getConfiguration( - DescriptorProperties descriptorProperties) { - Map properties = - descriptorProperties.getPropertiesWithPrefix(TubemqValidator.CONNECTOR_PROPERTIES); - - Configuration configuration = new Configuration(); - for (Map.Entry property : properties.entrySet()) { - configuration.setString(property.getKey(), property.getValue()); - } - - return configuration; - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java deleted file mode 100644 index bddaec4eea9..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; -import org.apache.flink.table.descriptors.DescriptorProperties; - -/** - * The validator for {@link Tubemq}. - */ -public class TubemqValidator extends ConnectorDescriptorValidator { - - /** - * The type of connector. - */ - public static final String CONNECTOR_TYPE_VALUE_TUBEMQ = "tubemq"; - - /** - * The address of tubemq master. - */ - public static final String CONNECTOR_MASTER = "connector.master"; - - /** - * The tubemq topic name. - */ - public static final String CONNECTOR_TOPIC = "connector.topic"; - - /** - * The tubemq (consumer or producer) group name. - */ - public static final String CONNECTOR_GROUP = "connector.group"; - - /** - * The tubemq consumers use these streamIds to filter records reading from server. - */ - public static final String CONNECTOR_STREAMIDS = "connector.stream-ids"; - - /** - * The prefix of tubemq properties (optional). - */ - public static final String CONNECTOR_PROPERTIES = "connector.properties"; - - @Override - public void validate(DescriptorProperties properties) { - super.validate(properties); - - // Validates that the connector type is tubemq. - properties.validateValue(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_TUBEMQ, false); - - // Validate that the topic name is set. - properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE); - - // Validate that the master address is set. - properties.validateString(CONNECTOR_MASTER, false, 1, Integer.MAX_VALUE); - - // Validate that the group name is set. - properties.validateString(CONNECTOR_GROUP, false, 1, Integer.MAX_VALUE); - - // Validate that the streamIds is set. - properties.validateString(CONNECTOR_STREAMIDS, true, 1, Integer.MAX_VALUE); - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory deleted file mode 100644 index 30831743b76..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -org.apache.flink.connectors.tubemq.TubemqTableSourceSinkFactory diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java deleted file mode 100644 index 7f0e68def3c..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.connectors.tubemq; - -import org.apache.flink.table.descriptors.Descriptor; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.descriptors.DescriptorTestBase; -import org.apache.flink.table.descriptors.DescriptorValidator; -import org.junit.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Unit tests for {@link Tubemq}. - */ -public class TubemqTest extends DescriptorTestBase { - - @Override - protected List descriptors() { - final Descriptor descriptor1 = - new Tubemq() - .topic("test-topic-1") - .master("localhost:9001") - .group("test-group-1"); - - final Descriptor descriptor2 = - new Tubemq() - .topic("test-topic-2") - .master("localhost:9001") - .group("test-group-2") - .property("bootstrap.from.max", "true"); - - final Descriptor descriptor3 = - new Tubemq() - .topic("test-topic-3") - .master("localhost:9001") - .group("test-group-3") - .streamIds("test-streamId-1,test-streamId-2"); - - return Arrays.asList(descriptor1, descriptor2, descriptor3); - } - - @Override - protected List> properties() { - final Map props1 = new HashMap<>(); - props1.put("connector.property-version", "1"); - props1.put("connector.type", "tubemq"); - props1.put("connector.master", "localhost:9001"); - props1.put("connector.topic", "test-topic-1"); - props1.put("connector.group", "test-group-1"); - - final Map props2 = new HashMap<>(); - props2.put("connector.property-version", "1"); - props2.put("connector.type", "tubemq"); - props2.put("connector.master", "localhost:9001"); - props2.put("connector.topic", "test-topic-2"); - props2.put("connector.group", "test-group-2"); - props2.put("connector.properties.bootstrap.from.max", "true"); - - final Map props3 = new HashMap<>(); - props3.put("connector.property-version", "1"); - props3.put("connector.type", "tubemq"); - props3.put("connector.master", "localhost:9001"); - props3.put("connector.topic", "test-topic-3"); - props3.put("connector.stream-ids", "test-streamId-1,test-streamId-2"); - props3.put("connector.group", "test-group-3"); - - return Arrays.asList(props1, props2, props3); - } - - @Override - protected DescriptorValidator validator() { - return new TubemqValidator(); - } - - @Test - public void testTubePropertiesValidator() { - DescriptorValidator validator = this.validator(); - DescriptorProperties descriptorProperties = new DescriptorProperties(); - descriptorProperties.putProperties(properties().get(0)); - validator.validate(descriptorProperties); - } -} diff --git a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties deleted file mode 100644 index 1da9b515e34..00000000000 --- a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/resources/log4j2.properties +++ /dev/null @@ -1,21 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -rootLogger=info, A1 -# A1 is set to be a ConsoleAppender. -appender.A1.name=A1 -appender.A1.type=Console