Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initial Redis client implementation #5

Merged
merged 9 commits into from
Jan 19, 2024
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.queue.client

import scodec.bits.ByteVector

final case class Message(id: MessageId, payload: ByteVector)

final case class MessageId(value: String) extends AnyVal
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,12 @@
* limitations under the License.
*/

package io.renku.redis.client.util
package io.renku.queue.client

import cats.effect.*
import cats.implicits.*
import dev.profunktor.redis4cats.RedisCommands
import munit.CatsEffectSuite
import fs2.Stream
import scodec.bits.ByteVector

class ConnectionTestSpec extends CatsEffectSuite with RedisSpec {

test("connect to Redis") {
withRedis().use { (redis: RedisCommands[IO, String, String]) =>
for
_ <- redis.set("foo", "123")
x <- redis.get("foo")
_ <- redis.setNx("foo", "should not happen")
y <- redis.get("foo")
_ <- IO(println(x === y)) // true
yield ()
}
}
trait QueueClient[F[_]] {
def enqueue(queueName: QueueName, message: ByteVector): F[Unit]
def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.queue.client

opaque type QueueName = String
object QueueName {
def apply(v: String): QueueName = new QueueName(v)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import cats.effect.Async
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage}
import fs2.Stream
import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName}
import scodec.bits.ByteVector

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {

private val payloadKey = "payload"

override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] =
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.toString, Map(payloadKey -> message))
)
createConnection.flatMap(_.append(m)).compile.drain

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int
): Stream[F, Message] =
createConnection >>= {
_.read(Set(queueName.toString), chunkSize)
.map(toMessage)
.collect { case Some(m) => m }
}

private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] =
m.body
.get(payloadKey)
.map(Message(MessageId(m.id.value), _))

private def createConnection =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import dev.profunktor.redis4cats.data.RedisCodec
import io.lettuce.core.codec.{ByteArrayCodec, RedisCodec as JRedisCodec, StringCodec}
import scodec.bits.ByteVector

import java.nio.ByteBuffer

object StringBytesCodec:

val instance: RedisCodec[String, ByteVector] = RedisCodec {
new JRedisCodec[String, ByteVector] {

override def decodeKey(bytes: ByteBuffer): String =
StringCodec.UTF8.decodeKey(bytes)

override def decodeValue(bytes: ByteBuffer): ByteVector =
ByteVector.view(ByteArrayCodec.INSTANCE.decodeValue(bytes))

override def encodeKey(key: String): ByteBuffer =
StringCodec.UTF8.encodeKey(key)

override def encodeValue(value: ByteVector): ByteBuffer =
ByteArrayCodec.INSTANCE.encodeValue(value.toArray)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import io.renku.queue.client.QueueName
import org.scalacheck.Gen
import org.scalacheck.Gen.alphaLowerChar

object RedisClientGenerators:

val queueNameGen: Gen[QueueName] =
Gen
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_)))

given Gen[QueueName] = queueNameGen

extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import cats.effect.IO
import fs2.*
import fs2.concurrent.SignallingRef
import io.renku.redis.client.RedisClientGenerators.*
import io.renku.redis.client.util.RedisSpec
import munit.CatsEffectSuite
import scodec.bits.ByteVector

class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:

test("can enqueue and dequeue events"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)

message1 = "message1"
_ <- client.enqueue(queue, toByteVector(message1))

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
.start
_ <- dequeued.waitUntil(_ == List(message1))

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))

_ <- fiber.cancel
yield ()
}

private def toByteVector(v: String): ByteVector =
ByteVector.encodeUtf8(v).fold(throw _, identity)

private lazy val toStringUft8: ByteVector => String =
_.decodeUtf8.fold(throw _, identity)
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,39 @@
package io.renku.redis.client.util

import cats.effect.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log.Stdout.*
import dev.profunktor.redis4cats.effect.MkRedis.forAsync
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import io.renku.queue.client.QueueClient
import io.renku.redis.client.RedisQueueClient

trait RedisSpec:
self: munit.Suite =>

private lazy val server: RedisServer = RedisServer

val withRedis: Fixture[Resource[IO, RedisCommands[IO, String, String]]] =
new Fixture[Resource[IO, RedisCommands[IO, String, String]]]("redis") {
abstract class RedisFixture extends Fixture[Resource[IO, RedisClient]]("redis"):
def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]]
def asQueueClient(): Resource[IO, QueueClient[IO]]

def apply(): Resource[IO, RedisCommands[IO, String, String]] =
Redis[IO].utf8(server.url)
val withRedisClient: RedisFixture = new RedisFixture:

override def beforeAll(): Unit =
server.start()
def apply(): Resource[IO, RedisClient] =
RedisClient[IO].from(server.url)

override def afterAll(): Unit =
server.stop()
}
override def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] =
apply().flatMap(Redis[IO].fromClient(_, RedisCodec.Utf8))

override def munitFixtures
: Seq[Fixture[Resource[IO, RedisCommands[IO, String, String]]]] = List(withRedis)
override def asQueueClient(): Resource[IO, QueueClient[IO]] =
apply().map(new RedisQueueClient[IO](_))

override def beforeAll(): Unit =
server.start()

override def afterAll(): Unit =
server.stop()

override def munitFixtures: Seq[Fixture[Resource[IO, RedisClient]]] =
List(withRedisClient)
4 changes: 2 additions & 2 deletions project/RedisServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ object RedisServer {
}

private def call(methodName: String): ClassLoader => Unit = classLoader => {
val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$")
val method = clazz.getMethod(methodName)
val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$")
val method = clazz.getMethod(methodName)
val instance = clazz.getField("MODULE$").get(null)
method.invoke(instance)
}
Expand Down