diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index acf7b1e1..599a2f88 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -40,6 +40,10 @@ object Microservice extends IOApp: for { _ <- IO(LoggingSetup.doConfigure(services.config.verbosity)) migrateResult <- runSolrMigrations(services.config) + _ <- IO.whenA(migrateResult.migrationsSkipped > 0)( + logger + .warn(s"There were ${migrateResult.migrationsSkipped} skipped migrations!") + ) // this is only safe for a single provisioning service _ <- services.resetLockDocuments registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala index 56c6b1e7..9bb189ea 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/MigrateResult.scala @@ -22,9 +22,10 @@ final case class MigrateResult( startVersion: Option[Long], endVersion: Option[Long], migrationsRun: Long, + migrationsSkipped: Long, reindexRequired: Boolean ) object MigrateResult: val empty: MigrateResult = - MigrateResult(None, None, 0L, false) + MigrateResult(None, None, 0L, 0L, false) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala index 1c6820aa..45fd1540 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigration.scala @@ -18,7 +18,7 @@ package io.renku.solr.client.migration -import io.renku.solr.client.schema.SchemaCommand +import io.renku.solr.client.schema.* final case class SchemaMigration( version: Long, @@ -29,6 +29,31 @@ final case class SchemaMigration( def withRequiresReIndex: SchemaMigration = copy(requiresReIndex = true) + def alignWith(schema: CoreSchema): SchemaMigration = + copy(commands = commands.filterNot(SchemaMigration.isApplied(schema))) + object SchemaMigration: def apply(version: Long, cmd: SchemaCommand, more: SchemaCommand*): SchemaMigration = SchemaMigration(version, cmd +: more) + + def isApplied(schema: CoreSchema)(cmd: SchemaCommand): Boolean = cmd match + case SchemaCommand.Add(ft: FieldType) => + schema.fieldTypes.exists(_.name == ft.name) + case SchemaCommand.Add(f: Field) => + schema.fields.exists(_.name == f.name) + case SchemaCommand.Add(r: DynamicFieldRule) => + schema.dynamicFields.exists(_.name == r.name) + case SchemaCommand.Add(r: CopyFieldRule) => + schema.copyFields.exists(cf => cf.source == r.source && cf.dest == r.dest) + case SchemaCommand.DeleteField(name) => + schema.fields.forall(_.name != name) + case SchemaCommand.DeleteType(name) => + schema.fieldTypes.forall(_.name != name) + case SchemaCommand.DeleteDynamicField(name) => + schema.dynamicFields.forall(_.name != name) + case SchemaCommand.Replace(ft: FieldType) => + schema.fieldTypes.exists(_ == ft) + case SchemaCommand.Replace(f: Field) => + schema.fields.exists(_ == f) + case SchemaCommand.Replace(r: DynamicFieldRule) => + schema.dynamicFields.exists(_ == r) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala index 6724fdfc..fa9e1148 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala @@ -24,6 +24,7 @@ import cats.syntax.all.* import fs2.io.net.Network import io.renku.solr.client.* +import io.renku.solr.client.schema.CoreSchema import io.renku.solr.client.util.DocumentLockResource trait SchemaMigrator[F[_]] { @@ -43,6 +44,17 @@ object SchemaMigrator: ): Resource[F, SchemaMigrator[F]] = SolrClient[F](solrConfig).map(apply[F]) + final private case class MigrationState( + schema: CoreSchema, + doc: VersionDocument, + skippedMigrations: Int = 0 + ) { + def withDocument(d: VersionDocument): MigrationState = + copy(doc = d) + + def incSkippedMigration = copy(skippedMigrations = skippedMigrations + 1) + } + private class Impl[F[_]: Sync](client: SolrClient[F]) extends SchemaMigrator[F] { private val logger = scribe.cats.effect[F] private val migrateLock = DocumentLockResource[F, VersionDocument](client) @@ -65,30 +77,41 @@ object SchemaMigrator: def doMigrate( migrations: Seq[SchemaMigration], - initial: VersionDocument + initialDoc: VersionDocument ): F[MigrateResult] = for { _ <- logger.info( - s"core ${client.config.core}: Found current schema version '${initial.currentSchemaVersion}' using id $versionDocId" + s"core ${client.config.core}: Found current schema version '${initialDoc.currentSchemaVersion}' using id $versionDocId" ) remain = migrations .sortBy(_.version) - .dropWhile(m => m.version <= initial.currentSchemaVersion) + .dropWhile(m => m.version <= initialDoc.currentSchemaVersion) _ <- logger.info( s"core ${client.config.core}: There are ${remain.size} migrations to run" ) - finalDoc <- remain.foldLeftM(initial) { (doc, m) => - client.modifySchema(m.commands) >> upsertVersion(doc, m.version) - } + initial <- client.getSchema.map(_.schema).map(MigrationState(_, initialDoc)) + finalState <- remain.foldLeftM(initial)(applyMigration) result = MigrateResult( - startVersion = Option(initial.currentSchemaVersion).filter(_ > Long.MinValue), + startVersion = Option(initialDoc.currentSchemaVersion).filter(_ > Long.MinValue), endVersion = remain.map(_.version).maxOption, migrationsRun = remain.size, + migrationsSkipped = finalState.skippedMigrations.toLong, reindexRequired = remain.exists(_.requiresReIndex) ) } yield result + def applyMigration(state: MigrationState, m: SchemaMigration): F[MigrationState] = + val cmds = m.alignWith(state.schema).commands + if (cmds.isEmpty) + logger.info(s"Migration ${m.version} seems already applied. Skipping it.") >> + upsertVersion(state.doc, m.version).map(state.incSkippedMigration.withDocument) + else + client.modifySchema(cmds) >> ( + client.getSchema.map(_.schema), + upsertVersion(state.doc, m.version) + ).mapN(MigrationState(_, _)) + private def requireVersionDoc = getVersionDoc.flatMap { case None => diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala index 8dbe415d..f390ed1b 100644 --- a/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/migration/SolrMigratorSpec.scala @@ -18,6 +18,7 @@ package io.renku.solr.client.migration +import cats.data.NonEmptyList import cats.effect.IO import io.renku.solr.client.DocVersion @@ -63,7 +64,7 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: res <- migrator.migrate(migrations) c <- migrator.currentVersion _ = assertEquals(c, Some(-1L)) - _ = assertEquals(res, MigrateResult(None, Some(-1L), migrations.size, false)) + _ = assertEquals(res, MigrateResult(None, Some(-1L), migrations.size, 0, false)) } yield () } @@ -77,12 +78,12 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: res0 <- migrator.migrate(first) v0 <- migrator.currentVersion _ = assertEquals(v0, Some(-4L)) - _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, false)) + _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, 0, false)) res1 <- migrator.migrate(migrations) v1 <- migrator.currentVersion _ = assertEquals(v1, Some(-1L)) - _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false)) + _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, 0, false)) } yield () test("no require-reindex if migrations have been applied already"): @@ -94,10 +95,10 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: _ <- truncate(client) res0 <- migrator.migrate(first) - _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, true)) + _ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, 0, true)) res1 <- migrator.migrate(migs) - _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false)) + _ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, 0, false)) yield () test("convert previous version document to current, then migrate remaining"): @@ -170,3 +171,29 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite: res <- migrator.migrate(migrations) _ = assertEquals(res.migrationsRun, 0L) yield () + + test("skip all migrations if already applied"): + for + client <- IO(solrClient()) + migrator = SchemaMigrator(client) + _ <- truncate(client) + + _ <- migrator.migrate(migrations).assert(_.migrationsRun == migrations.size) + _ <- client.deleteIds(NonEmptyList.of(SchemaMigrator.versionDocId)) + _ <- migrator.migrate(migrations).assert(_.migrationsSkipped == migrations.size) + yield () + + test("skip some migrations if already applied"): + for + client <- IO(solrClient()) + migrator = SchemaMigrator(client) + _ <- truncate(client) + + _ <- migrator.migrate(migrations).assert(_.migrationsRun == migrations.size) + _ <- client.upsertLoop[VersionDocument, Unit](SchemaMigrator.versionDocId) { + case Some(doc) => + (Some(doc.copy(currentSchemaVersion = doc.currentSchemaVersion - 2)), ()) + case None => (None, ()) + } + _ <- migrator.migrate(migrations).assert(_.migrationsSkipped == 2) + yield ()