Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: streaming events from Redis with a specified offset #6

Merged
merged 4 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ lazy val searchProvision = project
.withId("search-provision")
.settings(commonSettings)
.settings(
name := "search-provision"
name := "search-provision",
Test / testOptions += Tests.Setup(RedisServer.start),
Test / testOptions += Tests.Cleanup(RedisServer.stop)
)
.dependsOn(
commons % "compile->compile;test->test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,20 @@ import fs2.Stream
import scodec.bits.ByteVector

trait QueueClient[F[_]] {
def enqueue(queueName: QueueName, message: ByteVector): F[Unit]
def acquireEventsStream(queueName: QueueName, chunkSize: Int): Stream[F, Message]

def enqueue(queueName: QueueName, message: ByteVector): F[MessageId]

def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, Message]

def markProcessed(
clientId: ClientId,
queueName: QueueName,
messageId: MessageId
): F[Unit]

def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,8 @@ opaque type QueueName = String
object QueueName {
def apply(v: String): QueueName = new QueueName(v)
}

opaque type ClientId = String
object ClientId {
def apply(v: String): ClientId = new ClientId(v)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,46 @@

package io.renku.redis.client

import cats.effect.Async
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log
import dev.profunktor.redis4cats.streams.RedisStream
import dev.profunktor.redis4cats.streams.data.{XAddMessage, XReadMessage}
import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XReadMessage}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.renku.queue.client.{Message, MessageId, QueueClient, QueueName}
import io.renku.queue.client.*
import scodec.bits.ByteVector

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {

private val payloadKey = "payload"

override def enqueue(queueName: QueueName, message: ByteVector): F[Unit] =
override def enqueue(queueName: QueueName, message: ByteVector): F[MessageId] =
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.toString, Map(payloadKey -> message))
)
createConnection.flatMap(_.append(m)).compile.drain
createConnection
.flatMap(_.append(m))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, Message] =
val initialOffset: String => StreamingOffset[String] =
maybeOffset
.map(id => StreamingOffset.Custom[String](_, id.value))
.getOrElse(StreamingOffset.All[String])

createConnection >>= {
_.read(Set(queueName.toString), chunkSize)
_.read(Set(queueName.toString), chunkSize, initialOffset)
.map(toMessage)
.collect { case Some(m) => m }
}
Expand All @@ -57,4 +70,27 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
private def createConnection =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)

override def markProcessed(
clientId: ClientId,
queueName: QueueName,
messageId: MessageId
): F[Unit] =
stringCommands.use {
_.set(formProcessedKey(clientId, queueName), messageId.value)
}

override def findLastProcessed(
clientId: ClientId,
queueName: QueueName
): F[Option[MessageId]] =
stringCommands.use {
_.get(formProcessedKey(clientId, queueName)).map(_.map(MessageId.apply))
}

private def stringCommands: Resource[F, RedisCommands[F, String, String]] =
Redis[F].fromClient(client, RedisCodec.Utf8)

private def formProcessedKey(clientId: ClientId, queueName: QueueName) =
s"$queueName.$clientId"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@

package io.renku.redis.client

import io.renku.queue.client.QueueName
import io.renku.queue.client.*
import org.scalacheck.Gen
import org.scalacheck.Gen.alphaLowerChar
import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar}

object RedisClientGenerators:

Expand All @@ -29,6 +29,15 @@ object RedisClientGenerators:
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_)))

given Gen[QueueName] = queueNameGen
val clientIdGen: Gen[ClientId] =
Gen
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaNumChar).map(ClientId(_)))

val messageIdGen: Gen[MessageId] =
for
part1 <- Gen.chooseNum(3, 10)
part2 <- Gen.chooseNum(3, 10)
yield MessageId(s"$part1.$part2")

extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package io.renku.redis.client

import cats.effect.IO
import cats.syntax.all.*
import fs2.*
import fs2.concurrent.SignallingRef
import io.renku.redis.client.RedisClientGenerators.*
Expand All @@ -37,8 +38,8 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
message1 = "message1"
_ <- client.enqueue(queue, toByteVector(message1))

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = None)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
Expand All @@ -49,7 +50,51 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec:
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))

_ <- fiber.cancel
_ <- streamingProcFiber.cancel
yield ()
}

test("can start enqueueing events from the given messageId excluding"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
for
dequeued <- SignallingRef.of[IO, List[String]](Nil)

message1 = "message1"
message1Id <- client.enqueue(queue, toByteVector(message1))

streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some)
.evalMap(event => dequeued.update(toStringUft8(event.payload) :: _))
.compile
.drain
.start

message2 = "message2"
_ <- client.enqueue(queue, toByteVector(message2))
_ <- dequeued.waitUntil(_.toSet == Set(message2))

message3 = "message3"
_ <- client.enqueue(queue, toByteVector(message3))
_ <- dequeued.waitUntil(_.toSet == Set(message2, message3))

_ <- streamingProcFiber.cancel
yield ()
}

test("allow marking and retrieving a processed event"):
withRedisClient.asQueueClient().use { client =>
val queue = RedisClientGenerators.queueNameGen.generateOne
val clientId = RedisClientGenerators.clientIdGen.generateOne
val messageId = RedisClientGenerators.messageIdGen.generateOne
for
_ <- client.findLastProcessed(clientId, queue).map(v => assert(v.isEmpty))

_ <- client.markProcessed(clientId, queue, messageId)

_ <- client
.findLastProcessed(clientId, queue)
.map(v => assert(v contains messageId))
yield ()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package io.renku.redis.client.util
import cats.effect.*
import dev.profunktor.redis4cats.connection.RedisClient
import dev.profunktor.redis4cats.data.RedisCodec
import dev.profunktor.redis4cats.effect.Log.Stdout.*
import dev.profunktor.redis4cats.effect.Log.Stdout.instance
import dev.profunktor.redis4cats.effect.MkRedis.forAsync
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import io.renku.queue.client.QueueClient
Expand All @@ -30,19 +30,21 @@ import io.renku.redis.client.RedisQueueClient
trait RedisSpec:
self: munit.Suite =>

export dev.profunktor.redis4cats.effect.Log.Stdout.instance

private lazy val server: RedisServer = RedisServer

abstract class RedisFixture extends Fixture[Resource[IO, RedisClient]]("redis"):
def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]]
def asRedisCommands(): Resource[IO, RedisCommands[IO, String, String]]
def asQueueClient(): Resource[IO, QueueClient[IO]]

val withRedisClient: RedisFixture = new RedisFixture:

def apply(): Resource[IO, RedisClient] =
RedisClient[IO].from(server.url)

override def asRedisCommand(): Resource[IO, RedisCommands[IO, String, String]] =
apply().flatMap(Redis[IO].fromClient(_, RedisCodec.Utf8))
override def asRedisCommands(): Resource[IO, RedisCommands[IO, String, String]] =
apply().flatMap(createRedisCommands)

override def asQueueClient(): Resource[IO, QueueClient[IO]] =
apply().map(new RedisQueueClient[IO](_))
Expand All @@ -53,5 +55,9 @@ trait RedisSpec:
override def afterAll(): Unit =
server.stop()

lazy val createRedisCommands
: RedisClient => Resource[IO, RedisCommands[IO, String, String]] =
Redis[IO].fromClient(_, RedisCodec.Utf8)

override def munitFixtures: Seq[Fixture[Resource[IO, RedisClient]]] =
List(withRedisClient)
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec:
message1 = ProjectCreated("my project", "my description", Some("myself"), now)
_ <- client.enqueue(queue, avro.write[ProjectCreated](Seq(message1)))

fiber <- client
.acquireEventsStream(queue, chunkSize = 1)
streamingProcFiber <- client
.acquireEventsStream(queue, chunkSize = 1, maybeOffset = None)
.evalTap(m => IO.println(avro.read[ProjectCreated](m.payload)))
.evalMap(event =>
dequeued.update(avro.read[ProjectCreated](event.payload).toList ::: _)
Expand All @@ -62,6 +62,6 @@ class SearchProvisionSpec extends CatsEffectSuite with RedisSpec:
_ <- client.enqueue(queue, avro.write(Seq(message2)))
_ <- dequeued.waitUntil(_.toSet == Set(message1, message2))

_ <- fiber.cancel
_ <- streamingProcFiber.cancel
yield ()
}