Skip to content

Commit

Permalink
feat: provisioning flow to figure out message payload encoding (#11)
Browse files Browse the repository at this point in the history
* feat: SearchProvisioner to deal with binary and json encoded messages

* feat: improvements in the provisioning flow
  • Loading branch information
jachro authored Jan 31, 2024
1 parent 29b97a5 commit c670d9a
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ package io.renku.queue.client

import scodec.bits.ByteVector

/** A single queue message.
  *
  * @param id       identifier assigned by the queue backend
  * @param encoding how the payload bytes are encoded (binary Avro or JSON)
  * @param payload  raw message bytes; interpret according to `encoding`
  */
final case class Message(id: MessageId, encoding: Encoding, payload: ByteVector)

/** Backend-assigned message identifier (value class — no runtime wrapper). */
final case class MessageId(value: String) extends AnyVal

/** Supported payload encodings for queue messages. */
sealed trait Encoding extends Product:
  /** Canonical name, derived from the case object's type name (e.g. "Binary"). */
  lazy val name: String = productPrefix

object Encoding:

  /** Every supported encoding; `from` resolves against this set. */
  val all: Set[Encoding] = Set(Binary, Json)

  /** Parses an encoding from its name, case-insensitively.
    *
    * @return `Right(encoding)` on a known name, otherwise a `Left` with an
    *         `IllegalArgumentException` describing the invalid value
    */
  def from(v: String): Either[IllegalArgumentException, Encoding] =
    all
      // use the published `name` member rather than re-reading productPrefix,
      // so lookup stays consistent if `name` is ever redefined
      .find(_.name.equalsIgnoreCase(v))
      .toRight(new IllegalArgumentException(s"'$v' not a valid payload Encoding"))

  case object Binary extends Encoding
  case object Json extends Encoding
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scodec.bits.ByteVector

trait QueueClient[F[_]] {

def enqueue(queueName: QueueName, message: ByteVector): F[MessageId]
def enqueue(queueName: QueueName, message: ByteVector, encoding: Encoding): F[MessageId]

def acquireEventsStream(
queueName: QueueName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ object RedisQueueClient:
class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {

private val payloadKey = "payload"
private val encodingKey = "encoding"

override def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] =
override def enqueue(
queueName: QueueName,
message: ByteVector,
encoding: Encoding
): F[MessageId] =
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.toString, Map(payloadKey -> message))
XAddMessage(
queueName.toString,
Map(payloadKey -> message, encodingKey -> encodeEncoding(encoding))
)
)
createConnection
.flatMap(_.append(m))
Expand All @@ -54,6 +62,12 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
.toList
.map(_.head)

/** Serialises the encoding's name as UTF-8 bytes for the Redis stream field.
  * UTF-8 encoding of these ASCII names cannot fail in practice; if it ever
  * did, the error is surfaced loudly rather than swallowed.
  */
private def encodeEncoding(encoding: Encoding): ByteVector =
  ByteVector.encodeUtf8(encoding.name) match
    case Right(bytes) => bytes
    case Left(err)    => throw err

/** Parses the Redis stream field bytes back into an [[Encoding]].
  * Throws on malformed UTF-8 or an unknown encoding name, matching the
  * fail-fast behaviour of the write side.
  */
private def decodeEncoding(encoding: ByteVector): Encoding =
  encoding.decodeUtf8.flatMap(Encoding.from) match
    case Right(enc) => enc
    case Left(err)  => throw err

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
Expand All @@ -71,9 +85,10 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
}

/** Converts a raw Redis stream entry into a [[Message]].
  *
  * Returns `None` — silently dropping the entry — when either the payload or
  * the encoding field is missing from the entry body.
  */
private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] =
  for
    payload  <- m.body.get(payloadKey)
    encoding <- m.body.get(encodingKey).map(decodeEncoding)
  yield Message(MessageId(m.id.value), encoding, payload)

private def createConnection =
RedisStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ object RedisClientGenerators:
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_)))

// Picks one of the supported payload encodings (Binary or Json).
val encodingGen: Gen[Encoding] =
  Gen.oneOf(Encoding.all)

val clientIdGen: Gen[ClientId] =
Gen
.chooseNum(3, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import cats.effect.IO
import cats.syntax.all.*
import fs2.*
import fs2.concurrent.SignallingRef
import io.renku.queue.client.Encoding
import io.renku.redis.client.RedisClientGenerators.*
import io.renku.redis.client.util.RedisSpec
import munit.CatsEffectSuite
Expand All @@ -32,23 +33,27 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
test("can enqueue and dequeue events"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
val encoding = RedisClientGenerators.encodingGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)
dequeued <- SignallingRef.of[IO, List[(String, Encoding)]](Nil)

message1 = "message1"
_ <- client.enqueue(queue, toByteVector(message1))
_ <- client.enqueue(queue, toByteVector(message1), encoding)

streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = None)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.evalMap(event =>
dequeued.update(toStringUft8(event.payload) -> event.encoding :: _)
)
.compile
.drain
.start
_ <- dequeued.waitUntil(_ == List(message1))
_ <- dequeued.waitUntil(_ == List(message1 -> encoding))

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))
_ <- client.enqueue(queue, toByteVector(message2), encoding)
_ <- dequeued
.waitUntil(_.toSet == Set(message1, message2).zip(List.fill(2)(encoding)))

_ <- streamingProcFiber.cancel
yield ()
Expand All @@ -57,11 +62,12 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
test("can start enqueueing events from the given messageId excluding"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
val encoding = RedisClientGenerators.encodingGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)

message1 = "message1"
message1Id <- client.enqueue(queue, toByteVector(message1))
message1Id <- client.enqueue(queue, toByteVector(message1), encoding)

streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some)
Expand All @@ -71,11 +77,11 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
.start

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- client.enqueue(queue, toByteVector(message2), encoding)
_ <- dequeued.waitUntil(_.toSet == Set(message2))

message3 = "message3"
_ <- client.enqueue(queue, toByteVector(message3))
_ <- client.enqueue(queue, toByteVector(message3), encoding)
_ <- dequeued.waitUntil(_.toSet == Set(message2, message3))

_ <- streamingProcFiber.cancel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package io.renku.search.api

import cats.effect.IO
import cats.syntax.all.*
import io.renku.api.Project as ApiProject
import io.renku.avro.codec.AvroDecoder
import io.renku.avro.codec.all.given
Expand All @@ -41,7 +40,7 @@ class SearchApiSpec extends CatsEffectSuite with SearchSolrSpec:
val project2 = projectDocumentGen("disparate", "disparate description").generateOne
val searchApi = new SearchApiImpl[IO](client)
for {
_ <- (project1 :: project2 :: Nil).traverse_(client.insertProject)
_ <- client.insertProjects(project1 :: project2 :: Nil)
response <- searchApi.find("matching")
results <- response.as[List[ApiProject]]
} yield assert(results contains toApiProject(project1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,41 @@

package io.renku.search.provision

import cats.MonadThrow
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import fs2.Stream
import fs2.Chunk
import fs2.io.net.Network
import io.renku.avro.codec.AvroReader
import io.renku.avro.codec.decoders.all.given
import io.renku.messages.ProjectCreated
import io.renku.queue.client.{Message, QueueClient, QueueName}
import io.renku.queue.client.*
import io.renku.redis.client.RedisUrl
import io.renku.search.solr.client.SearchSolrClient
import io.renku.search.solr.documents.Project
import io.renku.solr.client.SolrConfig
import scribe.Scribe

import scala.concurrent.duration.*

/** Consumes events from the message queue and provisions them into the
  * search Solr index. `provisionSolr` is a long-running effect that only
  * terminates when the enclosing resource is released.
  */
trait SearchProvisioner[F[_]]:
  def provisionSolr: F[Unit]

object SearchProvisioner:

  // Stable consumer identity; the queue tracks the last processed offset per client.
  private val clientId: ClientId = ClientId("search-provisioner")

  /** Acquires a Redis queue client and a Solr client and wires them into a
    * provisioner. Both clients are released when the resource is closed.
    */
  def apply[F[_]: Async: Network](
      queueName: QueueName,
      redisUrl: RedisUrl,
      solrConfig: SolrConfig
  ): Resource[F, SearchProvisioner[F]] =
    QueueClient[F](redisUrl)
      .flatMap(qc => SearchSolrClient[F](solrConfig).tupleLeft(qc))
      .map { case (qc, sc) => new SearchProvisionerImpl[F](clientId, queueName, qc, sc) }

private class SearchProvisionerImpl[F[_]: Async](
clientId: ClientId,
queueName: QueueName,
queueClient: QueueClient[F],
solrClient: SearchSolrClient[F]
Expand All @@ -54,22 +61,58 @@ private class SearchProvisionerImpl[F[_]: Async](
private given Scribe[F] = scribe.cats[F]

/** Continuously drains the queue and indexes the decoded events into Solr.
  *
  * Resumes after the last offset this client marked processed (if any),
  * decodes each message per its declared encoding, batches documents
  * (up to 10 or every 500ms) before pushing to Solr, and on any failure
  * logs the error and restarts the whole stream.
  */
override def provisionSolr: F[Unit] =
  findLastProcessed >>= { maybeLastProcessed =>
    queueClient
      .acquireEventsStream(queueName, chunkSize = 1, maybeLastProcessed)
      .evalMap(decodeMessage)
      .evalTap { case (m, v) => Scribe[F].info(s"Received messageId: ${m.id} $v") }
      // batch to cut Solr round-trips; 500.millis caps the extra latency
      .groupWithin(chunkSize = 10, timeout = 500.millis)
      .evalMap(pushToSolr)
      .compile
      .drain
      .handleErrorWith(logAndRestart)
  }

// Offset of the last message this client marked processed on this queue
// (None on first run) — used to resume without re-reading older events.
private def findLastProcessed =
  queueClient.findLastProcessed(clientId, queueName)

// Avro reader bound to the ProjectCreated schema; handles both binary and JSON forms.
private val avro = AvroReader(ProjectCreated.SCHEMA$)

/** Decodes a message's payload according to its declared encoding.
  *
  * On a decode failure the message is first marked processed — so a poison
  * message is not retried forever — and the error is then re-raised, which
  * triggers the stream restart in `provisionSolr`.
  */
private def decodeMessage(message: Message): F[(Message, Seq[ProjectCreated])] =
  MonadThrow[F]
    .catchNonFatal {
      message.encoding match {
        case Encoding.Binary => avro.read[ProjectCreated](message.payload)
        case Encoding.Json   => avro.readJson[ProjectCreated](message.payload)
      }
    }
    .map(message -> _)
    .onError(markProcessedOnFailure(message))

/** Inserts a batch of decoded events into Solr, then records the last
  * message of the batch as processed. An empty chunk is a no-op. On a Solr
  * failure the last message is still marked processed before the error
  * propagates (restarting the stream).
  */
private def pushToSolr(chunk: Chunk[(Message, Seq[ProjectCreated])]): F[Unit] =
  chunk.last match {
    case None => ().pure[F]
    case Some((lastMessage, _)) =>
      val allSolrDocs = toSolrDocuments(chunk.toList.flatMap(_._2))
      solrClient
        .insertProjects(allSolrDocs)
        .flatMap(_ => markProcessed(lastMessage))
        .onError(markProcessedOnFailure(lastMessage))
  }

// Maps queue events to their Solr document representation.
private lazy val toSolrDocuments: Seq[ProjectCreated] => Seq[Project] =
  events => events.map(ev => Project(id = ev.id, name = ev.name, description = ev.description))

/** Error handler that records the failing message as processed (so it will
  * not be re-delivered to this client) and then logs the failure.
  */
private def markProcessedOnFailure(
    message: Message
): PartialFunction[Throwable, F[Unit]] = { case err =>
  markProcessed(message) >>
    Scribe[F].error(s"Processing messageId: ${message.id} failed", err)
}

// Persists this message id as the last processed offset for this
// client/queue pair, so subsequent runs resume after it.
private def markProcessed(message: Message): F[Unit] =
  queueClient.markProcessed(clientId, queueName, message.id)

/** Logs any failure of the provisioning stream and restarts it from the
  * last processed offset. `>>` keeps the recursive restart lazy.
  */
private def logAndRestart: Throwable => F[Unit] = err =>
  Scribe[F].error("Failure in the provisioning process", err) >>
    provisionSolr
Loading

0 comments on commit c670d9a

Please sign in to comment.