Skip to content

Commit

Permalink
feat: redis sentinel config
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro committed Feb 16, 2024
1 parent f024eab commit 157f630
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ object ConfigValues extends ConfigDecoders:
val redisConfig: ConfigValue[Effect, RedisConfig] = {
val host = env(s"${prefix}_REDIS_HOST").default("localhost").as[RedisHost]
val port = env(s"${prefix}_REDIS_PORT").default("6379").as[RedisPort]
val sentinel = env(s"${prefix}_REDIS_SENTINEL").as[Boolean].default(false)
val maybeDB = env(s"${prefix}_REDIS_DB").as[RedisDB].option
val maybePass = env(s"${prefix}_REDIS_PASSWORD").as[RedisPassword].option
val maybeMasterSet = env(s"${prefix}_REDIS_MASTER_SET").as[RedisMasterSet].option

(host, port, maybeDB, maybePass, maybeMasterSet).mapN(RedisConfig.apply)
(host, port, sentinel, maybeDB, maybePass, maybeMasterSet).mapN(RedisConfig.apply)
}

val eventsQueueName: ConfigValue[Effect, QueueName] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.renku.redis.client

import cats.ApplicativeThrow
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.{RedisClient, RedisMasterReplica, RedisURI}
Expand All @@ -26,13 +27,11 @@ import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.streams.{RedisStream, Streaming}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.lettuce.core.{
RedisCredentials,
RedisURI as JRedisURI,
StaticCredentialsProvider
}
import io.lettuce.core.{ReadFrom as JReadFrom, RedisURI as JRedisURI}
import scodec.bits.ByteVector

import scala.jdk.CollectionConverters.given

sealed private trait ConnectionCreator[F[_]]:

def createStreamingConnection
Expand All @@ -43,28 +42,37 @@ sealed private trait ConnectionCreator[F[_]]:
object ConnectionCreator:

def create[F[_]: Async: Log](cfg: RedisConfig): Resource[F, ConnectionCreator[F]] =
val uri = createRedisUri(cfg)
cfg.maybeMasterSet match {
case None =>
RedisClient[F]
.fromUri(RedisURI.fromUnderlying(uri))
.map(new SingleConnectionCreator(_))
case Some(masterSet) =>
uri.setSentinelMasterId(masterSet.value)
Resource
.pure[F, RedisURI](RedisURI.fromUnderlying(uri))
.map(new MasterReplicaConnectionCreator(_))
}

private def createRedisUri(cfg: RedisConfig): JRedisURI = {
val uri = JRedisURI.create(cfg.host.value, cfg.port.value)
cfg.maybeDB.foreach(db => uri.setDatabase(db.value))
cfg.maybePassword.foreach(pass =>
uri.setCredentialsProvider(
new StaticCredentialsProvider(RedisCredentials.just(null, pass.value))
)
)
uri
val uri = redisUri(cfg)
if cfg.sentinel then
Resource.eval[F, ConnectionCreator[F]] {
ApplicativeThrow[F]
.catchNonFatal {
uri.getSentinels.asScala.toList.map(RedisURI.fromUnderlying)
}
.map(new SentinelConnectionCreator(_))
}
else
RedisClient[F]
.fromUri(RedisURI.fromUnderlying(uri))
.map(new SingleConnectionCreator(_))

private def redisUri(cfg: RedisConfig): JRedisURI = {

val uriBuilder = JRedisURI.builder
cfg.maybeDB.map(_.value).foreach(uriBuilder.withDatabase)

if cfg.sentinel then
cfg.maybePassword.fold(
uriBuilder.withSentinel(cfg.host.value, cfg.port.value)
)(pass => uriBuilder.withSentinel(cfg.host.value, cfg.port.value, pass.value))
cfg.maybeMasterSet.map(_.value).foreach(uriBuilder.withSentinelMasterId)
else
uriBuilder
.withHost(cfg.host.value)
.withPort(cfg.port.value)
cfg.maybePassword.foreach(pass => uriBuilder.withPassword(pass.value.toCharArray))

uriBuilder.build()
}

private class SingleConnectionCreator[F[_]: Async: Log](client: RedisClient)
Expand All @@ -78,17 +86,19 @@ private class SingleConnectionCreator[F[_]: Async: Log](client: RedisClient)
override def createStringCommands: Resource[F, RedisCommands[F, String, String]] =
Redis[F].fromClient(client, RedisCodec.Utf8)

private class MasterReplicaConnectionCreator[F[_]: Async: Log](uri: RedisURI)
private class SentinelConnectionCreator[F[_]: Async: Log](uris: List[RedisURI])
extends ConnectionCreator[F]:

private val maybeReadFrom: Option[JReadFrom] = ReadFrom.UpstreamPreferred.some

override def createStreamingConnection
: Stream[F, Streaming[[A] =>> Stream[F, A], String, ByteVector]] =
RedisStream
.mkMasterReplicaConnection[F, String, ByteVector](StringBytesCodec.instance, uri)(
None
)
RedisStream.mkMasterReplicaConnection[F, String, ByteVector](
StringBytesCodec.instance,
uris: _*
)(maybeReadFrom)

override def createStringCommands: Resource[F, RedisCommands[F, String, String]] =
RedisMasterReplica[F]
.make(RedisCodec.Utf8, uri)(ReadFrom.UpstreamPreferred.some)
.make(RedisCodec.Utf8, uris: _*)(maybeReadFrom)
.flatMap(Redis[F].masterReplica)
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ package io.renku.redis.client
final case class RedisConfig(
host: RedisHost,
port: RedisPort,
maybeDB: Option[RedisDB],
sentinel: Boolean = false,
maybeDB: Option[RedisDB] = None,
maybePassword: Option[RedisPassword] = None,
maybeMasterSet: Option[RedisMasterSet] = None
)
Expand All @@ -35,9 +36,7 @@ object RedisHost {
opaque type RedisPort = Int
object RedisPort {
def apply(v: Int): RedisPort = v
extension (self: RedisPort)
def value: Int = self
def plusOne: RedisPort = self + 1
extension (self: RedisPort) def value: Int = self
}

opaque type RedisDB = Int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class RedisQueueClient[F[_]: Async: Log](cc: ConnectionCreator[F])
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(
queueName.toString,
queueName.name,
Map(payloadKey -> message, encodingKey -> encodeEncoding(encoding))
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,7 @@ trait RedisSpec:
RedisQueueClient[IO](
RedisConfig(
RedisHost(server.host),
RedisPort(server.port),
maybeDB = None,
maybePassword = None,
maybeMasterSet = None
RedisPort(server.port)
)
)

Expand Down

0 comments on commit 157f630

Please sign in to comment.