From 0063e91f43e9cb168663441433784c03973ddc9e Mon Sep 17 00:00:00 2001 From: Jakub Chrobasik Date: Wed, 13 Mar 2024 13:36:34 +0100 Subject: [PATCH] feat: support for the UserRemoved event (#56) * refactor: Project Removal process not to group events * feat: support for UserRemoved events * refactor: using model.Id wherever possible; DocumentId removed --- .../io/renku/search/GeneratorSyntax.scala | 3 + .../renku/search/provision/Microservice.scala | 14 +- .../search/provision/OnSolrPersist.scala | 28 ++++ .../renku/search/provision/QueuesConfig.scala | 6 +- .../search/provision/SolrRemovalProcess.scala | 70 ++++----- .../provision/UpdateProvisioningProcess.scala | 5 +- .../AuthorizationAddedProvisioning.scala | 4 +- .../AuthorizationRemovedProvisioning.scala | 10 +- .../AuthorizationUpdatedProvisioning.scala | 4 +- ...ning.scala => ProjectRemovedProcess.scala} | 15 +- .../project/ProjectUpdatedProvisioning.scala | 4 +- .../provision/user/UserRemovedProcess.scala | 126 +++++++++++++++ .../user/UserUpdatedProvisioning.scala | 4 +- .../AuthorizationAddedProvisioningSpec.scala | 2 +- ...AuthorizationRemovedProvisioningSpec.scala | 2 +- ...AuthorizationUpdatedProvisioningSpec.scala | 2 +- .../ProjectCreatedProvisioningSpec.scala | 7 +- ....scala => ProjectRemovedProcessSpec.scala} | 13 +- .../ProjectUpdatedProvisioningSpec.scala | 5 +- .../user/UserAddedProvisioningSpec.scala | 66 ++------ .../user/UserRemovedProcessSpec.scala | 145 ++++++++++++++++++ .../user/UserUpdatedProvisioningSpec.scala | 21 +-- .../search/solr/client/SearchSolrClient.scala | 14 +- .../solr/client/SearchSolrClientImpl.scala | 24 +-- .../renku/search/solr/documents/Entity.scala | 13 +- .../solr/client/SearchSolrClientSpec.scala | 28 +++- .../solr/client/SolrDocumentGenerators.scala | 12 +- 27 files changed, 466 insertions(+), 181 deletions(-) create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/OnSolrPersist.scala rename modules/search-provision/src/main/scala/io/renku/search/provision/project/{ProjectRemovedProvisioning.scala => ProjectRemovedProcess.scala} (84%) create mode 100644 modules/search-provision/src/main/scala/io/renku/search/provision/user/UserRemovedProcess.scala rename modules/search-provision/src/test/scala/io/renku/search/provision/project/{ProjectRemovedProvisioningSpec.scala => ProjectRemovedProcessSpec.scala} (94%) create mode 100644 modules/search-provision/src/test/scala/io/renku/search/provision/user/UserRemovedProcessSpec.scala diff --git a/modules/commons/src/test/scala/io/renku/search/GeneratorSyntax.scala b/modules/commons/src/test/scala/io/renku/search/GeneratorSyntax.scala index 7eee754c..65875c8f 100644 --- a/modules/commons/src/test/scala/io/renku/search/GeneratorSyntax.scala +++ b/modules/commons/src/test/scala/io/renku/search/GeneratorSyntax.scala @@ -36,6 +36,9 @@ trait GeneratorSyntax: def generateList: List[A] = Gen.listOf(self).generateOne + def generateList(min: Int, max: Int): List[A] = + Gen.choose(min, max).flatMap(Gen.listOfN(_, self)).generateOne + def stream: Stream[Gen, A] = Stream.repeatEval(self) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala index b54b4613..86ff6f26 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/Microservice.scala @@ -61,7 +61,7 @@ object Microservice extends IOApp: ( "ProjectRemoved", cfg.queuesConfig.projectRemoved, - ProjectRemovedProvisioning + ProjectRemovedProcess .make[IO](cfg.queuesConfig.projectRemoved, cfg.redisConfig, cfg.solrConfig) .map(_.removalProcess.start) ), @@ -111,6 +111,18 @@ object Microservice extends IOApp: UserUpdatedProvisioning .make[IO](cfg.queuesConfig.userUpdated, cfg.redisConfig, cfg.solrConfig) .map(_.provisioningProcess.start) + ), + ( + "UserRemoved", + cfg.queuesConfig.userRemoved, + UserRemovedProcess + .make[IO]( + cfg.queuesConfig.userRemoved, + cfg.queuesConfig.projectAuthorizationRemoved, + cfg.redisConfig, + cfg.solrConfig + ) + .map(_.removalProcess.start) ) ).parTraverse_(startProcess(cfg)) .flatMap(_ => IO.never) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/OnSolrPersist.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/OnSolrPersist.scala new file mode 100644 index 00000000..f5c127ed --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/OnSolrPersist.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision + +import io.renku.queue.client.{QueueClient, RequestId} +import io.renku.search.solr.client.SearchSolrClient + +private trait OnSolrPersist[F[_], In]: + def execute(in: In, requestId: RequestId)( + queueClient: QueueClient[F], + solrClient: SearchSolrClient[F] + ): F[Unit] diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala index aec387c6..82c7ec88 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/QueuesConfig.scala @@ -31,7 +31,8 @@ final case class QueuesConfig( projectAuthorizationUpdated: QueueName, projectAuthorizationRemoved: QueueName, userAdded: QueueName, - userUpdated: QueueName + userUpdated: QueueName, + userRemoved: QueueName ) object QueuesConfig: @@ -44,5 +45,6 @@ object QueuesConfig: ConfigValues.eventQueue("projectAuthorizationUpdated"), ConfigValues.eventQueue("projectAuthorizationRemoved"), ConfigValues.eventQueue("userAdded"), - ConfigValues.eventQueue("userUpdated") + ConfigValues.eventQueue("userUpdated"), + ConfigValues.eventQueue("userRemoved") ).mapN(QueuesConfig.apply) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/SolrRemovalProcess.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/SolrRemovalProcess.scala index 283a4ae8..33559d72 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/SolrRemovalProcess.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/SolrRemovalProcess.scala @@ -18,58 +18,58 @@ package io.renku.search.provision +import cats.Show +import cats.data.NonEmptyList import cats.effect.{Async, Resource, Temporal} import cats.syntax.all.* -import cats.Show -import fs2.Chunk import fs2.io.net.Network import io.github.arainko.ducktape.* -import io.renku.avro.codec.{AvroDecoder, AvroReader} -import io.renku.queue.client.{QueueClient, QueueMessage} +import io.renku.avro.codec.AvroDecoder +import io.renku.queue.client.{QueueClient, QueueMessage, RequestId} import io.renku.redis.client.{ClientId, QueueName, RedisConfig} +import io.renku.search.model.Id import io.renku.search.solr.client.SearchSolrClient import io.renku.solr.client.SolrConfig import org.apache.avro.Schema import scribe.Scribe -import io.renku.search.solr.documents.DocumentId + import scala.concurrent.duration.* -import cats.data.NonEmptyList trait SolrRemovalProcess[F[_]]: def removalProcess: F[Unit] object SolrRemovalProcess: - private val clientId: ClientId = ClientId("search-provisioner") def make[F[_]: Async: Network: Scribe, In]( queueName: QueueName, inSchema: Schema, redisConfig: RedisConfig, - solrConfig: SolrConfig + solrConfig: SolrConfig, + onSolrPersist: Option[OnSolrPersist[F, In]] )(using Show[In], - Transformer[In, DocumentId], + Transformer[In, Id], AvroDecoder[In] ): Resource[F, SolrRemovalProcess[F]] = SearchSolrClient.make[F](solrConfig).map { new SolrRemovalProcessImpl[F, In]( queueName, - inSchema, - clientId, + ProvisioningProcess.clientId, QueueClient.make[F](redisConfig), _, - QueueMessageDecoder[F, In](inSchema) + QueueMessageDecoder[F, In](inSchema), + onSolrPersist ) } private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In]( queueName: QueueName, - inSchema: Schema, clientId: ClientId, queueClientResource: Resource[F, QueueClient[F]], solrClient: SearchSolrClient[F], - messageDecoder: QueueMessageDecoder[F, In] -)(using Show[In], Transformer[In, DocumentId], AvroDecoder[In]) + messageDecoder: QueueMessageDecoder[F, In], + onSolrPersist: Option[OnSolrPersist[F, In]] +)(using Show[In], Transformer[In, Id], AvroDecoder[In]) extends SolrRemovalProcess[F]: override def removalProcess: F[Unit] = queueClientResource @@ -79,7 +79,6 @@ private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In]( .acquireEventsStream(queueName, chunkSize = 1, maybeLastProcessed) .evalMap(decodeMessage(queueClient)) .evalTap(logInfo) - .groupWithin(chunkSize = 10, timeout = 500 millis) .evalMap(deleteFromSolr(queueClient)) .compile .drain @@ -93,7 +92,7 @@ private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In]( private lazy val logInfo: ((QueueMessage, Seq[In])) => F[Unit] = { case (m, v) => Scribe[F].info( - "Received mesage " + s"queue: $queueName, " + + s"Received message queue: $queueName, " + s"id: ${m.id}, " + s"source: ${m.header.source}, " + s"type: ${m.header.`type`} " + @@ -101,8 +100,6 @@ private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In]( ) } - private val avro = AvroReader(inSchema) - private def decodeMessage(queueClient: QueueClient[F])( message: QueueMessage ): F[(QueueMessage, Seq[In])] = @@ -113,24 +110,27 @@ private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In]( private def deleteFromSolr( queueClient: QueueClient[F] - )(chunk: Chunk[(QueueMessage, Seq[In])]): F[Unit] = - chunk.toList match { - case Nil => ().pure[F] - case tuples => - val ids = toDocumentIds(tuples.flatMap(_._2)) - ids match { - case Some(ids) => - val (lastMessage, _) = tuples.last - solrClient - .deleteIds(ids) - .flatMap(_ => markProcessed(lastMessage, queueClient)) - .onError(markProcessedOnFailure(lastMessage, queueClient)) - case None => ().pure[F] - } + ): ((QueueMessage, Seq[In])) => F[Unit] = { case (message, ins) => + toDocumentIds(ins).fold(().pure[F]) { ids => + (solrClient.deleteIds(ids) >> onPersist(queueClient, message, ins)) + .flatMap(_ => markProcessed(message, queueClient)) + .onError(markProcessedOnFailure(message, queueClient)) + } + } + + private def onPersist( + queueClient: QueueClient[F], + message: QueueMessage, + ins: Seq[In] + ) = + onSolrPersist.fold(().pure[F]) { p => + ins.toList.traverse_( + p.execute(_, RequestId(message.header.requestId))(queueClient, solrClient) + ) } - private lazy val toDocumentIds: Seq[In] => Option[NonEmptyList[DocumentId]] = - _.map(_.to[DocumentId]).toList.toNel + private lazy val toDocumentIds: Seq[In] => Option[NonEmptyList[Id]] = + _.map(_.to[Id]).toList.toNel private def markProcessedOnFailure( message: QueueMessage, diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/UpdateProvisioningProcess.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/UpdateProvisioningProcess.scala index 5abae022..2bc35d24 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/UpdateProvisioningProcess.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/UpdateProvisioningProcess.scala @@ -26,6 +26,7 @@ import io.bullet.borer.Codec import io.renku.avro.codec.AvroDecoder import io.renku.queue.client.{QueueClient, QueueMessage} import io.renku.redis.client.{ClientId, QueueName, RedisConfig} +import io.renku.search.model.Id import io.renku.search.solr.client.SearchSolrClient import io.renku.search.solr.documents.Entity import io.renku.solr.client.SolrConfig @@ -42,7 +43,7 @@ object UpdateProvisioningProcess: def make[F[_]: Async: Network: Scribe, In, Out <: Entity]( queueName: QueueName, inSchema: Schema, - idExtractor: In => String, + idExtractor: In => Id, docUpdate: ((In, Out)) => Out, redisConfig: RedisConfig, solrConfig: SolrConfig @@ -67,7 +68,7 @@ object UpdateProvisioningProcess: private class UpdateProvisioningProcessImpl[F[_]: Async: Scribe, In, Out <: Entity]( queueName: QueueName, clientId: ClientId, - idExtractor: In => String, + idExtractor: In => Id, docUpdate: ((In, Out)) => Out, queueClientResource: Resource[F, QueueClient[F]], solrClient: SearchSolrClient[F], diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationAddedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationAddedProvisioning.scala index 1b601345..a1d70d49 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationAddedProvisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationAddedProvisioning.scala @@ -22,7 +22,6 @@ package project import cats.Show import cats.effect.{Async, Resource} import fs2.io.net.Network - import io.renku.avro.codec.decoders.all.given import io.renku.events.v1.ProjectAuthorizationAdded import io.renku.redis.client.{QueueName, RedisConfig} @@ -55,7 +54,8 @@ object AuthorizationAddedProvisioning: s"projectId '${v.projectId}', userId '${v.userId}', role '${v.role}'" ) - private lazy val idExtractor: ProjectAuthorizationAdded => String = _.projectId + private lazy val idExtractor: ProjectAuthorizationAdded => Id = + paa => Id(paa.projectId) private lazy val docUpdate : ((ProjectAuthorizationAdded, documents.Project)) => documents.Project = { diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationRemovedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationRemovedProvisioning.scala index 7455945f..1189bf2c 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationRemovedProvisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationRemovedProvisioning.scala @@ -20,14 +20,11 @@ package io.renku.search.provision package project import cats.Show -import cats.effect.Async -import cats.effect.Resource +import cats.effect.{Async, Resource} import fs2.io.net.Network - import io.renku.avro.codec.decoders.all.given import io.renku.events.v1.ProjectAuthorizationRemoved -import io.renku.redis.client.QueueName -import io.renku.redis.client.RedisConfig +import io.renku.redis.client.{QueueName, RedisConfig} import io.renku.search.model.Id import io.renku.search.solr.documents import io.renku.solr.client.SolrConfig @@ -56,7 +53,8 @@ object AuthorizationRemovedProvisioning: s"projectId '${v.projectId}', userId '${v.userId}'" ) - private lazy val idExtractor: ProjectAuthorizationRemoved => String = _.projectId + private lazy val idExtractor: ProjectAuthorizationRemoved => Id = par => + Id(par.projectId) private lazy val docUpdate : ((ProjectAuthorizationRemoved, documents.Project)) => documents.Project = { diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationUpdatedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationUpdatedProvisioning.scala index 4bbe7c0c..d10eaa67 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationUpdatedProvisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/project/AuthorizationUpdatedProvisioning.scala @@ -22,7 +22,6 @@ package project import cats.Show import cats.effect.{Async, Resource} import fs2.io.net.Network - import io.renku.avro.codec.decoders.all.given import io.renku.events.v1.ProjectAuthorizationUpdated import io.renku.redis.client.{QueueName, RedisConfig} @@ -55,7 +54,8 @@ object AuthorizationUpdatedProvisioning: s"projectId '${v.projectId}', userId '${v.userId}', role '${v.role}'" ) - private lazy val idExtractor: ProjectAuthorizationUpdated => String = _.projectId + private lazy val idExtractor: ProjectAuthorizationUpdated => Id = + pau => Id(pau.projectId) private lazy val docUpdate : ((ProjectAuthorizationUpdated, documents.Project)) => documents.Project = { diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectRemovedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectRemovedProcess.scala similarity index 84% rename from modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectRemovedProvisioning.scala rename to modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectRemovedProcess.scala index 3e165c77..c696e367 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectRemovedProvisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectRemovedProcess.scala @@ -22,20 +22,17 @@ import cats.Show import cats.effect.{Async, Resource} import cats.syntax.all.* import fs2.io.net.Network - import io.github.arainko.ducktape.* import io.renku.avro.codec.decoders.all.given import io.renku.events.v1 import io.renku.events.v1.ProjectRemoved import io.renku.redis.client.{QueueName, RedisConfig} +import io.renku.search.model.Id import io.renku.search.provision.SolrRemovalProcess -import io.renku.search.solr.documents.DocumentId import io.renku.solr.client.SolrConfig import scribe.Scribe -trait ProjectRemovedProvisioning[F[_]] extends SolrRemovalProcess[F] - -object ProjectRemovedProvisioning: +object ProjectRemovedProcess: def make[F[_]: Async: Network]( queueName: QueueName, @@ -47,12 +44,12 @@ object ProjectRemovedProvisioning: queueName, ProjectRemoved.SCHEMA$, redisConfig, - solrConfig + solrConfig, + onSolrPersist = None ) private given Show[ProjectRemoved] = Show.show[ProjectRemoved](pr => show"slug '${pr.id}'") - private given Transformer[ProjectRemoved, DocumentId] = - // _.id.into[DocumentId].transform() - r => DocumentId(r.id) + private given Transformer[ProjectRemoved, Id] = + r => Id(r.id) diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectUpdatedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectUpdatedProvisioning.scala index 075d4f55..4971d84f 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectUpdatedProvisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/project/ProjectUpdatedProvisioning.scala @@ -22,11 +22,11 @@ package project import cats.Show import cats.effect.{Async, Resource} import fs2.io.net.Network - import io.github.arainko.ducktape.* import io.renku.avro.codec.decoders.all.given import io.renku.events.v1.ProjectUpdated import io.renku.redis.client.{QueueName, RedisConfig} +import io.renku.search.model.Id import io.renku.search.provision.TypeTransformers.given import io.renku.search.solr.documents import io.renku.solr.client.SolrConfig @@ -52,7 +52,7 @@ object ProjectUpdatedProvisioning: private given Show[ProjectUpdated] = Show.show[ProjectUpdated](v => s"slug '${v.slug}'") - private lazy val idExtractor: ProjectUpdated => String = _.id + private lazy val idExtractor: ProjectUpdated => Id = pu => Id(pu.id) private lazy val docUpdate : ((ProjectUpdated, documents.Project)) => documents.Project = { diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserRemovedProcess.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserRemovedProcess.scala new file mode 100644 index 00000000..79fedf51 --- /dev/null +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserRemovedProcess.scala @@ -0,0 +1,126 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.user + +import cats.Show +import cats.effect.{Async, Resource} +import cats.syntax.all.* +import fs2.Stream +import fs2.io.net.Network +import io.bullet.borer.Decoder +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder +import io.github.arainko.ducktape.* +import io.renku.avro.codec.decoders.all.given +import io.renku.avro.codec.encoders.all.given +import io.renku.events.v1 +import io.renku.events.v1.{ProjectAuthorizationRemoved, UserRemoved} +import io.renku.queue.client.* +import io.renku.queue.client.DataContentType.Binary +import io.renku.redis.client.{QueueName, RedisConfig} +import io.renku.search.model.Id +import io.renku.search.provision.ProvisioningProcess.clientId +import io.renku.search.provision.{OnSolrPersist, SolrRemovalProcess} +import io.renku.search.solr.client.SearchSolrClient +import io.renku.search.solr.documents.Project +import io.renku.search.solr.schema.EntityDocumentSchema.Fields +import io.renku.solr.client.{QueryData, SolrConfig} +import scribe.Scribe + +object UserRemovedProcess: + + def make[F[_]: Async: Network]( + userRemovedQueue: QueueName, + authRemovedQueue: QueueName, + redisConfig: RedisConfig, + solrConfig: SolrConfig + ): Resource[F, SolrRemovalProcess[F]] = + given Scribe[F] = scribe.cats[F] + + SolrRemovalProcess.make[F, UserRemoved]( + userRemovedQueue, + UserRemoved.SCHEMA$, + redisConfig, + solrConfig, + onSolrPersist = Some(onSolrPersist[F](authRemovedQueue)) + ) + + private given Show[UserRemoved] = + Show.show[UserRemoved](e => show"id '${e.id}'") + + private given Transformer[UserRemoved, Id] = + r => Id(r.id) + + private def onSolrPersist[F[_]: Async](authRemovedQueue: QueueName) = + + case class ProjectId(id: String) + given Decoder[ProjectId] = deriveDecoder[ProjectId] + + new OnSolrPersist[F, UserRemoved] { + override def execute(in: UserRemoved, requestId: RequestId)( + queueClient: QueueClient[F], + solrClient: SearchSolrClient[F] + ): F[Unit] = + findAffectedProjects(solrClient, in.id) + .evalMap(enqueueAuthRemoved(queueClient, requestId, _, in.id)) + .compile + .drain + + private def findAffectedProjects( + sc: SearchSolrClient[F], + userId: String + ): Stream[F, ProjectId] = + Stream + .iterate(1)(_ + 1) + .evalMap(p => sc.query[ProjectId](prepareQuery(userId, p))) + .map(_.responseBody.docs) + .takeWhile(_.nonEmpty) + .flatMap(Stream.emits) + + private val pageSize = 20 + + private def prepareQuery(userId: String, page: Int) = + QueryData( + s"${Fields.entityType}:${Project.entityType} ${Fields.owners}:$userId ${Fields.members}:$userId", + filter = Seq.empty, + limit = pageSize * page, + offset = pageSize * (page - 1) + ).withFields(Fields.id) + + private def enqueueAuthRemoved( + qc: QueueClient[F], + requestId: RequestId, + projectId: ProjectId, + userId: String + ): F[Unit] = + qc.enqueue( + authRemovedQueue, + createHeader(requestId), + ProjectAuthorizationRemoved(projectId.id, userId) + ).void + + private def createHeader(requestId: RequestId) = + MessageHeader( + MessageSource(clientId.value), + ProjectAuthorizationRemoved.SCHEMA$, + Binary, + SchemaVersion("V1"), + CreationTime.now, + requestId + ) + } diff --git a/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserUpdatedProvisioning.scala b/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserUpdatedProvisioning.scala index bc0dcfee..b006fb9f 100644 --- a/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserUpdatedProvisioning.scala +++ b/modules/search-provision/src/main/scala/io/renku/search/provision/user/UserUpdatedProvisioning.scala @@ -27,7 +27,7 @@ import io.github.arainko.ducktape.* import io.renku.avro.codec.decoders.all.given import io.renku.events.v1.UserUpdated import io.renku.redis.client.{QueueName, RedisConfig} -import io.renku.search.model.{Name, users} +import io.renku.search.model.{Id, Name, users} import io.renku.search.solr.documents import io.renku.solr.client.SolrConfig import scribe.Scribe @@ -52,7 +52,7 @@ object UserUpdatedProvisioning: private given Show[UserUpdated] = Show.show[UserUpdated](u => s"id '${u.id}'") - private lazy val idExtractor: UserUpdated => String = _.id + private lazy val idExtractor: UserUpdated => Id = uu => Id(uu.id) private lazy val docUpdate: ((UserUpdated, documents.User)) => documents.User = { case (update, _) => diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationAddedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationAddedProvisioningSpec.scala index 3bc2874a..661a2b8f 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationAddedProvisioningSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationAddedProvisioningSpec.scala @@ -68,7 +68,7 @@ class AuthorizationAddedProvisioningSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.findById[Project](projectDoc.id.value)) + .evalMap(_ => solrClient.findById[Project](projectDoc.id)) .evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e)))) .compile .drain diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationRemovedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationRemovedProvisioningSpec.scala index d25ceaab..d42689df 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationRemovedProvisioningSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationRemovedProvisioningSpec.scala @@ -69,7 +69,7 @@ class AuthorizationRemovedProvisioningSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.findById[Project](projectDoc.id.value)) + .evalMap(_ => solrClient.findById[Project](projectDoc.id)) .evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e)))) .compile .drain diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationUpdatedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationUpdatedProvisioningSpec.scala index 1415e84c..f5b3f0dc 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationUpdatedProvisioningSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/project/AuthorizationUpdatedProvisioningSpec.scala @@ -69,7 +69,7 @@ class AuthorizationUpdatedProvisioningSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.findById[Project](projectDoc.id.value)) + .evalMap(_ => solrClient.findById[Project](projectDoc.id)) .evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e)))) .compile .drain diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisioningSpec.scala index 7e450714..be34474f 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisioningSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectCreatedProvisioningSpec.scala @@ -19,12 +19,10 @@ package io.renku.search.provision.project import scala.concurrent.duration.* - import cats.effect.{IO, Resource} import cats.syntax.all.* import fs2.Stream import fs2.concurrent.SignallingRef - import io.renku.avro.codec.AvroIO import io.renku.avro.codec.encoders.all.given import io.renku.events.EventsGenerators.projectCreatedGen @@ -34,6 +32,7 @@ import io.renku.queue.client.{DataContentType, QueueSpec} import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.{QueueName, RedisClientGenerators} import io.renku.search.GeneratorSyntax.* +import io.renku.search.model.Id import io.renku.search.solr.client.SearchSolrSpec import io.renku.search.solr.documents.{Entity, Project} import munit.CatsEffectSuite @@ -65,7 +64,7 @@ class ProjectCreatedProvisioningSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.findById[Project](created.id)) + .evalMap(_ => solrClient.findById[Project](Id(created.id))) .evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e)))) .compile .drain @@ -97,7 +96,7 @@ class ProjectCreatedProvisioningSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.findById[Project](created.id)) + .evalMap(_ => solrClient.findById[Project](Id(created.id))) .evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e)))) .compile .drain diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectRemovedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectRemovedProcessSpec.scala similarity index 94% rename from modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectRemovedProvisioningSpec.scala rename to modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectRemovedProcessSpec.scala index 95869346..88ca1dd7 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectRemovedProvisioningSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectRemovedProcessSpec.scala @@ -18,13 +18,10 @@ package io.renku.search.provision.project -import scala.concurrent.duration.* - import cats.effect.{IO, Resource} import cats.syntax.all.* import fs2.Stream import fs2.concurrent.SignallingRef - import io.renku.avro.codec.AvroIO import io.renku.avro.codec.encoders.all.given import io.renku.events.EventsGenerators.* @@ -34,7 +31,7 @@ import io.renku.queue.client.QueueSpec import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.{QueueName, RedisClientGenerators} import io.renku.search.GeneratorSyntax.* -import io.renku.search.model.EntityType +import io.renku.search.model.{EntityType, Id} import io.renku.search.query.Query import io.renku.search.query.Query.Segment import io.renku.search.query.Query.Segment.typeIs @@ -42,7 +39,9 @@ import io.renku.search.solr.client.SearchSolrSpec import io.renku.search.solr.documents.{Entity, Project} import munit.CatsEffectSuite -class ProjectRemovedProvisioningSpec +import scala.concurrent.duration.* + +class ProjectRemovedProcessSpec extends CatsEffectSuite with QueueSpec with SearchSolrSpec @@ -65,7 +64,7 @@ class ProjectRemovedProvisioningSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.findById[Project](created.id)) + .evalMap(_ => solrClient.findById[Project](Id(created.id))) .evalMap(e => solrDoc.update(_ => e)) .compile .drain @@ -96,7 +95,7 @@ class ProjectRemovedProvisioningSpec private def clientsAndProvisioning(queueName: QueueName) = (withQueueClient() >>= withSearchSolrClient().tupleLeft) .flatMap { case (rc, sc) => - ProjectRemovedProvisioning + ProjectRemovedProcess .make[IO]( queueName, withRedisClient.redisConfig, diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectUpdatedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectUpdatedProvisioningSpec.scala index b9222d43..e8b47ca5 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectUpdatedProvisioningSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/project/ProjectUpdatedProvisioningSpec.scala @@ -19,12 +19,10 @@ package io.renku.search.provision.project import scala.concurrent.duration.* - import cats.effect.{IO, Resource} import cats.syntax.all.* import fs2.Stream import fs2.concurrent.SignallingRef - import io.github.arainko.ducktape.* import io.renku.avro.codec.AvroIO import io.renku.avro.codec.encoders.all.given @@ -35,6 +33,7 @@ import io.renku.queue.client.QueueSpec import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.{QueueName, RedisClientGenerators} import io.renku.search.GeneratorSyntax.* +import io.renku.search.model.Id import io.renku.search.solr.client.SearchSolrSpec import io.renku.search.solr.documents.{Entity, Project} import munit.CatsEffectSuite @@ -71,7 +70,7 @@ class ProjectUpdatedProvisioningSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.findById[Project](created.id)) + .evalMap(_ => solrClient.findById[Project](Id(created.id))) .evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e)))) .compile .drain diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisioningSpec.scala index 9823162e..528fecd9 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisioningSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserAddedProvisioningSpec.scala @@ -22,22 +22,17 @@ import cats.effect.{IO, Resource} import cats.syntax.all.* import fs2.Stream import fs2.concurrent.SignallingRef -import io.renku.avro.codec.AvroIO import io.renku.avro.codec.encoders.all.given import io.renku.events.EventsGenerators.userAddedGen import io.renku.events.v1.UserAdded import io.renku.queue.client.Generators.messageHeaderGen -import io.renku.queue.client.{DataContentType, QueueSpec} +import io.renku.queue.client.QueueSpec import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.{QueueName, RedisClientGenerators} import io.renku.search.GeneratorSyntax.* -import io.renku.search.model.EntityType -import io.renku.search.query.Query -import io.renku.search.query.Query.Segment -import io.renku.search.query.Query.Segment.typeIs +import io.renku.search.model.Id import io.renku.search.solr.client.SearchSolrSpec -import io.renku.search.solr.documents.EntityOps.* -import io.renku.search.solr.documents.Entity +import io.renku.search.solr.documents.{Entity, User} import munit.CatsEffectSuite import scala.concurrent.duration.* @@ -48,77 +43,38 @@ class UserAddedProvisioningSpec with SearchSolrSpec with UserSyntax: - private val avro = AvroIO(UserAdded.SCHEMA$) - - test("can fetch events binary encoded, decode them, and send them to Solr"): - val queue = RedisClientGenerators.queueNameGen.generateOne - - clientsAndProvisioning(queue).use { case (queueClient, solrClient, provisioner) => - for - solrDocs <- SignallingRef.of[IO, Set[Entity]](Set.empty) - - provisioningFiber <- provisioner.provisioningProcess.start - - message1 = userAddedGen(prefix = "binary").generateOne - _ <- queueClient.enqueue( - queue, - messageHeaderGen(UserAdded.SCHEMA$, DataContentType.Binary).generateOne, - message1 - ) - - docsCollectorFiber <- - Stream - .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.queryEntity(queryUsers, 10, 0)) - .flatMap(qr => Stream.emits(qr.responseBody.docs)) - .evalMap(e => solrDocs.update(_ + e.noneScore)) - .compile - .drain - .start - - _ <- solrDocs.waitUntil(_ contains message1.toSolrDocument) - - _ <- provisioningFiber.cancel - _ <- docsCollectorFiber.cancel - yield () - } - - test("can fetch events JSON encoded, decode them, and send them to Solr"): + test("can fetch events, decode them, and send them to Solr"): val queue = RedisClientGenerators.queueNameGen.generateOne clientsAndProvisioning(queue).use { case (queueClient, solrClient, provisioner) => for - solrDocs <- SignallingRef.of[IO, Set[Entity]](Set.empty) + solrDoc <- SignallingRef.of[IO, Option[Entity]](None) provisioningFiber <- provisioner.provisioningProcess.start - message1 = userAddedGen(prefix = "json").generateOne + userAdded = userAddedGen(prefix = "user-added").generateOne _ <- queueClient.enqueue( queue, - messageHeaderGen(UserAdded.SCHEMA$, DataContentType.Json).generateOne, - message1 + messageHeaderGen(UserAdded.SCHEMA$).generateOne, + userAdded ) docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.queryEntity(queryUsers, 10, 0)) - .flatMap(qr => Stream.emits(qr.responseBody.docs)) - .evalTap(IO.println) - .evalMap(e => solrDocs.update(_ + e.noneScore)) + .evalMap(_ => solrClient.findById[User](Id(userAdded.id))) + .evalMap(e => solrDoc.update(_ => e)) .compile .drain .start - _ <- solrDocs.waitUntil(_ contains toSolrDocument(message1)) + _ <- solrDoc.waitUntil(_ contains userAdded.toSolrDocument) _ <- provisioningFiber.cancel _ <- docsCollectorFiber.cancel yield () } - private lazy val queryUsers = Query(typeIs(EntityType.User)) - private def clientsAndProvisioning(queueName: QueueName) = (withQueueClient() >>= withSearchSolrClient().tupleLeft) .flatMap { case (rc, sc) => diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserRemovedProcessSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserRemovedProcessSpec.scala new file mode 100644 index 00000000..5c755edb --- /dev/null +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserRemovedProcessSpec.scala @@ -0,0 +1,145 @@ +/* + * Copyright 2024 Swiss Data Science Center (SDSC) + * A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and + * Eidgenössische Technische Hochschule Zürich (ETHZ). + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.renku.search.provision.user + +import cats.effect.{IO, Resource} +import cats.syntax.all.* +import fs2.Stream +import fs2.concurrent.SignallingRef +import io.renku.avro.codec.AvroIO +import io.renku.avro.codec.decoders.all.given +import io.renku.avro.codec.encoders.all.given +import io.renku.events.EventsGenerators.* +import io.renku.events.v1.* +import io.renku.queue.client.Generators.messageHeaderGen +import io.renku.queue.client.QueueSpec +import io.renku.redis.client.RedisClientGenerators.* +import io.renku.redis.client.{QueueName, RedisClientGenerators} +import io.renku.search.GeneratorSyntax.* +import io.renku.search.model.EntityType +import io.renku.search.model.ModelGenerators.projectMemberRoleGen +import io.renku.search.provision.QueueMessageDecoder +import io.renku.search.provision.project.ProjectSyntax +import io.renku.search.query.Query +import io.renku.search.query.Query.Segment +import io.renku.search.query.Query.Segment.typeIs +import io.renku.search.solr.client.SearchSolrSpec +import io.renku.search.solr.client.SolrDocumentGenerators.* +import io.renku.search.solr.documents.{Entity, User} +import munit.CatsEffectSuite + +import scala.concurrent.duration.* + +class UserRemovedProcessSpec + extends CatsEffectSuite + with QueueSpec + with SearchSolrSpec + with ProjectSyntax: + + private val avro = AvroIO(UserRemoved.SCHEMA$) + + test( + "can fetch events, decode them, and remove from Solr relevant User document " + + "and issue ProjectAuthorizationRemoved events for all affected projects" + ): + val userRemovedQueue = RedisClientGenerators.queueNameGen.generateOne + val authRemovedQueue = RedisClientGenerators.queueNameGen.generateOne + val messageDecoder = QueueMessageDecoder[IO, ProjectAuthorizationRemoved]( + ProjectAuthorizationRemoved.SCHEMA$ + ) + + clientsAndProvisioning(userRemovedQueue, authRemovedQueue).use { + case (queueClient, solrClient, provisioner) => + for + solrDoc <- SignallingRef.of[IO, Option[Entity]](None) + authRemovalEvents <- SignallingRef.of[IO, Set[ProjectAuthorizationRemoved]]( + Set.empty + ) + + provisioningFiber <- provisioner.removalProcess.start + + user = userDocumentGen.generateOne + affectedProjects = projectCreatedGen("affected") + .map(_.toSolrDocument.addMember(user.id, projectMemberRoleGen.generateOne)) + .generateList(min = 20, max = 25) + notAffectedProject = projectCreatedGen( + "not-affected" + ).generateOne.toSolrDocument + _ <- solrClient.insert(user :: notAffectedProject :: affectedProjects) + + docsCollectorFiber <- + Stream + .awakeEvery[IO](500 millis) + .evalMap(_ => solrClient.findById[User](user.id)) + .evalMap(e => solrDoc.update(_ => e)) + .compile + .drain + .start + + _ <- solrDoc.waitUntil(_.nonEmpty) + + eventsCollectorFiber <- + queueClient + .acquireEventsStream(authRemovedQueue, 1, None) + .evalMap(messageDecoder.decodeMessage) + .evalMap(e => authRemovalEvents.update(_ ++ e)) + .compile + .drain + .start + + _ <- queueClient.enqueue( + userRemovedQueue, + messageHeaderGen(UserRemoved.SCHEMA$).generateOne, + UserRemoved(user.id.value) + ) + + _ <- solrDoc.waitUntil(_.isEmpty) + + _ <- authRemovalEvents.waitUntil( + _ == affectedProjects + .map(ap => ProjectAuthorizationRemoved(ap.id.value, user.id.value)) + .toSet + ) + + _ <- provisioningFiber.cancel + _ <- docsCollectorFiber.cancel + _ <- eventsCollectorFiber.cancel + yield () + } + + private lazy val queryProjects = Query(typeIs(EntityType.Project)) + + private def clientsAndProvisioning( + userRemovedQueue: QueueName, + authRemovedQueue: QueueName + ) = + (withQueueClient() >>= withSearchSolrClient().tupleLeft) + .flatMap { case (rc, sc) => + UserRemovedProcess + .make[IO]( + userRemovedQueue, + authRemovedQueue, + withRedisClient.redisConfig, + withSearchSolrClient.solrConfig + ) + .map((rc, sc, _)) + } + + override def munitFixtures: Seq[Fixture[_]] = + List(withRedisClient, withQueueClient, withSearchSolrClient) diff --git a/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserUpdatedProvisioningSpec.scala b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserUpdatedProvisioningSpec.scala index 8e913687..998e0afa 100644 --- a/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserUpdatedProvisioningSpec.scala +++ b/modules/search-provision/src/test/scala/io/renku/search/provision/user/UserUpdatedProvisioningSpec.scala @@ -31,13 +31,9 @@ import io.renku.queue.client.QueueSpec import io.renku.redis.client.RedisClientGenerators.* import io.renku.redis.client.{QueueName, RedisClientGenerators} import io.renku.search.GeneratorSyntax.* -import io.renku.search.model.EntityType -import io.renku.search.query.Query -import io.renku.search.query.Query.Segment -import io.renku.search.query.Query.Segment.typeIs +import io.renku.search.model.Id import io.renku.search.solr.client.SearchSolrSpec -import io.renku.search.solr.documents.EntityOps.* -import io.renku.search.solr.documents.Entity +import io.renku.search.solr.documents.{Entity, User} import munit.CatsEffectSuite import scala.concurrent.duration.* @@ -57,11 +53,11 @@ class UserUpdatedProvisioningSpec clientsAndProvisioning(queue).use { case (queueClient, solrClient, provisioner) => for - solrDocs <- SignallingRef.of[IO, Set[Entity]](Set.empty) + solrDoc <- SignallingRef.of[IO, Option[Entity]](None) provisioningFiber <- provisioner.provisioningProcess.start - userAdded = userAddedGen(prefix = "update").generateOne + userAdded = userAddedGen(prefix = "user-update").generateOne _ <- solrClient.insert(Seq(userAdded.toSolrDocument.widen)) userUpdated = updateF(userAdded) @@ -74,14 +70,13 @@ class UserUpdatedProvisioningSpec docsCollectorFiber <- Stream .awakeEvery[IO](500 millis) - .evalMap(_ => solrClient.queryEntity(queryUsers, 10, 0)) - .flatMap(qr => Stream.emits(qr.responseBody.docs)) - .evalMap(e => solrDocs.update(_ + e.noneScore)) + .evalMap(_ => solrClient.findById[User](Id(userAdded.id))) + .evalMap(e => solrDoc.update(_ => e)) .compile .drain .start - _ <- solrDocs.waitUntil( + _ <- solrDoc.waitUntil( _ contains userAdded.update(userUpdated).toSolrDocument ) @@ -91,8 +86,6 @@ class UserUpdatedProvisioningSpec } } - private lazy val queryUsers = Query(typeIs(EntityType.User)) - private def clientsAndProvisioning(queueName: QueueName) = (withQueueClient() >>= withSearchSolrClient().tupleLeft) .flatMap { case (rc, sc) => diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala index 3658e606..f01f428a 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClient.scala @@ -18,21 +18,23 @@ package io.renku.search.solr.client +import cats.data.NonEmptyList import cats.effect.{Async, Resource} import fs2.io.net.Network -import io.bullet.borer.Encoder +import io.bullet.borer.{Decoder, Encoder} +import io.renku.search.model.Id import io.renku.search.query.Query -import io.renku.search.solr.documents.{DocumentId, Entity} -import io.renku.solr.client.{QueryResponse, SolrClient, SolrConfig} -import cats.data.NonEmptyList +import io.renku.search.solr.documents.Entity +import io.renku.solr.client.{QueryData, QueryResponse, SolrClient, SolrConfig} import scala.reflect.ClassTag trait SearchSolrClient[F[_]]: - def findById[D <: Entity](id: String)(using ct: ClassTag[D]): F[Option[D]] + def findById[D <: Entity](id: Id)(using ct: ClassTag[D]): F[Option[D]] def insert[D: Encoder](documents: Seq[D]): F[Unit] - def deleteIds(ids: NonEmptyList[DocumentId]): F[Unit] + def deleteIds(ids: NonEmptyList[Id]): F[Unit] def queryEntity(query: Query, limit: Int, offset: Int): F[QueryResponse[Entity]] + def query[D: Decoder](query: QueryData): F[QueryResponse[D]] object SearchSolrClient: def make[F[_]: Async: Network]( diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala index a3c0cfcd..1dd269b7 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/client/SearchSolrClientImpl.scala @@ -18,17 +18,18 @@ package io.renku.search.solr.client +import cats.data.NonEmptyList import cats.effect.Async import cats.syntax.all.* -import io.bullet.borer.Encoder +import io.bullet.borer.{Decoder, Encoder} +import io.renku.search.model.Id import io.renku.search.query.Query -import io.renku.search.solr.documents.{DocumentId, Entity} +import io.renku.search.solr.documents.Entity import io.renku.search.solr.query.LuceneQueryInterpreter -import io.renku.solr.client.{QueryData, QueryResponse, QueryString, SolrClient} -import cats.data.NonEmptyList -import io.renku.solr.client.schema.FieldName -import io.renku.solr.client.facet.{Facet, Facets} import io.renku.search.solr.schema.EntityDocumentSchema +import io.renku.solr.client.facet.{Facet, Facets} +import io.renku.solr.client.schema.FieldName +import io.renku.solr.client.{QueryData, QueryResponse, QueryString, SolrClient} import scala.reflect.ClassTag @@ -46,8 +47,8 @@ private class SearchSolrClientImpl[F[_]: Async](solrClient: SolrClient[F]) override def insert[D: Encoder](documents: Seq[D]): F[Unit] = solrClient.insert(documents).void - override def deleteIds(ids: NonEmptyList[DocumentId]): F[Unit] = - solrClient.deleteIds(ids.map(_.name)).void + override def deleteIds(ids: NonEmptyList[Id]): F[Unit] = + solrClient.deleteIds(ids.map(_.value)).void override def queryEntity( query: Query, @@ -66,8 +67,11 @@ private class SearchSolrClientImpl[F[_]: Async](solrClient: SolrClient[F]) ) } yield res - override def findById[D <: Entity](id: String)(using ct: ClassTag[D]): F[Option[D]] = - solrClient.findById[Entity](id).map(_.responseBody.docs.headOption).flatMap { + override def query[D: Decoder](query: QueryData): F[QueryResponse[D]] = + solrClient.query[D](query) + + override def findById[D <: Entity](id: Id)(using ct: ClassTag[D]): F[Option[D]] = + solrClient.findById[Entity](id.value).map(_.responseBody.docs.headOption) >>= { case Some(e: D) => Some(e).pure[F] case Some(e) => new Exception(s"Entity '$id' is of type ${e.getClass} not ${ct.runtimeClass}") diff --git a/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Entity.scala b/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Entity.scala index ecd245e4..c354e28b 100644 --- a/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Entity.scala +++ b/modules/search-solr-client/src/main/scala/io/renku/search/solr/documents/Entity.scala @@ -18,18 +18,13 @@ package io.renku.search.solr.documents -import io.bullet.borer._ +import io.bullet.borer.* import io.bullet.borer.derivation.MapBasedCodecs.* -import io.renku.search.model._ +import io.renku.search.model.* import io.renku.search.model.projects.MemberRole import io.renku.search.model.projects.MemberRole.{Member, Owner} import io.renku.solr.client.EncoderSupport.* -opaque type DocumentId = String -object DocumentId: - def apply(v: String): DocumentId = v - extension (self: DocumentId) def name: String = self - sealed trait Entity: val score: Option[Double] val id: Id @@ -70,7 +65,7 @@ final case class Project( copy(owners = owners.filterNot(_ == userId), members = members.filterNot(_ == userId)) object Project: - val entityType: String = "Project" + val entityType: String = "project" final case class User( id: Id, @@ -82,7 +77,7 @@ final case class User( ) extends Entity object User: - val entityType: String = "User" + val entityType: String = "user" def nameFrom(firstName: Option[String], lastName: Option[String]): Option[Name] = Option(List(firstName, lastName).flatten.mkString(" ")) diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala index 6c6438e2..22221a3d 100644 --- a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala +++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SearchSolrClientSpec.scala @@ -20,12 +20,16 @@ package io.renku.search.solr.client import cats.effect.IO import cats.syntax.all.* +import io.bullet.borer.Decoder +import io.bullet.borer.derivation.MapBasedCodecs.deriveDecoder import io.renku.search.GeneratorSyntax.* import io.renku.search.model.users import io.renku.search.query.Query import io.renku.search.solr.client.SolrDocumentGenerators.* import io.renku.search.solr.documents.EntityOps.* import io.renku.search.solr.documents.{Entity, Project, User} +import io.renku.search.solr.schema.EntityDocumentSchema.Fields +import io.renku.solr.client.QueryData import munit.CatsEffectSuite class SearchSolrClientSpec extends CatsEffectSuite with SearchSolrSpec: @@ -38,7 +42,7 @@ class SearchSolrClientSpec extends CatsEffectSuite with SearchSolrSpec: _ <- client.insert(Seq(project.widen)) qr <- client.queryEntity(Query.parse("solr").toOption.get, 10, 0) _ = assert(qr.responseBody.docs.map(_.noneScore) contains project) - gr <- client.findById[Project](project.id.value) + gr <- client.findById[Project](project.id) _ = assert(gr contains project) } yield () } @@ -51,7 +55,27 @@ class SearchSolrClientSpec extends CatsEffectSuite with SearchSolrSpec: _ <- client.insert(Seq(user.widen)) qr <- client.queryEntity(Query.parse(firstName.value).toOption.get, 10, 0) _ = assert(qr.responseBody.docs.map(_.noneScore) contains user) - gr <- client.findById[User](user.id.value) + gr <- client.findById[User](user.id) _ = assert(gr contains user) } yield () } + + test("be able to find by the given query"): + withSearchSolrClient().use { client => + val firstName = users.FirstName("Ian") + val user = userDocumentGen.generateOne.copy(firstName = firstName.some) + case class UserId(id: String) + given Decoder[UserId] = deriveDecoder[UserId] + for { + _ <- client.insert(Seq(user.widen)) + gr <- client.query[UserId]( + QueryData( + s"firstName:$firstName", + filter = Seq.empty, + limit = 100, + offset = 0 + ).withFields(Fields.id) + ) + _ = assert(gr.responseBody.docs.map(_.id) contains user.id.value) + } yield () + } diff --git a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SolrDocumentGenerators.scala b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SolrDocumentGenerators.scala index a6129e6c..d0f20bac 100644 --- a/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SolrDocumentGenerators.scala +++ b/modules/search-solr-client/src/test/scala/io/renku/search/solr/client/SolrDocumentGenerators.scala @@ -19,23 +19,25 @@ package io.renku.search.solr.client import cats.syntax.all.* - import io.renku.search.GeneratorSyntax.* -import io.renku.search.model.ModelGenerators.* import io.renku.search.model.* +import io.renku.search.model.ModelGenerators.* import io.renku.search.solr.documents.* import org.scalacheck.Gen import org.scalacheck.cats.implicits.* -object SolrDocumentGenerators: +object SolrDocumentGenerators extends SolrDocumentGenerators + +trait SolrDocumentGenerators: private def projectIdGen: Gen[Id] = Gen.uuid.map(uuid => Id(uuid.toString)) val projectDocumentGen: Gen[Project] = + val differentiator = nameGen.generateOne projectDocumentGen( - s"proj-${nameGen.generateOne}", - s"proj desc ${projectDescGen.generateOne}" + s"proj-$differentiator", + s"proj desc $differentiator" ) def projectDocumentGen(name: String, desc: String): Gen[Project] =