Skip to content

Commit

Permalink
feat: support for UserUpdated events (#52)
Browse files Browse the repository at this point in the history
* refactor: SolrProvisioningProcess renamed to UpsertProvisioningProcess

* feat: support for the UserUpdated events

* feat: support for the ProjectUpdated events
  • Loading branch information
jachro authored Mar 11, 2024
1 parent ad8c1cc commit 6e7395e
Show file tree
Hide file tree
Showing 26 changed files with 910 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@

package io.renku.search

import fs2.Stream
import cats.arrow.FunctionK
import cats.effect.IO
import fs2.Stream
import org.scalacheck.Gen
import cats.arrow.FunctionK

trait GeneratorSyntax:

Expand All @@ -34,6 +34,8 @@ trait GeneratorSyntax:

def generateSome: Option[A] = Some(generateOne)

def generateList: List[A] = Gen.listOf(self).generateOne

def stream: Stream[Gen, A] =
Stream.repeatEval(self)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,22 @@ package io.renku.events

import io.renku.events.v1.{ProjectCreated, UserAdded, Visibility}
import org.scalacheck.Gen
import org.scalacheck.Gen.alphaNumChar
import org.scalacheck.Gen.{alphaChar, alphaNumChar}

import java.time.Instant
import java.time.temporal.ChronoUnit

object EventsGenerators:

val projectVisibilityGen: Gen[Visibility] = Gen.oneOf(Visibility.values().toList)

def projectCreatedGen(prefix: String): Gen[ProjectCreated] =
for
id <- Gen.uuid.map(_.toString)
name <- stringGen(max = 5).map(v => s"$prefix-$v")
repositories <- Gen.listOfN(Gen.choose(1, 3).generateOne, stringGen(10))
visibility <- Gen.oneOf(Visibility.values().toList)
repositoriesCount <- Gen.choose(1, 3)
repositories <- Gen.listOfN(repositoriesCount, stringGen(10))
visibility <- projectVisibilityGen
maybeDesc <- Gen.option(stringGen(20))
creator <- Gen.uuid.map(_.toString)
yield ProjectCreated(
Expand All @@ -49,8 +52,8 @@ object EventsGenerators:
def userAddedGen(prefix: String): Gen[UserAdded] =
for
id <- Gen.uuid.map(_.toString)
firstName <- Gen.option(stringGen(max = 5).map(v => s"$prefix-$v"))
lastName <- stringGen(max = 5).map(v => s"$prefix-$v")
firstName <- Gen.option(alphaStringGen(max = 5).map(v => s"$prefix-$v"))
lastName <- alphaStringGen(max = 5).map(v => s"$prefix-$v")
email <- Gen.option(stringGen(max = 5).map(host => s"$lastName@$host.com"))
yield UserAdded(
id,
Expand All @@ -64,4 +67,7 @@ object EventsGenerators:
.chooseNum(3, max)
.flatMap(Gen.stringOfN(_, alphaNumChar))

extension [V](gen: Gen[V]) def generateOne: V = gen.sample.getOrElse(generateOne)
def alphaStringGen(max: Int): Gen[String] =
Gen
.chooseNum(3, max)
.flatMap(Gen.stringOfN(_, alphaChar))
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.scalacheck.Gen.{alphaLowerChar, alphaNumChar}

object RedisClientGenerators:

val stringGen: Gen[String] =
private val stringGen: Gen[String] =
Gen
.chooseNum(3, 10)
.flatMap(Gen.stringOfN(_, alphaLowerChar))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,7 @@ import io.renku.search.model.users
import io.renku.search.query.Query
import io.renku.search.solr.client.SearchSolrClientGenerators.*
import io.renku.search.solr.client.SearchSolrSpec
import io.renku.search.solr.documents.Project.given
import io.renku.search.solr.documents.{
Entity as SolrEntity,
Project as SolrProject,
User as SolrUser
}
import io.renku.search.solr.documents.{Entity as SolrEntity, User as SolrUser}
import munit.CatsEffectSuite
import scribe.Scribe

Expand All @@ -46,7 +41,7 @@ class SearchApiSpec extends CatsEffectSuite with SearchSolrSpec:
val project2 = projectDocumentGen("disparate", "disparate description").generateOne
val searchApi = new SearchApiImpl[IO](client)
for {
_ <- client.insert(project1 :: project2 :: Nil)
_ <- client.insert((project1 :: project2 :: Nil).map(_.widen))
results <- searchApi
.query(mkQuery("matching"))
.map(_.fold(err => fail(s"Calling Search API failed with $err"), identity))
Expand All @@ -61,8 +56,7 @@ class SearchApiSpec extends CatsEffectSuite with SearchSolrSpec:
val user = SolrUser(project.createdBy, users.FirstName("exclusive").some)
val searchApi = new SearchApiImpl[IO](client)
for {
_ <- client.insert(project :: Nil)
_ <- client.insert(user :: Nil)
_ <- client.insert(project :: user :: Nil)
results <- searchApi
.query(mkQuery("exclusive"))
.map(_.fold(err => fail(s"Calling Search API failed with $err"), identity))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import cats.effect.*
import cats.syntax.all.*
import io.renku.logging.LoggingSetup
import io.renku.redis.client.QueueName
import io.renku.search.provision.project.ProjectCreatedProvisioning
import io.renku.search.provision.user.UserAddedProvisioning
import io.renku.search.provision.project.*
import io.renku.search.provision.user.*
import io.renku.search.solr.schema.Migrations
import io.renku.solr.client.migration.SchemaMigrator
import scribe.Scribe
Expand All @@ -50,18 +50,30 @@ object Microservice extends IOApp:
ProjectCreatedProvisioning
.make[IO](cfg.queuesConfig.projectCreated, cfg.redisConfig, cfg.solrConfig)
),
(
"ProjectUpdated",
cfg.queuesConfig.projectUpdated,
ProjectUpdatedProvisioning
.make[IO](cfg.queuesConfig.projectUpdated, cfg.redisConfig, cfg.solrConfig)
),
(
"UserAdded",
cfg.queuesConfig.userAdded,
UserAddedProvisioning
.make[IO](cfg.queuesConfig.userAdded, cfg.redisConfig, cfg.solrConfig)
),
(
"UserUpdated",
cfg.queuesConfig.userUpdated,
UserUpdatedProvisioning
.make[IO](cfg.queuesConfig.userUpdated, cfg.redisConfig, cfg.solrConfig)
)
).parTraverse_(startProcess(cfg))
.flatMap(_ => IO.never)

private def startProcess(
cfg: SearchProvisionConfig
): ((String, QueueName, Resource[IO, SolrProvisioningProcess[IO]])) => IO[Unit] = {
): ((String, QueueName, Resource[IO, ProvisioningProcess[IO]])) => IO[Unit] = {
case t @ (name, queue, resource) =>
resource
.evalMap(_.provisioningProcess.start)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision

import io.renku.redis.client.ClientId

trait ProvisioningProcess[F[_]]:
def provisioningProcess: F[Unit]

object ProvisioningProcess:
val clientId: ClientId = ClientId("search-provisioner")
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision

import cats.MonadThrow
import cats.syntax.all.*
import io.renku.avro.codec.{AvroDecoder, AvroReader}
import io.renku.queue.client.{DataContentType, QueueMessage}
import org.apache.avro.Schema

private class QueueMessageDecoder[F[_]: MonadThrow, A](schema: Schema)(using
AvroDecoder[A]
):
private val avro = AvroReader(schema)

def decodeMessage(message: QueueMessage): F[Seq[A]] =
findContentType.andThenF(decodePayload(message))(message)

private def findContentType(message: QueueMessage): F[DataContentType] =
MonadThrow[F]
.fromEither(DataContentType.from(message.header.dataContentType))

private def decodePayload(message: QueueMessage): DataContentType => F[Seq[A]] = {
case DataContentType.Binary => catchNonFatal(avro.read[A](message.payload))
case DataContentType.Json => catchNonFatal(avro.readJson[A](message.payload))
}

private def catchNonFatal(f: => Seq[A]): F[Seq[A]] =
MonadThrow[F].catchNonFatal(f)
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ import io.renku.search.config.ConfigValues

final case class QueuesConfig(
projectCreated: QueueName,
userAdded: QueueName
projectUpdated: QueueName,
userAdded: QueueName,
userUpdated: QueueName
)

object QueuesConfig:
val config: ConfigValue[Effect, QueuesConfig] =
(
ConfigValues.eventQueue("projectCreated"),
ConfigValues.eventQueue("userAdded")
ConfigValues.eventQueue("projectUpdated"),
ConfigValues.eventQueue("userAdded"),
ConfigValues.eventQueue("userUpdated")
).mapN(QueuesConfig.apply)
Loading

0 comments on commit 6e7395e

Please sign in to comment.