Skip to content

Commit

Permalink
Extend background process manager
Browse files Browse the repository at this point in the history
It must be able to track processes so they can be restarted
  • Loading branch information
eikek committed Sep 5, 2024
1 parent 67e9ec5 commit eb1181b
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,68 +23,118 @@ import scala.concurrent.duration.FiniteDuration
import cats.effect.*
import cats.effect.kernel.Fiber
import cats.effect.kernel.Ref
import cats.effect.std.Supervisor
import cats.syntax.all.*

import io.renku.search.provision.BackgroundProcessManage.TaskName

trait BackgroundProcessManage[F[_]]:
def register(name: String, process: F[Unit]): F[Unit]
def register(name: TaskName, task: F[Unit]): F[Unit]

/** Starts all registered tasks whose name satisfies `taskFilter` in the background. */
def background(taskFilter: TaskName => Boolean): F[Unit]

/** Starts all registered tasks in the background, represented by `F[Unit]`. */
def background: Resource[F, F[Unit]]
def startAll: F[Unit]

/** Same as `.background.useForever` */
def startAll: F[Nothing]
/** Stop all tasks by filtering on their registered name. */
def cancelProcesses(filter: TaskName => Boolean): F[Unit]

/** Get the names of all processes currently running. */
def currentProcesses: F[Set[TaskName]]

object BackgroundProcessManage:
type Process[F[_]] = Fiber[F, Throwable, Unit]
private type Process[F[_]] = Fiber[F, Throwable, Unit]

/** Name under which a task is registered with the process manager.
  *
  * Values of this type are used as `Map` keys for both the registered tasks and the
  * running processes. The trait re-declares `equals` and `hashCode` as abstract
  * members; presumably this is meant to make implementors provide value-based
  * equality (case classes and enums do) -- TODO confirm intent.
  */
trait TaskName:
def equals(x: Any): Boolean
def hashCode(): Int

/** Companion providing a plain string-backed [[TaskName]]. */
object TaskName:
/** A task name backed by a string; equality comes from the case class. */
final case class Name(value: String) extends TaskName
/** Wraps `name` in a [[Name]]. */
def fromString(name: String): TaskName = Name(name)

private case class State[F[_]](
tasks: Map[TaskName, F[Unit]],
processes: Map[TaskName, Process[F]]
):
def put(name: TaskName, p: F[Unit]): State[F] =
State(tasks.updated(name, p), processes)

def getTasks(filter: TaskName => Boolean): Map[TaskName, F[Unit]] =
tasks.view.filterKeys(filter).toMap

private case class State[F[_]](tasks: Map[String, F[Unit]]):
def put(name: String, p: F[Unit]): State[F] =
State(tasks.updated(name, p))
def getProcesses(filter: TaskName => Boolean): Map[TaskName, Process[F]] =
processes.view.filterKeys(filter).toMap

def getTasks: List[F[Unit]] = tasks.values.toList
def setProcesses(ps: Map[TaskName, Process[F]]): State[F] =
copy(processes = ps)

def removeProcesses(names: Set[TaskName]): State[F] =
copy(processes = processes.view.filterKeys(n => !names.contains(n)).toMap)

private object State:
def empty[F[_]]: State[F] = State[F](Map.empty)
def empty[F[_]]: State[F] = State[F](Map.empty, Map.empty)

def apply[F[_]: Async](
retryDelay: FiniteDuration,
maxRetries: Option[Int] = None
): F[BackgroundProcessManage[F]] =
): Resource[F, BackgroundProcessManage[F]] =
val logger = scribe.cats.effect[F]
Ref.of[F, State[F]](State.empty[F]).map { state =>
new BackgroundProcessManage[F] {
def register(name: String, task: F[Unit]): F[Unit] =
state.update(_.put(name, wrapTask(name, task)))

def startAll: F[Nothing] =
state.get
.flatMap(s => logger.info(s"Starting ${s.tasks.size} background tasks")) >>
background.useForever

def background: Resource[F, F[Unit]] =
for {
ts <- Resource.eval(state.get.map(_.getTasks))
x <- ts.traverse(t => Async[F].background(t))
y = x.traverse_(_.map(_.embed(logger.info(s"Got cancelled"))))
} yield y

def wrapTask(name: String, task: F[Unit]): F[Unit] =
def run(c: Ref[F, Long]): F[Unit] =
logger.info(s"Starting process for: ${name}") >>
task.handleErrorWith { err =>
c.updateAndGet(_ + 1).flatMap {
case n if maxRetries.exists(_ <= n) =>
logger.error(
s"Max retries ($maxRetries) for process ${name} exceeded"
) >> Async[F].raiseError(err)
case n =>
val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("")
logger.error(
s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying",
err
) >> Async[F].delayBy(run(c), retryDelay)
Supervisor[F](await = false).flatMap { supervisor =>
Resource.eval(Ref.of[F, State[F]](State.empty[F])).map { state =>
new BackgroundProcessManage[F] {
/** Stores `task` under `name` in the manager state, wrapped by `wrapTask`.
  * Registering the same name again replaces the earlier task. Nothing is started here.
  */
def register(name: TaskName, task: F[Unit]): F[Unit] =
  state.update { current =>
    current.put(name, wrapTask(name, task))
  }

/** Logs the number of registered tasks, then starts every one of them in the
  * background by delegating to `background` with an always-true filter.
  */
def startAll: F[Unit] =
  for
    current <- state.get
    _ <- logger.info(s"Starting ${current.tasks.size} background tasks")
    _ <- background(_ => true)
  yield ()

/** Returns the names of the processes currently tracked in the manager state. */
def currentProcesses: F[Set[TaskName]] =
state.get.map(_.processes.keySet)

/** Starts every registered task whose name satisfies `taskFilter` as a supervised
  * background fiber and records the resulting fibers in the manager state.
  *
  * The new fibers are merged into the already tracked processes. Replacing the whole
  * map would drop the handles of running tasks that did not match `taskFilter`, so
  * they could no longer be listed or cancelled.
  *
  * NOTE(review): a task that is already running and matches the filter is started a
  * second time; its previous fiber handle is overwritten by the new one. Callers are
  * expected to cancel before restarting -- confirm.
  *
  * @param taskFilter selects the registered tasks to start by name
  */
def background(taskFilter: TaskName => Boolean): F[Unit] =
  for {
    selected <- state.get.map(_.getTasks(taskFilter))
    started <- selected.toList.traverse { case (name, task) =>
      supervisor.supervise(task).map(fiber => name -> fiber)
    }
    // merge, do not replace: keep tracking processes started by earlier calls
    _ <- state.update(s => s.setProcesses(s.processes ++ started))
  } yield ()

/** Stop all tasks by filtering on their registered name. */
def cancelProcesses(filter: TaskName => Boolean): F[Unit] =
for
current <- state.get
ps = current.getProcesses(filter)
_ <- ps.toList.traverse_ { case (name, p) =>
logger.info(s"Cancel background process $name") >> p.cancel >> p.join
.flatMap(out => logger.info(s"Task $name cancelled: $out"))
}
Ref.of[F, Long](0).flatMap(run)
_ <- state.update(_.removeProcesses(ps.keySet))
yield ()

/** Wraps `task` with logging, retry and state bookkeeping.
  *
  * When `task` fails, a failure counter is incremented. If `maxRetries` is defined and
  * the counter has reached it, the error is logged and re-raised. Otherwise the error
  * is logged and the task is run again after `retryDelay`.
  *
  * The task's name is removed from the tracked processes whenever the wrapped task
  * ends: on normal completion, on final failure, and on cancellation. The removal runs
  * as a finalizer, because sequencing it with `>>` would skip it on failure or
  * cancellation and leave a dead process listed as running.
  *
  * @param name the name the task is registered under
  * @param task the effect to run
  */
private def wrapTask(name: TaskName, task: F[Unit]): F[Unit] =
  def run(c: Ref[F, Long]): F[Unit] =
    logger.info(s"Starting process for: ${name}") >>
      task.handleErrorWith { err =>
        c.updateAndGet(_ + 1).flatMap {
          case n if maxRetries.exists(_ <= n) =>
            logger.error(
              s"Max retries ($maxRetries) for process ${name} exceeded"
            ) >> Async[F].raiseError(err)
          case n =>
            val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("")
            logger.error(
              s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying",
              err
            ) >> Async[F].delayBy(run(c), retryDelay)
        }
      }
  Async[F].guarantee(
    Ref.of[F, Long](0).flatMap(run),
    state.update(_.removeProcesses(Set(name)))
  )
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import fs2.Stream

import io.renku.redis.client.QueueName
import io.renku.search.config.QueuesConfig
import io.renku.search.provision.BackgroundProcessManage.TaskName
import io.renku.search.provision.MessageHandlers.MessageHandlerKey
import io.renku.search.provision.handler.*

/** The entry point for defining all message handlers.
Expand All @@ -39,109 +41,134 @@ final class MessageHandlers[F[_]: Async](
assert(maxConflictRetries >= 0, "maxConflictRetries must be >= 0")

private val logger = scribe.cats.effect[F]
private var tasks: Map[String, F[Unit]] = Map.empty
private def add[A](queue: QueueName, task: Stream[F, A]): Stream[F, Unit] =
tasks = tasks.updated(queue.name, task.compile.drain)
private var tasks: Map[TaskName, F[Unit]] = Map.empty
private def add[A](name: MessageHandlerKey, task: Stream[F, A]): Stream[F, Unit] =
tasks = tasks.updated(name, task.compile.drain)
task.void

private[provision] def withMaxConflictRetries(n: Int): MessageHandlers[F] =
new MessageHandlers[F](steps, cfg, n)

def getAll: Map[String, F[Unit]] = tasks
def getAll: Map[TaskName, F[Unit]] = tasks

val allEvents = add(
cfg.dataServiceAllEvents,
MessageHandlerKey.DataServiceAllEvents,
SyncMessageHandler(steps(cfg.dataServiceAllEvents), maxConflictRetries).create
)

val projectCreated: Stream[F, Unit] =
add(
cfg.projectCreated,
MessageHandlerKey.ProjectCreated,
SyncMessageHandler(steps(cfg.projectCreated), maxConflictRetries).create
)

val projectUpdated: Stream[F, Unit] =
add(
cfg.projectUpdated,
MessageHandlerKey.ProjectUpdated,
SyncMessageHandler(steps(cfg.projectUpdated), maxConflictRetries).create
)

val projectRemoved: Stream[F, Unit] =
add(
cfg.projectRemoved,
MessageHandlerKey.ProjectRemoved,
SyncMessageHandler(steps(cfg.projectRemoved), maxConflictRetries).create
)

val projectAuthAdded: Stream[F, Unit] =
add(
cfg.projectAuthorizationAdded,
MessageHandlerKey.ProjectAuthorizationAdded,
SyncMessageHandler(steps(cfg.projectAuthorizationAdded), maxConflictRetries).create
)

val projectAuthUpdated: Stream[F, Unit] =
add(
cfg.projectAuthorizationUpdated,
MessageHandlerKey.ProjectAuthorizationUpdated,
SyncMessageHandler(
steps(cfg.projectAuthorizationUpdated),
maxConflictRetries
).create
)

val projectAuthRemoved: Stream[F, Unit] = add(
cfg.projectAuthorizationRemoved,
MessageHandlerKey.ProjectAuthorizationRemoved,
SyncMessageHandler(steps(cfg.projectAuthorizationRemoved), maxConflictRetries).create
)

val userAdded: Stream[F, Unit] =
add(
cfg.userAdded,
MessageHandlerKey.UserAdded,
SyncMessageHandler(steps(cfg.userAdded), maxConflictRetries).create
)

val userUpdated: Stream[F, Unit] =
add(
cfg.userUpdated,
MessageHandlerKey.UserUpdated,
SyncMessageHandler(steps(cfg.userUpdated), maxConflictRetries).create
)

val userRemoved: Stream[F, Unit] =
add(
cfg.userRemoved,
MessageHandlerKey.UserRemoved,
SyncMessageHandler(steps(cfg.userRemoved), maxConflictRetries).create
)

val groupAdded: Stream[F, Unit] =
add(
cfg.groupAdded,
MessageHandlerKey.GroupAdded,
SyncMessageHandler(steps(cfg.groupAdded), maxConflictRetries).create
)

val groupUpdated: Stream[F, Unit] =
add(
cfg.groupUpdated,
MessageHandlerKey.GroupUpdated,
SyncMessageHandler(steps(cfg.groupUpdated), maxConflictRetries).create
)

val groupRemove: Stream[F, Unit] =
add(
cfg.groupRemoved,
MessageHandlerKey.GroupRemoved,
SyncMessageHandler(steps(cfg.groupRemoved), maxConflictRetries).create
)

val groupMemberAdded: Stream[F, Unit] =
add(
cfg.groupMemberAdded,
MessageHandlerKey.GroupMemberAdded,
SyncMessageHandler(steps(cfg.groupMemberAdded), maxConflictRetries).create
)

val groupMemberUpdated: Stream[F, Unit] =
add(
cfg.groupMemberUpdated,
MessageHandlerKey.GroupMemberUpdated,
SyncMessageHandler(steps(cfg.groupMemberUpdated), maxConflictRetries).create
)

val groupMemberRemoved: Stream[F, Unit] =
add(
cfg.groupMemberRemoved,
MessageHandlerKey.GroupMemberRemoved,
SyncMessageHandler(steps(cfg.groupMemberRemoved), maxConflictRetries).create
)

object MessageHandlers:

  /** Task names for the message handlers, one per handled queue. Each case is a
    * [[TaskName]] and can be used to register or filter background tasks.
    */
  enum MessageHandlerKey extends TaskName:
    case DataServiceAllEvents,
      GroupMemberRemoved,
      GroupMemberUpdated,
      GroupMemberAdded,
      GroupRemoved,
      GroupUpdated,
      GroupAdded,
      UserRemoved,
      UserAdded,
      UserUpdated,
      ProjectAuthorizationRemoved,
      ProjectAuthorizationUpdated,
      ProjectAuthorizationAdded,
      ProjectRemoved,
      ProjectUpdated,
      ProjectCreated

  object MessageHandlerKey:
    /** Tells whether `tn` is one of the [[MessageHandlerKey]] cases. */
    def isInstance(tn: TaskName): Boolean =
      tn match
        case _: MessageHandlerKey => true
        case _                    => false
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import cats.syntax.all.*
import io.renku.logging.LoggingSetup
import io.renku.search.http.HttpServer
import io.renku.search.metrics.CollectorRegistryBuilder
import io.renku.search.provision.BackgroundProcessManage.TaskName
import io.renku.search.provision.metrics.*
import io.renku.search.solr.schema.Migrations
import io.renku.solr.client.migration.SchemaMigrator
Expand All @@ -46,22 +47,23 @@ object Microservice extends IOApp:
metrics = metricsUpdaterTask(services)
httpServer = httpServerTask(registryBuilder, services.config)
tasks = services.messageHandlers.getAll + metrics + httpServer
pm <- BackgroundProcessManage[IO](services.config.retryOnErrorDelay)
pm = services.backgroundManage
_ <- tasks.toList.traverse_(pm.register.tupled)
_ <- pm.startAll
_ <- IO.never
} yield ExitCode.Success
}

private def httpServerTask(
registryBuilder: CollectorRegistryBuilder[IO],
config: SearchProvisionConfig
) =
): (TaskName, IO[Unit]) =
val io = Routes[IO](registryBuilder)
.flatMap(HttpServer.build(_, config.httpServerConfig))
.use(_ => IO.never)
"http server" -> io
TaskName.fromString("http server") -> io

private def metricsUpdaterTask(services: Services[IO]) =
private def metricsUpdaterTask(services: Services[IO]): (TaskName, IO[Unit]) =
val updateInterval = services.config.metricsUpdateInterval
val io =
if (updateInterval <= Duration.Zero)
Expand All @@ -76,7 +78,7 @@ object Microservice extends IOApp:
services.queueClient,
services.solrClient
).run()
"metrics updater" -> io
TaskName.fromString("metrics updater") -> io

private def runSolrMigrations(cfg: SearchProvisionConfig): IO[Unit] =
SchemaMigrator[IO](cfg.solrConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ final case class Services[F[_]](
config: SearchProvisionConfig,
solrClient: SearchSolrClient[F],
queueClient: Stream[F, QueueClient[F]],
messageHandlers: MessageHandlers[F]
messageHandlers: MessageHandlers[F],
backgroundManage: BackgroundProcessManage[F]
)

object Services:
Expand All @@ -49,4 +50,6 @@ object Services:
inChunkSize = 1
)
handlers = MessageHandlers[F](steps, cfg.queuesConfig)
} yield Services(cfg, solr, redis, handlers)

bm <- BackgroundProcessManage[F](cfg.retryOnErrorDelay)
} yield Services(cfg, solr, redis, handlers, bm)
Loading

0 comments on commit eb1181b

Please sign in to comment.