diff --git a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueue.kt b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueue.kt index 2cb4f58..095ca17 100644 --- a/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueue.kt +++ b/keiko-core/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueue.kt @@ -16,6 +16,7 @@ package com.netflix.spinnaker.q.metrics +import com.netflix.spinnaker.q.Message import com.netflix.spinnaker.q.Queue /** @@ -30,6 +31,12 @@ interface MonitorableQueue : Queue { * @return the current state of the queue. */ fun readState(): QueueState + + /** + * Confirms if the queue currently contains one or more messages matching + * [predicate]. + */ + fun containsMessage(predicate: (Message) -> Boolean): Boolean } /** diff --git a/keiko-mem/src/main/kotlin/com/netflix/spinnaker/q/memory/InMemoryQueue.kt b/keiko-mem/src/main/kotlin/com/netflix/spinnaker/q/memory/InMemoryQueue.kt index 5b723e7..10a2d3e 100644 --- a/keiko-mem/src/main/kotlin/com/netflix/spinnaker/q/memory/InMemoryQueue.kt +++ b/keiko-mem/src/main/kotlin/com/netflix/spinnaker/q/memory/InMemoryQueue.kt @@ -118,6 +118,9 @@ class InMemoryQueue( unacked = unacked.size ) + override fun containsMessage(predicate: (Message) -> Boolean): Boolean = + queue.map(Envelope::payload).any(predicate) + private fun ack(messageId: UUID) { unacked.removeIf { it.id == messageId } } diff --git a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt index 0e5b469..6b7a259 100644 --- a/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt +++ b/keiko-redis/src/main/kotlin/com/netflix/spinnaker/q/redis/RedisQueue.kt @@ -20,10 +20,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.SerializationFeature import com.google.common.hash.Hashing import com.netflix.spinnaker.KotlinOpen -import com.netflix.spinnaker.q.AttemptsAttribute -import com.netflix.spinnaker.q.DeadMessageCallback -import com.netflix.spinnaker.q.MaxAttemptsAttribute -import com.netflix.spinnaker.q.Message +import com.netflix.spinnaker.q.* import com.netflix.spinnaker.q.Queue import com.netflix.spinnaker.q.metrics.* import com.netflix.spinnaker.q.migration.SerializationMigrator @@ -215,6 +212,22 @@ class RedisQueue( } } + override fun containsMessage(predicate: (Message) -> Boolean): Boolean = + pool.resource.use { redis -> + var found = false + var cursor = "0" + while (!found) { + redis.hscan(messagesKey, cursor).apply { + found = result + .map { mapper.readValue(it.value) } + .any(predicate) + cursor = stringCursor + } + if (cursor == "0") break + } + return found + } + override fun toString() = "RedisQueue[$queueName]" private fun ackMessage(fingerprint: String) { diff --git a/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt b/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt index 33fde50..19c9323 100644 --- a/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt +++ b/keiko-tck/src/main/kotlin/com/netflix/spinnaker/q/metrics/MonitorableQueueTest.kt @@ -24,10 +24,7 @@ import com.netflix.spinnaker.time.MutableClock import com.nhaarman.mockito_kotlin.* import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on +import org.jetbrains.spek.api.dsl.* import java.io.Closeable import java.time.Clock import java.time.Duration @@ -76,6 +73,12 @@ abstract class MonitorableQueueTest( } } } + + it("reports no matching message exists") { + with(queue!!) { + assertThat(containsMessage { it is TestMessage }).isFalse() + } + } } describe("pushing a message") { @@ -101,6 +104,12 @@ abstract class MonitorableQueueTest( } } } + + it("reports a matching message exists") { + with(queue!!) { + assertThat(containsMessage { it is TestMessage && it.payload == "a" }).isTrue() + } + } } describe("pushing a duplicate message") {