Skip to content

Commit

Permalink
[INLONG-9475][Manager] Support setting dataNode when configuring stre…
Browse files Browse the repository at this point in the history
…amSource for MYSQL
  • Loading branch information
fuweng11 committed Dec 18, 2023
1 parent 7fe1edd commit d9e0311
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceOperatorFactory;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
import org.apache.inlong.manager.service.source.StreamSourceOperator;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
Expand Down Expand Up @@ -152,6 +154,8 @@ public class AgentServiceImpl implements AgentService {
private InlongClusterEntityMapper clusterMapper;
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
@Autowired
private SourceOperatorFactory operatorFactory;

/**
* Start the update task
Expand Down Expand Up @@ -593,7 +597,8 @@ private DataConfig getDataConfig(StreamSourceEntity entity, int op) {

InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
String extParams = entity.getExtParams();
StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
String extParams = sourceOperator.getExtParams(entity);
if (groupEntity != null && streamEntity != null) {
dataConfig.setState(
SourceStatus.NORMAL_STATUS_SET.contains(SourceStatus.forCode(entity.getStatus())) ? 1 : 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
*/
protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity);

@Override
public String getExtParams(StreamSourceEntity sourceEntity) {
return sourceEntity.getExtParams();
}

@Override
@Transactional(rollbackFor = Throwable.class)
public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public interface StreamSourceOperator {
*/
Boolean accept(String sourceType);

String getExtParams(StreamSourceEntity sourceEntity);

/**
* Save the source info.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,40 @@

package org.apache.inlong.manager.service.source.binlog;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceDTO;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Objects;

/**
* Binlog source operator
*/
@Service
public class BinlogSourceOperator extends AbstractSourceOperator {

@Autowired
protected DataNodeOperateHelper dataNodeHelper;
@Autowired
private ObjectMapper objectMapper;

Expand All @@ -55,6 +64,23 @@ protected String getSourceType() {
return SourceType.MYSQL_BINLOG;
}

@Override
public String getExtParams(StreamSourceEntity sourceEntity) {
MySQLBinlogSourceDTO mySQLBinlogSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
MySQLBinlogSourceDTO.class);
if (Objects.nonNull(mySQLBinlogSourceDTO) && StringUtils.isBlank(mySQLBinlogSourceDTO.getHostname())) {
MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
sourceEntity.getDataNodeName(), DataNodeType.MYSQL);
CommonBeanUtils.copyProperties(dataNodeInfo, mySQLBinlogSourceDTO, true);
mySQLBinlogSourceDTO.setUser(dataNodeInfo.getUsername());
mySQLBinlogSourceDTO.setPassword(dataNodeInfo.getToken());
mySQLBinlogSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
mySQLBinlogSourceDTO.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
return JsonUtils.toJsonString(mySQLBinlogSourceDTO);
}
return sourceEntity.getExtParams();
}

@Override
protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
MySQLBinlogSourceRequest sourceRequest = (MySQLBinlogSourceRequest) request;
Expand All @@ -76,6 +102,19 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
}

MySQLBinlogSourceDTO dto = MySQLBinlogSourceDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getHostname())) {
if (StringUtils.isBlank(entity.getDataNodeName())) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
"mysql url and data node is blank");
}
MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), DataNodeType.MYSQL);
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
dto.setUser(dataNodeInfo.getUsername());
dto.setPassword(dataNodeInfo.getToken());
dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
dto.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
}
CommonBeanUtils.copyProperties(entity, source, true);
CommonBeanUtils.copyProperties(dto, source, true);

Expand Down

0 comments on commit d9e0311

Please sign in to comment.