Skip to content

Commit

Permalink
[INLONG-9518][Manager] Support resetting the consumption location of …
Browse files Browse the repository at this point in the history
…the consumption group used by sort
  • Loading branch information
fuweng11 committed Dec 26, 2023
1 parent 9d745b8 commit b17b634
Show file tree
Hide file tree
Showing 10 changed files with 376 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.consume;

import io.swagger.annotations.ApiModel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ApiModel("Sort consumer info")
public class SortConsumerInfo {

private Integer sinkId;
private String inlongGroupId;
private String inlongStreamId;
private String consumerGroup;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.maintenanceTools;

import org.apache.inlong.manager.pojo.consume.SortConsumerInfo;

import org.springframework.web.multipart.MultipartFile;

import java.util.List;

public interface MaintenanceToolsService {

List<SortConsumerInfo> getSortConsumer(MultipartFile file);

Boolean resetCursor(MultipartFile file, String resetTime);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.maintenanceTools;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.pojo.consume.SortConsumerInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.user.LoginUserUtils;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperatorFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

@Service
public class MaintenanceToolsServiceImpl implements MaintenanceToolsService {

private static final Logger LOGGER = LoggerFactory.getLogger(MaintenanceToolsServiceImpl.class);

@Autowired
private InlongGroupService groupService;
@Autowired
private StreamSinkEntityMapper sinkEntityMapper;
@Autowired
private InlongStreamEntityMapper streamEntityMapper;
@Autowired
private QueueResourceOperatorFactory queueOperatorFactory;

@Override
public List<SortConsumerInfo> getSortConsumer(MultipartFile file) {
LoginUserUtils.getLoginUser().getRoles().add(UserRoleCode.INLONG_SERVICE);
List<SortConsumerInfo> sortConsumerInfoList = new ArrayList<>();
try (InputStreamReader read = new InputStreamReader((file.getInputStream()), StandardCharsets.UTF_8)) {
BufferedReader bufferedReader = new BufferedReader(read);
String readerStr = null;
while ((readerStr = bufferedReader.readLine()) != null) {
String[] sinkIdList = readerStr.split(InlongConstants.COMMA);
for (String sinkIdStr : sinkIdList) {
Integer sinkId = Integer.valueOf(sinkIdStr);
StreamSinkEntity sinkEntity = sinkEntityMapper.selectByPrimaryKey(sinkId);

InlongGroupInfo groupInfo = groupService.get(sinkEntity.getInlongGroupId());
InlongStreamEntity streamEntity = streamEntityMapper
.selectByIdentifier(sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId());
QueueResourceOperator queueOperator = queueOperatorFactory.getInstance(groupInfo.getMqType());

String consumerGroup = queueOperator.getSortConsumeGroup(groupInfo, streamEntity, sinkEntity);

SortConsumerInfo sortConsumerInfo = SortConsumerInfo.builder()
.sinkId(sinkId)
.consumerGroup(consumerGroup)
.inlongGroupId(sinkEntity.getInlongGroupId())
.inlongStreamId(sinkEntity.getInlongStreamId())
.build();
sortConsumerInfoList.add(sortConsumerInfo);
}
}
read.close();
LOGGER.info("success get sort consumer");
return sortConsumerInfoList;
} catch (IOException e) {
LOGGER.error("get sort consumer failed:", e);
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Can not properly read update file");
} finally {
LoginUserUtils.getLoginUser().getRoles().remove(UserRoleCode.INLONG_SERVICE);
}
}

@Override
public Boolean resetCursor(MultipartFile file, String resetTime) {
LoginUserUtils.getLoginUser().getRoles().add(UserRoleCode.INLONG_SERVICE);
try (InputStreamReader read = new InputStreamReader((file.getInputStream()), StandardCharsets.UTF_8)) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = sdf.parse(resetTime);
long timeStamp = date.getTime();
BufferedReader bufferedReader = new BufferedReader(read);
String readerStr = null;
while ((readerStr = bufferedReader.readLine()) != null) {
String[] sinkIdList = readerStr.split(InlongConstants.COMMA);
for (String sinkIdStr : sinkIdList) {
Integer sinkId = Integer.valueOf(sinkIdStr);
StreamSinkEntity sinkEntity = sinkEntityMapper.selectByPrimaryKey(sinkId);
InlongGroupInfo groupInfo = groupService.get(sinkEntity.getInlongGroupId());
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
InlongStreamEntity streamEntity = streamEntityMapper
.selectByIdentifier(sinkEntity.getInlongGroupId(), sinkEntity.getInlongStreamId());
QueueResourceOperator queueOperator = queueOperatorFactory.getInstance(groupInfo.getMqType());
queueOperator.resetCursor(groupInfo, streamEntity, sinkEntity, timeStamp);
}
}
read.close();
LOGGER.info("success reset cursor consumer");
} catch (Exception e) {
LOGGER.error("reset cursor consumer failed:", e);
throw new BusinessException(ErrorCodeEnum.INVALID_PARAMETER, "Can not properly read update file");
}
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.manager.service.resource.queue;

import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
Expand Down Expand Up @@ -88,4 +90,18 @@ default List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, Inlo
return null;
}

/**
* Reset cursor for consumer group
*
* @param groupInfo inlong group info
* @param streamEntity inlong stream entity
* @param sinkEntity sink entity
* @param resetTime timestamp for reset
*/
default void resetCursor(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity, StreamSinkEntity sinkEntity,
Long resetTime) throws Exception {
}

String getSortConsumeGroup(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity, StreamSinkEntity sinkEntity);

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
Expand Down Expand Up @@ -205,4 +207,12 @@ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, Inlon
return kafkaOperator.queryLatestMessage((KafkaClusterInfo) clusterInfo, topicName, consumeGroup, messageCount,
streamInfo);
}

@Override
public String getSortConsumeGroup(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity,
StreamSinkEntity sinkEntity) {
InlongKafkaInfo kafkaInfo = (InlongKafkaInfo) groupInfo;
String topicName = streamEntity.getMqResource();
return String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,20 @@ private List<BriefMQMessage> queryMessageFromPulsar(String topicPartition, Pulsa
return briefMQMessages;
}

/**
* Reset cursor for consumer group.
*/
public void resetCursor(PulsarClusterInfo pulsarClusterInfo, String topicFullName, String subName,
Long resetTime) {
try {
PulsarUtils.resetCursor(restTemplate, pulsarClusterInfo, topicFullName, subName,
resetTime);
} catch (Exception e) {
LOGGER.error("failed reset cursor consumer:", e);
throw new BusinessException("failed reset cursor consumer:" + e.getMessage());
}
}

/**
* Build topicName Of Partition
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
Expand Down Expand Up @@ -324,4 +326,42 @@ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo,
id, subs, groupId, topicName);
return briefMQMessages;
}

/**
* Reset cursor for consumer group
*/
public void resetCursor(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity, StreamSinkEntity sinkEntity,
Long resetTime) throws Exception {
log.info("begin to reset cursor for sinkId={}", sinkEntity.getId());
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
List<ClusterInfo> clusterInfos =
clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(), ClusterType.PULSAR);
for (ClusterInfo clusterInfo : clusterInfos) {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
String tenant = pulsarInfo.getPulsarTenant();
if (StringUtils.isBlank(tenant)) {
tenant = pulsarCluster.getPulsarTenant();
}
String namespace = pulsarInfo.getMqResource();
String topicName = streamEntity.getMqResource();
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
String subs = String.format(PULSAR_SUBSCRIPTION, groupInfo.getInlongClusterTag(), topicName,
sinkEntity.getId());
pulsarOperator.resetCursor(pulsarCluster, fullTopicName, subs, resetTime);
} catch (Exception e) {
log.error("failed reset cursor consumer:", e);
throw new BusinessException("failed reset cursor consumer:" + e.getMessage());
}
}
log.info("success to reset cursor for sinkId={}", sinkEntity.getId());
}

@Override
public String getSortConsumeGroup(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity,
StreamSinkEntity sinkEntity) {
String topicName = streamEntity.getMqResource();
return String.format(PULSAR_SUBSCRIPTION, groupInfo.getInlongClusterTag(), topicName,
sinkEntity.getId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,15 @@ private static List<PulsarMessageInfo> getIndividualMsgsFromBatch(String topic,
return ret;
}

public static void resetCursor(RestTemplate restTemplate, PulsarClusterInfo clusterInfo,
String topicPath, String subscription, Long resetTime) throws Exception {
HttpUtils.request(restTemplate,
clusterInfo.getAdminUrls(QUERY_PERSISTENT_PATH + "/" + topicPath + "/subscription/"
+ subscription + "/resetcursor/" + resetTime),
HttpMethod.POST, null,
getHttpHeaders(clusterInfo.getToken()));
}

/**
* Copy from deSerializeSingleMessageInBatch method of org.apache.pulsar.common.protocol.Commands class.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.consume.BriefMQMessage;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
Expand Down Expand Up @@ -140,4 +142,11 @@ public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, Inlon
return tubeMQOperator.queryLastMessage(tubeCluster, topicName, messageCount, streamInfo);
}

@Override
public String getSortConsumeGroup(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity,
StreamSinkEntity sinkEntity) {
String topicName = streamEntity.getMqResource();
return groupInfo.getInlongClusterTag() + "_" + topicName + "_consumer_group";
}

}
Loading

0 comments on commit b17b634

Please sign in to comment.