Skip to content

Commit

Permalink
[INLONG-9517][Manager] Fix the problem of the tube consumption group …
Browse files Browse the repository at this point in the history
…sent to the sort side is incorrect
  • Loading branch information
fuweng11 committed Dec 22, 2023
1 parent 1e25c54 commit 05a087d
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;

import com.google.common.base.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -45,12 +48,19 @@
@Service
public class TubeMQQueueResourceOperator implements QueueResourceOperator {

/**
* The name rule for Pulsar subscription: clusterTag_topicName_sinkId_consumer_group
*/
public static final String TUBE_CONSUMER_GROUP = "%s_%s_%s_consumer_group";

@Autowired
private InlongClusterService clusterService;
@Autowired
private InlongConsumeService consumeService;
@Autowired
private TubeMQOperator tubeMQOperator;
@Autowired
private StreamSinkService sinkService;

@Override
public boolean accept(String mqType) {
Expand All @@ -76,20 +86,7 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
String topicName = groupInfo.getMqResource();
tubeMQOperator.createTopic(tubeCluster, topicName, operator);
log.info("success to create tubemq topic for groupId={}", groupId);

// 2. create tubemq consumer group
// consumer naming rules: clusterTag_topicName_consumer_group
String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
log.info("success to create tubemq consumer group for groupId={}", groupId);

// insert the consumer group info
Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
id, consumeGroup, groupId, topicName);

log.info("success to create tubemq resource for groupId={}, cluster={}", groupId, tubeCluster);
log.info("success to create tubemq topic for groupId={}, cluster={}", groupId, tubeCluster);
} catch (Exception e) {
log.error("failed to create tubemq resource for groupId=" + groupId, e);
throw new WorkflowListenerException("failed to create tubemq resource: " + e.getMessage());
Expand All @@ -103,7 +100,28 @@ public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {

@Override
public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, String operator) {
// currently, not support create tubemq resource for stream
String groupId = groupInfo.getInlongGroupId();
String streamId = streamInfo.getInlongStreamId();
List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId);
if (CollectionUtils.isEmpty(streamSinks)) {
log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId);
return;
}
for (StreamSink sink : streamSinks) {
// 1. create tubemq consumer group
// consumer naming rules: clusterTag_topicName_sinkId_consumer_group
String clusterTag = groupInfo.getInlongClusterTag();
TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
String topicName = groupInfo.getMqResource();
String consumeGroup = String.format(TUBE_CONSUMER_GROUP, clusterTag, topicName, sink.getId());
tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
log.info("success to create tubemq consumer group for groupId={}", groupId);

// insert the consumer group info
Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
id, consumeGroup, groupId, topicName);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
Expand All @@ -48,6 +50,8 @@
import java.util.Map;
import java.util.Objects;

import static org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQQueueResourceOperator.TUBE_CONSUMER_GROUP;

/**
* TubeMQ source operator
*/
Expand All @@ -58,6 +62,8 @@ public class TubeMQSourceOperator extends AbstractSourceOperator {
private ObjectMapper objectMapper;
@Autowired
private InlongClusterService clusterService;
@Autowired
private StreamSinkEntityMapper sinkMapper;

@Override
public Boolean accept(String sourceType) {
Expand Down Expand Up @@ -109,7 +115,16 @@ public Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
String streamId = streamInfo.getInlongStreamId();
tubeMQSource.setSourceName(streamId);
tubeMQSource.setTopic(groupInfo.getMqResource());
tubeMQSource.setConsumeGroup(streamId);
List<StreamSinkEntity> sinkEntityList = sinkMapper.selectByRelatedId(groupInfo.getInlongGroupId(),
streamId);
// Issued pulsar subscriptions to sort only supports a stream with only one source and one sink
String consumeGroup = streamId;
if (sinkEntityList.size() == 1) {
// consumer naming rules: clusterTag_topicName_sinkId_consumer_group
consumeGroup = String.format(TUBE_CONSUMER_GROUP, groupInfo.getInlongClusterTag(),
groupInfo.getMqResource(), sinkEntityList.get(0).getId());
}
tubeMQSource.setConsumeGroup(consumeGroup);
tubeMQSource.setMasterRpc(masterRpc);
tubeMQSource.setWrapType(streamInfo.getWrapType());
tubeMQSource.setIgnoreParseError(streamInfo.getIgnoreParseError());
Expand Down

0 comments on commit 05a087d

Please sign in to comment.