From a0473f87ca732dbb998154704bd7dba45b00919f Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Tue, 20 Feb 2024 17:31:45 +0100 Subject: [PATCH] feat: event header to be encoded as single key in JSON (#23) --- build.sbt | 22 ++++- .../renku/search/config/ConfigDecoders.scala | 1 - .../io/renku/search/config/ConfigValues.scala | 1 - .../io/renku/events/EventsGenerators.scala} | 4 +- .../renku/redis/client/MessageBodyKeys.scala | 7 +- .../io/renku/redis/client/RedisMessage.scala | 91 +------------------ .../renku/redis/client/RedisQueueClient.scala | 82 +++++++++++------ .../renku/{queue => redis}/client/types.scala | 2 +- .../redis/client/RedisClientGenerators.scala | 16 ---- .../redis/client/RedisQueueClientSpec.scala | 61 ++++++------- .../renku/redis/client/util/RedisSpec.scala | 5 +- modules/renku-redis-client/README.md | 3 + .../renku/queue/client/DataContentType.scala | 32 +++++++ .../renku/queue/client/MessageHeader.scala} | 70 +++++++------- .../io/renku/queue/client/QueueClient.scala | 17 ++-- .../renku/queue/client/QueueClientImpl.scala | 84 +++++++++++++++++ .../io/renku/queue/client/QueueMessage.scala} | 9 +- .../io/renku/queue/client/Generators.scala | 45 +++++++++ .../renku/queue/client/QueueClientSpec.scala | 66 ++++++++++++++ .../io/renku/queue/client/QueueSpec.scala | 34 +++++++ .../provision/SearchProvisionConfig.scala | 3 +- .../search/provision/SearchProvisioner.scala | 39 ++++---- .../provision/SearchProvisionerSpec.scala | 26 +++--- 23 files changed, 460 insertions(+), 260 deletions(-) rename modules/{search-provision/src/test/scala/io/renku/search/provision/Generators.scala => events/src/test/scala/io/renku/events/EventsGenerators.scala} (97%) rename modules/redis-client/src/main/scala/io/renku/{queue => redis}/client/types.scala (97%) create mode 100644 modules/renku-redis-client/README.md create mode 100644 modules/renku-redis-client/src/main/scala/io/renku/queue/client/DataContentType.scala rename modules/{redis-client/src/main/scala/io/renku/queue/client/Header.scala => renku-redis-client/src/main/scala/io/renku/queue/client/MessageHeader.scala} (58%) rename modules/{redis-client => renku-redis-client}/src/main/scala/io/renku/queue/client/QueueClient.scala (82%) create mode 100644 modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala rename modules/{redis-client/src/main/scala/io/renku/queue/client/Message.scala => renku-redis-client/src/main/scala/io/renku/queue/client/QueueMessage.scala} (77%) create mode 100644 modules/renku-redis-client/src/test/scala/io/renku/queue/client/Generators.scala create mode 100644 modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueClientSpec.scala create mode 100644 modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueSpec.scala diff --git a/build.sbt b/build.sbt index a5e59eb9..7948ae4f 100644 --- a/build.sbt +++ b/build.sbt @@ -56,6 +56,7 @@ lazy val root = project httpClient, events, redisClient, + renkuRedisClient, solrClient, searchQuery, searchSolrClient, @@ -144,6 +145,23 @@ lazy val redisClient = project commons % "test->test" ) +lazy val renkuRedisClient = project + .in(file("modules/renku-redis-client")) + .withId("renku-redis-client") + .settings(commonSettings) + .settings( + name := "renku-redis-client", + libraryDependencies ++= + Dependencies.catsEffect ++ + Dependencies.redis4Cats ++ + Dependencies.redis4CatsStreams + ) + .enablePlugins(AutomateHeaderPlugin) + .dependsOn( + events % "compile->compile;test->test", + redisClient % "compile->compile;test->test" + ) + lazy val solrClient = project .in(file("modules/solr-client")) .withId("solr-client") @@ -230,7 +248,7 @@ lazy val configValues = project .dependsOn( commons % "compile->compile;test->test", events % "compile->compile;test->test", - redisClient % "compile->compile;test->test", + renkuRedisClient % "compile->compile;test->test", searchSolrClient % "compile->compile;test->test" ) @@ -259,7 +277,7 @@ lazy val searchProvision = project .dependsOn( commons % "compile->compile;test->test", events % "compile->compile;test->test", - redisClient % "compile->compile;test->test", + renkuRedisClient % "compile->compile;test->test", searchSolrClient % "compile->compile;test->test", configValues % "compile->compile;test->test" ) diff --git a/modules/config-values/src/main/scala/io/renku/search/config/ConfigDecoders.scala b/modules/config-values/src/main/scala/io/renku/search/config/ConfigDecoders.scala index b13cc92c..d84b406b 100644 --- a/modules/config-values/src/main/scala/io/renku/search/config/ConfigDecoders.scala +++ b/modules/config-values/src/main/scala/io/renku/search/config/ConfigDecoders.scala @@ -20,7 +20,6 @@ package io.renku.search.config import cats.syntax.all.* import ciris.{ConfigDecoder, ConfigError} -import io.renku.queue.client.QueueName import io.renku.redis.client.* import org.http4s.Uri diff --git a/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala b/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala index 16ea1254..8af2f8ad 100644 --- a/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala +++ b/modules/config-values/src/main/scala/io/renku/search/config/ConfigValues.scala @@ -20,7 +20,6 @@ package io.renku.search.config import cats.syntax.all.* import ciris.* -import io.renku.queue.client.QueueName import io.renku.redis.client.* import io.renku.solr.client.{SolrConfig, SolrUser} import org.http4s.Uri diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/Generators.scala b/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala similarity index 97% rename from modules/search-provision/src/test/scala/io/renku/search/provision/Generators.scala rename to modules/events/src/test/scala/io/renku/events/EventsGenerators.scala index 18038463..b88686b4 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/Generators.scala +++ b/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package io.renku.search.provision +package io.renku.events import io.renku.events.v1.{ProjectCreated, Visibility} import org.scalacheck.Gen @@ -25,7 +25,7 @@ import org.scalacheck.Gen.alphaNumChar import java.time.Instant import java.time.temporal.ChronoUnit -object Generators: +object EventsGenerators: def projectCreatedGen(prefix: String): Gen[ProjectCreated] = for diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/MessageBodyKeys.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/MessageBodyKeys.scala index 6cb1c884..c118a564 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/MessageBodyKeys.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/MessageBodyKeys.scala @@ -19,10 +19,5 @@ package io.renku.redis.client private object MessageBodyKeys: + val header = "header" val payload = "payload" - val source = "source" - val messageType = "type" - val contentType = "dataContentType" - val schemaVersion = "schemaVersion" - val time = "time" - val requestId = "requestId" diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisMessage.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisMessage.scala index bee3daa5..ae7378c3 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisMessage.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisMessage.scala @@ -18,92 +18,11 @@ package io.renku.redis.client -import cats.syntax.all.* -import dev.profunktor.redis4cats.streams.data.XReadMessage -import io.renku.queue.client.{DataContentType, Header, Message, MessageId} import scodec.bits.ByteVector -import java.time.Instant +final case class RedisMessage(id: MessageId, header: ByteVector, payload: ByteVector) -private object RedisMessage: - - def bodyFrom( - header: Header, - payload: ByteVector - ): Either[Throwable, Map[String, ByteVector]] = - BodyMap() - .add(MessageBodyKeys.payload, payload) - .flatMap(_.add(MessageBodyKeys.contentType, header.dataContentType.mimeType)) - .flatMap(_.maybeAdd(MessageBodyKeys.source, header.source.map(_.value))) - .flatMap(_.maybeAdd(MessageBodyKeys.messageType, header.messageType.map(_.value))) - .flatMap( - _.maybeAdd(MessageBodyKeys.schemaVersion, header.schemaVersion.map(_.value)) - ) - .flatMap( - _.add(MessageBodyKeys.time, header.time.map(_.value).getOrElse(Instant.now())) - ) - .flatMap(_.maybeAdd(MessageBodyKeys.requestId, header.requestId.map(_.value))) - .map(_.body) - - def toMessage( - rm: XReadMessage[String, ByteVector] - ): Either[Throwable, Option[Message]] = - val bodyMap = BodyMap(rm.body) - for - maybeContentType <- bodyMap - .get[String](MessageBodyKeys.contentType) - .flatMap(_.map(DataContentType.from).sequence) - maybePayload <- bodyMap.get[ByteVector](MessageBodyKeys.payload) - yield (maybeContentType, maybePayload) - .mapN(Message(MessageId(rm.id.value), _, _)) - - private case class BodyMap(body: Map[String, ByteVector] = Map.empty): - - def add[V](key: String, value: V)(using - encoder: ValueEncoder[V] - ): Either[Throwable, BodyMap] = - encoder - .encode(value) - .map(encV => copy(body = body + (key -> encV))) - - def maybeAdd[V](key: String, maybeV: Option[V])(using - encoder: ValueEncoder[V] - ): Either[Throwable, BodyMap] = - maybeV - .map(add(key, _)) - .getOrElse(this.asRight) - - def apply[V](key: String)(using - decoder: ValueDecoder[V] - ): Either[Throwable, V] = - get(key).flatMap(_.toRight(new Exception(s"No '$key' in Redis message"))) - - def get[V](key: String)(using - decoder: ValueDecoder[V] - ): Either[Throwable, Option[V]] = - body.get(key).map(decoder.decode).sequence - - private trait ValueEncoder[A]: - def encode(v: A): Either[Throwable, ByteVector] - def contramap[B](f: B => A): ValueEncoder[B] = (b: B) => encode(f(b)) - - private object ValueEncoder: - def apply[A](using enc: ValueEncoder[A]): ValueEncoder[A] = enc - - private given ValueEncoder[String] = ByteVector.encodeUtf8(_) - private given ValueEncoder[ByteVector] = identity(_).asRight - private given ValueEncoder[Long] = ByteVector.fromLong(_).asRight - private given ValueEncoder[Instant] = - ValueEncoder[Long].contramap[Instant](_.toEpochMilli) - - private trait ValueDecoder[A]: - def decode(bv: ByteVector): Either[Throwable, A] - def map[B](f: A => B): ValueDecoder[B] = (bv: ByteVector) => decode(bv).map(f) - - private object ValueDecoder: - def apply[A](using dec: ValueDecoder[A]): ValueDecoder[A] = dec - - private given ValueDecoder[String] = _.decodeUtf8 - private given ValueDecoder[ByteVector] = identity(_).asRight - private given ValueDecoder[Long] = _.toLong().asRight - private given ValueDecoder[Instant] = ValueDecoder[Long].map(Instant.ofEpochMilli) +opaque type MessageId = String +object MessageId: + def apply(v: String): MessageId = v + extension (self: MessageId) def value: String = self diff --git a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala index 07870bf7..e1a5c248 100644 --- a/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/RedisQueueClient.scala @@ -18,7 +18,6 @@ package io.renku.redis.client -import cats.MonadThrow import cats.effect.{Async, Resource} import cats.syntax.all.* import dev.profunktor.redis4cats.connection.RedisClient @@ -28,57 +27,88 @@ import dev.profunktor.redis4cats.streams.data.{StreamingOffset, XAddMessage, XRe import dev.profunktor.redis4cats.streams.{RedisStream, Streaming} import dev.profunktor.redis4cats.{Redis, RedisCommands} import fs2.Stream -import io.renku.queue.client.* -import io.renku.redis.client.RedisMessage.* import scodec.bits.ByteVector import scribe.Scribe +trait RedisQueueClient[F[_]] { + + def enqueue( + queueName: QueueName, + header: ByteVector, + payload: ByteVector + ): F[MessageId] + + def acquireEventsStream( + queueName: QueueName, + chunkSize: Int, + maybeOffset: Option[MessageId] + ): Stream[F, RedisMessage] + + def markProcessed( + clientId: ClientId, + queueName: QueueName, + messageId: MessageId + ): F[Unit] + + def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]] +} + object RedisQueueClient: - def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] = + def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, RedisQueueClient[F]] = given Scribe[F] = scribe.cats[F] given Log[F] = RedisLogger[F] - ClientCreator[F](redisConfig).makeClient.map(new RedisQueueClient(_)) + ClientCreator[F](redisConfig).makeClient.map(new RedisQueueClientImpl(_)) -class RedisQueueClient[F[_]: Async: Log](client: RedisClient) extends QueueClient[F] { +class RedisQueueClientImpl[F[_]: Async: Log](client: RedisClient) + extends RedisQueueClient[F] { override def enqueue( queueName: QueueName, - header: Header, + header: ByteVector, payload: ByteVector ): F[MessageId] = - for - messageBody <- MonadThrow[F].fromEither(RedisMessage.bodyFrom(header, payload)) - message = Stream.emit[F, XAddMessage[String, ByteVector]]( - XAddMessage(queueName.name, messageBody) - ) - id <- makeStreamingConnection - .flatMap(_.append(message)) - .map(id => MessageId(id.value)) - .compile - .toList - .map(_.head) - yield id + val messageBody = Map( + MessageBodyKeys.header -> header, + MessageBodyKeys.payload -> payload + ) + val message = Stream.emit[F, XAddMessage[String, ByteVector]]( + XAddMessage(queueName.name, messageBody) + ) + makeStreamingConnection + .flatMap(_.append(message)) + .map(id => MessageId(id.value)) + .compile + .toList + .map(_.head) override def acquireEventsStream( queueName: QueueName, chunkSize: Int, maybeOffset: Option[MessageId] - ): Stream[F, Message] = + ): Stream[F, RedisMessage] = val initialOffset: String => StreamingOffset[String] = maybeOffset .map(id => StreamingOffset.Custom[String](_, id.value)) .getOrElse(StreamingOffset.All[String]) - def logError(rm: XReadMessage[_, _]): Throwable => F[Option[Message]] = err => - Log[F] - .error(s"Decoding message ${rm.id} failed: ${err.getMessage}") - .as(Option.empty) + def toMessage(rm: XReadMessage[String, ByteVector]): Option[RedisMessage] = + (rm.body.get(MessageBodyKeys.header), rm.body.get(MessageBodyKeys.payload)) + .mapN(RedisMessage(MessageId(rm.id.value), _, _)) + + lazy val logInfo: ((XReadMessage[_, _], Option[RedisMessage])) => F[Unit] = { + case (m, None) => + Log[F].info( + s"Message '${m.id}' skipped as it has no '${MessageBodyKeys.header}' or '${MessageBodyKeys.payload}'" + ) + case _ => ().pure[F] + } makeStreamingConnection >>= { _.read(Set(queueName.name), chunkSize, initialOffset) - .evalMap(rm => toMessage(rm).fold(logError(rm), _.pure[F])) - .collect { case Some(m) => m } + .map(rm => rm -> toMessage(rm)) + .evalTap(logInfo) + .collect { case (_, Some(m)) => m } } override def markProcessed( diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/types.scala b/modules/redis-client/src/main/scala/io/renku/redis/client/types.scala similarity index 97% rename from modules/redis-client/src/main/scala/io/renku/queue/client/types.scala rename to modules/redis-client/src/main/scala/io/renku/redis/client/types.scala index feb076a7..a52829a1 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/types.scala +++ b/modules/redis-client/src/main/scala/io/renku/redis/client/types.scala @@ -16,7 +16,7 @@ * limitations under the License. */ -package io.renku.queue.client +package io.renku.redis.client opaque type QueueName = String object QueueName: diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala index 8a677d8d..6c129519 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala @@ -18,12 +18,9 @@ package io.renku.redis.client -import io.renku.queue.client.* import org.scalacheck.Gen import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar} -import java.time.Instant - object RedisClientGenerators: val stringGen: Gen[String] = @@ -34,19 +31,6 @@ object RedisClientGenerators: val queueNameGen: Gen[QueueName] = stringGen.map(QueueName(_)) - val dataContentTypeGen: Gen[DataContentType] = - Gen.oneOf(DataContentType.values.toSet) - - val headerGen: Gen[Header] = - for - source <- Gen.option(stringGen.map(MessageSource.apply)) - messageType <- Gen.option(stringGen.map(MessageType.apply)) - dataContentType <- dataContentTypeGen - schemaVersion <- Gen.option(stringGen.map(SchemaVersion.apply)) - time <- Gen.option(Gen.const(CreationTime(Instant.now()))) - requestId <- Gen.option(stringGen.map(RequestId.apply)) - yield Header(source, messageType, dataContentType, schemaVersion, time, requestId) - val clientIdGen: Gen[ClientId] = Gen .chooseNum(3, 10) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala index 6e91058f..5c7e0502 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisQueueClientSpec.scala @@ -25,7 +25,6 @@ import dev.profunktor.redis4cats.streams.data.XAddMessage import dev.profunktor.redis4cats.streams.{RedisStream, Streaming} import fs2.* import fs2.concurrent.SignallingRef -import io.renku.queue.client.{DataContentType, MessageId, QueueName} import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.util.RedisSpec import munit.CatsEffectSuite @@ -34,91 +33,87 @@ import scodec.bits.ByteVector class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: test("can enqueue and dequeue events"): - withRedisClient.asQueueClient().use { client => + withRedisClient.asRedisQueueClient().use { client => val queue = RedisClientGenerators.queueNameGen.generateOne for - dequeued <- SignallingRef.of[IO, List[(String, DataContentType)]](Nil) + dequeued <- SignallingRef.of[IO, List[(String, String)]](Nil) message1 = "message1" - message1Head = headerGen.generateOne - _ <- client.enqueue(queue, message1Head, toByteVector(message1)) + message1Head = "header1" + _ <- client.enqueue(queue, toByteVector(message1Head), toByteVector(message1)) streamingProcFiber <- client .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) .evalMap(event => - dequeued.update(toStringUft8(event.payload) -> event.contentType :: _) + dequeued.update(event.header.asString -> event.payload.asString :: _) ) .compile .drain .start - _ <- dequeued.waitUntil(_ == List(message1 -> message1Head.dataContentType)) + _ <- dequeued.waitUntil(_ == List(message1Head -> message1)) message2 = "message2" - message2Head = headerGen.generateOne - _ <- client.enqueue(queue, message2Head, toByteVector(message2)) + message2Head = "header2" + _ <- client.enqueue(queue, toByteVector(message2Head), toByteVector(message2)) _ <- dequeued - .waitUntil( - _.toSet == Set( - message1 -> message1Head.dataContentType, - message2 -> message2Head.dataContentType - ) - ) + .waitUntil(_.toSet == Set(message1Head -> message1, message2Head -> message2)) _ <- streamingProcFiber.cancel yield () } test("can start enqueueing events from the given messageId excluding"): - withRedisClient.asQueueClient().use { client => + withRedisClient.asRedisQueueClient().use { client => val queue = RedisClientGenerators.queueNameGen.generateOne for dequeued <- SignallingRef.of[IO, List[String]](Nil) message1 = "message1" - message1Id <- client.enqueue(queue, headerGen.generateOne, toByteVector(message1)) + message1Id <- client.enqueue(queue, toByteVector("head1"), toByteVector(message1)) streamingProcFiber <- client .acquireEventsStream(queue, chunkSize = 1, maybeOffset = message1Id.some) - .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) + .evalMap(event => dequeued.update(event.payload.asString :: _)) .compile .drain .start message2 = "message2" - _ <- client.enqueue(queue, headerGen.generateOne, toByteVector(message2)) + _ <- client.enqueue(queue, toByteVector("head2"), toByteVector(message2)) _ <- dequeued.waitUntil(_.toSet == Set(message2)) message3 = "message3" - _ <- client.enqueue(queue, headerGen.generateOne, toByteVector(message3)) + _ <- client.enqueue(queue, toByteVector("head3"), toByteVector(message3)) _ <- dequeued.waitUntil(_.toSet == Set(message2, message3)) _ <- streamingProcFiber.cancel yield () } - test("can skip events that fails decoding"): - withRedisClient().flatMap(rc => withRedisClient.asQueueClient().tupleLeft(rc)).use { - case (redisClient, queueClient) => + test("can skip events that are wrongly defined"): + withRedisClient() + .flatMap(rc => withRedisClient.asRedisQueueClient().tupleLeft(rc)) + .use { case (redisClient, queueClient) => val queue = RedisClientGenerators.queueNameGen.generateOne for dequeued <- SignallingRef.of[IO, List[String]](Nil) - _ <- enqueue(redisClient, queue, toByteVector("message1")) + _ <- enqueueWithoutHeader(redisClient, queue, toByteVector("message1")) streamingProcFiber <- queueClient .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) - .evalMap(event => dequeued.update(toStringUft8(event.payload) :: _)) + .evalMap(event => dequeued.update(event.payload.asString :: _)) .compile .drain .start message2 = "message2" - _ <- queueClient.enqueue(queue, headerGen.generateOne, toByteVector(message2)) + _ <- queueClient.enqueue(queue, toByteVector("head2"), toByteVector(message2)) _ <- dequeued.waitUntil(_.toSet == Set(message2)) yield () - } + } test("allow marking and retrieving a processed event"): - withRedisClient.asQueueClient().use { client => + withRedisClient.asRedisQueueClient().use { client => val queue = RedisClientGenerators.queueNameGen.generateOne val clientId = RedisClientGenerators.clientIdGen.generateOne val messageId = RedisClientGenerators.messageIdGen.generateOne @@ -136,10 +131,9 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: private def toByteVector(v: String): ByteVector = ByteVector.encodeUtf8(v).fold(throw _, identity) - private lazy val toStringUft8: ByteVector => String = - _.decodeUtf8.fold(throw _, identity) + extension (bv: ByteVector) def asString: String = bv.decodeUtf8.fold(throw _, identity) - private def enqueue( + private def enqueueWithoutHeader( client: RedisClient, queueName: QueueName, payload: ByteVector @@ -147,10 +141,7 @@ class RedisQueueClientSpec extends CatsEffectSuite with RedisSpec: val message = Stream.emit[IO, XAddMessage[String, ByteVector]]( XAddMessage( queueName.name, - Map( - MessageBodyKeys.payload -> payload, - MessageBodyKeys.contentType -> toByteVector("illegal") - ) + Map(MessageBodyKeys.payload -> payload) ) ) makeStreamingConnection(client) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala index 88a194f1..d4bc9c83 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/util/RedisSpec.scala @@ -26,7 +26,6 @@ import dev.profunktor.redis4cats.effect.Log.Stdout.instance import dev.profunktor.redis4cats.effect.MkRedis.forAsync import dev.profunktor.redis4cats.{Redis, RedisCommands} import io.lettuce.core.RedisConnectionException -import io.renku.queue.client.QueueClient import io.renku.redis.client.* import io.renku.servers.RedisServer @@ -39,7 +38,7 @@ trait RedisSpec: abstract class RedisFixture extends Fixture[Resource[IO, RedisClient]]("redis"): def asRedisCommands(): Resource[IO, RedisCommands[IO, String, String]] - def asQueueClient(): Resource[IO, QueueClient[IO]] + def asRedisQueueClient(): Resource[IO, RedisQueueClient[IO]] val withRedisClient: RedisFixture = new RedisFixture: @@ -54,7 +53,7 @@ trait RedisSpec: override def asRedisCommands(): Resource[IO, RedisCommands[IO, String, String]] = apply().flatMap(createRedisCommands) - override def asQueueClient(): Resource[IO, QueueClient[IO]] = + override def asRedisQueueClient(): Resource[IO, RedisQueueClient[IO]] = RedisQueueClient.make[IO]( RedisConfig( RedisHost(server.host), diff --git a/modules/renku-redis-client/README.md b/modules/renku-redis-client/README.md new file mode 100644 index 00000000..71a4019a --- /dev/null +++ b/modules/renku-redis-client/README.md @@ -0,0 +1,3 @@ +# renku-redis-client + +This module brings algebras for renku-search and Redis. diff --git a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/DataContentType.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/DataContentType.scala new file mode 100644 index 00000000..fa909416 --- /dev/null +++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/DataContentType.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +enum DataContentType(val mimeType: String): + lazy val name: String = productPrefix + case Binary extends DataContentType("application/avro+binary") + case Json extends DataContentType("application/avro+json") + +object DataContentType: + def from(mimeType: String): Either[Throwable, DataContentType] = + DataContentType.values.toList + .find(_.mimeType == mimeType) + .toRight( + new IllegalArgumentException(s"'$mimeType' not a valid 'DataContentType' value") + ) diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/Header.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/MessageHeader.scala similarity index 58% rename from modules/redis-client/src/main/scala/io/renku/queue/client/Header.scala rename to modules/renku-redis-client/src/main/scala/io/renku/queue/client/MessageHeader.scala index c1ea874f..e001e215 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/Header.scala +++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/MessageHeader.scala @@ -18,26 +18,45 @@ package io.renku.queue.client +import io.renku.events.v1.Header +import org.apache.avro.Schema + import java.time.Instant +import java.time.temporal.ChronoUnit -final case class Header( - source: Option[MessageSource], - messageType: Option[MessageType], +final case class MessageHeader( + source: MessageSource, + payloadSchema: Schema, dataContentType: DataContentType, - schemaVersion: Option[SchemaVersion], - time: Option[CreationTime], - requestId: Option[RequestId] -) - -object Header: - def apply(contentType: DataContentType): Header = + schemaVersion: SchemaVersion, + time: CreationTime, + requestId: RequestId +): + def toSchemaHeader(p: Any): Header = Header( - source = None, - messageType = None, - dataContentType = contentType, - schemaVersion = None, - time = None, - requestId = None + source.value, + p.getClass.getName, + dataContentType.mimeType, + schemaVersion.value, + time.value, + requestId.value + ) + +object MessageHeader: + def apply( + source: MessageSource, + payloadSchema: Schema, + dataContentType: DataContentType, + schemaVersion: SchemaVersion, + requestId: RequestId + ): MessageHeader = + MessageHeader( + source, + payloadSchema, + dataContentType, + schemaVersion, + CreationTime.now, + requestId ) opaque type MessageSource = String @@ -45,24 +64,6 @@ object MessageSource: def apply(v: String): MessageSource = v extension (self: MessageSource) def value: String = self -opaque type MessageType = String -object MessageType: - def apply(v: String): MessageType = v - extension (self: MessageType) def value: String = self - -enum DataContentType(val mimeType: String): - lazy val name: String = productPrefix - case Binary extends DataContentType("application/avro+binary") - case Json extends DataContentType("application/avro+json") - -object DataContentType: - def from(mimeType: String): Either[Throwable, DataContentType] = - DataContentType.values.toList - .find(_.mimeType == mimeType) - .toRight( - new IllegalArgumentException(s"'$mimeType' not a valid 'DataContentType' value") - ) - opaque type SchemaVersion = String object SchemaVersion: def apply(v: String): SchemaVersion = v @@ -71,6 +72,7 @@ object SchemaVersion: opaque type CreationTime = Instant object CreationTime: def apply(v: Instant): CreationTime = v + def now: CreationTime = Instant.now().truncatedTo(ChronoUnit.MILLIS) extension (self: CreationTime) def value: Instant = self opaque type RequestId = String diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala similarity index 82% rename from modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala rename to modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala index f38d1d75..458e93c2 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala +++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClient.scala @@ -20,22 +20,22 @@ package io.renku.queue.client import cats.effect.{Async, Resource} import fs2.Stream -import io.renku.redis.client.{RedisConfig, RedisQueueClient} -import scodec.bits.ByteVector +import io.renku.avro.codec.AvroEncoder +import io.renku.redis.client.* -trait QueueClient[F[_]] { +trait QueueClient[F[_]]: - def enqueue( + def enqueue[P: AvroEncoder]( queueName: QueueName, - header: Header, - payload: ByteVector + header: MessageHeader, + payload: P ): F[MessageId] def acquireEventsStream( queueName: QueueName, chunkSize: Int, maybeOffset: Option[MessageId] - ): Stream[F, Message] + ): Stream[F, QueueMessage] def markProcessed( clientId: ClientId, @@ -44,8 +44,7 @@ trait QueueClient[F[_]] { ): F[Unit] def findLastProcessed(clientId: ClientId, queueName: QueueName): F[Option[MessageId]] -} object QueueClient: def make[F[_]: Async](redisConfig: RedisConfig): Resource[F, QueueClient[F]] = - RedisQueueClient.make[F](redisConfig) + RedisQueueClient.make[F](redisConfig).map(new QueueClientImpl[F](_)) diff --git a/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala new file mode 100644 index 00000000..5c401d0f --- /dev/null +++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueClientImpl.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +import cats.MonadThrow +import cats.effect.Async +import cats.syntax.all.* +import fs2.Stream +import io.renku.avro.codec.decoders.all.given +import io.renku.avro.codec.encoders.all.given +import io.renku.avro.codec.{AvroDecoder, AvroEncoder, AvroReader, AvroWriter} +import io.renku.events.v1.Header +import io.renku.queue.client.DataContentType.* +import io.renku.redis.client.* +import scribe.Scribe + +private class QueueClientImpl[F[_]: Async](redisQueueClient: RedisQueueClient[F]) + extends QueueClient[F]: + + private given Scribe[F] = scribe.cats[F] + + override def enqueue[P: AvroEncoder]( + queueName: QueueName, + header: MessageHeader, + payload: P + ): F[MessageId] = + val schemaHeader = header.toSchemaHeader(payload) + val encHeader = AvroWriter(Header.SCHEMA$).writeJson(Seq(schemaHeader)) + val encPayload = header.dataContentType match { + case Binary => AvroWriter(header.payloadSchema).write(Seq(payload)) + case Json => AvroWriter(header.payloadSchema).writeJson(Seq(payload)) + } + redisQueueClient.enqueue(queueName, encHeader, encPayload) + + override def acquireEventsStream( + queueName: QueueName, + chunkSize: Int, + maybeOffset: Option[MessageId] + ): Stream[F, QueueMessage] = + + def decodeHeader(rm: RedisMessage): F[Option[Header]] = + MonadThrow[F] + .catchNonFatal(AvroReader(Header.SCHEMA$).readJson[Header](rm.header).toList) + .flatMap { + case h :: Nil => h.some.pure[F] + case h => + Scribe[F] + .error(s"${h.size} header(s) in Redis instead of one") + .as(Option.empty[Header]) + } + + redisQueueClient + .acquireEventsStream(queueName, chunkSize, maybeOffset) + .evalMap(rm => decodeHeader(rm).map(_.map(QueueMessage(rm.id, _, rm.payload)))) + .collect { case Some(qm) => qm } + + override def markProcessed( + clientId: ClientId, + queueName: QueueName, + messageId: MessageId + ): F[Unit] = + redisQueueClient.markProcessed(clientId, queueName, messageId) + + override def findLastProcessed( + clientId: ClientId, + queueName: QueueName + ): F[Option[MessageId]] = + redisQueueClient.findLastProcessed(clientId, queueName) diff --git a/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueMessage.scala similarity index 77% rename from modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala rename to modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueMessage.scala index d149366b..893a05be 100644 --- a/modules/redis-client/src/main/scala/io/renku/queue/client/Message.scala +++ b/modules/renku-redis-client/src/main/scala/io/renku/queue/client/QueueMessage.scala @@ -18,11 +18,8 @@ package io.renku.queue.client +import io.renku.events.v1.Header +import io.renku.redis.client.MessageId import scodec.bits.ByteVector -final case class Message(id: MessageId, contentType: DataContentType, payload: ByteVector) - -opaque type MessageId = String -object MessageId: - def apply(v: String): MessageId = v - extension (self: MessageId) def value: String = self +final case class QueueMessage(id: MessageId, header: Header, payload: ByteVector) diff --git a/modules/renku-redis-client/src/test/scala/io/renku/queue/client/Generators.scala b/modules/renku-redis-client/src/test/scala/io/renku/queue/client/Generators.scala new file mode 100644 index 00000000..357ba236 --- /dev/null +++ b/modules/renku-redis-client/src/test/scala/io/renku/queue/client/Generators.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +import org.apache.avro.Schema +import org.scalacheck.Gen + +object Generators: + + val requestIdGen: Gen[RequestId] = Gen.uuid.map(_.toString).map(RequestId(_)) + + def messageHeaderGen(schema: Schema, contentType: DataContentType): Gen[MessageHeader] = + messageHeaderGen(schema, Gen.const(contentType)) + + def messageHeaderGen( + schema: Schema, + ctGen: Gen[DataContentType] = Gen.oneOf(DataContentType.values.toList) + ): Gen[MessageHeader] = + for + contentType <- ctGen + schemaVersion <- Gen.choose(1, 100).map(v => SchemaVersion(s"v$v")) + requestId <- requestIdGen + yield MessageHeader( + MessageSource("test"), + schema, + contentType, + schemaVersion, + requestId + ) diff --git a/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueClientSpec.scala b/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueClientSpec.scala new file mode 100644 index 00000000..ceb6cca6 --- /dev/null +++ b/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueClientSpec.scala @@ -0,0 +1,66 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +import cats.effect.IO +import fs2.concurrent.SignallingRef +import io.renku.avro.codec.AvroWriter +import io.renku.avro.codec.encoders.all.given +import io.renku.events.EventsGenerators +import io.renku.events.v1.ProjectCreated +import io.renku.queue.client.DataContentType.{Binary, Json} +import io.renku.queue.client.Generators.* +import io.renku.redis.client.{MessageId, RedisClientGenerators} +import io.renku.redis.client.RedisClientGenerators.* +import munit.CatsEffectSuite + +class QueueClientSpec extends CatsEffectSuite with QueueSpec: + + test("can enqueue and dequeue events"): + withQueueClient().use { queueClient => + val queue = RedisClientGenerators.queueNameGen.generateOne + for + dequeued <- SignallingRef.of[IO, List[QueueMessage]](Nil) + + message1 = EventsGenerators.projectCreatedGen("test").generateOne + header1 = messageHeaderGen(ProjectCreated.SCHEMA$).generateOne + message1Id <- queueClient.enqueue(queue, header1, message1) + + streamingProcFiber <- queueClient + .acquireEventsStream(queue, chunkSize = 1, maybeOffset = None) + .evalMap(event => dequeued.update(event :: _)) + .compile + .drain + .start + _ <- dequeued.waitUntil(_ == List(toQueueMessage(message1Id, header1, message1))) + + _ <- streamingProcFiber.cancel + yield () + } + + private def toQueueMessage( + id: MessageId, + header: MessageHeader, + payload: ProjectCreated + ) = + val encodedPayload = header.dataContentType match { + case Binary => AvroWriter(ProjectCreated.SCHEMA$).write(Seq(payload)) + case Json => AvroWriter(ProjectCreated.SCHEMA$).writeJson(Seq(payload)) + } + QueueMessage(id, header.toSchemaHeader(payload), encodedPayload) diff --git a/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueSpec.scala b/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueSpec.scala new file mode 100644 index 00000000..967e0369 --- /dev/null +++ b/modules/renku-redis-client/src/test/scala/io/renku/queue/client/QueueSpec.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.queue.client + +import cats.effect.{IO, Resource} +import dev.profunktor.redis4cats.connection.RedisClient +import io.renku.redis.client.util.RedisSpec + +trait QueueSpec extends RedisSpec: + self: munit.Suite => + + abstract class QueueFixture extends Fixture[Resource[IO, QueueClient[IO]]]("queue") + + val withQueueClient: QueueFixture = () => + withRedisClient.asRedisQueueClient().map(new QueueClientImpl[IO](_)) + + override def munitFixtures: Seq[Fixture[_]] = + List(withRedisClient, withQueueClient) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisionConfig.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisionConfig.scala index d7b610b9..8d9f105d 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisionConfig.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisionConfig.scala @@ -20,8 +20,7 @@ package io.renku.search.provision import cats.syntax.all.* import ciris.{ConfigValue, Effect} -import io.renku.queue.client.QueueName -import io.renku.redis.client.RedisConfig +import io.renku.redis.client.{QueueName, RedisConfig} import io.renku.search.config.ConfigValues import io.renku.solr.client.SolrConfig diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala index 57643e7f..8cd3f198 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SearchProvisioner.scala @@ -27,7 +27,7 @@ import io.renku.avro.codec.AvroReader import io.renku.avro.codec.decoders.all.given import io.renku.events.v1.ProjectCreated import io.renku.queue.client.* -import io.renku.redis.client.RedisConfig +import io.renku.redis.client.{ClientId, QueueName, RedisConfig} import io.renku.search.model.* import io.renku.search.solr.client.SearchSolrClient import io.renku.search.solr.documents.* @@ -86,30 +86,35 @@ private class SearchProvisionerImpl[F[_]: Async]( private def findLastProcessed(queueClient: QueueClient[F]) = queueClient.findLastProcessed(clientId, queueName) - private lazy val logInfo: ((Message, Seq[ProjectCreated])) => F[Unit] = { case (m, v) => - Scribe[F].info( - s"Received messageId: ${m.id} for projects: ${v.map(_.slug).mkString(", ")}" - ) + private lazy val logInfo: ((QueueMessage, Seq[ProjectCreated])) => F[Unit] = { + case (m, v) => + Scribe[F].info( + s"Received messageId: ${m.id} for projects: ${v.map(_.slug).mkString(", ")}" + ) } private val avro = AvroReader(ProjectCreated.SCHEMA$) private def decodeMessage(queueClient: QueueClient[F])( - message: Message - ): F[(Message, Seq[ProjectCreated])] = + message: QueueMessage + ): F[(QueueMessage, Seq[ProjectCreated])] = MonadThrow[F] - .catchNonFatal { - message.contentType match { - case DataContentType.Binary => avro.read[ProjectCreated](message.payload) - case DataContentType.Json => avro.readJson[ProjectCreated](message.payload) - } + .fromEither(DataContentType.from(message.header.dataContentType)) + .flatMap { ct => + MonadThrow[F] + .catchNonFatal { + ct match { + case DataContentType.Binary => avro.read[ProjectCreated](message.payload) + case DataContentType.Json => avro.readJson[ProjectCreated](message.payload) + } + } + .map(message -> _) + .onError(markProcessedOnFailure(message, queueClient)) } - .map(message -> _) - .onError(markProcessedOnFailure(message, queueClient)) private def pushToSolr( queueClient: QueueClient[F] - )(chunk: Chunk[(Message, Seq[ProjectCreated])]): F[Unit] = + )(chunk: Chunk[(QueueMessage, Seq[ProjectCreated])]): F[Unit] = chunk.toList match { case Nil => ().pure[F] case tuples => @@ -140,13 +145,13 @@ private class SearchProvisionerImpl[F[_]: Async]( } private def markProcessedOnFailure( - message: Message, + message: QueueMessage, queueClient: QueueClient[F] ): PartialFunction[Throwable, F[Unit]] = err => markProcessed(message, queueClient) >> Scribe[F].error(s"Processing messageId: ${message.id} failed", err) - private def markProcessed(message: Message, queueClient: QueueClient[F]): F[Unit] = + private def markProcessed(message: QueueMessage, queueClient: QueueClient[F]): F[Unit] = queueClient.markProcessed(clientId, queueName, message.id) private def logAndRestart: Throwable => F[Unit] = err => diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala index b4e02715..e61f4c87 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/SearchProvisionerSpec.scala @@ -24,20 +24,20 @@ import fs2.Stream import fs2.concurrent.SignallingRef import io.renku.avro.codec.AvroIO import io.renku.avro.codec.encoders.all.given +import io.renku.events.EventsGenerators.projectCreatedGen import io.renku.events.v1.{ProjectCreated, Visibility} -import io.renku.queue.client.{DataContentType, Header} +import io.renku.queue.client.Generators.messageHeaderGen +import io.renku.queue.client.{DataContentType, QueueSpec} import io.renku.redis.client.RedisClientGenerators import io.renku.redis.client.RedisClientGenerators.* -import io.renku.redis.client.util.RedisSpec import io.renku.search.model.{projects, users} -import io.renku.search.provision.Generators.projectCreatedGen import io.renku.search.solr.client.SearchSolrSpec import io.renku.search.solr.documents.{Project, User} import munit.CatsEffectSuite import scala.concurrent.duration.* -class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSolrSpec: +class SearchProvisionerSpec extends CatsEffectSuite with QueueSpec with SearchSolrSpec: private val avro = AvroIO(ProjectCreated.SCHEMA$) @@ -45,7 +45,7 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo val queue = RedisClientGenerators.queueNameGen.generateOne val clientId = RedisClientGenerators.clientIdGen.generateOne - redisAndSolrClients.use { case (queueClient, solrClient) => + queueAndSolrClients.use { case (queueClient, solrClient) => val provisioner = new SearchProvisionerImpl(clientId, queue, Resource.pure(queueClient), solrClient) for @@ -56,8 +56,8 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo message1 = projectCreatedGen(prefix = "binary").generateOne _ <- queueClient.enqueue( queue, - Header(DataContentType.Binary), - avro.write[ProjectCreated](Seq(message1)) + messageHeaderGen(ProjectCreated.SCHEMA$, DataContentType.Binary).generateOne, + message1 ) docsCollectorFiber <- @@ -81,7 +81,7 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo val queue = RedisClientGenerators.queueNameGen.generateOne val clientId = RedisClientGenerators.clientIdGen.generateOne - redisAndSolrClients.use { case (queueClient, solrClient) => + queueAndSolrClients.use { case (queueClient, solrClient) => val provisioner = new SearchProvisionerImpl(clientId, queue, Resource.pure(queueClient), solrClient) for @@ -92,8 +92,8 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo message1 = projectCreatedGen(prefix = "json").generateOne _ <- queueClient.enqueue( queue, - Header(DataContentType.Json), - avro.writeJson[ProjectCreated](Seq(message1)) + messageHeaderGen(ProjectCreated.SCHEMA$, DataContentType.Json).generateOne, + message1 ) docsCollectorFiber <- @@ -114,8 +114,8 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo yield () } - private def redisAndSolrClients = - withRedisClient.asQueueClient() >>= withSearchSolrClient().tupleLeft + private def queueAndSolrClients = + withQueueClient() >>= withSearchSolrClient().tupleLeft private def toSolrDocument(created: ProjectCreated): Project = def toUser(id: String): User = User(users.Id(id)) @@ -132,4 +132,4 @@ class SearchProvisionerSpec extends CatsEffectSuite with RedisSpec with SearchSo ) override def munitFixtures: Seq[Fixture[_]] = - List(withRedisClient, withSearchSolrClient) + List(withQueueClient, withSearchSolrClient)