diff --git a/build.sbt b/build.sbt index f951d04f..4ace075e 100644 --- a/build.sbt +++ b/build.sbt @@ -119,7 +119,9 @@ lazy val searchProvision = project .withId("search-provision") .settings(commonSettings) .settings( - name := "search-provision" + name := "search-provision", + Test / testOptions += Tests.Setup(RedisServer.start), + Test / testOptions += Tests.Cleanup(RedisServer.stop) ) .dependsOn( commons % "compile->compile;test->test", diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index 018d646e..160e22a4 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -22,6 +22,20 @@ import fs2.Stream import scodec.bits.ByteVector trait QueueClient[F[_]] { - def enqueue(queueName: QueueName, message: ByteVector): F[Unit] - def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message] + + def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] + + def acquireEventsStream( + queueName: QueueName, + chunkSize: Int, + maybeOffset: Option[MessageId] + ): Stream[F, Message] + + def markProcessed( + clientId: ClientId, + queueName: QueueName, + messageId: MessageId + ): F[Unit] + + def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]] } diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/types.scala similarity index 89% rename from modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala rename to modules/redis-client/src/main/scala/io/renku/queue/client/types.scala index 3e1670bf..918c283d 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/types.scala @@ -22,3 +22,8 @@ opaque type QueueName = String object QueueName { def apply(v: String): QueueName = new QueueName(v) } + +opaque type ClientId = String +object ClientId { + def apply(v: String): ClientId = new ClientId(v) +} diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index 68a9cba9..8799280a 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -18,33 +18,46 @@ package io.renku.redis.client -import cats.effect.Async +import cats.effect.{Async, Resource} import cats.syntax.all.* import dev.profunktor.redis4cats.connection.RedisClient +import dev.profunktor.redis4cats.data.RedisCodec import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.streams.RedisStream -import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage} +import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XReadMessage} +import dev.profunktor.redis4cats.{Redis, RedisCommands} import fs2.Stream -import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName} +import io.renku.queue.client.* import scodec.bits.ByteVector class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { private val payloadKey = "payload" - override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] = + override def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] = val m = Stream .emit[F, XAddMessage[String, ByteVector]]( XAddMessage(queueName.toString, Map(payloadKey -> message)) ) - createConnection.flatMap(_.append(m)).compile.drain + createConnection + .flatMap(_.append(m)) + .map(id => MessageId(id.value)) + .compile + .toList + .map(_.head) override def acquireEventsStream( queueName: QueueName, - chunkSize: Int + chunkSize: Int, + maybeOffset: Option[MessageId] ): Stream[F, Message] = + val initialOffset: String => StreamingOffset[String] = + maybeOffset + .map(id => StreamingOffset.Custom[String](_, id.value)) + .getOrElse(StreamingOffset.All[String]) + createConnection >>= { - _.read(Set(queueName.toString), chunkSize) + _.read(Set(queueName.toString), chunkSize, initialOffset) .map(toMessage) .collect { case Some(m) => m } } @@ -57,4 +70,27 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien private def createConnection = RedisStream .mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance) + + override def markProcessed( + clientId: ClientId, + queueName: QueueName, + messageId: MessageId + ): F[Unit] = + stringCommands.use { + _.set(formProcessedKey(clientId, queueName), messageId.value) + } + + override def findLastProcessed( + clientId: ClientId, + queueName: QueueName + ): F[Option[MessageId]] = + stringCommands.use { + _.get(formProcessedKey(clientId, queueName)).map(_.map(MessageId.apply)) + } + + private def stringCommands: Resource[F, RedisCommands[F, String, String]] = + Redis[F].fromClient(client, RedisCodec.Utf8) + + private def formProcessedKey(clientId: ClientId, queueName: QueueName) = + s"$queueName.$clientId" } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala index 97d9663c..74a9c6d7 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala @@ -18,9 +18,9 @@ package io.renku.redis.client -import io.renku.queue.client.QueueName +import io.renku.queue.client.* import org.scalacheck.Gen -import org.scalacheck.Gen.alphaLowerChar +import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar} object RedisClientGenerators: @@ -29,6 +29,15 @@ object RedisClientGenerators: .chooseNum(3, 10) .flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_))) - given Gen[QueueName] = queueNameGen + val clientIdGen: Gen[ClientId] = + Gen + .chooseNum(3, 10) + .flatMap(Gen.stringOfN(_, alphaNumChar).map(ClientId(_))) + + val messageIdGen: Gen[MessageId] = + for + part1 <- Gen.chooseNum(3, 10) + part2 <- Gen.chooseNum(3, 10) + yield MessageId(s"$part1.$part2") extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index d6b71584..f0ef83cf 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -19,6 +19,7 @@ package io.renku.redis.client import cats.effect.IO +import cats.syntax.all.* import fs2.* import fs2.concurrent.SignallingRef import io.renku.redis.client.RedisClientGenerators.* @@ -37,8 +38,8 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: message1 = "message1" _ <- client.enqueue(queue, toByteVector(message1)) - fiber <- client - .acquireEventsStream(queue, chunkSize = 1) + streamingProcFiber <- client + .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) .compile .drain @@ -49,7 +50,51 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: _ <- client.enqueue(queue, toByteVector(message2)) _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) - _ <- fiber.cancel + _ <- streamingProcFiber.cancel + yield () + } + + test("can start enqueueing events from the given messageId excluding"): + withRedisClient.asQueueClient().use { client => + val queue = RedisClientGenerators.queueNameGen.generateOne + for + dequeued <- SignallingRef.of[IO, List[String]](Nil) + + message1 = "message1" + message1Id <- client.enqueue(queue, toByteVector(message1)) + + streamingProcFiber <- client + .acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some) + .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) + .compile + .drain + .start + + message2 = "message2" + _ <- client.enqueue(queue, toByteVector(message2)) + _ <- dequeued.waitUntil(_.toSet == Set(message2)) + + message3 = "message3" + _ <- client.enqueue(queue, toByteVector(message3)) + _ <- dequeued.waitUntil(_.toSet == Set(message2, message3)) + + _ <- streamingProcFiber.cancel + yield () + } + + test("allow marking and retrieving a processed event"): + withRedisClient.asQueueClient().use { client => + val queue = RedisClientGenerators.queueNameGen.generateOne + val clientId = RedisClientGenerators.clientIdGen.generateOne + val messageId = RedisClientGenerators.messageIdGen.generateOne + for + _ <- client.findLastProcessed(clientId, queue).map(v => assert(v.isEmpty)) + + _ <- client.markProcessed(clientId, queue, messageId) + + _ <- client + .findLastProcessed(clientId, queue) + .map(v => assert(v contains messageId)) yield () } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala index 49a95190..d8d2451d 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala @@ -21,7 +21,7 @@ package io.renku.redis.client.util import cats.effect.* import dev.profunktor.redis4cats.connection.RedisClient import dev.profunktor.redis4cats.data.RedisCodec -import dev.profunktor.redis4cats.effect.Log.Stdout.* +import dev.profunktor.redis4cats.effect.Log.Stdout.instance import dev.profunktor.redis4cats.effect.MkRedis.forAsync import dev.profunktor.redis4cats.{Redis, RedisCommands} import io.renku.queue.client.QueueClient @@ -30,10 +30,12 @@ import io.renku.redis.client.RedisQueueClient trait RedisSpec: self: munit.Suite => + export dev.profunktor.redis4cats.effect.Log.Stdout.instance + private lazy val server: RedisServer = RedisServer abstract class RedisFixture extends Fixture[Resource[IO, RedisClient]]("redis"): - def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] + def asRedisCommands(): Resource[IO, RedisCommands[IO, String, String]] def asQueueClient(): Resource[IO, QueueClient[IO]] val withRedisClient: RedisFixture = new RedisFixture: @@ -41,8 +43,8 @@ trait RedisSpec: def apply(): Resource[IO, RedisClient] = RedisClient[IO].from(server.url) - override def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] = - apply().flatMap(Redis[IO].fromClient(_, RedisCodec.Utf8)) + override def asRedisCommands(): Resource[IO, RedisCommands[IO, String, String]] = + apply().flatMap(createRedisCommands) override def asQueueClient(): Resource[IO, QueueClient[IO]] = apply().map(new RedisQueueClient[IO](_)) @@ -53,5 +55,9 @@ trait RedisSpec: override def afterAll(): Unit = server.stop() + lazy val createRedisCommands + : RedisClient => Resource[IO, RedisCommands[IO, String, String]] = + Redis[IO].fromClient(_, RedisCodec.Utf8) + override def munitFixtures: Seq[Fixture[Resource[IO, RedisClient]]] = List(withRedisClient) diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala index 5c605ae4..c9e3adf7 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionSpec.scala @@ -47,8 +47,8 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec: message1 = ProjectCreated("my project", "my description", Some("myself"), now) _ <- client.enqueue(queue, avro.write[ProjectCreated](Seq(message1))) - fiber <- client - .acquireEventsStream(queue, chunkSize = 1) + streamingProcFiber <- client + .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) .evalTap(m => IO.println(avro.read[ProjectCreated](m.payload))) .evalMap(event => dequeued.update(avro.read[ProjectCreated](event.payload).toList ::: _) @@ -62,6 +62,6 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec: _ <- client.enqueue(queue, avro.write(Seq(message2))) _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) - _ <- fiber.cancel + _ <- streamingProcFiber.cancel yield () }