Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: taskControl的loop循环太久导致Consumer线程池被占满问题 #11352 #11370

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,10 @@ import com.tencent.devops.common.api.pojo.ErrorType
import com.tencent.devops.common.api.util.EnvUtils
import com.tencent.devops.common.api.util.timestampmilli
import com.tencent.devops.common.event.enums.ActionType
import com.tencent.devops.common.pipeline.container.NormalContainer
import com.tencent.devops.common.pipeline.container.VMBuildContainer
import com.tencent.devops.common.pipeline.enums.BuildStatus
import com.tencent.devops.common.pipeline.pojo.element.Element
import com.tencent.devops.common.pipeline.pojo.element.RunCondition
import com.tencent.devops.common.pipeline.type.docker.DockerDispatchType
import com.tencent.devops.process.engine.common.Timeout
import com.tencent.devops.process.engine.common.VMUtils
import com.tencent.devops.process.engine.pojo.PipelineBuildTask
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -101,26 +98,12 @@ interface IAtomTask<T> {
// 未结束?检查是否超时
if (!atomResponse.buildStatus.isFinish()) {
val startTime = task.startTime?.timestampmilli() ?: 0L
val timeoutMills: Long =
if (param is Element) {
val additionalOptions = param.additionalOptions
Timeout.transMinuteTimeoutToMills(additionalOptions?.timeout?.toInt())
} else if (param is NormalContainer) {
Timeout.transMinuteTimeoutToMills(
(param.jobControlOption?.prepareTimeout ?: Timeout.DEFAULT_PREPARE_MINUTES)
)
} else if (param is VMBuildContainer) {
// docker 构建机要求10分钟内超时
if (param.dispatchType is DockerDispatchType || !param.dockerBuildVersion.isNullOrBlank()) {
Timeout.transMinuteTimeoutToMills(
(param.jobControlOption?.prepareTimeout ?: Timeout.DEFAULT_PREPARE_MINUTES)
)
} else {
Timeout.transMinuteTimeoutToMills(param.jobControlOption?.timeout)
}
} else {
0L
}
var timeout = task.additionalOptions?.timeout?.toInt()
if (timeout == null && VMUtils.isVMTask(task.taskId)) {
// 如果timeout为空且task为开关机插件任务,则给timeout赋默认值
timeout = Timeout.DEFAULT_PREPARE_MINUTES
}
val timeoutMills = Timeout.transMinuteTimeoutToMills(timeout)
val runCondition = task.additionalOptions?.runCondition
if (timeoutMills > 0 && System.currentTimeMillis() - startTime >= timeoutMills) {
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package com.tencent.devops.process.engine.control

import com.tencent.devops.common.pipeline.container.Container
import com.tencent.devops.common.pipeline.container.NormalContainer
import com.tencent.devops.common.pipeline.container.VMBuildContainer
import com.tencent.devops.common.pipeline.enums.BuildStatus
import com.tencent.devops.common.pipeline.enums.EnvControlTaskType
Expand Down Expand Up @@ -90,21 +91,27 @@ class VmOperateTaskGenerator {
val taskType: String
val taskName: String
val taskAtom: String
var timeout: Long? = null
if (container is VMBuildContainer) {
val buildType = container.dispatchType?.buildType()?.name ?: BuildType.DOCKER.name
val baseOS = container.baseOS.name
atomCode = "$START_VM_TASK_ATOM-$buildType-$baseOS"
taskType = EnvControlTaskType.VM.name
taskName = "Prepare_Job#${container.id!!}"
taskAtom = START_VM_TASK_ATOM
timeout = container.jobControlOption?.prepareTimeout?.toLong()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改用jobTimeout

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

STARTVM去掉 call_waiting 改为running

} else {
atomCode = START_NORMAL_TASK_ATOM
taskType = EnvControlTaskType.NORMAL.name
taskName = "Prepare_Job#${container.id!!}(N)"
taskAtom = START_NORMAL_TASK_ATOM
if (container is NormalContainer) {
timeout = container.jobControlOption?.prepareTimeout?.toLong()
}
}
val additionalOptions = ElementAdditionalOptions(
runCondition = RunCondition.PRE_TASK_FAILED_BUT_CANCEL
runCondition = RunCondition.PRE_TASK_FAILED_BUT_CANCEL,
timeout = timeout
)
return PipelineBuildTask(
projectId = projectId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,10 @@ class EngineVMBuildService @Autowired(required = false) constructor(
return false
}
val finalBuildStatus = getFinalBuildStatus(buildStatus, buildId, vmSeqId, startUpVMTask)
// 如果是完成状态,则更新构建机启动插件的状态
if (finalBuildStatus.isFinish()) {

// #2043 上报启动构建机状态时,重新刷新开始时间,以防止调度的耗时占用了Job的超时时间
if (!startUpVMTask.status.isFinish() && finalBuildStatus.isFinish()) { // #2043 构建机当前启动状态是未结束状态,才进行刷新开始时间
// 如果是完成状态,则更新构建机启动插件的状态
pipelineTaskService.updateTaskStatus(
task = startUpVMTask,
userId = startUpVMTask.starter,
Expand All @@ -460,27 +462,23 @@ class EngineVMBuildService @Autowired(required = false) constructor(
errorCode = errorCode,
errorMsg = errorMsg
)

// #2043 上报启动构建机状态时,重新刷新开始时间,以防止调度的耗时占用了Job的超时时间
if (!startUpVMTask.status.isFinish()) { // #2043 构建机当前启动状态是未结束状态,才进行刷新开始时间
pipelineContainerService.updateContainerStatus(
projectId = projectId,
buildId = buildId,
stageId = startUpVMTask.stageId,
containerId = startUpVMTask.containerId,
startTime = LocalDateTime.now(),
endTime = null,
buildStatus = BuildStatus.RUNNING
)
containerBuildRecordService.containerStarted(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
containerId = vmSeqId,
executeCount = startUpVMTask.executeCount ?: 1,
containerBuildStatus = finalBuildStatus
)
}
pipelineContainerService.updateContainerStatus(
projectId = projectId,
buildId = buildId,
stageId = startUpVMTask.stageId,
containerId = startUpVMTask.containerId,
startTime = LocalDateTime.now(),
endTime = null,
buildStatus = BuildStatus.RUNNING
)
containerBuildRecordService.containerStarted(
projectId = projectId,
pipelineId = pipelineId,
buildId = buildId,
containerId = vmSeqId,
executeCount = startUpVMTask.executeCount ?: 1,
containerBuildStatus = finalBuildStatus
)
}

// 失败的话就发终止事件
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,15 +344,27 @@ class TaskAtomService @Autowired(required = false) constructor(

private fun log(atomResponse: AtomResponse, task: PipelineBuildTask, stopFlag: Boolean) {
if (atomResponse.buildStatus.isFinish()) {
buildLogPrinter.addLine(
buildId = task.buildId,
message = "Task [${task.taskName}] ${atomResponse.buildStatus}!",
tag = task.taskId,
containerHashId = task.containerHashId,
executeCount = task.executeCount ?: 1,
jobId = null,
stepId = task.stepId
)
if (atomResponse.errorCode != null) {
buildLogPrinter.addErrorLine(
buildId = task.buildId,
message = "Task [${task.taskName}] ${atomResponse.buildStatus} (${atomResponse.errorMsg})!",
tag = task.taskId,
containerHashId = task.containerHashId,
executeCount = task.executeCount ?: 1,
jobId = null,
stepId = task.stepId
)
} else {
buildLogPrinter.addLine(
buildId = task.buildId,
message = "Task [${task.taskName}] ${atomResponse.buildStatus}!",
tag = task.taskId,
containerHashId = task.containerHashId,
executeCount = task.executeCount ?: 1,
jobId = null,
stepId = task.stepId
)
}
} else {
if (stopFlag) {
buildLogPrinter.addLine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ import kotlin.system.exitProcess

object Runner {

private const val maxSleepStep = 50L
private const val maxSleepStep = 80L
private const val windows = 5L
private const val millsStep = 100L
private val logger = LoggerFactory.getLogger(Runner::class.java)
Expand Down