Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11516][Agent] Accelerate the process exit speed #11517

Merged
merged 2 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public abstract class Instance extends AbstractStateWrapper {
*/
public abstract void destroy();

/**
* notify destroy instance.
*/
public abstract void notifyDestroy();

/**
* get instance profile
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ public boolean submitAction(InstanceAction action) {
if (action == null) {
return false;
}
if (isFull()) {
return false;
}
return actionQueue.offer(action);
}

Expand All @@ -163,7 +166,7 @@ private Runnable coreThread() {
try {
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME_MS);
printInstanceState();
dealWithActionQueue(actionQueue);
dealWithActionQueue();
keepPaceWithStore();
String inlongGroupId = taskFromStore.getInlongGroupId();
String inlongStreamId = taskFromStore.getInlongStreamId();
Expand Down Expand Up @@ -251,10 +254,10 @@ private void traverseMemoryTasksToStore() {
});
}

private void dealWithActionQueue(BlockingQueue<InstanceAction> queue) {
private void dealWithActionQueue() {
while (isRunnable()) {
try {
InstanceAction action = queue.poll();
InstanceAction action = actionQueue.poll();
if (action == null) {
break;
}
Expand Down Expand Up @@ -375,6 +378,15 @@ private void deleteFromMemory(String instanceId) {
instance.getProfile().getSinkDataTime(), 1, 1, auditVersion);
}

private void notifyDestroyInstance(String instanceId) {
Instance instance = instanceMap.get(instanceId);
if (instance == null) {
LOGGER.error("try to notify destroy instance but not found: taskId {} instanceId {}", taskId, instanceId);
return;
}
instance.notifyDestroy();
}

private void addToStore(InstanceProfile profile, boolean addNew) {
LOGGER.info("add instance to instance store state {} instanceId {}", profile.getState(),
profile.getInstanceId());
Expand Down Expand Up @@ -433,6 +445,9 @@ private void addToMemory(InstanceProfile instanceProfile) {
}

private void stopAllInstances() {
instanceMap.values().forEach((instance) -> {
notifyDestroyInstance(instance.getInstanceId());
});
instanceMap.values().forEach((instance) -> {
deleteFromMemory(instance.getInstanceId());
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,29 @@ public boolean init(Object srcManager, InstanceProfile srcProfile) {

@Override
public void destroy() {
if (!inited) {
return;
}
doChangeState(State.SUCCEEDED);
Long start = AgentUtils.getCurrentTime();
notifyDestroy();
while (running) {
AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
}
LOGGER.info("destroy instance wait run elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
this.source.destroy();
LOGGER.info("destroy instance wait source elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
start = AgentUtils.getCurrentTime();
this.sink.destroy();
LOGGER.info("destroy instance wait sink elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
}

@Override
public void notifyDestroy() {
if (!inited) {
return;
}
doChangeState(State.SUCCEEDED);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class SenderManager {

private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
public static final int RESEND_QUEUE_WAIT_MS = 10;
// cache for group and sender list, share the map cross agent lifecycle.
private DefaultMessageSender sender;
private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
Expand Down Expand Up @@ -172,9 +173,12 @@ public void Stop() {
}

private void closeMessageSender() {
Long start = AgentUtils.getCurrentTime();
if (sender != null) {
sender.close();
}
LOGGER.info("close sender elapse {} ms instance {}", AgentUtils.getCurrentTime() - start,
profile.getInstanceId());
}

private AgentMetricItem getMetricItem(Map<String, String> otherDimensions) {
Expand Down Expand Up @@ -286,7 +290,7 @@ private Runnable flushResendQueue() {
resendRunning = true;
while (!shutdown) {
try {
AgentSenderCallback callback = resendQueue.poll(1, TimeUnit.SECONDS);
AgentSenderCallback callback = resendQueue.poll(RESEND_QUEUE_WAIT_MS, TimeUnit.MILLISECONDS);
if (callback != null) {
SenderMessage message = callback.message;
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_RESEND, message.getGroupId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ protected class SourceData {
protected final Integer BATCH_READ_LINE_COUNT = 10000;
protected final Integer BATCH_READ_LINE_TOTAL_LEN = 1024 * 1024;
protected final Integer CACHE_QUEUE_SIZE = 10 * BATCH_READ_LINE_COUNT;
protected final Integer READ_WAIT_TIMEOUT_MS = 10;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60;
protected final Integer WAIT_TIMEOUT_MS = 10;
private final Integer EMPTY_CHECK_COUNT_AT_LEAST = 5 * 60 * 100;
private final Integer CORE_THREAD_PRINT_INTERVAL_MS = 1000;
protected BlockingQueue<SourceData> queue;

Expand Down Expand Up @@ -172,7 +172,7 @@ private void doRun() {
emptyCount = 0;
}
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, BATCH_READ_LINE_TOTAL_LEN);
AgentUtils.silenceSleepInSeconds(1);
AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
continue;
}
emptyCount = 0;
Expand Down Expand Up @@ -231,7 +231,7 @@ private boolean waitForPermit(String permitName, int permitLen) {
if (!isRunnable()) {
return false;
}
AgentUtils.silenceSleepInSeconds(1);
AgentUtils.silenceSleepInMs(WAIT_TIMEOUT_MS);
}
}
return true;
Expand All @@ -247,7 +247,7 @@ private void putIntoQueue(SourceData sourceData) {
try {
boolean offerSuc = false;
while (isRunnable() && !offerSuc) {
offerSuc = queue.offer(sourceData, 1, TimeUnit.SECONDS);
offerSuc = queue.offer(sourceData, WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
}
if (!offerSuc) {
MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, sourceData.getData().length);
Expand Down Expand Up @@ -338,7 +338,7 @@ private boolean filterSourceData(Message msg) {
private SourceData readFromQueue() {
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interrupted.", instanceId);
}
Expand Down Expand Up @@ -405,7 +405,7 @@ private void clearQueue(BlockingQueue<SourceData> queue) {
while (queue != null && !queue.isEmpty()) {
SourceData sourceData = null;
try {
sourceData = queue.poll(READ_WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
sourceData = queue.poll(WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("poll {} data get interrupted.", instanceId, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ public void destroy() {
destroyTime = index.getAndAdd(1);
}

@Override
public void notifyDestroy() {

}

@Override
public InstanceProfile getProfile() {
return profile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ private LogFileSource getSource(int taskId, long offset) {
Whitebox.setInternalState(source, "CORE_THREAD_PRINT_INTERVAL_MS", 0);
Whitebox.setInternalState(source, "SIZE_OF_BUFFER_TO_READ_FILE", 2);
Whitebox.setInternalState(source, "EMPTY_CHECK_COUNT_AT_LEAST", 3);
Whitebox.setInternalState(source, "READ_WAIT_TIMEOUT_MS", 10);
Whitebox.setInternalState(source, "WAIT_TIMEOUT_MS", 10);
if (offset > 0) {
OffsetProfile offsetProfile = new OffsetProfile(instanceProfile.getTaskId(),
instanceProfile.getInstanceId(),
Expand Down
Loading