diff --git a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt index 8d925b2..61e0ee5 100644 --- a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt +++ b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/QueueProcessor.kt @@ -23,6 +23,8 @@ import com.netflix.spinnaker.q.metrics.NoHandlerCapacity import org.slf4j.Logger import org.slf4j.LoggerFactory.getLogger import org.springframework.scheduling.annotation.Scheduled +import java.time.Duration +import java.util.Random import java.util.concurrent.RejectedExecutionException import javax.annotation.PostConstruct @@ -37,9 +39,12 @@ class QueueProcessor( private val activator: Activator, private val publisher: EventPublisher, private val deadMessageHandler: DeadMessageCallback, - private val fillExecutorEachCycle: Boolean = false + private val fillExecutorEachCycle: Boolean = false, + private val requeueDelay : Duration = Duration.ofSeconds(0), + private val requeueMaxJitter : Duration = Duration.ofSeconds(0) ) { private val log: Logger = getLogger(javaClass) + private val random: Random = Random() /** * Polls the [Queue] once (or more if [fillExecutorEachCycle] is true) so @@ -82,8 +87,22 @@ class QueueProcessor( } } } catch (e: RejectedExecutionException) { - log.warn("Executor at capacity, immediately re-queuing message", e) - queue.push(message) + var requeueDelaySeconds = requeueDelay.seconds + if (requeueMaxJitter.seconds > 0) { + requeueDelaySeconds += random.nextInt(requeueMaxJitter.seconds.toInt()) + } + + val requeueDelay = Duration.ofSeconds(requeueDelaySeconds) + val numberOfAttempts = message.getAttribute() + + log.warn( + "Executor at capacity, re-queuing message {} (delay: {}, attempts: {})", + message, + requeueDelay, + numberOfAttempts, + e + ) + queue.push(message, requeueDelay) } } else { log.error("Unsupported message type ${message.javaClass.simpleName}: $message") diff --git a/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueConfiguration.kt b/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueConfiguration.kt index cbeb734..b34fab2 100644 --- a/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueConfiguration.kt +++ b/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueConfiguration.kt @@ -28,6 +28,7 @@ import org.springframework.context.annotation.Configuration import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor import java.time.Clock +import java.time.Duration @Configuration @EnableConfigurationProperties(QueueProperties::class) @@ -57,7 +58,17 @@ class QueueConfiguration { publisher: EventPublisher, queueProperties: QueueProperties, deadMessageHandler: DeadMessageCallback - ) = QueueProcessor(queue, executor, handlers, activator, publisher, deadMessageHandler, queueProperties.fillExecutorEachCycle) + ) = QueueProcessor( + queue, + executor, + handlers, + activator, + publisher, + deadMessageHandler, + queueProperties.fillExecutorEachCycle, + Duration.ofSeconds(queueProperties.requeueDelaySeconds), + Duration.ofSeconds(queueProperties.requeueMaxJitterSeconds) + ) @Bean fun queueEventPublisher( diff --git a/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueProperties.kt b/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueProperties.kt index 48d5a10..4e422d4 100644 --- a/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueProperties.kt +++ b/keiko-spring/src/main/kotlin/com/netflix/spinnaker/config/QueueProperties.kt @@ -24,4 +24,6 @@ class QueueProperties { var handlerCorePoolSize: Int = 20 var handlerMaxPoolSize: Int = 20 var fillExecutorEachCycle: Boolean = false + var requeueDelaySeconds : Long = 0 + var requeueMaxJitterSeconds : Long = 0 }