Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow to re-create the index from the redis stream #196

Merged
merged 4 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@

package io.renku.search.events

import io.bullet.borer.*

opaque type MessageId = String

object MessageId:

def apply(id: String): MessageId = id

extension (self: MessageId) def value: String = self

given Decoder[MessageId] = Decoder.forString
given Encoder[MessageId] = Encoder.forString
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ trait RedisQueueClient[F[_]] {
messageId: MessageId
): F[Unit]

def removeLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
): F[Unit]

def findLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
Expand Down Expand Up @@ -144,6 +149,21 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)
}
}

override def removeLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
): F[Unit] =
val key = formProcessedKey(clientId, queueNames)
createStringCommands.use { cmd =>
logger.debug(s"Delete last message-id for: $key") >>
cmd.del(key).flatMap(n => logger.debug(s"Deleted $n")).recoverWith { case ex =>
logger.warn(
s"Error deleting last message-id '$key'",
ex
)
}
}

override def findLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,20 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisBaseSuite:
.map(v => assert(v contains messageId))
yield ()

test("remove last seen message id"):
val clientId = RedisClientGenerators.clientIdGen.generateOne
val queue = NonEmptyList.of(RedisClientGenerators.queueNameGen.generateOne)
val messageId = RedisClientGenerators.messageIdGen.generateOne
for
client <- IO(redisClients().queueClient)
_ <- client.markProcessed(clientId, queue, messageId)
mid <- client.findLastProcessed(clientId, queue)
_ = assertEquals(mid, Some(messageId))

_ <- client.removeLastProcessed(clientId, queue)
_ <- client.findLastProcessed(clientId, queue).map(v => assert(v.isEmpty))
yield ()

test("can find out the total size of the given stream"):
val queue = RedisClientGenerators.queueNameGen.generateOne
val messages = (stringGen, stringGen).mapN(_ -> _).generateList(1, 30)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ trait QueueClient[F[_]]:

def findLastProcessed(queueNames: NonEmptyList[QueueName]): F[Option[MessageId]]

def removeLastProcessed(queueNames: NonEmptyList[QueueName]): F[Unit]

def getSize(queueName: QueueName): F[Long]

def getSize(queueName: QueueName, from: MessageId): F[Long]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ private class QueueClientImpl[F[_]: Async](
): F[Option[MessageId]] =
redisQueueClient.findLastProcessed(clientId, queueNames).map(_.map(MessageId.apply))

override def removeLastProcessed(queueNames: NonEmptyList[QueueName]): F[Unit] =
redisQueueClient.removeLastProcessed(clientId, queueNames)

override def getSize(queueName: QueueName): F[Long] =
redisQueueClient.getSize(queueName)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,68 +23,118 @@ import scala.concurrent.duration.FiniteDuration
import cats.effect.*
import cats.effect.kernel.Fiber
import cats.effect.kernel.Ref
import cats.effect.std.Supervisor
import cats.syntax.all.*

import io.renku.search.provision.BackgroundProcessManage.TaskName

trait BackgroundProcessManage[F[_]]:
def register(name: String, process: F[Unit]): F[Unit]
def register(name: TaskName, task: F[Unit]): F[Unit]

/** Starts all registered tasks in the background. */
def background(taskFilter: TaskName => Boolean): F[Unit]

/** Starts all registered tasks in the background, represented by `F[Unit]`. */
def background: Resource[F, F[Unit]]
def startAll: F[Unit]

/** Same as `.background.useForever` */
def startAll: F[Nothing]
/** Stop all tasks by filtering on their registered name. */
def cancelProcesses(filter: TaskName => Boolean): F[Unit]

/** Get the names of all processses currently running. */
def currentProcesses: F[Set[TaskName]]

object BackgroundProcessManage:
type Process[F[_]] = Fiber[F, Throwable, Unit]
private type Process[F[_]] = Fiber[F, Throwable, Unit]

trait TaskName:
def equals(x: Any): Boolean
def hashCode(): Int

object TaskName:
final case class Name(value: String) extends TaskName
def fromString(name: String): TaskName = Name(name)

private case class State[F[_]](
tasks: Map[TaskName, F[Unit]],
processes: Map[TaskName, Process[F]]
):
def put(name: TaskName, p: F[Unit]): State[F] =
State(tasks.updated(name, p), processes)

def getTasks(filter: TaskName => Boolean): Map[TaskName, F[Unit]] =
tasks.view.filterKeys(filter).toMap

private case class State[F[_]](tasks: Map[String, F[Unit]]):
def put(name: String, p: F[Unit]): State[F] =
State(tasks.updated(name, p))
def getProcesses(filter: TaskName => Boolean): Map[TaskName, Process[F]] =
processes.view.filterKeys(filter).toMap

def getTasks: List[F[Unit]] = tasks.values.toList
def setProcesses(ps: Map[TaskName, Process[F]]): State[F] =
copy(processes = ps)

def removeProcesses(names: Set[TaskName]): State[F] =
copy(processes = processes.view.filterKeys(n => !names.contains(n)).toMap)

private object State:
def empty[F[_]]: State[F] = State[F](Map.empty)
def empty[F[_]]: State[F] = State[F](Map.empty, Map.empty)

def apply[F[_]: Async](
retryDelay: FiniteDuration,
maxRetries: Option[Int] = None
): F[BackgroundProcessManage[F]] =
): Resource[F, BackgroundProcessManage[F]] =
val logger = scribe.cats.effect[F]
Ref.of[F, State[F]](State.empty[F]).map { state =>
new BackgroundProcessManage[F] {
def register(name: String, task: F[Unit]): F[Unit] =
state.update(_.put(name, wrapTask(name, task)))

def startAll: F[Nothing] =
state.get
.flatMap(s => logger.info(s"Starting ${s.tasks.size} background tasks")) >>
background.useForever

def background: Resource[F, F[Unit]] =
for {
ts <- Resource.eval(state.get.map(_.getTasks))
x <- ts.traverse(t => Async[F].background(t))
y = x.traverse_(_.map(_.embed(logger.info(s"Got cancelled"))))
} yield y

def wrapTask(name: String, task: F[Unit]): F[Unit] =
def run(c: Ref[F, Long]): F[Unit] =
logger.info(s"Starting process for: ${name}") >>
task.handleErrorWith { err =>
c.updateAndGet(_ + 1).flatMap {
case n if maxRetries.exists(_ <= n) =>
logger.error(
s"Max retries ($maxRetries) for process ${name} exceeded"
) >> Async[F].raiseError(err)
case n =>
val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("")
logger.error(
s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying",
err
) >> Async[F].delayBy(run(c), retryDelay)
Supervisor[F](await = false).flatMap { supervisor =>
Resource.eval(Ref.of[F, State[F]](State.empty[F])).map { state =>
new BackgroundProcessManage[F] {
def register(name: TaskName, task: F[Unit]): F[Unit] =
state.update(_.put(name, wrapTask(name, task)))

def startAll: F[Unit] =
state.get
.flatMap(s => logger.info(s"Starting ${s.tasks.size} background tasks")) >>
background(_ => true)

def currentProcesses: F[Set[TaskName]] =
state.get.map(_.processes.keySet)

def background(taskFilter: TaskName => Boolean): F[Unit] =
for {
ts <- state.get.map(_.getTasks(taskFilter))
_ <- ts.toList
.traverse { case (name, task) =>
supervisor.supervise(task).map(t => name -> t)
}
.map(_.toMap)
.flatMap(ps => state.update(_.setProcesses(ps)))
} yield ()

/** Stop all tasks by filtering on their registered name. */
def cancelProcesses(filter: TaskName => Boolean): F[Unit] =
for
current <- state.get
ps = current.getProcesses(filter)
_ <- ps.toList.traverse_ { case (name, p) =>
logger.info(s"Cancel background process $name") >> p.cancel >> p.join
.flatMap(out => logger.info(s"Task $name cancelled: $out"))
}
Ref.of[F, Long](0).flatMap(run)
_ <- state.update(_.removeProcesses(ps.keySet))
yield ()

private def wrapTask(name: TaskName, task: F[Unit]): F[Unit] =
def run(c: Ref[F, Long]): F[Unit] =
logger.info(s"Starting process for: ${name}") >>
task.handleErrorWith { err =>
c.updateAndGet(_ + 1).flatMap {
case n if maxRetries.exists(_ <= n) =>
logger.error(
s"Max retries ($maxRetries) for process ${name} exceeded"
) >> Async[F].raiseError(err)
case n =>
val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("")
logger.error(
s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying",
err
) >> Async[F].delayBy(run(c), retryDelay)
}
}
Ref.of[F, Long](0).flatMap(run) >> state.update(_.removeProcesses(Set(name)))
}
}
}
Loading
Loading