diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java index fdfcff0ef44..be4aa144133 100644 --- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java +++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java @@ -236,6 +236,7 @@ void testListGroup4BinlogSource() { .inlongGroupId("1") .inlongStreamId("2") .sourceType(SourceType.MYSQL_BINLOG) + .hostname("127.0.0.1") .status(1) .user("root") .password("pwd") @@ -560,6 +561,7 @@ void testListStream4AllSink() { MySQLBinlogSource.builder() .id(2) .sourceType(SourceType.MYSQL_BINLOG) + .hostname("127.0.0.1") .user("user") .password("pwd") .build(), diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java index 5d6fb1101e3..83c858971dc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java @@ -57,7 +57,9 @@ import org.apache.inlong.manager.pojo.source.file.FileSourceDTO; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.service.core.AgentService; +import org.apache.inlong.manager.service.source.SourceOperatorFactory; import org.apache.inlong.manager.service.source.SourceSnapshotOperator; +import org.apache.inlong.manager.service.source.StreamSourceOperator; import com.github.benmanes.caffeine.cache.Caffeine; import com.github.benmanes.caffeine.cache.LoadingCache; @@ -152,6 +154,8 @@ public class AgentServiceImpl implements AgentService { private InlongClusterEntityMapper clusterMapper; @Autowired private InlongClusterNodeEntityMapper clusterNodeMapper; + @Autowired + private SourceOperatorFactory operatorFactory; /** * Start the update task @@ -593,7 +597,8 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) { InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId); InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); - String extParams = entity.getExtParams(); + StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType()); + String extParams = sourceOperator.getExtParams(entity); if (groupEntity != null && streamEntity != null) { dataConfig.setState( SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) ? 1 : 0); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index bb54145a06b..935a688d821 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -78,6 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { */ protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity); + @Override + public String getExtParams(StreamSourceEntity sourceEntity) { + return sourceEntity.getExtParams(); + } + @Override @Transactional(rollbackFor = Throwable.class) public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java index be1768b5997..5e7168879bb 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceOperator.java @@ -43,6 +43,8 @@ public interface StreamSourceOperator { */ Boolean accept(String sourceType); + String getExtParams(StreamSourceEntity sourceEntity); + /** * Save the source info. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java index 2a42b24b4f1..5342d2e56a6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/binlog/BinlogSourceOperator.java @@ -17,24 +17,31 @@ package org.apache.inlong.manager.service.source.binlog; +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo; import org.apache.inlong.manager.pojo.source.SourceRequest; import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource; import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceDTO; import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest; import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.node.DataNodeOperateHelper; import org.apache.inlong.manager.service.source.AbstractSourceOperator; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.List; +import java.util.Objects; /** * Binlog source operator @@ -42,6 +49,8 @@ @Service public class BinlogSourceOperator extends AbstractSourceOperator { + @Autowired + protected DataNodeOperateHelper dataNodeHelper; @Autowired private ObjectMapper objectMapper; @@ -55,6 +64,23 @@ protected String getSourceType() { return SourceType.MYSQL_BINLOG; } + @Override + public String getExtParams(StreamSourceEntity sourceEntity) { + MySQLBinlogSourceDTO mySQLBinlogSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(), + MySQLBinlogSourceDTO.class); + if (Objects.nonNull(mySQLBinlogSourceDTO) && StringUtils.isBlank(mySQLBinlogSourceDTO.getHostname())) { + MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo( + sourceEntity.getDataNodeName(), DataNodeType.MYSQL); + CommonBeanUtils.copyProperties(dataNodeInfo, mySQLBinlogSourceDTO, true); + mySQLBinlogSourceDTO.setUser(dataNodeInfo.getUsername()); + mySQLBinlogSourceDTO.setPassword(dataNodeInfo.getToken()); + mySQLBinlogSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]); + mySQLBinlogSourceDTO.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1])); + return JsonUtils.toJsonString(mySQLBinlogSourceDTO); + } + return sourceEntity.getExtParams(); + } + @Override protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) { MySQLBinlogSourceRequest sourceRequest = (MySQLBinlogSourceRequest) request; @@ -76,6 +102,19 @@ public StreamSource getFromEntity(StreamSourceEntity entity) { } MySQLBinlogSourceDTO dto = MySQLBinlogSourceDTO.getFromJson(entity.getExtParams()); + if (StringUtils.isBlank(dto.getHostname())) { + if (StringUtils.isBlank(entity.getDataNodeName())) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + "mysql url and data node is blank"); + } + MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo( + entity.getDataNodeName(), DataNodeType.MYSQL); + CommonBeanUtils.copyProperties(dataNodeInfo, dto, true); + dto.setUser(dataNodeInfo.getUsername()); + dto.setPassword(dataNodeInfo.getToken()); + dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]); + dto.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1])); + } CommonBeanUtils.copyProperties(entity, source, true); CommonBeanUtils.copyProperties(dto, source, true); diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java index 4cdd5b9c66c..70034803e82 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java @@ -105,6 +105,7 @@ public Integer saveSource() { sourceInfo.setInlongStreamId(GLOBAL_STREAM_ID); sourceInfo.setSourceType(SourceType.MYSQL_BINLOG); sourceInfo.setSourceName("binlog_source_in_agent_service_test"); + sourceInfo.setHostname("127.0.0.1"); return sourceService.save(sourceInfo, GLOBAL_OPERATOR); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java index 4d343a6dd82..45c41eb4b87 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/StreamSourceServiceTest.java @@ -54,6 +54,7 @@ public Integer saveSource() { String sourceName = "stream_source_service_test"; sourceInfo.setSourceName(sourceName); sourceInfo.setSourceType(SourceType.MYSQL_BINLOG); + sourceInfo.setHostname("127.0.0.1"); Map properties = Maps.newLinkedHashMap(); properties.put("append-mode", "true"); sourceInfo.setProperties(properties);