Skip to content

Commit

Permalink
[INLONG-9533][Manager] Support setting dataNode when configuring stre…
Browse files Browse the repository at this point in the history
…amSource for Pulsar、Iceberg、PostgreSQL
  • Loading branch information
fuweng11 committed Dec 27, 2023
1 parent f495de5 commit e1550de
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;

import com.github.pagehelper.Page;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -62,6 +63,8 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
protected StreamSourceFieldEntityMapper sourceFieldMapper;
@Autowired
protected InlongStreamFieldEntityMapper streamFieldMapper;
@Autowired
protected DataNodeOperateHelper dataNodeHelper;

/**
* Getting the source type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceDTO;
import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSourceRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -49,8 +48,6 @@
@Service
public class BinlogSourceOperator extends AbstractSourceOperator {

@Autowired
protected DataNodeOperateHelper dataNodeHelper;
@Autowired
private ObjectMapper objectMapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

package org.apache.inlong.manager.service.source.iceberg;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.pojo.node.iceberg.IcebergDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergColumnInfo;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.source.SourceRequest;
Expand All @@ -37,6 +40,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -46,6 +50,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* Iceberg stream source operator
Expand All @@ -68,6 +73,20 @@ protected String getSourceType() {
return SourceType.ICEBERG;
}

@Override
public String getExtParams(StreamSourceEntity sourceEntity) {
IcebergSourceDTO icebergSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
IcebergSourceDTO.class);
if (Objects.nonNull(icebergSourceDTO) && StringUtils.isBlank(icebergSourceDTO.getUri())) {
IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) dataNodeHelper.getDataNodeInfo(
sourceEntity.getDataNodeName(), DataNodeType.ICEBERG);
CommonBeanUtils.copyProperties(dataNodeInfo, icebergSourceDTO, true);
icebergSourceDTO.setUri(dataNodeInfo.getUrl());
return JsonUtils.toJsonString(icebergSourceDTO);
}
return sourceEntity.getExtParams();
}

@Override
protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
IcebergSourceRequest sourceRequest = (IcebergSourceRequest) request;
Expand All @@ -89,6 +108,16 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
}

IcebergSourceDTO dto = IcebergSourceDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getUri())) {
if (StringUtils.isBlank(entity.getDataNodeName())) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
"iceberg catalog uri unspecified and data node is blank");
}
IcebergDataNodeInfo dataNodeInfo = (IcebergDataNodeInfo) dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), DataNodeType.ICEBERG);
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
dto.setUri(dataNodeInfo.getUrl());
}
CommonBeanUtils.copyProperties(entity, source, true);
CommonBeanUtils.copyProperties(dto, source, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package org.apache.inlong.manager.service.source.postgresql;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.pojo.node.postgresql.PostgreSQLDataNodeInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
Expand All @@ -31,6 +35,7 @@
import org.apache.inlong.manager.service.source.AbstractSourceOperator;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -55,6 +60,22 @@ protected String getSourceType() {
return SourceType.POSTGRESQL;
}

@Override
public String getExtParams(StreamSourceEntity sourceEntity) {
PostgreSQLSourceDTO postgreSQLSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
PostgreSQLSourceDTO.class);
if (java.util.Objects.nonNull(postgreSQLSourceDTO) && StringUtils.isBlank(postgreSQLSourceDTO.getHostname())) {
PostgreSQLDataNodeInfo dataNodeInfo = (PostgreSQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
sourceEntity.getDataNodeName(), DataNodeType.POSTGRESQL);
CommonBeanUtils.copyProperties(dataNodeInfo, postgreSQLSourceDTO, true);
postgreSQLSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
postgreSQLSourceDTO.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
postgreSQLSourceDTO.setPassword(dataNodeInfo.getToken());
return JsonUtils.toJsonString(postgreSQLSourceDTO);
}
return sourceEntity.getExtParams();
}

@Override
protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
PostgreSQLSourceRequest sourceRequest = (PostgreSQLSourceRequest) request;
Expand All @@ -76,6 +97,18 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
}

PostgreSQLSourceDTO dto = PostgreSQLSourceDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getHostname())) {
if (StringUtils.isBlank(entity.getDataNodeName())) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
"postgreSQl hostname unspecified and data node is blank");
}
PostgreSQLDataNodeInfo dataNodeInfo = (PostgreSQLDataNodeInfo) dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), DataNodeType.POSTGRESQL);
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
dto.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
dto.setPassword(dataNodeInfo.getToken());
}
CommonBeanUtils.copyProperties(entity, source, true);
CommonBeanUtils.copyProperties(dto, source, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@
package org.apache.inlong.manager.service.source.pulsar;

import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.node.pulsar.PulsarDataNodeInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
Expand Down Expand Up @@ -84,6 +87,19 @@ protected String getSourceType() {
return SourceType.PULSAR;
}

@Override
public String getExtParams(StreamSourceEntity sourceEntity) {
PulsarSourceDTO pulsarSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(),
PulsarSourceDTO.class);
if (java.util.Objects.nonNull(pulsarSourceDTO) && StringUtils.isBlank(pulsarSourceDTO.getAdminUrl())) {
PulsarDataNodeInfo dataNodeInfo = (PulsarDataNodeInfo) dataNodeHelper.getDataNodeInfo(
sourceEntity.getDataNodeName(), DataNodeType.PULSAR);
CommonBeanUtils.copyProperties(dataNodeInfo, pulsarSourceDTO, true);
return JsonUtils.toJsonString(pulsarSourceDTO);
}
return sourceEntity.getExtParams();
}

@Override
protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
PulsarSourceRequest sourceRequest = (PulsarSourceRequest) request;
Expand All @@ -105,6 +121,15 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
}

PulsarSourceDTO dto = PulsarSourceDTO.getFromJson(entity.getExtParams());
if (StringUtils.isBlank(dto.getAdminUrl())) {
if (StringUtils.isBlank(entity.getDataNodeName())) {
throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
"pulsar admin url unspecified and data node is blank");
}
PulsarDataNodeInfo dataNodeInfo = (PulsarDataNodeInfo) dataNodeHelper.getDataNodeInfo(
entity.getDataNodeName(), DataNodeType.PULSAR);
CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
}
CommonBeanUtils.copyProperties(entity, source, true);
CommonBeanUtils.copyProperties(dto, source, true);

Expand Down

0 comments on commit e1550de

Please sign in to comment.