diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java index b240c7217a2..7ee92cc537f 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/NodeStatus.java @@ -26,7 +26,15 @@ public enum NodeStatus { NORMAL(1), - HEARTBEAT_TIMEOUT(2); + HEARTBEAT_TIMEOUT(2), + + INSTALLING(3), + + INSTALL_FAILED(4), + + INSTALL_SUCCESS(5), + + UNLOAD_FAILED(6); int status; diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java index f168de98d9e..2b87842e998 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongClusterNodeEntityMapper.java @@ -55,7 +55,8 @@ List selectByParentId(@Param("parentId") Integer parent */ int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus, @Param("status") Integer status); - int updateOperateLogById(@Param("id") Integer id, @Param("operateLog") String operateLog); + int updateOperateLogById(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus, + @Param("operateLog") String operateLog); int deleteById(Integer id); diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml index 2afe00ee6bc..76043621061 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongClusterNodeEntityMapper.xml @@ -256,7 +256,14 @@ update inlong_cluster_node - set operate_log = #{operateLog,jdbcType=LONGVARCHAR} + + + status = #{nextStatus,jdbcType=INTEGER}, + + + operate_log = #{operateLog,jdbcType=LONGVARCHAR} + + where id = #{id,jdbcType=INTEGER} and is_deleted = 0 diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index f399455803e..7882e29c127 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -90,6 +90,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gson.Gson; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -101,6 +102,8 @@ import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; +import javax.annotation.PostConstruct; + import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -113,6 +116,12 @@ import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.packExtParams; @@ -126,6 +135,15 @@ public class InlongClusterServiceImpl implements InlongClusterService { private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class); private static final Gson GSON = new Gson(); + private final ExecutorService executorService = new ThreadPoolExecutor( + 5, + 10, + 10L, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(100), + new ThreadFactoryBuilder().setNameFormat("agent-install-%s").build(), + new CallerRunsPolicy()); + private final LinkedBlockingQueue pendingInstallRequests = new LinkedBlockingQueue<>(); @Autowired private InlongGroupEntityMapper groupMapper; @@ -158,6 +176,13 @@ public class InlongClusterServiceImpl implements InlongClusterService { @Autowired private DataProxyConfigRepository proxyRepository; + @PostConstruct + private void startInstallTask() { + InstallTaskRunnable installTaskRunnable = new InstallTaskRunnable(); + this.executorService.execute(installTaskRunnable); + LOGGER.info("install task started successfully"); + } + @Override public Integer saveTag(ClusterTagRequest request, String operator) { LOGGER.debug("begin to save cluster tag {}", request); @@ -692,9 +717,7 @@ public Integer saveNode(ClusterNodeRequest request, String operator) { Integer id = instance.saveOpt(request, operator); if (request.getIsInstall()) { request.setId(id); - InlongClusterNodeInstallOperator clusterNodeInstallOperator = clusterNodeInstallOperatorFactory.getInstance( - request.getType()); - clusterNodeInstallOperator.install(request, operator); + pendingInstallRequests.add(request); } return id; } @@ -810,7 +833,6 @@ public List listNodeIpByType(String type) { } @Override - @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ) public Boolean updateNode(ClusterNodeRequest request, String operator) { LOGGER.debug("begin to update inlong cluster node={}", request); Preconditions.expectNotNull(request, "inlong cluster node cannot be empty"); @@ -843,9 +865,9 @@ public Boolean updateNode(ClusterNodeRequest request, String operator) { InlongClusterNodeOperator instance = clusterNodeOperatorFactory.getInstance(request.getType()); instance.updateOpt(request, operator); if (request.getIsInstall()) { - InlongClusterNodeInstallOperator clusterNodeInstallOperator = clusterNodeInstallOperatorFactory.getInstance( - request.getType()); - clusterNodeInstallOperator.install(request, operator); + // when reinstall set install to false + request.setIsInstall(false); + pendingInstallRequests.add(request); } return true; } @@ -1381,4 +1403,37 @@ private void chkUnmodifiableParams(InlongClusterEntity entity, ClusterRequest re request.setClusterTags(entity.getClusterTags()); } } + + private class InstallTaskRunnable implements Runnable { + + private static final int WAIT_SECONDS = 60 * 1000; + + @Override + public void run() { + while (true) { + try { + processInstall(); + Thread.sleep(WAIT_SECONDS); + } catch (Exception e) { + LOGGER.error("exception occurred when install", e); + } + } + } + + @Transactional(rollbackFor = Throwable.class) + public void processInstall() { + if (pendingInstallRequests.isEmpty()) { + return; + } + ClusterNodeRequest request = pendingInstallRequests.poll(); + InlongClusterNodeInstallOperator clusterNodeInstallOperator = clusterNodeInstallOperatorFactory.getInstance( + request.getType()); + if (request.getIsInstall()) { + clusterNodeInstallOperator.install(request, request.getCurrentUser()); + } else { + clusterNodeInstallOperator.reInstall(request, request.getCurrentUser()); + } + + } + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java index 06fd1db3ad4..abf8a895cde 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/node/AgentClusterNodeInstallOperator.java @@ -19,6 +19,7 @@ import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.enums.ModuleType; +import org.apache.inlong.manager.common.enums.NodeStatus; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.CommonBeanUtils; @@ -96,12 +97,17 @@ public String getClusterNodeType() { public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) { LOGGER.info("begin to insert agent cluster node={}", clusterNodeRequest); try { + clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.INSTALLING.getStatus(), + "begin to install"); AgentClusterNodeRequest request = (AgentClusterNodeRequest) clusterNodeRequest; deployInstaller(request, operator); String startCmd = agentInstallPath + INSTALLER_START_CMD; commandExecutor.execRemote(request, startCmd); + clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), + NodeStatus.INSTALL_SUCCESS.getStatus(), "success to install"); } catch (Exception e) { - clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), e.getMessage()); + clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), + NodeStatus.INSTALL_FAILED.getStatus(), e.getMessage()); String errMsg = String.format("install agent cluster node failed for ip=%s", clusterNodeRequest.getIp()); LOGGER.error(errMsg, e); throw new BusinessException(errMsg); @@ -114,13 +120,18 @@ public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) { public boolean reInstall(ClusterNodeRequest clusterNodeRequest, String operator) { LOGGER.info("begin to reInstall agent cluster node={}", clusterNodeRequest); try { + clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.INSTALLING.getStatus(), + "begin to reinstall"); AgentClusterNodeRequest request = (AgentClusterNodeRequest) clusterNodeRequest; commandExecutor.rmDir(request, agentInstallPath.substring(0, agentInstallPath.lastIndexOf(File.separator))); deployInstaller(request, operator); String reStartCmd = agentInstallPath + INSTALLER_RESTART_CMD; commandExecutor.execRemote(request, reStartCmd); + clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.NORMAL.getStatus(), + "success to reinstall"); } catch (Exception e) { - clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), e.getMessage()); + clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), + NodeStatus.INSTALL_FAILED.getStatus(), e.getMessage()); String errMsg = String.format("reInstall agent cluster node failed for ip=%s", clusterNodeRequest.getIp()); LOGGER.error(errMsg, e); throw new BusinessException(errMsg); @@ -140,7 +151,8 @@ public boolean unload(InlongClusterNodeEntity clusterNodeEntity, String operator commandExecutor.execRemote(request, stopCmd); commandExecutor.rmDir(request, agentInstallPath.substring(0, agentInstallPath.lastIndexOf(File.separator))); } catch (Exception e) { - clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(), e.getMessage()); + clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(), + NodeStatus.UNLOAD_FAILED.getStatus(), e.getMessage()); String errMsg = String.format("unload agent cluster node failed for ip=%s", clusterNodeEntity.getIp()); LOGGER.error(errMsg, e); throw new BusinessException(errMsg);