Skip to content

Commit

Permalink
[INLONG-11298][Agent] Fix bug for pulsar source with empty data proce…
Browse files Browse the repository at this point in the history
…ss and specified time consumption (#11299)

* [INLONG-11298][Agent] Fix bug for pulsar source

* [INLONG-11298][Agent] Fix the issue of inconsistent logic between the page and backend code
  • Loading branch information
justinwwhuang authored Oct 9, 2024
1 parent 778cbe6 commit 3f52c5c
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class CommonConstants {
public static final int DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS = 4 * 1000;

public static final String PROXY_BATCH_FLUSH_INTERVAL = "proxy.batch.flush.interval";
public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 100;
public static final int DEFAULT_PROXY_BATCH_FLUSH_INTERVAL = 1;

public static final String PROXY_SENDER_MAX_TIMEOUT = "proxy.sender.maxTimeout";
// max timeout in seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_AUDIT_VERSION;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
import static org.apache.inlong.common.msg.AttributeConstants.AUDIT_VERSION;
Expand All @@ -56,11 +53,8 @@ public class ProxyMessageCache {
private final int maxPackSize;
private final int maxQueueNumber;
private final String groupId;
// ms
private final int cacheTimeout;
// streamId -> list of proxyMessage
private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> messageQueueMap;
private final AtomicLong cacheSize = new AtomicLong(0);
private long lastPrintTime = 0;
private long dataTime;
private boolean isRealTime = false;
Expand All @@ -76,7 +70,6 @@ public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String
this.maxPackSize = instanceProfile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE);
this.maxQueueNumber = instanceProfile.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
this.cacheTimeout = instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
messageQueueMap = new ConcurrentHashMap<>();
dataTime = instanceProfile.getSinkDataTime();
extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
Expand Down Expand Up @@ -109,7 +102,6 @@ public boolean add(ProxyMessage message) {
return false;
}
messageQueue.put(message);
cacheSize.addAndGet(message.getBody().length);
return true;
} catch (Exception ex) {
LOGGER.error("exception caught", ex);
Expand Down Expand Up @@ -159,13 +151,11 @@ public SenderMessage fetchSenderMessage(String streamId, LinkedBlockingQueue<Pro
if (peekMessageLength > maxPackSize) {
LOGGER.warn("message size is {}, greater than max pack size {}, drop it!",
peekMessage.getBody().length, maxPackSize);
cacheSize.addAndGet(-bodySize);
messageQueue.remove();
break;
}
resultBatchSize += bodySize;
// decrease queue size.
cacheSize.addAndGet(-bodySize);
bodyList.add(message.getBody());
offsetList.add(message.getAckInfo());
}
Expand All @@ -183,13 +173,4 @@ public SenderMessage fetchSenderMessage(String streamId, LinkedBlockingQueue<Pro
}
return null;
}

public Map<String, String> getExtraMap() {
return extraMap;
}

public long getCacheSize() {
return cacheSize.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ private void restoreFromStore() {
}

private void addInstance(InstanceProfile profile) {
if (instanceMap.size() >= instanceLimit) {
if (instanceMap.size() > instanceLimit) {
LOGGER.error("instanceMap size {} over limit {}", instanceMap.size(), instanceLimit);
actionQueue.offer(new InstanceAction(ActionType.ADD, profile));
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public class PulsarSource extends AbstractSource {
private String serviceUrl;
private String subscription;
private String subscriptionType;
private String subscriptionPosition;
private PulsarClient pulsarClient;
private Long timestamp;
private final static String PULSAR_SUBSCRIPTION_PREFIX = "inlong-agent-";
private final static String SUBSCRIPTION_CUSTOM = "Custom";
private boolean isRestoreFromDB = false;
private Consumer<byte[]> consumer;
private long offset = 0L;
Expand All @@ -68,8 +68,6 @@ protected void initSource(InstanceProfile profile) {
topic = profile.getInstanceId();
serviceUrl = profile.get(TASK_PULSAR_SERVICE_URL);
subscription = profile.get(TASK_PULSAR_SUBSCRIPTION, PULSAR_SUBSCRIPTION_PREFIX + inlongStreamId);
subscriptionPosition = profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION,
SubscriptionInitialPosition.Latest.name());
subscriptionType = profile.get(TASK_PULSAR_SUBSCRIPTION_TYPE, SubscriptionType.Shared.name());
timestamp = profile.getLong(TASK_PULSAR_RESET_TIME, 0);
pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).build();
Expand Down Expand Up @@ -97,35 +95,48 @@ protected List<SourceData> readFromSource() {
org.apache.pulsar.client.api.Message<byte[]> message = null;
try {
message = consumer.receive(0, TimeUnit.MILLISECONDS);
offset = message.getSequenceId();
} catch (PulsarClientException e) {
LOGGER.error("read from pulsar error", e);
}
if (!ObjectUtils.isEmpty(message)) {
offset = message.getSequenceId();
dataList.add(new SourceData(message.getValue(), new String(message.getMessageId().toByteArray(),
StandardCharsets.UTF_8)));
try {
consumer.acknowledge(message);
} catch (PulsarClientException e) {
LOGGER.error("ack pulsar error", e);
}
}
try {
consumer.acknowledge(message);
} catch (PulsarClientException e) {
LOGGER.error("ack pulsar error", e);
}

return dataList;
}

private Consumer<byte[]> getConsumer() {
Consumer<byte[]> consumer = null;
try {
consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscription)
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(subscriptionPosition))
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
.subscribe();
if (!isRestoreFromDB && timestamp != 0L) {
consumer.seek(timestamp);
LOGGER.info("Reset consume from {}", timestamp);
String position = profile.get(TASK_PULSAR_SUBSCRIPTION_POSITION, SubscriptionInitialPosition.Latest.name());
if (position.equals(SUBSCRIPTION_CUSTOM)) {
consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscription)
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
.subscribe();
if (!isRestoreFromDB) {
if (timestamp == 0L) {
LOGGER.error("Reset consume but timestamp is 0L");
} else {
consumer.seek(timestamp);
LOGGER.info("Reset consume from {}", timestamp);
}
}
} else {
consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(subscription)
.subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(position))
.subscriptionType(SubscriptionType.valueOf(subscriptionType))
.subscribe();
LOGGER.info("Skip to reset consume");
}
return consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,10 +54,8 @@
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
Expand Down Expand Up @@ -356,9 +353,7 @@ private SourceData readFromQueue() {
}

private Message createMessage(SourceData sourceData) {
String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId));
Map<String, String> header = new HashMap<>();
header.put(PROXY_KEY_DATA, proxyPartitionKey);
header.put(OFFSET, sourceData.getOffset());
header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
if (extendedHandler != null) {
Expand Down Expand Up @@ -424,6 +419,9 @@ private void clearQueue(BlockingQueue<SourceData> queue) {

@Override
public boolean sourceFinish() {
if (isRealTime) {
return false;
}
return emptyCount > EMPTY_CHECK_COUNT_AT_LEAST;
}
}

0 comments on commit 3f52c5c

Please sign in to comment.