Skip to content

Commit

Permalink
[INLONG-11157][Manager] Asynchronous processing agent installation (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fuweng11 authored Sep 23, 2024
1 parent 0a884d9 commit 9969932
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ public enum NodeStatus {

NORMAL(1),

HEARTBEAT_TIMEOUT(2);
HEARTBEAT_TIMEOUT(2),

INSTALLING(3),

INSTALL_FAILED(4),

INSTALL_SUCCESS(5),

UNLOAD_FAILED(6);

int status;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ List<InlongClusterNodeEntity> selectByParentId(@Param("parentId") Integer parent
*/
int updateStatus(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus, @Param("status") Integer status);

int updateOperateLogById(@Param("id") Integer id, @Param("operateLog") String operateLog);
int updateOperateLogById(@Param("id") Integer id, @Param("nextStatus") Integer nextStatus,
@Param("operateLog") String operateLog);

int deleteById(Integer id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,14 @@
</update>
<update id="updateOperateLogById">
update inlong_cluster_node
set operate_log = #{operateLog,jdbcType=LONGVARCHAR}
<set>
<if test="nextStatus != null">
status = #{nextStatus,jdbcType=INTEGER},
</if>
<if test="operateLog != null">
operate_log = #{operateLog,jdbcType=LONGVARCHAR}
</if>
</set>
where id = #{id,jdbcType=INTEGER}
and is_deleted = 0
</update>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -101,6 +102,8 @@
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand All @@ -113,6 +116,12 @@
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.inlong.manager.pojo.cluster.InlongClusterTagExtParam.packExtParams;
Expand All @@ -126,6 +135,15 @@ public class InlongClusterServiceImpl implements InlongClusterService {

private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
private static final Gson GSON = new Gson();
private final ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
10L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("agent-install-%s").build(),
new CallerRunsPolicy());
private final LinkedBlockingQueue<ClusterNodeRequest> pendingInstallRequests = new LinkedBlockingQueue<>();

@Autowired
private InlongGroupEntityMapper groupMapper;
Expand Down Expand Up @@ -158,6 +176,13 @@ public class InlongClusterServiceImpl implements InlongClusterService {
@Autowired
private DataProxyConfigRepository proxyRepository;

@PostConstruct
private void startInstallTask() {
InstallTaskRunnable installTaskRunnable = new InstallTaskRunnable();
this.executorService.execute(installTaskRunnable);
LOGGER.info("install task started successfully");
}

@Override
public Integer saveTag(ClusterTagRequest request, String operator) {
LOGGER.debug("begin to save cluster tag {}", request);
Expand Down Expand Up @@ -692,9 +717,7 @@ public Integer saveNode(ClusterNodeRequest request, String operator) {
Integer id = instance.saveOpt(request, operator);
if (request.getIsInstall()) {
request.setId(id);
InlongClusterNodeInstallOperator clusterNodeInstallOperator = clusterNodeInstallOperatorFactory.getInstance(
request.getType());
clusterNodeInstallOperator.install(request, operator);
pendingInstallRequests.add(request);
}
return id;
}
Expand Down Expand Up @@ -810,7 +833,6 @@ public List<String> listNodeIpByType(String type) {
}

@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public Boolean updateNode(ClusterNodeRequest request, String operator) {
LOGGER.debug("begin to update inlong cluster node={}", request);
Preconditions.expectNotNull(request, "inlong cluster node cannot be empty");
Expand Down Expand Up @@ -843,9 +865,9 @@ public Boolean updateNode(ClusterNodeRequest request, String operator) {
InlongClusterNodeOperator instance = clusterNodeOperatorFactory.getInstance(request.getType());
instance.updateOpt(request, operator);
if (request.getIsInstall()) {
InlongClusterNodeInstallOperator clusterNodeInstallOperator = clusterNodeInstallOperatorFactory.getInstance(
request.getType());
clusterNodeInstallOperator.install(request, operator);
// when reinstall set install to false
request.setIsInstall(false);
pendingInstallRequests.add(request);
}
return true;
}
Expand Down Expand Up @@ -1381,4 +1403,37 @@ private void chkUnmodifiableParams(InlongClusterEntity entity, ClusterRequest re
request.setClusterTags(entity.getClusterTags());
}
}

private class InstallTaskRunnable implements Runnable {

private static final int WAIT_SECONDS = 60 * 1000;

@Override
public void run() {
while (true) {
try {
processInstall();
Thread.sleep(WAIT_SECONDS);
} catch (Exception e) {
LOGGER.error("exception occurred when install", e);
}
}
}

@Transactional(rollbackFor = Throwable.class)
public void processInstall() {
if (pendingInstallRequests.isEmpty()) {
return;
}
ClusterNodeRequest request = pendingInstallRequests.poll();
InlongClusterNodeInstallOperator clusterNodeInstallOperator = clusterNodeInstallOperatorFactory.getInstance(
request.getType());
if (request.getIsInstall()) {
clusterNodeInstallOperator.install(request, request.getCurrentUser());
} else {
clusterNodeInstallOperator.reInstall(request, request.getCurrentUser());
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ModuleType;
import org.apache.inlong.manager.common.enums.NodeStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.AESUtils;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
Expand Down Expand Up @@ -96,12 +97,17 @@ public String getClusterNodeType() {
public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) {
LOGGER.info("begin to insert agent cluster node={}", clusterNodeRequest);
try {
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.INSTALLING.getStatus(),
"begin to install");
AgentClusterNodeRequest request = (AgentClusterNodeRequest) clusterNodeRequest;
deployInstaller(request, operator);
String startCmd = agentInstallPath + INSTALLER_START_CMD;
commandExecutor.execRemote(request, startCmd);
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.INSTALL_SUCCESS.getStatus(), "success to install");
} catch (Exception e) {
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), e.getMessage());
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.INSTALL_FAILED.getStatus(), e.getMessage());
String errMsg = String.format("install agent cluster node failed for ip=%s", clusterNodeRequest.getIp());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
Expand All @@ -114,13 +120,18 @@ public boolean install(ClusterNodeRequest clusterNodeRequest, String operator) {
public boolean reInstall(ClusterNodeRequest clusterNodeRequest, String operator) {
LOGGER.info("begin to reInstall agent cluster node={}", clusterNodeRequest);
try {
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.INSTALLING.getStatus(),
"begin to reinstall");
AgentClusterNodeRequest request = (AgentClusterNodeRequest) clusterNodeRequest;
commandExecutor.rmDir(request, agentInstallPath.substring(0, agentInstallPath.lastIndexOf(File.separator)));
deployInstaller(request, operator);
String reStartCmd = agentInstallPath + INSTALLER_RESTART_CMD;
commandExecutor.execRemote(request, reStartCmd);
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), NodeStatus.NORMAL.getStatus(),
"success to reinstall");
} catch (Exception e) {
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(), e.getMessage());
clusterNodeEntityMapper.updateOperateLogById(clusterNodeRequest.getId(),
NodeStatus.INSTALL_FAILED.getStatus(), e.getMessage());
String errMsg = String.format("reInstall agent cluster node failed for ip=%s", clusterNodeRequest.getIp());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
Expand All @@ -140,7 +151,8 @@ public boolean unload(InlongClusterNodeEntity clusterNodeEntity, String operator
commandExecutor.execRemote(request, stopCmd);
commandExecutor.rmDir(request, agentInstallPath.substring(0, agentInstallPath.lastIndexOf(File.separator)));
} catch (Exception e) {
clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(), e.getMessage());
clusterNodeEntityMapper.updateOperateLogById(clusterNodeEntity.getId(),
NodeStatus.UNLOAD_FAILED.getStatus(), e.getMessage());
String errMsg = String.format("unload agent cluster node failed for ip=%s", clusterNodeEntity.getIp());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
Expand Down

0 comments on commit 9969932

Please sign in to comment.