From 3e25d55480f4108df70e904923795fa00b7451ca Mon Sep 17 00:00:00 2001 From: vernedeng Date: Mon, 25 Nov 2024 19:31:28 +0800 Subject: [PATCH] [INLONG-11537][Sort] Optimize the session key generation of TubeMQ Source --- .../org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index 4fe32444be..4cbd285f17 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -69,6 +69,8 @@ public class FlinkTubeMQConsumer extends RichParallelSourceFunction private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQConsumer.class); private static final String TUBE_OFFSET_STATE = "tube-offset-state"; + private static final String UNDERSCORE = "_"; + /** * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715. */ @@ -222,7 +224,7 @@ public void open(Configuration parameters) throws Exception { messagePullConsumer.subscribe(topic, streamIdSet); String jobId = getRuntimeContext().getJobId().toString(); String attemptNumber = String.valueOf(getRuntimeContext().getAttemptNumber()); - String startSessionKey = sessionKey.concat("_").concat(jobId).concat("_").concat(attemptNumber); + String startSessionKey = sessionKey.concat(UNDERSCORE).concat(jobId).concat(UNDERSCORE).concat(attemptNumber); LOG.info("start to init tube mq consumer, session key={}", startSessionKey); messagePullConsumer.completeSubscribe(startSessionKey, numTasks, true, currentOffsets);