diff --git a/build.sbt b/build.sbt index 1798dd3b..6b27f2b5 100644 --- a/build.sbt +++ b/build.sbt @@ -342,7 +342,8 @@ lazy val searchProvision = project .settings(commonSettings) .settings( name := "search-provision", - libraryDependencies ++= Dependencies.ciris + libraryDependencies ++= Dependencies.ciris, + Test / parallelExecution := false ) .dependsOn( commons % "compile->compile;test->test", @@ -425,6 +426,7 @@ lazy val commonSettings = Seq( ), Compile / console / scalacOptions := (Compile / scalacOptions).value.filterNot(_ == "-Xfatal-warnings"), Test / console / scalacOptions := (Compile / console / scalacOptions).value, + Test / parallelExecution := false, semanticdbEnabled := true, // enable SemanticDB semanticdbVersion := scalafixSemanticdb.revision, libraryDependencies ++= diff --git a/modules/events/src/main/scala/io/renku/search/events/MessageHeader.scala b/modules/events/src/main/scala/io/renku/search/events/MessageHeader.scala index 6d4fbe21..d99164d4 100644 --- a/modules/events/src/main/scala/io/renku/search/events/MessageHeader.scala +++ b/modules/events/src/main/scala/io/renku/search/events/MessageHeader.scala @@ -48,7 +48,9 @@ final case class MessageHeader( time.toInstant, requestId.value ) - AvroWriter(Header.SCHEMA$).write(Seq(h)) + dataContentType match + case DataContentType.Binary => AvroWriter(Header.SCHEMA$).write(Seq(h)) + case DataContentType.Json => AvroWriter(Header.SCHEMA$).writeJson(Seq(h)) object MessageHeader: def create[F[_]: Sync]( @@ -59,31 +61,47 @@ object MessageHeader: ): F[MessageHeader] = Timestamp.now[F].map(ts => MessageHeader(src, ct, sv, ts, reqId)) - private def readBinaryOrJson(bv: ByteVector): Either[DecodeFailure, Seq[Header]] = + private def readBinaryOrJson( + bv: ByteVector + ): Either[DecodeFailure, (DataContentType, List[Header])] = val reader = AvroReader(Header.SCHEMA$) Either.catchNonFatal(reader.read[Header](bv)) match - case Right(r) => Right(r) + case Right(r) => Right(DataContentType.Binary -> 
r.distinct.toList) case Left(exb) => - Either.catchNonFatal(reader.readJson[Header](bv)).leftMap { exj => - DecodeFailure.HeaderReadError(bv, exb, exj) - } + Either + .catchNonFatal(reader.readJson[Header](bv)) + .map(r => DataContentType.Json -> r.distinct.toList) + .leftMap { exj => + DecodeFailure.HeaderReadError(bv, exb, exj) + } + + private def logWrongDataContentType( + headerCt: DataContentType, + decoded: DataContentType + ): DataContentType = + if (headerCt != decoded) { + scribe.warn( + s"ContentType ($headerCt) used for decoding the header is not the same as advertised in the header ($decoded)! Choosing the one used for decoding the header to continue ($headerCt)." + ) + } + headerCt def fromByteVector(bv: ByteVector): Either[DecodeFailure, MessageHeader] = readBinaryOrJson(bv) - .map(_.distinct.toList) .flatMap { - case h :: Nil => Right(h) - case Nil => Left(DecodeFailure.NoHeaderRecord(bv)) - case hs => + case (ct, h :: Nil) => Right(ct -> h) + case (_, Nil) => Left(DecodeFailure.NoHeaderRecord(bv)) + case (ct, hs) => Left(DecodeFailure.MultipleHeaderRecords(bv, NonEmptyList.fromListUnsafe(hs))) } - .flatMap { h => + .flatMap { case (headerCt, h) => for ct <- DataContentType .fromMimeType(h.dataContentType) .leftMap(err => DecodeFailure.FieldReadError("dataContentType", h.dataContentType, err) ) + ctReal = logWrongDataContentType(headerCt, ct) v <- SchemaVersion .fromString(h.schemaVersion) .leftMap(err => @@ -92,5 +110,5 @@ object MessageHeader: src = MessageSource(h.source) ts = Timestamp(h.time) rid = RequestId(h.requestId) - yield MessageHeader(src, ct, v, ts, rid) + yield MessageHeader(src, ctReal, v, ts, rid) } diff --git a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala index 02058fd6..df9b00b6 100644 --- a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala +++ 
b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala @@ -69,11 +69,11 @@ private class QueueClientImpl[F[_]: Async]( )(using d: EventMessageDecoder[T]): Stream[F, EventMessage[T]] = acquireHeaderEventsStream(queueName, chunkSize, maybeOffset).evalMap { m => d.decode(m) match - case Right(m) => - Scribe[F].trace(s"Got message: $m").as(Some(m)) + case Right(em) => + Scribe[F].trace(s"Got message: $em").as(Some(em)) case Left(err) => Scribe[F] - .warn(s"Error decoding redis message: $err") + .warn(s"Error decoding redis payload in $m: $err") .as(None) .flatTap(_ => markProcessed(queueName, MessageId(m.id.value))) diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala index 337f1340..bf2c7d37 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/ProvisioningSuite.scala @@ -18,6 +18,8 @@ package io.renku.search.provision +import scala.concurrent.duration.Duration + import cats.effect.{IO, Resource} import cats.syntax.all.* import fs2.Stream @@ -33,6 +35,7 @@ import io.renku.search.solr.query.SolrToken import io.renku.solr.client.{QueryData, QueryString} trait ProvisioningSuite extends SearchSolrSuite with QueueSpec: + override val munitTimeout = Duration(1, "min") val queueConfig: QueuesConfig = QueuesConfig( projectCreated = QueueName("projectCreated"), diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/group/GroupMemberUpdatedSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/group/GroupMemberUpdatedSpec.scala index 80515aa6..7c4d1c4b 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/group/GroupMemberUpdatedSpec.scala +++ 
b/modules/search-provision/src/test/scala/io/renku/search/provision/group/GroupMemberUpdatedSpec.scala @@ -40,6 +40,8 @@ class GroupMemberUpdatedSpec extends ProvisioningSuite: override def munitFixtures: Seq[Fixture[?]] = List(withRedisClient, withQueueClient, withSearchSolrClient) + override def defaultVerbosity: Int = 2 + test("updating member to group and related projects"): withMessageHandlers(queueConfig).use { case (handlers, queueClient, solrClient) => val initialState = DbState.groupWithProjectsGen.generateOne diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSuite.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSuite.scala index 0a70659d..69ec628e 100644 --- a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSuite.scala +++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrSuite.scala @@ -18,7 +18,8 @@ package io.renku.search.solr.client -import cats.effect.{IO, Resource} +import cats.effect.* +import cats.effect.std.CountDownLatch import io.renku.search.solr.schema.Migrations import io.renku.solr.client.SolrClient @@ -34,7 +35,7 @@ abstract class SearchSolrSuite extends SolrClientBaseSuite: def apply(): Resource[IO, SearchSolrClient[IO]] = SolrClient[IO](solrConfig.copy(core = server.searchCoreName)) - .evalTap(SchemaMigrator[IO](_).migrate(Migrations.all).attempt.void) + .evalTap(SearchSolrSuite.setupSchema(server.searchCoreName, _)) .map(new SearchSolrClientImpl[IO](_)) override def beforeAll(): Unit = @@ -45,3 +46,30 @@ abstract class SearchSolrSuite extends SolrClientBaseSuite: override def munitFixtures: Seq[Fixture[?]] = List(withSearchSolrClient) + +object SearchSolrSuite: + private val logger = scribe.cats.io + private case class MigrateState(tasks: Map[String, IO[Unit]] = Map.empty): + def add(name: String, task: IO[Unit]): MigrateState = copy(tasks.updated(name, task)) + private val currentState: 
Ref[IO, MigrateState] = + Ref.unsafe(MigrateState()) + + private def setupSchema(coreName: String, client: SolrClient[IO]): IO[Unit] = + CountDownLatch[IO](1).flatMap { latch => + currentState.flatModify { state => + state.tasks.get(coreName) match + case Some(t) => + ( + state, + logger + .info(s"Waiting for migrations to finish for core $coreName") + .flatMap(_ => t) + ) + case None => + val task = SchemaMigrator[IO](client) + .migrate(Migrations.all) + .flatTap(_ => latch.release) + val wait = latch.await + (state.add(coreName, wait), task) + } + } diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala index 35158890..fbd1cca6 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala @@ -29,6 +29,7 @@ import org.http4s.ember.client.EmberClientBuilder import org.http4s.ember.client.EmberClientBuilder.default trait SolrClient[F[_]]: + def config: SolrConfig def modifySchema( cmds: Seq[SchemaCommand], onErrorLog: ResponseLogging = ResponseLogging.Error diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala index 6342d568..3d2e5620 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala @@ -30,7 +30,7 @@ import org.http4s.Status import org.http4s.client.Client import org.http4s.{BasicCredentials, Method, Uri} -private class SolrClientImpl[F[_]: Async](config: SolrConfig, underlying: Client[F]) +private class SolrClientImpl[F[_]: Async](val config: SolrConfig, underlying: Client[F]) extends SolrClient[F] with HttpClientDsl[F] with SchemaJsonCodec diff --git 
a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala index 1595a254..6c636c26 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/migration/SchemaMigrator.scala @@ -54,18 +54,24 @@ object SchemaMigrator: override def migrate(migrations: Seq[SchemaMigration]): F[Unit] = for { current <- currentVersion - _ <- logger.info(s"Found current schema version '$current' using id $versionDocId") + _ <- logger.info( + s"core ${client.config.core}: Found current schema version '$current' using id $versionDocId" + ) _ <- current.fold(initVersionDocument)(_ => ().pure[F]) remain = migrations.sortBy(_.version).dropWhile(m => current.exists(_ >= m.version)) - _ <- logger.info(s"There are ${remain.size} migrations to run") + _ <- logger.info( + s"core ${client.config.core}: There are ${remain.size} migrations to run" + ) _ <- remain.traverse_(m => - logger.info(s"Run migration ${m.version}") >> + logger.info(s"core ${client.config.core}: Run migration ${m.version}") >> client.modifySchema(m.commands) >> upsertVersion(m.version) ) } yield () private def initVersionDocument: F[Unit] = - logger.info("Initialize schema migration version document") >> + logger.info( + s"core ${client.config.core}: Initialize schema migration version document" + ) >> client.modifySchema( Seq( SchemaCommand.Add( @@ -80,6 +86,6 @@ object SchemaMigrator: private def version(n: Long): VersionDocument = VersionDocument(versionDocId, n) private def upsertVersion(n: Long) = - logger.info(s"Set schema migration version to $n") >> + logger.info(s"core ${client.config.core}: Set schema migration version to $n") >> client.upsert(Seq(version(n))) }