Skip to content

Commit

Permalink
refactor: QueueEvent -> Message
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro committed Jan 19, 2024
1 parent 7fee77c commit df2b09f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.queue.client

import scodec.bits.ByteVector

final case class Message(id: MessageId, payload: ByteVector)

final case class MessageId(value: String) extends AnyVal
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ import scodec.bits.ByteVector

trait QueueClient[F[_]] {
def enqueue(queueName: QueueName, message: ByteVector): F[Unit]
def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, ByteVector]
def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message]
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data.XAddMessage
import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage}
import fs2.Stream
import io.renku.queue.client.{QueueClient, QueueName}
import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName}
import scodec.bits.ByteVector

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {
Expand All @@ -42,13 +42,18 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int
): Stream[F, ByteVector] =
): Stream[F, Message] =
createConnection >>= {
_.read(Set(queueName.toString), chunkSize)
.map(_.body.get(payloadKey))
.map(toMessage)
.collect { case Some(m) => m }
}

private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] =
m.body
.get(payloadKey)
.map(Message(MessageId(m.id.value), _))

private def createConnection =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
.evalMap(m => dequeued.update(toStringUft8(m) :: _))
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
.start
Expand Down

0 comments on commit df2b09f

Please sign in to comment.