Skip to content

Commit

Permalink
Schema migration run only if not applied (#207)
Browse files Browse the repository at this point in the history
The solr schema can be fetched and it is used to test each command in
the schema migration if it had already been applied. If not, the
command is run and otherwise it is ignored.

With this change it is possible to drop solr entirely, including
schema information. On restart, the schema will be fixed by running
those changes that haven't been applied.
  • Loading branch information
eikek authored Sep 20, 2024
1 parent a9fd1a5 commit 73d4876
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ object Microservice extends IOApp:
for {
_ <- IO(LoggingSetup.doConfigure(services.config.verbosity))
migrateResult <- runSolrMigrations(services.config)
_ <- IO.whenA(migrateResult.migrationsSkipped > 0)(
logger
.warn(s"There were ${migrateResult.migrationsSkipped} skipped migrations!")
)
// this is only safe for a single provisioning service
_ <- services.resetLockDocuments
registryBuilder = CollectorRegistryBuilder[IO].withJVMMetrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ final case class MigrateResult(
startVersion: Option[Long],
endVersion: Option[Long],
migrationsRun: Long,
migrationsSkipped: Long,
reindexRequired: Boolean
)

object MigrateResult:
val empty: MigrateResult =
MigrateResult(None, None, 0L, false)
MigrateResult(None, None, 0L, 0L, false)
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package io.renku.solr.client.migration

import io.renku.solr.client.schema.SchemaCommand
import io.renku.solr.client.schema.*

final case class SchemaMigration(
version: Long,
Expand All @@ -29,6 +29,31 @@ final case class SchemaMigration(
def withRequiresReIndex: SchemaMigration =
copy(requiresReIndex = true)

def alignWith(schema: CoreSchema): SchemaMigration =
copy(commands = commands.filterNot(SchemaMigration.isApplied(schema)))

object SchemaMigration:
def apply(version: Long, cmd: SchemaCommand, more: SchemaCommand*): SchemaMigration =
SchemaMigration(version, cmd +: more)

def isApplied(schema: CoreSchema)(cmd: SchemaCommand): Boolean = cmd match
case SchemaCommand.Add(ft: FieldType) =>
schema.fieldTypes.exists(_.name == ft.name)
case SchemaCommand.Add(f: Field) =>
schema.fields.exists(_.name == f.name)
case SchemaCommand.Add(r: DynamicFieldRule) =>
schema.dynamicFields.exists(_.name == r.name)
case SchemaCommand.Add(r: CopyFieldRule) =>
schema.copyFields.exists(cf => cf.source == r.source && cf.dest == r.dest)
case SchemaCommand.DeleteField(name) =>
schema.fields.forall(_.name != name)
case SchemaCommand.DeleteType(name) =>
schema.fieldTypes.forall(_.name != name)
case SchemaCommand.DeleteDynamicField(name) =>
schema.dynamicFields.forall(_.name != name)
case SchemaCommand.Replace(ft: FieldType) =>
schema.fieldTypes.exists(_ == ft)
case SchemaCommand.Replace(f: Field) =>
schema.fields.exists(_ == f)
case SchemaCommand.Replace(r: DynamicFieldRule) =>
schema.dynamicFields.exists(_ == r)
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import cats.syntax.all.*
import fs2.io.net.Network

import io.renku.solr.client.*
import io.renku.solr.client.schema.CoreSchema
import io.renku.solr.client.util.DocumentLockResource

trait SchemaMigrator[F[_]] {
Expand All @@ -43,6 +44,17 @@ object SchemaMigrator:
): Resource[F, SchemaMigrator[F]] =
SolrClient[F](solrConfig).map(apply[F])

final private case class MigrationState(
schema: CoreSchema,
doc: VersionDocument,
skippedMigrations: Int = 0
) {
def withDocument(d: VersionDocument): MigrationState =
copy(doc = d)

def incSkippedMigration = copy(skippedMigrations = skippedMigrations + 1)
}

private class Impl[F[_]: Sync](client: SolrClient[F]) extends SchemaMigrator[F] {
private val logger = scribe.cats.effect[F]
private val migrateLock = DocumentLockResource[F, VersionDocument](client)
Expand All @@ -65,30 +77,41 @@ object SchemaMigrator:

def doMigrate(
migrations: Seq[SchemaMigration],
initial: VersionDocument
initialDoc: VersionDocument
): F[MigrateResult] = for {
_ <- logger.info(
s"core ${client.config.core}: Found current schema version '${initial.currentSchemaVersion}' using id $versionDocId"
s"core ${client.config.core}: Found current schema version '${initialDoc.currentSchemaVersion}' using id $versionDocId"
)
remain = migrations
.sortBy(_.version)
.dropWhile(m => m.version <= initial.currentSchemaVersion)
.dropWhile(m => m.version <= initialDoc.currentSchemaVersion)
_ <- logger.info(
s"core ${client.config.core}: There are ${remain.size} migrations to run"
)

finalDoc <- remain.foldLeftM(initial) { (doc, m) =>
client.modifySchema(m.commands) >> upsertVersion(doc, m.version)
}
initial <- client.getSchema.map(_.schema).map(MigrationState(_, initialDoc))
finalState <- remain.foldLeftM(initial)(applyMigration)

result = MigrateResult(
startVersion = Option(initial.currentSchemaVersion).filter(_ > Long.MinValue),
startVersion = Option(initialDoc.currentSchemaVersion).filter(_ > Long.MinValue),
endVersion = remain.map(_.version).maxOption,
migrationsRun = remain.size,
migrationsSkipped = finalState.skippedMigrations.toLong,
reindexRequired = remain.exists(_.requiresReIndex)
)
} yield result

def applyMigration(state: MigrationState, m: SchemaMigration): F[MigrationState] =
val cmds = m.alignWith(state.schema).commands
if (cmds.isEmpty)
logger.info(s"Migration ${m.version} seems already applied. Skipping it.") >>
upsertVersion(state.doc, m.version).map(state.incSkippedMigration.withDocument)
else
client.modifySchema(cmds) >> (
client.getSchema.map(_.schema),
upsertVersion(state.doc, m.version)
).mapN(MigrationState(_, _))

private def requireVersionDoc =
getVersionDoc.flatMap {
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.renku.solr.client.migration

import cats.data.NonEmptyList
import cats.effect.IO

import io.renku.solr.client.DocVersion
Expand Down Expand Up @@ -63,7 +64,7 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite:
res <- migrator.migrate(migrations)
c <- migrator.currentVersion
_ = assertEquals(c, Some(-1L))
_ = assertEquals(res, MigrateResult(None, Some(-1L), migrations.size, false))
_ = assertEquals(res, MigrateResult(None, Some(-1L), migrations.size, 0, false))
} yield ()
}

Expand All @@ -77,12 +78,12 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite:
res0 <- migrator.migrate(first)
v0 <- migrator.currentVersion
_ = assertEquals(v0, Some(-4L))
_ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, false))
_ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, 0, false))

res1 <- migrator.migrate(migrations)
v1 <- migrator.currentVersion
_ = assertEquals(v1, Some(-1L))
_ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false))
_ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, 0, false))
} yield ()

test("no require-reindex if migrations have been applied already"):
Expand All @@ -94,10 +95,10 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite:
_ <- truncate(client)

res0 <- migrator.migrate(first)
_ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, true))
_ = assertEquals(res0, MigrateResult(None, Some(-4L), 2, 0, true))

res1 <- migrator.migrate(migs)
_ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, false))
_ = assertEquals(res1, MigrateResult(Some(-4L), Some(-1L), 3, 0, false))
yield ()

test("convert previous version document to current, then migrate remaining"):
Expand Down Expand Up @@ -170,3 +171,29 @@ class SolrMigratorSpec extends CatsEffectSuite with SolrClientBaseSuite:
res <- migrator.migrate(migrations)
_ = assertEquals(res.migrationsRun, 0L)
yield ()

test("skip all migrations if already applied"):
for
client <- IO(solrClient())
migrator = SchemaMigrator(client)
_ <- truncate(client)

_ <- migrator.migrate(migrations).assert(_.migrationsRun == migrations.size)
_ <- client.deleteIds(NonEmptyList.of(SchemaMigrator.versionDocId))
_ <- migrator.migrate(migrations).assert(_.migrationsSkipped == migrations.size)
yield ()

test("skip some migrations if already applied"):
for
client <- IO(solrClient())
migrator = SchemaMigrator(client)
_ <- truncate(client)

_ <- migrator.migrate(migrations).assert(_.migrationsRun == migrations.size)
_ <- client.upsertLoop[VersionDocument, Unit](SchemaMigrator.versionDocId) {
case Some(doc) =>
(Some(doc.copy(currentSchemaVersion = doc.currentSchemaVersion - 2)), ())
case None => (None, ())
}
_ <- migrator.migrate(migrations).assert(_.migrationsSkipped == 2)
yield ()

0 comments on commit 73d4876

Please sign in to comment.