Skip to content

Commit

Permalink
[INLONG-9591][Manager] Support printing thread status before submitti…
Browse files Browse the repository at this point in the history
…ng tasks
  • Loading branch information
fuweng11 committed Jan 18, 2024
1 parent a80a168 commit f79bd73
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.common.threadPool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class VisiableThreadPoolTaskExecutor extends ThreadPoolExecutor {

private static final Logger logger =
LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);

public VisiableThreadPoolTaskExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

private void showThreadPoolInfo(String prefix) {
logger.info(
"Current thread pool class = {}, opType = {}, taskCount = {}, completedTaskCount = {}, activeCount = {}, poolSize = {}, queueSize = {}",
this.getThreadFactory().getClass(),
prefix,
this.getTaskCount(),
this.getCompletedTaskCount(),
this.getActiveCount(),
this.getPoolSize(),
this.getQueue().size());
}

@Override
public void execute(Runnable task) {
showThreadPoolInfo("execute");
super.execute(task);
}

@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("submit");
return super.submit(task);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("submit");
return super.submit(task);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.threadPool.VisiableThreadPoolTaskExecutor;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
Expand Down Expand Up @@ -56,7 +57,6 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;

Expand All @@ -73,7 +73,7 @@ public class InlongGroupProcessService {

private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupProcessService.class);

private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
private static final ExecutorService EXECUTOR_SERVICE = new VisiableThreadPoolTaskExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
ALIVE_TIME_MS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.threadPool.VisiableThreadPoolTaskExecutor;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
Expand All @@ -41,7 +42,6 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;

Expand All @@ -57,7 +57,7 @@
@Service
public class InlongStreamProcessService {

private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(
private static final ExecutorService EXECUTOR_SERVICE = new VisiableThreadPoolTaskExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
ALIVE_TIME_MS,
Expand Down

0 comments on commit f79bd73

Please sign in to comment.