feat: redis connection to be restarted every configured interval (#74)
jachro authored Mar 26, 2024
1 parent ce526d6 commit 22c6f17
Showing 15 changed files with 138 additions and 77 deletions.
@@ -26,7 +26,7 @@ import io.renku.search.http.HttpServerConfig
import io.renku.solr.client.{SolrConfig, SolrUser}
import org.http4s.Uri

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.*

object ConfigValues extends ConfigDecoders:

@@ -45,8 +45,11 @@ object ConfigValues extends ConfigDecoders:
val maybeDB = renv("REDIS_DB").as[RedisDB].option
val maybePass = renv("REDIS_PASSWORD").as[RedisPassword].option
val maybeMasterSet = renv("REDIS_MASTER_SET").as[RedisMasterSet].option
val connectionRefresh =
renv("REDIS_CONNECTION_REFRESH_INTERVAL").as[FiniteDuration].default(30 minutes)

(host, port, sentinel, maybeDB, maybePass, maybeMasterSet).mapN(RedisConfig.apply)
(host, port, sentinel, maybeDB, maybePass, maybeMasterSet, connectionRefresh)
.mapN(RedisConfig.apply)
}

def eventQueue(eventType: String): ConfigValue[Effect, QueueName] =
@@ -78,7 +81,12 @@ object ConfigValues extends ConfigDecoders:
(url, core, maybeUser, defaultCommit, logMessageBodies).mapN(SolrConfig.apply)
}

def httpServerConfig(prefix: String, defaultPort: Port): ConfigValue[Effect, HttpServerConfig] =
val bindAddress = renv(s"${prefix}_HTTP_SERVER_BIND_ADDRESS").default("0.0.0.0").as[Ipv4Address]
val port = renv(s"${prefix}_HTTP_SERVER_PORT").default(defaultPort.value.toString).as[Port]
def httpServerConfig(
prefix: String,
defaultPort: Port
): ConfigValue[Effect, HttpServerConfig] =
val bindAddress =
renv(s"${prefix}_HTTP_SERVER_BIND_ADDRESS").default("0.0.0.0").as[Ipv4Address]
val port =
renv(s"${prefix}_HTTP_SERVER_PORT").default(defaultPort.value.toString).as[Port]
(bindAddress, port).mapN(HttpServerConfig.apply)
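For reference, a minimal standalone sketch of how the new REDIS_CONNECTION_REFRESH_INTERVAL value above is read, assuming the ciris configuration library (which the ConfigValue/Effect/renv types suggest); the duration string is parsed like a scala.concurrent.duration literal (e.g. "30 minutes"), and the 30-minute default applies when the variable is unset:

import scala.concurrent.duration.*
import cats.effect.IO
import ciris.*

// Hypothetical standalone equivalent of the connectionRefresh value added above;
// the real code goes through the project's renv helper and ConfigDecoders.
val refreshInterval: ConfigValue[Effect, FiniteDuration] =
  env("REDIS_CONNECTION_REFRESH_INTERVAL").as[FiniteDuration].default(30.minutes)

// Loading yields 30 minutes unless the environment variable overrides it.
val loaded: IO[FiniteDuration] = refreshInterval.load[IO]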
@@ -21,13 +21,16 @@ package io.renku.redis.client
import dev.profunktor.redis4cats.connection.RedisURI
import io.lettuce.core.RedisURI as JRedisURI

import scala.concurrent.duration.FiniteDuration

final case class RedisConfig(
host: RedisHost,
port: RedisPort,
sentinel: Boolean = false,
maybeDB: Option[RedisDB] = None,
maybePassword: Option[RedisPassword] = None,
maybeMasterSet: Option[RedisMasterSet] = None
maybeMasterSet: Option[RedisMasterSet] = None,
connectionRefreshInterval: FiniteDuration
):
lazy val asRedisUri: RedisURI =
val builder = JRedisURI.Builder.redis(host.value, port.value)
@@ -29,6 +29,8 @@ import io.lettuce.core.RedisConnectionException
import io.renku.redis.client.*
import io.renku.servers.RedisServer

import scala.concurrent.duration.*

trait RedisSpec:
self: munit.Suite =>

@@ -60,7 +62,8 @@ trait RedisSpec:
override lazy val redisConfig: RedisConfig =
RedisConfig(
RedisHost(server.host),
RedisPort(server.port)
RedisPort(server.port),
connectionRefreshInterval = 10 minutes
)

override def beforeAll(): Unit =
@@ -50,5 +50,15 @@ trait QueueClient[F[_]]:
def getSize(queueName: QueueName, from: MessageId): F[Long]

object QueueClient:

// Be aware that the client has been observed to lose its connection to Redis.
// For that reason, prefer QueueClient.stream, which auto-refreshes (recreates)
// the connection every connectionRefreshInterval.
def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] =
RedisQueueClient.make[F](redisConfig).map(new QueueClientImpl[F](_))

def stream[F[_]: Async](redisConfig: RedisConfig): Stream[F, QueueClient[F]] =
val s = Stream
.resource[F, QueueClient[F]](make(redisConfig))
.interruptAfter(redisConfig.connectionRefreshInterval)
s ++ stream(redisConfig)
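The recursive stream above is the heart of the change: the client resource is emitted, everything composed under it is interrupted after connectionRefreshInterval, and the recursion then acquires a fresh connection. A minimal sketch of the same pattern, with a hypothetical connect resource standing in for RedisQueueClient.make:

import scala.concurrent.duration.*
import cats.effect.{IO, Resource}
import fs2.Stream

// Hypothetical stand-in for the real Redis client resource.
def connect: Resource[IO, String] =
  Resource.make(IO.println("connection opened").as("client"))(_ => IO.println("connection closed"))

// Same shape as QueueClient.stream: emit the resource, interrupt anything nested
// under it after `interval`, then recurse to acquire a fresh resource.
def refreshing(interval: FiniteDuration): Stream[IO, String] =
  Stream.resource(connect).interruptAfter(interval) ++ refreshing(interval)

// Work composed via flatMap is tied to the current connection; every 2 seconds it is
// cut off and restarted against a newly acquired one (the demo stops after 7 seconds).
val demo: IO[Unit] =
  refreshing(2.seconds)
    .flatMap(c => Stream.awakeEvery[IO](500.millis).evalMap(_ => IO.println(s"using $c")))
    .interruptAfter(7.seconds)
    .compile
    .drain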
@@ -1,3 +1,21 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.cli.perftests

import cats.effect.{Async, Ref, Resource}
@@ -23,6 +23,8 @@ import com.monovore.decline.Opts
import io.renku.redis.client.*
import org.http4s.Uri

import scala.concurrent.duration.*

final case class PerfTestsConfig(
itemsToGenerate: Int,
providers: List[Provider],
@@ -79,9 +81,13 @@ private object RedisConfigOpts:
.option[String]("redis-masterset", "Redis masterset")
.map(RedisMasterSet.apply)
.orNone
private val refreshInterval: Opts[FiniteDuration] =
Opts
.option[FiniteDuration]("redis-connection-refresh", "Redis connection refresh")
.withDefault(1 hour)

val configOpts: Opts[RedisConfig] =
(host, port, sentinel, db.map(Option(_)), password, masterSet)
(host, port, sentinel, db.map(Option(_)), password, masterSet, refreshInterval)
.mapN(RedisConfig.apply)

sealed private trait Provider
@@ -1,3 +1,21 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.cli.perftests

import cats.Show
@@ -26,6 +26,7 @@ import io.renku.search.metrics.CollectorRegistryBuilder
import io.renku.search.provision.metrics.{MetricsCollectorsUpdater, RedisMetrics}
import io.renku.search.solr.schema.Migrations
import io.renku.solr.client.migration.SchemaMigrator

import scala.concurrent.duration.Duration

object Microservice extends IOApp:
@@ -19,6 +19,7 @@
package io.renku.search.provision

import cats.effect.kernel.{Async, Resource}
import fs2.Stream
import fs2.io.net.Network
import io.renku.queue.client.QueueClient
import io.renku.search.provision.handler.PipelineSteps
@@ -27,7 +28,7 @@ import io.renku.search.solr.client.SearchSolrClient
final case class Services[F[_]](
config: SearchProvisionConfig,
solrClient: SearchSolrClient[F],
queueClient: Resource[F, QueueClient[F]],
queueClient: Stream[F, QueueClient[F]],
messageHandlers: MessageHandlers[F]
)

@@ -38,9 +39,8 @@ object Services:
cfg <- Resource.eval(SearchProvisionConfig.config.load[F])
solr <- SearchSolrClient.make[F](cfg.solrConfig)

// The redis client must be initialized on each operation to
// be able to connect to the cluster
redis = QueueClient.make[F](cfg.redisConfig)
// The Redis client is refreshed periodically, so it is provided as a stream
redis = QueueClient.stream[F](cfg.redisConfig)

steps = PipelineSteps[F](
solr,
@@ -18,21 +18,17 @@

package io.renku.search.provision.handler

import cats.effect.Sync
import cats.Show
import cats.effect.Async
import cats.syntax.all.*
import fs2.Stream

import io.renku.queue.client.{QueueClient, QueueMessage}
import io.renku.redis.client.{ClientId, QueueName}
import fs2.{Chunk, Stream}
import io.renku.queue.client.{QueueClient, QueueMessage, RequestId}
import io.renku.redis.client.{ClientId, MessageId, QueueName}
import io.renku.search.provision.QueueMessageDecoder
import scala.concurrent.duration.FiniteDuration
import fs2.Chunk
import cats.effect.{Async, Resource}
import cats.Show
import io.renku.queue.client.RequestId
import io.renku.redis.client.MessageId
import scribe.Scribe

import scala.concurrent.duration.FiniteDuration

trait MessageReader[F[_]]:
def read[A](using
QueueMessageDecoder[F, A],
@@ -51,26 +47,29 @@

object MessageReader:
final case class Message[A](raw: QueueMessage, decoded: Seq[A]):
val id = raw.id
val id: MessageId = raw.id
val requestId: RequestId = RequestId(raw.header.requestId)
def map[B](f: A => B): Message[B] = Message(raw, decoded.map(f))
def stream[F[_]]: Stream[F, A] = Stream.emits(decoded).covary[F]

/** MessageReader that dequeues messages and attempts to decode them. If decoding fails,
* the message is marked as processed and the next message is read.
*/
def apply[F[_]: Sync](
queueClient: Resource[F, QueueClient[F]],
def apply[F[_]: Async](
queueClient: Stream[F, QueueClient[F]],
queue: QueueName,
clientId: ClientId,
chunkSize: Int
): MessageReader[F] =
new MessageReader[F]:
val logger = scribe.cats.effect[F]
val logger: Scribe[F] = scribe.cats.effect[F]

def read[A](using QueueMessageDecoder[F, A], Show[A]): Stream[F, Message[A]] =
override def read[A](using
QueueMessageDecoder[F, A],
Show[A]
): Stream[F, Message[A]] =
for {
client <- Stream.resource(queueClient)
client <- queueClient
last <- Stream.eval(client.findLastProcessed(clientId, queue))
qmsg <- client.acquireEventsStream(queue, chunkSize, last)
dec <- Stream
Expand All @@ -91,17 +90,17 @@ object MessageReader:
_ <- Stream.eval(logInfo(dec))
} yield dec

def markProcessed(id: MessageId): F[Unit] =
queueClient.use(_.markProcessed(clientId, queue, id))
override def markProcessed(id: MessageId): F[Unit] =
queueClient.evalMap(_.markProcessed(clientId, queue, id)).take(1).compile.drain

def markProcessedError(err: Throwable, id: MessageId)(using
override def markProcessedError(err: Throwable, id: MessageId)(using
logger: Scribe[F]
): F[Unit] =
markProcessed(id) >>
logger.error(s"Processing messageId: ${id} for '${queue}' failed", err)
logger.error(s"Processing messageId: $id for '$queue' failed", err)

private def logInfo[A: Show](m: Message[A]): F[Unit] =
lazy val values = m.decoded.mkString_(", ")
logger.info(
s"""Received message queue: ${queue.name}, id: ${m.id}, source: ${m.raw.header.source}, type: ${m.raw.header.`type`} for: ${values}"""
s"""Received message queue: ${queue.name}, id: ${m.id}, source: ${m.raw.header.source}, type: ${m.raw.header.`type`} for: $values"""
)
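For one-off effects, as opposed to the long-running read pipeline, the stream-provided client is consumed by taking a single emission, which is what markProcessed does above. A hypothetical generic helper with the same shape:

import cats.effect.Async
import fs2.Stream
import io.renku.queue.client.QueueClient

// Run a single effect against whichever client the refreshing stream currently
// provides; take(1) ensures exactly one emission is consumed before completing.
def withCurrentClient[F[_]: Async, A](
    clients: Stream[F, QueueClient[F]]
)(f: QueueClient[F] => F[A]): F[Option[A]] =
  clients.evalMap(f).take(1).compile.last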
@@ -18,12 +18,12 @@

package io.renku.search.provision.handler

import cats.effect.{Resource, Sync}
import io.renku.search.solr.client.SearchSolrClient
import cats.effect.Async
import fs2.Stream
import io.renku.queue.client.QueueClient
import io.renku.redis.client.ClientId
import io.renku.redis.client.QueueName
import io.renku.redis.client.{ClientId, QueueName}
import io.renku.search.provision.QueuesConfig
import io.renku.search.solr.client.SearchSolrClient

trait PipelineSteps[F[_]]:
def reader: MessageReader[F]
@@ -35,9 +35,9 @@ trait PipelineSteps[F[_]]:
def userUtils: UserUtils[F]

object PipelineSteps:
def apply[F[_]: Sync](
def apply[F[_]: Async](
solrClient: SearchSolrClient[F],
queueClient: Resource[F, QueueClient[F]],
queueClient: Stream[F, QueueClient[F]],
queueConfig: QueuesConfig,
inChunkSize: Int,
clientId: ClientId
@@ -19,16 +19,15 @@
package io.renku.search.provision.handler

import cats.Show
import cats.effect.{Resource, Sync}
import cats.effect.Sync
import cats.syntax.all.*
import fs2.Pipe

import fs2.{Pipe, Stream}
import io.renku.avro.codec.encoders.all.given
import io.renku.events.v1.ProjectAuthorizationRemoved
import io.renku.queue.client.*
import io.renku.redis.client.ClientId
import io.renku.redis.client.MessageId
import io.renku.redis.client.{ClientId, MessageId}
import io.renku.search.provision.QueuesConfig
import scribe.Scribe

trait PushToRedis[F[_]]:
def pushAuthorizationRemoved(
@@ -40,31 +39,32 @@ object PushToRedis:
object PushToRedis:

def apply[F[_]: Sync](
queueClient: Resource[F, QueueClient[F]],
queueClient: Stream[F, QueueClient[F]],
clientId: ClientId,
queueConfig: QueuesConfig
): PushToRedis[F] =
new PushToRedis[F] {
val logger = scribe.cats.effect[F]
val logger: Scribe[F] = scribe.cats.effect[F]

def pushAuthorizationRemoved(
requestId: RequestId
)(using
Show[ProjectAuthorizationRemoved]
): Pipe[F, ProjectAuthorizationRemoved, MessageId] =
_.evalMap(payload =>
createHeader(requestId).flatMap { header =>
logger.debug(show"Pushing $payload to redis") >>
queueClient.use(
_.enqueue(
queueConfig.projectAuthorizationRemoved,
header,
payload
)
)
}
_.evalMap(payload => createHeader(requestId).tupleRight(payload))
.evalTap { case (_, payload) => logger.debug(show"Pushing $payload to redis") }
.flatMap(enqueue)

private def enqueue(header: MessageHeader, payload: ProjectAuthorizationRemoved) =
queueClient.evalMap(
_.enqueue(
queueConfig.projectAuthorizationRemoved,
header,
payload
)
)

def createHeader(requestId: RequestId): F[MessageHeader] =
private def createHeader(requestId: RequestId): F[MessageHeader] =
MessageHeader[F](
MessageSource(clientId.value),
ProjectAuthorizationRemoved.SCHEMA$,