From 73d4876bd9f3696bce9e64c629195ac288cd7aff Mon Sep 17 00:00:00 2001 From: eikek <701128+eikek@users.noreply.github.com> Date: Fri, 20 Sep 2024 15:29:41 +0200 Subject: [PATCH] Schema migration run only if not applied (#207) The solr schema can be fetched and it is used to test each command in the schema migration if it had already been applied. If not, the command is run and otherwise it is ignored. With this change it is possible to drop solr entirely, including schema information. On restart, the schema will be fixed by running those changes that haven't been applied. --- .../renku/search/provision/Microservice.scala | 4 ++ .../solr/client/migration/MigrateResult.scala | 3 +- .../client/migration/SchemaMigration.scala | 27 +++++++++++++- .../client/migration/SchemaMigrator.scala | 37 +++++++++++++++---- .../client/migration/SolrMigratorSpec.scala | 37 ++++++++++++++++--- 5 files changed, 94 insertions(+), 14 deletions(-) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index acf7b1e1..599a2f88 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -40,6 +40,10 @@ object Microservice extends IOApp: for { _ <- IO(LoggingSetup.doConfigure(services.config.verbosity)) migrateResult <- runSolrMigrations(services.config) + _ <- IO.whenA(migrateResult.migrationsSkipped > 0)( + logger + .warn(s"There were ${migrateResult.migrationsSkipped} skipped migrations!") + ) // this is only safe for a single provisioning service _ <- services.resetLockDocuments registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala index 56c6b1e7..9bb189ea 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala @@ -22,9 +22,10 @@ final case class MigrateResult( startVersion: Option[Long], endVersion: Option[Long], migrationsRun: Long, + migrationsSkipped: Long, reindexRequired: Boolean ) object MigrateResult: val empty: MigrateResult = - MigrateResult(None, None, 0L, false) + MigrateResult(None, None, 0L, 0L, false) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala index 1c6820aa..45fd1540 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala @@ -18,7 +18,7 @@ package io.renku.solr.client.migration -import io.renku.solr.client.schema.SchemaCommand +import io.renku.solr.client.schema.* final case class SchemaMigration( version: Long, @@ -29,6 +29,31 @@ final case class SchemaMigration( def withRequiresReIndex: SchemaMigration = copy(requiresReIndex = true) + def alignWith(schema: CoreSchema): SchemaMigration = + copy(commands = commands.filterNot(SchemaMigration.isApplied(schema))) + object SchemaMigration: def apply(version: Long, cmd: SchemaCommand, more: SchemaCommand*): SchemaMigration = SchemaMigration(version, cmd +: more) + + def isApplied(schema: CoreSchema)(cmd: SchemaCommand): Boolean = cmd match + case SchemaCommand.Add(ft: FieldType) => + schema.fieldTypes.exists(_.name == ft.name) + case SchemaCommand.Add(f: Field) => + schema.fields.exists(_.name == f.name) + case SchemaCommand.Add(r: DynamicFieldRule) => + schema.dynamicFields.exists(_.name == r.name) + case SchemaCommand.Add(r: CopyFieldRule) => + schema.copyFields.exists(cf => cf.source == r.source && cf.dest == r.dest) + case SchemaCommand.DeleteField(name) => + schema.fields.forall(_.name != name) + case SchemaCommand.DeleteType(name) => + schema.fieldTypes.forall(_.name != name) + case SchemaCommand.DeleteDynamicField(name) => + schema.dynamicFields.forall(_.name != name) + case SchemaCommand.Replace(ft: FieldType) => + schema.fieldTypes.exists(_ == ft) + case SchemaCommand.Replace(f: Field) => + schema.fields.exists(_ == f) + case SchemaCommand.Replace(r: DynamicFieldRule) => + schema.dynamicFields.exists(_ == r) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala index 6724fdfc..fa9e1148 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala @@ -24,6 +24,7 @@ import cats.syntax.all.* import fs2.io.net.Network import io.renku.solr.client.* +import io.renku.solr.client.schema.CoreSchema import io.renku.solr.client.util.DocumentLockResource trait SchemaMigrator[F[_]] { @@ -43,6 +44,17 @@ object SchemaMigrator: ): Resource[F, SchemaMigrator[F]] = SolrClient[F](solrConfig).map(apply[F]) + final private case class MigrationState( + schema: CoreSchema, + doc: VersionDocument, + skippedMigrations: Int = 0 + ) { + def withDocument(d: VersionDocument): MigrationState = + copy(doc = d) + + def incSkippedMigration = copy(skippedMigrations = skippedMigrations + 1) + } + private class Impl[F[_]: Sync](client: SolrClient[F]) extends SchemaMigrator[F] { private val logger = scribe.cats.effect[F] private val migrateLock = DocumentLockResource[F, VersionDocument](client) @@ -65,30 +77,41 @@ object SchemaMigrator: def doMigrate( migrations: Seq[SchemaMigration], - initial: VersionDocument + initialDoc: VersionDocument ): F[MigrateResult] = for { _ <- logger.info( - s"core ${client.config.core}: Found current schema version '${initial.currentSchemaVersion}' using id $versionDocId" + s"core ${client.config.core}: Found current schema version '${initialDoc.currentSchemaVersion}' using id $versionDocId" ) remain = migrations .sortBy(_.version) - .dropWhile(m => m.version <= initial.currentSchemaVersion) + .dropWhile(m => m.version <= initialDoc.currentSchemaVersion) _ <- logger.info( s"core ${client.config.core}: There are ${remain.size} migrations to run" ) - finalDoc <- remain.foldLeftM(initial) { (doc, m) => - client.modifySchema(m.commands) >> upsertVersion(doc, m.version) - } + initial <- client.getSchema.map(_.schema).map(MigrationState(_, initialDoc)) + finalState <- remain.foldLeftM(initial)(applyMigration) result = MigrateResult( - startVersion = Option(initial.currentSchemaVersion).filter(_ > Long.MinValue), + startVersion = Option(initialDoc.currentSchemaVersion).filter(_ > Long.MinValue), endVersion = remain.map(_.version).maxOption, migrationsRun = remain.size, + migrationsSkipped = finalState.skippedMigrations.toLong, reindexRequired = remain.exists(_.requiresReIndex) ) } yield result + def applyMigration(state: MigrationState, m: SchemaMigration): F[MigrationState] = + val cmds = m.alignWith(state.schema).commands + if (cmds.isEmpty) + logger.info(s"Migration ${m.version} seems already applied. Skipping it.") >> + upsertVersion(state.doc, m.version).map(state.incSkippedMigration.withDocument) + else + client.modifySchema(cmds) >> ( + client.getSchema.map(_.schema), + upsertVersion(state.doc, m.version) + ).mapN(MigrationState(_, _)) + private def requireVersionDoc = getVersionDoc.flatMap { case None => diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala index 8dbe415d..f390ed1b 100644 --- a/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala @@ -18,6 +18,7 @@ package io.renku.solr.client.migration +import cats.data.NonEmptyList import cats.effect.IO import io.renku.solr.client.DocVersion @@ -63,7 +64,7 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: res <- migrator.migrate(migrations) c <- migrator.currentVersion _ = assertEquals(c, Some(-1L)) - _ = assertEquals(res, MigrateResult(None, Some(-1L), migrations.size, false)) + _ = assertEquals(res, MigrateResult(None, Some(-1L), migrations.size, 0, false)) } yield () } @@ -77,12 +78,12 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: res0 <- migrator.migrate(first) v0 <- migrator.currentVersion _ = assertEquals(v0, Some(-4L)) - _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, false)) + _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, 0, false)) res1 <- migrator.migrate(migrations) v1 <- migrator.currentVersion _ = assertEquals(v1, Some(-1L)) - _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false)) + _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, 0, false)) } yield () test("no require-reindex if migrations have been applied already"): @@ -94,10 +95,10 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: _ <- truncate(client) res0 <- migrator.migrate(first) - _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, true)) + _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, 0, true)) res1 <- migrator.migrate(migs) - _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false)) + _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, 0, false)) yield () test("convert previous version document to current, then migrate remaining"): @@ -170,3 +171,29 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: res <- migrator.migrate(migrations) _ = assertEquals(res.migrationsRun, 0L) yield () + + test("skip all migrations if already applied"): + for + client <- IO(solrClient()) + migrator = SchemaMigrator(client) + _ <- truncate(client) + + _ <- migrator.migrate(migrations).assert(_.migrationsRun == migrations.size) + _ <- client.deleteIds(NonEmptyList.of(SchemaMigrator.versionDocId)) + _ <- migrator.migrate(migrations).assert(_.migrationsSkipped == migrations.size) + yield () + + test("skip some migrations if already applied"): + for + client <- IO(solrClient()) + migrator = SchemaMigrator(client) + _ <- truncate(client) + + _ <- migrator.migrate(migrations).assert(_.migrationsRun == migrations.size) + _ <- client.upsertLoop[VersionDocument, Unit](SchemaMigrator.versionDocId) { + case Some(doc) => + (Some(doc.copy(currentSchemaVersion = doc.currentSchemaVersion - 2)), ()) + case None => (None, ()) + } + _ <- migrator.migrate(migrations).assert(_.migrationsSkipped == 2) + yield ()