Skip to content

Commit

Permalink
feat: streaming events with specified offset
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro committed Jan 22, 2024
1 parent c94e5c9 commit 4d77ea9
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@ import fs2.Stream
import scodec.bits.ByteVector

trait QueueClient[F[_]] {
def enqueue(queueName: QueueName, message: ByteVector): F[Unit]
def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message]

def enqueue(queueName: QueueName, message: ByteVector): F[MessageId]

def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, Message]

def markProcessed(
clientId: ClientId,
queueName: QueueName,
messageId: MessageId
): F[Unit]

def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage}
import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XReadMessage}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.renku.queue.client.*
Expand All @@ -34,19 +34,30 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien

private val payloadKey = "payload"

override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] =
override def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] =
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.toString, Map(payloadKey -> message))
)
createConnection.flatMap(_.append(m)).compile.drain
createConnection
.flatMap(_.append(m))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, Message] =
val initialOffset: String => StreamingOffset[String] =
maybeOffset
.map(id => StreamingOffset.Custom[String](_, id.value))
.getOrElse(StreamingOffset.All[String])

createConnection >>= {
_.read(Set(queueName.toString), chunkSize)
_.read(Set(queueName.toString), chunkSize, initialOffset)
.map(toMessage)
.collect { case Some(m) => m }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.renku.redis.client

import cats.effect.IO
import cats.syntax.all.*
import fs2.*
import fs2.concurrent.SignallingRef
import io.renku.redis.client.RedisClientGenerators.*
Expand All @@ -37,8 +38,8 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
message1 = "message1"
_ <- client.enqueue(queue, toByteVector(message1))

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = None)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
Expand All @@ -49,11 +50,39 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))

_ <- fiber.cancel
_ <- streamingProcFiber.cancel
yield ()
}

test("allow marking and retrieve a processed event"):
test("can start enqueueing events from the given messageId excluding"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)

message1 = "message1"
message1Id <- client.enqueue(queue, toByteVector(message1))

streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
.start

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message2))

message3 = "message3"
_ <- client.enqueue(queue, toByteVector(message3))
_ <- dequeued.waitUntil(_.toSet == Set(message2, message3))

_ <- streamingProcFiber.cancel
yield ()
}

test("allow marking and retrieving a processed event"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
val clientId = RedisClientGenerators.clientIdGen.generateOne
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec:
message1 = ProjectCreated("my project", "my description", Some("myself"), now)
_ <- client.enqueue(queue, avro.write[ProjectCreated](Seq(message1)))

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = None)
.evalTap(m => IO.println(avro.read[ProjectCreated](m.payload)))
.evalMap(event =>
dequeued.update(avro.read[ProjectCreated](event.payload).toList ::: _)
Expand All @@ -62,6 +62,6 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec:
_ <- client.enqueue(queue, avro.write(Seq(message2)))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))

_ <- fiber.cancel
_ <- streamingProcFiber.cancel
yield ()
}

0 comments on commit 4d77ea9

Please sign in to comment.