Skip to content

Commit

Permalink
Merge pull request #112 from SwissDataScienceCenter/user-remove-proce…
Browse files Browse the repository at this point in the history
…ssing

fix: Refactor and fix user removed event process
  • Loading branch information
eikek authored May 8, 2024
2 parents 298fa0c + caac982 commit 7ceae1a
Show file tree
Hide file tree
Showing 21 changed files with 520 additions and 254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import cats.data.NonEmptyList
import io.renku.avro.codec.AvroEncoder
import io.renku.avro.codec.all.given
import io.renku.events.{v1, v2}
import io.renku.search.model.{Id, Namespace}
import io.renku.search.model.{Id, Namespace, users}
import org.apache.avro.Schema

sealed trait UserUpdated extends RenkuEventPayload:
Expand All @@ -36,6 +36,23 @@ sealed trait UserUpdated extends RenkuEventPayload:
fold(_ => v1.UserUpdated.SCHEMA$, _ => v2.UserUpdated.SCHEMA$)

object UserUpdated:
def apply(
id: Id,
namespace: Namespace,
firstName: Option[users.FirstName],
lastName: Option[users.LastName],
email: Option[users.Email]
): UserUpdated =
V2(
v2.UserUpdated(
id.value,
firstName.map(_.value),
lastName.map(_.value),
email.map(_.value),
namespace.value
)
)

final case class V1(event: v1.UserUpdated) extends UserUpdated:
val id: Id = Id(event.id)
def withId(id: Id): UserUpdated = V1(event.copy(id = id.value))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,15 @@ trait CommonOpts:
given Argument[Keyword] =
Argument.readString.map(Keyword(_))

given Argument[users.FirstName] =
Argument.readString.map(users.FirstName(_))

given Argument[users.LastName] =
Argument.readString.map(users.LastName(_))

given Argument[users.Email] =
Argument.readString.map(users.Email(_))

val nameOpt: Opts[Name] =
Opts.option[Name]("name", "The name of the entity")

Expand Down Expand Up @@ -105,4 +114,13 @@ trait CommonOpts:
val projectDescription: Opts[Option[ProjectDescription]] =
Opts.option[ProjectDescription]("description", "The project description").orNone

val firstName: Opts[Option[users.FirstName]] =
Opts.option[users.FirstName]("first-name", "The first name").orNone

val lastName: Opts[Option[users.LastName]] =
Opts.option[users.LastName]("last-name", "The last name").orNone

val email: Opts[Option[users.Email]] =
Opts.option[users.Email]("email", "The email address").orNone

object CommonOpts extends CommonOpts
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,7 @@ object SearchCli

case SubCommands.Project(opts) =>
ProjectCmd(opts)

case SubCommands.User(opts) =>
UserCmd(opts)
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum SubCommands:
case PerfTests(opts: PerfTestsConfig)
case Group(opts: GroupCmd.SubCmdOpts)
case Project(opts: ProjectCmd.SubCmdOpts)
case User(opts: UserCmd.SubCmdOpts)

private object SubCommands:

Expand All @@ -39,8 +40,12 @@ private object SubCommands:
private val projectOpts: Opts[ProjectCmd.SubCmdOpts] =
Opts.subcommand("project", "Manage project events")(ProjectCmd.opts)

private val userOpts: Opts[UserCmd.SubCmdOpts] =
Opts.subcommand("user", "Manage user events")(UserCmd.opts)

val opts: Opts[SubCommands] =
perfTestOpts
.map(SubCommands.PerfTests.apply)
.orElse(groupOpts.map(SubCommands.Group.apply))
.orElse(projectOpts.map(SubCommands.Project.apply))
.orElse(userOpts.map(SubCommands.User.apply))
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.cli

import cats.effect.*

import com.monovore.decline.Opts
import io.renku.search.cli.users.*

object UserCmd:

enum SubCmdOpts:
case Add(opts: AddCmd.Options)
case Update(opts: UpdateCmd.Options)
case Remove(opts: RemoveCmd.Options)

private val addOpts: Opts[AddCmd.Options] =
Opts.subcommand("add", "Add user")(AddCmd.opts)

private val updateOpts: Opts[UpdateCmd.Options] =
Opts.subcommand("update", "Update user")(UpdateCmd.opts)

private val removeOpts: Opts[RemoveCmd.Options] =
Opts.subcommand("remove", "Remove user")(RemoveCmd.opts)

val opts: Opts[SubCmdOpts] =
addOpts
.map(SubCmdOpts.Add.apply)
.orElse(updateOpts.map(SubCmdOpts.Update.apply))
.orElse(removeOpts.map(SubCmdOpts.Remove.apply))

def apply(opts: SubCmdOpts): IO[ExitCode] =
opts match
case SubCmdOpts.Add(c) => AddCmd(c)
case SubCmdOpts.Update(c) => UpdateCmd(c)
case SubCmdOpts.Remove(c) => RemoveCmd(c)
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.cli.users

import cats.effect.*
import cats.syntax.all.*

import com.monovore.decline.Opts
import io.renku.search.cli.{CommonOpts, Services}
import io.renku.search.config.QueuesConfig
import io.renku.search.events.UserAdded
import io.renku.search.model.*
import io.renku.search.model.users.*

object AddCmd:

final case class Options(
id: Id,
ns: Namespace,
first: Option[FirstName],
last: Option[LastName],
email: Option[Email]
):
def asPayload: UserAdded = UserAdded(id, ns, first, last, email)

val opts: Opts[Options] =
(
CommonOpts.idOpt,
CommonOpts.namespaceOpt,
CommonOpts.firstName,
CommonOpts.lastName,
CommonOpts.email
).mapN(Options.apply)

def apply(cfg: Options): IO[ExitCode] =
Services.queueClient.use { queue =>
for
queuesCfg <- QueuesConfig.config.load[IO]
msg <- Services.createMessage(cfg.asPayload)
_ <- queue.enqueue(queuesCfg.userAdded, msg)
yield ExitCode.Success
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.cli.users

import cats.effect.*
import cats.syntax.all.*

import com.monovore.decline.Opts
import io.renku.search.cli.{CommonOpts, Services}
import io.renku.search.config.QueuesConfig
import io.renku.search.events.UserRemoved
import io.renku.search.model.*

object RemoveCmd:

final case class Options(id: Id):
def asPayload: UserRemoved = UserRemoved(id)

val opts: Opts[Options] =
CommonOpts.idOpt.map(Options.apply)

def apply(cfg: Options): IO[ExitCode] =
Services.queueClient.use { queue =>
for
queuesCfg <- QueuesConfig.config.load[IO]
msg <- Services.createMessage(cfg.asPayload)
_ <- queue.enqueue(queuesCfg.userRemoved, msg)
yield ExitCode.Success
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.cli.users

import cats.effect.*
import cats.syntax.all.*

import com.monovore.decline.Opts
import io.renku.search.cli.{CommonOpts, Services}
import io.renku.search.config.QueuesConfig
import io.renku.search.events.UserUpdated
import io.renku.search.model.*
import io.renku.search.model.users.*

object UpdateCmd:

final case class Options(
id: Id,
ns: Namespace,
first: Option[FirstName],
last: Option[LastName],
email: Option[Email]
):
def asPayload: UserUpdated = UserUpdated(id, ns, first, last, email)

val opts: Opts[Options] =
(
CommonOpts.idOpt,
CommonOpts.namespaceOpt,
CommonOpts.firstName,
CommonOpts.lastName,
CommonOpts.email
).mapN(Options.apply)

def apply(cfg: Options): IO[ExitCode] =
Services.queueClient.use { queue =>
for
queuesCfg <- QueuesConfig.config.load[IO]
msg <- Services.createMessage(cfg.asPayload)
_ <- queue.enqueue(queuesCfg.userUpdated, msg)
yield ExitCode.Success
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,23 +137,22 @@ final class MessageHandlers[F[_]: Async](
val userUpdated: Stream[F, Unit] =
add(cfg.userUpdated, makeUpsert[UserUpdated](cfg.userUpdated).drain)

val userRemoved: Stream[F, Unit] =
private[provision] def makeUserRemoved =
val ps = steps(cfg.userRemoved)
add(
cfg.userRemoved,
ps.reader
.readEvents[UserRemoved]
.map(EntityOrPartialMessage.noDocuments)
.through(ps.deleteFromSolr.tryDeleteAll)
.through(ps.deleteFromSolr.whenSuccess { msg =>
Stream
.emit(msg.message.map(_.id))
.through(ps.userUtils.removeFromProjects)
.compile
.drain
})
.drain
)
ps.reader
.readEvents[UserRemoved]
.map(EntityOrPartialMessage.noDocuments)
.through(ps.deleteFromSolr.tryDeleteAll)
.through(ps.deleteFromSolr.whenSuccess { msg =>
Stream
.emit(msg.message.map(_.id))
.through(ps.userUtils.removeFromMembers)
.compile
.drain
})

val userRemoved: Stream[F, Unit] =
add(cfg.userRemoved, makeUserRemoved.drain)

val groupAdded: Stream[F, Unit] =
add(cfg.groupAdded, makeUpsert[GroupAdded](cfg.groupAdded).drain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ object Services:
steps = PipelineSteps[F](
solr,
redis,
cfg.queuesConfig,
inChunkSize = 1,
cfg.clientId
inChunkSize = 1
)
handlers = MessageHandlers[F](steps, cfg.queuesConfig)
} yield Services(cfg, solr, redis, handlers)
Loading

0 comments on commit 7ceae1a

Please sign in to comment.