Skip to content

Commit

Permalink
[INLONG-11537][Sort] Optimize the session key generation of TubeMQ So…
Browse files Browse the repository at this point in the history
…urce
  • Loading branch information
vernedeng committed Nov 25, 2024
1 parent 3645d99 commit 3e25d55
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T>
private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQConsumer.class);
private static final String TUBE_OFFSET_STATE = "tube-offset-state";

private static final String UNDERSCORE = "_";

/**
* The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715.
*/
Expand Down Expand Up @@ -222,7 +224,7 @@ public void open(Configuration parameters) throws Exception {
messagePullConsumer.subscribe(topic, streamIdSet);
String jobId = getRuntimeContext().getJobId().toString();
String attemptNumber = String.valueOf(getRuntimeContext().getAttemptNumber());
String startSessionKey = sessionKey.concat("_").concat(jobId).concat("_").concat(attemptNumber);
String startSessionKey = sessionKey.concat(UNDERSCORE).concat(jobId).concat(UNDERSCORE).concat(attemptNumber);
LOG.info("start to init tube mq consumer, session key={}", startSessionKey);
messagePullConsumer.completeSubscribe(startSessionKey, numTasks, true, currentOffsets);

Expand Down

0 comments on commit 3e25d55

Please sign in to comment.