Skip to content

Commit

Permalink
Lock on solr migrations in tests, more logging on solr migrations
Browse files Browse the repository at this point in the history
  • Loading branch information
eikek committed May 14, 2024
1 parent fe48442 commit f9f1927
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 9 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ lazy val searchProvision = project
.settings(commonSettings)
.settings(
name := "search-provision",
libraryDependencies ++= Dependencies.ciris
libraryDependencies ++= Dependencies.ciris,
Test / parallelExecution := false
)
.dependsOn(
commons % "compile->compile;test->test",
Expand Down Expand Up @@ -425,6 +426,7 @@ lazy val commonSettings = Seq(
),
Compile / console / scalacOptions := (Compile / scalacOptions).value.filterNot(_ == "-Xfatal-warnings"),
Test / console / scalacOptions := (Compile / console / scalacOptions).value,
Test / parallelExecution := false,
semanticdbEnabled := true, // enable SemanticDB
semanticdbVersion := scalafixSemanticdb.revision,
libraryDependencies ++=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package io.renku.search.provision

import scala.concurrent.duration.Duration

import cats.effect.{IO, Resource}
import cats.syntax.all.*
import fs2.Stream
Expand All @@ -33,6 +35,7 @@ import io.renku.search.solr.query.SolrToken
import io.renku.solr.client.{QueryData, QueryString}

trait ProvisioningSuite extends SearchSolrSuite with QueueSpec:
override val munitTimeout = Duration(1, "min")

val queueConfig: QueuesConfig = QueuesConfig(
projectCreated = QueueName("projectCreated"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class GroupMemberUpdatedSpec extends ProvisioningSuite:
override def munitFixtures: Seq[Fixture[?]] =
List(withRedisClient, withQueueClient, withSearchSolrClient)

override def defaultVerbosity: Int = 2

test("updating member to group and related projects"):
withMessageHandlers(queueConfig).use { case (handlers, queueClient, solrClient) =>
val initialState = DbState.groupWithProjectsGen.generateOne
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

package io.renku.search.solr.client

import cats.effect.{IO, Resource}
import cats.effect.*
import cats.effect.std.CountDownLatch

import io.renku.search.solr.schema.Migrations
import io.renku.solr.client.SolrClient
Expand All @@ -34,7 +35,7 @@ abstract class SearchSolrSuite extends SolrClientBaseSuite:

def apply(): Resource[IO, SearchSolrClient[IO]] =
SolrClient[IO](solrConfig.copy(core = server.searchCoreName))
.evalTap(SchemaMigrator[IO](_).migrate(Migrations.all).attempt.void)
.evalTap(SearchSolrSuite.setupSchema(server.searchCoreName, _))
.map(new SearchSolrClientImpl[IO](_))

override def beforeAll(): Unit =
Expand All @@ -45,3 +46,30 @@ abstract class SearchSolrSuite extends SolrClientBaseSuite:

override def munitFixtures: Seq[Fixture[?]] =
List(withSearchSolrClient)

object SearchSolrSuite:
private val logger = scribe.cats.io
private case class MigrateState(tasks: Map[String, IO[Unit]] = Map.empty):
def add(name: String, task: IO[Unit]): MigrateState = copy(tasks.updated(name, task))
private val currentState: Ref[IO, MigrateState] =
Ref.unsafe(MigrateState())

private def setupSchema(coreName: String, client: SolrClient[IO]): IO[Unit] =
CountDownLatch[IO](1).flatMap { latch =>
currentState.flatModify { state =>
state.tasks.get(coreName) match
case Some(t) =>
(
state,
logger
.info(s"Waiting for migrations to finish for core $coreName")
.flatMap(_ => t)
)
case None =>
val task = SchemaMigrator[IO](client)
.migrate(Migrations.all)
.flatTap(_ => latch.release)
val wait = latch.await
(state.add(coreName, wait), task)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.http4s.ember.client.EmberClientBuilder
import org.http4s.ember.client.EmberClientBuilder.default

trait SolrClient[F[_]]:
def config: SolrConfig
def modifySchema(
cmds: Seq[SchemaCommand],
onErrorLog: ResponseLogging = ResponseLogging.Error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.http4s.Status
import org.http4s.client.Client
import org.http4s.{BasicCredentials, Method, Uri}

private class SolrClientImpl[F[_]: Async](config: SolrConfig, underlying: Client[F])
private class SolrClientImpl[F[_]: Async](val config: SolrConfig, underlying: Client[F])
extends SolrClient[F]
with HttpClientDsl[F]
with SchemaJsonCodec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,24 @@ object SchemaMigrator:

override def migrate(migrations: Seq[SchemaMigration]): F[Unit] = for {
current <- currentVersion
_ <- logger.info(s"Found current schema version '$current' using id $versionDocId")
_ <- logger.info(
s"core ${client.config.core}: Found current schema version '$current' using id $versionDocId"
)
_ <- current.fold(initVersionDocument)(_ => ().pure[F])
remain = migrations.sortBy(_.version).dropWhile(m => current.exists(_ >= m.version))
_ <- logger.info(s"There are ${remain.size} migrations to run")
_ <- logger.info(
s"core ${client.config.core}: There are ${remain.size} migrations to run"
)
_ <- remain.traverse_(m =>
logger.info(s"Run migration ${m.version}") >>
logger.info(s"core ${client.config.core}: Run migration ${m.version}") >>
client.modifySchema(m.commands) >> upsertVersion(m.version)
)
} yield ()

private def initVersionDocument: F[Unit] =
logger.info("Initialize schema migration version document") >>
logger.info(
s"core ${client.config.core}: Initialize schema migration version document"
) >>
client.modifySchema(
Seq(
SchemaCommand.Add(
Expand All @@ -80,6 +86,6 @@ object SchemaMigrator:
private def version(n: Long): VersionDocument = VersionDocument(versionDocId, n)

private def upsertVersion(n: Long) =
logger.info(s"Set schema migration version to $n") >>
logger.info(s"core ${client.config.core}: Set schema migration version to $n") >>
client.upsert(Seq(version(n)))
}

0 comments on commit f9f1927

Please sign in to comment.