Skip to content

Commit

Permalink
feat: support for the UserRemoved event (#56)
Browse files Browse the repository at this point in the history
* refactor: Project Removal process not to group events

* feat: support for UserRemoved events

* refactor: using model.Id wherever possible; DocumentId removed
  • Loading branch information
jachro authored Mar 13, 2024
1 parent f504f33 commit 0063e91
Show file tree
Hide file tree
Showing 27 changed files with 466 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ trait GeneratorSyntax:

def generateList: List[A] = Gen.listOf(self).generateOne

def generateList(min: Int, max: Int): List[A] =
Gen.choose(min, max).flatMap(Gen.listOfN(_, self)).generateOne

def stream: Stream[Gen, A] =
Stream.repeatEval(self)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ object Microservice extends IOApp:
(
"ProjectRemoved",
cfg.queuesConfig.projectRemoved,
ProjectRemovedProvisioning
ProjectRemovedProcess
.make[IO](cfg.queuesConfig.projectRemoved, cfg.redisConfig, cfg.solrConfig)
.map(_.removalProcess.start)
),
Expand Down Expand Up @@ -111,6 +111,18 @@ object Microservice extends IOApp:
UserUpdatedProvisioning
.make[IO](cfg.queuesConfig.userUpdated, cfg.redisConfig, cfg.solrConfig)
.map(_.provisioningProcess.start)
),
(
"UserRemoved",
cfg.queuesConfig.userRemoved,
UserRemovedProcess
.make[IO](
cfg.queuesConfig.userRemoved,
cfg.queuesConfig.projectAuthorizationRemoved,
cfg.redisConfig,
cfg.solrConfig
)
.map(_.removalProcess.start)
)
).parTraverse_(startProcess(cfg))
.flatMap(_ => IO.never)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision

import io.renku.queue.client.{QueueClient, RequestId}
import io.renku.search.solr.client.SearchSolrClient

private trait OnSolrPersist[F[_], In]:
def execute(in: In, requestId: RequestId)(
queueClient: QueueClient[F],
solrClient: SearchSolrClient[F]
): F[Unit]
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ final case class QueuesConfig(
projectAuthorizationUpdated: QueueName,
projectAuthorizationRemoved: QueueName,
userAdded: QueueName,
userUpdated: QueueName
userUpdated: QueueName,
userRemoved: QueueName
)

object QueuesConfig:
Expand All @@ -44,5 +45,6 @@ object QueuesConfig:
ConfigValues.eventQueue("projectAuthorizationUpdated"),
ConfigValues.eventQueue("projectAuthorizationRemoved"),
ConfigValues.eventQueue("userAdded"),
ConfigValues.eventQueue("userUpdated")
ConfigValues.eventQueue("userUpdated"),
ConfigValues.eventQueue("userRemoved")
).mapN(QueuesConfig.apply)
Original file line number Diff line number Diff line change
Expand Up @@ -18,58 +18,58 @@

package io.renku.search.provision

import cats.Show
import cats.data.NonEmptyList
import cats.effect.{Async, Resource, Temporal}
import cats.syntax.all.*
import cats.Show
import fs2.Chunk
import fs2.io.net.Network
import io.github.arainko.ducktape.*
import io.renku.avro.codec.{AvroDecoder, AvroReader}
import io.renku.queue.client.{QueueClient, QueueMessage}
import io.renku.avro.codec.AvroDecoder
import io.renku.queue.client.{QueueClient, QueueMessage, RequestId}
import io.renku.redis.client.{ClientId, QueueName, RedisConfig}
import io.renku.search.model.Id
import io.renku.search.solr.client.SearchSolrClient
import io.renku.solr.client.SolrConfig
import org.apache.avro.Schema
import scribe.Scribe
import io.renku.search.solr.documents.DocumentId

import scala.concurrent.duration.*
import cats.data.NonEmptyList

trait SolrRemovalProcess[F[_]]:
def removalProcess: F[Unit]

object SolrRemovalProcess:
private val clientId: ClientId = ClientId("search-provisioner")

def make[F[_]: Async: Network: Scribe, In](
queueName: QueueName,
inSchema: Schema,
redisConfig: RedisConfig,
solrConfig: SolrConfig
solrConfig: SolrConfig,
onSolrPersist: Option[OnSolrPersist[F, In]]
)(using
Show[In],
Transformer[In, DocumentId],
Transformer[In, Id],
AvroDecoder[In]
): Resource[F, SolrRemovalProcess[F]] =
SearchSolrClient.make[F](solrConfig).map {
new SolrRemovalProcessImpl[F, In](
queueName,
inSchema,
clientId,
ProvisioningProcess.clientId,
QueueClient.make[F](redisConfig),
_,
QueueMessageDecoder[F, In](inSchema)
QueueMessageDecoder[F, In](inSchema),
onSolrPersist
)
}

private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In](
queueName: QueueName,
inSchema: Schema,
clientId: ClientId,
queueClientResource: Resource[F, QueueClient[F]],
solrClient: SearchSolrClient[F],
messageDecoder: QueueMessageDecoder[F, In]
)(using Show[In], Transformer[In, DocumentId], AvroDecoder[In])
messageDecoder: QueueMessageDecoder[F, In],
onSolrPersist: Option[OnSolrPersist[F, In]]
)(using Show[In], Transformer[In, Id], AvroDecoder[In])
extends SolrRemovalProcess[F]:
override def removalProcess: F[Unit] =
queueClientResource
Expand All @@ -79,7 +79,6 @@ private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In](
.acquireEventsStream(queueName, chunkSize = 1, maybeLastProcessed)
.evalMap(decodeMessage(queueClient))
.evalTap(logInfo)
.groupWithin(chunkSize = 10, timeout = 500 millis)
.evalMap(deleteFromSolr(queueClient))
.compile
.drain
Expand All @@ -93,16 +92,14 @@ private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In](

private lazy val logInfo: ((QueueMessage, Seq[In])) => F[Unit] = { case (m, v) =>
Scribe[F].info(
"Received mesage " + s"queue: $queueName, " +
s"Received message queue: $queueName, " +
s"id: ${m.id}, " +
s"source: ${m.header.source}, " +
s"type: ${m.header.`type`} " +
s"for: ${v.mkString_(", ")}"
)
}

private val avro = AvroReader(inSchema)

private def decodeMessage(queueClient: QueueClient[F])(
message: QueueMessage
): F[(QueueMessage, Seq[In])] =
Expand All @@ -113,24 +110,27 @@ private class SolrRemovalProcessImpl[F[_]: Async: Scribe, In](

private def deleteFromSolr(
queueClient: QueueClient[F]
)(chunk: Chunk[(QueueMessage, Seq[In])]): F[Unit] =
chunk.toList match {
case Nil => ().pure[F]
case tuples =>
val ids = toDocumentIds(tuples.flatMap(_._2))
ids match {
case Some(ids) =>
val (lastMessage, _) = tuples.last
solrClient
.deleteIds(ids)
.flatMap(_ => markProcessed(lastMessage, queueClient))
.onError(markProcessedOnFailure(lastMessage, queueClient))
case None => ().pure[F]
}
): ((QueueMessage, Seq[In])) => F[Unit] = { case (message, ins) =>
toDocumentIds(ins).fold(().pure[F]) { ids =>
(solrClient.deleteIds(ids) >> onPersist(queueClient, message, ins))
.flatMap(_ => markProcessed(message, queueClient))
.onError(markProcessedOnFailure(message, queueClient))
}
}

private def onPersist(
queueClient: QueueClient[F],
message: QueueMessage,
ins: Seq[In]
) =
onSolrPersist.fold(().pure[F]) { p =>
ins.toList.traverse_(
p.execute(_, RequestId(message.header.requestId))(queueClient, solrClient)
)
}

private lazy val toDocumentIds: Seq[In] => Option[NonEmptyList[DocumentId]] =
_.map(_.to[DocumentId]).toList.toNel
private lazy val toDocumentIds: Seq[In] => Option[NonEmptyList[Id]] =
_.map(_.to[Id]).toList.toNel

private def markProcessedOnFailure(
message: QueueMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.bullet.borer.Codec
import io.renku.avro.codec.AvroDecoder
import io.renku.queue.client.{QueueClient, QueueMessage}
import io.renku.redis.client.{ClientId, QueueName, RedisConfig}
import io.renku.search.model.Id
import io.renku.search.solr.client.SearchSolrClient
import io.renku.search.solr.documents.Entity
import io.renku.solr.client.SolrConfig
Expand All @@ -42,7 +43,7 @@ object UpdateProvisioningProcess:
def make[F[_]: Async: Network: Scribe, In, Out <: Entity](
queueName: QueueName,
inSchema: Schema,
idExtractor: In => String,
idExtractor: In => Id,
docUpdate: ((In, Out)) => Out,
redisConfig: RedisConfig,
solrConfig: SolrConfig
Expand All @@ -67,7 +68,7 @@ object UpdateProvisioningProcess:
private class UpdateProvisioningProcessImpl[F[_]: Async: Scribe, In, Out <: Entity](
queueName: QueueName,
clientId: ClientId,
idExtractor: In => String,
idExtractor: In => Id,
docUpdate: ((In, Out)) => Out,
queueClientResource: Resource[F, QueueClient[F]],
solrClient: SearchSolrClient[F],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package project
import cats.Show
import cats.effect.{Async, Resource}
import fs2.io.net.Network

import io.renku.avro.codec.decoders.all.given
import io.renku.events.v1.ProjectAuthorizationAdded
import io.renku.redis.client.{QueueName, RedisConfig}
Expand Down Expand Up @@ -55,7 +54,8 @@ object AuthorizationAddedProvisioning:
s"projectId '${v.projectId}', userId '${v.userId}', role '${v.role}'"
)

private lazy val idExtractor: ProjectAuthorizationAdded => String = _.projectId
private lazy val idExtractor: ProjectAuthorizationAdded => Id =
paa => Id(paa.projectId)

private lazy val docUpdate
: ((ProjectAuthorizationAdded, documents.Project)) => documents.Project = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@ package io.renku.search.provision
package project

import cats.Show
import cats.effect.Async
import cats.effect.Resource
import cats.effect.{Async, Resource}
import fs2.io.net.Network

import io.renku.avro.codec.decoders.all.given
import io.renku.events.v1.ProjectAuthorizationRemoved
import io.renku.redis.client.QueueName
import io.renku.redis.client.RedisConfig
import io.renku.redis.client.{QueueName, RedisConfig}
import io.renku.search.model.Id
import io.renku.search.solr.documents
import io.renku.solr.client.SolrConfig
Expand Down Expand Up @@ -56,7 +53,8 @@ object AuthorizationRemovedProvisioning:
s"projectId '${v.projectId}', userId '${v.userId}'"
)

private lazy val idExtractor: ProjectAuthorizationRemoved => String = _.projectId
private lazy val idExtractor: ProjectAuthorizationRemoved => Id = par =>
Id(par.projectId)

private lazy val docUpdate
: ((ProjectAuthorizationRemoved, documents.Project)) => documents.Project = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package project
import cats.Show
import cats.effect.{Async, Resource}
import fs2.io.net.Network

import io.renku.avro.codec.decoders.all.given
import io.renku.events.v1.ProjectAuthorizationUpdated
import io.renku.redis.client.{QueueName, RedisConfig}
Expand Down Expand Up @@ -55,7 +54,8 @@ object AuthorizationUpdatedProvisioning:
s"projectId '${v.projectId}', userId '${v.userId}', role '${v.role}'"
)

private lazy val idExtractor: ProjectAuthorizationUpdated => String = _.projectId
private lazy val idExtractor: ProjectAuthorizationUpdated => Id =
pau => Id(pau.projectId)

private lazy val docUpdate
: ((ProjectAuthorizationUpdated, documents.Project)) => documents.Project = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,17 @@ import cats.Show
import cats.effect.{Async, Resource}
import cats.syntax.all.*
import fs2.io.net.Network

import io.github.arainko.ducktape.*
import io.renku.avro.codec.decoders.all.given
import io.renku.events.v1
import io.renku.events.v1.ProjectRemoved
import io.renku.redis.client.{QueueName, RedisConfig}
import io.renku.search.model.Id
import io.renku.search.provision.SolrRemovalProcess
import io.renku.search.solr.documents.DocumentId
import io.renku.solr.client.SolrConfig
import scribe.Scribe

trait ProjectRemovedProvisioning[F[_]] extends SolrRemovalProcess[F]

object ProjectRemovedProvisioning:
object ProjectRemovedProcess:

def make[F[_]: Async: Network](
queueName: QueueName,
Expand All @@ -47,12 +44,12 @@ object ProjectRemovedProvisioning:
queueName,
ProjectRemoved.SCHEMA$,
redisConfig,
solrConfig
solrConfig,
onSolrPersist = None
)

private given Show[ProjectRemoved] =
Show.show[ProjectRemoved](pr => show"slug '${pr.id}'")

private given Transformer[ProjectRemoved, DocumentId] =
// _.id.into[DocumentId].transform()
r => DocumentId(r.id)
private given Transformer[ProjectRemoved, Id] =
r => Id(r.id)
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ package project
import cats.Show
import cats.effect.{Async, Resource}
import fs2.io.net.Network

import io.github.arainko.ducktape.*
import io.renku.avro.codec.decoders.all.given
import io.renku.events.v1.ProjectUpdated
import io.renku.redis.client.{QueueName, RedisConfig}
import io.renku.search.model.Id
import io.renku.search.provision.TypeTransformers.given
import io.renku.search.solr.documents
import io.renku.solr.client.SolrConfig
Expand All @@ -52,7 +52,7 @@ object ProjectUpdatedProvisioning:
private given Show[ProjectUpdated] =
Show.show[ProjectUpdated](v => s"slug '${v.slug}'")

private lazy val idExtractor: ProjectUpdated => String = _.id
private lazy val idExtractor: ProjectUpdated => Id = pu => Id(pu.id)

private lazy val docUpdate
: ((ProjectUpdated, documents.Project)) => documents.Project = {
Expand Down
Loading

0 comments on commit 0063e91

Please sign in to comment.