From 0fccc73000216c832db0bc36c58d4bc17c949ada Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Wed, 17 Jan 2024 17:37:54 +0100 Subject: [PATCH 1/8] refactor: Command alias --- .../renku/redis/client/util/ConnectionTestSpec.scala | 3 +-- .../scala/io/renku/redis/client/util/RedisSpec.scala | 11 ++++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala index b4f569d4..2aa802f4 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala @@ -2,13 +2,12 @@ package io.renku.redis.client.util import cats.effect.* import cats.implicits.* -import dev.profunktor.redis4cats.RedisCommands import munit.CatsEffectSuite class ConnectionTestSpec extends CatsEffectSuite with RedisSpec { test("connect to Redis") { - withRedis().use { (redis: RedisCommands[IO, String, String]) => + withRedis().use { (redis: Command) => for _ <- redis.set("foo", "123") x <- redis.get("foo") diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala index 8c672d74..f7ea8734 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala @@ -28,10 +28,12 @@ trait RedisSpec: private lazy val server: RedisServer = RedisServer - val withRedis: Fixture[Resource[IO, RedisCommands[IO, String, String]]] = - new Fixture[Resource[IO, RedisCommands[IO, String, String]]]("redis") { + type Command = RedisCommands[IO, String, String] - def apply(): Resource[IO, RedisCommands[IO, String, String]] = + val withRedis: Fixture[Resource[IO, Command]] = + new Fixture[Resource[IO, Command]]("redis") { + + def apply(): Resource[IO, Command] = Redis[IO].utf8(server.url) override def beforeAll(): Unit = @@ -41,5 +43,4 @@ trait RedisSpec: server.stop() } - override def munitFixtures - : Seq[Fixture[Resource[IO, RedisCommands[IO, String, String]]]] = List(withRedis) + override def munitFixtures: Seq[Fixture[Resource[IO, Command]]] = List(withRedis) From 5ea794637df6a88de1e29023dff71fa5ef5c4db4 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Thu, 18 Jan 2024 18:41:36 +0100 Subject: [PATCH 2/8] feat: Redis client that enqueues and dequeues string messages --- .../io/renku/queue/client/QueueClient.scala | 26 +++++++++ .../io/renku/queue/client/QueueName.scala | 22 ++++++++ .../renku/redis/client/RedisQueueClient.scala | 51 ++++++++++++++++++ .../redis/client/RedisClientGenerators.scala | 34 ++++++++++++ .../redis/client/RedisQueueClientSpec.scala | 54 +++++++++++++++++++ .../client/util/ConnectionTestSpec.scala | 20 ------- .../renku/redis/client/util/RedisSpec.scala | 33 ++++++++---- 7 files changed, 209 insertions(+), 31 deletions(-) create mode 100644 modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala create mode 100644 modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala create mode 100644 modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala create mode 100644 modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala create mode 100644 modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala delete mode 100644 modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala new file mode 100644 index 00000000..940e9ff2 --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +import fs2.Stream + +trait QueueClient[F[_]] { + def enqueue(queueName: QueueName, message: String): F[Unit] + def acquireEventsStream(queueName: QueueName): Stream[F, String] +} diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala new file mode 100644 index 00000000..760a8029 --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +case class QueueName(value: String): + override lazy val toString: String = value diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala new file mode 100644 index 00000000..506e1b99 --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.redis.client + +import cats.effect.Async +import cats.syntax.all.* +import dev.profunktor.redis4cats.connection.RedisClient +import dev.profunktor.redis4cats.data.RedisCodec +import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.streams.RedisStream +import dev.profunktor.redis4cats.streams.data.XAddMessage +import fs2.Stream +import io.renku.queue.client.{QueueClient, QueueName} + +class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { + + private val payloadKey = "payload" + + override def enqueue(queueName: QueueName, message: String): F[Unit] = + val m = Stream + .emit[F, XAddMessage[String, String]]( + XAddMessage(queueName.value, Map(payloadKey -> message)) + ) + createConnection.flatMap(_.append(m)).compile.drain + + override def acquireEventsStream(queueName: QueueName): Stream[F, String] = + createConnection >>= { + _.read(Set(queueName.value), chunkSize = 1) + .map(_.body.get(payloadKey)) + .collect { case Some(m) => m } + } + + private def createConnection = + RedisStream.mkStreamingConnection[F, String, String](client, RedisCodec.Utf8) +} diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala new file mode 100644 index 00000000..1fc4e4d2 --- /dev/null +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.redis.client + +import io.renku.queue.client.QueueName +import org.scalacheck.Gen +import org.scalacheck.Gen.alphaLowerChar + +object RedisClientGenerators: + + val queueNameGen: Gen[QueueName] = + Gen + .chooseNum(3, 10) + .flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName.apply)) + + given Gen[QueueName] = queueNameGen + + extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala new file mode 100644 index 00000000..766ddcc1 --- /dev/null +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.redis.client + +import cats.effect.IO +import fs2.* +import fs2.concurrent.SignallingRef +import io.renku.redis.client.RedisClientGenerators.* +import io.renku.redis.client.util.RedisSpec +import munit.CatsEffectSuite + +class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: + + test("can enqueue and dequeue events") { + withRedisClient.asQueueClient().use { client => + val queue = RedisClientGenerators.queueNameGen.generateOne + for + dequeued <- SignallingRef.of[IO, List[String]](Nil) + + message1 = "message1" + _ <- client.enqueue(queue, message1) + + fiber <- client + .acquireEventsStream(queue) + .evalMap(m => dequeued.update(m :: _)) + .compile + .drain + .start + _ <- dequeued.waitUntil(_ == List(message1)) + + message2 = "message2" + _ <- client.enqueue(queue, message2) + _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) + + _ <- fiber.cancel + yield () + } + } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala deleted file mode 100644 index 2aa802f4..00000000 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/ConnectionTestSpec.scala +++ /dev/null @@ -1,20 +0,0 @@ -package io.renku.redis.client.util - -import cats.effect.* -import cats.implicits.* -import munit.CatsEffectSuite - -class ConnectionTestSpec extends CatsEffectSuite with RedisSpec { - - test("connect to Redis") { - withRedis().use { (redis: Command) => - for - _ <- redis.set("foo", "123") - x <- redis.get("foo") - _ <- redis.setNx("foo", "should not happen") - y <- redis.get("foo") - _ <- IO(println(x === y)) // true - yield () - } - } -} diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala index f7ea8734..49a95190 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala @@ -19,28 +19,39 @@ package io.renku.redis.client.util import cats.effect.* +import dev.profunktor.redis4cats.connection.RedisClient +import dev.profunktor.redis4cats.data.RedisCodec import dev.profunktor.redis4cats.effect.Log.Stdout.* import dev.profunktor.redis4cats.effect.MkRedis.forAsync import dev.profunktor.redis4cats.{Redis, RedisCommands} +import io.renku.queue.client.QueueClient +import io.renku.redis.client.RedisQueueClient trait RedisSpec: self: munit.Suite => private lazy val server: RedisServer = RedisServer - type Command = RedisCommands[IO, String, String] + abstract class RedisFixture extends Fixture[Resource[IO, RedisClient]]("redis"): + def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] + def asQueueClient(): Resource[IO, QueueClient[IO]] - val withRedis: Fixture[Resource[IO, Command]] = - new Fixture[Resource[IO, Command]]("redis") { + val withRedisClient: RedisFixture = new RedisFixture: - def apply(): Resource[IO, Command] = - Redis[IO].utf8(server.url) + def apply(): Resource[IO, RedisClient] = + RedisClient[IO].from(server.url) - override def beforeAll(): Unit = - server.start() + override def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] = + apply().flatMap(Redis[IO].fromClient(_, RedisCodec.Utf8)) - override def afterAll(): Unit = - server.stop() - } + override def asQueueClient(): Resource[IO, QueueClient[IO]] = + apply().map(new RedisQueueClient[IO](_)) - override def munitFixtures: Seq[Fixture[Resource[IO, Command]]] = List(withRedis) + override def beforeAll(): Unit = + server.start() + + override def afterAll(): Unit = + server.stop() + + override def munitFixtures: Seq[Fixture[Resource[IO, RedisClient]]] = + List(withRedisClient) From 8b14e7e5844a3650d83bbed3f8a6bd69ffd3b2ff Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Thu, 18 Jan 2024 18:47:00 +0100 Subject: [PATCH 3/8] feat: Redis client that chunkSize when dequeueing --- .../src/main/scala/io/renku/queue/client/QueueClient.scala | 4 +++- .../scala/io/renku/redis/client/RedisQueueClient.scala | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index 940e9ff2..7b01204b 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -22,5 +22,7 @@ import fs2.Stream trait QueueClient[F[_]] { def enqueue(queueName: QueueName, message: String): F[Unit] - def acquireEventsStream(queueName: QueueName): Stream[F, String] + def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, String] + def acquireEventsStream(queueName: QueueName): Stream[F, String] = + acquireEventsStream(queueName, chunkSize = 1) } diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index 506e1b99..b11694be 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -39,9 +39,12 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien ) createConnection.flatMap(_.append(m)).compile.drain - override def acquireEventsStream(queueName: QueueName): Stream[F, String] = + override def acquireEventsStream( + queueName: QueueName, + chunkSize: Int + ): Stream[F, String] = createConnection >>= { - _.read(Set(queueName.value), chunkSize = 1) + _.read(Set(queueName.value), chunkSize = chunkSize) .map(_.body.get(payloadKey)) .collect { case Some(m) => m } } From 3a023ddafe0a40dacde19daa0b7ef28207a27a39 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Thu, 18 Jan 2024 19:42:18 +0100 Subject: [PATCH 4/8] feat: Redis client to work with Array[Byte] --- .../io/renku/queue/client/QueueClient.scala | 6 ++--- .../renku/redis/client/RedisQueueClient.scala | 24 ++++++++++++------- .../redis/client/RedisQueueClientSpec.scala | 10 ++++---- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index 7b01204b..ef79f1cd 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -21,8 +21,6 @@ package io.renku.queue.client import fs2.Stream trait QueueClient[F[_]] { - def enqueue(queueName: QueueName, message: String): F[Unit] - def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, String] - def acquireEventsStream(queueName: QueueName): Stream[F, String] = - acquireEventsStream(queueName, chunkSize = 1) + def enqueue(queueName: QueueName, message: Array[Byte]): F[Unit] + def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Array[Byte]] } diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index b11694be..bcb765c8 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -30,25 +30,33 @@ import io.renku.queue.client.{QueueClient, QueueName} class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { - private val payloadKey = "payload" + private val payloadKeyEnc = encodeKey("payload") - override def enqueue(queueName: QueueName, message: String): F[Unit] = + override def enqueue(queueName: QueueName, message: Array[Byte]): F[Unit] = val m = Stream - .emit[F, XAddMessage[String, String]]( - XAddMessage(queueName.value, Map(payloadKey -> message)) + .emit[F, XAddMessage[Array[Byte], Array[Byte]]]( + XAddMessage(encodeKey(queueName.value), Map(payloadKeyEnc -> message)) ) createConnection.flatMap(_.append(m)).compile.drain override def acquireEventsStream( queueName: QueueName, chunkSize: Int - ): Stream[F, String] = + ): Stream[F, Array[Byte]] = createConnection >>= { - _.read(Set(queueName.value), chunkSize = chunkSize) - .map(_.body.get(payloadKey)) + _.read(Set(encodeKey(queueName.value)), chunkSize) + .map(_.body.find(payloadEntry).map(_._2)) .collect { case Some(m) => m } } + private lazy val payloadEntry: ((Array[Byte], Array[Byte])) => Boolean = { + case (k, _) => k.sameElements(payloadKeyEnc) + } + private def createConnection = - RedisStream.mkStreamingConnection[F, String, String](client, RedisCodec.Utf8) + RedisStream + .mkStreamingConnection[F, Array[Byte], Array[Byte]](client, RedisCodec.Bytes) + + private lazy val encodeKey: String => Array[Byte] = + RedisCodec.Utf8.underlying.encodeKey(_).array() } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index 766ddcc1..093bb5e8 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -25,6 +25,8 @@ import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.util.RedisSpec import munit.CatsEffectSuite +import java.nio.charset.StandardCharsets.UTF_8 + class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: test("can enqueue and dequeue events") { @@ -34,18 +36,18 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: dequeued <- SignallingRef.of[IO, List[String]](Nil) message1 = "message1" - _ <- client.enqueue(queue, message1) + _ <- client.enqueue(queue, message1.getBytes) fiber <- client - .acquireEventsStream(queue) - .evalMap(m => dequeued.update(m :: _)) + .acquireEventsStream(queue, chunkSize = 1) + .evalMap(m => dequeued.update(new String(m, UTF_8) :: _)) .compile .drain .start _ <- dequeued.waitUntil(_ == List(message1)) message2 = "message2" - _ <- client.enqueue(queue, message2) + _ <- client.enqueue(queue, message2.getBytes) _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) _ <- fiber.cancel From 43309cef499cd0969f7c9cf62e8f70151d29f330 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Fri, 19 Jan 2024 12:39:43 +0100 Subject: [PATCH 5/8] feat: Redis codec of String and ByteVector --- .../io/renku/queue/client/QueueClient.scala | 5 ++- .../renku/redis/client/RedisQueueClient.scala | 25 ++++------- .../renku/redis/client/StringBytesCodec.scala | 44 +++++++++++++++++++ .../redis/client/RedisQueueClientSpec.scala | 15 ++++--- 4 files changed, 66 insertions(+), 23 deletions(-) create mode 100644 modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index ef79f1cd..c25b87bc 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -19,8 +19,9 @@ package io.renku.queue.client import fs2.Stream +import scodec.bits.ByteVector trait QueueClient[F[_]] { - def enqueue(queueName: QueueName, message: Array[Byte]): F[Unit] - def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Array[Byte]] + def enqueue(queueName: QueueName, message: ByteVector): F[Unit] + def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, ByteVector] } diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index bcb765c8..83b629cb 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -21,42 +21,35 @@ package io.renku.redis.client import cats.effect.Async import cats.syntax.all.* import dev.profunktor.redis4cats.connection.RedisClient -import dev.profunktor.redis4cats.data.RedisCodec import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.streams.RedisStream import dev.profunktor.redis4cats.streams.data.XAddMessage import fs2.Stream import io.renku.queue.client.{QueueClient, QueueName} +import scodec.bits.ByteVector class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { - private val payloadKeyEnc = encodeKey("payload") + private val payloadKey = "payload" - override def enqueue(queueName: QueueName, message: Array[Byte]): F[Unit] = + override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] = val m = Stream - .emit[F, XAddMessage[Array[Byte], Array[Byte]]]( - XAddMessage(encodeKey(queueName.value), Map(payloadKeyEnc -> message)) + .emit[F, XAddMessage[String, ByteVector]]( + XAddMessage(queueName.value, Map(payloadKey -> message)) ) createConnection.flatMap(_.append(m)).compile.drain override def acquireEventsStream( queueName: QueueName, chunkSize: Int - ): Stream[F, Array[Byte]] = + ): Stream[F, ByteVector] = createConnection >>= { - _.read(Set(encodeKey(queueName.value)), chunkSize) - .map(_.body.find(payloadEntry).map(_._2)) + _.read(Set(queueName.value), chunkSize) + .map(_.body.get(payloadKey)) .collect { case Some(m) => m } } - private lazy val payloadEntry: ((Array[Byte], Array[Byte])) => Boolean = { - case (k, _) => k.sameElements(payloadKeyEnc) - } - private def createConnection = RedisStream - .mkStreamingConnection[F, Array[Byte], Array[Byte]](client, RedisCodec.Bytes) - - private lazy val encodeKey: String => Array[Byte] = - RedisCodec.Utf8.underlying.encodeKey(_).array() + .mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance) } diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala new file mode 100644 index 00000000..574a8a45 --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.redis.client + +import dev.profunktor.redis4cats.data.RedisCodec +import io.lettuce.core.codec.{ByteArrayCodec, StringCodec, RedisCodec as JRedisCodec} +import scodec.bits.ByteVector + +import java.nio.ByteBuffer + +object StringBytesCodec: + + val instance: RedisCodec[String, ByteVector] = RedisCodec { + new JRedisCodec[String, ByteVector] { + + override def decodeKey(bytes: ByteBuffer): String = + StringCodec.UTF8.decodeKey(bytes) + + override def decodeValue(bytes: ByteBuffer): ByteVector = + ByteVector.view(ByteArrayCodec.INSTANCE.decodeValue(bytes)) + + override def encodeKey(key: String): ByteBuffer = + StringCodec.UTF8.encodeKey(key) + + override def encodeValue(value: ByteVector): ByteBuffer = + ByteArrayCodec.INSTANCE.encodeValue(value.toArray) + } + } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index 093bb5e8..d6b13cde 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -24,8 +24,7 @@ import fs2.concurrent.SignallingRef import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.util.RedisSpec import munit.CatsEffectSuite - -import java.nio.charset.StandardCharsets.UTF_8 +import scodec.bits.ByteVector class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: @@ -36,21 +35,27 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: dequeued <- SignallingRef.of[IO, List[String]](Nil) message1 = "message1" - _ <- client.enqueue(queue, message1.getBytes) + _ <- client.enqueue(queue, toByteVector(message1)) fiber <- client .acquireEventsStream(queue, chunkSize = 1) - .evalMap(m => dequeued.update(new String(m, UTF_8) :: _)) + .evalMap(m => dequeued.update(toStringUft8(m) :: _)) .compile .drain .start _ <- dequeued.waitUntil(_ == List(message1)) message2 = "message2" - _ <- client.enqueue(queue, message2.getBytes) + _ <- client.enqueue(queue, toByteVector(message2)) _ <- dequeued.waitUntil(_.toSet == Set(message1, message2)) _ <- fiber.cancel yield () } } + + private def toByteVector(v: String): ByteVector = + ByteVector.encodeUtf8(v).fold(throw _, identity) + + private lazy val toStringUft8: ByteVector => String = + _.decodeUtf8.fold(throw _, identity) From 7fee77c29bb4096e6d09d4a1dd38c354badab28e Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Fri, 19 Jan 2024 12:47:43 +0100 Subject: [PATCH 6/8] feat: QueueName as opaque type --- .../src/main/scala/io/renku/queue/client/QueueName.scala | 6 ++++-- .../main/scala/io/renku/redis/client/RedisQueueClient.scala | 4 ++-- .../scala/io/renku/redis/client/RedisClientGenerators.scala | 2 +- .../scala/io/renku/redis/client/RedisQueueClientSpec.scala | 3 +-- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala index 760a8029..3e1670bf 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueName.scala @@ -18,5 +18,7 @@ package io.renku.queue.client -case class QueueName(value: String): - override lazy val toString: String = value +opaque type QueueName = String +object QueueName { + def apply(v: String): QueueName = new QueueName(v) +} diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index 83b629cb..bbade297 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -35,7 +35,7 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] = val m = Stream .emit[F, XAddMessage[String, ByteVector]]( - XAddMessage(queueName.value, Map(payloadKey -> message)) + XAddMessage(queueName.toString, Map(payloadKey -> message)) ) createConnection.flatMap(_.append(m)).compile.drain @@ -44,7 +44,7 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien chunkSize: Int ): Stream[F, ByteVector] = createConnection >>= { - _.read(Set(queueName.value), chunkSize) + _.read(Set(queueName.toString), chunkSize) .map(_.body.get(payloadKey)) .collect { case Some(m) => m } } diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala index 1fc4e4d2..97d9663c 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala @@ -27,7 +27,7 @@ object RedisClientGenerators: val queueNameGen: Gen[QueueName] = Gen .chooseNum(3, 10) - .flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName.apply)) + .flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_))) given Gen[QueueName] = queueNameGen diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index d6b13cde..a71f486e 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -28,7 +28,7 @@ import scodec.bits.ByteVector class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: - test("can enqueue and dequeue events") { + test("can enqueue and dequeue events"): withRedisClient.asQueueClient().use { client => val queue = RedisClientGenerators.queueNameGen.generateOne for @@ -52,7 +52,6 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: _ <- fiber.cancel yield () } - } private def toByteVector(v: String): ByteVector = ByteVector.encodeUtf8(v).fold(throw _, identity) From df2b09fadfc00612a95e081db54ae93701cb3dd5 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Fri, 19 Jan 2024 14:28:26 +0100 Subject: [PATCH 7/8] refactor: QueueEvent -> Message --- .../scala/io/renku/queue/client/Message.scala | 25 +++++++++++++++++++ .../io/renku/queue/client/QueueClient.scala | 2 +- .../renku/redis/client/RedisQueueClient.scala | 13 +++++++--- .../redis/client/RedisQueueClientSpec.scala | 2 +- 4 files changed, 36 insertions(+), 6 deletions(-) create mode 100644 modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala new file mode 100644 index 00000000..e662dade --- /dev/null +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +import scodec.bits.ByteVector + +final case class Message(id: MessageId, payload: ByteVector) + +final case class MessageId(value: String) extends AnyVal diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index c25b87bc..018d646e 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -23,5 +23,5 @@ import scodec.bits.ByteVector trait QueueClient[F[_]] { def enqueue(queueName: QueueName, message: ByteVector): F[Unit] - def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, ByteVector] + def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message] } diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index bbade297..68a9cba9 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -23,9 +23,9 @@ import cats.syntax.all.* import dev.profunktor.redis4cats.connection.RedisClient import dev.profunktor.redis4cats.effect.Log import dev.profunktor.redis4cats.streams.RedisStream -import dev.profunktor.redis4cats.streams.data.XAddMessage +import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage} import fs2.Stream -import io.renku.queue.client.{QueueClient, QueueName} +import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName} import scodec.bits.ByteVector class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { @@ -42,13 +42,18 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien override def acquireEventsStream( queueName: QueueName, chunkSize: Int - ): Stream[F, ByteVector] = + ): Stream[F, Message] = createConnection >>= { _.read(Set(queueName.toString), chunkSize) - .map(_.body.get(payloadKey)) + .map(toMessage) .collect { case Some(m) => m } } + private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] = + m.body + .get(payloadKey) + .map(Message(MessageId(m.id.value), _)) + private def createConnection = RedisStream .mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index a71f486e..d6b71584 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -39,7 +39,7 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: fiber <- client .acquireEventsStream(queue, chunkSize = 1) - .evalMap(m => dequeued.update(toStringUft8(m) :: _)) + .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) .compile .drain .start From 834038da20fbb722907321691c9126e1c99af821 Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Fri, 19 Jan 2024 14:33:37 +0100 Subject: [PATCH 8/8] chore: formatting --- .../main/scala/io/renku/redis/client/StringBytesCodec.scala | 2 +- project/RedisServer.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala index 574a8a45..9d1711d9 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/StringBytesCodec.scala @@ -19,7 +19,7 @@ package io.renku.redis.client import dev.profunktor.redis4cats.data.RedisCodec -import io.lettuce.core.codec.{ByteArrayCodec, StringCodec, RedisCodec as JRedisCodec} +import io.lettuce.core.codec.{ByteArrayCodec, RedisCodec as JRedisCodec, StringCodec} import scodec.bits.ByteVector import java.nio.ByteBuffer diff --git a/project/RedisServer.scala b/project/RedisServer.scala index 266ef51e..dd22014f 100644 --- a/project/RedisServer.scala +++ b/project/RedisServer.scala @@ -34,8 +34,8 @@ object RedisServer { } private def call(methodName: String): ClassLoader => Unit = classLoader => { - val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$") - val method = clazz.getMethod(methodName) + val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$") + val method = clazz.getMethod(methodName) val instance = clazz.getField("MODULE$").get(null) method.invoke(instance) }