From 6e7395e87b1bdea4a92e77370fed298a37e0ff5f Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Mon, 11 Mar 2024 10:45:52 +0100 Subject: [PATCH] feat: support for UserUpdated events (#52) * refactor: SolrProvisioningProcess renamed to UpsertProvisioningProcess * feat: support for the UserUpdated events * feat: support for the ProjectUpdated events --- .../io/renku/search/GeneratorSyntax.scala | 6 +- .../io/renku/events/EventsGenerators.scala | 18 +- .../redis/client/RedisClientGenerators.scala | 2 +- .../io/renku/search/api/SearchApiSpec.scala | 12 +- .../renku/search/provision/Microservice.scala | 18 +- .../provision/ProvisioningProcess.scala | 27 +++ .../provision/QueueMessageDecoder.scala | 45 +++++ .../renku/search/provision/QueuesConfig.scala | 8 +- .../provision/UpdateProvisioningProcess.scala | 161 +++++++++++++++ ....scala => UpsertProvisioningProcess.scala} | 60 +++--- .../project/ProjectCreatedProvisioning.scala | 8 +- .../project/ProjectUpdatedProvisioning.scala | 76 +++++++ .../user/UserAddedProvisioning.scala | 8 +- .../user/UserUpdatedProvisioning.scala | 69 +++++++ ...a => ProjectCreatedProvisioningSpec.scala} | 4 +- .../ProjectUpdatedProvisioningSpec.scala | 187 ++++++++++++++++++ ....scala => UserAddedProvisioningSpec.scala} | 5 +- .../user/UserUpdatedProvisioningSpec.scala | 154 +++++++++++++++ .../search/solr/client/SearchSolrClient.scala | 8 +- .../solr/client/SearchSolrClientImpl.scala | 11 ++ .../renku/search/solr/documents/Entity.scala | 8 +- .../solr/client/SearchSolrClientSpec.scala | 17 +- .../renku/solr/client/GetByIdResponse.scala | 33 ++++ .../io/renku/solr/client/SolrClient.scala | 2 + .../io/renku/solr/client/SolrClientImpl.scala | 18 +- .../io/renku/solr/client/SolrClientSpec.scala | 60 +++--- 26 files changed, 910 insertions(+), 115 deletions(-) create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/ProvisioningProcess.scala create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/QueueMessageDecoder.scala create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/UpdateProvisioningProcess.scala rename modules/search-provision/src/main/scala/io/renku/search/provision/{SolrProvisioningProcess.scala => UpsertProvisioningProcess.scala} (74%) create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectUpdatedProvisioning.scala create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/user/UserUpdatedProvisioning.scala rename modules/search-provision/src/test/scala/io/renku/search/provision/project/{ProjectCreatedProvisionerSpec.scala => ProjectCreatedProvisioningSpec.scala} (99%) create mode 100644 modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectUpdatedProvisioningSpec.scala rename modules/search-provision/src/test/scala/io/renku/search/provision/user/{UserAddedProvisionerSpec.scala => UserAddedProvisioningSpec.scala} (97%) create mode 100644 modules/search-provision/src/test/scala/io/renku/search/provision/user/UserUpdatedProvisioningSpec.scala create mode 100644 modules/solr-client/src/main/scala/io/renku/solr/client/GetByIdResponse.scala diff --git a/modules/commons/src/test/scala/io/renku/search/GeneratorSyntax.scala b/modules/commons/src/test/scala/io/renku/search/GeneratorSyntax.scala index 79fc7de6..7eee754c 100644 --- a/modules/commons/src/test/scala/io/renku/search/GeneratorSyntax.scala +++ b/modules/commons/src/test/scala/io/renku/search/GeneratorSyntax.scala @@ -18,10 +18,10 @@ package io.renku.search -import fs2.Stream +import cats.arrow.FunctionK import cats.effect.IO +import fs2.Stream import org.scalacheck.Gen -import cats.arrow.FunctionK trait GeneratorSyntax: @@ -34,6 +34,8 @@ trait GeneratorSyntax: def generateSome: Option[A] = Some(generateOne) + def generateList: List[A] = Gen.listOf(self).generateOne + def stream: Stream[Gen, A] = Stream.repeatEval(self) diff --git a/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala b/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala index a3961154..ea6c4ccd 100644 --- a/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala +++ b/modules/events/src/test/scala/io/renku/events/EventsGenerators.scala @@ -20,19 +20,22 @@ package io.renku.events import io.renku.events.v1.{ProjectCreated, UserAdded, Visibility} import org.scalacheck.Gen -import org.scalacheck.Gen.alphaNumChar +import org.scalacheck.Gen.{alphaChar, alphaNumChar} import java.time.Instant import java.time.temporal.ChronoUnit object EventsGenerators: + val projectVisibilityGen: Gen[Visibility] = Gen.oneOf(Visibility.values().toList) + def projectCreatedGen(prefix: String): Gen[ProjectCreated] = for id <- Gen.uuid.map(_.toString) name <- stringGen(max = 5).map(v => s"$prefix-$v") - repositories <- Gen.listOfN(Gen.choose(1, 3).generateOne, stringGen(10)) - visibility <- Gen.oneOf(Visibility.values().toList) + repositoriesCount <- Gen.choose(1, 3) + repositories <- Gen.listOfN(repositoriesCount, stringGen(10)) + visibility <- projectVisibilityGen maybeDesc <- Gen.option(stringGen(20)) creator <- Gen.uuid.map(_.toString) yield ProjectCreated( @@ -49,8 +52,8 @@ object EventsGenerators: def userAddedGen(prefix: String): Gen[UserAdded] = for id <- Gen.uuid.map(_.toString) - firstName <- Gen.option(stringGen(max = 5).map(v => s"$prefix-$v")) - lastName <- stringGen(max = 5).map(v => s"$prefix-$v") + firstName <- Gen.option(alphaStringGen(max = 5).map(v => s"$prefix-$v")) + lastName <- alphaStringGen(max = 5).map(v => s"$prefix-$v") email <- Gen.option(stringGen(max = 5).map(host => s"$lastName@$host.com")) yield UserAdded( id, @@ -64,4 +67,7 @@ object EventsGenerators: .chooseNum(3, max) .flatMap(Gen.stringOfN(_, alphaNumChar)) - extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne) + def alphaStringGen(max: Int): Gen[String] = + Gen + .chooseNum(3, max) + .flatMap(Gen.stringOfN(_, alphaChar)) diff --git a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala index a64ae474..f3acf51f 100644 --- a/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala +++ b/modules/redis-client/src/test/scala/io/renku/redis/client/RedisClientGenerators.scala @@ -23,7 +23,7 @@ import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar} object RedisClientGenerators: - val stringGen: Gen[String] = + private val stringGen: Gen[String] = Gen .chooseNum(3, 10) .flatMap(Gen.stringOfN(_, alphaLowerChar)) diff --git a/modules/search-api/src/test/scala/io/renku/search/api/SearchApiSpec.scala b/modules/search-api/src/test/scala/io/renku/search/api/SearchApiSpec.scala index ef46a2af..50226064 100644 --- a/modules/search-api/src/test/scala/io/renku/search/api/SearchApiSpec.scala +++ b/modules/search-api/src/test/scala/io/renku/search/api/SearchApiSpec.scala @@ -27,12 +27,7 @@ import io.renku.search.model.users import io.renku.search.query.Query import io.renku.search.solr.client.SearchSolrClientGenerators.* import io.renku.search.solr.client.SearchSolrSpec -import io.renku.search.solr.documents.Project.given -import io.renku.search.solr.documents.{ - Entity as SolrEntity, - Project as SolrProject, - User as SolrUser -} +import io.renku.search.solr.documents.{Entity as SolrEntity, User as SolrUser} import munit.CatsEffectSuite import scribe.Scribe @@ -46,7 +41,7 @@ class SearchApiSpec extends CatsEffectSuite with SearchSolrSpec: val project2 = projectDocumentGen("disparate", "disparate description").generateOne val searchApi = new SearchApiImpl[IO](client) for { - _ <- client.insert(project1 :: project2 :: Nil) + _ <- client.insert((project1 :: project2 :: Nil).map(_.widen)) results <- searchApi .query(mkQuery("matching")) .map(_.fold(err => fail(s"Calling Search API failed with $err"), identity)) @@ -61,8 +56,7 @@ class SearchApiSpec extends CatsEffectSuite with SearchSolrSpec: val user = SolrUser(project.createdBy, users.FirstName("exclusive").some) val searchApi = new SearchApiImpl[IO](client) for { - _ <- client.insert(project :: Nil) - _ <- client.insert(user :: Nil) + _ <- client.insert(project :: user :: Nil) results <- searchApi .query(mkQuery("exclusive")) .map(_.fold(err => fail(s"Calling Search API failed with $err"), identity)) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index 90dee505..e13bcc91 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -22,8 +22,8 @@ import cats.effect.* import cats.syntax.all.* import io.renku.logging.LoggingSetup import io.renku.redis.client.QueueName -import io.renku.search.provision.project.ProjectCreatedProvisioning -import io.renku.search.provision.user.UserAddedProvisioning +import io.renku.search.provision.project.* +import io.renku.search.provision.user.* import io.renku.search.solr.schema.Migrations import io.renku.solr.client.migration.SchemaMigrator import scribe.Scribe @@ -50,18 +50,30 @@ object Microservice extends IOApp: ProjectCreatedProvisioning .make[IO](cfg.queuesConfig.projectCreated, cfg.redisConfig, cfg.solrConfig) ), + ( + "ProjectUpdated", + cfg.queuesConfig.projectUpdated, + ProjectUpdatedProvisioning + .make[IO](cfg.queuesConfig.projectUpdated, cfg.redisConfig, cfg.solrConfig) + ), ( "UserAdded", cfg.queuesConfig.userAdded, UserAddedProvisioning .make[IO](cfg.queuesConfig.userAdded, cfg.redisConfig, cfg.solrConfig) + ), + ( + "UserUpdated", + cfg.queuesConfig.userUpdated, + UserUpdatedProvisioning + .make[IO](cfg.queuesConfig.userUpdated, cfg.redisConfig, cfg.solrConfig) ) ).parTraverse_(startProcess(cfg)) .flatMap(_ => IO.never) private def startProcess( cfg: SearchProvisionConfig - ): ((String, QueueName, Resource[IO, SolrProvisioningProcess[IO]])) => IO[Unit] = { + ): ((String, QueueName, Resource[IO, ProvisioningProcess[IO]])) => IO[Unit] = { case t @ (name, queue, resource) => resource .evalMap(_.provisioningProcess.start) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/ProvisioningProcess.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/ProvisioningProcess.scala new file mode 100644 index 00000000..64b24b50 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/ProvisioningProcess.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision + +import io.renku.redis.client.ClientId + +trait ProvisioningProcess[F[_]]: + def provisioningProcess: F[Unit] + +object ProvisioningProcess: + val clientId: ClientId = ClientId("search-provisioner") diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/QueueMessageDecoder.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/QueueMessageDecoder.scala new file mode 100644 index 00000000..692d7718 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/QueueMessageDecoder.scala @@ -0,0 +1,45 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision + +import cats.MonadThrow +import cats.syntax.all.* +import io.renku.avro.codec.{AvroDecoder, AvroReader} +import io.renku.queue.client.{DataContentType, QueueMessage} +import org.apache.avro.Schema + +private class QueueMessageDecoder[F[_]: MonadThrow, A](schema: Schema)(using + AvroDecoder[A] +): + private val avro = AvroReader(schema) + + def decodeMessage(message: QueueMessage): F[Seq[A]] = + findContentType.andThenF(decodePayload(message))(message) + + private def findContentType(message: QueueMessage): F[DataContentType] = + MonadThrow[F] + .fromEither(DataContentType.from(message.header.dataContentType)) + + private def decodePayload(message: QueueMessage): DataContentType => F[Seq[A]] = { + case DataContentType.Binary => catchNonFatal(avro.read[A](message.payload)) + case DataContentType.Json => catchNonFatal(avro.readJson[A](message.payload)) + } + + private def catchNonFatal(f: => Seq[A]): F[Seq[A]] = + MonadThrow[F].catchNonFatal(f) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala index e9791428..788c3797 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala @@ -25,12 +25,16 @@ import io.renku.search.config.ConfigValues final case class QueuesConfig( projectCreated: QueueName, - userAdded: QueueName + projectUpdated: QueueName, + userAdded: QueueName, + userUpdated: QueueName ) object QueuesConfig: val config: ConfigValue[Effect, QueuesConfig] = ( ConfigValues.eventQueue("projectCreated"), - ConfigValues.eventQueue("userAdded") + ConfigValues.eventQueue("projectUpdated"), + ConfigValues.eventQueue("userAdded"), + ConfigValues.eventQueue("userUpdated") ).mapN(QueuesConfig.apply) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/UpdateProvisioningProcess.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/UpdateProvisioningProcess.scala new file mode 100644 index 00000000..5abae022 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/UpdateProvisioningProcess.scala @@ -0,0 +1,161 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision + +import cats.Show +import cats.effect.{Async, Resource, Temporal} +import cats.syntax.all.* +import fs2.io.net.Network +import io.bullet.borer.Codec +import io.renku.avro.codec.AvroDecoder +import io.renku.queue.client.{QueueClient, QueueMessage} +import io.renku.redis.client.{ClientId, QueueName, RedisConfig} +import io.renku.search.solr.client.SearchSolrClient +import io.renku.search.solr.documents.Entity +import io.renku.solr.client.SolrConfig +import org.apache.avro.Schema +import scribe.Scribe + +import scala.concurrent.duration.* +import scala.reflect.ClassTag + +trait UpdateProvisioningProcess[F[_]] extends ProvisioningProcess[F] + +object UpdateProvisioningProcess: + + def make[F[_]: Async: Network: Scribe, In, Out <: Entity]( + queueName: QueueName, + inSchema: Schema, + idExtractor: In => String, + docUpdate: ((In, Out)) => Out, + redisConfig: RedisConfig, + solrConfig: SolrConfig + )(using + Show[In], + AvroDecoder[In], + Codec[Entity], + ClassTag[Out] + ): Resource[F, UpdateProvisioningProcess[F]] = + SearchSolrClient.make[F](solrConfig).map { + new UpdateProvisioningProcessImpl[F, In, Out]( + queueName, + ProvisioningProcess.clientId, + idExtractor, + docUpdate, + QueueClient.make[F](redisConfig), + _, + QueueMessageDecoder[F, In](inSchema) + ) + } + +private class UpdateProvisioningProcessImpl[F[_]: Async: Scribe, In, Out <: Entity]( + queueName: QueueName, + clientId: ClientId, + idExtractor: In => String, + docUpdate: ((In, Out)) => Out, + queueClientResource: Resource[F, QueueClient[F]], + solrClient: SearchSolrClient[F], + messageDecoder: QueueMessageDecoder[F, In] +)(using Show[In], Codec[Entity], ClassTag[Out]) + extends UpdateProvisioningProcess[F]: + + override def provisioningProcess: F[Unit] = + queueClientResource + .use { queueClient => + findLastProcessed(queueClient) >>= { maybeLastProcessed => + queueClient + .acquireEventsStream(queueName, chunkSize = 1, maybeLastProcessed) + .evalMap(decodeMessage(queueClient)) + .evalTap(logInfo) + .evalMap(fetchDocuments) + .evalMap(pushToSolr(queueClient)) + .compile + .drain + .handleErrorWith(logAndRestart) + } + } + .handleErrorWith(logAndRestart) + + private def findLastProcessed(queueClient: QueueClient[F]) = + queueClient.findLastProcessed(clientId, queueName) + + private lazy val logInfo: ((QueueMessage, Seq[In])) => F[Unit] = { case (m, v) => + Scribe[F].info( + "Received message " + + s"queue: $queueName, " + + s"id: ${m.id}, " + + s"source: ${m.header.source}, " + + s"type: ${m.header.`type`} " + + s"for: ${v.mkString_(", ")}" + ) + } + + private def decodeMessage(queueClient: QueueClient[F])( + message: QueueMessage + ): F[(QueueMessage, Seq[In])] = + messageDecoder + .decodeMessage(message) + .tupleLeft(message) + .onError(markProcessedOnFailure(message, queueClient)) + + private lazy val fetchDocuments + : ((QueueMessage, Seq[In])) => F[(QueueMessage, Seq[(In, Out)])] = + case (m, ins) => + ins + .map { in => + val docId = idExtractor(in) + solrClient.findById[Out](docId) >>= { + case Some(out) => (in, out).some.pure[F] + case None => + Scribe[F] + .warn(s"Document id: '$docId' for update doesn't exist in Solr; skipping") + .as(Option.empty[Nothing]) + } + } + .sequence + .map(_.flatten) + .map((m, _)) + + private def pushToSolr( + queueClient: QueueClient[F] + ): ((QueueMessage, Seq[(In, Out)])) => F[Unit] = { case (m, inOuts) => + inOuts match { + case l if l.isEmpty => ().pure[F] + case inOuts => + val updatedDocs = inOuts.map(docUpdate).map(_.widen) + solrClient + .insert(updatedDocs) + .flatMap(_ => markProcessed(m, queueClient)) + .onError(markProcessedOnFailure(m, queueClient)) + } + } + + private def markProcessedOnFailure( + message: QueueMessage, + queueClient: QueueClient[F] + ): PartialFunction[Throwable, F[Unit]] = err => + markProcessed(message, queueClient) >> + Scribe[F].error(s"Processing messageId: ${message.id} for '$queueName' failed", err) + + private def markProcessed(message: QueueMessage, queueClient: QueueClient[F]): F[Unit] = + queueClient.markProcessed(clientId, queueName, message.id) + + private def logAndRestart: Throwable => F[Unit] = err => + Scribe[F].error(s"Failure in the provisioning process for '$queueName'", err) >> + Temporal[F].delayBy(provisioningProcess, 30 seconds) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SolrProvisioningProcess.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/UpsertProvisioningProcess.scala similarity index 74% rename from modules/search-provision/src/main/scala/io/renku/search/provision/SolrProvisioningProcess.scala rename to modules/search-provision/src/main/scala/io/renku/search/provision/UpsertProvisioningProcess.scala index fda71d3c..ccf2f347 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SolrProvisioningProcess.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/UpsertProvisioningProcess.scala @@ -18,30 +18,29 @@ package io.renku.search.provision +import cats.Show import cats.effect.{Async, Resource, Temporal} import cats.syntax.all.* -import cats.{MonadThrow, Show} import fs2.Chunk import fs2.io.net.Network import io.bullet.borer.Encoder import io.github.arainko.ducktape.* -import io.renku.avro.codec.{AvroDecoder, AvroReader} -import io.renku.queue.client.{DataContentType, QueueClient, QueueMessage} +import io.renku.avro.codec.AvroDecoder +import io.renku.queue.client.{QueueClient, QueueMessage} import io.renku.redis.client.{ClientId, QueueName, RedisConfig} import io.renku.search.solr.client.SearchSolrClient +import io.renku.search.solr.documents.Entity import io.renku.solr.client.SolrConfig import org.apache.avro.Schema import scribe.Scribe import scala.concurrent.duration.* -trait SolrProvisioningProcess[F[_]]: - def provisioningProcess: F[Unit] +trait UpsertProvisioningProcess[F[_]] extends ProvisioningProcess[F] -object SolrProvisioningProcess: - private val clientId: ClientId = ClientId("search-provisioner") +object UpsertProvisioningProcess: - def make[F[_]: Async: Network: Scribe, In, Out]( + def make[F[_]: Async: Network: Scribe, In, Out <: Entity]( queueName: QueueName, inSchema: Schema, redisConfig: RedisConfig, @@ -50,26 +49,26 @@ object SolrProvisioningProcess: Show[In], Transformer[In, Out], AvroDecoder[In], - Encoder[Out] - ): Resource[F, SolrProvisioningProcess[F]] = + Encoder[Entity] + ): Resource[F, UpsertProvisioningProcess[F]] = SearchSolrClient.make[F](solrConfig).map { - new SolrProvisioningProcessImpl[F, In, Out]( + new UpsertProvisioningProcessImpl[F, In, Out]( queueName, - inSchema, - clientId, + ProvisioningProcess.clientId, QueueClient.make[F](redisConfig), - _ + _, + QueueMessageDecoder[F, In](inSchema) ) } -private class SolrProvisioningProcessImpl[F[_]: Async: Scribe, In, Out]( +private class UpsertProvisioningProcessImpl[F[_]: Async: Scribe, In, Out <: Entity]( queueName: QueueName, - inSchema: Schema, clientId: ClientId, queueClientResource: Resource[F, QueueClient[F]], - solrClient: SearchSolrClient[F] -)(using Show[In], Transformer[In, Out], AvroDecoder[In], Encoder[Out]) - extends SolrProvisioningProcess[F]: + solrClient: SearchSolrClient[F], + messageDecoder: QueueMessageDecoder[F, In] +)(using Show[In], Transformer[In, Out], AvroDecoder[In], Encoder[Entity]) + extends UpsertProvisioningProcess[F]: override def provisioningProcess: F[Unit] = queueClientResource @@ -102,24 +101,13 @@ private class SolrProvisioningProcessImpl[F[_]: Async: Scribe, In, Out]( ) } - private val avro = AvroReader(inSchema) - private def decodeMessage(queueClient: QueueClient[F])( message: QueueMessage ): F[(QueueMessage, Seq[In])] = - MonadThrow[F] - .fromEither(DataContentType.from(message.header.dataContentType)) - .flatMap { ct => - MonadThrow[F] - .catchNonFatal { - ct match { - case DataContentType.Binary => avro.read[In](message.payload) - case DataContentType.Json => avro.readJson[In](message.payload) - } - } - .map(message -> _) - .onError(markProcessedOnFailure(message, queueClient)) - } + messageDecoder + .decodeMessage(message) + .tupleLeft(message) + .onError(markProcessedOnFailure(message, queueClient)) private def pushToSolr( queueClient: QueueClient[F] @@ -130,7 +118,7 @@ private class SolrProvisioningProcessImpl[F[_]: Async: Scribe, In, Out]( val docs = toSolrDocuments(tuples.flatMap(_._2)) val (lastMessage, _) = tuples.last solrClient - .insert(docs) + .insert(docs.map(_.widen)) .flatMap(_ => markProcessed(lastMessage, queueClient)) .onError(markProcessedOnFailure(lastMessage, queueClient)) } @@ -143,7 +131,7 @@ private class SolrProvisioningProcessImpl[F[_]: Async: Scribe, In, Out]( queueClient: QueueClient[F] ): PartialFunction[Throwable, F[Unit]] = err => markProcessed(message, queueClient) >> - Scribe[F].error(s"Processing messageId: ${message.id} failed", err) + Scribe[F].error(s"Processing messageId: ${message.id} for '$queueName' failed", err) private def markProcessed(message: QueueMessage, queueClient: QueueClient[F]): F[Unit] = queueClient.markProcessed(clientId, queueName, message.id) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectCreatedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectCreatedProvisioning.scala index b34c3110..b49c97ac 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectCreatedProvisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectCreatedProvisioning.scala @@ -28,23 +28,21 @@ import io.renku.events.v1 import io.renku.events.v1.{ProjectCreated, Visibility} import io.renku.redis.client.{QueueName, RedisConfig} import io.renku.search.model.* -import io.renku.search.provision.SolrProvisioningProcess +import io.renku.search.provision.UpsertProvisioningProcess import io.renku.search.provision.TypeTransformers.given import io.renku.search.solr.documents import io.renku.solr.client.SolrConfig import scribe.Scribe -trait ProjectCreatedProvisioning[F[_]] extends SolrProvisioningProcess[F] - object ProjectCreatedProvisioning: def make[F[_]: Async: Network]( queueName: QueueName, redisConfig: RedisConfig, solrConfig: SolrConfig - ): Resource[F, SolrProvisioningProcess[F]] = + ): Resource[F, UpsertProvisioningProcess[F]] = given Scribe[F] = scribe.cats[F] - SolrProvisioningProcess.make[F, ProjectCreated, documents.Project]( + UpsertProvisioningProcess.make[F, ProjectCreated, documents.Project]( queueName, ProjectCreated.SCHEMA$, redisConfig, diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectUpdatedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectUpdatedProvisioning.scala new file mode 100644 index 00000000..9858582e --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectUpdatedProvisioning.scala @@ -0,0 +1,76 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision +package project + +import cats.Show +import cats.effect.{Async, Resource} +import fs2.io.net.Network +import io.bullet.borer.Codec.* +import io.bullet.borer.{Codec, Decoder, Encoder} +import io.github.arainko.ducktape.* +import io.renku.avro.codec.decoders.all.given +import io.renku.events.v1.ProjectUpdated +import io.renku.redis.client.{QueueName, RedisConfig} +import io.renku.search.provision.TypeTransformers.given +import io.renku.search.solr.documents +import io.renku.solr.client.SolrConfig +import scribe.Scribe + +object ProjectUpdatedProvisioning: + + def make[F[_]: Async: Network]( + queueName: QueueName, + redisConfig: RedisConfig, + solrConfig: SolrConfig + ): Resource[F, UpdateProvisioningProcess[F]] = + given Scribe[F] = scribe.cats[F] + UpdateProvisioningProcess.make[F, ProjectUpdated, documents.Project]( + queueName, + ProjectUpdated.SCHEMA$, + idExtractor, + docUpdate, + redisConfig, + solrConfig + ) + + private given Codec[documents.Project] = Codec[documents.Project]( + Encoder[documents.Entity].contramap(_.asInstanceOf[documents.Entity]), + Decoder[documents.Entity].mapEither { + case u: documents.Project => Right(u) + case u => Left(s"${u.getClass} is not a Project document") + } + ) + + private given Show[ProjectUpdated] = + Show.show[ProjectUpdated](v => s"slug '${v.slug}'") + + private lazy val idExtractor: ProjectUpdated => String = _.id + + private lazy val docUpdate + : ((ProjectUpdated, documents.Project)) => documents.Project = { + case (update, orig) => + update + .into[documents.Project] + .transform( + Field.const(_.createdBy, orig.createdBy), + Field.const(_.creationDate, orig.creationDate), + Field.default(_.score) + ) + } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserAddedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserAddedProvisioning.scala index 95aebe95..40386554 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserAddedProvisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserAddedProvisioning.scala @@ -27,12 +27,12 @@ import io.renku.avro.codec.decoders.all.given import io.renku.events.v1 import io.renku.events.v1.UserAdded import io.renku.redis.client.{QueueName, RedisConfig} -import io.renku.search.provision.SolrProvisioningProcess +import io.renku.search.provision.UpsertProvisioningProcess import io.renku.search.solr.documents import io.renku.solr.client.SolrConfig import scribe.Scribe -trait UserAddedProvisioning[F[_]] extends SolrProvisioningProcess[F] +trait UserAddedProvisioning[F[_]] extends UpsertProvisioningProcess[F] object UserAddedProvisioning: @@ -40,9 +40,9 @@ object UserAddedProvisioning: queueName: QueueName, redisConfig: RedisConfig, solrConfig: SolrConfig - ): Resource[F, SolrProvisioningProcess[F]] = + ): Resource[F, UpsertProvisioningProcess[F]] = given Scribe[F] = scribe.cats[F] - SolrProvisioningProcess.make[F, UserAdded, documents.User]( + UpsertProvisioningProcess.make[F, UserAdded, documents.User]( queueName, UserAdded.SCHEMA$, redisConfig, diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserUpdatedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserUpdatedProvisioning.scala new file mode 100644 index 00000000..86a4f1b3 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserUpdatedProvisioning.scala @@ -0,0 +1,69 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision +package user + +import cats.Show +import cats.effect.{Async, Resource} +import fs2.io.net.Network +import io.bullet.borer.Codec.* +import io.bullet.borer.{Codec, Decoder, Encoder} +import io.github.arainko.ducktape.* +import io.renku.avro.codec.decoders.all.given +import io.renku.events.v1.UserUpdated +import io.renku.redis.client.{QueueName, RedisConfig} +import io.renku.search.model.users +import io.renku.search.solr.documents +import io.renku.solr.client.SolrConfig +import scribe.Scribe + +object UserUpdatedProvisioning: + + def make[F[_]: Async: Network]( + queueName: QueueName, + redisConfig: RedisConfig, + solrConfig: SolrConfig + ): Resource[F, UpdateProvisioningProcess[F]] = + given Scribe[F] = scribe.cats[F] + UpdateProvisioningProcess.make[F, UserUpdated, documents.User]( + queueName, + UserUpdated.SCHEMA$, + idExtractor, + docUpdate, + redisConfig, + solrConfig + ) + + private given Codec[documents.User] = Codec[documents.User]( + Encoder[documents.Entity].contramap(_.asInstanceOf[documents.Entity]), + Decoder[documents.Entity].mapEither { + case u: documents.User => Right(u) + case u => Left(s"${u.getClass} is not a User document") + } + ) + + private given Show[UserUpdated] = + Show.show[UserUpdated](u => s"id '${u.id}'") + + private lazy val idExtractor: UserUpdated => String = _.id + + private lazy val docUpdate: ((UserUpdated, documents.User)) => documents.User = { + case (update, _) => + update.into[documents.User].transform(Field.default(_.score)) + } diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisionerSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisioningSpec.scala similarity index 99% rename from modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisionerSpec.scala rename to modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisioningSpec.scala index ebec7d91..abc971cb 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisionerSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisioningSpec.scala @@ -43,7 +43,7 @@ import munit.CatsEffectSuite import scala.concurrent.duration.* -class ProjectCreatedProvisionerSpec +class ProjectCreatedProvisioningSpec extends CatsEffectSuite with QueueSpec with SearchSolrSpec: @@ -69,7 +69,7 @@ class ProjectCreatedProvisionerSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.queryEntity(queryProjects, 10, 0)) + .evalMap(_ => solrClient.queryEntity(queryProjects, 20, 0)) .flatMap(qr => Stream.emits(qr.responseBody.docs)) .evalMap(e => solrDocs.update(_ + e.noneScore)) .compile diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectUpdatedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectUpdatedProvisioningSpec.scala new file mode 100644 index 00000000..98430cc6 --- /dev/null +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectUpdatedProvisioningSpec.scala @@ -0,0 +1,187 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.project + +import cats.effect.{IO, Resource} +import cats.syntax.all.* +import fs2.Stream +import fs2.concurrent.SignallingRef +import io.github.arainko.ducktape.* +import io.renku.avro.codec.AvroIO +import io.renku.avro.codec.encoders.all.given +import io.renku.events.EventsGenerators.* +import io.renku.events.v1.{ProjectCreated, ProjectUpdated} +import io.renku.queue.client.Generators.messageHeaderGen +import io.renku.queue.client.QueueSpec +import io.renku.redis.client.RedisClientGenerators.* +import io.renku.redis.client.{QueueName, RedisClientGenerators} +import io.renku.search.GeneratorSyntax.* +import io.renku.search.model.{EntityType, users} +import io.renku.search.provision.TypeTransformers.given +import io.renku.search.query.Query +import io.renku.search.query.Query.Segment +import io.renku.search.query.Query.Segment.typeIs +import io.renku.search.solr.client.SearchSolrSpec +import io.renku.search.solr.documents.EntityOps.* +import io.renku.search.solr.documents.{Entity, Project} +import munit.CatsEffectSuite + +import scala.concurrent.duration.* + +class ProjectUpdatedProvisioningSpec + extends CatsEffectSuite + with QueueSpec + with SearchSolrSpec: + + private val avro = AvroIO(ProjectUpdated.SCHEMA$) + + (nameUpdate :: slugUpdate :: repositoriesUpdate :: visibilityUpdate :: descUpdate :: noUpdate :: Nil) + .foreach { case TestCase(name, updateF) => + test(s"can fetch events, decode them, and update in Solr in case of $name"): + val queue = RedisClientGenerators.queueNameGen.generateOne + + clientsAndProvisioning(queue).use { case (queueClient, solrClient, provisioner) => + for + solrDocs <- SignallingRef.of[IO, Set[Entity]](Set.empty) + + provisioningFiber <- provisioner.provisioningProcess.start + + created = projectCreatedGen(prefix = "update").generateOne + _ <- solrClient.insert(Seq(created.toSolrDocument.widen)) + + updated = updateF(created) + _ <- queueClient.enqueue( + queue, + messageHeaderGen(ProjectUpdated.SCHEMA$).generateOne, + updated + ) + + docsCollectorFiber <- + Stream + .awakeEvery[IO](500 millis) + .evalMap(_ => solrClient.queryEntity(queryProjects, 20, 0)) + .flatMap(qr => Stream.emits(qr.responseBody.docs)) + .evalMap(e => solrDocs.update(_ + e.noneScore)) + .compile + .drain + .start + + _ <- solrDocs.waitUntil( + _ contains created.update(updated).toSolrDocument + ) + + _ <- provisioningFiber.cancel + _ <- docsCollectorFiber.cancel + yield () + } + } + + private lazy val queryProjects = Query(typeIs(EntityType.Project)) + + private def clientsAndProvisioning(queueName: QueueName) = + (withQueueClient() >>= withSearchSolrClient().tupleLeft) + .flatMap { case (rc, sc) => + ProjectUpdatedProvisioning + .make[IO]( + queueName, + withRedisClient.redisConfig, + withSearchSolrClient.solrConfig + ) + .map((rc, sc, _)) + } + + extension (created: ProjectCreated) + def toSolrDocument: Project = created.into[Project].transform(Field.default(_.score)) + def update(updated: ProjectUpdated): ProjectCreated = + created.copy( + name = updated.name, + slug = updated.slug, + repositories = updated.repositories, + visibility = updated.visibility, + description = updated.description + ) + + private case class TestCase(name: String, f: ProjectCreated => ProjectUpdated) + private lazy val nameUpdate = TestCase( + "name update", + pc => + ProjectUpdated( + pc.id, + stringGen(max = 5).generateOne, + pc.slug, + pc.repositories, + pc.visibility, + pc.description + ) + ) + private lazy val slugUpdate = TestCase( + "slug update", + pc => + ProjectUpdated( + pc.id, + pc.name, + stringGen(max = 5).generateOne, + pc.repositories, + pc.visibility, + pc.description + ) + ) + private lazy val repositoriesUpdate = TestCase( + "repositories update", + pc => + ProjectUpdated( + pc.id, + pc.name, + pc.slug, + stringGen(max = 5).generateList, + pc.visibility, + pc.description + ) + ) + private lazy val visibilityUpdate = TestCase( + "repositories update", + pc => + ProjectUpdated( + pc.id, + pc.name, + pc.slug, + pc.repositories, + projectVisibilityGen.generateOne, + pc.description + ) + ) + private lazy val descUpdate = TestCase( + "repositories update", + pc => + ProjectUpdated( + pc.id, + pc.name, + pc.slug, + pc.repositories, + pc.visibility, + stringGen(max = 5).generateSome + ) + ) + private lazy val noUpdate = TestCase( + "no update", + _.to[ProjectUpdated] + ) + + override def munitFixtures: Seq[Fixture[_]] = + List(withRedisClient, withQueueClient, withSearchSolrClient) diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisionerSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisioningSpec.scala similarity index 97% rename from modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisionerSpec.scala rename to modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisioningSpec.scala index 93cc526e..b68b06d5 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisionerSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisioningSpec.scala @@ -43,7 +43,10 @@ import munit.CatsEffectSuite import scala.concurrent.duration.* -class UserAddedProvisionerSpec extends CatsEffectSuite with QueueSpec with SearchSolrSpec: +class UserAddedProvisioningSpec + extends CatsEffectSuite + with QueueSpec + with SearchSolrSpec: private val avro = AvroIO(UserAdded.SCHEMA$) diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserUpdatedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserUpdatedProvisioningSpec.scala new file mode 100644 index 00000000..a6d33827 --- /dev/null +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserUpdatedProvisioningSpec.scala @@ -0,0 +1,154 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.user + +import cats.effect.{IO, Resource} +import cats.syntax.all.* +import fs2.Stream +import fs2.concurrent.SignallingRef +import io.github.arainko.ducktape.* +import io.renku.avro.codec.AvroIO +import io.renku.avro.codec.encoders.all.given +import io.renku.events.EventsGenerators.{stringGen, userAddedGen} +import io.renku.events.v1.{UserAdded, UserUpdated} +import io.renku.queue.client.Generators.messageHeaderGen +import io.renku.queue.client.QueueSpec +import io.renku.redis.client.RedisClientGenerators.* +import io.renku.redis.client.{QueueName, RedisClientGenerators} +import io.renku.search.GeneratorSyntax.* +import io.renku.search.model.{EntityType, users} +import io.renku.search.query.Query +import io.renku.search.query.Query.Segment +import io.renku.search.query.Query.Segment.typeIs +import io.renku.search.solr.client.SearchSolrSpec +import io.renku.search.solr.documents.EntityOps.* +import io.renku.search.solr.documents.{Entity, User} +import munit.CatsEffectSuite + +import scala.concurrent.duration.* + +class UserUpdatedProvisioningSpec + extends CatsEffectSuite + with QueueSpec + with SearchSolrSpec: + + private val avro = AvroIO(UserUpdated.SCHEMA$) + + (firstNameUpdate :: lastNameUpdate :: emailUpdate :: noUpdate :: Nil).foreach { + case TestCase(name, updateF) => + test(s"can fetch events, decode them, and update in Solr in case of $name"): + val queue = RedisClientGenerators.queueNameGen.generateOne + + clientsAndProvisioning(queue).use { case (queueClient, solrClient, provisioner) => + for + solrDocs <- SignallingRef.of[IO, Set[Entity]](Set.empty) + + provisioningFiber <- provisioner.provisioningProcess.start + + userAdded = userAddedGen(prefix = "update").generateOne + _ <- solrClient.insert(Seq(userAdded.toSolrDocument.widen)) + + userUpdated = updateF(userAdded) + _ <- queueClient.enqueue( + queue, + messageHeaderGen(UserUpdated.SCHEMA$).generateOne, + userUpdated + ) + + docsCollectorFiber <- + Stream + .awakeEvery[IO](500 millis) + .evalMap(_ => solrClient.queryEntity(queryUsers, 10, 0)) + .flatMap(qr => Stream.emits(qr.responseBody.docs)) + .evalMap(e => solrDocs.update(_ + e.noneScore)) + .compile + .drain + .start + + _ <- solrDocs.waitUntil( + _ contains userAdded.update(userUpdated).toSolrDocument + ) + + _ <- provisioningFiber.cancel + _ <- docsCollectorFiber.cancel + yield () + } + } + + private lazy val queryUsers = Query(typeIs(EntityType.User)) + + private def clientsAndProvisioning(queueName: QueueName) = + (withQueueClient() >>= withSearchSolrClient().tupleLeft) + .flatMap { case (rc, sc) => + UserUpdatedProvisioning + .make[IO]( + queueName, + withRedisClient.redisConfig, + withSearchSolrClient.solrConfig + ) + .map((rc, sc, _)) + } + + extension (added: UserAdded) + def toSolrDocument: User = added.into[User].transform(Field.default(_.score)) + def update(updated: UserUpdated): UserAdded = + added.copy( + firstName = updated.firstName, + lastName = updated.lastName, + email = updated.email + ) + + private case class TestCase(name: String, f: UserAdded => UserUpdated) + private lazy val firstNameUpdate = TestCase( + "firstName update", + ua => + UserUpdated( + ua.id, + stringGen(max = 5).generateOne.some, + ua.lastName, + ua.email + ) + ) + private lazy val lastNameUpdate = TestCase( + "lastName update", + ua => + UserUpdated( + ua.id, + ua.firstName, + stringGen(max = 5).generateOne.some, + ua.email + ) + ) + private lazy val emailUpdate = TestCase( + "email update", + ua => + UserUpdated( + ua.id, + ua.firstName, + ua.lastName, + stringGen(max = 5).map(v => s"v@host.com").generateOne.some + ) + ) + private lazy val noUpdate = TestCase( + "no update", + ua => UserUpdated(ua.id, ua.firstName, ua.lastName, ua.email) + ) + + override def munitFixtures: Seq[Fixture[_]] = + List(withRedisClient, withQueueClient, withSearchSolrClient) diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala index 20c7accc..7877b79f 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala @@ -21,12 +21,14 @@ package io.renku.search.solr.client import cats.effect.{Async, Resource} import fs2.io.net.Network import io.bullet.borer.Encoder -import io.renku.search.solr.documents.Entity -import io.renku.solr.client.{SolrClient, SolrConfig} import io.renku.search.query.Query -import io.renku.solr.client.QueryResponse +import io.renku.search.solr.documents.Entity +import io.renku.solr.client.{QueryResponse, SolrClient, SolrConfig} + +import scala.reflect.ClassTag trait SearchSolrClient[F[_]]: + def findById[D <: Entity](id: String)(using ct: ClassTag[D]): F[Option[D]] def insert[D: Encoder](documents: Seq[D]): F[Unit] def queryEntity(query: Query, limit: Int, offset: Int): F[QueryResponse[Entity]] diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala index c9731bd2..00e56a32 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala @@ -29,6 +29,8 @@ import io.renku.solr.client.schema.FieldName import io.renku.solr.client.facet.{Facet, Facets} import io.renku.search.solr.schema.EntityDocumentSchema +import scala.reflect.ClassTag + private class SearchSolrClientImpl[F[_]: Async](solrClient: SolrClient[F]) extends SearchSolrClient[F]: @@ -59,3 +61,12 @@ private class SearchSolrClientImpl[F[_]: Async](solrClient: SolrClient[F]) .withFields(FieldName.all, FieldName.score) ) } yield res + + override def findById[D <: Entity](id: String)(using ct: ClassTag[D]): F[Option[D]] = + solrClient.findById[Entity](id).map(_.responseBody.docs.headOption).flatMap { + case Some(e: D) => Some(e).pure[F] + case Some(e) => + new Exception(s"Entity '$id' is of type ${e.getClass} not ${ct.runtimeClass}") + .raiseError[F, Option[D]] + case None => Option.empty[D].pure[F] + } diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Entity.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Entity.scala index 0d7f0253..4c2730ef 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Entity.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Entity.scala @@ -19,12 +19,13 @@ package io.renku.search.solr.documents import io.bullet.borer.derivation.MapBasedCodecs.* -import io.bullet.borer.{AdtEncodingStrategy, Decoder, Encoder} +import io.bullet.borer.{AdtEncodingStrategy, Codec, Decoder, Encoder} import io.renku.search.model.{projects, users} import io.renku.solr.client.EncoderSupport.* sealed trait Entity: val score: Option[Double] + def widen: Entity = this object Entity: @@ -33,8 +34,9 @@ object Entity: given AdtEncodingStrategy = AdtEncodingStrategy.flat(typeMemberName = discriminatorField) - given Encoder[Entity] = deriveEncoder[Entity] + given Encoder[Entity] = deriveAllEncoders[Entity] given Decoder[Entity] = deriveAllDecoders[Entity] + given Codec[Entity] = Codec.of[Entity] final case class Project( id: projects.Id, @@ -50,7 +52,6 @@ final case class Project( object Project: val entityType: String = "Project" - given Encoder[Project] = deriveWithDiscriminator final case class User( id: users.Id, @@ -62,4 +63,3 @@ final case class User( object User: val entityType: String = "User" - given Encoder[User] = deriveWithDiscriminator diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala index ca29d2e2..f22e57ad 100644 --- a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala +++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala @@ -25,6 +25,7 @@ import io.renku.search.model.users import io.renku.search.query.Query import io.renku.search.solr.client.SearchSolrClientGenerators.* import io.renku.search.solr.documents.EntityOps.* +import io.renku.search.solr.documents.{Entity, Project, User} import munit.CatsEffectSuite class SearchSolrClientSpec extends CatsEffectSuite with SearchSolrSpec: @@ -34,9 +35,11 @@ class SearchSolrClientSpec extends CatsEffectSuite with SearchSolrSpec: val project = projectDocumentGen("solr-project", "solr project description").generateOne for { - _ <- client.insert(Seq(project)) - r <- client.queryEntity(Query.parse("solr").toOption.get, 10, 0) - _ = assert(r.responseBody.docs.map(_.noneScore) contains project) + _ <- client.insert(Seq(project.widen)) + qr <- client.queryEntity(Query.parse("solr").toOption.get, 10, 0) + _ = assert(qr.responseBody.docs.map(_.noneScore) contains project) + gr <- client.findById[Project](project.id.value) + _ = assert(gr contains project) } yield () } @@ -45,8 +48,10 @@ class SearchSolrClientSpec extends CatsEffectSuite with SearchSolrSpec: val firstName = users.FirstName("Johnny") val user = userDocumentGen.generateOne.copy(firstName = firstName.some) for { - _ <- client.insert(Seq(user)) - r <- client.queryEntity(Query.parse(firstName.value).toOption.get, 10, 0) - _ = assert(r.responseBody.docs.map(_.noneScore) contains user) + _ <- client.insert(Seq(user.widen)) + qr <- client.queryEntity(Query.parse(firstName.value).toOption.get, 10, 0) + _ = assert(qr.responseBody.docs.map(_.noneScore) contains user) + gr <- client.findById[User](user.id.value) + _ = assert(gr contains user) } yield () } diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/GetByIdResponse.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/GetByIdResponse.scala new file mode 100644 index 00000000..feb161e7 --- /dev/null +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/GetByIdResponse.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.solr.client + +import io.bullet.borer.Decoder +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder +import io.bullet.borer.derivation.key + +final case class GetByIdResponse[A]( + @key("response") responseBody: ResponseBody[A] +): + def map[B](f: A => B): GetByIdResponse[B] = + copy(responseBody = responseBody.map(f)) + +object GetByIdResponse: + given [A](using Decoder[A]): Decoder[GetByIdResponse[A]] = + deriveDecoder diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala index 5c43ad15..16f2a030 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClient.scala @@ -40,6 +40,8 @@ trait SolrClient[F[_]]: def insert[A: Encoder](docs: Seq[A]): F[InsertResponse] + def findById[A: Decoder](id: String, other: String*): F[GetByIdResponse[A]] + object SolrClient: def apply[F[_]: Async: Network](config: SolrConfig): Resource[F, SolrClient[F]] = ClientBuilder(EmberClientBuilder.default[F]) diff --git a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala index ba03ff68..f8ab5028 100644 --- a/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala +++ b/modules/solr-client/src/main/scala/io/renku/solr/client/SolrClientImpl.scala @@ -18,6 +18,7 @@ package io.renku.solr.client +import cats.data.NonEmptyList import cats.effect.Async import cats.syntax.all.* import io.bullet.borer.{Decoder, Encoder} @@ -61,18 +62,31 @@ private class SolrClientImpl[F[_]: Async](config: SolrConfig, underlying: Client .void def insert[A: Encoder](docs: Seq[A]): F[InsertResponse] = - val req = Method.POST(docs, makeUpdateUrl).withBasicAuth(credentials) + val req = Method + .POST(docs, makeUpdateUrl) + .withBasicAuth(credentials) underlying .expectOr[InsertResponse](req)(ResponseLogging.Error(logger, req)) .flatTap(r => logger.trace(s"Solr inserted response: $r")) private def makeUpdateUrl = { - val base = solrUrl / "update" + val base = (solrUrl / "update") + .withQueryParam("overwrite", "true") + .withQueryParam("wt", "json") config.commitWithin match case Some(d) if d == Duration.Zero => base.withQueryParam("commit", "true") case Some(d) => base.withQueryParam("commitWithin", d.toMillis) case None => base } + override def findById[A: Decoder](id: String, other: String*): F[GetByIdResponse[A]] = + val req = Method + .GET(makeGetByIdUrl(NonEmptyList.of(id, other: _*))) + .withBasicAuth(credentials) + underlying.fetchAs[GetByIdResponse[A]](req) + + private def makeGetByIdUrl(ids: NonEmptyList[String]) = + (solrUrl / "get").withQueryParam("ids", ids.toList) + private lazy val credentials: Option[BasicCredentials] = config.maybeUser.map(u => BasicCredentials(u.username, u.password)) diff --git a/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala b/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala index 2d4383ec..91ca7ca1 100644 --- a/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala +++ b/modules/solr-client/src/test/scala/io/renku/solr/client/SolrClientSpec.scala @@ -20,19 +20,18 @@ package io.renku.solr.client import cats.effect.IO import cats.syntax.all.* -import io.bullet.borer.derivation.MapBasedCodecs -import io.bullet.borer.{Decoder, Encoder} +import io.bullet.borer.derivation.{MapBasedCodecs, key} +import io.bullet.borer.{Decoder, Encoder, Reader} import io.renku.search.LoggingConfigure import io.renku.solr.client.SolrClientSpec.Room +import io.renku.solr.client.facet.{Facet, Facets} import io.renku.solr.client.schema.* import io.renku.solr.client.util.{SolrSpec, SolrTruncate} -import munit.CatsEffectSuite -import munit.ScalaCheckEffectSuite -import org.scalacheck.effect.PropF -import io.bullet.borer.Reader +import munit.{CatsEffectSuite, ScalaCheckEffectSuite} import org.scalacheck.Gen -import io.renku.solr.client.facet.{Facet, Facets} -import io.bullet.borer.derivation.key +import org.scalacheck.effect.PropF + +import java.util.UUID class SolrClientSpec extends CatsEffectSuite @@ -49,18 +48,20 @@ class SolrClientSpec SchemaCommand.Add(Field(FieldName("roomDescription"), TypeName("roomText"))), SchemaCommand.Add(Field(FieldName("roomSeats"), TypeName("roomInt"))) ) + withSolrClient().use { client => - val rooms = Seq(Room("meeting room", "room for meetings", 56)) + val room = Room(UUID.randomUUID().toString, "meeting room", "room for meetings", 56) for { _ <- truncateAll(client)( List("roomName", "roomDescription", "roomSeats").map(FieldName.apply), List("roomText", "roomInt").map(TypeName.apply) ) _ <- client.modifySchema(cmds) - _ <- client - .insert[Room](rooms) - r <- client.query[Room](QueryData(QueryString("_type:Room"))) - _ = assertEquals(r.responseBody.docs, rooms) + _ <- client.insert[Room](Seq(room)) + qr <- client.query[Room](QueryData(QueryString("_type:Room"))) + _ = qr.responseBody.docs contains room + ir <- client.findById[Room](room.id) + _ = ir.responseBody.docs contains room } yield () } @@ -95,32 +96,33 @@ class SolrClientSpec } yield () } - // test("delete by id"): - // withSolrClient().use { client => - // for { - // _ <- client.delete(QueryString("*:*")) - // _ <- client.insert(Seq(SolrClientSpec.Person("p1", "John"))) - // r <- client.query[SolrClientSpec.Person](QueryData(QueryString("*:*"))) - // p = r.responseBody.docs.head - // _ = assertEquals(p.id, "p1") - // _ <- client.deleteById("p1", "p2") - // r2 <- client.query[SolrClientSpec.Person](QueryData(QueryString("*:*"))) - // _ <- IO.sleep(50.millis) // seems to be necessary on ci - // _ = assert(r2.responseBody.docs.isEmpty) - // } yield () - // } +// test("delete by id"): +// withSolrClient().use { client => +// for { +// _ <- client.delete(QueryString("*:*")) +// _ <- client.insert(Seq(SolrClientSpec.Person("p1", "John"))) +// r <- client.query[SolrClientSpec.Person](QueryData(QueryString("*:*"))) +// p = r.responseBody.docs.head +// _ = assertEquals(p.id, "p1") +// _ <- client.deleteById("p1", "p2") +// r2 <- client.query[SolrClientSpec.Person](QueryData(QueryString("*:*"))) +// _ <- IO.sleep(50.millis) // seems to be necessary on ci +// _ = assert(r2.responseBody.docs.isEmpty) +// } yield () +// } object SolrClientSpec: - case class Room(roomName: String, roomDescription: String, roomSeats: Int) + case class Room(id: String, roomName: String, roomDescription: String, roomSeats: Int) object Room: val gen: Gen[Room] = for { + id <- Gen.uuid.map(_.toString) name <- Gen .choose(4, 12) .flatMap(n => Gen.listOfN(n, Gen.alphaChar)) .map(_.mkString) descr = s"Room description for $name" seats <- Gen.choose(15, 350) - } yield Room(name, descr, seats) + } yield Room(id, name, descr, seats) given Decoder[Room] = MapBasedCodecs.deriveDecoder given Encoder[Room] = EncoderSupport.deriveWithDiscriminator[Room]