Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: event header to be encoded as single key in JSON #23

Merged
merged 2 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ lazy val root = project
httpClient,
events,
redisClient,
renkuRedisClient,
solrClient,
searchQuery,
searchSolrClient,
Expand Down Expand Up @@ -144,6 +145,23 @@ lazy val redisClient = project
commons % "test->test"
)

lazy val renkuRedisClient = project
.in(file("modules/renku-redis-client"))
.withId("renku-redis-client")
.settings(commonSettings)
.settings(
name := "renku-redis-client",
libraryDependencies ++=
Dependencies.catsEffect ++
Dependencies.redis4Cats ++
Dependencies.redis4CatsStreams
)
.enablePlugins(AutomateHeaderPlugin)
.dependsOn(
events % "compile->compile;test->test",
redisClient % "compile->compile;test->test"
)

lazy val solrClient = project
.in(file("modules/solr-client"))
.withId("solr-client")
Expand Down Expand Up @@ -230,7 +248,7 @@ lazy val configValues = project
.dependsOn(
commons % "compile->compile;test->test",
events % "compile->compile;test->test",
redisClient % "compile->compile;test->test",
renkuRedisClient % "compile->compile;test->test",
searchSolrClient % "compile->compile;test->test"
)

Expand Down Expand Up @@ -259,7 +277,7 @@ lazy val searchProvision = project
.dependsOn(
commons % "compile->compile;test->test",
events % "compile->compile;test->test",
redisClient % "compile->compile;test->test",
renkuRedisClient % "compile->compile;test->test",
searchSolrClient % "compile->compile;test->test",
configValues % "compile->compile;test->test"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package io.renku.search.config

import cats.syntax.all.*
import ciris.{ConfigDecoder, ConfigError}
import io.renku.queue.client.QueueName
import io.renku.redis.client.*
import org.http4s.Uri

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package io.renku.search.config

import cats.syntax.all.*
import ciris.*
import io.renku.queue.client.QueueName
import io.renku.redis.client.*
import io.renku.solr.client.{SolrConfig, SolrUser}
import org.http4s.Uri
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package io.renku.search.provision
package io.renku.events

import io.renku.events.v1.{ProjectCreated, Visibility}
import org.scalacheck.Gen
Expand All @@ -25,7 +25,7 @@ import org.scalacheck.Gen.alphaNumChar
import java.time.Instant
import java.time.temporal.ChronoUnit

object Generators:
object EventsGenerators:

def projectCreatedGen(prefix: String): Gen[ProjectCreated] =
for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,5 @@
package io.renku.redis.client

private object MessageBodyKeys:
val header = "header"
val payload = "payload"
val source = "source"
val messageType = "type"
val contentType = "dataContentType"
val schemaVersion = "schemaVersion"
val time = "time"
val requestId = "requestId"
Original file line number Diff line number Diff line change
Expand Up @@ -18,92 +18,11 @@

package io.renku.redis.client

import cats.syntax.all.*
import dev.profunktor.redis4cats.streams.data.XReadMessage
import io.renku.queue.client.{DataContentType, Header, Message, MessageId}
import scodec.bits.ByteVector

import java.time.Instant
final case class RedisMessage(id: MessageId, header: ByteVector, payload: ByteVector)

private object RedisMessage:

def bodyFrom(
header: Header,
payload: ByteVector
): Either[Throwable, Map[String, ByteVector]] =
BodyMap()
.add(MessageBodyKeys.payload, payload)
.flatMap(_.add(MessageBodyKeys.contentType, header.dataContentType.mimeType))
.flatMap(_.maybeAdd(MessageBodyKeys.source, header.source.map(_.value)))
.flatMap(_.maybeAdd(MessageBodyKeys.messageType, header.messageType.map(_.value)))
.flatMap(
_.maybeAdd(MessageBodyKeys.schemaVersion, header.schemaVersion.map(_.value))
)
.flatMap(
_.add(MessageBodyKeys.time, header.time.map(_.value).getOrElse(Instant.now()))
)
.flatMap(_.maybeAdd(MessageBodyKeys.requestId, header.requestId.map(_.value)))
.map(_.body)

def toMessage(
rm: XReadMessage[String, ByteVector]
): Either[Throwable, Option[Message]] =
val bodyMap = BodyMap(rm.body)
for
maybeContentType <- bodyMap
.get[String](MessageBodyKeys.contentType)
.flatMap(_.map(DataContentType.from).sequence)
maybePayload <- bodyMap.get[ByteVector](MessageBodyKeys.payload)
yield (maybeContentType, maybePayload)
.mapN(Message(MessageId(rm.id.value), _, _))

private case class BodyMap(body: Map[String, ByteVector] = Map.empty):

def add[V](key: String, value: V)(using
encoder: ValueEncoder[V]
): Either[Throwable, BodyMap] =
encoder
.encode(value)
.map(encV => copy(body = body + (key -> encV)))

def maybeAdd[V](key: String, maybeV: Option[V])(using
encoder: ValueEncoder[V]
): Either[Throwable, BodyMap] =
maybeV
.map(add(key, _))
.getOrElse(this.asRight)

def apply[V](key: String)(using
decoder: ValueDecoder[V]
): Either[Throwable, V] =
get(key).flatMap(_.toRight(new Exception(s"No '$key' in Redis message")))

def get[V](key: String)(using
decoder: ValueDecoder[V]
): Either[Throwable, Option[V]] =
body.get(key).map(decoder.decode).sequence

private trait ValueEncoder[A]:
def encode(v: A): Either[Throwable, ByteVector]
def contramap[B](f: B => A): ValueEncoder[B] = (b: B) => encode(f(b))

private object ValueEncoder:
def apply[A](using enc: ValueEncoder[A]): ValueEncoder[A] = enc

private given ValueEncoder[String] = ByteVector.encodeUtf8(_)
private given ValueEncoder[ByteVector] = identity(_).asRight
private given ValueEncoder[Long] = ByteVector.fromLong(_).asRight
private given ValueEncoder[Instant] =
ValueEncoder[Long].contramap[Instant](_.toEpochMilli)

private trait ValueDecoder[A]:
def decode(bv: ByteVector): Either[Throwable, A]
def map[B](f: A => B): ValueDecoder[B] = (bv: ByteVector) => decode(bv).map(f)

private object ValueDecoder:
def apply[A](using dec: ValueDecoder[A]): ValueDecoder[A] = dec

private given ValueDecoder[String] = _.decodeUtf8
private given ValueDecoder[ByteVector] = identity(_).asRight
private given ValueDecoder[Long] = _.toLong().asRight
private given ValueDecoder[Instant] = ValueDecoder[Long].map(Instant.ofEpochMilli)
opaque type MessageId = String
object MessageId:
def apply(v: String): MessageId = v
extension (self: MessageId) def value: String = self
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package io.renku.redis.client

import cats.MonadThrow
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
Expand All @@ -28,57 +27,88 @@ import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XRe
import dev.profunktor.redis4cats.streams.{RedisStream, Streaming}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.renku.queue.client.*
import io.renku.redis.client.RedisMessage.*
import scodec.bits.ByteVector
import scribe.Scribe

trait RedisQueueClient[F[_]] {

def enqueue(
queueName: QueueName,
header: ByteVector,
payload: ByteVector
): F[MessageId]

def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, RedisMessage]

def markProcessed(
clientId: ClientId,
queueName: QueueName,
messageId: MessageId
): F[Unit]

def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]]
}

object RedisQueueClient:

def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] =
def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, RedisQueueClient[F]] =
given Scribe[F] = scribe.cats[F]
given Log[F] = RedisLogger[F]
ClientCreator[F](redisConfig).makeClient.map(new RedisQueueClient(_))
ClientCreator[F](redisConfig).makeClient.map(new RedisQueueClientImpl(_))

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {
class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient)
extends RedisQueueClient[F] {

override def enqueue(
queueName: QueueName,
header: Header,
header: ByteVector,
payload: ByteVector
): F[MessageId] =
for
messageBody <- MonadThrow[F].fromEither(RedisMessage.bodyFrom(header, payload))
message = Stream.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.name, messageBody)
)
id <- makeStreamingConnection
.flatMap(_.append(message))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)
yield id
val messageBody = Map(
MessageBodyKeys.header -> header,
MessageBodyKeys.payload -> payload
)
val message = Stream.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.name, messageBody)
)
makeStreamingConnection
.flatMap(_.append(message))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)

override def acquireEventsStream(
queueName: QueueName,
chunkSize: Int,
maybeOffset: Option[MessageId]
): Stream[F, Message] =
): Stream[F, RedisMessage] =
val initialOffset: String => StreamingOffset[String] =
maybeOffset
.map(id => StreamingOffset.Custom[String](_, id.value))
.getOrElse(StreamingOffset.All[String])

def logError(rm: XReadMessage[_, _]): Throwable => F[Option[Message]] = err =>
Log[F]
.error(s"Decoding message ${rm.id} failed: ${err.getMessage}")
.as(Option.empty)
def toMessage(rm: XReadMessage[String, ByteVector]): Option[RedisMessage] =
(rm.body.get(MessageBodyKeys.header), rm.body.get(MessageBodyKeys.payload))
.mapN(RedisMessage(MessageId(rm.id.value), _, _))

lazy val logInfo: ((XReadMessage[_, _], Option[RedisMessage])) => F[Unit] = {
case (m, None) =>
Log[F].info(
s"Message '${m.id}' skipped as it has no '${MessageBodyKeys.header}' or '${MessageBodyKeys.payload}'"
)
case _ => ().pure[F]
}

makeStreamingConnection >>= {
_.read(Set(queueName.name), chunkSize, initialOffset)
.evalMap(rm => toMessage(rm).fold(logError(rm), _.pure[F]))
.collect { case Some(m) => m }
.map(rm => rm -> toMessage(rm))
.evalTap(logInfo)
.collect { case (_, Some(m)) => m }
}

override def markProcessed(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package io.renku.queue.client
package io.renku.redis.client

opaque type QueueName = String
object QueueName:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,9 @@

package io.renku.redis.client

import io.renku.queue.client.*
import org.scalacheck.Gen
import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar}

import java.time.Instant

object RedisClientGenerators:

val stringGen: Gen[String] =
Expand All @@ -34,19 +31,6 @@ object RedisClientGenerators:
val queueNameGen: Gen[QueueName] =
stringGen.map(QueueName(_))

val dataContentTypeGen: Gen[DataContentType] =
Gen.oneOf(DataContentType.values.toSet)

val headerGen: Gen[Header] =
for
source <- Gen.option(stringGen.map(MessageSource.apply))
messageType <- Gen.option(stringGen.map(MessageType.apply))
dataContentType <- dataContentTypeGen
schemaVersion <- Gen.option(stringGen.map(SchemaVersion.apply))
time <- Gen.option(Gen.const(CreationTime(Instant.now())))
requestId <- Gen.option(stringGen.map(RequestId.apply))
yield Header(source, messageType, dataContentType, schemaVersion, time, requestId)

val clientIdGen: Gen[ClientId] =
Gen
.chooseNum(3, 10)
Expand Down
Loading
Loading