From 4309f2175345b2498b8f85f9f570e067915c6a93 Mon Sep 17 00:00:00 2001 From: fuwenkai <834260992@qq.com> Date: Thu, 10 Oct 2024 15:47:27 +0800 Subject: [PATCH] [INLONG-11323][Manager] Modify the parameters of the data add tasks for file collection --- .../pojo/source/DataAddTaskRequest.java | 11 ++- .../source/AbstractSourceOperator.java | 69 ++++++++++--------- .../service/source/StreamSourceService.java | 11 +-- .../source/StreamSourceServiceImpl.java | 44 +++++++----- .../controller/StreamSourceController.java | 9 +-- 5 files changed, 77 insertions(+), 67 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java index d835c4ee37b..fccc8c06ff5 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/DataAddTaskRequest.java @@ -25,6 +25,8 @@ import javax.validation.constraints.NotBlank; +import java.util.List; + /** * Data add task information */ @@ -33,9 +35,16 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "sourceType") public class DataAddTaskRequest { - @ApiModelProperty(value = "Source ID") + @ApiModelProperty(value = "Group Id") + @NotBlank(message = "inlongGroupId cannot be blank") + private String groupId; + + @ApiModelProperty(value = "Source ID", hidden = true) private Integer sourceId; + @ApiModelProperty(value = "Agent ip List") + private List agentIpList; + @ApiModelProperty("Source type, including: FILE, KAFKA, etc.") @NotBlank(message = "sourceType cannot be blank") @Length(min = 1, max = 20, message = "length must be between 1 and 20") diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index 5caf6d24739..885fd2a99e0 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -421,6 +421,42 @@ public void updateAgentTaskConfig(SourceRequest request, String operator) { if (existEntity != null) { agentTaskConfigEntity = CommonBeanUtils.copyProperties(existEntity, AgentTaskConfigEntity::new, true); } + + LOGGER.debug("begin to get agent config info for {}", request); + Set tagSet = new HashSet<>(16); + InlongClusterEntity agentClusterInfo = clusterMapper.selectByNameAndType(request.getInlongClusterName(), + ClusterType.AGENT); + if (agentClusterInfo == null) { + agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId()); + agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity); + return; + } + String clusterTag = agentClusterInfo.getClusterTags(); + AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder() + .cluster(AgentConfigInfo.AgentClusterInfo.builder() + .parentId(agentClusterInfo.getId()) + .clusterName(agentClusterInfo.getName()) + .build()) + .build(); + if (StringUtils.isNotBlank(clusterTag)) { + tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA))); + List clusterTagList = new ArrayList<>(tagSet); + ClusterPageRequest pageRequest = ClusterPageRequest.builder() + .type(ClusterType.AGENT_ZK) + .clusterTagList(clusterTagList) + .build(); + List agentZkCluster = clusterMapper.selectByCondition(pageRequest); + if (CollectionUtils.isNotEmpty(agentZkCluster)) { + agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl()); + } + } + + String jsonStr = GSON.toJson(agentConfigInfo); + String configMd5 = DigestUtils.md5Hex(jsonStr); + agentConfigInfo.setMd5(configMd5); + agentConfigInfo.setCode(AgentResponseCode.SUCCESS); + agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo)); + List normalSourceEntities = sourceMapper.selectByStatusAndCluster( SourceStatus.NORMAL_STATUS_SET.stream().map(SourceStatus::getCode) .collect(Collectors.toList()), @@ -443,7 +479,7 @@ public void updateAgentTaskConfig(SourceRequest request, String operator) { return cmdConfig; }).collect(Collectors.toList()); if (CollectionUtils.isEmpty(taskLists)) { - agentTaskConfigEntity.setIsDeleted(agentTaskConfigEntity.getId()); + agentTaskConfigEntity.setTaskParams(null); agentTaskConfigEntityMapper.updateByIdSelective(agentTaskConfigEntity); return; } @@ -461,37 +497,6 @@ public void updateAgentTaskConfig(SourceRequest request, String operator) { agentTaskConfigEntity.setClusterName(request.getInlongClusterName()); agentTaskConfigEntity.setTaskParams(objectMapper.writeValueAsString(taskResult)); - LOGGER.debug("begin to get agent config info for {}", request); - Set tagSet = new HashSet<>(16); - InlongGroupEntity groupEntity = - groupMapper.selectByGroupIdWithoutTenant(request.getInlongGroupId()); - String clusterTag = groupEntity.getInlongClusterTag(); - InlongClusterEntity agentClusterInfo = clusterMapper.selectByNameAndType(request.getInlongClusterName(), - ClusterType.AGENT); - AgentConfigInfo agentConfigInfo = AgentConfigInfo.builder() - .cluster(AgentConfigInfo.AgentClusterInfo.builder() - .parentId(agentClusterInfo.getId()) - .clusterName(agentClusterInfo.getName()) - .build()) - .build(); - if (StringUtils.isNotBlank(clusterTag)) { - tagSet.addAll(Arrays.asList(clusterTag.split(InlongConstants.COMMA))); - List clusterTagList = new ArrayList<>(tagSet); - ClusterPageRequest pageRequest = ClusterPageRequest.builder() - .type(ClusterType.AGENT_ZK) - .clusterTagList(clusterTagList) - .build(); - List agentZkCluster = clusterMapper.selectByCondition(pageRequest); - if (CollectionUtils.isNotEmpty(agentZkCluster)) { - agentConfigInfo.setZkUrl(agentZkCluster.get(0).getUrl()); - } - } - - String jsonStr = GSON.toJson(agentConfigInfo); - String configMd5 = DigestUtils.md5Hex(jsonStr); - agentConfigInfo.setMd5(configMd5); - agentConfigInfo.setCode(AgentResponseCode.SUCCESS); - agentTaskConfigEntity.setConfigParams(objectMapper.writeValueAsString(agentConfigInfo)); agentClusterInfo.setModifier(operator); if (existEntity == null) { agentTaskConfigEntity.setCreator(operator); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java index 9be7f06172b..50b51508171 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java @@ -211,15 +211,6 @@ default Boolean updateAfterApprove(String operator) { * @param operator Operator's name. * @return source id after saving. */ - Integer addDataAddTask(DataAddTaskRequest request, String operator); + List addDataAddTask(DataAddTaskRequest request, String operator); - /** - * Batch Save the data add task information - * - * @param requestList Source request list. - * @param operator Operator's name. - * @return source id list after saving. - */ - List batchAddDataAddTask(String groupId, List requestList, - String operator); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 0241524dcf5..de96386b253 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -68,6 +68,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -533,25 +534,32 @@ private void chkUnmodifiableParams(SourceRequest request) { } @Override - public Integer addDataAddTask(DataAddTaskRequest request, String operator) { + public List addDataAddTask(DataAddTaskRequest request, String operator) { LOGGER.info("begin to add data add task info: {}", request); - StreamSourceEntity entity = sourceMapper.selectById(request.getSourceId()); - StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType()); - int id = sourceOperator.addDataAddTask(request, operator); - LOGGER.info("success to add data add task info: {}", request); - return id; - } - - @Override - public List batchAddDataAddTask(String groupId, List requestList, - String operator) { - List result = new ArrayList<>(); - String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(groupId, null)); - for (DataAddTaskRequest request : requestList) { - request.setAuditVersion(auditVersion); - int id = addDataAddTask(request, operator); - result.add(id); + String auditVersion = String.valueOf(sourceMapper.selectDataAddTaskCount(request.getGroupId(), null)); + request.setAuditVersion(auditVersion); + List agentIpList = request.getAgentIpList(); + List entityList = new ArrayList<>(); + List resultIdList = new ArrayList<>(); + if (CollectionUtils.isEmpty(agentIpList)) { + entityList = sourceMapper.selectByRelatedId(request.getGroupId(), null, null); + } else { + for (String agentIp : agentIpList) { + List sourceEntityList = sourceMapper.selectByAgentIp(agentIp); + entityList.addAll(sourceEntityList); + } } - return result; + for (StreamSourceEntity sourceEntity : entityList) { + if (sourceEntity.getTaskMapId() != null || !Objects.equals(sourceEntity.getInlongGroupId(), + request.getGroupId())) { + continue; + } + StreamSourceOperator sourceOperator = operatorFactory.getInstance(sourceEntity.getSourceType()); + request.setSourceId(sourceEntity.getId()); + int id = sourceOperator.addDataAddTask(request, operator); + resultIdList.add(id); + } + LOGGER.info("success to add data add task info: {}, data add task size: {}", request, resultIdList.size()); + return resultIdList; } } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java index d60fa3bad57..4253747ee1d 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java @@ -130,13 +130,10 @@ public Response forceDelete(@RequestParam String inlongGroupId, @Reques sourceService.forceDelete(inlongGroupId, inlongStreamId, LoginUserUtils.getLoginUser().getName())); } - @RequestMapping(value = "/source/addDataAddTask/{groupId}", method = RequestMethod.POST) + @RequestMapping(value = "/source/addDataAddTask", method = RequestMethod.POST) @ApiOperation(value = "Add supplementary recording task for stream source") - @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true) - public Response> addSub(@PathVariable String groupId, - @RequestBody List requestList) { - return Response.success( - sourceService.batchAddDataAddTask(groupId, requestList, LoginUserUtils.getLoginUser().getName())); + public Response> addSub(@RequestBody DataAddTaskRequest request) { + return Response.success(sourceService.addDataAddTask(request, LoginUserUtils.getLoginUser().getName())); } }