From 1c8844a4495aaa7dcaa042cf22623313ba1841d1 Mon Sep 17 00:00:00 2001 From: eikek <701128+eikek@users.noreply.github.com> Date: Fri, 10 May 2024 10:32:16 +0200 Subject: [PATCH] fix: Support json and binary avro values (#114) - decode message header trying both formats - message payloads are then decoded using the advertised format in the header --- .../renku/search/events/DecodeFailure.scala | 30 ++++++++ .../renku/search/events/MessageHeader.scala | 33 +++++++-- .../search/events/MessageHeaderSpec.scala | 74 +++++++++++++++++++ .../renku/queue/client/QueueClientSpec.scala | 18 ++++- 4 files changed, 146 insertions(+), 9 deletions(-) create mode 100644 modules/events/src/test/scala/io/renku/search/events/MessageHeaderSpec.scala diff --git a/modules/events/src/main/scala/io/renku/search/events/DecodeFailure.scala b/modules/events/src/main/scala/io/renku/search/events/DecodeFailure.scala index 6d76c5d3..86be207a 100644 --- a/modules/events/src/main/scala/io/renku/search/events/DecodeFailure.scala +++ b/modules/events/src/main/scala/io/renku/search/events/DecodeFailure.scala @@ -18,6 +18,11 @@ package io.renku.search.events +import cats.data.NonEmptyList + +import io.renku.events.Header +import scodec.bits.ByteVector + trait DecodeFailure extends RuntimeException object DecodeFailure: @@ -34,3 +39,28 @@ object DecodeFailure: extends NoStackTrace( s"Version ${header.schemaVersion} not supported for payload in message $id (header: $header)" ) + + final case class HeaderReadError( + data: ByteVector, + causeBinary: Throwable, + causeJson: Throwable + ) extends NoStackTrace( + s"Reading message header failed! Binary-Error: ${causeBinary.getMessage} Json-Error: ${causeJson.getMessage}" + ): + addSuppressed(causeBinary) + addSuppressed(causeJson) + + final case class NoHeaderRecord(data: ByteVector) + extends NoStackTrace( + s"No header record found in byte vector: $data" + ) + + final case class MultipleHeaderRecords(data: ByteVector, headers: NonEmptyList[Header]) + extends NoStackTrace( + s"Multiple header records (${headers.size}) found. Required exactly one." + ) + + final case class FieldReadError(fieldName: String, value: String, message: String) + extends NoStackTrace( + s"Reading field '$fieldName' with value '$value' failed: $message" + ) diff --git a/modules/events/src/main/scala/io/renku/search/events/MessageHeader.scala b/modules/events/src/main/scala/io/renku/search/events/MessageHeader.scala index cfcde554..6d4fbe21 100644 --- a/modules/events/src/main/scala/io/renku/search/events/MessageHeader.scala +++ b/modules/events/src/main/scala/io/renku/search/events/MessageHeader.scala @@ -18,6 +18,7 @@ package io.renku.search.events +import cats.data.NonEmptyList import cats.effect.Sync import cats.syntax.all.* @@ -58,20 +59,36 @@ object MessageHeader: ): F[MessageHeader] = Timestamp.now[F].map(ts => MessageHeader(src, ct, sv, ts, reqId)) - def fromByteVector(bv: ByteVector): Either[String, MessageHeader] = - Either - .catchNonFatal(AvroReader(Header.SCHEMA$).read[Header](bv)) - .leftMap(_.getMessage) + private def readBinaryOrJson(bv: ByteVector): Either[DecodeFailure, Seq[Header]] = + val reader = AvroReader(Header.SCHEMA$) + Either.catchNonFatal(reader.read[Header](bv)) match + case Right(r) => Right(r) + case Left(exb) => + Either.catchNonFatal(reader.readJson[Header](bv)).leftMap { exj => + DecodeFailure.HeaderReadError(bv, exb, exj) + } + + def fromByteVector(bv: ByteVector): Either[DecodeFailure, MessageHeader] = + readBinaryOrJson(bv) .map(_.distinct.toList) .flatMap { case h :: Nil => Right(h) - case Nil => Left(s"No header record found in byte vector: $bv") - case hs => Left(s"More than one (${hs.size}) headers in: $bv") + case Nil => Left(DecodeFailure.NoHeaderRecord(bv)) + case hs => + Left(DecodeFailure.MultipleHeaderRecords(bv, NonEmptyList.fromListUnsafe(hs))) } .flatMap { h => for - ct <- DataContentType.fromMimeType(h.dataContentType) - v <- SchemaVersion.fromString(h.schemaVersion) + ct <- DataContentType + .fromMimeType(h.dataContentType) + .leftMap(err => + DecodeFailure.FieldReadError("dataContentType", h.dataContentType, err) + ) + v <- SchemaVersion + .fromString(h.schemaVersion) + .leftMap(err => + DecodeFailure.FieldReadError("schemaVersion", h.schemaVersion, err) + ) src = MessageSource(h.source) ts = Timestamp(h.time) rid = RequestId(h.requestId) diff --git a/modules/events/src/test/scala/io/renku/search/events/MessageHeaderSpec.scala b/modules/events/src/test/scala/io/renku/search/events/MessageHeaderSpec.scala new file mode 100644 index 00000000..4cf49a2b --- /dev/null +++ b/modules/events/src/test/scala/io/renku/search/events/MessageHeaderSpec.scala @@ -0,0 +1,74 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.events + +import java.time.Instant + +import io.renku.avro.codec.AvroEncoder +import io.renku.avro.codec.AvroWriter +import io.renku.avro.codec.all.given +import io.renku.events.Header as HeaderV2 +import io.renku.events.v1.Header as HeaderV1 +import io.renku.search.model.Timestamp +import munit.* +import scodec.bits.ByteVector + +class MessageHeaderSpec extends FunSuite: + + DataContentType.values.foreach { ct => + test(s"read v1 header: $ct"): + val now = Instant.now + val hv1 = HeaderV1("the-source", "type2", ct.mimeType, "v1", now, "req1") + val bv1 = AvroWriter(HeaderV1.SCHEMA$).writeData(ct, Seq(hv1)) + val h = MessageHeader.fromByteVector(bv1).fold(throw _, identity) + assertEquals( + h, + MessageHeader( + MessageSource("the-source"), + ct, + SchemaVersion.V1, + Timestamp(now), + RequestId("req1") + ) + ) + } + + DataContentType.values.foreach { ct => + test(s"read v2 header: $ct"): + val now = Instant.now + val hv2 = HeaderV2("the-source", "type2", ct.mimeType, "v1", now, "req1") + val bv1 = AvroWriter(HeaderV2.SCHEMA$).writeData(ct, Seq(hv2)) + val h = MessageHeader.fromByteVector(bv1).fold(throw _, identity) + assertEquals( + h, + MessageHeader( + MessageSource("the-source"), + ct, + SchemaVersion.V1, + Timestamp(now), + RequestId("req1") + ) + ) + } + + extension (self: AvroWriter) + def writeData[A: AvroEncoder](ct: DataContentType, values: Seq[A]): ByteVector = + ct match + case DataContentType.Binary => self.write[A](values) + case DataContentType.Json => self.writeJson[A](values) diff --git a/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueClientSpec.scala b/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueClientSpec.scala index 9a27126f..f323a18c 100644 --- a/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueClientSpec.scala +++ b/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueClientSpec.scala @@ -29,8 +29,24 @@ import io.renku.search.events.EventMessage import munit.CatsEffectSuite class QueueClientSpec extends CatsEffectSuite with QueueSpec: + test("can enqueue and dequeue project-member-add events"): + withQueueClient().use { queue => + val qname = RedisClientGenerators.queueNameGen.generateOne + val msg = EventsGenerators + .eventMessageGen(EventsGenerators.projectMemberAddedGen) + .generateOne + for + msgId <- queue.enqueue(qname, msg) + res <- queue + .acquireMessageStream[ProjectMemberAdded](qname, 1, None) + .take(1) + .compile + .toList + _ = assertEquals(res.head, msg.copy(id = msgId)) + yield () + } - test("can enqueue and dequeue events"): + test("can enqueue and dequeue project-created events"): withQueueClient().use { queueClient => val queue = RedisClientGenerators.queueNameGen.generateOne for