Skip to content

Commit

Permalink
fix: Support json and binary avro values (#114)
Browse files Browse the repository at this point in the history
- decode message header trying both formats
- message payloads are then decoded using the advertised format in the header
  • Loading branch information
eikek authored May 10, 2024
1 parent c4f82a5 commit 1c8844a
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package io.renku.search.events

import cats.data.NonEmptyList

import io.renku.events.Header
import scodec.bits.ByteVector

trait DecodeFailure extends RuntimeException

object DecodeFailure:
Expand All @@ -34,3 +39,28 @@ object DecodeFailure:
extends NoStackTrace(
s"Version ${header.schemaVersion} not supported for payload in message $id (header: $header)"
)

final case class HeaderReadError(
data: ByteVector,
causeBinary: Throwable,
causeJson: Throwable
) extends NoStackTrace(
s"Reading message header failed! Binary-Error: ${causeBinary.getMessage} Json-Error: ${causeJson.getMessage}"
):
addSuppressed(causeBinary)
addSuppressed(causeJson)

final case class NoHeaderRecord(data: ByteVector)
extends NoStackTrace(
s"No header record found in byte vector: $data"
)

final case class MultipleHeaderRecords(data: ByteVector, headers: NonEmptyList[Header])
extends NoStackTrace(
s"Multiple header records (${headers.size}) found. Required exactly one."
)

final case class FieldReadError(fieldName: String, value: String, message: String)
extends NoStackTrace(
s"Reading field '$fieldName' with value '$value' failed: $message"
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package io.renku.search.events

import cats.data.NonEmptyList
import cats.effect.Sync
import cats.syntax.all.*

Expand Down Expand Up @@ -58,20 +59,36 @@ object MessageHeader:
): F[MessageHeader] =
Timestamp.now[F].map(ts => MessageHeader(src, ct, sv, ts, reqId))

def fromByteVector(bv: ByteVector): Either[String, MessageHeader] =
Either
.catchNonFatal(AvroReader(Header.SCHEMA$).read[Header](bv))
.leftMap(_.getMessage)
private def readBinaryOrJson(bv: ByteVector): Either[DecodeFailure, Seq[Header]] =
val reader = AvroReader(Header.SCHEMA$)
Either.catchNonFatal(reader.read[Header](bv)) match
case Right(r) => Right(r)
case Left(exb) =>
Either.catchNonFatal(reader.readJson[Header](bv)).leftMap { exj =>
DecodeFailure.HeaderReadError(bv, exb, exj)
}

def fromByteVector(bv: ByteVector): Either[DecodeFailure, MessageHeader] =
readBinaryOrJson(bv)
.map(_.distinct.toList)
.flatMap {
case h :: Nil => Right(h)
case Nil => Left(s"No header record found in byte vector: $bv")
case hs => Left(s"More than one (${hs.size}) headers in: $bv")
case Nil => Left(DecodeFailure.NoHeaderRecord(bv))
case hs =>
Left(DecodeFailure.MultipleHeaderRecords(bv, NonEmptyList.fromListUnsafe(hs)))
}
.flatMap { h =>
for
ct <- DataContentType.fromMimeType(h.dataContentType)
v <- SchemaVersion.fromString(h.schemaVersion)
ct <- DataContentType
.fromMimeType(h.dataContentType)
.leftMap(err =>
DecodeFailure.FieldReadError("dataContentType", h.dataContentType, err)
)
v <- SchemaVersion
.fromString(h.schemaVersion)
.leftMap(err =>
DecodeFailure.FieldReadError("schemaVersion", h.schemaVersion, err)
)
src = MessageSource(h.source)
ts = Timestamp(h.time)
rid = RequestId(h.requestId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.events

import java.time.Instant

import io.renku.avro.codec.AvroEncoder
import io.renku.avro.codec.AvroWriter
import io.renku.avro.codec.all.given
import io.renku.events.Header as HeaderV2
import io.renku.events.v1.Header as HeaderV1
import io.renku.search.model.Timestamp
import munit.*
import scodec.bits.ByteVector

class MessageHeaderSpec extends FunSuite:

DataContentType.values.foreach { ct =>
test(s"read v1 header: $ct"):
val now = Instant.now
val hv1 = HeaderV1("the-source", "type2", ct.mimeType, "v1", now, "req1")
val bv1 = AvroWriter(HeaderV1.SCHEMA$).writeData(ct, Seq(hv1))
val h = MessageHeader.fromByteVector(bv1).fold(throw _, identity)
assertEquals(
h,
MessageHeader(
MessageSource("the-source"),
ct,
SchemaVersion.V1,
Timestamp(now),
RequestId("req1")
)
)
}

DataContentType.values.foreach { ct =>
test(s"read v2 header: $ct"):
val now = Instant.now
val hv2 = HeaderV2("the-source", "type2", ct.mimeType, "v1", now, "req1")
val bv1 = AvroWriter(HeaderV2.SCHEMA$).writeData(ct, Seq(hv2))
val h = MessageHeader.fromByteVector(bv1).fold(throw _, identity)
assertEquals(
h,
MessageHeader(
MessageSource("the-source"),
ct,
SchemaVersion.V1,
Timestamp(now),
RequestId("req1")
)
)
}

extension (self: AvroWriter)
def writeData[A: AvroEncoder](ct: DataContentType, values: Seq[A]): ByteVector =
ct match
case DataContentType.Binary => self.write[A](values)
case DataContentType.Json => self.writeJson[A](values)
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,24 @@ import io.renku.search.events.EventMessage
import munit.CatsEffectSuite

class QueueClientSpec extends CatsEffectSuite with QueueSpec:
test("can enqueue and dequeue project-member-add events"):
withQueueClient().use { queue =>
val qname = RedisClientGenerators.queueNameGen.generateOne
val msg = EventsGenerators
.eventMessageGen(EventsGenerators.projectMemberAddedGen)
.generateOne
for
msgId <- queue.enqueue(qname, msg)
res <- queue
.acquireMessageStream[ProjectMemberAdded](qname, 1, None)
.take(1)
.compile
.toList
_ = assertEquals(res.head, msg.copy(id = msgId))
yield ()
}

test("can enqueue and dequeue events"):
test("can enqueue and dequeue project-created events"):
withQueueClient().use { queueClient =>
val queue = RedisClientGenerators.queueNameGen.generateOne
for
Expand Down

0 comments on commit 1c8844a

Please sign in to comment.