diff --git a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt index 3798bf8..955cdb5 100644 --- a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt +++ b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueConfiguration.kt @@ -79,7 +79,8 @@ class RedisQueueConfiguration { deadMessageHandler = deadMessageHandler, publisher = publisher, ackTimeout = Duration.ofSeconds(redisQueueProperties.ackTimeoutSeconds.toLong()), - serializationMigrator = serializationMigrator + serializationMigrator = serializationMigrator, + prefetchCount = redisQueueProperties.prefetchCount ) @Bean diff --git a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueProperties.kt b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueProperties.kt index 98f0f73..34b871c 100644 --- a/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueProperties.kt +++ b/keiko-redis-spring/src/main/kotlin/com/netflix/spinnaker/config/RedisQueueProperties.kt @@ -23,4 +23,5 @@ class RedisQueueProperties { var queueName: String = "keiko.queue" var deadLetterQueueName: String = "keiko.queue.deadLetters" var ackTimeoutSeconds: Int = 60 + var prefetchCount: Int = 10 } diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt index 64cd2d4..5efbe50 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt @@ -48,6 +48,7 @@ class RedisQueue( private val pool: Pool, private val clock: Clock, private val lockTtlSeconds: Int = 10, + private val prefetchCount: Int = 10, private val mapper: ObjectMapper, private val serializationMigrator: Optional, override val ackTimeout: TemporalAmount = Duration.ofMinutes(1), @@ -76,11 +77,7 @@ class RedisQueue( override fun poll(callback: (Message, () -> Unit) -> Unit) { pool.resource.use { redis -> - redis.zrangeByScore(queueKey, 0.0, score(), 0, 1) - .firstOrNull() - ?.takeIf { fingerprint -> - redis.acquireLock(fingerprint) - } + redis.fetchFingerprint() ?.also { fingerprint -> val ack = this::ackMessage.partially1(fingerprint) redis.readMessage(fingerprint) { message -> @@ -312,6 +309,21 @@ class RedisQueue( deadMessageHandler.invoke(this, message) } + /** + * Will pre-fetch a list of fingerprints, returning the first fingerprint it is + * able to successfully acquire a lock on. + */ + private fun Jedis.fetchFingerprint() = + zrangeByScore(queueKey, 0.0, score(), 0, prefetchCount) + .let { fingerprints -> + fingerprints.forEach { fingerprint -> + if (acquireLock(fingerprint)) { + return@let fingerprint + } + } + return@let null + } + private fun Jedis.acquireLock(fingerprint: String) = (set("$locksKey:$fingerprint", "\uD83D\uDD12", "NX", "EX", lockTtlSeconds) == "OK") .also {