Skip to content

Commit

Permalink
feat: events in Redis to get all avro spec Header entries
Browse files Browse the repository at this point in the history
  • Loading branch information
jachro committed Feb 19, 2024
1 parent 67798f6 commit e547d6f
Show file tree
Hide file tree
Showing 11 changed files with 342 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.queue.client

import java.time.Instant

final case class Header(
source: Option[MessageSource],
messageType: Option[MessageType],
dataContentType: DataContentType,
schemaVersion: Option[SchemaVersion],
time: Option[CreationTime],
requestId: Option[RequestId]
)

object Header:
def apply(contentType: DataContentType): Header =
Header(
source = None,
messageType = None,
dataContentType = contentType,
schemaVersion = None,
time = None,
requestId = None
)

opaque type MessageSource = String
object MessageSource:
def apply(v: String): MessageSource = v
extension (self: MessageSource) def value: String = self

opaque type MessageType = String
object MessageType:
def apply(v: String): MessageType = v
extension (self: MessageType) def value: String = self

enum DataContentType(val mimeType: String):
lazy val name: String = productPrefix
case Binary extends DataContentType("application/avro+binary")
case Json extends DataContentType("application/avro+json")

object DataContentType:
def from(mimeType: String): Either[Throwable, DataContentType] =
DataContentType.values.toList
.find(_.mimeType == mimeType)
.toRight(
new IllegalArgumentException(s"'$mimeType' not a valid 'DataContentType' value")
)

opaque type SchemaVersion = String
object SchemaVersion:
def apply(v: String): SchemaVersion = v
extension (self: SchemaVersion) def value: String = self

opaque type CreationTime = Instant
object CreationTime:
def apply(v: Instant): CreationTime = v
extension (self: CreationTime) def value: Instant = self

opaque type RequestId = String
object RequestId:
def apply(v: String): RequestId = v
extension (self: RequestId) def value: String = self
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,7 @@ import scodec.bits.ByteVector

final case class Message(id: MessageId, contentType: DataContentType, payload: ByteVector)

final case class MessageId(value: String) extends AnyVal

enum DataContentType(val mimeType: String):
lazy val name: String = productPrefix
case Binary extends DataContentType("application/avro+binary")
case Json extends DataContentType("application/avro+json")

object DataContentType:
def from(mimeType: String): Either[Throwable, DataContentType] =
DataContentType.values.toList
.find(_.mimeType == mimeType)
.toRight(new IllegalArgumentException(s"'$mimeType' not a valid value"))
opaque type MessageId = String
object MessageId:
def apply(v: String): MessageId = v
extension (self: MessageId) def value: String = self
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ trait QueueClient[F[_]] {

def enqueue(
queueName: QueueName,
message: ByteVector,
contentType: DataContentType
header: Header,
payload: ByteVector
): F[MessageId]

def acquireEventsStream(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ object QueueName:
opaque type ClientId = String
object ClientId:
def apply(v: String): ClientId = v
extension (self: ClientId) def value: String = self
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

private object MessageBodyKeys:
val payload = "payload"
val source = "source"
val messageType = "type"
val contentType = "dataContentType"
val schemaVersion = "schemaVersion"
val time = "time"
val requestId = "requestId"
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.redis.client

import cats.syntax.all.*
import dev.profunktor.redis4cats.streams.data.XReadMessage
import io.renku.queue.client.{DataContentType, Header, Message, MessageId}
import scodec.bits.ByteVector

import java.time.Instant

private object RedisMessage:

def bodyFrom(
header: Header,
payload: ByteVector
): Either[Throwable, Map[String, ByteVector]] =
BodyMap()
.add(MessageBodyKeys.payload, payload)
.flatMap(_.add(MessageBodyKeys.contentType, header.dataContentType.mimeType))
.flatMap(_.maybeAdd(MessageBodyKeys.source, header.source.map(_.value)))
.flatMap(_.maybeAdd(MessageBodyKeys.messageType, header.messageType.map(_.value)))
.flatMap(
_.maybeAdd(MessageBodyKeys.schemaVersion, header.schemaVersion.map(_.value))
)
.flatMap(
_.add(MessageBodyKeys.time, header.time.map(_.value).getOrElse(Instant.now()))
)
.flatMap(_.maybeAdd(MessageBodyKeys.requestId, header.requestId.map(_.value)))
.map(_.body)

def toMessage(
rm: XReadMessage[String, ByteVector]
): Either[Throwable, Option[Message]] =
val bodyMap = BodyMap(rm.body)
for
maybeContentType <- bodyMap
.get[String](MessageBodyKeys.contentType)
.flatMap(_.map(DataContentType.from).sequence)
maybePayload <- bodyMap.get[ByteVector](MessageBodyKeys.payload)
yield (maybeContentType, maybePayload)
.mapN(Message(MessageId(rm.id.value), _, _))

private case class BodyMap(body: Map[String, ByteVector] = Map.empty):

def add[V](key: String, value: V)(using
encoder: ValueEncoder[V]
): Either[Throwable, BodyMap] =
encoder
.encode(value)
.map(encV => copy(body = body + (key -> encV)))

def maybeAdd[V](key: String, maybeV: Option[V])(using
encoder: ValueEncoder[V]
): Either[Throwable, BodyMap] =
maybeV
.map(add(key, _))
.getOrElse(this.asRight)

def apply[V](key: String)(using
decoder: ValueDecoder[V]
): Either[Throwable, V] =
get(key).flatMap(_.toRight(new Exception(s"No '$key' in Redis message")))

def get[V](key: String)(using
decoder: ValueDecoder[V]
): Either[Throwable, Option[V]] =
body.get(key).map(decoder.decode).sequence

private trait ValueEncoder[A]:
def encode(v: A): Either[Throwable, ByteVector]
def contramap[B](f: B => A): ValueEncoder[B] = (b: B) => encode(f(b))

private object ValueEncoder:
def apply[A](using enc: ValueEncoder[A]): ValueEncoder[A] = enc

private given ValueEncoder[String] = ByteVector.encodeUtf8(_)
private given ValueEncoder[ByteVector] = identity(_).asRight
private given ValueEncoder[Long] = ByteVector.fromLong(_).asRight
private given ValueEncoder[Instant] =
ValueEncoder[Long].contramap[Instant](_.toEpochMilli)

private trait ValueDecoder[A]:
def decode(bv: ByteVector): Either[Throwable, A]
def map[B](f: A => B): ValueDecoder[B] = (bv: ByteVector) => decode(bv).map(f)

private object ValueDecoder:
def apply[A](using dec: ValueDecoder[A]): ValueDecoder[A] = dec

private given ValueDecoder[String] = _.decodeUtf8
private given ValueDecoder[ByteVector] = identity(_).asRight
private given ValueDecoder[Long] = _.toLong().asRight
private given ValueDecoder[Instant] = ValueDecoder[Long].map(Instant.ofEpochMilli)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.renku.redis.client

import cats.MonadThrow
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import dev.profunktor.redis4cats.connection.RedisClient
Expand All @@ -28,6 +29,7 @@ import dev.profunktor.redis4cats.streams.{RedisStream, Streaming}
import dev.profunktor.redis4cats.{Redis, RedisCommands}
import fs2.Stream
import io.renku.queue.client.*
import io.renku.redis.client.RedisMessage.*
import scodec.bits.ByteVector
import scribe.Scribe

Expand All @@ -40,35 +42,23 @@ object RedisQueueClient:

class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] {

private val payloadKey = "payload"
private val contentTypeKey = "dataContentType"

override def enqueue(
queueName: QueueName,
message: ByteVector,
contentType: DataContentType
header: Header,
payload: ByteVector
): F[MessageId] =
val m = Stream
.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(
queueName.name,
Map(payloadKey -> message, contentTypeKey -> encodeContentType(contentType))
)
for
messageBody <- MonadThrow[F].fromEither(RedisMessage.bodyFrom(header, payload))
message = Stream.emit[F, XAddMessage[String, ByteVector]](
XAddMessage(queueName.name, messageBody)
)
createStreamingConnection
.flatMap(_.append(m))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)

private def encodeContentType(contentType: DataContentType): ByteVector =
ByteVector.encodeUtf8(contentType.mimeType).fold(throw _, identity)

private def decodeContentType(encoding: ByteVector): DataContentType =
encoding.decodeUtf8
.flatMap(DataContentType.from)
.fold(throw _, identity)
id <- makeStreamingConnection
.flatMap(_.append(message))
.map(id => MessageId(id.value))
.compile
.toList
.map(_.head)
yield id

override def acquireEventsStream(
queueName: QueueName,
Expand All @@ -80,18 +70,17 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
.map(id => StreamingOffset.Custom[String](_, id.value))
.getOrElse(StreamingOffset.All[String])

createStreamingConnection >>= {
def logError(rm: XReadMessage[_, _]): Throwable => F[Option[Message]] = err =>
Log[F]
.error(s"Decoding message ${rm.id} failed: ${err.getMessage}")
.as(Option.empty)

makeStreamingConnection >>= {
_.read(Set(queueName.name), chunkSize, initialOffset)
.map(toMessage)
.evalMap(rm => toMessage(rm).fold(logError(rm), _.pure[F]))
.collect { case Some(m) => m }
}

private def toMessage(m: XReadMessage[String, ByteVector]): Option[Message] =
(m.body.get(payloadKey), m.body.get(contentTypeKey).map(decodeContentType))
.mapN { case (payload, contentType) =>
Message(MessageId(m.id.value), contentType, payload)
}

override def markProcessed(
clientId: ClientId,
queueName: QueueName,
Expand All @@ -112,7 +101,7 @@ class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClien
private def formProcessedKey(clientId: ClientId, queueName: QueueName) =
s"$queueName.$clientId"

private def createStreamingConnection
private def makeStreamingConnection
: Stream[F, Streaming[[A] =>> Stream[F, A], String, ByteVector]] =
RedisStream
.mkStreamingConnection[F, String, ByteVector](client, StringBytesCodec.instance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,31 @@ import io.renku.queue.client.*
import org.scalacheck.Gen
import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar}

import java.time.Instant

object RedisClientGenerators:

val queueNameGen: Gen[QueueName] =
val stringGen: Gen[String] =
Gen
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaLowerChar).map(QueueName(_)))
.flatMap(Gen.stringOfN(_, alphaLowerChar))

val queueNameGen: Gen[QueueName] =
stringGen.map(QueueName(_))

val dataContentTypeGen: Gen[DataContentType] =
Gen.oneOf(DataContentType.values.toSet)

val headerGen: Gen[Header] =
for
source <- Gen.option(stringGen.map(MessageSource.apply))
messageType <- Gen.option(stringGen.map(MessageType.apply))
dataContentType <- dataContentTypeGen
schemaVersion <- Gen.option(stringGen.map(SchemaVersion.apply))
time <- Gen.option(Gen.const(CreationTime(Instant.now())))
requestId <- Gen.option(stringGen.map(RequestId.apply))
yield Header(source, messageType, dataContentType, schemaVersion, time, requestId)

val clientIdGen: Gen[ClientId] =
Gen
.chooseNum(3, 10)
Expand Down
Loading

0 comments on commit e547d6f

Please sign in to comment.