Skip to content

Commit

Permalink
feat: streaming events from Redis with a specified offset (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro authored Jan 22, 2024
1 parent 04489f9 commit 1c1da91
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 23 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ lazy val searchProvision = project
.withId("search-provision")
.settings(commonSettings)
.settings(
name := "search-provision"
name := "search-provision",
Test / testOptions += Tests.Setup(RedisServer.start),
Test / testOptions += Tests.Cleanup(RedisServer.stop)
)
.dependsOn(
commons % "compile->compile;test->test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ import fs2.Stream
import scodec.bits.ByteVector

trait QueueClient[F[_]] {
def enqueue(queueName: QueueName, message: ByteVector): F[Unit]
def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message]

def enqueue(queueName: QueueName, message: ByteVector): F[MessageId]

def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, Message]

def markProcessed(
clientId: ClientId,
queueName: QueueName,
messageId: MessageId
): F[Unit]

def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,8 @@ opaque type QueueName = String
object QueueName {
def apply(v: String): QueueName = new QueueName(v)
}

opaque type ClientId = String
object ClientId {
def apply(v: String): ClientId = new ClientId(v)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,46 @@

package io.renku.redis.client

import cats.effect.Async
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage}
import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XReadMessage}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName}
import io.renku.queue.client.*
import scodec.bits.ByteVector

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {

private val payloadKey = "payload"

override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] =
override def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] =
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.toString, Map(payloadKey -> message))
)
createConnection.flatMap(_.append(m)).compile.drain
createConnection
.flatMap(_.append(m))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, Message] =
val initialOffset: String => StreamingOffset[String] =
maybeOffset
.map(id => StreamingOffset.Custom[String](_, id.value))
.getOrElse(StreamingOffset.All[String])

createConnection >>= {
_.read(Set(queueName.toString), chunkSize)
_.read(Set(queueName.toString), chunkSize, initialOffset)
.map(toMessage)
.collect { case Some(m) => m }
}
Expand All @@ -57,4 +70,27 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
private def createConnection =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)

override def markProcessed(
clientId: ClientId,
queueName: QueueName,
messageId: MessageId
): F[Unit] =
stringCommands.use {
_.set(formProcessedKey(clientId, queueName), messageId.value)
}

override def findLastProcessed(
clientId: ClientId,
queueName: QueueName
): F[Option[MessageId]] =
stringCommands.use {
_.get(formProcessedKey(clientId, queueName)).map(_.map(MessageId.apply))
}

private def stringCommands: Resource[F, RedisCommands[F, String, String]] =
Redis[F].fromClient(client, RedisCodec.Utf8)

private def formProcessedKey(clientId: ClientId, queueName: QueueName) =
s"$queueName.$clientId"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package io.renku.redis.client

import io.renku.queue.client.QueueName
import io.renku.queue.client.*
import org.scalacheck.Gen
import org.scalacheck.Gen.alphaLowerChar
import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar}

object RedisClientGenerators:

Expand All @@ -29,6 +29,15 @@ object RedisClientGenerators:
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_)))

given Gen[QueueName] = queueNameGen
val clientIdGen: Gen[ClientId] =
Gen
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaNumChar).map(ClientId(_)))

val messageIdGen: Gen[MessageId] =
for
part1 <- Gen.chooseNum(3, 10)
part2 <- Gen.chooseNum(3, 10)
yield MessageId(s"$part1.$part2")

extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.renku.redis.client

import cats.effect.IO
import cats.syntax.all.*
import fs2.*
import fs2.concurrent.SignallingRef
import io.renku.redis.client.RedisClientGenerators.*
Expand All @@ -37,8 +38,8 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
message1 = "message1"
_ <- client.enqueue(queue, toByteVector(message1))

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = None)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
Expand All @@ -49,7 +50,51 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))

_ <- fiber.cancel
_ <- streamingProcFiber.cancel
yield ()
}

test("can start enqueueing events from the given messageId excluding"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)

message1 = "message1"
message1Id <- client.enqueue(queue, toByteVector(message1))

streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
.start

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message2))

message3 = "message3"
_ <- client.enqueue(queue, toByteVector(message3))
_ <- dequeued.waitUntil(_.toSet == Set(message2, message3))

_ <- streamingProcFiber.cancel
yield ()
}

test("allow marking and retrieving a processed event"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
val clientId = RedisClientGenerators.clientIdGen.generateOne
val messageId = RedisClientGenerators.messageIdGen.generateOne
for
_ <- client.findLastProcessed(clientId, queue).map(v => assert(v.isEmpty))

_ <- client.markProcessed(clientId, queue, messageId)

_ <- client
.findLastProcessed(clientId, queue)
.map(v => assert(v contains messageId))
yield ()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package io.renku.redis.client.util
import cats.effect.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log.Stdout.*
import dev.profunktor.redis4cats.effect.Log.Stdout.instance
import dev.profunktor.redis4cats.effect.MkRedis.forAsync
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import io.renku.queue.client.QueueClient
Expand All @@ -30,19 +30,21 @@ import io.renku.redis.client.RedisQueueClient
trait RedisSpec:
self: munit.Suite =>

export dev.profunktor.redis4cats.effect.Log.Stdout.instance

private lazy val server: RedisServer = RedisServer

abstract class RedisFixture extends Fixture[Resource[IO, RedisClient]]("redis"):
def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]]
def asRedisCommands(): Resource[IO, RedisCommands[IO, String, String]]
def asQueueClient(): Resource[IO, QueueClient[IO]]

val withRedisClient: RedisFixture = new RedisFixture:

def apply(): Resource[IO, RedisClient] =
RedisClient[IO].from(server.url)

override def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] =
apply().flatMap(Redis[IO].fromClient(_, RedisCodec.Utf8))
override def asRedisCommands(): Resource[IO, RedisCommands[IO, String, String]] =
apply().flatMap(createRedisCommands)

override def asQueueClient(): Resource[IO, QueueClient[IO]] =
apply().map(new RedisQueueClient[IO](_))
Expand All @@ -53,5 +55,9 @@ trait RedisSpec:
override def afterAll(): Unit =
server.stop()

lazy val createRedisCommands
: RedisClient => Resource[IO, RedisCommands[IO, String, String]] =
Redis[IO].fromClient(_, RedisCodec.Utf8)

override def munitFixtures: Seq[Fixture[Resource[IO, RedisClient]]] =
List(withRedisClient)
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec:
message1 = ProjectCreated("my project", "my description", Some("myself"), now)
_ <- client.enqueue(queue, avro.write[ProjectCreated](Seq(message1)))

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = None)
.evalTap(m => IO.println(avro.read[ProjectCreated](m.payload)))
.evalMap(event =>
dequeued.update(avro.read[ProjectCreated](event.payload).toList ::: _)
Expand All @@ -62,6 +62,6 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec:
_ <- client.enqueue(queue, avro.write(Seq(message2)))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))

_ <- fiber.cancel
_ <- streamingProcFiber.cancel
yield ()
}

0 comments on commit 1c1da91

Please sign in to comment.