Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: provisioning flow to figure out message payload encoding #11

Merged
merged 5 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,21 @@ package io.renku.queue.client

import scodec.bits.ByteVector

/** A message read from (or written to) the queue.
  *
  * @param id       backend-assigned message identifier
  * @param encoding declared encoding of `payload` (see [[Encoding]])
  * @param payload  raw payload bytes
  */
final case class Message(id: MessageId, encoding: Encoding, payload: ByteVector)

/** Backend-assigned identifier of a queued message. */
final case class MessageId(value: String) extends AnyVal

/** Supported payload encodings.
  *
  * Extends `Product` so that the case-object name (`productPrefix`) can be
  * reused as the encoding's wire/display name.
  */
sealed trait Encoding extends Product:
  // Wire/display name, e.g. "Binary" or "Json"; derived from the case-object name.
  lazy val name: String = productPrefix

object Encoding:

  /** All known encodings; used for parsing and by test generators. */
  val all: Set[Encoding] = Set(Binary, Json)

  /** Parses an encoding name case-insensitively.
    *
    * @param v candidate encoding name, e.g. "binary" or "Json"
    * @return the matching [[Encoding]], or a `Left(IllegalArgumentException)`
    *         when `v` matches no known encoding
    */
  def from(v: String): Either[IllegalArgumentException, Encoding] =
    all
      .find(_.productPrefix.equalsIgnoreCase(v))
      .toRight(new IllegalArgumentException(s"'$v' not a valid payload Encoding"))

  case object Binary extends Encoding
  case object Json extends Encoding
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import scodec.bits.ByteVector

trait QueueClient[F[_]] {

def enqueue(queueName: QueueName, message: ByteVector): F[MessageId]
def enqueue(queueName: QueueName, message: ByteVector, encoding: Encoding): F[MessageId]

def acquireEventsStream(
queueName: QueueName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,19 @@ object RedisQueueClient:
class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {

private val payloadKey = "payload"
private val encodingKey = "encoding"

override def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] =
override def enqueue(
queueName: QueueName,
message: ByteVector,
encoding: Encoding
): F[MessageId] =
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.toString, Map(payloadKey -> message))
XAddMessage(
queueName.toString,
Map(payloadKey -> message, encodingKey -> encodeEncoding(encoding))
)
)
createConnection
.flatMap(_.append(m))
Expand All @@ -54,6 +62,12 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
.toList
.map(_.head)

/** UTF-8 encodes the encoding's name for storage in the Redis message body.
  * Throws if UTF-8 encoding fails (not expected for the fixed set of names).
  */
private def encodeEncoding(encoding: Encoding): ByteVector =
  ByteVector.encodeUtf8(encoding.name) match
    case Right(bytes) => bytes
    case Left(err)    => throw err

/** Decodes a UTF-8 encoding name read from Redis back into an [[Encoding]].
  * Throws on malformed UTF-8 or an unknown encoding name.
  */
private def decodeEncoding(encoding: ByteVector): Encoding =
  val parsed = encoding.decodeUtf8.flatMap(Encoding.from)
  parsed match
    case Right(enc) => enc
    case Left(err)  => throw err

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
Expand All @@ -71,9 +85,10 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
}

/** Converts a raw Redis stream entry into a [[Message]].
  *
  * Returns `None` when either the payload or the encoding field is absent
  * from the entry body.
  * NOTE(review): `decodeEncoding` throws on an unparsable (rather than
  * missing) encoding value, so a corrupt entry fails the stream instead of
  * yielding None — confirm this is intended.
  */
private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] =
  (m.body.get(payloadKey), m.body.get(encodingKey).map(decodeEncoding))
    .mapN { case (payload, encoding) =>
      Message(MessageId(m.id.value), encoding, payload)
    }

private def createConnection =
RedisStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ object RedisClientGenerators:
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_)))

// Generates one of the supported payload encodings (Binary or Json).
val encodingGen: Gen[Encoding] =
  Gen.oneOf(Encoding.all)

val clientIdGen: Gen[ClientId] =
Gen
.chooseNum(3, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import cats.effect.IO
import cats.syntax.all.*
import fs2.*
import fs2.concurrent.SignallingRef
import io.renku.queue.client.Encoding
import io.renku.redis.client.RedisClientGenerators.*
import io.renku.redis.client.util.RedisSpec
import munit.CatsEffectSuite
Expand All @@ -32,23 +33,27 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
test("can enqueue and dequeue events"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
val encoding = RedisClientGenerators.encodingGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)
dequeued <- SignallingRef.of[IO, List[(String, Encoding)]](Nil)

message1 = "message1"
_ <- client.enqueue(queue, toByteVector(message1))
_ <- client.enqueue(queue, toByteVector(message1), encoding)

streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = None)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.evalMap(event =>
dequeued.update(toStringUft8(event.payload) -> event.encoding :: _)
)
.compile
.drain
.start
_ <- dequeued.waitUntil(_ == List(message1))
_ <- dequeued.waitUntil(_ == List(message1 -> encoding))

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))
_ <- client.enqueue(queue, toByteVector(message2), encoding)
_ <- dequeued
.waitUntil(_.toSet == Set(message1, message2).zip(List.fill(2)(encoding)))

_ <- streamingProcFiber.cancel
yield ()
Expand All @@ -57,11 +62,12 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
test("can start enqueueing events from the given messageId excluding"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
val encoding = RedisClientGenerators.encodingGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)

message1 = "message1"
message1Id <- client.enqueue(queue, toByteVector(message1))
message1Id <- client.enqueue(queue, toByteVector(message1), encoding)

streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some)
Expand All @@ -71,11 +77,11 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
.start

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- client.enqueue(queue, toByteVector(message2), encoding)
_ <- dequeued.waitUntil(_.toSet == Set(message2))

message3 = "message3"
_ <- client.enqueue(queue, toByteVector(message3))
_ <- client.enqueue(queue, toByteVector(message3), encoding)
_ <- dequeued.waitUntil(_.toSet == Set(message2, message3))

_ <- streamingProcFiber.cancel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package io.renku.search.api

import cats.effect.IO
import cats.syntax.all.*
import io.renku.api.Project as ApiProject
import io.renku.avro.codec.AvroDecoder
import io.renku.avro.codec.all.given
Expand All @@ -41,7 +40,7 @@ class SearchApiSpec extends CatsEffectSuite with SearchSolrSpec:
val project2 = projectDocumentGen("disparate", "disparate description").generateOne
val searchApi = new SearchApiImpl[IO](client)
for {
_ <- (project1 :: project2 :: Nil).traverse_(client.insertProject)
_ <- client.insertProjects(project1 :: project2 :: Nil)
response <- searchApi.find("matching")
results <- response.as[List[ApiProject]]
} yield assert(results contains toApiProject(project1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,41 @@

package io.renku.search.provision

import cats.MonadThrow
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import fs2.Stream
import fs2.Chunk
import fs2.io.net.Network
import io.renku.avro.codec.AvroReader
import io.renku.avro.codec.decoders.all.given
import io.renku.messages.ProjectCreated
import io.renku.queue.client.{Message, QueueClient, QueueName}
import io.renku.queue.client.*
import io.renku.redis.client.RedisUrl
import io.renku.search.solr.client.SearchSolrClient
import io.renku.search.solr.documents.Project
import io.renku.solr.client.SolrConfig
import scribe.Scribe

import scala.concurrent.duration.*

/** Provisions the search index from queued events. */
trait SearchProvisioner[F[_]]:
  /** Consumes queue messages and pushes the decoded documents to Solr. */
  def provisionSolr: F[Unit]

object SearchProvisioner:

  // Stable consumer id used to track this service's processed offset in the queue.
  private val clientId: ClientId = ClientId("search-provisioner")

  /** Wires a Redis queue client and a Solr client into a provisioner.
    *
    * @param queueName  queue to consume events from
    * @param redisUrl   connection url of the Redis backing the queue
    * @param solrConfig connection/config for the search Solr instance
    */
  def apply[F[_]: Async: Network](
      queueName: QueueName,
      redisUrl: RedisUrl,
      solrConfig: SolrConfig
  ): Resource[F, SearchProvisioner[F]] =
    QueueClient[F](redisUrl)
      .flatMap(qc => SearchSolrClient[F](solrConfig).tupleLeft(qc))
      .map { case (qc, sc) => new SearchProvisionerImpl[F](clientId, queueName, qc, sc) }

private class SearchProvisionerImpl[F[_]: Async](
clientId: ClientId,
queueName: QueueName,
queueClient: QueueClient[F],
solrClient: SearchSolrClient[F]
Expand All @@ -54,22 +61,58 @@ private class SearchProvisionerImpl[F[_]: Async](
private given Scribe[F] = scribe.cats[F]

/** Streams messages from the queue — resuming after the last processed id —
  * decodes them, batches them, and pushes them to Solr.
  * Any failure is logged and the whole stream is restarted (see logAndRestart).
  */
override def provisionSolr: F[Unit] =
  findLastProcessed >>= { maybeLastProcessed =>
    queueClient
      .acquireEventsStream(queueName, chunkSize = 1, maybeLastProcessed)
      .evalMap(decodeMessage)
      .evalTap { case (m, v) => Scribe[F].info(s"Received messageId: ${m.id} $v") }
      // batch up to 10 decoded messages or whatever arrived within 500ms
      .groupWithin(chunkSize = 10, timeout = 500 millis)
      .evalMap(pushToSolr)
      .compile
      .drain
      .handleErrorWith(logAndRestart)
  }

// Offset to resume from: the last message id this client marked processed on this queue.
private def findLastProcessed =
  queueClient.findLastProcessed(clientId, queueName)

// Avro reader bound to the ProjectCreated schema; used for both binary and JSON payloads.
private val avro = AvroReader(ProjectCreated.SCHEMA$)

/** Decodes a message's payload according to its declared [[Encoding]].
  *
  * NOTE(review): on decode failure the message is marked processed before the
  * error propagates, i.e. the message is skipped rather than retried — confirm
  * this is the intended poison-message strategy.
  *
  * @return the original message paired with the decoded events
  */
private def decodeMessage(message: Message): F[(Message, Seq[ProjectCreated])] =
  MonadThrow[F]
    .catchNonFatal {
      message.encoding match {
        case Encoding.Binary => avro.read[ProjectCreated](message.payload)
        case Encoding.Json   => avro.readJson[ProjectCreated](message.payload)
      }
    }
    .map(message -> _)
    .onError(markProcessedOnFailure(message))

/** Inserts all decoded projects of a chunk into Solr in one call, then marks
  * the chunk's last message as processed. On failure the last message is also
  * marked processed before the error propagates (see markProcessedOnFailure).
  */
private def pushToSolr(chunk: Chunk[(Message, Seq[ProjectCreated])]): F[Unit] =
  chunk.toList match {
    case Nil => ().pure[F]
    case tuples =>
      val allSolrDocs = toSolrDocuments(tuples.flatMap(_._2))
      val (lastMessage, _) = tuples.last
      solrClient
        .insertProjects(allSolrDocs)
        .flatMap(_ => markProcessed(lastMessage))
        .onError(markProcessedOnFailure(lastMessage))
  }

// Maps ProjectCreated events to Solr Project documents (id, name, description).
private lazy val toSolrDocuments: Seq[ProjectCreated] => Seq[Project] =
  _.map(pc => Project(id = pc.id, name = pc.name, description = pc.description))

/** Error handler: marks the failing message as processed, then logs the error.
  * NOTE(review): marking on failure means the message is skipped, not
  * retried — confirm this is intended.
  */
private def markProcessedOnFailure(
    message: Message
): PartialFunction[Throwable, F[Unit]] = err =>
  markProcessed(message) >>
    Scribe[F].error(s"Processing messageId: ${message.id} failed", err)

/** Records `message.id` as the last processed message for this client/queue,
  * so a restarted provisioner resumes after it.
  */
private def markProcessed(message: Message): F[Unit] =
  queueClient.markProcessed(clientId, queueName, message.id)
// Logs the failure, then restarts the whole provisioning stream (which
// resumes from the last processed offset via findLastProcessed).
private def logAndRestart: Throwable => F[Unit] = err =>
  Scribe[F].error("Failure in the provisioning process", err) >>
    provisionSolr
Loading
Loading