Merge pull request #196 from SwissDataScienceCenter/recreate-index-api
Allow re-creating the index from the redis stream
eikek authored Sep 5, 2024
2 parents 227d114 + 376b3c7 commit e00be11
Showing 23 changed files with 707 additions and 100 deletions.
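
The pieces below make that re-index possible: the queue client can now forget its last-processed message id, and the reworked BackgroundProcessManage can stop and restart individual stream readers. A rough, hedged sketch of how the two could be combined (recreateIndex, the parameter names, and the task filter are illustrative only; imports for QueueClient and QueueName are elided because their packages are not visible in this diff):

  import cats.Monad
  import cats.data.NonEmptyList
  import cats.syntax.all.*
  import io.renku.search.provision.BackgroundProcessManage

  // Illustrative only: combine QueueClient.removeLastProcessed with the
  // reworked BackgroundProcessManage to replay the redis stream from the start.
  def recreateIndex[F[_]: Monad](
      queues: NonEmptyList[QueueName],
      queueClient: QueueClient[F],
      processes: BackgroundProcessManage[F]
  )(isStreamReader: BackgroundProcessManage.TaskName => Boolean): F[Unit] =
    for
      _ <- processes.cancelProcesses(isStreamReader) // stop the current readers
      _ <- queueClient.removeLastProcessed(queues)   // drop the stored offset
      _ <- processes.background(isStreamReader)      // restart: messages are re-read
    yield ()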

@@ -18,10 +18,15 @@

package io.renku.search.events

import io.bullet.borer.*

opaque type MessageId = String

object MessageId:

def apply(id: String): MessageId = id

extension (self: MessageId) def value: String = self

given Decoder[MessageId] = Decoder.forString
given Encoder[MessageId] = Encoder.forString
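
With these given instances a MessageId round-trips through borer exactly like the underlying string. A minimal usage sketch (assuming the usual borer Json facade; the example id value is made up):

  import io.bullet.borer.Json
  import io.renku.search.events.MessageId

  val id: MessageId   = MessageId("1725525600000-0")  // redis stream ids are strings
  val json: String    = Json.encode(id).toUtf8String  // "\"1725525600000-0\""
  val back: MessageId = Json.decode(json.getBytes("UTF-8")).to[MessageId].value
  assert(back == id)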

@@ -56,6 +56,11 @@ trait RedisQueueClient[F[_]] {
messageId: MessageId
): F[Unit]

def removeLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
): F[Unit]

def findLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]

@@ -144,6 +149,21 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)
}
}

override def removeLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]
): F[Unit] =
val key = formProcessedKey(clientId, queueNames)
createStringCommands.use { cmd =>
logger.debug(s"Delete last message-id for: $key") >>
cmd.del(key).flatMap(n => logger.debug(s"Deleted $n")).recoverWith { case ex =>
logger.warn(
s"Error deleting last message-id '$key'",
ex
)
}
}

override def findLastProcessed(
clientId: ClientId,
queueNames: NonEmptyList[QueueName]

@@ -152,6 +152,20 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisBaseSuite:
.map(v => assert(v contains messageId))
yield ()

test("remove last seen message id"):
val clientId = RedisClientGenerators.clientIdGen.generateOne
val queue = NonEmptyList.of(RedisClientGenerators.queueNameGen.generateOne)
val messageId = RedisClientGenerators.messageIdGen.generateOne
for
client <- IO(redisClients().queueClient)
_ <- client.markProcessed(clientId, queue, messageId)
mid <- client.findLastProcessed(clientId, queue)
_ = assertEquals(mid, Some(messageId))

_ <- client.removeLastProcessed(clientId, queue)
_ <- client.findLastProcessed(clientId, queue).map(v => assert(v.isEmpty))
yield ()

test("can find out the total size of the given stream"):
val queue = RedisClientGenerators.queueNameGen.generateOne
val messages = (stringGen, stringGen).mapN(_ -> _).generateList(1, 30)

@@ -58,6 +58,8 @@ trait QueueClient[F[_]]:

def findLastProcessed(queueNames: NonEmptyList[QueueName]): F[Option[MessageId]]

def removeLastProcessed(queueNames: NonEmptyList[QueueName]): F[Unit]

def getSize(queueName: QueueName): F[Long]

def getSize(queueName: QueueName, from: MessageId): F[Long]

@@ -105,6 +105,9 @@ private class QueueClientImpl[F[_]: Async](
): F[Option[MessageId]] =
redisQueueClient.findLastProcessed(clientId, queueNames).map(_.map(MessageId.apply))

override def removeLastProcessed(queueNames: NonEmptyList[QueueName]): F[Unit] =
redisQueueClient.removeLastProcessed(clientId, queueNames)

override def getSize(queueName: QueueName): F[Long] =
redisQueueClient.getSize(queueName)

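
QueueClientImpl captures the consumer's ClientId at construction, so callers of the high-level API only name the queues. A small hedged sketch of the two levels (function and parameter names are placeholders):

  import cats.data.NonEmptyList
  import cats.effect.IO

  // Low-level client: the ClientId must be passed explicitly.
  def resetLow(redis: RedisQueueClient[IO], me: ClientId, qs: NonEmptyList[QueueName]): IO[Unit] =
    redis.removeLastProcessed(me, qs)

  // High-level client: the ClientId was fixed when the QueueClientImpl was built.
  def resetHigh(client: QueueClient[IO], qs: NonEmptyList[QueueName]): IO[Unit] =
    client.removeLastProcessed(qs)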

@@ -23,68 +23,118 @@ import scala.concurrent.duration.FiniteDuration
import cats.effect.*
import cats.effect.kernel.Fiber
import cats.effect.kernel.Ref
import cats.effect.std.Supervisor
import cats.syntax.all.*

import io.renku.search.provision.BackgroundProcessManage.TaskName

trait BackgroundProcessManage[F[_]]:
def register(name: String, process: F[Unit]): F[Unit]
def register(name: TaskName, task: F[Unit]): F[Unit]

/** Starts all registered tasks in the background. */
def background(taskFilter: TaskName => Boolean): F[Unit]

/** Starts all registered tasks in the background, represented by `F[Unit]`. */
def background: Resource[F, F[Unit]]
def startAll: F[Unit]

/** Same as `.background.useForever` */
def startAll: F[Nothing]
/** Stop all tasks by filtering on their registered name. */
def cancelProcesses(filter: TaskName => Boolean): F[Unit]

/** Get the names of all processes currently running. */
def currentProcesses: F[Set[TaskName]]

object BackgroundProcessManage:
type Process[F[_]] = Fiber[F, Throwable, Unit]
private type Process[F[_]] = Fiber[F, Throwable, Unit]

trait TaskName:
def equals(x: Any): Boolean
def hashCode(): Int

object TaskName:
final case class Name(value: String) extends TaskName
def fromString(name: String): TaskName = Name(name)

private case class State[F[_]](
tasks: Map[TaskName, F[Unit]],
processes: Map[TaskName, Process[F]]
):
def put(name: TaskName, p: F[Unit]): State[F] =
State(tasks.updated(name, p), processes)

def getTasks(filter: TaskName => Boolean): Map[TaskName, F[Unit]] =
tasks.view.filterKeys(filter).toMap

private case class State[F[_]](tasks: Map[String, F[Unit]]):
def put(name: String, p: F[Unit]): State[F] =
State(tasks.updated(name, p))
def getProcesses(filter: TaskName => Boolean): Map[TaskName, Process[F]] =
processes.view.filterKeys(filter).toMap

def getTasks: List[F[Unit]] = tasks.values.toList
def setProcesses(ps: Map[TaskName, Process[F]]): State[F] =
copy(processes = ps)

def removeProcesses(names: Set[TaskName]): State[F] =
copy(processes = processes.view.filterKeys(n => !names.contains(n)).toMap)

private object State:
def empty[F[_]]: State[F] = State[F](Map.empty)
def empty[F[_]]: State[F] = State[F](Map.empty, Map.empty)

def apply[F[_]: Async](
retryDelay: FiniteDuration,
maxRetries: Option[Int] = None
): F[BackgroundProcessManage[F]] =
): Resource[F, BackgroundProcessManage[F]] =
val logger = scribe.cats.effect[F]
Ref.of[F, State[F]](State.empty[F]).map { state =>
new BackgroundProcessManage[F] {
def register(name: String, task: F[Unit]): F[Unit] =
state.update(_.put(name, wrapTask(name, task)))

def startAll: F[Nothing] =
state.get
.flatMap(s => logger.info(s"Starting ${s.tasks.size} background tasks")) >>
background.useForever

def background: Resource[F, F[Unit]] =
for {
ts <- Resource.eval(state.get.map(_.getTasks))
x <- ts.traverse(t => Async[F].background(t))
y = x.traverse_(_.map(_.embed(logger.info(s"Got cancelled"))))
} yield y

def wrapTask(name: String, task: F[Unit]): F[Unit] =
def run(c: Ref[F, Long]): F[Unit] =
logger.info(s"Starting process for: ${name}") >>
task.handleErrorWith { err =>
c.updateAndGet(_ + 1).flatMap {
case n if maxRetries.exists(_ <= n) =>
logger.error(
s"Max retries ($maxRetries) for process ${name} exceeded"
) >> Async[F].raiseError(err)
case n =>
val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("")
logger.error(
s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying",
err
) >> Async[F].delayBy(run(c), retryDelay)
Supervisor[F](await = false).flatMap { supervisor =>
Resource.eval(Ref.of[F, State[F]](State.empty[F])).map { state =>
new BackgroundProcessManage[F] {
def register(name: TaskName, task: F[Unit]): F[Unit] =
state.update(_.put(name, wrapTask(name, task)))

def startAll: F[Unit] =
state.get
.flatMap(s => logger.info(s"Starting ${s.tasks.size} background tasks")) >>
background(_ => true)

def currentProcesses: F[Set[TaskName]] =
state.get.map(_.processes.keySet)

def background(taskFilter: TaskName => Boolean): F[Unit] =
for {
ts <- state.get.map(_.getTasks(taskFilter))
_ <- ts.toList
.traverse { case (name, task) =>
supervisor.supervise(task).map(t => name -> t)
}
.map(_.toMap)
.flatMap(ps => state.update(_.setProcesses(ps)))
} yield ()

/** Stop all tasks by filtering on their registered name. */
def cancelProcesses(filter: TaskName => Boolean): F[Unit] =
for
current <- state.get
ps = current.getProcesses(filter)
_ <- ps.toList.traverse_ { case (name, p) =>
logger.info(s"Cancel background process $name") >> p.cancel >> p.join
.flatMap(out => logger.info(s"Task $name cancelled: $out"))
}
Ref.of[F, Long](0).flatMap(run)
_ <- state.update(_.removeProcesses(ps.keySet))
yield ()

private def wrapTask(name: TaskName, task: F[Unit]): F[Unit] =
def run(c: Ref[F, Long]): F[Unit] =
logger.info(s"Starting process for: ${name}") >>
task.handleErrorWith { err =>
c.updateAndGet(_ + 1).flatMap {
case n if maxRetries.exists(_ <= n) =>
logger.error(
s"Max retries ($maxRetries) for process ${name} exceeded"
) >> Async[F].raiseError(err)
case n =>
val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("")
logger.error(
s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying",
err
) >> Async[F].delayBy(run(c), retryDelay)
}
}
Ref.of[F, Long](0).flatMap(run) >> state.update(_.removeProcesses(Set(name)))
}
}
}
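
For reference, a hedged usage sketch of the reworked manager (the task name, the dummy task and the println are made up; only the BackgroundProcessManage API itself comes from this diff):

  import scala.concurrent.duration.*
  import cats.effect.{IO, Resource}
  import io.renku.search.provision.BackgroundProcessManage
  import io.renku.search.provision.BackgroundProcessManage.TaskName

  // apply now returns a Resource: releasing it shuts down the Supervisor,
  // which cancels any still-running supervised tasks.
  def example: Resource[IO, Unit] =
    BackgroundProcessManage[IO](retryDelay = 2.seconds).evalMap { manage =>
      val reader = TaskName.fromString("stream-reader")
      for
        _       <- manage.register(reader, IO.sleep(1.second).foreverM) // placeholder task
        _       <- manage.startAll                    // no longer blocks; tasks run supervised
        running <- manage.currentProcesses
        _       <- IO.println(s"running: $running")   // e.g. Set(Name(stream-reader))
        _       <- manage.cancelProcesses(_ == reader)
      yield ()
    }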