diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala new file mode 100644 index 00000000..e662dade --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +import scodec.bits.ByteVector + +final case class Message(id: MessageId, payload: ByteVector) + +final case class MessageId(value: String) extends AnyVal diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index c25b87bc..018d646e 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -23,5 +23,5 @@ import scodec.bits.ByteVector trait QueueClient[F[_]] { def enqueue(queueName: QueueName, message: ByteVector): F[Unit] - def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, ByteVector] + def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message] } diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index bbade297..68a9cba9 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -23,9 +23,9 @@ import cats.syntax.all.* import dev.profunktor.redis4cats.connection.RedisClient import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.streams.RedisStream -import dev.profunktor.redis4cats.streams.data.XAddMessage +import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage} import fs2.Stream -import io.renku.queue.client.{QueueClient, QueueName} +import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName} import scodec.bits.ByteVector class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { @@ -42,13 +42,18 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien override def acquireEventsStream( queueName: QueueName, chunkSize: Int - ): Stream[F, ByteVector] = + ): Stream[F, Message] = createConnection >>= { _.read(Set(queueName.toString), chunkSize) - .map(_.body.get(payloadKey)) + .map(toMessage) .collect { case Some(m) => m } } + private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] = + m.body + .get(payloadKey) + .map(Message(MessageId(m.id.value), _)) + private def createConnection = RedisStream .mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index a71f486e..d6b71584 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -39,7 +39,7 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: fiber <- client .acquireEventsStream(queue, chunkSize = 1) - .evalMap(m => dequeued.update(toStringUft8(m) :: _)) + .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) .compile .drain .start