From 3f52c5c821e7fc391cd4d13f12b60325c6f8d127 Mon Sep 17 00:00:00 2001 From: justinwwhuang Date: Wed, 9 Oct 2024 23:38:19 +0800 Subject: [PATCH] [INLONG-11298][Agent] Fix bug for pulsar source with empty data process and specified time consumption (#11299) * [INLONG-11298][Agent] Fix bug for pulsar source * [INLONG-11298][Agent] Fix the issue of inconsistent logic between the page and backend code --- .../agent/constant/CommonConstants.java | 2 +- .../agent/message/file/ProxyMessageCache.java | 19 -------- .../agent/core/instance/InstanceManager.java | 2 +- .../agent/plugin/sources/PulsarSource.java | 47 ++++++++++++------- .../plugin/sources/file/AbstractSource.java | 8 ++-- 5 files changed, 34 insertions(+), 44 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java index 45320406ef6..53a5bd976ce 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java @@ -59,7 +59,7 @@ public class CommonConstants { public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000; public static final String PROXY_BATCH_FLUSH_INTERVAL = "proxy.batch.flush.interval"; - public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 100; + public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 1; public static final String PROXY_SENDER_MAX_TIMEOUT = "proxy.sender.maxTimeout"; // max timeout in seconds. diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java index c7b151a26c1..7e2aa28034d 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/file/ProxyMessageCache.java @@ -32,14 +32,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS; import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION; import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION; @@ -56,11 +53,8 @@ public class ProxyMessageCache { private final int maxPackSize; private final int maxQueueNumber; private final String groupId; - // ms - private final int cacheTimeout; // streamId -> list of proxyMessage private final ConcurrentHashMap> messageQueueMap; - private final AtomicLong cacheSize = new AtomicLong(0); private long lastPrintTime = 0; private long dataTime; private boolean isRealTime = false; @@ -76,7 +70,6 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String this.maxPackSize = instanceProfile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE); this.maxQueueNumber = instanceProfile.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER, DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER); - this.cacheTimeout = instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS); messageQueueMap = new ConcurrentHashMap<>(); dataTime = instanceProfile.getSinkDataTime(); extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false"); @@ -109,7 +102,6 @@ public boolean add(ProxyMessage message) { return false; } messageQueue.put(message); - cacheSize.addAndGet(message.getBody().length); return true; } catch (Exception ex) { LOGGER.error("exception caught", ex); @@ -159,13 +151,11 @@ public SenderMessage fetchSenderMessage(String streamId, LinkedBlockingQueue maxPackSize) { LOGGER.warn("message size is {}, greater than max pack size {}, drop it!", peekMessage.getBody().length, maxPackSize); - cacheSize.addAndGet(-bodySize); messageQueue.remove(); break; } resultBatchSize += bodySize; // decrease queue size. - cacheSize.addAndGet(-bodySize); bodyList.add(message.getBody()); offsetList.add(message.getAckInfo()); } @@ -183,13 +173,4 @@ public SenderMessage fetchSenderMessage(String streamId, LinkedBlockingQueue getExtraMap() { - return extraMap; - } - - public long getCacheSize() { - return cacheSize.get(); - } - } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java index 48aedfd09d0..06dd20a99e5 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/instance/InstanceManager.java @@ -317,7 +317,7 @@ private void restoreFromStore() { } private void addInstance(InstanceProfile profile) { - if (instanceMap.size() >= instanceLimit) { + if (instanceMap.size() > instanceLimit) { LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit); actionQueue.offer(new InstanceAction(ActionType.ADD, profile)); AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java index 56dec44d553..949cf5a4c71 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/PulsarSource.java @@ -50,10 +50,10 @@ public class PulsarSource extends AbstractSource { private String serviceUrl; private String subscription; private String subscriptionType; - private String subscriptionPosition; private PulsarClient pulsarClient; private Long timestamp; private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-"; + private final static String SUBSCRIPTION_CUSTOM = "Custom"; private boolean isRestoreFromDB = false; private Consumer consumer; private long offset = 0L; @@ -68,8 +68,6 @@ protected void initSource(InstanceProfile profile) { topic = profile.getInstanceId(); serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL); subscription = profile.get(TASK_PULSAR_SUBSCRIPTION, PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId); - subscriptionPosition = profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION, - SubscriptionInitialPosition.Latest.name()); subscriptionType = profile.get(TASK_PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared.name()); timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0); pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build(); @@ -97,35 +95,48 @@ protected List readFromSource() { org.apache.pulsar.client.api.Message message = null; try { message = consumer.receive(0, TimeUnit.MILLISECONDS); - offset = message.getSequenceId(); } catch (PulsarClientException e) { LOGGER.error("read from pulsar error", e); } if (!ObjectUtils.isEmpty(message)) { + offset = message.getSequenceId(); dataList.add(new SourceData(message.getValue(), new String(message.getMessageId().toByteArray(), StandardCharsets.UTF_8))); + try { + consumer.acknowledge(message); + } catch (PulsarClientException e) { + LOGGER.error("ack pulsar error", e); + } } - try { - consumer.acknowledge(message); - } catch (PulsarClientException e) { - LOGGER.error("ack pulsar error", e); - } + return dataList; } private Consumer getConsumer() { Consumer consumer = null; try { - consumer = pulsarClient.newConsumer(Schema.BYTES) - .topic(topic) - .subscriptionName(subscription) - .subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition)) - .subscriptionType(SubscriptionType.valueOf(subscriptionType)) - .subscribe(); - if (!isRestoreFromDB && timestamp != 0L) { - consumer.seek(timestamp); - LOGGER.info("Reset consume from {}", timestamp); + String position = profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION, SubscriptionInitialPosition.Latest.name()); + if (position.equals(SUBSCRIPTION_CUSTOM)) { + consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName(subscription) + .subscriptionType(SubscriptionType.valueOf(subscriptionType)) + .subscribe(); + if (!isRestoreFromDB) { + if (timestamp == 0L) { + LOGGER.error("Reset consume but timestamp is 0L"); + } else { + consumer.seek(timestamp); + LOGGER.info("Reset consume from {}", timestamp); + } + } } else { + consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName(subscription) + .subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(position)) + .subscriptionType(SubscriptionType.valueOf(subscriptionType)) + .subscribe(); LOGGER.info("Skip to reset consume"); } return consumer; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java index f1fb8b55700..299c3829a0b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/AbstractSource.java @@ -38,7 +38,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,10 +54,8 @@ import java.util.concurrent.atomic.AtomicLong; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; -import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS; @@ -356,9 +353,7 @@ private SourceData readFromQueue() { } private Message createMessage(SourceData sourceData) { - String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); Map header = new HashMap<>(); - header.put(PROXY_KEY_DATA, proxyPartitionKey); header.put(OFFSET, sourceData.getOffset()); header.put(PROXY_KEY_STREAM_ID, inlongStreamId); if (extendedHandler != null) { @@ -424,6 +419,9 @@ private void clearQueue(BlockingQueue queue) { @Override public boolean sourceFinish() { + if (isRealTime) { + return false; + } return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST; } }