From 471268d005a09c6e0eeb4a706759c93d08eb5906 Mon Sep 17 00:00:00 2001
From: Eike Kettner
Date: Tue, 3 Sep 2024 17:57:33 +0200
Subject: [PATCH] Add endpoint to trigger re-creating the index from the stream

Sometimes it is necessary to re-create the entire index from the raw
data, for example when the Solr schema changes. This is done by reading
the Redis stream from the beginning (or from a given message id).

The endpoint is only exposed by the provisioning process; it is not
part of the public API.
---
 .../io/renku/search/events/MessageId.scala    |   5 +
 .../renku/redis/client/RedisQueueClient.scala |  20 +++
 .../redis/client/RedisQueueClientSpec.scala   |  14 ++
 .../io/renku/queue/client/QueueClient.scala   |   2 +
 .../renku/queue/client/QueueClientImpl.scala  |   3 +
 .../renku/search/provision/Microservice.scala |   8 +-
 .../io/renku/search/provision/Routes.scala    |  19 ++-
 .../io/renku/search/provision/Services.scala  |   8 +-
 .../provision/reindex/ReIndexDocument.scala   |  51 ++++++++
 .../provision/reindex/ReIndexService.scala    |  98 ++++++++++++++
 .../search/provision/ProvisioningSuite.scala  |  51 +++++---
 .../renku/search/provision/TestServices.scala |   5 +-
 .../reindex/ReIndexServiceSpec.scala          | 121 ++++++++++++++++++
 13 files changed, 375 insertions(+), 30 deletions(-)
 create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexDocument.scala
 create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala
 create mode 100644 modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReIndexServiceSpec.scala

diff --git a/modules/events/src/main/scala/io/renku/search/events/MessageId.scala b/modules/events/src/main/scala/io/renku/search/events/MessageId.scala
index f967416c..8e985959 100644
--- a/modules/events/src/main/scala/io/renku/search/events/MessageId.scala
+++ b/modules/events/src/main/scala/io/renku/search/events/MessageId.scala
@@ -18,6 +18,8 @@
 
 package io.renku.search.events
 
+import io.bullet.borer.*
+
 opaque type MessageId = String
 
 object MessageId:
@@ -25,3 +27,6 @@ object MessageId:
   def apply(id: String): MessageId = id
 
   extension (self: MessageId) def value: String = self
+
+  given Decoder[MessageId] = Decoder.forString
+  given Encoder[MessageId] = Encoder.forString
diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala
index 9572e176..725d12d1 100644
--- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala
+++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala
@@ -56,6 +56,11 @@ trait RedisQueueClient[F[_]] {
       messageId: MessageId
   ): F[Unit]
 
+  def removeLastProcessed(
+      clientId: ClientId,
+      queueNames: NonEmptyList[QueueName]
+  ): F[Unit]
+
   def findLastProcessed(
       clientId: ClientId,
       queueNames: NonEmptyList[QueueName]
@@ -144,6 +149,21 @@ class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)
     }
   }
 
+  override def removeLastProcessed(
+      clientId: ClientId,
+      queueNames: NonEmptyList[QueueName]
+  ): F[Unit] =
+    val key = formProcessedKey(clientId, queueNames)
+    createStringCommands.use { cmd =>
+      logger.debug(s"Delete last message-id for: $key") >>
+        cmd.del(key).flatMap(n => logger.debug(s"Deleted $n")).recoverWith { case ex =>
+          logger.warn(
+            s"Error deleting last message-id '$key'",
+            ex
+          )
+        }
+    }
+
   override def findLastProcessed(
       clientId: ClientId,
       queueNames: NonEmptyList[QueueName]
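Note: the new `removeLastProcessed` is the inverse of `markProcessed` above — it deletes the Redis key holding the last processed message id, so the next consumer read starts at the beginning of the stream. A minimal usage sketch against the new trait methods (the wiring around them is illustrative only, not part of this change):

    import cats.data.NonEmptyList
    import cats.effect.IO
    import io.renku.redis.client.*

    // Reset a consumer's position, then verify the marker is gone.
    def resetPosition(
        client: RedisQueueClient[IO],
        clientId: ClientId,
        queue: QueueName
    ): IO[Unit] =
      val queues = NonEmptyList.of(queue)
      for
        _ <- client.removeLastProcessed(clientId, queues)
        last <- client.findLastProcessed(clientId, queues)
        _ <- IO.println(s"last processed after reset: $last") // expected: None
      yield ()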
diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala
index 1ad99b9f..534d0bc5 100644
--- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala
+++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala
@@ -152,6 +152,20 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisBaseSuite:
         .map(v => assert(v contains messageId))
     yield ()
 
+  test("remove last seen message id"):
+    val clientId = RedisClientGenerators.clientIdGen.generateOne
+    val queue = NonEmptyList.of(RedisClientGenerators.queueNameGen.generateOne)
+    val messageId = RedisClientGenerators.messageIdGen.generateOne
+    for
+      client <- IO(redisClients().queueClient)
+      _ <- client.markProcessed(clientId, queue, messageId)
+      mid <- client.findLastProcessed(clientId, queue)
+      _ = assertEquals(mid, Some(messageId))
+
+      _ <- client.removeLastProcessed(clientId, queue)
+      _ <- client.findLastProcessed(clientId, queue).map(v => assert(v.isEmpty))
+    yield ()
+
   test("can find out the total size of the given stream"):
     val queue = RedisClientGenerators.queueNameGen.generateOne
     val messages = (stringGen, stringGen).mapN(_ -> _).generateList(1, 30)
diff --git a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala
index e853c79f..3dae9555 100644
--- a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala
+++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala
@@ -58,6 +58,8 @@ trait QueueClient[F[_]]:
 
   def findLastProcessed(queueNames: NonEmptyList[QueueName]): F[Option[MessageId]]
 
+  def removeLastProcessed(queueNames: NonEmptyList[QueueName]): F[Unit]
+
   def getSize(queueName: QueueName): F[Long]
 
   def getSize(queueName: QueueName, from: MessageId): F[Long]
diff --git a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala
index 8b9fdbaa..a46feaef 100644
--- a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala
+++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala
@@ -105,6 +105,9 @@ private class QueueClientImpl[F[_]: Async](
   ): F[Option[MessageId]] =
     redisQueueClient.findLastProcessed(clientId, queueNames).map(_.map(MessageId.apply))
 
+  override def removeLastProcessed(queueNames: NonEmptyList[QueueName]): F[Unit] =
+    redisQueueClient.removeLastProcessed(clientId, queueNames)
+
   override def getSize(queueName: QueueName): F[Long] =
     redisQueueClient.getSize(queueName)
 
diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala
index 16e421ec..ae37f32b 100644
--- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala
+++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala
@@ -45,7 +45,7 @@ object Microservice extends IOApp:
           .add(RedisMetrics.unprocessedGauge)
           .addAll(SolrMetrics.allCollectors)
         metrics = metricsUpdaterTask(services)
-        httpServer = httpServerTask(registryBuilder, services.config)
+        httpServer = httpServerTask(registryBuilder, services)
         tasks = services.messageHandlers.getAll + metrics + httpServer
         pm = services.backgroundManage
         _ <- tasks.toList.traverse_(pm.register.tupled)
@@ -56,10 +56,10 @@ object Microservice extends IOApp:
 
   private def httpServerTask(
       registryBuilder: CollectorRegistryBuilder[IO],
-      config: SearchProvisionConfig
+      services: Services[IO]
   ): (TaskName, IO[Unit]) =
-    val io = Routes[IO](registryBuilder)
-      .flatMap(HttpServer.build(_, config.httpServerConfig))
+    val io = Routes[IO](registryBuilder, services)
+      .flatMap(HttpServer.build(_, services.config.httpServerConfig))
       .use(_ => IO.never)
     TaskName.fromString("http server") -> io
 
diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala
index 66e3c919..13f4b04e 100644
--- a/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala
+++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Routes.scala
@@ -26,22 +26,35 @@ import io.renku.search.http.metrics.MetricsRoutes
 import io.renku.search.http.routes.OperationRoutes
 import io.renku.search.metrics.CollectorRegistryBuilder
 import org.http4s.HttpRoutes
+import org.http4s.dsl.Http4sDsl
 import org.http4s.server.Router
 
 private object Routes:
   def apply[F[_]: Async: Network](
-      registryBuilder: CollectorRegistryBuilder[F]
+      registryBuilder: CollectorRegistryBuilder[F],
+      services: Services[F]
   ): Resource[F, HttpRoutes[F]] =
     MetricsRoutes[F](registryBuilder).makeRoutes
-      .map(new Routes[F](_).routes)
+      .map(new Routes[F](_, services).routes)
 
-final private class Routes[F[_]: Async](metricsRoutes: HttpRoutes[F]):
+final private class Routes[F[_]: Async](
+    metricsRoutes: HttpRoutes[F],
+    services: Services[F]
+) extends Http4sDsl[F]:
 
   private lazy val operationRoutes = Router[F](
+    "/reindex" -> reIndexRoutes,
     "/" -> OperationRoutes[F]
   )
 
   lazy val routes: HttpRoutes[F] = operationRoutes <+> metricsRoutes
+
+  def reIndexRoutes: HttpRoutes[F] = HttpRoutes.of { case POST -> Root =>
+    services.reIndex.startReIndex(None).flatMap {
+      case true  => NoContent()
+      case false => UnprocessableEntity()
+    }
+  }
diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala
index e262b886..b397c14f 100644
--- a/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala
+++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Services.scala
@@ -24,6 +24,7 @@ import fs2.io.net.Network
 
 import io.renku.queue.client.QueueClient
 import io.renku.search.provision.handler.PipelineSteps
+import io.renku.search.provision.reindex.ReIndexService
 import io.renku.search.solr.client.SearchSolrClient
 
 final case class Services[F[_]](
@@ -31,7 +32,8 @@ final case class Services[F[_]](
     solrClient: SearchSolrClient[F],
     queueClient: Stream[F, QueueClient[F]],
     messageHandlers: MessageHandlers[F],
-    backgroundManage: BackgroundProcessManage[F]
+    backgroundManage: BackgroundProcessManage[F],
+    reIndex: ReIndexService[F]
 )
 
 object Services:
@@ -52,4 +54,6 @@ object Services:
 
     handlers = MessageHandlers[F](steps, cfg.queuesConfig)
     bm <- BackgroundProcessManage[F](cfg.retryOnErrorDelay)
-  } yield Services(cfg, solr, redis, handlers, bm)
+
+    ris = ReIndexService[F](bm, redis, solr, cfg.queuesConfig)
+  } yield Services(cfg, solr, redis, handlers, bm, ris)
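Note: the new `/reindex` route lives only on the provisioning service's internal HTTP server. A sketch of exercising it from Scala (the ember client, host, and port are placeholder assumptions; any HTTP client works — the route takes no body and answers 204 when a re-index starts, 422 when one is already running):

    import cats.effect.*
    import org.http4s.*
    import org.http4s.ember.client.EmberClientBuilder
    import org.http4s.implicits.*

    object TriggerReindex extends IOApp.Simple:
      def run: IO[Unit] =
        EmberClientBuilder.default[IO].build.use { client =>
          // POST with an empty body; the endpoint ignores any payload
          val req = Request[IO](Method.POST, uri"http://localhost:8081/reindex")
          client.status(req).flatMap {
            case Status.NoContent           => IO.println("re-index started")
            case Status.UnprocessableEntity => IO.println("re-index already in progress")
            case other                      => IO.println(s"unexpected status: $other")
          }
        }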
diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexDocument.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexDocument.scala
new file mode 100644
index 00000000..f67286b6
--- /dev/null
+++ b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexDocument.scala
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2024 Swiss Data Science Center (SDSC)
+ * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+ * Eidgenössische Technische Hochschule Zürich (ETHZ).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.renku.search.provision.reindex
+
+import java.time.Instant
+
+import cats.Functor
+import cats.effect.*
+import cats.syntax.all.*
+
+import io.bullet.borer.Decoder
+import io.bullet.borer.Encoder
+import io.bullet.borer.derivation.{MapBasedCodecs, key}
+import io.renku.json.codecs.all.given
+import io.renku.search.events.MessageId
+import io.renku.search.model.Id
+import io.renku.solr.client.DocVersion
+
+final private case class ReIndexDocument(
+    id: Id,
+    created: Instant,
+    messageId: Option[MessageId],
+    @key("_version_") version: DocVersion
+)
+
+private object ReIndexDocument:
+  private val docId: Id = Id("reindex_31baded5-9fc2-4935-9b07-80f7a3ecb13f")
+
+  def createNew[F[_]: Clock: Functor](messageId: Option[MessageId]): F[ReIndexDocument] =
+    Clock[F].realTimeInstant.map { now =>
+      ReIndexDocument(docId, now, messageId, DocVersion.NotExists)
+    }
+
+  given Encoder[ReIndexDocument] = MapBasedCodecs.deriveEncoder
+  given Decoder[ReIndexDocument] = MapBasedCodecs.deriveDecoder
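Note: the fixed document id plus `DocVersion.NotExists` is what turns the upsert into a lightweight distributed lock — Solr's optimistic concurrency rejects the insert when a document with that id already exists, so two concurrent re-index attempts cannot both proceed. The acquire/work/release shape in isolation (`ReIndexDocument` is package-private, so this helper is illustrative only):

    import cats.data.NonEmptyList
    import cats.effect.IO
    import io.renku.search.solr.client.SearchSolrClient

    def withReindexLock(solr: SearchSolrClient[IO])(work: IO[Unit]): IO[Boolean] =
      for
        doc <- ReIndexDocument.createNew[IO](None)
        resp <- solr.upsert(Seq(doc))           // fails if the lock document exists
        res <-
          if resp.isFailure then IO.pure(false) // another re-index holds the lock
          else work *> solr.deleteIds(NonEmptyList.of(doc.id)).as(true)
      yield res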
diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala
new file mode 100644
index 00000000..c30a7a6c
--- /dev/null
+++ b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2024 Swiss Data Science Center (SDSC)
+ * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+ * Eidgenössische Technische Hochschule Zürich (ETHZ).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.renku.search.provision.reindex
+
+import cats.data.NonEmptyList
+import cats.effect.*
+import cats.syntax.all.*
+import fs2.Stream
+
+import io.renku.queue.client.QueueClient
+import io.renku.search.config.QueuesConfig
+import io.renku.search.events.MessageId
+import io.renku.search.provision.BackgroundProcessManage
+import io.renku.search.provision.MessageHandlers.MessageHandlerKey
+import io.renku.search.solr.client.SearchSolrClient
+
+trait ReIndexService[F[_]]:
+  def startReIndex(startMessage: Option[MessageId]): F[Boolean]
+
+  // Depends on BackgroundProcessManage and RedisQueueClient.
+  // Inserts a ReIndexDocument; if that fails, a concurrent re-index is in progress.
+  // If inserted successfully: stop the tasks, set the last message id, drop the
+  // index, then start the tasks again and remove the ReIndexDocument.
+
+  // Prerequisites:
+  // - background tasks must be able to stop/restart
+
+object ReIndexService:
+
+  def apply[F[_]: Clock: Sync](
+      bpm: BackgroundProcessManage[F],
+      redisClient: Stream[F, QueueClient[F]],
+      solrClient: SearchSolrClient[F],
+      queueCfg: QueuesConfig
+  ): ReIndexService[F] =
+    new ReIndexService[F] {
+      private val queueName = NonEmptyList.of(queueCfg.dataServiceAllEvents)
+      private val logger = scribe.cats.effect[F]
+
+      def startReIndex(startMessage: Option[MessageId]): F[Boolean] =
+        for
+          syncDoc <- ReIndexDocument.createNew[F](startMessage)
+          upsertResp <- solrClient.upsert(Seq(syncDoc))
+          _ <- logger.warn(s"Insert reindex sync document: $upsertResp")
+          res <-
+            if (upsertResp.isFailure)
+              logger.info("Re-index called while one is already in progress").as(false)
+            else restartHandlers(syncDoc, startMessage)
+        yield res
+
+      private def restartHandlers(
+          syncDoc: ReIndexDocument,
+          startMessage: Option[MessageId]
+      ) =
+        for
+          _ <- logger.info(
+            s"Starting re-indexing all data, since message ${syncDoc.messageId}"
+          )
+          _ <- bpm.cancelProcesses(MessageHandlerKey.isInstance)
+          _ <- logger.info("Background processes stopped")
+          _ <- startMessage match
+            case Some(msgId) =>
+              logger.info(s"Set last seen message id to $msgId for $queueName") >>
+                redisClient
+                  .evalMap(_.markProcessed(queueName, msgId))
+                  .take(1)
+                  .compile
+                  .drain
+            case None =>
+              logger.info(s"Remove last processed message id for $queueName") >>
+                redisClient
+                  .evalMap(_.removeLastProcessed(queueName))
+                  .take(1)
+                  .compile
+                  .drain
+          _ <- logger.info("Delete SOLR index")
+          _ <- solrClient.deletePublicData
+          _ <- logger.info("Start background processes")
+          _ <- bpm.background(MessageHandlerKey.isInstance)
+          _ <- solrClient.deleteIds(NonEmptyList.of(syncDoc.id))
+        yield true
+    }
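Note: `redisClient` is a `Stream[F, QueueClient[F]]` because the underlying connection may be re-acquired over time; evaluating a single element with `take(1)` runs one action against whichever client is current. The idiom used twice above, factored out (a hypothetical helper, not part of this change):

    import cats.effect.IO
    import fs2.Stream
    import io.renku.queue.client.QueueClient

    // Run one effectful action against the current queue client, then stop.
    def withQueueClient(clients: Stream[IO, QueueClient[IO]])(
        f: QueueClient[IO] => IO[Unit]
    ): IO[Unit] =
      clients.evalMap(f).take(1).compile.drain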
diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala
index 01ec0e62..8d200a79 100644
--- a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala
+++ b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala
@@ -18,7 +18,7 @@
 
 package io.renku.search.provision
 
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.*
 
 import cats.effect.{IO, Resource}
 import cats.syntax.all.*
@@ -29,6 +29,7 @@ import io.renku.redis.client.QueueName
 import io.renku.search.config.QueuesConfig
 import io.renku.search.model.{EntityType, Id, Namespace}
 import io.renku.search.provision.handler.PipelineSteps
+import io.renku.search.provision.reindex.ReIndexService
 import io.renku.search.solr.client.{SearchSolrClient, SearchSolrSuite}
 import io.renku.search.solr.documents.{Group as GroupDocument, User as UserDocument, *}
 import io.renku.search.solr.query.SolrToken
@@ -36,24 +37,7 @@ import io.renku.solr.client.{QueryData, QueryString}
 import munit.CatsEffectSuite
 
 trait ProvisioningSuite extends CatsEffectSuite with SearchSolrSuite with QueueSuite:
-  val queueConfig: QueuesConfig = QueuesConfig(
-    projectCreated = QueueName("projectCreated"),
-    projectUpdated = QueueName("projectUpdated"),
-    projectRemoved = QueueName("projectRemoved"),
-    projectAuthorizationAdded = QueueName("projectAuthorizationAdded"),
-    projectAuthorizationUpdated = QueueName("projectAuthorizationUpdated"),
-    projectAuthorizationRemoved = QueueName("projectAuthorizationRemoved"),
-    userAdded = QueueName("userAdded"),
-    userUpdated = QueueName("userUpdated"),
-    userRemoved = QueueName("userRemoved"),
-    groupAdded = QueueName("groupAdded"),
-    groupUpdated = QueueName("groupUpdated"),
-    groupRemoved = QueueName("groupRemoved"),
-    groupMemberAdded = QueueName("groupMemberAdded"),
-    groupMemberUpdated = QueueName("groupMemberUpdated"),
-    groupMemberRemoved = QueueName("groupMemberRemoved"),
-    dataServiceAllEvents = QueueName("dataServiceAllEvents")
-  )
+  val queueConfig: QueuesConfig = ProvisioningSuite.queueConfig
 
   override def munitIOTimeout: Duration = Duration(1, "min")
 
@@ -67,7 +51,14 @@ trait ProvisioningSuite extends CatsEffectSuite with SearchSolrSuite with QueueS
         inChunkSize = 1
       )
       handlers = MessageHandlers[IO](steps, queueConfig)
-    yield TestServices(steps, handlers, queue, solrClient)
+      bpm <- BackgroundProcessManage[IO](50.millis)
+      reindex = ReIndexService[IO](
+        bpm,
+        Stream[IO, QueueClient[IO]](queue),
+        solrClient,
+        queueConfig
+      )
+    yield TestServices(steps, handlers, queue, solrClient, bpm, reindex)
 
   val testServices = ResourceSuiteLocalFixture("test-services", testServicesR)
 
@@ -117,3 +108,23 @@ trait ProvisioningSuite extends CatsEffectSuite with SearchSolrSuite with QueueS
           CompoundId.partial(id, entityType.some)
         )
       ).mapN((a, b) => a.toSet ++ b.toSet)
+
+object ProvisioningSuite:
+  val queueConfig: QueuesConfig = QueuesConfig(
+    projectCreated = QueueName("projectCreated"),
+    projectUpdated = QueueName("projectUpdated"),
+    projectRemoved = QueueName("projectRemoved"),
+    projectAuthorizationAdded = QueueName("projectAuthorizationAdded"),
+    projectAuthorizationUpdated = QueueName("projectAuthorizationUpdated"),
+    projectAuthorizationRemoved = QueueName("projectAuthorizationRemoved"),
+    userAdded = QueueName("userAdded"),
+    userUpdated = QueueName("userUpdated"),
+    userRemoved = QueueName("userRemoved"),
+    groupAdded = QueueName("groupAdded"),
+    groupUpdated = QueueName("groupUpdated"),
+    groupRemoved = QueueName("groupRemoved"),
+    groupMemberAdded = QueueName("groupMemberAdded"),
+    groupMemberUpdated = QueueName("groupMemberUpdated"),
+    groupMemberRemoved = QueueName("groupMemberRemoved"),
+    dataServiceAllEvents = QueueName("dataServiceAllEvents")
+  )
diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/TestServices.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/TestServices.scala
index b40621dd..8c356100 100644
--- a/modules/search-provision/src/test/scala/io/renku/search/provision/TestServices.scala
+++ b/modules/search-provision/src/test/scala/io/renku/search/provision/TestServices.scala
@@ -23,13 +23,16 @@ import cats.effect.*
 import io.renku.queue.client.QueueClient
 import io.renku.redis.client.QueueName
 import io.renku.search.provision.handler.PipelineSteps
+import io.renku.search.provision.reindex.ReIndexService
 import io.renku.search.solr.client.SearchSolrClient
 
 final case class TestServices(
     pipelineSteps: QueueName => PipelineSteps[IO],
     messageHandlers: MessageHandlers[IO],
     queueClient: QueueClient[IO],
-    searchClient: SearchSolrClient[IO]
+    searchClient: SearchSolrClient[IO],
+    backgroundManage: BackgroundProcessManage[IO],
+    reindex: ReIndexService[IO]
 ):
 
   def syncHandler(qn: QueueName): SyncMessageHandler[IO] =
diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReIndexServiceSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReIndexServiceSpec.scala
new file mode 100644
index 00000000..d3d782bf
--- /dev/null
+++ b/modules/search-provision/src/test/scala/io/renku/search/provision/reindex/ReIndexServiceSpec.scala
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2024 Swiss Data Science Center (SDSC)
+ * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
+ * Eidgenössische Technische Hochschule Zürich (ETHZ).
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.renku.search.provision.reindex
+
+import scala.concurrent.duration.*
+
+import cats.effect.*
+import fs2.Stream
+
+import io.renku.events.EventsGenerators
+import io.renku.redis.client.QueueName
+import io.renku.redis.client.RedisClientGenerators
+import io.renku.search.GeneratorSyntax.*
+import io.renku.search.config.QueuesConfig
+import io.renku.search.model.Name
+import io.renku.search.provision.MessageHandlers.MessageHandlerKey
+import io.renku.search.provision.ProvisioningSuite
+import io.renku.search.provision.TestServices
+import io.renku.search.solr.documents.{EntityDocument, Project as ProjectDocument}
+import io.renku.search.solr.schema.EntityDocumentSchema
+import io.renku.solr.client.DocVersion
+import io.renku.solr.client.QueryData
+import io.renku.solr.client.QueryString
+import io.renku.solr.client.SolrSort
+import org.scalacheck.Gen
+
+class ReIndexServiceSpec extends ProvisioningSuite:
+
+  override val queueConfig: QueuesConfig =
+    ProvisioningSuite.queueConfig.copy(dataServiceAllEvents =
+      RedisClientGenerators.queueNameGen.generateOne
+    )
+
+  val allQuery: QueryData =
+    QueryData(QueryString("_kind:*", 10, 0))
+      .withSort(SolrSort(EntityDocumentSchema.Fields.id -> SolrSort.Direction.Asc))
+
+  def waitForSolrDocs(services: TestServices, size: Int): IO[List[EntityDocument]] =
+    Stream
+      .repeatEval(
+        services.searchClient
+          .queryAll[EntityDocument](allQuery)
+          .compile
+          .toList
+      )
+      .takeThrough(_.size != size)
+      .meteredStartImmediately(15.millis)
+      .timeout(5.minutes)
+      .compile
+      .lastOrError
+
+  test("re-index restores data from redis stream"):
+    for
+      services <- IO(testServices())
+      _ <- services.backgroundManage.register(
+        MessageHandlerKey.DataServiceAllEvents,
+        services.syncHandler(queueConfig.dataServiceAllEvents).create.compile.drain
+      )
+      _ <- services.backgroundManage.startAll
+      proj1 <- IO(EventsGenerators.projectCreatedGen("p1").generateOne)
+      proj2 <- IO(EventsGenerators.projectCreatedGen("p2").generateOne)
+      msg1 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj1)).generateOne)
+      msg2 <- IO(EventsGenerators.eventMessageGen(Gen.const(proj2)).generateOne)
+      mId1 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg1)
+      mId2 <- services.queueClient.enqueue(queueConfig.dataServiceAllEvents, msg2)
+
+      docs <- waitForSolrDocs(services, 2)
+      _ = assertEquals(docs.size, 2)
+
+      // corrupt the data in solr
+      _ <- services.searchClient.upsert(
+        Seq(docs.head.asInstanceOf[ProjectDocument].copy(name = Name("blaaah")))
+      )
+      changed <- services.searchClient
+        .queryAll[EntityDocument](allQuery)
+        .compile
+        .toList
+      _ = assertEquals(changed.size, 2)
+      _ = assertNotEquals(changed, docs)
+
+      // now run re-indexing from the beginning; when this returns,
+      // re-indexing has been initiated, meaning solr has been cleared
+      // and the background processes restarted, so we only need to
+      // wait for the 2 documents to reappear
+      _ <- services.reindex.startReIndex(None)
+      docs2 <- waitForSolrDocs(services, 2)
+      _ = assertEquals(docs2.size, 2)
+
+      // afterwards, the initial state should be re-created
+      _ = assertEquals(
+        docs2.map(_.setVersion(DocVersion.Off)),
+        docs.map(_.setVersion(DocVersion.Off))
+      )
+    yield ()
+
+  test("re-index should not start when one is already running"):
+    for
+      services <- IO(testServices())
+
+      doc <- ReIndexDocument.createNew[IO](None)
+      _ <- services.searchClient.upsertSuccess(Seq(doc))
+
+      r <- services.reindex.startReIndex(None)
+      _ = assertEquals(r, false)
+    yield ()