From 75190bce14530a5c9a2c375193fc99bce165bb1c Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Tue, 30 Jan 2024 17:50:48 +0100 Subject: [PATCH 1/3] feat: Provisioner fallback to JSON decoder if binary fails --- .../search/provision/SearchProvisioner.scala | 18 +++- .../provision/SearchProvisionerSpec.scala | 101 ++++++++++++------ 2 files changed, 85 insertions(+), 34 deletions(-) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala index 1989917a..a2c0a968 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala @@ -18,6 +18,7 @@ package io.renku.search.provision +import cats.MonadThrow import cats.effect.{Async, Resource} import cats.syntax.all.* import fs2.Stream @@ -30,6 +31,7 @@ import io.renku.redis.client.RedisUrl import io.renku.search.solr.client.SearchSolrClient import io.renku.search.solr.documents.Project import io.renku.solr.client.SolrConfig +import org.apache.avro.AvroRuntimeException import scribe.Scribe trait SearchProvisioner[F[_]]: @@ -56,7 +58,7 @@ private class SearchProvisionerImpl[F[_]: Async]( override def provisionSolr: F[Unit] = queueClient .acquireEventsStream(queueName, chunkSize = 1, maybeOffset = None) - .map(decodeEvent) + .evalMap(decodeEvent) .evalTap(decoded => Scribe[F].info(s"Received $decoded")) .flatMap(decoded => Stream.emits[F, ProjectCreated](decoded)) .evalMap(pushToSolr) @@ -65,8 +67,18 @@ private class SearchProvisionerImpl[F[_]: Async]( private val avro = AvroReader(ProjectCreated.SCHEMA$) - private def decodeEvent(message: Message): Seq[ProjectCreated] = - avro.read[ProjectCreated](message.payload) + private def decodeEvent(message: Message): F[Seq[ProjectCreated]] = + decodeBinary(message).orElse(decodeJson(message)) + + private def decodeBinary(message: Message): F[Seq[ProjectCreated]] = + MonadThrow[F].catchOnly[AvroRuntimeException]( + avro.read[ProjectCreated](message.payload) + ) + + private def decodeJson(message: Message): F[Seq[ProjectCreated]] = + MonadThrow[F].catchOnly[AvroRuntimeException]( + avro.readJson[ProjectCreated](message.payload) + ) private def pushToSolr(pc: ProjectCreated): F[Unit] = solrClient diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala index 5106c43a..90dfd2f4 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala @@ -31,47 +31,86 @@ import io.renku.redis.client.util.RedisSpec import io.renku.search.solr.client.SearchSolrSpec import io.renku.search.solr.documents.Project import munit.CatsEffectSuite -import scribe.Scribe import java.time.temporal.ChronoUnit import scala.concurrent.duration.* class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSolrSpec: - private given Scribe[IO] = scribe.cats[IO] private val avro = AvroIO(ProjectCreated.SCHEMA$) - test("can fetch events and send them to Solr"): + test("can fetch events binary encoded, decode them, and send them to Solr"): val queue = RedisClientGenerators.queueNameGen.generateOne - (withRedisClient.asQueueClient() >>= withSearchSolrClient().tupleLeft) - 
.use { case (queueClient, solrClient) => - val provisioner = new SearchProvisionerImpl(queue, queueClient, solrClient) - for - solrDocs <- SignallingRef.of[IO, Set[Project]](Set.empty) - - provisioningFiber <- provisioner.provisionSolr.start - - message1 <- generateProjectCreated("project", "description", Some("myself")) - _ <- queueClient.enqueue(queue, avro.write[ProjectCreated](Seq(message1))) - - docsCollectorFiber <- - Stream - .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.findProjects("*")) - .flatMap(Stream.emits(_)) - .evalTap(IO.println) - .evalMap(d => solrDocs.update(_ + d)) - .compile - .drain - .start - - _ <- solrDocs.waitUntil(_ contains toSolrDocument(message1)) - - _ <- provisioningFiber.cancel - _ <- docsCollectorFiber.cancel - yield () - } + redisAndSolrClients.use { case (queueClient, solrClient) => + val provisioner = new SearchProvisionerImpl(queue, queueClient, solrClient) + for + solrDocs <- SignallingRef.of[IO, Set[Project]](Set.empty) + + provisioningFiber <- provisioner.provisionSolr.start + + message1 <- generateProjectCreated( + "project-binary", + "description binary", + Some("myself binary") + ) + _ <- queueClient.enqueue(queue, avro.write[ProjectCreated](Seq(message1))) + + docsCollectorFiber <- + Stream + .awakeEvery[IO](500 millis) + .evalMap(_ => solrClient.findProjects("*")) + .flatMap(Stream.emits(_)) + .evalTap(IO.println) + .evalMap(d => solrDocs.update(_ + d)) + .compile + .drain + .start + + _ <- solrDocs.waitUntil(_ contains toSolrDocument(message1)) + + _ <- provisioningFiber.cancel + _ <- docsCollectorFiber.cancel + yield () + } + + test("can fetch events JSON encoded, decode them, and send them to Solr"): + val queue = RedisClientGenerators.queueNameGen.generateOne + + redisAndSolrClients.use { case (queueClient, solrClient) => + val provisioner = new SearchProvisionerImpl(queue, queueClient, solrClient) + for + solrDocs <- SignallingRef.of[IO, Set[Project]](Set.empty) + + provisioningFiber <- provisioner.provisionSolr.start + + message1 <- generateProjectCreated( + "project-json", + "description json", + Some("myself json") + ) + _ <- queueClient.enqueue(queue, avro.writeJson[ProjectCreated](Seq(message1))) + + docsCollectorFiber <- + Stream + .awakeEvery[IO](500 millis) + .evalMap(_ => solrClient.findProjects("*")) + .flatMap(Stream.emits(_)) + .evalTap(IO.println) + .evalMap(d => solrDocs.update(_ + d)) + .compile + .drain + .start + + _ <- solrDocs.waitUntil(_ contains toSolrDocument(message1)) + + _ <- provisioningFiber.cancel + _ <- docsCollectorFiber.cancel + yield () + } + + private def redisAndSolrClients = + withRedisClient.asQueueClient() >>= withSearchSolrClient().tupleLeft private def generateProjectCreated( name: String, From fd84747e8074fdc61603420d85db32d1665beef8 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Tue, 30 Jan 2024 18:46:34 +0100 Subject: [PATCH 2/3] feat: SearchProvisioner to check payload to decide on decoding strategy --- .../search/provision/SearchProvisioner.scala | 25 ++++++++------ .../provision/SearchProvisionerSpec.scala | 33 +++++++++---------- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala index a2c0a968..d79bc910 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala +++ 
b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala @@ -31,7 +31,6 @@ import io.renku.redis.client.RedisUrl import io.renku.search.solr.client.SearchSolrClient import io.renku.search.solr.documents.Project import io.renku.solr.client.SolrConfig -import org.apache.avro.AvroRuntimeException import scribe.Scribe trait SearchProvisioner[F[_]]: @@ -58,7 +57,7 @@ private class SearchProvisionerImpl[F[_]: Async]( override def provisionSolr: F[Unit] = queueClient .acquireEventsStream(queueName, chunkSize = 1, maybeOffset = None) - .evalMap(decodeEvent) + .evalMap(decodeMessage) .evalTap(decoded => Scribe[F].info(s"Received $decoded")) .flatMap(decoded => Stream.emits[F, ProjectCreated](decoded)) .evalMap(pushToSolr) @@ -67,18 +66,24 @@ private class SearchProvisionerImpl[F[_]: Async]( private val avro = AvroReader(ProjectCreated.SCHEMA$) - private def decodeEvent(message: Message): F[Seq[ProjectCreated]] = - decodeBinary(message).orElse(decodeJson(message)) + private def decodeMessage(message: Message): F[Seq[ProjectCreated]] = + MonadThrow[F].fromOption( + decodeBinary(message).orElse(decodeJson(message)), + new Exception("Message encoded neither as binary nor json") + ) - private def decodeBinary(message: Message): F[Seq[ProjectCreated]] = - MonadThrow[F].catchOnly[AvroRuntimeException]( + private def decodeBinary(message: Message): Option[Seq[ProjectCreated]] = + Option.when(!isJsonEncoded(message)) { avro.read[ProjectCreated](message.payload) - ) + } - private def decodeJson(message: Message): F[Seq[ProjectCreated]] = - MonadThrow[F].catchOnly[AvroRuntimeException]( + private def decodeJson(message: Message): Option[Seq[ProjectCreated]] = + Option.when(isJsonEncoded(message)) { avro.readJson[ProjectCreated](message.payload) - ) + } + + private lazy val isJsonEncoded: Message => Boolean = + _.payload.headOption.contains(123.toByte) // meaning it's the '{' char private def pushToSolr(pc: ProjectCreated): F[Unit] = solrClient diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala index 90dfd2f4..5749ee28 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala @@ -31,8 +31,11 @@ import io.renku.redis.client.util.RedisSpec import io.renku.search.solr.client.SearchSolrSpec import io.renku.search.solr.documents.Project import munit.CatsEffectSuite +import org.scalacheck.Gen +import org.scalacheck.Gen.alphaNumChar import java.time.temporal.ChronoUnit +import java.time.temporal.ChronoUnit.MILLIS import scala.concurrent.duration.* class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSolrSpec: @@ -49,11 +52,7 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo provisioningFiber <- provisioner.provisionSolr.start - message1 <- generateProjectCreated( - "project-binary", - "description binary", - Some("myself binary") - ) + message1 <- generateProjectCreated(prefix = "binary") _ <- queueClient.enqueue(queue, avro.write[ProjectCreated](Seq(message1))) docsCollectorFiber <- @@ -84,11 +83,7 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo provisioningFiber <- provisioner.provisionSolr.start - message1 <- generateProjectCreated( - "project-json", - "description json", - Some("myself 
json") - ) + message1 <- generateProjectCreated(prefix = "json") _ <- queueClient.enqueue(queue, avro.writeJson[ProjectCreated](Seq(message1))) docsCollectorFiber <- @@ -112,15 +107,19 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo private def redisAndSolrClients = withRedisClient.asQueueClient() >>= withSearchSolrClient().tupleLeft - private def generateProjectCreated( - name: String, - description: String, - owner: Option[String] - ): IO[ProjectCreated] = + private def generateProjectCreated(prefix: String): IO[ProjectCreated] = + def generateString(max: Int): Gen[String] = + Gen + .chooseNum(3, max) + .flatMap(Gen.stringOfN(_, alphaNumChar)) + for - now <- Clock[IO].realTimeInstant.map(_.truncatedTo(ChronoUnit.MILLIS)) + now <- Clock[IO].realTimeInstant.map(_.truncatedTo(MILLIS)) uuid <- IO.randomUUID - yield ProjectCreated(uuid.toString, name, description, owner, now) + name = prefix + generateString(max = 5).sample.get + desc = prefix + generateString(max = 10).sample.get + ownerGen = generateString(max = 5).map(prefix + _) + yield ProjectCreated(uuid.toString, name, desc, Gen.option(ownerGen).sample.get, now) private def toSolrDocument(created: ProjectCreated): Project = Project(created.id, created.name, created.description) From 0b1acfe8563a2bbaee8b65812ddf4389ef83cdf8 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Wed, 31 Jan 2024 19:09:55 +0100 Subject: [PATCH 3/3] feat: improvements in the provisioning flow --- .../scala/io/renku/queue/client/Message.scala | 17 +++- .../io/renku/queue/client/QueueClient.scala | 2 +- .../renku/redis/client/RedisQueueClient.scala | 25 ++++-- .../redis/client/RedisClientGenerators.scala | 3 + .../redis/client/RedisQueueClientSpec.scala | 24 ++++-- .../io/renku/search/api/SearchApiSpec.scala | 3 +- .../search/provision/SearchProvisioner.scala | 86 ++++++++++++------- .../provision/SearchProvisionerSpec.scala | 26 ++++-- .../search/solr/client/SearchSolrClient.scala | 2 +- .../solr/client/SearchSolrClientImpl.scala | 4 +- .../solr/client/SearchSolrClientSpec.scala | 2 +- 11 files changed, 135 insertions(+), 59 deletions(-) diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala index e662dade..34652c29 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala @@ -20,6 +20,21 @@ package io.renku.queue.client import scodec.bits.ByteVector -final case class Message(id: MessageId, payload: ByteVector) +final case class Message(id: MessageId, encoding: Encoding, payload: ByteVector) final case class MessageId(value: String) extends AnyVal + +sealed trait Encoding extends Product: + lazy val name: String = productPrefix + +object Encoding: + + val all: Set[Encoding] = Set(Binary, Json) + + def from(v: String): Either[IllegalArgumentException, Encoding] = + all + .find(_.productPrefix.equalsIgnoreCase(v)) + .toRight(new IllegalArgumentException(s"'$v' not a valid payload Encoding")) + + case object Binary extends Encoding + case object Json extends Encoding diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index 27a9077a..d7385738 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ 
b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -25,7 +25,7 @@ import scodec.bits.ByteVector trait QueueClient[F[_]] { - def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] + def enqueue(queueName: QueueName, message: ByteVector, encoding: Encoding): F[MessageId] def acquireEventsStream( queueName: QueueName, diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index 460f8edf..266454b6 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -41,11 +41,19 @@ object RedisQueueClient: class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { private val payloadKey = "payload" + private val encodingKey = "encoding" - override def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] = + override def enqueue( + queueName: QueueName, + message: ByteVector, + encoding: Encoding + ): F[MessageId] = val m = Stream .emit[F, XAddMessage[String, ByteVector]]( - XAddMessage(queueName.toString, Map(payloadKey -> message)) + XAddMessage( + queueName.toString, + Map(payloadKey -> message, encodingKey -> encodeEncoding(encoding)) + ) ) createConnection .flatMap(_.append(m)) @@ -54,6 +62,12 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien .toList .map(_.head) + private def encodeEncoding(encoding: Encoding): ByteVector = + ByteVector.encodeUtf8(encoding.name).fold(throw _, identity) + + private def decodeEncoding(encoding: ByteVector): Encoding = + encoding.decodeUtf8.flatMap(Encoding.from).fold(throw _, identity) + override def acquireEventsStream( queueName: QueueName, chunkSize: Int, @@ -71,9 +85,10 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien } private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] = - m.body - .get(payloadKey) - .map(Message(MessageId(m.id.value), _)) + (m.body.get(payloadKey), m.body.get(encodingKey).map(decodeEncoding)) + .mapN { case (payload, encoding) => + Message(MessageId(m.id.value), encoding, payload) + } private def createConnection = RedisStream diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala index 74a9c6d7..b83272e3 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala @@ -29,6 +29,9 @@ object RedisClientGenerators: .chooseNum(3, 10) .flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_))) + val encodingGen: Gen[Encoding] = + Gen.oneOf(Encoding.all) + val clientIdGen: Gen[ClientId] = Gen .chooseNum(3, 10) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index f0ef83cf..ead55339 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -22,6 +22,7 @@ import cats.effect.IO import cats.syntax.all.* import fs2.* import fs2.concurrent.SignallingRef +import io.renku.queue.client.Encoding import 
io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.util.RedisSpec import munit.CatsEffectSuite @@ -32,23 +33,27 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: test("can enqueue and dequeue events"): withRedisClient.asQueueClient().use { client => val queue = RedisClientGenerators.queueNameGen.generateOne + val encoding = RedisClientGenerators.encodingGen.generateOne for - dequeued <- SignallingRef.of[IO, List[String]](Nil) + dequeued <- SignallingRef.of[IO, List[(String, Encoding)]](Nil) message1 = "message1" - _ <- client.enqueue(queue, toByteVector(message1)) + _ <- client.enqueue(queue, toByteVector(message1), encoding) streamingProcFiber <- client .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) - .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) + .evalMap(event => + dequeued.update(toStringUft8(event.payload) -> event.encoding :: _) + ) .compile .drain .start - _ <- dequeued.waitUntil(_ == List(message1)) + _ <- dequeued.waitUntil(_ == List(message1 -> encoding)) message2 = "message2" - _ <- client.enqueue(queue, toByteVector(message2)) - _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) + _ <- client.enqueue(queue, toByteVector(message2), encoding) + _ <- dequeued + .waitUntil(_.toSet == Set(message1, message2).zip(List.fill(2)(encoding))) _ <- streamingProcFiber.cancel yield () @@ -57,11 +62,12 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: test("can start enqueueing events from the given messageId excluding"): withRedisClient.asQueueClient().use { client => val queue = RedisClientGenerators.queueNameGen.generateOne + val encoding = RedisClientGenerators.encodingGen.generateOne for dequeued <- SignallingRef.of[IO, List[String]](Nil) message1 = "message1" - message1Id <- client.enqueue(queue, toByteVector(message1)) + message1Id <- client.enqueue(queue, toByteVector(message1), encoding) streamingProcFiber <- client .acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some) @@ -71,11 +77,11 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: .start message2 = "message2" - _ <- client.enqueue(queue, toByteVector(message2)) + _ <- client.enqueue(queue, toByteVector(message2), encoding) _ <- dequeued.waitUntil(_.toSet == Set(message2)) message3 = "message3" - _ <- client.enqueue(queue, toByteVector(message3)) + _ <- client.enqueue(queue, toByteVector(message3), encoding) _ <- dequeued.waitUntil(_.toSet == Set(message2, message3)) _ <- streamingProcFiber.cancel diff --git a/modules/search-api/src/test/scala/io/renku/search/api/SearchApiSpec.scala b/modules/search-api/src/test/scala/io/renku/search/api/SearchApiSpec.scala index 25311f27..98708679 100644 --- a/modules/search-api/src/test/scala/io/renku/search/api/SearchApiSpec.scala +++ b/modules/search-api/src/test/scala/io/renku/search/api/SearchApiSpec.scala @@ -19,7 +19,6 @@ package io.renku.search.api import cats.effect.IO -import cats.syntax.all.* import io.renku.api.Project as ApiProject import io.renku.avro.codec.AvroDecoder import io.renku.avro.codec.all.given @@ -41,7 +40,7 @@ class SearchApiSpec extends CatsEffectSuite with SearchSolrSpec: val project2 = projectDocumentGen("disparate", "disparate description").generateOne val searchApi = new SearchApiImpl[IO](client) for { - _ <- (project1 :: project2 :: Nil).traverse_(client.insertProject) + _ <- client.insertProjects(project1 :: project2 :: Nil) response <- searchApi.find("matching") results <- response.as[List[ApiProject]] } yield 
assert(results contains toApiProject(project1)) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala index d79bc910..1a6d2c9b 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala @@ -21,22 +21,27 @@ package io.renku.search.provision import cats.MonadThrow import cats.effect.{Async, Resource} import cats.syntax.all.* -import fs2.Stream +import fs2.Chunk import fs2.io.net.Network import io.renku.avro.codec.AvroReader import io.renku.avro.codec.decoders.all.given import io.renku.messages.ProjectCreated -import io.renku.queue.client.{Message, QueueClient, QueueName} +import io.renku.queue.client.* import io.renku.redis.client.RedisUrl import io.renku.search.solr.client.SearchSolrClient import io.renku.search.solr.documents.Project import io.renku.solr.client.SolrConfig import scribe.Scribe +import scala.concurrent.duration.* + trait SearchProvisioner[F[_]]: def provisionSolr: F[Unit] object SearchProvisioner: + + private val clientId: ClientId = ClientId("search-provisioner") + def apply[F[_]: Async: Network]( queueName: QueueName, redisUrl: RedisUrl, @@ -44,9 +49,10 @@ object SearchProvisioner: ): Resource[F, SearchProvisioner[F]] = QueueClient[F](redisUrl) .flatMap(qc => SearchSolrClient[F](solrConfig).tupleLeft(qc)) - .map { case (qc, sc) => new SearchProvisionerImpl[F](queueName, qc, sc) } + .map { case (qc, sc) => new SearchProvisionerImpl[F](clientId, queueName, qc, sc) } private class SearchProvisionerImpl[F[_]: Async]( + clientId: ClientId, queueName: QueueName, queueClient: QueueClient[F], solrClient: SearchSolrClient[F] @@ -55,38 +61,58 @@ private class SearchProvisionerImpl[F[_]: Async]( private given Scribe[F] = scribe.cats[F] override def provisionSolr: F[Unit] = - queueClient - .acquireEventsStream(queueName, chunkSize = 1, maybeOffset = None) - .evalMap(decodeMessage) - .evalTap(decoded => Scribe[F].info(s"Received $decoded")) - .flatMap(decoded => Stream.emits[F, ProjectCreated](decoded)) - .evalMap(pushToSolr) - .compile - .drain + findLastProcessed >>= { maybeLastProcessed => + queueClient + .acquireEventsStream(queueName, chunkSize = 1, maybeLastProcessed) + .evalMap(decodeMessage) + .evalTap { case (m, v) => Scribe[F].info(s"Received messageId: ${m.id} $v") } + .groupWithin(chunkSize = 10, timeout = 500 millis) + .evalMap(pushToSolr) + .compile + .drain + .handleErrorWith(logAndRestart) + } + + private def findLastProcessed = + queueClient.findLastProcessed(clientId, queueName) private val avro = AvroReader(ProjectCreated.SCHEMA$) - private def decodeMessage(message: Message): F[Seq[ProjectCreated]] = - MonadThrow[F].fromOption( - decodeBinary(message).orElse(decodeJson(message)), - new Exception("Message encoded neither as binary nor json") - ) + private def decodeMessage(message: Message): F[(Message, Seq[ProjectCreated])] = + MonadThrow[F] + .catchNonFatal { + message.encoding match { + case Encoding.Binary => avro.read[ProjectCreated](message.payload) + case Encoding.Json => avro.readJson[ProjectCreated](message.payload) + } + } + .map(message -> _) + .onError(markProcessedOnFailure(message)) - private def decodeBinary(message: Message): Option[Seq[ProjectCreated]] = - Option.when(!isJsonEncoded(message)) { - avro.read[ProjectCreated](message.payload) + private def pushToSolr(chunk: 
Chunk[(Message, Seq[ProjectCreated])]): F[Unit] = + chunk.toList match { + case Nil => ().pure[F] + case tuples => + val allSolrDocs = toSolrDocuments(tuples.flatMap(_._2)) + val (lastMessage, _) = tuples.last + solrClient + .insertProjects(allSolrDocs) + .flatMap(_ => markProcessed(lastMessage)) + .onError(markProcessedOnFailure(lastMessage)) } - private def decodeJson(message: Message): Option[Seq[ProjectCreated]] = - Option.when(isJsonEncoded(message)) { - avro.readJson[ProjectCreated](message.payload) - } + private lazy val toSolrDocuments: Seq[ProjectCreated] => Seq[Project] = + _.map(pc => Project(id = pc.id, name = pc.name, description = pc.description)) + + private def markProcessedOnFailure( + message: Message + ): PartialFunction[Throwable, F[Unit]] = err => + markProcessed(message) >> + Scribe[F].error(s"Processing messageId: ${message.id} failed", err) - private lazy val isJsonEncoded: Message => Boolean = - _.payload.headOption.contains(123.toByte) // meaning it's the '{' char + private def markProcessed(message: Message): F[Unit] = + queueClient.markProcessed(clientId, queueName, message.id) - private def pushToSolr(pc: ProjectCreated): F[Unit] = - solrClient - .insertProject( - Project(id = pc.id, name = pc.name, description = pc.description) - ) + private def logAndRestart: Throwable => F[Unit] = err => + Scribe[F].error("Failure in the provisioning process", err) >> + provisionSolr diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala index 5749ee28..91b64e36 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala @@ -25,6 +25,7 @@ import fs2.concurrent.SignallingRef import io.renku.avro.codec.AvroIO import io.renku.avro.codec.encoders.all.given import io.renku.messages.ProjectCreated +import io.renku.queue.client.Encoding import io.renku.redis.client.RedisClientGenerators import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.util.RedisSpec @@ -44,23 +45,28 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo test("can fetch events binary encoded, decode them, and send them to Solr"): val queue = RedisClientGenerators.queueNameGen.generateOne + val clientId = RedisClientGenerators.clientIdGen.generateOne redisAndSolrClients.use { case (queueClient, solrClient) => - val provisioner = new SearchProvisionerImpl(queue, queueClient, solrClient) + val provisioner = + new SearchProvisionerImpl(clientId, queue, queueClient, solrClient) for solrDocs <- SignallingRef.of[IO, Set[Project]](Set.empty) provisioningFiber <- provisioner.provisionSolr.start message1 <- generateProjectCreated(prefix = "binary") - _ <- queueClient.enqueue(queue, avro.write[ProjectCreated](Seq(message1))) + _ <- queueClient.enqueue( + queue, + avro.write[ProjectCreated](Seq(message1)), + Encoding.Binary + ) docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) .evalMap(_ => solrClient.findProjects("*")) .flatMap(Stream.emits(_)) - .evalTap(IO.println) .evalMap(d => solrDocs.update(_ + d)) .compile .drain @@ -75,16 +81,22 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo test("can fetch events JSON encoded, decode them, and send them to Solr"): val queue = RedisClientGenerators.queueNameGen.generateOne + val clientId = 
RedisClientGenerators.clientIdGen.generateOne redisAndSolrClients.use { case (queueClient, solrClient) => - val provisioner = new SearchProvisionerImpl(queue, queueClient, solrClient) + val provisioner = + new SearchProvisionerImpl(clientId, queue, queueClient, solrClient) for solrDocs <- SignallingRef.of[IO, Set[Project]](Set.empty) provisioningFiber <- provisioner.provisionSolr.start message1 <- generateProjectCreated(prefix = "json") - _ <- queueClient.enqueue(queue, avro.writeJson[ProjectCreated](Seq(message1))) + _ <- queueClient.enqueue( + queue, + avro.writeJson[ProjectCreated](Seq(message1)), + Encoding.Json + ) docsCollectorFiber <- Stream @@ -116,8 +128,8 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo for now <- Clock[IO].realTimeInstant.map(_.truncatedTo(MILLIS)) uuid <- IO.randomUUID - name = prefix + generateString(max = 5).sample.get - desc = prefix + generateString(max = 10).sample.get + name = s"$prefix-${generateString(max = 5).sample.get}" + desc = s"$prefix ${generateString(max = 10).sample.get}" ownerGen = generateString(max = 5).map(prefix + _) yield ProjectCreated(uuid.toString, name, desc, Gen.option(ownerGen).sample.get, now) diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala index dd173b82..40e5301c 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala @@ -25,7 +25,7 @@ import io.renku.solr.client.{SolrClient, SolrConfig} trait SearchSolrClient[F[_]]: - def insertProject(project: Project): F[Unit] + def insertProjects(projects: Seq[Project]): F[Unit] def findProjects(phrase: String): F[List[Project]] diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala index e8990898..6f88bea6 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala @@ -27,8 +27,8 @@ import io.renku.solr.client.{QueryString, SolrClient} private class SearchSolrClientImpl[F[_]: Async](solrClient: SolrClient[F]) extends SearchSolrClient[F]: - override def insertProject(project: Project): F[Unit] = - solrClient.insert(Seq(project)).void + override def insertProjects(projects: Seq[Project]): F[Unit] = + solrClient.insert(projects).void override def findProjects(phrase: String): F[List[Project]] = solrClient diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala index e38723bd..c92ebdf7 100644 --- a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala +++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala @@ -29,7 +29,7 @@ class SearchSolrClientSpec extends CatsEffectSuite with SearchSolrSpec: val project = projectDocumentGen("solr-project", "solr project description").generateOne for { - _ <- client.insertProject(project) + _ <- client.insertProjects(Seq(project)) r <- client.findProjects("solr") _ = assert(r contains 
project) } yield ()
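
A minimal, standalone sketch (not part of the patches; the @main entry point and the file layout are illustrative only) of the Encoding ADT introduced in PATCH 3/3. It shows the round trip between the producer side, which stores encoding.name next to the Redis payload, and the consumer side, which parses the value back with Encoding.from before choosing the binary or JSON Avro decoder:

    // Sealed hierarchy copied from Message.scala in PATCH 3/3; the demo below is an assumption.
    sealed trait Encoding extends Product:
      lazy val name: String = productPrefix

    object Encoding:
      case object Binary extends Encoding
      case object Json extends Encoding

      val all: Set[Encoding] = Set(Binary, Json)

      // Case-insensitive lookup by name, as used by RedisQueueClient.decodeEncoding.
      def from(v: String): Either[IllegalArgumentException, Encoding] =
        all
          .find(_.productPrefix.equalsIgnoreCase(v))
          .toRight(new IllegalArgumentException(s"'$v' not a valid payload Encoding"))

    // Hypothetical entry point exercising the round trip.
    @main def encodingRoundTrip(): Unit =
      println(Encoding.from(Encoding.Binary.name)) // Right(Binary)
      println(Encoding.from("json"))               // Right(Json) -- lookup ignores case
      println(Encoding.from("xml"))                // Left(IllegalArgumentException: 'xml' not a valid payload Encoding)

Carrying the encoding explicitly in the message (the "encoding" field written by RedisQueueClient.enqueue) replaces the heuristic from PATCH 2/3, which inferred JSON by checking whether the first payload byte was '{' (123).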