Skip to content
This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit

Permalink
fix(redis): re-cache script if redis has restarted (#51)
Browse files Browse the repository at this point in the history
  • Loading branch information
Scott authored and robzienert committed Jan 3, 2019
1 parent a22e9aa commit b7dd214
Showing 1 changed file with 38 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import redis.clients.jedis.Jedis
import redis.clients.jedis.JedisCommands
import redis.clients.jedis.ScriptingCommands
import redis.clients.jedis.Transaction
import redis.clients.jedis.exceptions.JedisDataException
import redis.clients.jedis.params.sortedset.ZAddParams.zAddParams
import redis.clients.util.Pool
import java.io.IOException
Expand Down Expand Up @@ -93,10 +94,14 @@ class RedisQueue(
private lateinit var readMessageWithLockScriptSha: String

init {
cacheScript()
log.info("Configured queue: $queueName")
}

fun cacheScript() {
pool.resource.use { redis ->
readMessageWithLockScriptSha = redis.scriptLoad(READ_MESSAGE_WITH_LOCK_SRC)
}
log.info("Configured queue: $queueName")
}

override fun poll(callback: (Message, () -> Unit) -> Unit) {
Expand Down Expand Up @@ -321,29 +326,38 @@ class RedisQueue(
}

private fun ScriptingCommands.readMessageWithLock(): Triple<String, Instant, String?>? {
val response = evalsha(readMessageWithLockScriptSha, listOf(
queueKey,
unackedKey,
locksKey,
messagesKey
), listOf(
score().toString(),
10.toString(), // TODO rz - make this configurable.
lockTtlSeconds.toString(),
format(Locale.US, "%f", score(ackTimeout)),
format(Locale.US, "%f", score())
))
if (response is List<*>) {
return Triple(
response[0].toString(), // fingerprint
Instant.ofEpochMilli(response[1].toString().toLong()), // fingerprintScore
response[2]?.toString() // message
)
}
if (response == "ReadLockFailed") {
// This isn't a "bad" thing, but means there's more work than keiko can process in a cycle
// in this case, but may be a signal to tune `peekFingerprintCount`
fire(LockFailed)
try {
val response = evalsha(readMessageWithLockScriptSha, listOf(
queueKey,
unackedKey,
locksKey,
messagesKey
), listOf(
score().toString(),
10.toString(), // TODO rz - make this configurable.
lockTtlSeconds.toString(),
format(Locale.US, "%f", score(ackTimeout)),
format(Locale.US, "%f", score())
))
if (response is List<*>) {
return Triple(
response[0].toString(), // fingerprint
Instant.ofEpochMilli(response[1].toString().toLong()), // fingerprintScore
response[2]?.toString() // message
)
}
if (response == "ReadLockFailed") {
// This isn't a "bad" thing, but means there's more work than keiko can process in a cycle
// in this case, but may be a signal to tune `peekFingerprintCount`
fire(LockFailed)
}
} catch (e: JedisDataException) {
if ((e.message ?: "").startsWith("NOSCRIPT")) {
cacheScript()
return readMessageWithLock()
} else {
throw e
}
}
return null
}
Expand Down

0 comments on commit b7dd214

Please sign in to comment.