diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala new file mode 100644 index 00000000..e662dade --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +import scodec.bits.ByteVector + +final case class Message(id: MessageId, payload: ByteVector) + +final case class MessageId(value: String) extends AnyVal diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala similarity index 58% rename from modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala rename to modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index 3a402130..018d646e 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -16,24 +16,12 @@ * limitations under the License. */ -package io.renku.redis.client.util +package io.renku.queue.client -import cats.effect.* -import cats.implicits.* -import dev.profunktor.redis4cats.RedisCommands -import munit.CatsEffectSuite +import fs2.Stream +import scodec.bits.ByteVector -class ConnectionTestSpec extends CatsEffectSuite with RedisSpec { - - test("connect to Redis") { - withRedis().use { (redis: RedisCommands[IO, String, String]) => - for - _ <- redis.set("foo", "123") - x <- redis.get("foo") - _ <- redis.setNx("foo", "should not happen") - y <- redis.get("foo") - _ <- IO(println(x === y)) // true - yield () - } - } +trait QueueClient[F[_]] { + def enqueue(queueName: QueueName, message: ByteVector): F[Unit] + def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message] } diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala new file mode 100644 index 00000000..3e1670bf --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +opaque type QueueName = String +object QueueName { + def apply(v: String): QueueName = new QueueName(v) +} diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala new file mode 100644 index 00000000..68a9cba9 --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.redis.client + +import cats.effect.Async +import cats.syntax.all.* +import dev.profunktor.redis4cats.connection.RedisClient +import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.streams.RedisStream +import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage} +import fs2.Stream +import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName} +import scodec.bits.ByteVector + +class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { + + private val payloadKey = "payload" + + override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] = + val m = Stream + .emit[F, XAddMessage[String, ByteVector]]( + XAddMessage(queueName.toString, Map(payloadKey -> message)) + ) + createConnection.flatMap(_.append(m)).compile.drain + + override def acquireEventsStream( + queueName: QueueName, + chunkSize: Int + ): Stream[F, Message] = + createConnection >>= { + _.read(Set(queueName.toString), chunkSize) + .map(toMessage) + .collect { case Some(m) => m } + } + + private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] = + m.body + .get(payloadKey) + .map(Message(MessageId(m.id.value), _)) + + private def createConnection = + RedisStream + .mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance) +} diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala new file mode 100644 index 00000000..9d1711d9 --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.redis.client + +import dev.profunktor.redis4cats.data.RedisCodec +import io.lettuce.core.codec.{ByteArrayCodec, RedisCodec as JRedisCodec, StringCodec} +import scodec.bits.ByteVector + +import java.nio.ByteBuffer + +object StringBytesCodec: + + val instance: RedisCodec[String, ByteVector] = RedisCodec { + new JRedisCodec[String, ByteVector] { + + override def decodeKey(bytes: ByteBuffer): String = + StringCodec.UTF8.decodeKey(bytes) + + override def decodeValue(bytes: ByteBuffer): ByteVector = + ByteVector.view(ByteArrayCodec.INSTANCE.decodeValue(bytes)) + + override def encodeKey(key: String): ByteBuffer = + StringCodec.UTF8.encodeKey(key) + + override def encodeValue(value: ByteVector): ByteBuffer = + ByteArrayCodec.INSTANCE.encodeValue(value.toArray) + } + } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala new file mode 100644 index 00000000..97d9663c --- /dev/null +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.redis.client + +import io.renku.queue.client.QueueName +import org.scalacheck.Gen +import org.scalacheck.Gen.alphaLowerChar + +object RedisClientGenerators: + + val queueNameGen: Gen[QueueName] = + Gen + .chooseNum(3, 10) + .flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_))) + + given Gen[QueueName] = queueNameGen + + extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala new file mode 100644 index 00000000..d6b71584 --- /dev/null +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.redis.client + +import cats.effect.IO +import fs2.* +import fs2.concurrent.SignallingRef +import io.renku.redis.client.RedisClientGenerators.* +import io.renku.redis.client.util.RedisSpec +import munit.CatsEffectSuite +import scodec.bits.ByteVector + +class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: + + test("can enqueue and dequeue events"): + withRedisClient.asQueueClient().use { client => + val queue = RedisClientGenerators.queueNameGen.generateOne + for + dequeued <- SignallingRef.of[IO, List[String]](Nil) + + message1 = "message1" + _ <- client.enqueue(queue, toByteVector(message1)) + + fiber <- client + .acquireEventsStream(queue, chunkSize = 1) + .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) + .compile + .drain + .start + _ <- dequeued.waitUntil(_ == List(message1)) + + message2 = "message2" + _ <- client.enqueue(queue, toByteVector(message2)) + _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) + + _ <- fiber.cancel + yield () + } + + private def toByteVector(v: String): ByteVector = + ByteVector.encodeUtf8(v).fold(throw _, identity) + + private lazy val toStringUft8: ByteVector => String = + _.decodeUtf8.fold(throw _, identity) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala index 8c672d74..49a95190 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala @@ -19,27 +19,39 @@ package io.renku.redis.client.util import cats.effect.* +import dev.profunktor.redis4cats.connection.RedisClient +import dev.profunktor.redis4cats.data.RedisCodec import dev.profunktor.redis4cats.effect.Log.Stdout.* import dev.profunktor.redis4cats.effect.MkRedis.forAsync import dev.profunktor.redis4cats.{Redis, RedisCommands} +import io.renku.queue.client.QueueClient +import io.renku.redis.client.RedisQueueClient trait RedisSpec: self: munit.Suite => private lazy val server: RedisServer = RedisServer - val withRedis: Fixture[Resource[IO, RedisCommands[IO, String, String]]] = - new Fixture[Resource[IO, RedisCommands[IO, String, String]]]("redis") { + abstract class RedisFixture extends Fixture[Resource[IO, RedisClient]]("redis"): + def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] + def asQueueClient(): Resource[IO, QueueClient[IO]] - def apply(): Resource[IO, RedisCommands[IO, String, String]] = - Redis[IO].utf8(server.url) + val withRedisClient: RedisFixture = new RedisFixture: - override def beforeAll(): Unit = - server.start() + def apply(): Resource[IO, RedisClient] = + RedisClient[IO].from(server.url) - override def afterAll(): Unit = - server.stop() - } + override def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] = + apply().flatMap(Redis[IO].fromClient(_, RedisCodec.Utf8)) - override def munitFixtures - : Seq[Fixture[Resource[IO, RedisCommands[IO, String, String]]]] = List(withRedis) + override def asQueueClient(): Resource[IO, QueueClient[IO]] = + apply().map(new RedisQueueClient[IO](_)) + + override def beforeAll(): Unit = + server.start() + + override def afterAll(): Unit = + server.stop() + + override def munitFixtures: Seq[Fixture[Resource[IO, RedisClient]]] = + List(withRedisClient) diff --git a/project/RedisServer.scala b/project/RedisServer.scala index 266ef51e..dd22014f 100644 --- a/project/RedisServer.scala +++ b/project/RedisServer.scala @@ -34,8 +34,8 @@ object RedisServer { } private def call(methodName: String): ClassLoader => Unit = classLoader => { - val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$") - val method = clazz.getMethod(methodName) + val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$") + val method = clazz.getMethod(methodName) val instance = clazz.getField("MODULE$").get(null) method.invoke(instance) }