Skip to content

Commit

Permalink
feat: initial Redis client implementation (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro authored Jan 19, 2024
1 parent 2920208 commit efbef2f
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.queue.client

import scodec.bits.ByteVector

final case class Message(id: MessageId, payload: ByteVector)

final case class MessageId(value: String) extends AnyVal
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,12 @@
* limitations under the License.
*/

package io.renku.redis.client.util
package io.renku.queue.client

import cats.effect.*
import cats.implicits.*
import dev.profunktor.redis4cats.RedisCommands
import munit.CatsEffectSuite
import fs2.Stream
import scodec.bits.ByteVector

class ConnectionTestSpec extends CatsEffectSuite with RedisSpec {

test("connect to Redis") {
withRedis().use { (redis: RedisCommands[IO, String, String]) =>
for
_ <- redis.set("foo", "123")
x <- redis.get("foo")
_ <- redis.setNx("foo", "should not happen")
y <- redis.get("foo")
_ <- IO(println(x === y)) // true
yield ()
}
}
trait QueueClient[F[_]] {
def enqueue(queueName: QueueName, message: ByteVector): F[Unit]
def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.queue.client

opaque type QueueName = String
object QueueName {
def apply(v: String): QueueName = new QueueName(v)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import cats.effect.Async
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage}
import fs2.Stream
import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName}
import scodec.bits.ByteVector

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {

private val payloadKey = "payload"

override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] =
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.toString, Map(payloadKey -> message))
)
createConnection.flatMap(_.append(m)).compile.drain

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int
): Stream[F, Message] =
createConnection >>= {
_.read(Set(queueName.toString), chunkSize)
.map(toMessage)
.collect { case Some(m) => m }
}

private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] =
m.body
.get(payloadKey)
.map(Message(MessageId(m.id.value), _))

private def createConnection =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import dev.profunktor.redis4cats.data.RedisCodec
import io.lettuce.core.codec.{ByteArrayCodec, RedisCodec as JRedisCodec, StringCodec}
import scodec.bits.ByteVector

import java.nio.ByteBuffer

object StringBytesCodec:

val instance: RedisCodec[String, ByteVector] = RedisCodec {
new JRedisCodec[String, ByteVector] {

override def decodeKey(bytes: ByteBuffer): String =
StringCodec.UTF8.decodeKey(bytes)

override def decodeValue(bytes: ByteBuffer): ByteVector =
ByteVector.view(ByteArrayCodec.INSTANCE.decodeValue(bytes))

override def encodeKey(key: String): ByteBuffer =
StringCodec.UTF8.encodeKey(key)

override def encodeValue(value: ByteVector): ByteBuffer =
ByteArrayCodec.INSTANCE.encodeValue(value.toArray)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import io.renku.queue.client.QueueName
import org.scalacheck.Gen
import org.scalacheck.Gen.alphaLowerChar

object RedisClientGenerators:

val queueNameGen: Gen[QueueName] =
Gen
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_)))

given Gen[QueueName] = queueNameGen

extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import cats.effect.IO
import fs2.*
import fs2.concurrent.SignallingRef
import io.renku.redis.client.RedisClientGenerators.*
import io.renku.redis.client.util.RedisSpec
import munit.CatsEffectSuite
import scodec.bits.ByteVector

class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:

test("can enqueue and dequeue events"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)

message1 = "message1"
_ <- client.enqueue(queue, toByteVector(message1))

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
.start
_ <- dequeued.waitUntil(_ == List(message1))

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))

_ <- fiber.cancel
yield ()
}

private def toByteVector(v: String): ByteVector =
ByteVector.encodeUtf8(v).fold(throw _, identity)

private lazy val toStringUft8: ByteVector => String =
_.decodeUtf8.fold(throw _, identity)
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,39 @@
package io.renku.redis.client.util

import cats.effect.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log.Stdout.*
import dev.profunktor.redis4cats.effect.MkRedis.forAsync
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import io.renku.queue.client.QueueClient
import io.renku.redis.client.RedisQueueClient

trait RedisSpec:
self: munit.Suite =>

private lazy val server: RedisServer = RedisServer

val withRedis: Fixture[Resource[IO, RedisCommands[IO, String, String]]] =
new Fixture[Resource[IO, RedisCommands[IO, String, String]]]("redis") {
abstract class RedisFixture extends Fixture[Resource[IO, RedisClient]]("redis"):
def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]]
def asQueueClient(): Resource[IO, QueueClient[IO]]

def apply(): Resource[IO, RedisCommands[IO, String, String]] =
Redis[IO].utf8(server.url)
val withRedisClient: RedisFixture = new RedisFixture:

override def beforeAll(): Unit =
server.start()
def apply(): Resource[IO, RedisClient] =
RedisClient[IO].from(server.url)

override def afterAll(): Unit =
server.stop()
}
override def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] =
apply().flatMap(Redis[IO].fromClient(_, RedisCodec.Utf8))

override def munitFixtures
: Seq[Fixture[Resource[IO, RedisCommands[IO, String, String]]]] = List(withRedis)
override def asQueueClient(): Resource[IO, QueueClient[IO]] =
apply().map(new RedisQueueClient[IO](_))

override def beforeAll(): Unit =
server.start()

override def afterAll(): Unit =
server.stop()

override def munitFixtures: Seq[Fixture[Resource[IO, RedisClient]]] =
List(withRedisClient)
4 changes: 2 additions & 2 deletions project/RedisServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ object RedisServer {
}

private def call(methodName: String): ClassLoader => Unit = classLoader => {
val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$")
val method = clazz.getMethod(methodName)
val clazz = classLoader.loadClass("io.renku.redis.client.util.RedisServer$")
val method = clazz.getMethod(methodName)
val instance = clazz.getField("MODULE$").get(null)
method.invoke(instance)
}
Expand Down

0 comments on commit efbef2f

Please sign in to comment.