From d62dc3e57465e02434c00017c9ed57e0b064abf0 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Wed, 11 Sep 2024 11:41:45 +0200 Subject: [PATCH 1/5] Introduce handling reprovisioning events - Integrates the new events into the system - A reprovisioning-msg is processed by triggering a re-index which would restart all event handlers. To not run into a dead lock, the restart must happen on a separate thread - handling events on the `SyncMessageHandler` is also paused. When this is the only running handler, using a `resetIndex` instead of a `startReIndex` can be used which wouldn't restart all handlers. Currently, there are still more handlers running concurrently (un-paused) - When starting up, reset the solr lock documents. This is not safe when multiple provisioning services are deployed, but we don't have that currently. In case of a crash/restart when holding onto such a lock, the service would startup into an unusable state otherwise --- .../adr/0001-reprovision-from-data-service.md | 80 +++++ .../io/renku/search/events/MessageId.scala | 8 +- .../io/renku/search/events/MsgType.scala | 2 + .../events/ReprovisioningFinished.scala | 63 ++++ .../search/events/ReprovisioningStarted.scala | 63 ++++ .../search/events/SyncEventMessage.scala | 23 +- .../io/renku/events/EventsGenerators.scala | 10 + .../redis/client/util/RedisBaseSuite.scala | 3 + .../io/renku/search/cli/ReprovisionCmd.scala | 46 +++ .../scala/io/renku/search/cli/SearchCli.scala | 3 + .../io/renku/search/cli/SubCommands.scala | 5 + .../search/cli/reprovision/FinishCmd.scala | 45 +++ .../search/cli/reprovision/StartCmd.scala | 45 +++ .../search/provision/MessageHandlers.scala | 43 +-- .../renku/search/provision/Microservice.scala | 4 +- .../io/renku/search/provision/Routes.scala | 42 ++- .../io/renku/search/provision/Services.scala | 19 +- .../search/provision/SyncMessageHandler.scala | 76 ++++- .../provision/handler/PipelineSteps.scala | 4 +- .../provision/process/Reprovisioning.scala | 64 ++++ 
.../provision/reindex/ReIndexService.scala | 14 +- .../reindex/ReprovisionService.scala | 90 ++++++ .../reindex/ReprovisionServiceImpl.scala | 175 +++++++++++ .../search/provision/ProvisioningSuite.scala | 30 +- .../renku/search/provision/TestServices.scala | 5 +- .../reindex/ReIndexServiceSpec.scala | 22 +- .../reindex/ReprovisionServiceSpec.scala | 278 ++++++++++++++++++ .../io/renku/solr/client/SolrClient.scala | 9 + .../io/renku/solr/client/SolrClientImpl.scala | 23 ++ project/AvroSchemaDownload.scala | 3 +- 30 files changed, 1219 insertions(+), 78 deletions(-) create mode 100644 docs/adr/0001-reprovision-from-data-service.md create mode 100644 modules/events/src/main/scala/io/renku/search/events/ReprovisioningFinished.scala create mode 100644 modules/events/src/main/scala/io/renku/search/events/ReprovisioningStarted.scala create mode 100644 modules/search-cli/src/main/scala/io/renku/search/cli/ReprovisionCmd.scala create mode 100644 modules/search-cli/src/main/scala/io/renku/search/cli/reprovision/FinishCmd.scala create mode 100644 modules/search-cli/src/main/scala/io/renku/search/cli/reprovision/StartCmd.scala create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/process/Reprovisioning.scala create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionService.scala create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionServiceImpl.scala create mode 100644 modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReprovisionServiceSpec.scala diff --git a/docs/adr/0001-reprovision-from-data-service.md b/docs/adr/0001-reprovision-from-data-service.md new file mode 100644 index 00000000..daf3e924 --- /dev/null +++ b/docs/adr/0001-reprovision-from-data-service.md @@ -0,0 +1,80 @@ +# Reprovision From Data-Service + +## Context and Problem Statement + +The SOLR index can get out of sync due to many possible problems: +wrong 
events get sent, redis streams corrupted, bugs in search, etc. +Also for some SOLR schema changes, a re-index of every document is +necessary. This results in two things that we need: + +1. Run a re-index from search service directly that would ingest every + document again. This can be run on demand or as a result of + specific migrations that require re-indexing. +2. In case the redis stream is corrupted, all data must be sent again + from data services. This indicates a new "start" in the stream and + consumers should discard all previous events. + +## Considered Options + +### switching multiple streams + +In this scenario, data services would create a new stream, recreate +all events from the database and push them into the stream. This +happens concurrently with any "live" events that are happening at the +time. Once done, a different "control message" is sent to tell all +consumers to switch to a new stream. + +Pros: +- it makes it very easy to delete the entire old stream and clean up + space on redis +- if something fails when re-creating the events, data services could + just start over as no one has consumed anything yet +- a re-index from the search side would be simple as it only requires + to re-read the stream from its beginning + +Cons: +- it is a more complex protocol to tell consumers to switch, they could + miss the message (due to bugs) etc. +- it requires changing the configuration of services going from + statically known stream names to dynamic ones +- only when the "control" message is sent, consumers know about the + new stream and can start only then with consuming, which would lead + to a longer time of degraded search + +### using the existing stream + +Since all events are pushed to a single stream, send a +`ReprovisionStarted` message to indicate that data service is going to +re-send data. The search service can process this message by clearing +the index and keeping the message id. 
+ +Pros: +- Doesn't require consumers to change their configuration to redis. +- Simple protocol by introducing another message +- Consumers can start immediately with processing new messages + +Cons: +- Deleting obsolete messages from the redis stream is possible, but + the data structure is not optimized for that +- A re-index from the search side requires storing the redis message + id of the `ReprovisionStarted` message and thus more bookkeeping is + required +- Consumers could still read "bad data" if they read from before the + `ReprovisionStarted` message, as these messages are still visible + (until finally deleted) + +### Additional + +We also discussed marking the events coming from reprovisioning so +they are discoverable by consumers. One idea would be to use entirely +different event types, or to add some other property to the events. + +## Decision Outcome + +For now, we opted for the _using the existing stream_ option. It is +the easiest and quickest to implement and most of the code would be +also needed for the other option. The other option is still possible +to revisit in the future. + +The idea of marking reprovisioning events has been dropped, as it was +deemed to be neither necessary nor useful. 
diff --git a/modules/events/src/main/scala/io/renku/search/events/MessageId.scala b/modules/events/src/main/scala/io/renku/search/events/MessageId.scala index 8e985959..32c6ed08 100644 --- a/modules/events/src/main/scala/io/renku/search/events/MessageId.scala +++ b/modules/events/src/main/scala/io/renku/search/events/MessageId.scala @@ -26,7 +26,13 @@ object MessageId: def apply(id: String): MessageId = id - extension (self: MessageId) def value: String = self + extension (self: MessageId) + def value: String = self + + private def order = new Ordered[MessageId] { + override def compare(that: MessageId): Int = self.compareTo(that) + } + export order.* given Decoder[MessageId] = Decoder.forString given Encoder[MessageId] = Encoder.forString diff --git a/modules/events/src/main/scala/io/renku/search/events/MsgType.scala b/modules/events/src/main/scala/io/renku/search/events/MsgType.scala index 84d69e50..35761c9f 100644 --- a/modules/events/src/main/scala/io/renku/search/events/MsgType.scala +++ b/modules/events/src/main/scala/io/renku/search/events/MsgType.scala @@ -35,6 +35,8 @@ enum MsgType(val name: String): case GroupMemberAdded extends MsgType("memberGroup.added") case GroupMemberUpdated extends MsgType("memberGroup.updated") case GroupMemberRemoved extends MsgType("memberGroup.removed") + case ReprovisioningStarted extends MsgType("reprovisioning.started") + case ReprovisioningFinished extends MsgType("reprovisioning.finished") object MsgType: def fromString(s: String): Either[String, MsgType] = diff --git a/modules/events/src/main/scala/io/renku/search/events/ReprovisioningFinished.scala b/modules/events/src/main/scala/io/renku/search/events/ReprovisioningFinished.scala new file mode 100644 index 00000000..8d2856a6 --- /dev/null +++ b/modules/events/src/main/scala/io/renku/search/events/ReprovisioningFinished.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) 
and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.events + +import cats.Show +import cats.data.NonEmptyList + +import io.renku.avro.codec.AvroEncoder +import io.renku.avro.codec.all.given +import io.renku.events.v2 +import io.renku.search.model.* +import org.apache.avro.Schema + +sealed trait ReprovisioningFinished extends RenkuEventPayload: + val msgType = MsgType.ReprovisioningFinished + def withId(id: Id): ReprovisioningFinished + val schema: Schema = v2.ReprovisioningFinished.SCHEMA$ + val version: NonEmptyList[SchemaVersion] = NonEmptyList.of(SchemaVersion.V2) + def fold[A](f: v2.ReprovisioningFinished => A): A + +object ReprovisioningFinished: + def apply(id: Id): ReprovisioningFinished = V2(v2.ReprovisioningFinished(id.value)) + + final case class V2(event: v2.ReprovisioningFinished) extends ReprovisioningFinished: + val id: Id = Id(event.id) + def withId(id: Id): V2 = V2(event.copy(id = id.value)) + def fold[A](f: v2.ReprovisioningFinished => A): A = f(event) + + given AvroEncoder[ReprovisioningFinished] = + val v2e = AvroEncoder[v2.ReprovisioningFinished] + AvroEncoder.basic { v => + v.fold(v2e.encode(v.schema)) + } + + given EventMessageDecoder[ReprovisioningFinished] = + EventMessageDecoder.instance { qm => + qm.header.schemaVersion match + case SchemaVersion.V1 => + Left(DecodeFailure.VersionNotSupported(qm.id, qm.header)) + + case SchemaVersion.V2 => + val 
schema = v2.ReprovisioningFinished.SCHEMA$ + qm.toMessage[v2.ReprovisioningFinished](schema) + .map(_.map(ReprovisioningFinished.V2.apply)) + } + + given Show[ReprovisioningFinished] = Show.show(_.fold(_.toString)) diff --git a/modules/events/src/main/scala/io/renku/search/events/ReprovisioningStarted.scala b/modules/events/src/main/scala/io/renku/search/events/ReprovisioningStarted.scala new file mode 100644 index 00000000..87fb3036 --- /dev/null +++ b/modules/events/src/main/scala/io/renku/search/events/ReprovisioningStarted.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.search.events + +import cats.Show +import cats.data.NonEmptyList + +import io.renku.avro.codec.AvroEncoder +import io.renku.avro.codec.all.given +import io.renku.events.v2 +import io.renku.search.model.* +import org.apache.avro.Schema + +sealed trait ReprovisioningStarted extends RenkuEventPayload: + val msgType = MsgType.ReprovisioningStarted + def withId(id: Id): ReprovisioningStarted + val schema: Schema = v2.ReprovisioningStarted.SCHEMA$ + val version: NonEmptyList[SchemaVersion] = NonEmptyList.of(SchemaVersion.V2) + def fold[A](f: v2.ReprovisioningStarted => A): A + +object ReprovisioningStarted: + def apply(id: Id): ReprovisioningStarted = V2(v2.ReprovisioningStarted(id.value)) + + final case class V2(event: v2.ReprovisioningStarted) extends ReprovisioningStarted: + val id: Id = Id(event.id) + def withId(id: Id): V2 = V2(event.copy(id = id.value)) + def fold[A](f: v2.ReprovisioningStarted => A): A = f(event) + + given AvroEncoder[ReprovisioningStarted] = + val v2e = AvroEncoder[v2.ReprovisioningStarted] + AvroEncoder.basic { v => + v.fold(v2e.encode(v.schema)) + } + + given EventMessageDecoder[ReprovisioningStarted] = + EventMessageDecoder.instance { qm => + qm.header.schemaVersion match + case SchemaVersion.V1 => + Left(DecodeFailure.VersionNotSupported(qm.id, qm.header)) + + case SchemaVersion.V2 => + val schema = v2.ReprovisioningStarted.SCHEMA$ + qm.toMessage[v2.ReprovisioningStarted](schema) + .map(_.map(ReprovisioningStarted.V2.apply)) + } + + given Show[ReprovisioningStarted] = Show.show(_.fold(_.toString)) diff --git a/modules/events/src/main/scala/io/renku/search/events/SyncEventMessage.scala b/modules/events/src/main/scala/io/renku/search/events/SyncEventMessage.scala index 84cceb32..527a463c 100644 --- a/modules/events/src/main/scala/io/renku/search/events/SyncEventMessage.scala +++ b/modules/events/src/main/scala/io/renku/search/events/SyncEventMessage.scala @@ -26,7 +26,8 @@ type SyncEventMessage = 
EventMessage[ProjectCreated] | EventMessage[ProjectUpdat EventMessage[UserAdded] | EventMessage[UserUpdated] | EventMessage[UserRemoved] | EventMessage[GroupAdded] | EventMessage[GroupUpdated] | EventMessage[GroupRemoved] | EventMessage[GroupMemberAdded] | EventMessage[GroupMemberUpdated] | - EventMessage[GroupMemberRemoved] + EventMessage[GroupMemberRemoved] | EventMessage[ReprovisioningStarted] | + EventMessage[ReprovisioningFinished] object SyncEventMessage: @@ -63,6 +64,10 @@ object SyncEventMessage: mt.decoder.decode(qm) case mt: MsgType.GroupMemberRemoved.type => mt.decoder.decode(qm) + case mt: MsgType.ReprovisioningStarted.type => + mt.decoder.decode(qm) + case mt: MsgType.ReprovisioningFinished.type => + mt.decoder.decode(qm) // maps each MsgType to its correspondin payload type by providing a // cast method paired with the decoder that results in that payload @@ -188,4 +193,20 @@ object SyncEventMessage: @targetName("decoderGroupMemberRemoved") def decoder: EventMessageDecoder[GroupMemberRemoved] = EventMessageDecoder[GroupMemberRemoved] + + extension (self: MsgType.ReprovisioningStarted.type) + @targetName("castReprovisioningStarted") + def cast(x: SyncEventMessage): EventMessage[ReprovisioningStarted] = + x.asInstanceOf[EventMessage[ReprovisioningStarted]] + @targetName("decoderReprovisioningStarted") + def decoder: EventMessageDecoder[ReprovisioningStarted] = + EventMessageDecoder[ReprovisioningStarted] + + extension (self: MsgType.ReprovisioningFinished.type) + @targetName("castReprovisioningFinished") + def cast(x: SyncEventMessage): EventMessage[ReprovisioningFinished] = + x.asInstanceOf[EventMessage[ReprovisioningFinished]] + @targetName("decoderReprovisioningFinished") + def decoder: EventMessageDecoder[ReprovisioningFinished] = + EventMessageDecoder[ReprovisioningFinished] } diff --git a/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala b/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala index 
7bc1c486..6d33609b 100644 --- a/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala +++ b/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala @@ -383,6 +383,16 @@ object EventsGenerators: def groupUpdatedGen(prefix: String): Gen[GroupUpdated] = v2GroupUpdatedGen(prefix).map(GroupUpdated.V2.apply) + def reprovisionStarted( + id: Gen[Id] = stringGen(5).map(Id.apply) + ): Gen[ReprovisioningStarted] = + id.map(ReprovisioningStarted.apply) + + def reprovisionFinished( + id: Gen[Id] = stringGen(5).map(Id.apply) + ): Gen[ReprovisioningFinished] = + id.map(ReprovisioningFinished.apply) + def stringGen(max: Int): Gen[String] = Gen .chooseNum(3, max) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisBaseSuite.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisBaseSuite.scala index 3c6fcada..87413510 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisBaseSuite.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisBaseSuite.scala @@ -49,3 +49,6 @@ trait RedisBaseSuite yield RedisClients(config, lc, cmds, qc) val redisClients = ResourceSuiteLocalFixture("all-redis-clients", redisClientsR) + + val redisClearAll: IO[Unit] = + IO(redisClients()).flatMap(_.commands.flushAll) diff --git a/modules/search-cli/src/main/scala/io/renku/search/cli/ReprovisionCmd.scala b/modules/search-cli/src/main/scala/io/renku/search/cli/ReprovisionCmd.scala new file mode 100644 index 00000000..7541f23c --- /dev/null +++ b/modules/search-cli/src/main/scala/io/renku/search/cli/ReprovisionCmd.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.cli + +import cats.effect.* + +import com.monovore.decline.Opts +import io.renku.search.cli.reprovision.* + +object ReprovisionCmd: + + enum SubCmdOpts: + case Start(opts: StartCmd.Options) + case Finish(opts: FinishCmd.Options) + + private val startOpts: Opts[StartCmd.Options] = + Opts.subcommand("start", "Send reprovisioning-started message")(StartCmd.opts) + + private val finishOpts: Opts[FinishCmd.Options] = + Opts.subcommand("finish", "Send reprovisioning-finished message")(FinishCmd.opts) + + val opts: Opts[SubCmdOpts] = + startOpts + .map(SubCmdOpts.Start.apply) + .orElse(finishOpts.map(SubCmdOpts.Finish.apply)) + + def apply(opts: SubCmdOpts): IO[ExitCode] = + opts match + case SubCmdOpts.Start(c) => StartCmd(c) + case SubCmdOpts.Finish(c) => FinishCmd(c) diff --git a/modules/search-cli/src/main/scala/io/renku/search/cli/SearchCli.scala b/modules/search-cli/src/main/scala/io/renku/search/cli/SearchCli.scala index 960effb8..9bd64af7 100644 --- a/modules/search-cli/src/main/scala/io/renku/search/cli/SearchCli.scala +++ b/modules/search-cli/src/main/scala/io/renku/search/cli/SearchCli.scala @@ -44,4 +44,7 @@ object SearchCli case SubCommands.User(opts) => UserCmd(opts) + + case SubCommands.Reprovision(opts) => + ReprovisionCmd(opts) } diff --git a/modules/search-cli/src/main/scala/io/renku/search/cli/SubCommands.scala b/modules/search-cli/src/main/scala/io/renku/search/cli/SubCommands.scala index 5c9a632d..a8c3b484 100644 --- a/modules/search-cli/src/main/scala/io/renku/search/cli/SubCommands.scala +++ 
b/modules/search-cli/src/main/scala/io/renku/search/cli/SubCommands.scala @@ -28,6 +28,7 @@ enum SubCommands: case Group(opts: GroupCmd.SubCmdOpts) case Project(opts: ProjectCmd.SubCmdOpts) case User(opts: UserCmd.SubCmdOpts) + case Reprovision(opts: ReprovisionCmd.SubCmdOpts) private object SubCommands: @@ -43,9 +44,13 @@ private object SubCommands: private val userOpts: Opts[UserCmd.SubCmdOpts] = Opts.subcommand("user", "Manage user events")(UserCmd.opts) + private val reprovisionOpts: Opts[ReprovisionCmd.SubCmdOpts] = + Opts.subcommand("reprovision", "Send reprovisioning messages")(ReprovisionCmd.opts) + val opts: Opts[SubCommands] = perfTestOpts .map(SubCommands.PerfTests.apply) .orElse(groupOpts.map(SubCommands.Group.apply)) .orElse(projectOpts.map(SubCommands.Project.apply)) .orElse(userOpts.map(SubCommands.User.apply)) + .orElse(reprovisionOpts.map(SubCommands.Reprovision.apply)) diff --git a/modules/search-cli/src/main/scala/io/renku/search/cli/reprovision/FinishCmd.scala b/modules/search-cli/src/main/scala/io/renku/search/cli/reprovision/FinishCmd.scala new file mode 100644 index 00000000..766268e4 --- /dev/null +++ b/modules/search-cli/src/main/scala/io/renku/search/cli/reprovision/FinishCmd.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.search.cli.reprovision + +import cats.effect.* +import cats.syntax.all.* + +import com.monovore.decline.Opts +import io.renku.search.cli.{CommonOpts, Services} +import io.renku.search.config.QueuesConfig +import io.renku.search.events.ReprovisioningFinished +import io.renku.search.model.* + +object FinishCmd: + + final case class Options(id: Id): + def asPayload: ReprovisioningFinished = ReprovisioningFinished(id) + + val opts: Opts[Options] = + CommonOpts.idOpt.map(Options.apply) + + def apply(cfg: Options): IO[ExitCode] = + Services.queueClient.use { queue => + for + queuesCfg <- QueuesConfig.config.load[IO] + msg <- Services.createMessage(cfg.asPayload) + _ <- queue.enqueue(queuesCfg.dataServiceAllEvents, msg) + yield ExitCode.Success + } diff --git a/modules/search-cli/src/main/scala/io/renku/search/cli/reprovision/StartCmd.scala b/modules/search-cli/src/main/scala/io/renku/search/cli/reprovision/StartCmd.scala new file mode 100644 index 00000000..f9ac5e84 --- /dev/null +++ b/modules/search-cli/src/main/scala/io/renku/search/cli/reprovision/StartCmd.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.renku.search.cli.reprovision + +import cats.effect.* +import cats.syntax.all.* + +import com.monovore.decline.Opts +import io.renku.search.cli.{CommonOpts, Services} +import io.renku.search.config.QueuesConfig +import io.renku.search.events.ReprovisioningStarted +import io.renku.search.model.* + +object StartCmd: + + final case class Options(id: Id): + def asPayload: ReprovisioningStarted = ReprovisioningStarted(id) + + val opts: Opts[Options] = + CommonOpts.idOpt.map(Options.apply) + + def apply(cfg: Options): IO[ExitCode] = + Services.queueClient.use { queue => + for + queuesCfg <- QueuesConfig.config.load[IO] + msg <- Services.createMessage(cfg.asPayload) + _ <- queue.enqueue(queuesCfg.dataServiceAllEvents, msg) + yield ExitCode.Success + } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/MessageHandlers.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/MessageHandlers.scala index 99ce4a72..3877ebb3 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/MessageHandlers.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/MessageHandlers.scala @@ -27,6 +27,7 @@ import io.renku.search.config.QueuesConfig import io.renku.search.provision.BackgroundProcessManage.TaskName import io.renku.search.provision.MessageHandlers.MessageHandlerKey import io.renku.search.provision.handler.* +import io.renku.search.provision.reindex.ReprovisionService /** The entry point for defining all message handlers. 
* @@ -35,7 +36,9 @@ import io.renku.search.provision.handler.* */ final class MessageHandlers[F[_]: Async]( steps: QueueName => PipelineSteps[F], + reprovisionService: ReprovisionService[F], cfg: QueuesConfig, + ctrl: SyncMessageHandler.Control[F], maxConflictRetries: Int = 20 ): assert(maxConflictRetries >= 0, "maxConflictRetries must be >= 0") @@ -47,105 +50,105 @@ final class MessageHandlers[F[_]: Async]( task.void private[provision] def withMaxConflictRetries(n: Int): MessageHandlers[F] = - new MessageHandlers[F](steps, cfg, n) + new MessageHandlers[F](steps, reprovisionService, cfg, ctrl, n) def getAll: Map[TaskName, F[Unit]] = tasks + private[provision] def createHandler(qn: QueueName): SyncMessageHandler[F] = + new SyncMessageHandler(steps(qn), reprovisionService, ctrl, maxConflictRetries) + val allEvents = add( MessageHandlerKey.DataServiceAllEvents, - SyncMessageHandler(steps(cfg.dataServiceAllEvents), maxConflictRetries).create + createHandler(cfg.dataServiceAllEvents).create ) val projectCreated: Stream[F, Unit] = add( MessageHandlerKey.ProjectCreated, - SyncMessageHandler(steps(cfg.projectCreated), maxConflictRetries).create + createHandler(cfg.projectCreated).create ) val projectUpdated: Stream[F, Unit] = add( MessageHandlerKey.ProjectUpdated, - SyncMessageHandler(steps(cfg.projectUpdated), maxConflictRetries).create + createHandler(cfg.projectUpdated).create ) val projectRemoved: Stream[F, Unit] = add( MessageHandlerKey.ProjectRemoved, - SyncMessageHandler(steps(cfg.projectRemoved), maxConflictRetries).create + createHandler(cfg.projectRemoved).create ) val projectAuthAdded: Stream[F, Unit] = add( MessageHandlerKey.ProjectAuthorizationAdded, - SyncMessageHandler(steps(cfg.projectAuthorizationAdded), maxConflictRetries).create + createHandler(cfg.projectAuthorizationAdded).create ) val projectAuthUpdated: Stream[F, Unit] = add( MessageHandlerKey.ProjectAuthorizationUpdated, - SyncMessageHandler( - steps(cfg.projectAuthorizationUpdated), - 
maxConflictRetries - ).create + createHandler(cfg.projectAuthorizationUpdated).create ) val projectAuthRemoved: Stream[F, Unit] = add( MessageHandlerKey.ProjectAuthorizationRemoved, - SyncMessageHandler(steps(cfg.projectAuthorizationRemoved), maxConflictRetries).create + createHandler(cfg.projectAuthorizationRemoved).create ) val userAdded: Stream[F, Unit] = add( MessageHandlerKey.UserAdded, - SyncMessageHandler(steps(cfg.userAdded), maxConflictRetries).create + createHandler(cfg.userAdded).create ) val userUpdated: Stream[F, Unit] = add( MessageHandlerKey.UserUpdated, - SyncMessageHandler(steps(cfg.userUpdated), maxConflictRetries).create + createHandler(cfg.userUpdated).create ) val userRemoved: Stream[F, Unit] = add( MessageHandlerKey.UserRemoved, - SyncMessageHandler(steps(cfg.userRemoved), maxConflictRetries).create + createHandler(cfg.userRemoved).create ) val groupAdded: Stream[F, Unit] = add( MessageHandlerKey.GroupAdded, - SyncMessageHandler(steps(cfg.groupAdded), maxConflictRetries).create + createHandler(cfg.groupAdded).create ) val groupUpdated: Stream[F, Unit] = add( MessageHandlerKey.GroupUpdated, - SyncMessageHandler(steps(cfg.groupUpdated), maxConflictRetries).create + createHandler(cfg.groupUpdated).create ) val groupRemove: Stream[F, Unit] = add( MessageHandlerKey.GroupRemoved, - SyncMessageHandler(steps(cfg.groupRemoved), maxConflictRetries).create + createHandler(cfg.groupRemoved).create ) val groupMemberAdded: Stream[F, Unit] = add( MessageHandlerKey.GroupMemberAdded, - SyncMessageHandler(steps(cfg.groupMemberAdded), maxConflictRetries).create + createHandler(cfg.groupMemberAdded).create ) val groupMemberUpdated: Stream[F, Unit] = add( MessageHandlerKey.GroupMemberUpdated, - SyncMessageHandler(steps(cfg.groupMemberUpdated), maxConflictRetries).create + createHandler(cfg.groupMemberUpdated).create ) val groupMemberRemoved: Stream[F, Unit] = add( MessageHandlerKey.GroupMemberRemoved, - SyncMessageHandler(steps(cfg.groupMemberRemoved), 
maxConflictRetries).create + createHandler(cfg.groupMemberRemoved).create ) object MessageHandlers: diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index 7417d594..f18f183a 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -40,6 +40,8 @@ object Microservice extends IOApp: for { _ <- IO(LoggingSetup.doConfigure(services.config.verbosity)) migrateResult <- runSolrMigrations(services.config) + // this is only safe for a single provisioning service + _ <- services.resetLockDocuments registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics .add(RedisMetrics.queueSizeGauge) .add(RedisMetrics.unprocessedGauge) @@ -53,7 +55,7 @@ object Microservice extends IOApp: if (migrateResult.reindexRequired) logger.info( "Re-Index is required after migrations have applied!" 
- ) >> services.reIndex.resetData(None) + ) >> services.reprovision.recreateIndex else IO.unit _ <- pm.startAll _ <- IO.never diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala index 13f4b04e..627819f9 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala @@ -22,26 +22,23 @@ import cats.effect.{Async, Resource} import cats.syntax.all.* import fs2.io.net.Network +import io.bullet.borer.Decoder +import io.bullet.borer.derivation.MapBasedCodecs +import io.renku.search.events.MessageId +import io.renku.search.http.borer.BorerEntityJsonCodec import io.renku.search.http.metrics.MetricsRoutes import io.renku.search.http.routes.OperationRoutes import io.renku.search.metrics.CollectorRegistryBuilder +import io.renku.search.provision.reindex.ReprovisionService.ReprovisionRequest import org.http4s.HttpRoutes import org.http4s.dsl.Http4sDsl import org.http4s.server.Router -private object Routes: - - def apply[F[_]: Async: Network]( - registryBuilder: CollectorRegistryBuilder[F], - services: Services[F] - ): Resource[F, HttpRoutes[F]] = - MetricsRoutes[F](registryBuilder).makeRoutes - .map(new Routes[F](_, services).routes) - final private class Routes[F[_]: Async]( metricsRoutes: HttpRoutes[F], services: Services[F] -) extends Http4sDsl[F]: +) extends Http4sDsl[F] + with BorerEntityJsonCodec: private lazy val operationRoutes = Router[F]( @@ -52,9 +49,26 @@ final private class Routes[F[_]: Async]( lazy val routes: HttpRoutes[F] = operationRoutes <+> metricsRoutes - def reIndexRoutes: HttpRoutes[F] = HttpRoutes.of { case POST -> Root => - services.reIndex.startReIndex(None).flatMap { - case true => NoContent() - case false => UnprocessableEntity() + def reIndexRoutes: HttpRoutes[F] = HttpRoutes.of { case req @ POST -> Root => + 
req.as[Routes.ReIndexMessage].flatMap { msg => + services.reprovision.reprovision(msg.toRequest).flatMap { + case true => NoContent() + case false => UnprocessableEntity() + } } } + +private object Routes: + def apply[F[_]: Async: Network]( + registryBuilder: CollectorRegistryBuilder[F], + services: Services[F] + ): Resource[F, HttpRoutes[F]] = + MetricsRoutes[F](registryBuilder).makeRoutes + .map(new Routes[F](_, services).routes) + + final case class ReIndexMessage(messageId: Option[MessageId] = None): + def toRequest: ReprovisionRequest = messageId + .map(ReprovisionRequest.forSpecificMessage) + .getOrElse(ReprovisionRequest.lastStart) + object ReIndexMessage: + given Decoder[ReIndexMessage] = MapBasedCodecs.deriveDecoder diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala index b397c14f..48fb479f 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala @@ -18,13 +18,16 @@ package io.renku.search.provision +import cats.NonEmptyParallel import cats.effect.kernel.{Async, Resource} +import cats.syntax.all.* import fs2.Stream import fs2.io.net.Network import io.renku.queue.client.QueueClient import io.renku.search.provision.handler.PipelineSteps import io.renku.search.provision.reindex.ReIndexService +import io.renku.search.provision.reindex.ReprovisionService import io.renku.search.solr.client.SearchSolrClient final case class Services[F[_]]( @@ -33,8 +36,12 @@ final case class Services[F[_]]( queueClient: Stream[F, QueueClient[F]], messageHandlers: MessageHandlers[F], backgroundManage: BackgroundProcessManage[F], - reIndex: ReIndexService[F] -) + reprovision: ReprovisionService[F], + reindex: ReIndexService[F] +): + + def resetLockDocuments(using NonEmptyParallel[F]): F[Unit] = + (reprovision.resetLockDocument, 
reindex.resetLockDocument).parMapN((_, _) => ()) object Services: @@ -51,9 +58,9 @@ object Services: redis, inChunkSize = 1 ) - handlers = MessageHandlers[F](steps, cfg.queuesConfig) - bm <- BackgroundProcessManage[F](cfg.retryOnErrorDelay) - ris = ReIndexService[F](bm, redis, solr, cfg.queuesConfig) - } yield Services(cfg, solr, redis, handlers, bm, ris) + rps = ReprovisionService(ris, solr.underlying) + ctrl <- Resource.eval(SyncMessageHandler.Control[F]) + handlers = MessageHandlers[F](steps, rps, cfg.queuesConfig, ctrl) + } yield Services(cfg, solr, redis, handlers, bm, rps, ris) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SyncMessageHandler.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SyncMessageHandler.scala index b3dfe1e3..1ae3117c 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SyncMessageHandler.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SyncMessageHandler.scala @@ -19,21 +19,26 @@ package io.renku.search.provision import cats.effect.* +import cats.effect.std.Semaphore import cats.syntax.all.* import fs2.{Pipe, Stream} import io.renku.search.events.* import io.renku.search.events.SyncEventMessage.syntax.* +import io.renku.search.model.Id import io.renku.search.provision.SyncMessageHandler.Result import io.renku.search.provision.handler.* import io.renku.search.provision.handler.DeleteFromSolr.DeleteResult import io.renku.search.provision.handler.PipelineSteps import io.renku.search.provision.process.* +import io.renku.search.provision.reindex.ReprovisionService import io.renku.solr.client.UpsertResponse import scribe.Scribe final class SyncMessageHandler[F[_]: Async]( ps: PipelineSteps[F], + reprovisionService: ReprovisionService[F], + control: SyncMessageHandler.Control[F], maxConflictRetries: Int = 20 ): @@ -46,11 +51,17 @@ final class SyncMessageHandler[F[_]: Async]( private val groupUpdate = GroupUpdate[F](ps) private val 
groupRemove = GroupRemove[F](ps) private val groupMemberUpsert = GroupMemberUpsert[F](ps) + private val reprovisioning = Reprovisioning[F](reprovisionService) def create: Stream[F, Result] = ps.reader.readSyncEvents.through(processEvents) def processEvents: Pipe[F, SyncEventMessage, Result] = - _.evalMap(processEvent) + _.evalMap { msg => + for + _ <- control.await + r <- processEvent(msg) + yield r + } def processEvent(m: SyncEventMessage): F[Result] = m.header.msgType match @@ -131,6 +142,31 @@ final class SyncMessageHandler[F[_]: Async]( .map(Result.Upsert.apply) ) + case mt: MsgType.ReprovisioningStarted.type => + // currently there are more than one stream handler pausing is + // only for this one, so it is necessary to do the "full" + // re-index, which restarts all handlers. For this it must be + // run on a separate thread. Ideally, when only this handler + // is active, the pause is sufficient and we can use + // `resetIndex` instead of `startReIndex` + control.whilePaused.use { _ => + for + res <- Deferred[F, Option[Id]] + _ <- reprovisioning.processStart( + mt.cast(m), + id => res.complete(id).void + ) + id <- res.get + yield Result.ReprovisionStart(id) + } + + case mt: MsgType.ReprovisioningFinished.type => + markMessage(m)( + reprovisioning + .processFinish(mt.cast(m)) + .map(Result.ReprovisionFinish.apply) + ) + private def markMessage[A](m: SyncEventMessage)(fa: F[A]): F[A] = Resource .onFinalizeCase[F] { @@ -141,14 +177,42 @@ final class SyncMessageHandler[F[_]: Async]( .use(_ => fa) object SyncMessageHandler: + def apply[F[_]: Async]( + ps: PipelineSteps[F], + reprovisionService: ReprovisionService[F], + maxConflictRetries: Int = 20 + ): F[SyncMessageHandler[F]] = + Control[F].map(c => + new SyncMessageHandler[F](ps, reprovisionService, c, maxConflictRetries) + ) + + final class Control[F[_]: Sync](s: Semaphore[F]) { + def await: F[Unit] = s.acquire >> s.release + def pause: F[Unit] = s.acquire + def resume: F[Unit] = s.release + def whilePaused: 
Resource[F, Unit] = + Resource.make(pause)(_ => resume) + } + object Control { + def apply[F[_]: Async] = Semaphore[F](1).map(new Control(_)) + } enum Result: case Upsert(value: UpsertResponse) case Delete(value: DeleteFromSolr.DeleteResult[?]) - - def fold[A](fu: UpsertResponse => A, fd: DeleteFromSolr.DeleteResult[?] => A): A = + case ReprovisionStart(id: Option[Id]) + case ReprovisionFinish(id: Option[Id]) + + def fold[A]( + fu: UpsertResponse => A, + fd: DeleteFromSolr.DeleteResult[?] => A, + frs: ReprovisionStart => A, + frf: ReprovisionFinish => A + ): A = this match - case Upsert(r) => fu(r) - case Delete(r) => fd(r) + case Upsert(r) => fu(r) + case Delete(r) => fd(r) + case r: ReprovisionStart => frs(r) + case r: ReprovisionFinish => frf(r) - def asUpsert: Option[UpsertResponse] = fold(Some(_), _ => None) + def asUpsert: Option[UpsertResponse] = fold(Some(_), _ => None, _ => None, _ => None) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PipelineSteps.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PipelineSteps.scala index 5c437442..914c3ead 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PipelineSteps.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/handler/PipelineSteps.scala @@ -31,16 +31,18 @@ trait PipelineSteps[F[_]]: def fetchFromSolr: FetchFromSolr[F] def deleteFromSolr: DeleteFromSolr[F] def userUtils: UserUtils[F] + def solrClient: SearchSolrClient[F] object PipelineSteps: def apply[F[_]: Async]( - solrClient: SearchSolrClient[F], + searchSolrClient: SearchSolrClient[F], queueClient: Stream[F, QueueClient[F]], inChunkSize: Int )( queue: QueueName ): PipelineSteps[F] = new PipelineSteps[F] { + val solrClient = searchSolrClient val reader: MessageReader[F] = MessageReader[F](queueClient, queue, inChunkSize) val pushToSolr: PushToSolr[F] = diff --git 
a/modules/search-provision/src/main/scala/io/renku/search/provision/process/Reprovisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/process/Reprovisioning.scala new file mode 100644 index 00000000..50c1ba8a --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/process/Reprovisioning.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.process + +import cats.effect.* +import cats.syntax.all.* + +import io.renku.search.events.* +import io.renku.search.model.Id +import io.renku.search.provision.reindex.ReprovisionService +import io.renku.search.provision.reindex.ReprovisionService.ReprovisionRequest + +final private[provision] class Reprovisioning[F[_]: Async]( + reprovisionService: ReprovisionService[F] +): + private val logger = scribe.cats.effect[F] + + // must run reprovisioning in background thread, because this + // cancels processes where one of them triggered this action. 
it + // would result in a deadlock to have it run inside itself + + def processStart( + msg: EventMessage[ReprovisioningStarted], + result: Option[Id] => F[Unit] + ): F[Fiber[F, Throwable, Option[Id]]] = + Async[F].start { + ReprovisionRequest.started(msg) match + case None => + logger + .info(s"Received reprovision-started message without payload: $msg") + .flatTap(_ => result(None)) + .as(None) + case Some(req) => + reprovisionService + .reprovision(req) + .map { + case true => Some(req.reprovisionId) + case false => None + } + .flatTap(result) + } + + def processFinish(msg: EventMessage[ReprovisioningFinished]): F[Option[Id]] = + ReprovisionRequest.finished(msg) match + case None => + logger.info(s"Received reprovision-finish message without payload: $msg").as(None) + case Some(req) => + reprovisionService.reprovision(req).as(req.reprovisionId.some) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala index ef5ca825..0dc94dbf 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala @@ -48,10 +48,12 @@ trait ReIndexService[F[_]]: */ def resetData(startMessage: Option[MessageId]): F[Unit] + def resetLockDocument: F[Unit] + object ReIndexService: private[reindex] val lockId: String = "reindex_31baded5-9fc2-4935-9b07-80f7a3ecb13f" - def apply[F[_]: Clock: Sync]( + def apply[F[_]: Clock: Async]( bpm: BackgroundProcessManage[F], redisClient: Stream[F, QueueClient[F]], solrClient: SearchSolrClient[F], @@ -61,6 +63,10 @@ object ReIndexService: private val queueName = NonEmptyList.of(queueCfg.dataServiceAllEvents) private val logger = scribe.cats.effect[F] + def resetLockDocument: F[Unit] = + logger.info(s"Reset reindex lock document $lockId") >> + 
solrClient.underlying.deleteIds(NonEmptyList.of(lockId)) + def startReIndex(startMessage: Option[MessageId]): F[Boolean] = given LockDocument[F, ReIndexDocument] = ReIndexDocument.lockDocument(startMessage) @@ -68,14 +74,14 @@ object ReIndexService: lock.use { case None => logger.debug(s"Re-Index called while already in progress").as(false) - case Some(d) => dropIndexAndRestart(d, startMessage) + case Some(d) => dropIndexAndRestart(d, startMessage).as(true) } def resetData(startMessage: Option[MessageId]): F[Unit] = for _ <- startMessage match case Some(msgId) => - logger.info("Set last seen message id to $msgId for $queueName") >> + logger.info(s"Set last seen message id to $msgId for $queueName") >> redisClient .evalMap(_.markProcessed(queueName, msgId)) .take(1) @@ -115,5 +121,5 @@ object ReIndexService: _ <- resetData(startMessage) _ <- logger.info("Start background processes") _ <- bpm.background(MessageHandlerKey.isInstance) - yield true + yield () } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionService.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionService.scala new file mode 100644 index 00000000..49d850e1 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionService.scala @@ -0,0 +1,90 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.reindex + +import cats.effect.kernel.Sync +import cats.syntax.all.* + +import io.renku.search.events.* +import io.renku.search.model.Id +import io.renku.search.provision.reindex.ReprovisionService.{ReprovisionRequest, Result} +import io.renku.solr.client.SolrClient + +trait ReprovisionService[F[_]]: + def reprovision(req: ReprovisionRequest): F[Result] + + def recreateIndex: F[Result] = reprovision(ReprovisionRequest.lastStart) + def resetLockDocument: F[Unit] + +/* + +re-index is the unconditional reset, reprovision is more controlled + +reprovisioning can happen in these cases: + - manual request: get the latest "beginning" message id and do reindex(messageId) + - manual request with a messageId: just call reindex(messageId) - or leave this to reindex? 
+ - reprovision-start message: validate + store and do reset_index(newMessageId) + - reprovision-finish message: validate + set flag to false + + */ + +object ReprovisionService: + def apply[F[_]: Sync]( + reIndex: ReIndexService[F], + solrClient: SolrClient[F] + ): ReprovisionService[F] = + ReprovisionServiceImpl(reIndex, solrClient) + + type Result = Boolean + + sealed trait ReprovisionRequest + sealed trait ReprovisionMessageData extends ReprovisionRequest: + def messageId: MessageId + def reprovisionId: Id + def isStarted: Boolean + + object ReprovisionRequest { + final case class Started( + messageId: MessageId, + reprovisionId: Id + ) extends ReprovisionMessageData: + val isStarted = true + + final case class Done( + messageId: MessageId, + reprovisionId: Id + ) extends ReprovisionMessageData: + val isStarted = false + + case object FromLastStart extends ReprovisionRequest + + final case class FromMessage(messageId: MessageId) extends ReprovisionRequest + + def started( + msg: EventMessage[ReprovisioningStarted] + ): Option[ReprovisionMessageData] = + msg.payload.headOption.map(p => Started(msg.id, p.id)) + def finished( + msg: EventMessage[ReprovisioningFinished] + ): Option[ReprovisionMessageData] = + msg.payload.headOption.map(p => Done(msg.id, p.id)) + + def lastStart: ReprovisionRequest = FromLastStart + def forSpecificMessage(id: MessageId): ReprovisionRequest = FromMessage(id) + } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionServiceImpl.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionServiceImpl.scala new file mode 100644 index 00000000..c98555a9 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionServiceImpl.scala @@ -0,0 +1,175 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule 
Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.reindex + +import scala.concurrent.duration.* + +import cats.Applicative +import cats.effect.* +import cats.syntax.all.* + +import io.bullet.borer.derivation.{MapBasedCodecs, key} +import io.bullet.borer.{Decoder, Encoder} +import io.renku.search.events.* +import io.renku.search.model.Id +import io.renku.search.provision.reindex.ReprovisionService.* +import io.renku.search.provision.reindex.ReprovisionServiceImpl.ReprovisionManageDoc +import io.renku.solr.client.DocVersion +import io.renku.solr.client.SolrClient +import io.renku.solr.client.util.DocumentLockResource +import io.renku.solr.client.util.LockDocument + +final class ReprovisionServiceImpl[F[_]: Sync]( + reIndex: ReIndexService[F], + solrClient: SolrClient[F] +) extends ReprovisionService[F]: + + private val logger = scribe.cats.effect[F] + private val docId = ReprovisionServiceImpl.docId + + def resetLockDocument: F[Unit] = + logger.info(s"Reset reprovision lock document $docId") >> + solrClient.upsertLoop[ReprovisionManageDoc, Unit](docId) { doc => + (doc.map(_.copy(isHandling = false)), ()) + } + + def reprovision(req: ReprovisionRequest): F[Result] = req match + case ReprovisionRequest.FromMessage(id) => + logger.info(s"Reprovision index from message $id") >> reIndex + .startReIndex(Some(id)) + + case ReprovisionRequest.FromLastStart => + solrClient + .findById[ReprovisionManageDoc](docId) + 
.map(_.responseBody.docs.headOption) + .flatMap { + case None => + logger.info("Reprovision index from the beginning!") >> reIndex.startReIndex( + None + ) + case Some(doc) => + logger.info( + s"Reprovision index from stored message-id ${doc.messageId}" + ) >> reIndex + .startReIndex(doc.messageId.some) + } + + case data: ReprovisionRequest.Started => + given LockDocument[F, ReprovisionManageDoc] = + ReprovisionManageDoc.lockDocument[F](data) + + solrClient + .findById[ReprovisionManageDoc](docId) + .map(_.responseBody.docs.headOption) + .flatMap { + case Some(d) if d.messageId >= data.messageId => + logger + .info( + s"Ignore reprovision-started message for a messageId in the past (${data.messageId})" + ) + .as(false) + + case _ => + DocumentLockResource(solrClient).make(docId).use { + case None => + logger + .info( + "Handling reprovision-started message is already taking place" + ) + .as(false) + + case Some(doc) => + logger + .info( + s"Handling reprovision-started message by triggering re-index from message ${data.messageId}" + ) >> reIndex.startReIndex(data.messageId.some) + + } + } + + case data: ReprovisionRequest.Done => + solrClient + .upsertLoop[ReprovisionManageDoc, Boolean]( + docId, + interval = 30.millis, + timeout = 2.minutes + ) { + case Some(doc) if data.reprovisionId == doc.reprovisionId => + (doc.copy(reprovisionStarted = false).some, true) + case _ => + (None, false) + } + .flatMap { + case true => + logger.debug( + s"ReprovisionManageDoc updated to finished state for ${data.reprovisionId}" + ) + case false => + logger + .info( + s"Ignore reprovision-finish message for unknown id (${data.reprovisionId})" + ) + } + .attempt + .flatMap { + case Left(ex) => + logger + .warn( + s"Error updating finished state on reprovision document ${data.reprovisionId}", + ex + ) + .as(false) + + case Right(_) => true.pure[F] + } + +object ReprovisionServiceImpl: + private[reindex] val docId = "REPROV_MSG_0f8f7edc-ddc6-45aa-8655-4494d063c8d9" + + final case class ReprovisionManageDoc( + id: 
String, + @key("message_id_s") messageId: MessageId, + @key("reprovision_id_s") reprovisionId: Id, + @key("reprovision_started_b") reprovisionStarted: Boolean, + @key("is_handling_b") isHandling: Boolean, + @key("_version_") version: DocVersion + ) + + object ReprovisionManageDoc { + given Encoder[ReprovisionManageDoc] = MapBasedCodecs.deriveEncoder + given Decoder[ReprovisionManageDoc] = MapBasedCodecs.deriveDecoder + + def lockDocument[F[_]: Applicative]( + msg: ReprovisionMessageData + ): LockDocument[F, ReprovisionManageDoc] = + LockDocument( + !_.isHandling, + (d, id) => + ReprovisionManageDoc( + d.map(_.id).getOrElse(id), + msg.messageId, + msg.reprovisionId, + msg.isStarted, + true, + d.map(_.version).getOrElse(DocVersion.NotExists) + ).pure[F], + _.copy(isHandling = false).some + ) + } diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala index 8d200a79..9d367330 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala @@ -30,6 +30,7 @@ import io.renku.search.config.QueuesConfig import io.renku.search.model.{EntityType, Id, Namespace} import io.renku.search.provision.handler.PipelineSteps import io.renku.search.provision.reindex.ReIndexService +import io.renku.search.provision.reindex.ReprovisionService import io.renku.search.solr.client.{SearchSolrClient, SearchSolrSuite} import io.renku.search.solr.documents.{Group as GroupDocument, User as UserDocument, *} import io.renku.search.solr.query.SolrToken @@ -50,7 +51,6 @@ trait ProvisioningSuite extends CatsEffectSuite with SearchSolrSuite with QueueS Stream[IO, QueueClient[IO]](queue), inChunkSize = 1 ) - handlers = MessageHandlers[IO](steps, queueConfig) bpm <- BackgroundProcessManage[IO](50.millis) reindex = ReIndexService[IO]( 
bpm, @@ -58,11 +58,14 @@ trait ProvisioningSuite extends CatsEffectSuite with SearchSolrSuite with QueueS solrClient, queueConfig ) + rps = ReprovisionService(reindex, solrClient.underlying) + ctrl <- Resource.eval(SyncMessageHandler.Control[IO]) + handlers = MessageHandlers[IO](steps, rps, queueConfig, ctrl) yield TestServices(steps, handlers, queue, solrClient, bpm, reindex) val testServices = ResourceSuiteLocalFixture("test-services", testServicesR) - override def munitFixtures = List(solrServer, redisServer, testServices) + override def munitFixtures = List(solrServer, redisServer, testServices, redisClients) def loadProjectsByNs(solrClient: SearchSolrClient[IO])( ns: Namespace @@ -109,6 +112,29 @@ trait ProvisioningSuite extends CatsEffectSuite with SearchSolrSuite with QueueS ) ).mapN((a, b) => a.toSet ++ b.toSet) + def waitForSolrDocs( + services: TestServices, + query: QueryData, + until: List[EntityDocument] => Boolean, + timeout: FiniteDuration = 30.seconds + ): IO[List[EntityDocument]] = + Stream + .repeatEval( + scribe.cats.io.trace(s"Searching $query…") >> + services.searchClient + .queryAll[EntityDocument](query) + .compile + .toList + ) + .takeThrough(d => !until(d)) + .meteredStartImmediately(50.millis) + .interruptWhen( + IO.sleep(timeout) + .as(Left(new Exception(s"Query timed out after $timeout: $query"))) + ) + .compile + .lastOrError + object ProvisioningSuite: val queueConfig: QueuesConfig = QueuesConfig( projectCreated = QueueName("projectCreated"), diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/TestServices.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/TestServices.scala index 8c356100..e40df517 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/TestServices.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/TestServices.scala @@ -24,6 +24,7 @@ import io.renku.queue.client.QueueClient import io.renku.redis.client.QueueName 
import io.renku.search.provision.handler.PipelineSteps import io.renku.search.provision.reindex.ReIndexService +import io.renku.search.provision.reindex.ReprovisionService import io.renku.search.solr.client.SearchSolrClient final case class TestServices( @@ -34,6 +35,8 @@ final case class TestServices( backgroundManage: BackgroundProcessManage[IO], reindex: ReIndexService[IO] ): + val reprovision: ReprovisionService[IO] = + ReprovisionService(reindex, searchClient.underlying) def syncHandler(qn: QueueName): SyncMessageHandler[IO] = - SyncMessageHandler[IO](pipelineSteps(qn)) + messageHandlers.createHandler(qn) diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReIndexServiceSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReIndexServiceSpec.scala index 111230cc..0b5b1771 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReIndexServiceSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReIndexServiceSpec.scala @@ -18,10 +18,7 @@ package io.renku.search.provision.reindex -import scala.concurrent.duration.* - import cats.effect.* -import fs2.Stream import io.renku.events.EventsGenerators import io.renku.redis.client.QueueName @@ -31,7 +28,6 @@ import io.renku.search.config.QueuesConfig import io.renku.search.model.Name import io.renku.search.provision.MessageHandlers.MessageHandlerKey import io.renku.search.provision.ProvisioningSuite -import io.renku.search.provision.TestServices import io.renku.search.solr.documents.{EntityDocument, Project as ProjectDocument} import io.renku.search.solr.schema.EntityDocumentSchema import io.renku.solr.client.* @@ -48,20 +44,6 @@ class ReIndexServiceSpec extends ProvisioningSuite: QueryData(QueryString("_kind:*", 10, 0)) .withSort(SolrSort(EntityDocumentSchema.Fields.id -> SolrSort.Direction.Asc)) - def waitForSolrDocs(services: TestServices, size: Int): IO[List[EntityDocument]] = - 
Stream - .repeatEval( - services.searchClient - .queryAll[EntityDocument](allQuery) - .compile - .toList - ) - .takeThrough(_.size != size) - .meteredStartImmediately(15.millis) - .timeout(5.minutes) - .compile - .lastOrError - test("re-index restores data from redis stream"): for services <- IO(testServices()) @@ -77,7 +59,7 @@ class ReIndexServiceSpec extends ProvisioningSuite: mId1 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg1) mId2 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg2) - docs <- waitForSolrDocs(services, 2) + docs <- waitForSolrDocs(services, allQuery, _.size == 2) _ = assertEquals(docs.size, 2) // corrupt the data at solr @@ -96,7 +78,7 @@ class ReIndexServiceSpec extends ProvisioningSuite: // re-indexing has been initiated, meaning that solr has been // cleared and background processes restarted. So only need to // wait for the 2 documents to reappear - docs2 <- waitForSolrDocs(services, 2) + docs2 <- waitForSolrDocs(services, allQuery, _.size == 2) _ = assertEquals(docs2.size, 2) // afterwards, the initial state should be re-created diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReprovisionServiceSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReprovisionServiceSpec.scala new file mode 100644 index 00000000..ff9f8056 --- /dev/null +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReprovisionServiceSpec.scala @@ -0,0 +1,278 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.reindex + +import cats.effect.* + +import io.renku.events.EventsGenerators +import io.renku.redis.client.QueueName +import io.renku.redis.client.RedisClientGenerators +import io.renku.search.GeneratorSyntax.* +import io.renku.search.config.QueuesConfig +import io.renku.search.model.Id +import io.renku.search.provision.MessageHandlers.MessageHandlerKey +import io.renku.search.provision.ProvisioningSuite +import io.renku.search.provision.TestServices +import io.renku.search.provision.reindex.ReprovisionService.ReprovisionRequest +import io.renku.search.solr.schema.EntityDocumentSchema +import io.renku.solr.client.* +import org.scalacheck.Gen + +class ReprovisionServiceSpec extends ProvisioningSuite: + type ManagementDoc = ReprovisionServiceImpl.ReprovisionManageDoc + val mgmtDocId = ReprovisionServiceImpl.docId + + override def defaultVerbosity: Int = 2 + override val queueConfig: QueuesConfig = + ProvisioningSuite.queueConfig.copy(dataServiceAllEvents = + RedisClientGenerators.queueNameGen.generateOne + ) + + val allQuery: QueryData = + QueryData(QueryString("_kind:*", 10, 0)) + .withSort(SolrSort(EntityDocumentSchema.Fields.id -> SolrSort.Direction.Asc)) + + def idQuery(id: Id, more: Id*): QueryData = + val idsq = (id +: more).map(i => s"id:$i").mkString(" OR ") + QueryData(QueryString(s"_kind:* AND ($idsq)", more.size + 1, 0)) + .withSort(SolrSort(EntityDocumentSchema.Fields.id -> SolrSort.Direction.Asc)) + + // must cancel all handlers after each test case + def reset: Resource[IO, TestServices] = + 
Resource.make( + IO(testServices()) + .flatTap(s => truncateAll(s.searchClient.underlying)(Seq.empty, Seq.empty)) + .flatTap(_ => redisClearAll) + )(_.backgroundManage.cancelProcesses(_ => true)) + + test("reprovision started sets a document with correct messsage id"): + reset.use { services => + for + _ <- services.backgroundManage.register( + MessageHandlerKey.DataServiceAllEvents, + services.syncHandler(queueConfig.dataServiceAllEvents).create.compile.drain + ) + _ <- services.backgroundManage.startAll + + // no management document + _ <- services.searchClient.underlying + .findById[ManagementDoc](mgmtDocId) + .assert(_.responseBody.docs.isEmpty) + + // set two projects + proj1 <- IO(EventsGenerators.projectCreatedGen("p1").generateOne) + proj2 <- IO(EventsGenerators.projectCreatedGen("p2").generateOne) + query = idQuery(proj1.id, proj2.id) + msg1 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj1)).generateOne) + msg2 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj2)).generateOne) + mId1 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg1) + mId2 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg2) + + docs <- waitForSolrDocs(services, query, _.size >= 2) + _ = assertEquals(docs.size, 2) + + r <- services.reprovision.reprovision( + ReprovisionRequest.Started(mId1, Id("repro-id")) + ) + _ = assert(r) + + docsN <- waitForSolrDocs(services, query, _.size >= 1) + _ = assertEquals(docsN.size, 1) + + // management document + _ <- services.searchClient.underlying + .findById[ManagementDoc](mgmtDocId) + .assert { resp => + resp.responseBody.docs.size == 1 && + resp.responseBody.docs.head.messageId == mId1 + } + yield () + } + + test("receiving a past message id, ignore reprovisioning"): + reset.use { services => + for + _ <- services.backgroundManage.register( + MessageHandlerKey.DataServiceAllEvents, + services.syncHandler(queueConfig.dataServiceAllEvents).create.compile.drain + ) + _ <- 
services.backgroundManage.startAll + + // no management document + _ <- services.searchClient.underlying + .findById[ManagementDoc](mgmtDocId) + .assert(_.responseBody.docs.isEmpty) + + // set two projects + proj1 <- IO(EventsGenerators.projectCreatedGen("p3").generateOne) + proj2 <- IO(EventsGenerators.projectCreatedGen("p4").generateOne) + query = idQuery(proj1.id, proj2.id) + msg1 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj1)).generateOne) + msg2 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj2)).generateOne) + mId1 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg1) + mId2 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg2) + + _ <- waitForSolrDocs(services, query, _.size >= 2).assert(_.size == 2) + + r <- services.reprovision.reprovision( + ReprovisionRequest.Started(mId2, Id("repro-id")) + ) + _ = assert(r) + + _ <- waitForSolrDocs(services, query, _.isEmpty).assert(_.size == 0) + + r2 <- services.reprovision.reprovision( + ReprovisionRequest.Started(mId1, Id("repro-id")) + ) + _ = assert(!r2) + + // management document is unchanged + _ <- services.searchClient.underlying + .findById[ManagementDoc](mgmtDocId) + .assert { resp => + resp.responseBody.docs.size == 1 && + resp.responseBody.docs.head.messageId == mId2 + } + yield () + } + + test("from last start uses stored message id"): + reset.use { services => + for + _ <- services.backgroundManage.register( + MessageHandlerKey.DataServiceAllEvents, + services.syncHandler(queueConfig.dataServiceAllEvents).create.compile.drain + ) + _ <- services.backgroundManage.startAll + + // no management document + _ <- services.searchClient.underlying + .findById[ManagementDoc](mgmtDocId) + .assert(_.responseBody.docs.isEmpty) + + // set two projects + proj1 <- IO(EventsGenerators.projectCreatedGen("p5").generateOne) + proj2 <- IO(EventsGenerators.projectCreatedGen("p6").generateOne) + + query = idQuery(proj1.id, proj2.id) + + msg1 <- 
IO(EventsGenerators.eventMessageGen(Gen.const(proj1)).generateOne) + msg2 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj2)).generateOne) + mId1 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg1) + mId2 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg2) + + _ <- waitForSolrDocs(services, query, _.size >= 2).assert(_.size == 2) + r <- services.reprovision.reprovision( + ReprovisionRequest.Started(mId2, Id("repro-id")) + ) + _ = assert(r) + _ <- waitForSolrDocs(services, query, _.isEmpty).assert(_.size == 0) + + // forcefully set a messageId, so it re-processes only the last one + _ <- services.searchClient.underlying.upsertLoop[ManagementDoc, Unit](mgmtDocId) { + doc => + (doc.map(_.copy(messageId = mId1)), ()) + } + + _ <- services.reprovision.reprovision(ReprovisionRequest.lastStart).assertEquals(true) + + _ <- waitForSolrDocs(services, query, _.size >= 1).assert(_.size == 1) + yield () + } + + test("process reprovisioning started message"): + reset.use { services => + for + proj1 <- IO(EventsGenerators.projectCreatedGen("p7").generateOne) + proj2 <- IO(EventsGenerators.projectCreatedGen("p8").generateOne) + msg1 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj1)).generateOne) + msg2 <- IO( + EventsGenerators + .eventMessageGen(EventsGenerators.reprovisionStarted()) + .generateOne + ) + msg3 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj2)).generateOne) + mId1 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg1) + mId2 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg2) + mId3 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg3) + + _ <- services.backgroundManage.register( + MessageHandlerKey.DataServiceAllEvents, + services.syncHandler(queueConfig.dataServiceAllEvents).create.compile.drain + ) + _ <- services.backgroundManage.startAll + + query = idQuery(proj1.id, proj2.id) + docs <- waitForSolrDocs(services, query, _.map(_.id).contains(proj2.id)) + _ = 
assertEquals(docs.map(_.id), List(proj2.id)) + // _ <- services.backgroundManage.cancelProcesses(_ => true) + yield () + } + + test("skip previous reprovisioning started messages"): + reset.use { services => + for + // SETUP + // set up 2 projects (proj1 + proj2), reset index and then add 1 project (proj3) + + proj1 <- IO(EventsGenerators.projectCreatedGen("p9").generateOne) + proj2 <- IO(EventsGenerators.projectCreatedGen("p10").generateOne) + proj3 <- IO(EventsGenerators.projectCreatedGen("p11").generateOne) + + msg1 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj1)).generateOne) + msg2 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj2)).generateOne) + msg3 <- IO( + EventsGenerators + .eventMessageGen(EventsGenerators.reprovisionStarted()) + .generateOne + ) + msg4 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj3)).generateOne) + + mId1 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg1) + mId2 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg2) + mId3 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg3) + mId4 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg4) + + _ <- services.backgroundManage.register( + MessageHandlerKey.DataServiceAllEvents, + services.syncHandler(queueConfig.dataServiceAllEvents).create.compile.drain + ) + _ <- services.backgroundManage.startAll + + query = idQuery(proj1.id, proj2.id, proj3.id) + docs <- waitForSolrDocs( + services, + query, + d => d.map(_.id).contains(proj3.id) + ) + + // when running re-index from first message (so starts with + // processing the second msg), it should create proj2, ignore + // the already processed reprovision-started msg and then create + // proj3 again + _ <- services.reindex.startReIndex(Some(mId1)) + docs2 <- waitForSolrDocs( + services, + query, + d => d.map(_.id).toSet == Set(proj2.id, proj3.id) + ) + _ = assertEquals(docs2.map(_.id).toSet, Set(proj3.id, proj2.id)) + // _ <- 
services.backgroundManage.cancelProcesses(_ => true) + yield () + } diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala index 19183ebc..c61a7446 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala @@ -18,6 +18,8 @@ package io.renku.solr.client +import scala.concurrent.duration.* + import cats.MonadThrow import cats.data.NonEmptyList import cats.effect.{Async, Resource} @@ -45,6 +47,13 @@ trait SolrClient[F[_]]: def upsert[A: Encoder](docs: Seq[A]): F[UpsertResponse] def upsertSuccess[A: Encoder](docs: Seq[A]): F[Unit] + def upsertLoop[D: Decoder: Encoder, R]( + id: String, + timeout: FiniteDuration = 1.seconds, + interval: FiniteDuration = 15.millis + )( + update: Option[D] => (Option[D], R) + ): F[R] def findById[A: Decoder](id: String, other: String*): F[GetByIdResponse[A]] diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala index 7ea8fd83..2742b6de 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala @@ -18,6 +18,8 @@ package io.renku.solr.client +import scala.concurrent.duration.* + import cats.data.NonEmptyList import cats.effect.Async import cats.syntax.all.* @@ -97,6 +99,27 @@ private class SolrClientImpl[F[_]: Async](val config: SolrConfig, underlying: Cl ) } + def upsertLoop[D: Decoder: Encoder, R]( + id: String, + timeout: FiniteDuration, + interval: FiniteDuration + )( + update: Option[D] => (Option[D], R) + ): F[R] = + val task = + findById[D](id).map(_.responseBody.docs.headOption).map(update).flatMap { + case (None, r) => (UpsertResponse.Success(ResponseHeader.empty), r).pure[F] + case (Some(doc), r) 
=> upsert(Seq(doc)).map(_ -> r) + } + fs2.Stream + .repeatEval(task) + .meteredStartImmediately(interval) + .takeThrough(_._1.isFailure) + .map(_._2) + .timeout(timeout) + .compile + .lastOrError + def getSchema: F[SchemaResponse] = val url = solrUrl / "schema" val req = Method.GET(url) diff --git a/project/AvroSchemaDownload.scala b/project/AvroSchemaDownload.scala index f47106ca..73130428 100644 --- a/project/AvroSchemaDownload.scala +++ b/project/AvroSchemaDownload.scala @@ -58,7 +58,8 @@ object AvroSchemaDownload extends AutoPlugin { schemaTargetDirectory.value / "v2" / "common", schemaTargetDirectory.value / "v2" / "user", schemaTargetDirectory.value / "v2" / "project", - schemaTargetDirectory.value / "v2" / "group" + schemaTargetDirectory.value / "v2" / "group", + schemaTargetDirectory.value / "v2" / "notify" ), Compile / sourceGenerators += Def .sequential( From 8b5d2033d5ca2d491ad66b5a04edabfbe721f8aa Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Fri, 13 Sep 2024 13:32:40 +0200 Subject: [PATCH 2/5] Add metrics for message handling --- .../search/provision/MessageMetrics.scala | 119 ++++++++++++++++++ .../renku/search/provision/Microservice.scala | 1 + .../search/provision/SyncMessageHandler.scala | 2 + .../provision/process/Reprovisioning.scala | 5 +- .../reindex/ReprovisionService.scala | 13 -- .../reindex/ReprovisionServiceImpl.scala | 12 +- 6 files changed, 132 insertions(+), 20 deletions(-) create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/MessageMetrics.scala diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/MessageMetrics.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/MessageMetrics.scala new file mode 100644 index 00000000..a9ecb3bf --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/MessageMetrics.scala @@ -0,0 +1,119 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique 
Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision + +import cats.effect.* + +import io.prometheus.client.Counter +import io.renku.search.events.MsgType +import io.renku.search.events.SyncEventMessage +import io.renku.search.metrics.Collector +import io.renku.search.provision.handler.DeleteFromSolr +import io.renku.solr.client.UpsertResponse + +object MessageMetrics: + def countReceived[F[_]: Sync](msg: SyncEventMessage): F[Unit] = + ReceivedMessageCounter.increment(msg.header.msgType) + + def countResult[F[_]: Sync]( + msg: SyncEventMessage, + result: SyncMessageHandler.Result + ): F[Unit] = + val mt = msg.header.msgType + result match + case SyncMessageHandler.Result.Upsert(UpsertResponse.Success(_)) => + SuccessMessageCounter.increment(mt) + case SyncMessageHandler.Result.Upsert(UpsertResponse.VersionConflict) => + FailedMessageCounter.increment(mt) + + case SyncMessageHandler.Result.Delete(DeleteFromSolr.DeleteResult.Success(_)) => + SuccessMessageCounter.increment(mt) + case SyncMessageHandler.Result.Delete(DeleteFromSolr.DeleteResult.NoIds(_)) => + IgnoredMessageCounter.increment(mt) + case SyncMessageHandler.Result.Delete(DeleteFromSolr.DeleteResult.Failed(_, _)) => + FailedMessageCounter.increment(mt) + + case SyncMessageHandler.Result.ReprovisionStart(Some(_)) => + SuccessMessageCounter.increment(mt) + case 
SyncMessageHandler.Result.ReprovisionStart(None) => + IgnoredMessageCounter.increment(mt) + + case SyncMessageHandler.Result.ReprovisionFinish(Some(_)) => + SuccessMessageCounter.increment(mt) + case SyncMessageHandler.Result.ReprovisionFinish(None) => + IgnoredMessageCounter.increment(mt) + + def all: List[MessageCounter] = + List( + ReceivedMessageCounter, + SuccessMessageCounter, + FailedMessageCounter, + IgnoredMessageCounter + ) + + trait MessageCounter extends Collector: + def asJCollector: Counter + def increment[F[_]](mt: MsgType)(using Sync[F]): F[Unit] = + Sync[F].blocking(asJCollector.labels(mkLabel(mt)).inc()) + private def mkLabel(mt: MsgType): String = + mt.name.replace('.', '_') + + object ReceivedMessageCounter extends Collector with MessageCounter: + private val underlying = + Counter + .build() + .name("received_messages") + .help("Total number of received messages") + .labelNames("type") + .create() + val asJCollector: Counter = underlying + + object SuccessMessageCounter extends Collector with MessageCounter: + private val underlying = + Counter + .build() + .name("success_messages") + .help("Total number of received messages that have been processed successfully") + .labelNames("type") + .create() + val asJCollector: Counter = underlying + + object FailedMessageCounter extends Collector with MessageCounter: + private val underlying = + Counter + .build() + .name("failed_messages") + .labelNames("type") + .help( + "Total number of received messages that have been processed, but resulted in an error" + ) + .create() + val asJCollector: Counter = underlying + + object IgnoredMessageCounter extends Collector with MessageCounter: + private val underlying = + Counter + .build() + .name("ignored_messages") + .labelNames("type") + .help( + "Total number of received messages that have been received, but ignored for processing" + ) + .create() + val asJCollector: Counter = underlying diff --git 
a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index f18f183a..acf7b1e1 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -45,6 +45,7 @@ object Microservice extends IOApp: registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics .add(RedisMetrics.queueSizeGauge) .add(RedisMetrics.unprocessedGauge) + .addAll(MessageMetrics.all) .addAll(SolrMetrics.allCollectors) metrics = metricsUpdaterTask(services) httpServer = httpServerTask(registryBuilder, services) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SyncMessageHandler.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SyncMessageHandler.scala index 1ae3117c..770b0db5 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SyncMessageHandler.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SyncMessageHandler.scala @@ -59,7 +59,9 @@ final class SyncMessageHandler[F[_]: Async]( _.evalMap { msg => for _ <- control.await + _ <- MessageMetrics.countReceived(msg) r <- processEvent(msg) + _ <- MessageMetrics.countResult(msg, r) yield r } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/process/Reprovisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/process/Reprovisioning.scala index 50c1ba8a..2aef6663 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/process/Reprovisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/process/Reprovisioning.scala @@ -61,4 +61,7 @@ final private[provision] class Reprovisioning[F[_]: Async]( case None => logger.info(s"Received reprovision-finish message without payload: $msg").as(None) case 
Some(req) => - reprovisionService.reprovision(req).as(req.reprovisionId.some) + reprovisionService.reprovision(req).map { + case true => Some(req.reprovisionId) + case false => None + } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionService.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionService.scala index 49d850e1..2bcdbd45 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionService.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionService.scala @@ -28,22 +28,9 @@ import io.renku.solr.client.SolrClient trait ReprovisionService[F[_]]: def reprovision(req: ReprovisionRequest): F[Result] - def recreateIndex: F[Result] = reprovision(ReprovisionRequest.lastStart) def resetLockDocument: F[Unit] -/* - -re-index is the unconditional reset, reprovision is more controled - -reprovisioning can happen in these cases: - - manual request: get the latest "beginning" message id and do reindex(messageId) - - manual request with a messageId: just call reindex(messageId) - or leave this to reindex? 
- - reprovsion-start message: validate + store and do reset_index(newMessageId) - - reprovision-finish message: validate + set flag to false - - */ - object ReprovisionService: def apply[F[_]: Sync]( reIndex: ReIndexService[F], diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionServiceImpl.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionServiceImpl.scala index c98555a9..48749242 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionServiceImpl.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReprovisionServiceImpl.scala @@ -60,9 +60,8 @@ final class ReprovisionServiceImpl[F[_]: Sync]( .map(_.responseBody.docs.headOption) .flatMap { case None => - logger.info("Reprovsion index from the beginning!") >> reIndex.startReIndex( - None - ) + logger.info("Reprovision index from the beginning!") >> + reIndex.startReIndex(None) case Some(doc) => logger.info( s"Reprovision index from stored message-id ${doc.messageId}" @@ -95,11 +94,12 @@ final class ReprovisionServiceImpl[F[_]: Sync]( .as(false) case Some(doc) => - logger - .info( + for + _ <- logger.info( s"Handling reprovision-started message by triggering re-index from message ${data.messageId}" ) - .as(true) + _ <- reIndex.startReIndex(Some(data.messageId)) + yield true } } From 06d81c9b8097602eba61ef10a9f624bb38899709 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Fri, 13 Sep 2024 13:33:03 +0200 Subject: [PATCH 3/5] Remove seemingly left over line This line appears to be a leftover and serves no purpose. 
--- .../main/scala/io/renku/redis/client/RedisQueueClient.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index 725d12d1..c3e3cdbf 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -122,7 +122,7 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient) lazy val logInfo: ((XReadMessage[?, ?], Option[RedisMessage])) => F[Unit] = { case (m, None) => - Log[F].info( + logger.warn( s"Message '${m.id}' skipped as it has no '${MessageBodyKeys.headers}' or '${MessageBodyKeys.payload}'" ) case _ => ().pure[F] @@ -216,7 +216,6 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient) client.underlying.connectAsync[K, V](codec.underlying, client.uri.underlying) ) - client.underlying.connect val release: StatefulRedisConnection[K, V] => F[Unit] = c => FutureLift[F].lift(c.closeAsync()) *> Log[F].info(s"Releasing Streaming connection: ${client.uri.underlying}") From 1c627aa9eb6b08ff6f701856dca94cb5ee68f678 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Tue, 17 Sep 2024 11:20:14 +0200 Subject: [PATCH 4/5] Set correct host name for dev container and vm --- flake.nix | 4 ++++ .../search/provision/reindex/ReprovisionServiceSpec.scala | 1 - nix/services.nix | 2 -- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flake.nix b/flake.nix index 7278726e..0f3e9ad3 100644 --- a/flake.nix +++ b/flake.nix @@ -19,6 +19,7 @@ ./nix/services.nix { virtualisation.memorySize = 2048; + networking.hostName = "rsdev-vm"; } ]; }; @@ -27,6 +28,9 @@ system = flake-utils.lib.system.x86_64-linux; modules = [ ./nix/services.nix + { + networking.hostName = "rsdev-cnt"; + } ]; }; }; diff --git 
a/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReprovisionServiceSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReprovisionServiceSpec.scala index ff9f8056..133a4ae1 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReprovisionServiceSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReprovisionServiceSpec.scala @@ -38,7 +38,6 @@ class ReprovisionServiceSpec extends ProvisioningSuite: type ManagementDoc = ReprovisionServiceImpl.ReprovisionManageDoc val mgmtDocId = ReprovisionServiceImpl.docId - override def defaultVerbosity: Int = 2 override val queueConfig: QueuesConfig = ProvisioningSuite.queueConfig.copy(dataServiceAllEvents = RedisClientGenerators.queueNameGen.generateOne diff --git a/nix/services.nix b/nix/services.nix index 4cae5ec9..b86aa552 100644 --- a/nix/services.nix +++ b/nix/services.nix @@ -10,8 +10,6 @@ heap = 1024; }; - networking.hostName = "rsdev"; - services.dev-redis = { enable = true; instance = "search"; From 211ee48e290dfaecbca2b047123ba784e421c713 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Wed, 18 Sep 2024 14:24:33 +0200 Subject: [PATCH 5/5] Cancel all handlers on finalizing the background process manager --- .../provision/BackgroundProcessManage.scala | 123 ++++++++++-------- 1 file changed, 67 insertions(+), 56 deletions(-) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/BackgroundProcessManage.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/BackgroundProcessManage.scala index 610c6200..03bbfe44 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/BackgroundProcessManage.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/BackgroundProcessManage.scala @@ -79,62 +79,73 @@ object BackgroundProcessManage: retryDelay: FiniteDuration, maxRetries: Option[Int] = None ): Resource[F, 
BackgroundProcessManage[F]] = - val logger = scribe.cats.effect[F] Supervisor[F](await = false).flatMap { supervisor => - Resource.eval(Ref.of[F, State[F]](State.empty[F])).map { state => - new BackgroundProcessManage[F] { - def register(name: TaskName, task: F[Unit]): F[Unit] = - state.update(_.put(name, wrapTask(name, task))) - - def startAll: F[Unit] = - state.get - .flatMap(s => logger.info(s"Starting ${s.tasks.size} background tasks")) >> - background(_ => true) - - def currentProcesses: F[Set[TaskName]] = - state.get.map(_.processes.keySet) - - def background(taskFilter: TaskName => Boolean): F[Unit] = - for { - ts <- state.get.map(_.getTasks(taskFilter)) - _ <- ts.toList - .traverse { case (name, task) => - supervisor.supervise(task).map(t => name -> t) - } - .map(_.toMap) - .flatMap(ps => state.update(_.setProcesses(ps))) - } yield () - - /** Stop all tasks by filtering on their registered name. */ - def cancelProcesses(filter: TaskName => Boolean): F[Unit] = - for - current <- state.get - ps = current.getProcesses(filter) - _ <- ps.toList.traverse_ { case (name, p) => - logger.info(s"Cancel background process $name") >> p.cancel >> p.join - .flatMap(out => logger.info(s"Task $name cancelled: $out")) - } - _ <- state.update(_.removeProcesses(ps.keySet)) - yield () - - private def wrapTask(name: TaskName, task: F[Unit]): F[Unit] = - def run(c: Ref[F, Long]): F[Unit] = - logger.info(s"Starting process for: ${name}") >> - task.handleErrorWith { err => - c.updateAndGet(_ + 1).flatMap { - case n if maxRetries.exists(_ <= n) => - logger.error( - s"Max retries ($maxRetries) for process ${name} exceeded" - ) >> Async[F].raiseError(err) - case n => - val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("") - logger.error( - s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying", - err - ) >> Async[F].delayBy(run(c), retryDelay) - } - } - Ref.of[F, Long](0).flatMap(run) >> state.update(_.removeProcesses(Set(name))) - } + 
Resource.eval(Ref.of[F, State[F]](State.empty[F])).flatMap { state => + Resource + .make(Async[F].pure(new Impl(supervisor, state, retryDelay, maxRetries)))( + _.cancelProcesses(_ => true) + ) } } + + private class Impl[F[_]: Async]( + supervisor: Supervisor[F], + state: Ref[F, State[F]], + retryDelay: FiniteDuration, + maxRetries: Option[Int] = None + ) extends BackgroundProcessManage[F] { + val logger = scribe.cats.effect[F] + + def register(name: TaskName, task: F[Unit]): F[Unit] = + state.update(_.put(name, wrapTask(name, task))) + + def startAll: F[Unit] = + state.get + .flatMap(s => logger.info(s"Starting ${s.tasks.size} background tasks")) >> + background(_ => true) + + def currentProcesses: F[Set[TaskName]] = + state.get.map(_.processes.keySet) + + def background(taskFilter: TaskName => Boolean): F[Unit] = + for { + ts <- state.get.map(_.getTasks(taskFilter)) + _ <- ts.toList + .traverse { case (name, task) => + supervisor.supervise(task).map(t => name -> t) + } + .map(_.toMap) + .flatMap(ps => state.update(_.setProcesses(ps))) + } yield () + + /** Stop all tasks by filtering on their registered name. 
*/ + def cancelProcesses(filter: TaskName => Boolean): F[Unit] = + for + current <- state.get + ps = current.getProcesses(filter) + _ <- ps.toList.traverse_ { case (name, p) => + logger.info(s"Cancel background process $name") >> p.cancel >> p.join + .flatMap(out => logger.info(s"Task $name cancelled: $out")) + } + _ <- state.update(_.removeProcesses(ps.keySet)) + yield () + + private def wrapTask(name: TaskName, task: F[Unit]): F[Unit] = + def run(c: Ref[F, Long]): F[Unit] = + logger.info(s"Starting process for: ${name}") >> + task.handleErrorWith { err => + c.updateAndGet(_ + 1).flatMap { + case n if maxRetries.exists(_ <= n) => + logger.error( + s"Max retries ($maxRetries) for process ${name} exceeded" + ) >> Async[F].raiseError(err) + case n => + val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("") + logger.error( + s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying", + err + ) >> Async[F].delayBy(run(c), retryDelay) + } + } + Ref.of[F, Long](0).flatMap(run) >> state.update(_.removeProcesses(Set(name))) + }