From f9f1927defbce1d02dd08d894a4ff48714f94142 Mon Sep 17 00:00:00 2001 From: Eike Kettner Date: Tue, 14 May 2024 10:44:17 +0200 Subject: [PATCH] Lock on solr migrations in tests, more logging on solr migrations --- build.sbt | 4 ++- .../search/provision/ProvisioningSuite.scala | 3 ++ .../group/GroupMemberUpdatedSpec.scala | 2 ++ .../search/solr/client/SearchSolrSuite.scala | 32 +++++++++++++++++-- .../io/renku/solr/client/SolrClient.scala | 1 + .../io/renku/solr/client/SolrClientImpl.scala | 2 +- .../client/migration/SchemaMigrator.scala | 16 +++++++--- 7 files changed, 51 insertions(+), 9 deletions(-) diff --git a/build.sbt b/build.sbt index 1798dd3b..6b27f2b5 100644 --- a/build.sbt +++ b/build.sbt @@ -342,7 +342,8 @@ lazy val searchProvision = project .settings(commonSettings) .settings( name := "search-provision", - libraryDependencies ++= Dependencies.ciris + libraryDependencies ++= Dependencies.ciris, + Test / parallelExecution := false ) .dependsOn( commons % "compile->compile;test->test", @@ -425,6 +426,7 @@ lazy val commonSettings = Seq( ), Compile / console / scalacOptions := (Compile / scalacOptions).value.filterNot(_ == "-Xfatal-warnings"), Test / console / scalacOptions := (Compile / console / scalacOptions).value, + Test / parallelExecution := false, semanticdbEnabled := true, // enable SemanticDB semanticdbVersion := scalafixSemanticdb.revision, libraryDependencies ++= diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala index 337f1340..bf2c7d37 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala @@ -18,6 +18,8 @@ package io.renku.search.provision +import scala.concurrent.duration.Duration + import cats.effect.{IO, Resource} import cats.syntax.all.* import fs2.Stream @@ -33,6 +35,7 @@ import io.renku.search.solr.query.SolrToken import io.renku.solr.client.{QueryData, QueryString} trait ProvisioningSuite extends SearchSolrSuite with QueueSpec: + override val munitTimeout = Duration(1, "min") val queueConfig: QueuesConfig = QueuesConfig( projectCreated = QueueName("projectCreated"), diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/group/GroupMemberUpdatedSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/group/GroupMemberUpdatedSpec.scala index 80515aa6..7c4d1c4b 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/group/GroupMemberUpdatedSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/group/GroupMemberUpdatedSpec.scala @@ -40,6 +40,8 @@ class GroupMemberUpdatedSpec extends ProvisioningSuite: override def munitFixtures: Seq[Fixture[?]] = List(withRedisClient, withQueueClient, withSearchSolrClient) + override def defaultVerbosity: Int = 2 + test("updating member to group and related projects"): withMessageHandlers(queueConfig).use { case (handlers, queueClient, solrClient) => val initialState = DbState.groupWithProjectsGen.generateOne diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSuite.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSuite.scala index 0a70659d..69ec628e 100644 --- a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSuite.scala +++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSuite.scala @@ -18,7 +18,8 @@ package io.renku.search.solr.client -import cats.effect.{IO, Resource} +import cats.effect.* +import cats.effect.std.CountDownLatch import io.renku.search.solr.schema.Migrations import io.renku.solr.client.SolrClient @@ -34,7 +35,7 @@ abstract class SearchSolrSuite extends SolrClientBaseSuite: def apply(): Resource[IO, SearchSolrClient[IO]] = SolrClient[IO](solrConfig.copy(core = server.searchCoreName)) - .evalTap(SchemaMigrator[IO](_).migrate(Migrations.all).attempt.void) + .evalTap(SearchSolrSuite.setupSchema(server.searchCoreName, _)) .map(new SearchSolrClientImpl[IO](_)) override def beforeAll(): Unit = @@ -45,3 +46,30 @@ abstract class SearchSolrSuite extends SolrClientBaseSuite: override def munitFixtures: Seq[Fixture[?]] = List(withSearchSolrClient) + +object SearchSolrSuite: + private val logger = scribe.cats.io + private case class MigrateState(tasks: Map[String, IO[Unit]] = Map.empty): + def add(name: String, task: IO[Unit]): MigrateState = copy(tasks.updated(name, task)) + private val currentState: Ref[IO, MigrateState] = + Ref.unsafe(MigrateState()) + + private def setupSchema(coreName: String, client: SolrClient[IO]): IO[Unit] = + CountDownLatch[IO](1).flatMap { latch => + currentState.flatModify { state => + state.tasks.get(coreName) match + case Some(t) => + ( + state, + logger + .info(s"Waiting for migrations to finish for core $coreName") + .flatMap(_ => t) + ) + case None => + val task = SchemaMigrator[IO](client) + .migrate(Migrations.all) + .flatTap(_ => latch.release) + val wait = latch.await + (state.add(coreName, wait), task) + } + } diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala index 35158890..fbd1cca6 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala @@ -29,6 +29,7 @@ import org.http4s.ember.client.EmberClientBuilder import org.http4s.ember.client.EmberClientBuilder.default trait SolrClient[F[_]]: + def config: SolrConfig def modifySchema( cmds: Seq[SchemaCommand], onErrorLog: ResponseLogging = ResponseLogging.Error diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala index 6342d568..3d2e5620 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala @@ -30,7 +30,7 @@ import org.http4s.Status import org.http4s.client.Client import org.http4s.{BasicCredentials, Method, Uri} -private class SolrClientImpl[F[_]: Async](config: SolrConfig, underlying: Client[F]) +private class SolrClientImpl[F[_]: Async](val config: SolrConfig, underlying: Client[F]) extends SolrClient[F] with HttpClientDsl[F] with SchemaJsonCodec diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala index 1595a254..6c636c26 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala @@ -54,18 +54,24 @@ object SchemaMigrator: override def migrate(migrations: Seq[SchemaMigration]): F[Unit] = for { current <- currentVersion - _ <- logger.info(s"Found current schema version '$current' using id $versionDocId") + _ <- logger.info( + s"core ${client.config.core}: Found current schema version '$current' using id $versionDocId" + ) _ <- current.fold(initVersionDocument)(_ => ().pure[F]) remain = migrations.sortBy(_.version).dropWhile(m => current.exists(_ >= m.version)) - _ <- logger.info(s"There are ${remain.size} migrations to run") + _ <- logger.info( + s"core ${client.config.core}: There are ${remain.size} migrations to run" + ) _ <- remain.traverse_(m => - logger.info(s"Run migration ${m.version}") >> + logger.info(s"core ${client.config.core}: Run migration ${m.version}") >> client.modifySchema(m.commands) >> upsertVersion(m.version) ) } yield () private def initVersionDocument: F[Unit] = - logger.info("Initialize schema migration version document") >> + logger.info( + s"core ${client.config.core}: Initialize schema migration version document" + ) >> client.modifySchema( Seq( SchemaCommand.Add( @@ -80,6 +86,6 @@ object SchemaMigrator: private def version(n: Long): VersionDocument = VersionDocument(versionDocId, n) private def upsertVersion(n: Long) = - logger.info(s"Set schema migration version to $n") >> + logger.info(s"core ${client.config.core}: Set schema migration version to $n") >> client.upsert(Seq(version(n))) }