Skip to content

Commit

Permalink
[INLONG-9533][Manager] Fix error
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 committed Dec 27, 2023
1 parent e1550de commit e288d8e
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.node.DataNodeService;

import com.github.pagehelper.Page;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -64,7 +64,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
@Autowired
protected InlongStreamFieldEntityMapper streamFieldMapper;
@Autowired
protected DataNodeOperateHelper dataNodeHelper;
protected DataNodeService dataNodeService;

/**
* Getting the source type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public String getExtParams(StreamSourceEntity sourceEntity) {
MySQLBinlogSourceDTO mySQLBinlogSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
MySQLBinlogSourceDTO.class);
if (Objects.nonNull(mySQLBinlogSourceDTO) && StringUtils.isBlank(mySQLBinlogSourceDTO.getHostname())) {
MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeService.get(
sourceEntity.getDataNodeName(), DataNodeType.MYSQL);
CommonBeanUtils.copyProperties(dataNodeInfo, mySQLBinlogSourceDTO, true);
mySQLBinlogSourceDTO.setUser(dataNodeInfo.getUsername());
Expand Down Expand Up @@ -104,7 +104,7 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
"mysql url and data node is blank");
}
MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
MySQLDataNodeInfo dataNodeInfo = (MySQLDataNodeInfo) dataNodeService.get(
entity.getDataNodeName(), DataNodeType.MYSQL);
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
dto.setUser(dataNodeInfo.getUsername());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public String getExtParams(StreamSourceEntity sourceEntity) {
IcebergSourceDTO icebergSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
IcebergSourceDTO.class);
if (Objects.nonNull(icebergSourceDTO) && StringUtils.isBlank(icebergSourceDTO.getUri())) {
IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) dataNodeHelper.getDataNodeInfo(
IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) dataNodeService.get(
sourceEntity.getDataNodeName(), DataNodeType.ICEBERG);
CommonBeanUtils.copyProperties(dataNodeInfo, icebergSourceDTO, true);
icebergSourceDTO.setUri(dataNodeInfo.getUrl());
Expand Down Expand Up @@ -113,7 +113,7 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
"iceberg catalog uri unspecified and data node is blank");
}
IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) dataNodeHelper.getDataNodeInfo(
IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) dataNodeService.get(
entity.getDataNodeName(), DataNodeType.ICEBERG);
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
dto.setUri(dataNodeInfo.getUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public String getExtParams(StreamSourceEntity sourceEntity) {
PostgreSQLSourceDTO postgreSQLSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
PostgreSQLSourceDTO.class);
if (java.util.Objects.nonNull(postgreSQLSourceDTO) && StringUtils.isBlank(postgreSQLSourceDTO.getHostname())) {
PostgreSQLDataNodeInfo dataNodeInfo = (PostgreSQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
PostgreSQLDataNodeInfo dataNodeInfo = (PostgreSQLDataNodeInfo) dataNodeService.get(
sourceEntity.getDataNodeName(), DataNodeType.POSTGRESQL);
CommonBeanUtils.copyProperties(dataNodeInfo, postgreSQLSourceDTO, true);
postgreSQLSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
Expand Down Expand Up @@ -102,7 +102,7 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
"postgreSQl hostname unspecified and data node is blank");
}
PostgreSQLDataNodeInfo dataNodeInfo = (PostgreSQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
PostgreSQLDataNodeInfo dataNodeInfo = (PostgreSQLDataNodeInfo) dataNodeService.get(
entity.getDataNodeName(), DataNodeType.POSTGRESQL);
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public String getExtParams(StreamSourceEntity sourceEntity) {
PulsarSourceDTO pulsarSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
PulsarSourceDTO.class);
if (java.util.Objects.nonNull(pulsarSourceDTO) && StringUtils.isBlank(pulsarSourceDTO.getAdminUrl())) {
PulsarDataNodeInfo dataNodeInfo = (PulsarDataNodeInfo) dataNodeHelper.getDataNodeInfo(
PulsarDataNodeInfo dataNodeInfo = (PulsarDataNodeInfo) dataNodeService.get(
sourceEntity.getDataNodeName(), DataNodeType.PULSAR);
CommonBeanUtils.copyProperties(dataNodeInfo, pulsarSourceDTO, true);
return JsonUtils.toJsonString(pulsarSourceDTO);
Expand Down Expand Up @@ -126,7 +126,7 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
"pulsar admin url unspecified and data node is blank");
}
PulsarDataNodeInfo dataNodeInfo = (PulsarDataNodeInfo) dataNodeHelper.getDataNodeInfo(
PulsarDataNodeInfo dataNodeInfo = (PulsarDataNodeInfo) dataNodeService.get(
entity.getDataNodeName(), DataNodeType.PULSAR);
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
}
Expand Down

0 comments on commit e288d8e

Please sign in to comment.