From d9e03116d64633610f83642f2323420ae78232f6 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Mon, 18 Dec 2023 14:27:11 +0800 Subject: [PATCH] [INLONG-9475][Manager] Support setting dataNode when configuring streamSource for MYSQL --- .../service/core/impl/AgentServiceImpl.java | 7 +++- .../source/AbstractSourceOperator.java | 5 +++ .../service/source/StreamSourceOperator.java | 2 + .../source/binlog/BinlogSourceOperator.java | 39 +++++++++++++++++++ 4 files changed, 52 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 5d6fb1101e..83c858971d 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -57,7 +57,9 @@ import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.core.AgentService; +import org.apache.inlong.manager.service.source.SourceOperatorFactory; import org.apache.inlong.manager.service.source.SourceSnapshotOperator; +import org.apache.inlong.manager.service.source.StreamSourceOperator; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; @@ -152,6 +154,8 @@ public class AgentServiceImpl implements AgentService { private InlongClusterEntityMapper clusterMapper; @Autowired private InlongClusterNodeEntityMapper clusterNodeMapper; + @Autowired + private SourceOperatorFactory operatorFactory; /** * Start the update task @@ -593,7 +597,8 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) { InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId); InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); - String extParams = entity.getExtParams(); + StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType()); + String extParams = sourceOperator.getExtParams(entity); if (groupEntity != null && streamEntity != null) { dataConfig.setState( SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) ? 1 : 0); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index bb54145a06..935a688d82 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -78,6 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { */ protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity); + @Override + public String getExtParams(StreamSourceEntity sourceEntity) { + return sourceEntity.getExtParams(); + } + @Override @Transactional(rollbackFor = Throwable.class) public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index be1768b599..5e7168879b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -43,6 +43,8 @@ public interface StreamSourceOperator { */ Boolean accept(String sourceType); + String getExtParams(StreamSourceEntity sourceEntity); + /** * Save the source info. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java index 2a42b24b4f..5342d2e56a 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java @@ -17,24 +17,31 @@ package org.apache.inlong.manager.service.source.binlog; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource; import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceDTO; import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest; import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.node.DataNodeOperateHelper; import org.apache.inlong.manager.service.source.AbstractSourceOperator; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.Objects; /** * Binlog source operator @@ -42,6 +49,8 @@ @Service public class BinlogSourceOperator extends AbstractSourceOperator { + @Autowired + protected DataNodeOperateHelper dataNodeHelper; @Autowired private ObjectMapper objectMapper; @@ -55,6 +64,23 @@ protected String getSourceType() { return SourceType.MYSQL_BINLOG; } + @Override + public String getExtParams(StreamSourceEntity sourceEntity) { + MySQLBinlogSourceDTO mySQLBinlogSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(), + MySQLBinlogSourceDTO.class); + if (Objects.nonNull(mySQLBinlogSourceDTO) && StringUtils.isBlank(mySQLBinlogSourceDTO.getHostname())) { + MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo( + sourceEntity.getDataNodeName(), DataNodeType.MYSQL); + CommonBeanUtils.copyProperties(dataNodeInfo, mySQLBinlogSourceDTO, true); + mySQLBinlogSourceDTO.setUser(dataNodeInfo.getUsername()); + mySQLBinlogSourceDTO.setPassword(dataNodeInfo.getToken()); + mySQLBinlogSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]); + mySQLBinlogSourceDTO.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1])); + return JsonUtils.toJsonString(mySQLBinlogSourceDTO); + } + return sourceEntity.getExtParams(); + } + @Override protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) { MySQLBinlogSourceRequest sourceRequest = (MySQLBinlogSourceRequest) request; @@ -76,6 +102,19 @@ public StreamSource getFromEntity(StreamSourceEntity entity) { } MySQLBinlogSourceDTO dto = MySQLBinlogSourceDTO.getFromJson(entity.getExtParams()); + if (StringUtils.isBlank(dto.getHostname())) { + if (StringUtils.isBlank(entity.getDataNodeName())) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + "mysql url and data node is blank"); + } + MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo( + entity.getDataNodeName(), DataNodeType.MYSQL); + CommonBeanUtils.copyProperties(dataNodeInfo, dto, true); + dto.setUser(dataNodeInfo.getUsername()); + dto.setPassword(dataNodeInfo.getToken()); + dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]); + dto.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1])); + } CommonBeanUtils.copyProperties(entity, source, true); CommonBeanUtils.copyProperties(dto, source, true);