Commit 2054926

Merge pull request #116 from SwissDataScienceCenter/log-bad-content-type
Attempt to fix content-type for redis messages
eikek authored May 14, 2024
2 parents bf570f6 + f9f1927 commit 2054926
Showing 9 changed files with 84 additions and 24 deletions.
4 changes: 3 additions & 1 deletion build.sbt
@@ -342,7 +342,8 @@ lazy val searchProvision = project
   .settings(commonSettings)
   .settings(
     name := "search-provision",
-    libraryDependencies ++= Dependencies.ciris
+    libraryDependencies ++= Dependencies.ciris,
+    Test / parallelExecution := false
   )
   .dependsOn(
     commons % "compile->compile;test->test",
@@ -425,6 +426,7 @@ lazy val commonSettings = Seq(
   ),
   Compile / console / scalacOptions := (Compile / scalacOptions).value.filterNot(_ == "-Xfatal-warnings"),
   Test / console / scalacOptions := (Compile / console / scalacOptions).value,
+  Test / parallelExecution := false,
   semanticdbEnabled := true, // enable SemanticDB
   semanticdbVersion := scalafixSemanticdb.revision,
   libraryDependencies ++=
@@ -48,7 +48,9 @@ final case class MessageHeader(
       time.toInstant,
       requestId.value
     )
-    AvroWriter(Header.SCHEMA$).write(Seq(h))
+    dataContentType match
+      case DataContentType.Binary => AvroWriter(Header.SCHEMA$).write(Seq(h))
+      case DataContentType.Json => AvroWriter(Header.SCHEMA$).writeJson(Seq(h))

 object MessageHeader:
   def create[F[_]: Sync](
@@ -59,31 +61,47 @@ object MessageHeader:
   ): F[MessageHeader] =
     Timestamp.now[F].map(ts => MessageHeader(src, ct, sv, ts, reqId))

-  private def readBinaryOrJson(bv: ByteVector): Either[DecodeFailure, Seq[Header]] =
+  private def readBinaryOrJson(
+      bv: ByteVector
+  ): Either[DecodeFailure, (DataContentType, List[Header])] =
     val reader = AvroReader(Header.SCHEMA$)
     Either.catchNonFatal(reader.read[Header](bv)) match
-      case Right(r) => Right(r)
+      case Right(r) => Right(DataContentType.Binary -> r.distinct.toList)
       case Left(exb) =>
-        Either.catchNonFatal(reader.readJson[Header](bv)).leftMap { exj =>
-          DecodeFailure.HeaderReadError(bv, exb, exj)
-        }
+        Either
+          .catchNonFatal(reader.readJson[Header](bv))
+          .map(r => DataContentType.Json -> r.distinct.toList)
+          .leftMap { exj =>
+            DecodeFailure.HeaderReadError(bv, exb, exj)
+          }

+  private def logWrongDataContentType(
+      headerCt: DataContentType,
+      decoded: DataContentType
+  ): DataContentType =
+    if (headerCt != decoded) {
+      scribe.warn(
+        s"ContentType ($headerCt) used for decoding the header is not the same as the one advertised in the header ($decoded)! Continuing with the one used for decoding ($headerCt)."
+      )
+    }
+    headerCt
+
   def fromByteVector(bv: ByteVector): Either[DecodeFailure, MessageHeader] =
     readBinaryOrJson(bv)
-      .map(_.distinct.toList)
       .flatMap {
-        case h :: Nil => Right(h)
-        case Nil => Left(DecodeFailure.NoHeaderRecord(bv))
-        case hs =>
+        case (ct, h :: Nil) => Right(ct -> h)
+        case (_, Nil) => Left(DecodeFailure.NoHeaderRecord(bv))
+        case (ct, hs) =>
           Left(DecodeFailure.MultipleHeaderRecords(bv, NonEmptyList.fromListUnsafe(hs)))
       }
-      .flatMap { h =>
+      .flatMap { case (headerCt, h) =>
         for
           ct <- DataContentType
             .fromMimeType(h.dataContentType)
             .leftMap(err =>
               DecodeFailure.FieldReadError("dataContentType", h.dataContentType, err)
             )
+          ctReal = logWrongDataContentType(headerCt, ct)
           v <- SchemaVersion
             .fromString(h.schemaVersion)
             .leftMap(err =>
@@ -92,5 +110,5 @@ object MessageHeader:
           src = MessageSource(h.source)
           ts = Timestamp(h.time)
           rid = RequestId(h.requestId)
-        yield MessageHeader(src, ct, v, ts, rid)
+        yield MessageHeader(src, ctReal, v, ts, rid)
       }
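
The hunks above make the header encoder honor dataContentType and make the decoder try Avro binary first, fall back to JSON, and thread the format that actually decoded through to the final MessageHeader, preferring it over the advertised one. Below is a minimal, self-contained sketch of that fallback-and-reconcile pattern; the toy decoders stand in for the project's AvroReader, and all names are illustrative, not the repository's API:

    object ContentTypeFallback:
      enum DataContentType:
        case Binary, Json
      import DataContentType.*

      // Toy stand-ins for AvroReader.read / AvroReader.readJson:
      // a "binary" payload starts with 0x0B, a JSON one with '{'.
      private def readBinary(bytes: Array[Byte]): Either[Throwable, String] =
        Either.cond(bytes.headOption.contains(0x0b.toByte), "binary-header", new Exception("not binary"))

      private def readJson(bytes: Array[Byte]): Either[Throwable, String] =
        Either.cond(bytes.headOption.contains('{'.toByte), "json-header", new Exception("not json"))

      // Try binary first, fall back to JSON, and remember which format worked.
      // (Unlike the commit, this drops the binary error on fallback; the real
      // code keeps both exceptions in DecodeFailure.HeaderReadError.)
      def readBinaryOrJson(bytes: Array[Byte]): Either[Throwable, (DataContentType, String)] =
        readBinary(bytes).map(Binary -> _).orElse(readJson(bytes).map(Json -> _))

      // Prefer the format that actually decoded over the advertised one.
      def reconcile(observed: DataContentType, advertised: DataContentType): DataContentType =
        if observed != advertised then
          println(s"warn: header advertises $advertised but decoded as $observed; using $observed")
        observed

    @main def fallbackDemo(): Unit =
      println(ContentTypeFallback.readBinaryOrJson("{}".getBytes)) // Right((Json,json-header))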
@@ -69,11 +69,11 @@ private class QueueClientImpl[F[_]: Async](
   )(using d: EventMessageDecoder[T]): Stream[F, EventMessage[T]] =
     acquireHeaderEventsStream(queueName, chunkSize, maybeOffset).evalMap { m =>
       d.decode(m) match
-        case Right(m) =>
-          Scribe[F].trace(s"Got message: $m").as(Some(m))
+        case Right(em) =>
+          Scribe[F].trace(s"Got message: $em").as(Some(em))
         case Left(err) =>
           Scribe[F]
-            .warn(s"Error decoding redis message: $err")
+            .warn(s"Error decoding redis payload in $m: $err")
             .as(None)
             .flatTap(_ => markProcessed(queueName, MessageId(m.id.value)))

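The Left branch above is a poison-message strategy: a payload that cannot be decoded is logged together with the raw message, acknowledged via markProcessed so Redis will not redeliver it forever, and dropped from the stream by mapping to None. A self-contained sketch of that shape, with a plain list standing in for the Redis stream (fs2 and cats-effect are the only real dependencies; every other name is made up):

    import cats.effect.{IO, IOApp}
    import fs2.Stream

    object DecodeOrSkip extends IOApp.Simple:
      final case class Raw(id: Int, payload: String)

      def decode(r: Raw): Either[String, Int] =
        r.payload.toIntOption.toRight(s"not a number: ${r.payload}")

      // Stand-in for queueClient.markProcessed: ack even on failure,
      // so the bad message is not redelivered.
      def markProcessed(id: Int): IO[Unit] = IO.println(s"acked $id")

      val raw = Stream(Raw(1, "42"), Raw(2, "oops"), Raw(3, "7")).covary[IO]

      def run: IO[Unit] =
        raw
          .evalMap { m =>
            decode(m) match
              case Right(v) => IO.pure(Option(v))
              case Left(err) =>
                IO.println(s"warn: error decoding payload in $m: $err")
                  .as(Option.empty[Int])
                  .flatTap(_ => markProcessed(m.id))
          }
          .unNone // keep only successfully decoded events
          .evalMap(v => IO.println(s"processing $v"))
          .compile
          .drain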
@@ -18,6 +18,8 @@

 package io.renku.search.provision

+import scala.concurrent.duration.Duration
+
 import cats.effect.{IO, Resource}
 import cats.syntax.all.*
 import fs2.Stream
@@ -33,6 +35,7 @@ import io.renku.search.solr.query.SolrToken
 import io.renku.solr.client.{QueryData, QueryString}

 trait ProvisioningSuite extends SearchSolrSuite with QueueSpec:
+  override val munitTimeout = Duration(1, "min")

   val queueConfig: QueuesConfig = QueuesConfig(
     projectCreated = QueueName("projectCreated"),
@@ -40,6 +40,8 @@ class GroupMemberUpdatedSpec extends ProvisioningSuite:
   override def munitFixtures: Seq[Fixture[?]] =
     List(withRedisClient, withQueueClient, withSearchSolrClient)

+  override def defaultVerbosity: Int = 2
+
   test("updating member to group and related projects"):
     withMessageHandlers(queueConfig).use { case (handlers, queueClient, solrClient) =>
       val initialState = DbState.groupWithProjectsGen.generateOne
@@ -18,7 +18,8 @@

 package io.renku.search.solr.client

-import cats.effect.{IO, Resource}
+import cats.effect.*
+import cats.effect.std.CountDownLatch

 import io.renku.search.solr.schema.Migrations
 import io.renku.solr.client.SolrClient
@@ -34,7 +35,7 @@ abstract class SearchSolrSuite extends SolrClientBaseSuite:

     def apply(): Resource[IO, SearchSolrClient[IO]] =
       SolrClient[IO](solrConfig.copy(core = server.searchCoreName))
-        .evalTap(SchemaMigrator[IO](_).migrate(Migrations.all).attempt.void)
+        .evalTap(SearchSolrSuite.setupSchema(server.searchCoreName, _))
         .map(new SearchSolrClientImpl[IO](_))

   override def beforeAll(): Unit =
@@ -45,3 +46,30 @@

   override def munitFixtures: Seq[Fixture[?]] =
     List(withSearchSolrClient)
+
+object SearchSolrSuite:
+  private val logger = scribe.cats.io
+
+  private case class MigrateState(tasks: Map[String, IO[Unit]] = Map.empty):
+    def add(name: String, task: IO[Unit]): MigrateState = copy(tasks.updated(name, task))
+
+  private val currentState: Ref[IO, MigrateState] =
+    Ref.unsafe(MigrateState())
+
+  private def setupSchema(coreName: String, client: SolrClient[IO]): IO[Unit] =
+    CountDownLatch[IO](1).flatMap { latch =>
+      currentState.flatModify { state =>
+        state.tasks.get(coreName) match
+          case Some(t) =>
+            (
+              state,
+              logger
+                .info(s"Waiting for migrations to finish for core $coreName")
+                .flatMap(_ => t)
+            )
+          case None =>
+            val task = SchemaMigrator[IO](client)
+              .migrate(Migrations.all)
+              .flatTap(_ => latch.release)
+            val wait = latch.await
+            (state.add(coreName, wait), task)
+      }
+    }
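
The new companion object serializes schema migrations so they run exactly once per Solr core, even when several suites share it: the first caller for a core stores a latch-backed wait task in a Ref and runs the real migration, releasing the latch at the end; every later caller finds the stored task and merely awaits the latch. A generic, self-contained sketch of this once-per-key pattern (Ref#flatModify and CountDownLatch are real cats-effect APIs; everything else is illustrative):

    import cats.effect.{IO, IOApp, Ref}
    import cats.effect.std.CountDownLatch
    import cats.syntax.all.*
    import scala.concurrent.duration.*

    object RunOncePerKey extends IOApp.Simple:
      // Build a runner that executes `work` at most once per key;
      // concurrent and later callers for the same key just wait.
      def makeRunner(work: String => IO[Unit]): IO[String => IO[Unit]] =
        Ref.of[IO, Map[String, IO[Unit]]](Map.empty).map { state => key =>
          CountDownLatch[IO](1).flatMap { latch =>
            state.flatModify { tasks =>
              tasks.get(key) match
                case Some(wait) =>
                  // Already started (or done): run the stored wait task.
                  (tasks, IO.println(s"waiting for $key") >> wait)
                case None =>
                  // First caller: publish a wait handle, run the real work.
                  // As in the commit, the latch is only released on success;
                  // a failed first run would leave later callers waiting.
                  val task = work(key).flatTap(_ => latch.release)
                  (tasks.updated(key, latch.await), task)
            }
          }
        }

      def run: IO[Unit] =
        makeRunner(k => IO.println(s"migrating $k") >> IO.sleep(100.millis))
          .flatMap { runner =>
            // Three concurrent calls for the same core: the work runs once.
            (runner("core-a"), runner("core-a"), runner("core-a")).parTupled.void
          }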
@@ -29,6 +29,7 @@ import org.http4s.ember.client.EmberClientBuilder
 import org.http4s.ember.client.EmberClientBuilder.default

 trait SolrClient[F[_]]:
+  def config: SolrConfig
   def modifySchema(
     cmds: Seq[SchemaCommand],
     onErrorLog: ResponseLogging = ResponseLogging.Error
@@ -30,7 +30,7 @@ import org.http4s.Status
 import org.http4s.client.Client
 import org.http4s.{BasicCredentials, Method, Uri}

-private class SolrClientImpl[F[_]: Async](config: SolrConfig, underlying: Client[F])
+private class SolrClientImpl[F[_]: Async](val config: SolrConfig, underlying: Client[F])
   extends SolrClient[F]
   with HttpClientDsl[F]
   with SchemaJsonCodec
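
Exposing the configuration needs only the one-word change above because of the Scala idiom these two hunks rely on: a val constructor parameter both stores a field and implements an abstract def declared in a trait (a stable value is a legal implementation of a parameterless method). A tiny self-contained illustration with made-up names:

    // `SolrClient` declares `def config: SolrConfig` the same way.
    trait HasConfig:
      def config: String // abstract, no field implied

    // Marking the constructor parameter `val` satisfies the abstract `def`.
    final class ClientImpl(val config: String) extends HasConfig

    @main def configDemo(): Unit =
      val c: HasConfig = ClientImpl("search-core")
      println(c.config) // prints: search-core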
@@ -54,18 +54,24 @@ object SchemaMigrator:

   override def migrate(migrations: Seq[SchemaMigration]): F[Unit] = for {
     current <- currentVersion
-    _ <- logger.info(s"Found current schema version '$current' using id $versionDocId")
+    _ <- logger.info(
+      s"core ${client.config.core}: Found current schema version '$current' using id $versionDocId"
+    )
     _ <- current.fold(initVersionDocument)(_ => ().pure[F])
     remain = migrations.sortBy(_.version).dropWhile(m => current.exists(_ >= m.version))
-    _ <- logger.info(s"There are ${remain.size} migrations to run")
+    _ <- logger.info(
+      s"core ${client.config.core}: There are ${remain.size} migrations to run"
+    )
     _ <- remain.traverse_(m =>
-      logger.info(s"Run migration ${m.version}") >>
+      logger.info(s"core ${client.config.core}: Run migration ${m.version}") >>
         client.modifySchema(m.commands) >> upsertVersion(m.version)
     )
   } yield ()

   private def initVersionDocument: F[Unit] =
-    logger.info("Initialize schema migration version document") >>
+    logger.info(
+      s"core ${client.config.core}: Initialize schema migration version document"
+    ) >>
       client.modifySchema(
         Seq(
           SchemaCommand.Add(
@@ -80,6 +86,6 @@
   private def version(n: Long): VersionDocument = VersionDocument(versionDocId, n)

   private def upsertVersion(n: Long) =
-    logger.info(s"Set schema migration version to $n") >>
+    logger.info(s"core ${client.config.core}: Set schema migration version to $n") >>
       client.upsert(Seq(version(n)))
 }
