Skip to content

Commit

Permalink
[INLONG-9523][Manager] Fix the problem of sink remains in configurati…
Browse files Browse the repository at this point in the history
…on after standalone cluster allocation failure
  • Loading branch information
fuweng11 committed Jan 9, 2024
1 parent f3963ba commit 501b1d2
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
<result column="log_ts" property="logTs" jdbcType="VARCHAR"/>
<result column="total" property="total" jdbcType="BIGINT"/>
<result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
<result column="total_size" property="totalSize" jdbcType="BIGINT"/>
</resultMap>

<resultMap id="SumGroupByIdResultMap" type="java.util.Map">
Expand All @@ -50,6 +51,7 @@
<result column="ip" property="ip" jdbcType="VARCHAR"/>
<result column="total" property="total" jdbcType="BIGINT"/>
<result column="total_delay" property="totalDelay" jdbcType="BIGINT"/>
<result column="total_size" property="totalSize" jdbcType="BIGINT"/>
</resultMap>

<select id="sumByLogTs" resultMap="SumByLogTsResultMap">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
Expand Down Expand Up @@ -55,26 +56,33 @@ public abstract class AbstractStandaloneSinkResourceOperator implements SinkReso

@VisibleForTesting
protected void assignCluster(SinkInfo sinkInfo) {
if (StringUtils.isBlank(sinkInfo.getSinkType())) {
throw new IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
try {
if (StringUtils.isBlank(sinkInfo.getSinkType())) {
throw new IllegalArgumentException(ErrorCodeEnum.SINK_TYPE_IS_NULL.getMessage());
}

if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
String info = "no need to auto-assign cluster since the cluster has already assigned";
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
return;
}

String targetCluster = assignOneCluster(sinkInfo);
Preconditions.expectNotBlank(targetCluster,
String.format("find no proper cluster assign to group=%s, stream=%s, sink type=%s, data node=%s ",
sinkInfo.getInlongGroupId(), sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(),
sinkInfo.getDataNodeName()));

StreamSinkEntity sink = sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
sink.setInlongClusterName(targetCluster);
sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode());
sinkEntityMapper.updateByIdSelective(sink);
} catch (Throwable e) {
String errMsg = "assign standalone cluster failed: " + e.getMessage();
log.error(errMsg, e);
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg);
throw new WorkflowException(errMsg);
}

if (StringUtils.isNotBlank(sinkInfo.getInlongClusterName())) {
String info = "no need to auto-assign cluster since the cluster has already assigned";
sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
return;
}

String targetCluster = assignOneCluster(sinkInfo);
Preconditions.expectNotBlank(targetCluster,
String.format("find no proper cluster assign to group=%s, stream=%s, sink type=%s, data node=%s ",
sinkInfo.getInlongGroupId(), sinkInfo.getInlongStreamId(), sinkInfo.getSinkType(),
sinkInfo.getDataNodeName()));

StreamSinkEntity sink = sinkEntityMapper.selectByPrimaryKey(sinkInfo.getId());
sink.setInlongClusterName(targetCluster);
sink.setStatus(SinkStatus.CONFIG_SUCCESSFUL.getCode());
sinkEntityMapper.updateByIdSelective(sink);
}

private String assignOneCluster(SinkInfo sinkInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public void saveFieldOpt(SinkRequest request) {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public void saveFieldOpt(SinkRequest request) {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public void saveFieldOpt(SinkRequest request) {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public void saveFieldOpt(SinkRequest request) {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ public void saveFieldOpt(SinkRequest request) {
Integer sinkId = request.getId();
for (SinkField fieldInfo : fieldList) {
this.checkFieldInfo(fieldInfo);
fieldInfo.setExtParams(null);
StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
fieldEntity.setFieldComment(fieldEntity.getFieldName());
Expand Down

0 comments on commit 501b1d2

Please sign in to comment.