Skip to content

Commit

Permalink
feat: event header to be encoded as single key in JSON (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro authored Feb 20, 2024
1 parent 77821c3 commit a0473f8
Show file tree
Hide file tree
Showing 23 changed files with 460 additions and 260 deletions.
22 changes: 20 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ lazy val root = project
httpClient,
events,
redisClient,
renkuRedisClient,
solrClient,
searchQuery,
searchSolrClient,
Expand Down Expand Up @@ -144,6 +145,23 @@ lazy val redisClient = project
commons % "test->test"
)

lazy val renkuRedisClient = project
.in(file("modules/renku-redis-client"))
.withId("renku-redis-client")
.settings(commonSettings)
.settings(
name := "renku-redis-client",
libraryDependencies ++=
Dependencies.catsEffect ++
Dependencies.redis4Cats ++
Dependencies.redis4CatsStreams
)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(
events % "compile->compile;test->test",
redisClient % "compile->compile;test->test"
)

lazy val solrClient = project
.in(file("modules/solr-client"))
.withId("solr-client")
Expand Down Expand Up @@ -230,7 +248,7 @@ lazy val configValues = project
.dependsOn(
commons % "compile->compile;test->test",
events % "compile->compile;test->test",
redisClient % "compile->compile;test->test",
renkuRedisClient % "compile->compile;test->test",
searchSolrClient % "compile->compile;test->test"
)

Expand Down Expand Up @@ -259,7 +277,7 @@ lazy val searchProvision = project
.dependsOn(
commons % "compile->compile;test->test",
events % "compile->compile;test->test",
redisClient % "compile->compile;test->test",
renkuRedisClient % "compile->compile;test->test",
searchSolrClient % "compile->compile;test->test",
configValues % "compile->compile;test->test"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package io.renku.search.config

import cats.syntax.all.*
import ciris.{ConfigDecoder, ConfigError}
import io.renku.queue.client.QueueName
import io.renku.redis.client.*
import org.http4s.Uri

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package io.renku.search.config

import cats.syntax.all.*
import ciris.*
import io.renku.queue.client.QueueName
import io.renku.redis.client.*
import io.renku.solr.client.{SolrConfig, SolrUser}
import org.http4s.Uri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package io.renku.search.provision
package io.renku.events

import io.renku.events.v1.{ProjectCreated, Visibility}
import org.scalacheck.Gen
Expand All @@ -25,7 +25,7 @@ import org.scalacheck.Gen.alphaNumChar
import java.time.Instant
import java.time.temporal.ChronoUnit

object Generators:
object EventsGenerators:

def projectCreatedGen(prefix: String): Gen[ProjectCreated] =
for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,5 @@
package io.renku.redis.client

private object MessageBodyKeys:
val header = "header"
val payload = "payload"
val source = "source"
val messageType = "type"
val contentType = "dataContentType"
val schemaVersion = "schemaVersion"
val time = "time"
val requestId = "requestId"
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,11 @@

package io.renku.redis.client

import cats.syntax.all.*
import dev.profunktor.redis4cats.streams.data.XReadMessage
import io.renku.queue.client.{DataContentType, Header, Message, MessageId}
import scodec.bits.ByteVector

import java.time.Instant
final case class RedisMessage(id: MessageId, header: ByteVector, payload: ByteVector)

private object RedisMessage:

def bodyFrom(
header: Header,
payload: ByteVector
): Either[Throwable, Map[String, ByteVector]] =
BodyMap()
.add(MessageBodyKeys.payload, payload)
.flatMap(_.add(MessageBodyKeys.contentType, header.dataContentType.mimeType))
.flatMap(_.maybeAdd(MessageBodyKeys.source, header.source.map(_.value)))
.flatMap(_.maybeAdd(MessageBodyKeys.messageType, header.messageType.map(_.value)))
.flatMap(
_.maybeAdd(MessageBodyKeys.schemaVersion, header.schemaVersion.map(_.value))
)
.flatMap(
_.add(MessageBodyKeys.time, header.time.map(_.value).getOrElse(Instant.now()))
)
.flatMap(_.maybeAdd(MessageBodyKeys.requestId, header.requestId.map(_.value)))
.map(_.body)

def toMessage(
rm: XReadMessage[String, ByteVector]
): Either[Throwable, Option[Message]] =
val bodyMap = BodyMap(rm.body)
for
maybeContentType <- bodyMap
.get[String](MessageBodyKeys.contentType)
.flatMap(_.map(DataContentType.from).sequence)
maybePayload <- bodyMap.get[ByteVector](MessageBodyKeys.payload)
yield (maybeContentType, maybePayload)
.mapN(Message(MessageId(rm.id.value), _, _))

private case class BodyMap(body: Map[String, ByteVector] = Map.empty):

def add[V](key: String, value: V)(using
encoder: ValueEncoder[V]
): Either[Throwable, BodyMap] =
encoder
.encode(value)
.map(encV => copy(body = body + (key -> encV)))

def maybeAdd[V](key: String, maybeV: Option[V])(using
encoder: ValueEncoder[V]
): Either[Throwable, BodyMap] =
maybeV
.map(add(key, _))
.getOrElse(this.asRight)

def apply[V](key: String)(using
decoder: ValueDecoder[V]
): Either[Throwable, V] =
get(key).flatMap(_.toRight(new Exception(s"No '$key' in Redis message")))

def get[V](key: String)(using
decoder: ValueDecoder[V]
): Either[Throwable, Option[V]] =
body.get(key).map(decoder.decode).sequence

private trait ValueEncoder[A]:
def encode(v: A): Either[Throwable, ByteVector]
def contramap[B](f: B => A): ValueEncoder[B] = (b: B) => encode(f(b))

private object ValueEncoder:
def apply[A](using enc: ValueEncoder[A]): ValueEncoder[A] = enc

private given ValueEncoder[String] = ByteVector.encodeUtf8(_)
private given ValueEncoder[ByteVector] = identity(_).asRight
private given ValueEncoder[Long] = ByteVector.fromLong(_).asRight
private given ValueEncoder[Instant] =
ValueEncoder[Long].contramap[Instant](_.toEpochMilli)

private trait ValueDecoder[A]:
def decode(bv: ByteVector): Either[Throwable, A]
def map[B](f: A => B): ValueDecoder[B] = (bv: ByteVector) => decode(bv).map(f)

private object ValueDecoder:
def apply[A](using dec: ValueDecoder[A]): ValueDecoder[A] = dec

private given ValueDecoder[String] = _.decodeUtf8
private given ValueDecoder[ByteVector] = identity(_).asRight
private given ValueDecoder[Long] = _.toLong().asRight
private given ValueDecoder[Instant] = ValueDecoder[Long].map(Instant.ofEpochMilli)
opaque type MessageId = String
object MessageId:
def apply(v: String): MessageId = v
extension (self: MessageId) def value: String = self
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package io.renku.redis.client

import cats.MonadThrow
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
Expand All @@ -28,57 +27,88 @@ import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XRe
import dev.profunktor.redis4cats.streams.{RedisStream, Streaming}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.renku.queue.client.*
import io.renku.redis.client.RedisMessage.*
import scodec.bits.ByteVector
import scribe.Scribe

trait RedisQueueClient[F[_]] {

def enqueue(
queueName: QueueName,
header: ByteVector,
payload: ByteVector
): F[MessageId]

def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, RedisMessage]

def markProcessed(
clientId: ClientId,
queueName: QueueName,
messageId: MessageId
): F[Unit]

def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]]
}

object RedisQueueClient:

def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] =
def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, RedisQueueClient[F]] =
given Scribe[F] = scribe.cats[F]
given Log[F] = RedisLogger[F]
ClientCreator[F](redisConfig).makeClient.map(new RedisQueueClient(_))
ClientCreator[F](redisConfig).makeClient.map(new RedisQueueClientImpl(_))

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {
class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)
extends RedisQueueClient[F] {

override def enqueue(
queueName: QueueName,
header: Header,
header: ByteVector,
payload: ByteVector
): F[MessageId] =
for
messageBody <- MonadThrow[F].fromEither(RedisMessage.bodyFrom(header, payload))
message = Stream.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.name, messageBody)
)
id <- makeStreamingConnection
.flatMap(_.append(message))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)
yield id
val messageBody = Map(
MessageBodyKeys.header -> header,
MessageBodyKeys.payload -> payload
)
val message = Stream.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.name, messageBody)
)
makeStreamingConnection
.flatMap(_.append(message))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, Message] =
): Stream[F, RedisMessage] =
val initialOffset: String => StreamingOffset[String] =
maybeOffset
.map(id => StreamingOffset.Custom[String](_, id.value))
.getOrElse(StreamingOffset.All[String])

def logError(rm: XReadMessage[_, _]): Throwable => F[Option[Message]] = err =>
Log[F]
.error(s"Decoding message ${rm.id} failed: ${err.getMessage}")
.as(Option.empty)
def toMessage(rm: XReadMessage[String, ByteVector]): Option[RedisMessage] =
(rm.body.get(MessageBodyKeys.header), rm.body.get(MessageBodyKeys.payload))
.mapN(RedisMessage(MessageId(rm.id.value), _, _))

lazy val logInfo: ((XReadMessage[_, _], Option[RedisMessage])) => F[Unit] = {
case (m, None) =>
Log[F].info(
s"Message '${m.id}' skipped as it has no '${MessageBodyKeys.header}' or '${MessageBodyKeys.payload}'"
)
case _ => ().pure[F]
}

makeStreamingConnection >>= {
_.read(Set(queueName.name), chunkSize, initialOffset)
.evalMap(rm => toMessage(rm).fold(logError(rm), _.pure[F]))
.collect { case Some(m) => m }
.map(rm => rm -> toMessage(rm))
.evalTap(logInfo)
.collect { case (_, Some(m)) => m }
}

override def markProcessed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package io.renku.queue.client
package io.renku.redis.client

opaque type QueueName = String
object QueueName:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

package io.renku.redis.client

import io.renku.queue.client.*
import org.scalacheck.Gen
import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar}

import java.time.Instant

object RedisClientGenerators:

val stringGen: Gen[String] =
Expand All @@ -34,19 +31,6 @@ object RedisClientGenerators:
val queueNameGen: Gen[QueueName] =
stringGen.map(QueueName(_))

val dataContentTypeGen: Gen[DataContentType] =
Gen.oneOf(DataContentType.values.toSet)

val headerGen: Gen[Header] =
for
source <- Gen.option(stringGen.map(MessageSource.apply))
messageType <- Gen.option(stringGen.map(MessageType.apply))
dataContentType <- dataContentTypeGen
schemaVersion <- Gen.option(stringGen.map(SchemaVersion.apply))
time <- Gen.option(Gen.const(CreationTime(Instant.now())))
requestId <- Gen.option(stringGen.map(RequestId.apply))
yield Header(source, messageType, dataContentType, schemaVersion, time, requestId)

val clientIdGen: Gen[ClientId] =
Gen
.chooseNum(3, 10)
Expand Down
Loading

0 comments on commit a0473f8

Please sign in to comment.