Skip to content

Commit

Permalink
Allow to run re-indexing as result of a migration (#198)
Browse files Browse the repository at this point in the history
Migrations can communicate whether a re-index is necessary. When the
schema migrations run and there is at least one migration requiring a
re-index, the solr index will be rebuild on startup.

Closes: #184
  • Loading branch information
eikek authored Sep 5, 2024
1 parent e00be11 commit a7a3f15
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import io.renku.search.metrics.CollectorRegistryBuilder
import io.renku.search.provision.BackgroundProcessManage.TaskName
import io.renku.search.provision.metrics.*
import io.renku.search.solr.schema.Migrations
import io.renku.solr.client.migration.SchemaMigrator
import io.renku.solr.client.migration.{MigrateResult, SchemaMigrator}

object Microservice extends IOApp:

Expand All @@ -39,7 +39,7 @@ object Microservice extends IOApp:
Services.make[IO].use { services =>
for {
_ <- IO(LoggingSetup.doConfigure(services.config.verbosity))
_ <- runSolrMigrations(services.config)
migrateResult <- runSolrMigrations(services.config)
registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics
.add(RedisMetrics.queueSizeGauge)
.add(RedisMetrics.unprocessedGauge)
Expand All @@ -49,6 +49,12 @@ object Microservice extends IOApp:
tasks = services.messageHandlers.getAll + metrics + httpServer
pm = services.backgroundManage
_ <- tasks.toList.traverse_(pm.register.tupled)
_ <-
if (migrateResult.reindexRequired)
logger.info(
"Re-Index is required after migrations have applied!"
) >> services.reIndex.resetData(None)
else IO.unit
_ <- pm.startAll
_ <- IO.never
} yield ExitCode.Success
Expand Down Expand Up @@ -80,7 +86,7 @@ object Microservice extends IOApp:
).run()
TaskName.fromString("metrics updater") -> io

private def runSolrMigrations(cfg: SearchProvisionConfig): IO[Unit] =
private def runSolrMigrations(cfg: SearchProvisionConfig): IO[MigrateResult] =
SchemaMigrator[IO](cfg.solrConfig)
.use(_.migrate(Migrations.all))
.handleErrorWith { err =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,22 @@ import io.renku.search.provision.MessageHandlers.MessageHandlerKey
import io.renku.search.solr.client.SearchSolrClient

trait ReIndexService[F[_]]:
/** Stops background processes handling redis messages, drop the index and then restarts
* background processes. The `startMessage` can specify from which point to start
* reading the stream again. If it is `None` the stream is read from the beginning.
*
* This method ensures that only one re-indexing is initiated at a time.
*/
def startReIndex(startMessage: Option[MessageId]): F[Boolean]

/** Drops the SOLR index and marks the new start of the redis stream according to
* `startMessage`. If `startMessage` is `None`, the stream will be read from the
* beginning.
*
* NOTE: This method assumes no handlers currently reading from the redis streams.
*/
def resetData(startMessage: Option[MessageId]): F[Unit]

object ReIndexService:

def apply[F[_]: Clock: Sync](
Expand All @@ -56,16 +70,8 @@ object ReIndexService:
else dropIndexAndRestart(syncDoc, startMessage)
yield res

private def dropIndexAndRestart(
syncDoc: ReIndexDocument,
startMessage: Option[MessageId]
) =
def resetData(startMessage: Option[MessageId]): F[Unit] =
for
_ <- logger.info(
s"Starting re-indexing all data, since message ${syncDoc.messageId}"
)
_ <- bpm.cancelProcesses(MessageHandlerKey.isInstance)
_ <- logger.info("Background processes stopped")
_ <- startMessage match
case Some(msgId) =>
logger.info("Set last seen message id to $msgId for $queueName") >>
Expand Down Expand Up @@ -93,6 +99,19 @@ object ReIndexService:
.drain
_ <- logger.info("Delete SOLR index")
_ <- solrClient.deletePublicData
yield ()

private def dropIndexAndRestart(
syncDoc: ReIndexDocument,
startMessage: Option[MessageId]
) =
for
_ <- logger.info(
s"Starting re-indexing all data, since message ${syncDoc.messageId}"
)
_ <- bpm.cancelProcesses(MessageHandlerKey.isInstance)
_ <- logger.info("Background processes stopped")
_ <- resetData(startMessage)
_ <- logger.info("Start background processes")
_ <- bpm.background(MessageHandlerKey.isInstance)
_ <- solrClient.deleteIds(NonEmptyList.of(syncDoc.id))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.solr.client.migration

final case class MigrateResult(
startVersion: Option[Long],
endVersion: Option[Long],
migrationsRun: Long,
reindexRequired: Boolean
)

object MigrateResult:
val empty: MigrateResult =
MigrateResult(None, None, 0L, false)
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ import io.renku.solr.client.schema.SchemaCommand

final case class SchemaMigration(
version: Long,
commands: Seq[SchemaCommand]
)
commands: Seq[SchemaCommand],
requiresReIndex: Boolean = false
):

def withRequiresReIndex: SchemaMigration =
copy(requiresReIndex = true)

object SchemaMigration:
def apply(version: Long, cmd: SchemaCommand, more: SchemaCommand*): SchemaMigration =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ trait SchemaMigrator[F[_]] {

def currentVersion: F[Option[Long]]

def migrate(migrations: Seq[SchemaMigration]): F[Unit]
def migrate(migrations: Seq[SchemaMigration]): F[MigrateResult]
}

object SchemaMigrator:
Expand All @@ -52,7 +52,7 @@ object SchemaMigrator:
.query[VersionDocument](QueryString(s"id:$versionDocId"))
.map(_.responseBody.docs.headOption.map(_.currentSchemaVersion))

override def migrate(migrations: Seq[SchemaMigration]): F[Unit] = for {
override def migrate(migrations: Seq[SchemaMigration]): F[MigrateResult] = for {
current <- currentVersion
_ <- logger.info(
s"core ${client.config.core}: Found current schema version '$current' using id $versionDocId"
Expand All @@ -66,7 +66,14 @@ object SchemaMigrator:
logger.info(s"core ${client.config.core}: Run migration ${m.version}") >>
client.modifySchema(m.commands) >> upsertVersion(m.version)
)
} yield ()

result = MigrateResult(
startVersion = current,
endVersion = remain.map(_.version).maxOption,
migrationsRun = remain.size,
reindexRequired = remain.exists(_.requiresReIndex)
)
} yield result

private def initVersionDocument: F[Unit] =
logger.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite:
client <- IO(solrClient())
migrator = SchemaMigrator[IO](client)
_ <- truncate(client)
_ <- migrator.migrate(migrations)
res <- migrator.migrate(migrations)
c <- migrator.currentVersion
_ = assertEquals(c, Some(-1L))
_ = assertEquals(res, MigrateResult(None, Some(-1L), migrations.size, false))
} yield ()
}

Expand All @@ -72,11 +73,28 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite:
first = migrations.take(2)

_ <- truncate(client)
_ <- migrator.migrate(first)
res0 <- migrator.migrate(first)
v0 <- migrator.currentVersion
_ = assertEquals(v0, Some(-4L))
_ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, false))

_ <- migrator.migrate(migrations)
res1 <- migrator.migrate(migrations)
v1 <- migrator.currentVersion
_ = assertEquals(v1, Some(-1L))
_ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false))
} yield ()

test("no require-reindex if migrations have been applied already"):
val migs = migrations.head.withRequiresReIndex +: migrations.tail
for
client <- IO(solrClient())
migrator = SchemaMigrator(client)
first = migs.take(2)
_ <- truncate(client)

res0 <- migrator.migrate(first)
_ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, true))

res1 <- migrator.migrate(migs)
_ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false))
yield ()

0 comments on commit a7a3f15

Please sign in to comment.