diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index f38669b4..160e22a4 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -22,12 +22,20 @@ import fs2.Stream import scodec.bits.ByteVector trait QueueClient[F[_]] { - def enqueue(queueName: QueueName, message: ByteVector): F[Unit] - def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message] + + def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] + + def acquireEventsStream( + queueName: QueueName, + chunkSize: Int, + maybeOffset: Option[MessageId] + ): Stream[F, Message] + def markProcessed( clientId: ClientId, queueName: QueueName, messageId: MessageId ): F[Unit] + def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]] } diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index 9cef933a..8799280a 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -24,7 +24,7 @@ import dev.profunktor.redis4cats.connection.RedisClient import dev.profunktor.redis4cats.data.RedisCodec import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.streams.RedisStream -import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage} +import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XReadMessage} import dev.profunktor.redis4cats.{Redis, RedisCommands} import fs2.Stream import io.renku.queue.client.* @@ -34,19 +34,30 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien private val payloadKey = "payload" - override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] = + override def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] = val m = Stream .emit[F, XAddMessage[String, ByteVector]]( XAddMessage(queueName.toString, Map(payloadKey -> message)) ) - createConnection.flatMap(_.append(m)).compile.drain + createConnection + .flatMap(_.append(m)) + .map(id => MessageId(id.value)) + .compile + .toList + .map(_.head) override def acquireEventsStream( queueName: QueueName, - chunkSize: Int + chunkSize: Int, + maybeOffset: Option[MessageId] ): Stream[F, Message] = + val initialOffset: String => StreamingOffset[String] = + maybeOffset + .map(id => StreamingOffset.Custom[String](_, id.value)) + .getOrElse(StreamingOffset.All[String]) + createConnection >>= { - _.read(Set(queueName.toString), chunkSize) + _.read(Set(queueName.toString), chunkSize, initialOffset) .map(toMessage) .collect { case Some(m) => m } } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index 89125afd..f0ef83cf 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -19,6 +19,7 @@ package io.renku.redis.client import cats.effect.IO +import cats.syntax.all.* import fs2.* import fs2.concurrent.SignallingRef import io.renku.redis.client.RedisClientGenerators.* @@ -37,8 +38,8 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: message1 = "message1" _ <- client.enqueue(queue, toByteVector(message1)) - fiber <- client - .acquireEventsStream(queue, chunkSize = 1) + streamingProcFiber <- client + .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) .compile .drain @@ -49,11 +50,39 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: _ <- client.enqueue(queue, toByteVector(message2)) _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) - _ <- fiber.cancel + _ <- streamingProcFiber.cancel yield () } - test("allow marking and retrieve a processed event"): + test("can start enqueueing events from the given messageId excluding"): + withRedisClient.asQueueClient().use { client => + val queue = RedisClientGenerators.queueNameGen.generateOne + for + dequeued <- SignallingRef.of[IO, List[String]](Nil) + + message1 = "message1" + message1Id <- client.enqueue(queue, toByteVector(message1)) + + streamingProcFiber <- client + .acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some) + .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) + .compile + .drain + .start + + message2 = "message2" + _ <- client.enqueue(queue, toByteVector(message2)) + _ <- dequeued.waitUntil(_.toSet == Set(message2)) + + message3 = "message3" + _ <- client.enqueue(queue, toByteVector(message3)) + _ <- dequeued.waitUntil(_.toSet == Set(message2, message3)) + + _ <- streamingProcFiber.cancel + yield () + } + + test("allow marking and retrieving a processed event"): withRedisClient.asQueueClient().use { client => val queue = RedisClientGenerators.queueNameGen.generateOne val clientId = RedisClientGenerators.clientIdGen.generateOne diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala index 5c605ae4..c9e3adf7 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala @@ -47,8 +47,8 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec: message1 = ProjectCreated("my project", "my description", Some("myself"), now) _ <- client.enqueue(queue, avro.write[ProjectCreated](Seq(message1))) - fiber <- client - .acquireEventsStream(queue, chunkSize = 1) + streamingProcFiber <- client + .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) .evalTap(m => IO.println(avro.read[ProjectCreated](m.payload))) .evalMap(event => dequeued.update(avro.read[ProjectCreated](event.payload).toList ::: _) @@ -62,6 +62,6 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec: _ <- client.enqueue(queue, avro.write(Seq(message2))) _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) - _ <- fiber.cancel + _ <- streamingProcFiber.cancel yield () }