From a7a3f153f9ef23c9106c6bfb53cbea7747451b7a Mon Sep 17 00:00:00 2001 From: eikek <701128+eikek@users.noreply.github.com> Date: Thu, 5 Sep 2024 17:08:57 +0200 Subject: [PATCH] Allow to run re-indexing as result of a migration (#198) Migrations can communicate whether a re-index is necessary. When the schema migrations run and there is at least one migration requiring a re-index, the solr index will be rebuild on startup. Closes: #184 --- .../renku/search/provision/Microservice.scala | 12 ++++-- .../provision/reindex/ReIndexService.scala | 37 ++++++++++++++----- .../solr/client/migration/MigrateResult.scala | 30 +++++++++++++++ .../client/migration/SchemaMigration.scala | 8 +++- .../client/migration/SchemaMigrator.scala | 13 +++++-- .../client/migration/SolrMigratorSpec.scala | 24 ++++++++++-- 6 files changed, 104 insertions(+), 20 deletions(-) create mode 100644 modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index ae37f32b..10e29318 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -29,7 +29,7 @@ import io.renku.search.metrics.CollectorRegistryBuilder import io.renku.search.provision.BackgroundProcessManage.TaskName import io.renku.search.provision.metrics.* import io.renku.search.solr.schema.Migrations -import io.renku.solr.client.migration.SchemaMigrator +import io.renku.solr.client.migration.{MigrateResult, SchemaMigrator} object Microservice extends IOApp: @@ -39,7 +39,7 @@ object Microservice extends IOApp: Services.make[IO].use { services => for { _ <- IO(LoggingSetup.doConfigure(services.config.verbosity)) - _ <- runSolrMigrations(services.config) + migrateResult <- runSolrMigrations(services.config) registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics .add(RedisMetrics.queueSizeGauge) .add(RedisMetrics.unprocessedGauge) @@ -49,6 +49,12 @@ object Microservice extends IOApp: tasks = services.messageHandlers.getAll + metrics + httpServer pm = services.backgroundManage _ <- tasks.toList.traverse_(pm.register.tupled) + _ <- + if (migrateResult.reindexRequired) + logger.info( + "Re-Index is required after migrations have applied!" + ) >> services.reIndex.resetData(None) + else IO.unit _ <- pm.startAll _ <- IO.never } yield ExitCode.Success @@ -80,7 +86,7 @@ object Microservice extends IOApp: ).run() TaskName.fromString("metrics updater") -> io - private def runSolrMigrations(cfg: SearchProvisionConfig): IO[Unit] = + private def runSolrMigrations(cfg: SearchProvisionConfig): IO[MigrateResult] = SchemaMigrator[IO](cfg.solrConfig) .use(_.migrate(Migrations.all)) .handleErrorWith { err => diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala index b20d960c..94167c66 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/reindex/ReIndexService.scala @@ -31,8 +31,22 @@ import io.renku.search.provision.MessageHandlers.MessageHandlerKey import io.renku.search.solr.client.SearchSolrClient trait ReIndexService[F[_]]: + /** Stops background processes handling redis messages, drop the index and then restarts + * background processes. The `startMessage` can specify from which point to start + * reading the stream again. If it is `None` the stream is read from the beginning. + * + * This method ensures that only one re-indexing is initiated at a time. + */ def startReIndex(startMessage: Option[MessageId]): F[Boolean] + /** Drops the SOLR index and marks the new start of the redis stream according to + * `startMessage`. If `startMessage` is `None`, the stream will be read from the + * beginning. + * + * NOTE: This method assumes no handlers currently reading from the redis streams. + */ + def resetData(startMessage: Option[MessageId]): F[Unit] + object ReIndexService: def apply[F[_]: Clock: Sync]( @@ -56,16 +70,8 @@ object ReIndexService: else dropIndexAndRestart(syncDoc, startMessage) yield res - private def dropIndexAndRestart( - syncDoc: ReIndexDocument, - startMessage: Option[MessageId] - ) = + def resetData(startMessage: Option[MessageId]): F[Unit] = for - _ <- logger.info( - s"Starting re-indexing all data, since message ${syncDoc.messageId}" - ) - _ <- bpm.cancelProcesses(MessageHandlerKey.isInstance) - _ <- logger.info("Background processes stopped") _ <- startMessage match case Some(msgId) => logger.info("Set last seen message id to $msgId for $queueName") >> @@ -93,6 +99,19 @@ object ReIndexService: .drain _ <- logger.info("Delete SOLR index") _ <- solrClient.deletePublicData + yield () + + private def dropIndexAndRestart( + syncDoc: ReIndexDocument, + startMessage: Option[MessageId] + ) = + for + _ <- logger.info( + s"Starting re-indexing all data, since message ${syncDoc.messageId}" + ) + _ <- bpm.cancelProcesses(MessageHandlerKey.isInstance) + _ <- logger.info("Background processes stopped") + _ <- resetData(startMessage) _ <- logger.info("Start background processes") _ <- bpm.background(MessageHandlerKey.isInstance) _ <- solrClient.deleteIds(NonEmptyList.of(syncDoc.id)) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala new file mode 100644 index 00000000..56c6b1e7 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client.migration + +final case class MigrateResult( + startVersion: Option[Long], + endVersion: Option[Long], + migrationsRun: Long, + reindexRequired: Boolean +) + +object MigrateResult: + val empty: MigrateResult = + MigrateResult(None, None, 0L, false) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala index 06cac3d5..1c6820aa 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala @@ -22,8 +22,12 @@ import io.renku.solr.client.schema.SchemaCommand final case class SchemaMigration( version: Long, - commands: Seq[SchemaCommand] -) + commands: Seq[SchemaCommand], + requiresReIndex: Boolean = false +): + + def withRequiresReIndex: SchemaMigration = + copy(requiresReIndex = true) object SchemaMigration: def apply(version: Long, cmd: SchemaCommand, more: SchemaCommand*): SchemaMigration = diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala index 6c636c26..8ae20627 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala @@ -30,7 +30,7 @@ trait SchemaMigrator[F[_]] { def currentVersion: F[Option[Long]] - def migrate(migrations: Seq[SchemaMigration]): F[Unit] + def migrate(migrations: Seq[SchemaMigration]): F[MigrateResult] } object SchemaMigrator: @@ -52,7 +52,7 @@ object SchemaMigrator: .query[VersionDocument](QueryString(s"id:$versionDocId")) .map(_.responseBody.docs.headOption.map(_.currentSchemaVersion)) - override def migrate(migrations: Seq[SchemaMigration]): F[Unit] = for { + override def migrate(migrations: Seq[SchemaMigration]): F[MigrateResult] = for { current <- currentVersion _ <- logger.info( s"core ${client.config.core}: Found current schema version '$current' using id $versionDocId" @@ -66,7 +66,14 @@ object SchemaMigrator: logger.info(s"core ${client.config.core}: Run migration ${m.version}") >> client.modifySchema(m.commands) >> upsertVersion(m.version) ) - } yield () + + result = MigrateResult( + startVersion = current, + endVersion = remain.map(_.version).maxOption, + migrationsRun = remain.size, + reindexRequired = remain.exists(_.requiresReIndex) + ) + } yield result private def initVersionDocument: F[Unit] = logger.info( diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala index 294cae09..ff8f59b7 100644 --- a/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala @@ -59,9 +59,10 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: client <- IO(solrClient()) migrator = SchemaMigrator[IO](client) _ <- truncate(client) - _ <- migrator.migrate(migrations) + res <- migrator.migrate(migrations) c <- migrator.currentVersion _ = assertEquals(c, Some(-1L)) + _ = assertEquals(res, MigrateResult(None, Some(-1L), migrations.size, false)) } yield () } @@ -72,11 +73,28 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: first = migrations.take(2) _ <- truncate(client) - _ <- migrator.migrate(first) + res0 <- migrator.migrate(first) v0 <- migrator.currentVersion _ = assertEquals(v0, Some(-4L)) + _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, false)) - _ <- migrator.migrate(migrations) + res1 <- migrator.migrate(migrations) v1 <- migrator.currentVersion _ = assertEquals(v1, Some(-1L)) + _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false)) } yield () + + test("no require-reindex if migrations have been applied already"): + val migs = migrations.head.withRequiresReIndex +: migrations.tail + for + client <- IO(solrClient()) + migrator = SchemaMigrator(client) + first = migs.take(2) + _ <- truncate(client) + + res0 <- migrator.migrate(first) + _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, true)) + + res1 <- migrator.migrate(migs) + _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false)) + yield ()