Merge pull request #199 from SwissDataScienceCenter/improve-schema-migration

- Add a lock implementation based on SOLR's optimistic locking (see the sketch below)
- Refactor re-indexing to use the SOLR-based lock
- Protect schema migrations from running in parallel
eikek authored Sep 6, 2024
2 parents a7a3f15 + 89a1c36 commit 21f7748
Showing 14 changed files with 666 additions and 70 deletions.
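
For context, SOLR's optimistic concurrency control is driven by the reserved `_version_` field: an update sent with a negative `_version_` succeeds only if the document does not yet exist, so a rejected insert signals that another instance already holds the lock. A minimal sketch of the idea behind the new lock, using a hypothetical `LockDoc` class (illustrative only, not the code in this commit; it assumes the `DocVersion` codecs from the solr-client module are in scope):

import cats.effect.IO
import io.bullet.borer.Encoder
import io.bullet.borer.derivation.{MapBasedCodecs, key}
import io.renku.solr.client.{DocVersion, SolrClient}

// Hypothetical lock document; `_version_` is SOLR's optimistic-concurrency field.
final case class LockDoc(id: String, @key("_version_") version: DocVersion)
object LockDoc:
  given Encoder[LockDoc] = MapBasedCodecs.deriveEncoder

// Try to insert the lock document with DocVersion.NotExists ("must not exist").
// SOLR rejects the update when a document with that id is already there,
// i.e. some other instance holds the lock.
def tryAcquire(client: SolrClient[IO], lockId: String): IO[Boolean] =
  client
    .upsert(Seq(LockDoc(lockId, DocVersion.NotExists)))
    .map(resp => !resp.isFailure)
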
58 changes: 58 additions & 0 deletions modules/commons/src/test/scala/io/renku/search/Threading.scala
@@ -0,0 +1,58 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

import scala.util.Try

import cats.effect.*

import munit.*

trait Threading:
  self: CatsEffectSuite =>

  // using "real" threads to have a higher chance of parallelism

  /** Starts every given IO on its own daemon thread, releases them all at once
    * via a latch and collects the successful results (in no fixed order),
    * waiting at most `munitIOTimeout` for completion.
    */
  def runParallel[A](block: => IO[A], blockn: IO[A]*): IO[List[A]] =
    IO.blocking {
      val code = block +: blockn
      val latch = CountDownLatch(1)
      val result = new AtomicReference[List[A]](Nil)
      val done = new CountDownLatch(code.size)
      code.foreach { ioa =>
        val t = new Thread(new Runnable {
          def run() =
            latch.await()
            val ta = Try(ioa.unsafeRunSync())
            ta.fold(_ => (), a => result.updateAndGet(list => a :: list))
            done.countDown()
            ta.fold(throw _, _ => ())
            ()
        })
        t.setDaemon(true)
        t.start()
      }
      latch.countDown()
      done.await(munitIOTimeout.toMillis, TimeUnit.MILLISECONDS)
      result.get()
    }
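
A hypothetical usage sketch (not part of this commit): a suite mixing in `Threading` can release several IO actions simultaneously on real threads and collect whatever results arrive:

import cats.effect.IO
import munit.CatsEffectSuite

// Hypothetical test: both actions start on daemon threads and are released
// together by the internal CountDownLatch; result order is not guaranteed.
class ThreadingExample extends CatsEffectSuite with Threading:
  test("runParallel collects results from all blocks") {
    runParallel(IO.pure(1), IO.pure(2)).map { results =>
      assertEquals(results.sorted, List(1, 2))
    }
  }
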
@@ -20,32 +20,34 @@ package io.renku.search.provision.reindex

import java.time.Instant

import cats.Functor
import cats.effect.*
import cats.syntax.all.*

import io.bullet.borer.Decoder
import io.bullet.borer.Encoder
import io.bullet.borer.*
import io.bullet.borer.NullOptions.*
import io.bullet.borer.derivation.{MapBasedCodecs, key}
import io.renku.json.codecs.all.given
import io.renku.search.events.MessageId
import io.renku.search.model.Id
import io.renku.solr.client.DocVersion
import io.renku.solr.client.util.LockDocument

final private case class ReIndexDocument(
id: Id,
created: Instant,
messageId: Option[MessageId],
@key("created_dt") created: Instant,
@key("message_id_s") messageId: Option[MessageId] = None,
@key("_version_") version: DocVersion
)

private object ReIndexDocument:
private val docId: Id = Id("reindex_31baded5-9fc2-4935-9b07-80f7a3ecb13f")

def createNew[F[_]: Clock: Functor](messageId: Option[MessageId]): F[ReIndexDocument] =
Clock[F].realTimeInstant.map { now =>
ReIndexDocument(docId, now, messageId, DocVersion.NotExists)
}

given Encoder[ReIndexDocument] = MapBasedCodecs.deriveEncoder
given Decoder[ReIndexDocument] = MapBasedCodecs.deriveDecoder

def lockDocument[F[_]: Sync: Clock](
messageId: Option[MessageId]
): LockDocument[F, ReIndexDocument] =
LockDocument.whenExists(id =>
Clock[F].realTimeInstant.map { now =>
ReIndexDocument(Id(id), now, messageId, DocVersion.NotExists)
}
)
@@ -29,6 +29,7 @@ import io.renku.search.events.MessageId
import io.renku.search.provision.BackgroundProcessManage
import io.renku.search.provision.MessageHandlers.MessageHandlerKey
import io.renku.search.solr.client.SearchSolrClient
import io.renku.solr.client.util.*

trait ReIndexService[F[_]]:
/** Stops background processes handling redis messages, drops the index and then restarts
@@ -48,6 +49,7 @@ trait ReIndexService[F[_]]:
def resetData(startMessage: Option[MessageId]): F[Unit]

object ReIndexService:
private[reindex] val lockId: String = "reindex_31baded5-9fc2-4935-9b07-80f7a3ecb13f"

def apply[F[_]: Clock: Sync](
bpm: BackgroundProcessManage[F],
@@ -60,15 +62,14 @@
private val logger = scribe.cats.effect[F]

def startReIndex(startMessage: Option[MessageId]): F[Boolean] =
for
syncDoc <- ReIndexDocument.createNew[F](startMessage)
upsertResp <- solrClient.upsert(Seq(syncDoc))
_ <- logger.debug(s"Insert reindex sync document: $upsertResp")
res <-
if (upsertResp.isFailure)
logger.debug(s"Re-Index called while already in progress").as(false)
else dropIndexAndRestart(syncDoc, startMessage)
yield res
given LockDocument[F, ReIndexDocument] =
ReIndexDocument.lockDocument(startMessage)
val lock = solrClient.underlying.lockBy[ReIndexDocument](lockId)
lock.use {
case None =>
logger.debug(s"Re-Index called while already in progress").as(false)
case Some(d) => dropIndexAndRestart(d, startMessage)
}

def resetData(startMessage: Option[MessageId]): F[Unit] =
for
@@ -114,6 +115,5 @@ object ReIndexService:
_ <- resetData(startMessage)
_ <- logger.info("Start background processes")
_ <- bpm.background(MessageHandlerKey.isInstance)
_ <- solrClient.deleteIds(NonEmptyList.of(syncDoc.id))
yield true
}
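
Caller-side, the `Boolean` returned by `startReIndex` now reports whether this instance won the lock, so a hypothetical trigger (names assumed) only needs to inspect it:

// Hypothetical sketch: request a full re-index; `false` means another
// instance already holds the reindex lock, so there is nothing to do.
def triggerReindex[F[_]](service: ReIndexService[F]): F[Boolean] =
  service.startReIndex(None)
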
@@ -34,10 +34,7 @@ import io.renku.search.provision.ProvisioningSuite
import io.renku.search.provision.TestServices
import io.renku.search.solr.documents.{EntityDocument, Project as ProjectDocument}
import io.renku.search.solr.schema.EntityDocumentSchema
import io.renku.solr.client.DocVersion
import io.renku.solr.client.QueryData
import io.renku.solr.client.QueryString
import io.renku.solr.client.SolrSort
import io.renku.solr.client.*
import org.scalacheck.Gen

class ReIndexServiceSpec extends ProvisioningSuite:
@@ -113,7 +110,8 @@ class ReIndexServiceSpec extends ProvisioningSuite:
for
services <- IO(testServices())

doc <- ReIndexDocument.createNew[IO](None)
lockDoc = ReIndexDocument.lockDocument[IO](None)
doc <- lockDoc.acquire(None, ReIndexService.lockId)
_ <- services.searchClient.upsertSuccess(Seq(doc))

r <- services.reindex.startReIndex(None)
@@ -31,7 +31,7 @@ import io.renku.search.solr.documents.*
import io.renku.solr.client.*

trait SearchSolrClient[F[_]]:
private[solr] def underlying: SolrClient[F]
def underlying: SolrClient[F]
def findById[D: Decoder](id: CompoundId): F[Option[D]]
def upsert[D: Encoder](documents: Seq[D]): F[UpsertResponse]
def upsertSuccess[D: Encoder](documents: Seq[D]): F[Unit]
@@ -48,7 +48,7 @@ private class SearchSolrClientImpl[F[_]: Async](solrClient: SolrClient[F])
EntityDocumentSchema.Fields.entityType,
EntityDocumentSchema.Fields.entityType
)
private[solr] val underlying: SolrClient[F] = solrClient
val underlying: SolrClient[F] = solrClient

override def upsert[D: Encoder](documents: Seq[D]): F[UpsertResponse] =
solrClient.upsert(documents)
@@ -18,13 +18,15 @@

package io.renku.solr.client

import cats.MonadThrow
import cats.data.NonEmptyList
import cats.effect.{Async, Resource}
import fs2.io.net.Network

import io.bullet.borer.{Decoder, Encoder}
import io.renku.search.http.{ClientBuilder, ResponseLogging, RetryConfig}
import io.renku.solr.client.schema.SchemaCommand
import io.renku.solr.client.util.{DocumentLockResource, LockDocument}
import org.http4s.ember.client.EmberClientBuilder

trait SolrClient[F[_]]:
@@ -52,6 +54,29 @@
def createCore(name: String, configSet: Option[String] = None): F[Unit]
def deleteCore(name: String): F[Unit]

/** Returns a `Resource` that yields `true` if a lock for `id` could be obtained. It
* yields `false` if the lock `id` is already held.
*
* It uses a solr document with the given `id`.
*/
def lockOn(id: String)(using MonadThrow[F]): Resource[F, Boolean] =
DocumentLockResource.create[F](this)(id)

/** Returns a `Resource` that yields a `Some` if the lock represented by `A` could be
* obtained and `None` if not.
*
* The lock is represented by a solr document `A`. The `acquire` function either
* returns a new document in "acquired" state or sets the acquired state should the
* document already exist. Analogously, `release` puts the document back into a free state
* or returns `None` to remove the document from SOLR. The function `isFree` is used to
* determine the state if a document already exists with that id. If it doesn't exist,
* the lock is free to obtain.
*/
def lockBy[A](
id: String
)(using MonadThrow[F], LockDocument[F, A]): Resource[F, Option[A]] =
DocumentLockResource[F, A](this).make(id)

object SolrClient:
def apply[F[_]: Async: Network](config: SolrConfig): Resource[F, SolrClient[F]] =
ClientBuilder(EmberClientBuilder.default[F])
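
A hypothetical usage sketch of the new `lockOn` helper above (the lock id and task are assumptions):

import cats.effect.MonadCancelThrow
import cats.syntax.all.*
import io.renku.solr.client.SolrClient

// Hypothetical sketch: run `task` only if the lock is free; releasing the
// Resource frees the lock document again.
def runExclusive[F[_]: MonadCancelThrow](client: SolrClient[F], task: F[Unit]): F[Unit] =
  client.lockOn("example-task-lock").use {
    case true  => task       // lock acquired, do the work
    case false => ().pure[F] // already held elsewhere, skip
  }
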
@@ -23,8 +23,8 @@ import cats.effect.{Async, Sync}
import cats.syntax.all.*
import fs2.io.net.Network

import io.renku.solr.client.schema.*
import io.renku.solr.client.{QueryString, SolrClient, SolrConfig}
import io.renku.solr.client.*
import io.renku.solr.client.util.DocumentLockResource

trait SchemaMigrator[F[_]] {

@@ -34,6 +34,7 @@ trait SchemaMigrator[F[_]] {
}

object SchemaMigrator:
private[migration] val versionDocId = "VERSION_ID_EB779C6B-1D96-47CB-B304-BECF15E4A607"

def apply[F[_]: Sync](client: SolrClient[F]): SchemaMigrator[F] = Impl[F](client)

@@ -44,55 +45,85 @@ object SchemaMigrator:

private class Impl[F[_]: Sync](client: SolrClient[F]) extends SchemaMigrator[F] {
private val logger = scribe.cats.effect[F]
private val versionDocId = "VERSION_ID_EB779C6B-1D96-47CB-B304-BECF15E4A607"
private val versionTypeName: TypeName = TypeName("plong")
private val migrateLock = DocumentLockResource[F, VersionDocument](client)

override def currentVersion: F[Option[Long]] =
client
.query[VersionDocument](QueryString(s"id:$versionDocId"))
.map(_.responseBody.docs.headOption.map(_.currentSchemaVersion))
getVersionDoc.map(_.map(_.currentSchemaVersion))

override def migrate(migrations: Seq[SchemaMigration]): F[MigrateResult] = for {
current <- currentVersion
private def getVersionDoc: F[Option[VersionDocument]] =
client
.findById[VersionDocument](versionDocId)
.map(_.responseBody.docs.headOption)

def migrate(migrations: Seq[SchemaMigration]): F[MigrateResult] =
convertVersionDocument >>
migrateLock.make(versionDocId).use {
case None =>
logger.info("A migration is already running").as(MigrateResult.empty)
case Some(doc) => doMigrate(migrations, doc)
}

def doMigrate(
migrations: Seq[SchemaMigration],
initial: VersionDocument
): F[MigrateResult] = for {
_ <- logger.info(
s"core ${client.config.core}: Found current schema version '$current' using id $versionDocId"
s"core ${client.config.core}: Found current schema version '${initial.currentSchemaVersion}' using id $versionDocId"
)
_ <- current.fold(initVersionDocument)(_ => ().pure[F])
remain = migrations.sortBy(_.version).dropWhile(m => current.exists(_ >= m.version))
remain = migrations
.sortBy(_.version)
.dropWhile(m => m.version <= initial.currentSchemaVersion)
_ <- logger.info(
s"core ${client.config.core}: There are ${remain.size} migrations to run"
)
_ <- remain.traverse_(m =>
logger.info(s"core ${client.config.core}: Run migration ${m.version}") >>
client.modifySchema(m.commands) >> upsertVersion(m.version)
)

finalDoc <- remain.foldLeftM(initial) { (doc, m) =>
client.modifySchema(m.commands) >> upsertVersion(doc, m.version)
}

result = MigrateResult(
startVersion = current,
startVersion = Option(initial.currentSchemaVersion).filter(_ > Long.MinValue),
endVersion = remain.map(_.version).maxOption,
migrationsRun = remain.size,
reindexRequired = remain.exists(_.requiresReIndex)
)
} yield result

private def initVersionDocument: F[Unit] =
logger.info(
s"core ${client.config.core}: Initialize schema migration version document"
) >>
client.modifySchema(
Seq(
SchemaCommand.Add(
Field(
FieldName("currentSchemaVersion"),
versionTypeName
)
)
private def requireVersionDoc =
getVersionDoc.flatMap {
case None =>
Sync[F].raiseError(
new Exception("No version document available during migration!")
)
)

private def version(n: Long): VersionDocument = VersionDocument(versionDocId, n)
case Some(d) => d.pure[F]
}

private def upsertVersion(n: Long) =
logger.info(s"core ${client.config.core}: Set schema migration version to $n") >>
client.upsert(Seq(version(n)))
private def upsertVersion(currentDoc: VersionDocument, nextVersion: Long) =
for
_ <- logger.info(
s"core ${client.config.core}: Set schema migration version to $nextVersion"
)
_ <- client.upsertSuccess(
Seq(currentDoc.copy(currentSchemaVersion = nextVersion))
)
doc <- requireVersionDoc
yield doc

private def convertVersionDocument =
val task = client.lockOn("fc72c840-67a8-4a42-8ce1-f9baa409ea84").use {
case true =>
client
.findById[VersionDocument.HistoricDocument1](versionDocId)
.map(_.responseBody.docs.headOption)
.map(_.flatMap(_.toCurrent))
.flatMap {
case None => ().pure[F]
case Some(d) =>
logger.info(s"Converting old version document $d") >> client
.upsertSuccess(Seq(d))
}
.as(true)
case false => false.pure[F]
}
fs2.Stream.repeatEval(task).takeWhile(!_).compile.drain
}
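
The closing `fs2.Stream.repeatEval(task).takeWhile(!_).compile.drain` keeps retrying the conversion until the conversion lock could be acquired once. For completeness, a hypothetical startup wiring of the migrator (import paths and surrounding names are assumptions):

import cats.effect.Sync
import io.renku.solr.client.SolrClient
import io.renku.solr.client.migration.{MigrateResult, SchemaMigration, SchemaMigrator}

// Hypothetical sketch: concurrent instances are serialized by the
// version-document lock; the instance holding it runs the migrations,
// the others return MigrateResult.empty.
def migrateOnStartup[F[_]: Sync](
    client: SolrClient[F],
    migrations: Seq[SchemaMigration]
): F[MigrateResult] =
  SchemaMigrator[F](client).migrate(migrations)
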