Skip to content

Commit

Permalink
feat: RedisConfig added
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro committed Feb 15, 2024
1 parent fdfa764 commit 04b9345
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ package io.renku.search.config
import cats.syntax.all.*
import ciris.{ConfigDecoder, ConfigError}
import io.renku.queue.client.QueueName
import io.renku.redis.client.RedisUrl
import io.renku.redis.client.*
import org.http4s.Uri

import scala.concurrent.duration.{Duration, FiniteDuration}

trait ConfigDecoders:

given ConfigDecoder[String, Uri] =
ConfigDecoder[String].mapEither { (_, s) =>
Uri.fromString(s).leftMap(err => ConfigError(err.getMessage))
Expand All @@ -37,8 +38,16 @@ trait ConfigDecoders:
Duration.unapply(s).map(Duration.apply.tupled).filter(_.isFinite)
}

given ConfigDecoder[String, RedisUrl] =
ConfigDecoder[String].map(s => RedisUrl(s))
given ConfigDecoder[String, RedisHost] =
ConfigDecoder[String].map(RedisHost.apply)
given ConfigDecoder[String, RedisPort] =
ConfigDecoder[String, Int].map(RedisPort.apply)
given ConfigDecoder[String, RedisDB] =
ConfigDecoder[String, Int].map(RedisDB.apply)
given ConfigDecoder[String, RedisPassword] =
ConfigDecoder[String].map(RedisPassword.apply)
given ConfigDecoder[String, RedisMasterSet] =
ConfigDecoder[String].map(RedisMasterSet.apply)

given ConfigDecoder[String, QueueName] =
ConfigDecoder[String].map(s => QueueName(s))
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package io.renku.search.config
import cats.syntax.all.*
import ciris.*
import io.renku.queue.client.QueueName
import io.renku.redis.client.RedisUrl
import io.renku.redis.client.*
import io.renku.solr.client.{SolrConfig, SolrUser}
import org.http4s.Uri

Expand All @@ -31,8 +31,15 @@ object ConfigValues extends ConfigDecoders:

private val prefix = "RS"

val redisUrl: ConfigValue[Effect, RedisUrl] =
env(s"${prefix}_REDIS_URL").default("redis://localhost:6379").as[RedisUrl]
val redisConfig: ConfigValue[Effect, RedisConfig] = {
val host = env(s"${prefix}_REDIS_HOST").default("localhost").as[RedisHost]
val port = env(s"${prefix}_REDIS_PORT").default("6379").as[RedisPort]
val maybeDB = env(s"${prefix}_REDIS_DB").as[RedisDB].option
val maybePass = env(s"${prefix}_REDIS_PASSWORD").as[RedisPassword].option
val maybeMasterSet = env(s"${prefix}_REDIS_MASTER_SET").as[RedisMasterSet].option

(host, port, maybeDB, maybePass, maybeMasterSet).mapN(RedisConfig.apply)
}

val eventsQueueName: ConfigValue[Effect, QueueName] =
env(s"${prefix}_REDIS_QUEUE_NAME").default("events").as[QueueName]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package io.renku.queue.client

import cats.effect.{Async, Resource}
import fs2.Stream
import io.renku.redis.client.{RedisQueueClient, RedisUrl}
import io.renku.redis.client.{RedisConfig, RedisQueueClient}
import scodec.bits.ByteVector

trait QueueClient[F[_]] {
Expand All @@ -43,5 +43,5 @@ trait QueueClient[F[_]] {
}

object QueueClient:
def apply[F[_]: Async](redisUrl: RedisUrl): Resource[F, QueueClient[F]] =
RedisQueueClient[F](redisUrl)
def apply[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] =
RedisQueueClient[F](redisConfig)
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package io.renku.queue.client
opaque type QueueName = String
object QueueName:
def apply(v: String): QueueName = v
extension (self: QueueName) def name: String = self

opaque type ClientId = String
object ClientId:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.{RedisClient, RedisMasterReplica, RedisURI}
import dev.profunktor.redis4cats.data.{ReadFrom, RedisCodec}
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.streams.{RedisStream, Streaming}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import scodec.bits.ByteVector

sealed private trait ConnectionCreator[F[_]]:

def createStreamingConnection
: Stream[F, Streaming[[A] =>> Stream[F, A], String, ByteVector]]

def createStringCommands: Resource[F, RedisCommands[F, String, String]]

object ConnectionCreator:
def create[F[_]: Async: Log](cfg: RedisConfig): Resource[F, ConnectionCreator[F]] =
lazy val dbPart = cfg.maybeDB.map(db => s"/$db").getOrElse("")
cfg.maybeMasterSet match {
case None =>
val uri = s"redis://${cfg.host}:${cfg.port}$dbPart"
RedisClient[F].from(uri).map(new SingleConnectionCreator(_))
case Some(masterSet) =>
val uri =
s"redis-sentinel://${cfg.host}:${cfg.port}$dbPart,${cfg.host}:${cfg.port.plusOne}$dbPart#$masterSet"
Resource
.eval(RedisURI.make[F](uri))
.map(new MasterReplicaConnectionCreator(_))
}

private class SingleConnectionCreator[F[_]: Async: Log](client: RedisClient)
extends ConnectionCreator[F]:

override def createStreamingConnection
: Stream[F, Streaming[[A] =>> Stream[F, A], String, ByteVector]] =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)

override def createStringCommands: Resource[F, RedisCommands[F, String, String]] =
Redis[F].fromClient(client, RedisCodec.Utf8)

private class MasterReplicaConnectionCreator[F[_]: Async: Log](uri: RedisURI)
extends ConnectionCreator[F]:

override def createStreamingConnection
: Stream[F, Streaming[[A] =>> Stream[F, A], String, ByteVector]] =
RedisStream
.mkMasterReplicaConnection[F, String, ByteVector](StringBytesCodec.instance, uri)(
None
)

override def createStringCommands: Resource[F, RedisCommands[F, String, String]] =
RedisMasterReplica[F]
.make(RedisCodec.Utf8, uri)(ReadFrom.UpstreamPreferred.some)
.flatMap(Redis[F].masterReplica)
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

final case class RedisConfig(
host: RedisHost,
port: RedisPort,
maybeDB: Option[RedisDB],
maybePassword: Option[RedisPassword] = None,
maybeMasterSet: Option[RedisMasterSet] = None
)

opaque type RedisHost = String
object RedisHost {
def apply(v: String): RedisHost = v
extension (self: RedisHost) def value: String = self
}

opaque type RedisPort = Int
object RedisPort {
def apply(v: Int): RedisPort = v
extension (self: RedisPort)
def value: Int = self
def plusOne: RedisPort = self + 1
}

opaque type RedisDB = Int
object RedisDB {
def apply(v: Int): RedisDB = v
extension (self: RedisDB) def value: Int = self
}

opaque type RedisPassword = String
object RedisPassword {
def apply(v: String): RedisPassword = v
extension (self: RedisPassword) def value: String = self
}

opaque type RedisMasterSet = String
object RedisMasterSet {
def apply(v: String): RedisMasterSet = v
extension (self: RedisMasterSet) def value: String = self
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,22 @@ package io.renku.redis.client

import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.effect.MkRedis.forAsync
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XReadMessage}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.renku.queue.client.*
import scodec.bits.ByteVector
import scribe.Scribe

object RedisQueueClient:
def apply[F[_]: Async](redisUrl: RedisUrl): Resource[F, QueueClient[F]] =

def apply[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] =
given Scribe[F] = scribe.cats[F]
given Log[F] = RedisLogger[F]
RedisClient[F].from(redisUrl.toString).map(new RedisQueueClient[F](_))
ConnectionCreator.create[F](redisConfig).map(new RedisQueueClient(_))

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {
class RedisQueueClient[F[_]: Async: Log](cc: ConnectionCreator[F])
extends QueueClient[F] {

private val payloadKey = "payload"
private val encodingKey = "encoding"
Expand All @@ -55,7 +52,7 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
Map(payloadKey -> message, encodingKey -> encodeEncoding(encoding))
)
)
createConnection
cc.createStreamingConnection
.flatMap(_.append(m))
.map(id => MessageId(id.value))
.compile
Expand All @@ -81,8 +78,8 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
.map(id => StreamingOffset.Custom[String](_, id.value))
.getOrElse(StreamingOffset.All[String])

createConnection >>= {
_.read(Set(queueName.toString), chunkSize, initialOffset)
cc.createStreamingConnection >>= {
_.read(Set(queueName.name), chunkSize, initialOffset)
.map(toMessage)
.collect { case Some(m) => m }
}
Expand All @@ -93,30 +90,23 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
Message(MessageId(m.id.value), encoding, payload)
}

private def createConnection =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)

override def markProcessed(
clientId: ClientId,
queueName: QueueName,
messageId: MessageId
): F[Unit] =
stringCommands.use {
cc.createStringCommands.use {
_.set(formProcessedKey(clientId, queueName), messageId.value)
}

override def findLastProcessed(
clientId: ClientId,
queueName: QueueName
): F[Option[MessageId]] =
stringCommands.use {
cc.createStringCommands.use {
_.get(formProcessedKey(clientId, queueName)).map(_.map(MessageId.apply))
}

private def stringCommands: Resource[F, RedisCommands[F, String, String]] =
Redis[F].fromClient(client, RedisCodec.Utf8)

private def formProcessedKey(clientId: ClientId, queueName: QueueName) =
s"$queueName.$clientId"
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import dev.profunktor.redis4cats.effect.MkRedis.forAsync
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import io.lettuce.core.RedisConnectionException
import io.renku.queue.client.QueueClient
import io.renku.redis.client.RedisQueueClient
import io.renku.redis.client.*
import io.renku.servers.RedisServer

trait RedisSpec:
Expand Down Expand Up @@ -55,7 +55,15 @@ trait RedisSpec:
apply().flatMap(createRedisCommands)

override def asQueueClient(): Resource[IO, QueueClient[IO]] =
apply().map(new RedisQueueClient[IO](_))
RedisQueueClient[IO](
RedisConfig(
RedisHost(server.host),
RedisPort(server.port),
maybeDB = None,
maybePassword = None,
maybeMasterSet = None
)
)

override def beforeAll(): Unit =
server.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object Microservice extends IOApp:
} yield ExitCode.Success

private def startProvisioning(cfg: SearchProvisionConfig): IO[Unit] =
SearchProvisioner[IO](cfg.queueName, cfg.redisUrl, cfg.solrConfig)
SearchProvisioner[IO](cfg.queueName, cfg.redisConfig, cfg.solrConfig)
.evalMap(_.provisionSolr.start)
.use(_ => IO.never)
.handleErrorWith { err =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ package io.renku.search.provision
import cats.syntax.all.*
import ciris.{ConfigValue, Effect}
import io.renku.queue.client.QueueName
import io.renku.redis.client.RedisUrl
import io.renku.redis.client.RedisConfig
import io.renku.search.config.ConfigValues
import io.renku.solr.client.SolrConfig

import scala.concurrent.duration.FiniteDuration

final case class SearchProvisionConfig(
redisUrl: RedisUrl,
redisConfig: RedisConfig,
queueName: QueueName,
solrConfig: SolrConfig,
retryOnErrorDelay: FiniteDuration
Expand All @@ -38,10 +38,8 @@ object SearchProvisionConfig:

val config: ConfigValue[Effect, SearchProvisionConfig] =
(
ConfigValues.redisUrl,
ConfigValues.redisConfig,
ConfigValues.eventsQueueName,
ConfigValues.solrConfig,
ConfigValues.retryOnErrorDelay
).mapN(
SearchProvisionConfig.apply
)
).mapN(SearchProvisionConfig.apply)
Loading

0 comments on commit 04b9345

Please sign in to comment.