Skip to content

Commit

Permalink
chore: Refactor email and provision (#58)
Browse files Browse the repository at this point in the history
- refactors message handler code: moved parts into their own building
  blocks. `MessageHandlers` is the entry point for defining all
  message handlers that can be composed from various utilities based
  on `fs2.Stream|Pipe`
- Remove the default `events` queue name. Startup now fails if
  queue names are not correctly configured
- Remove `email` from the user document so it's not stored in solr
- When updating documents, `score` must be unset. This could be modelled
  better, but since this PR is already big, it is only reset for now
- updates dev setup and adds more scripts for playing around
- fixes issue where searching doesn't return results for certain terms
  due to wrong casing
  • Loading branch information
eikek authored Mar 15, 2024
1 parent 6479690 commit a317f3e
Show file tree
Hide file tree
Showing 74 changed files with 1,873 additions and 1,721 deletions.
1 change: 0 additions & 1 deletion .envrc

This file was deleted.

1 change: 1 addition & 0 deletions .envrc-template
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use flake .#vm
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,6 @@ helm-chart/renku-graph/charts/*tgz
.DS_Store

.direnv/
.envrc
*.qcow2
.tmp
10 changes: 7 additions & 3 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
[
redis
jq
coreutils
scala-cli
devshellToolsPkgs.sbt17
devshellToolsPkgs.openapi-docs
Expand All @@ -49,11 +50,12 @@
"projectCreated"
"projectUpdated"
"projectRemoved"
"projectAuthorizationAdded"
"projectAuthorizationUpdated"
"projectAuthorizationRemoved"
"projectAuthAdded"
"projectAuthUpdated"
"projectAuthRemoved"
"userAdded"
"userUpdated"
"userRemoved"
];
queueNameConfig = builtins.listToAttrs (builtins.map (qn: {
name = "RS_REDIS_QUEUE_${qn}";
Expand Down Expand Up @@ -86,6 +88,8 @@
});
vm = pkgs.mkShellNoCC (queueNameConfig
// {
RS_SOLR_HOST = "localhost";
RS_SOLR_PORT = "18983";
RS_SOLR_URL = "http://localhost:18983/solr";
RS_SOLR_CORE = "rsdev-test";
RS_REDIS_HOST = "localhost";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum EntityType:
case Project
case User

def name: String = productPrefix.toLowerCase
def name: String = productPrefix

object EntityType:
def fromString(str: String): Either[String, EntityType] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ object ConfigValues extends ConfigDecoders:
}

def eventQueue(eventType: String): ConfigValue[Effect, QueueName] =
renv(s"REDIS_QUEUE_$eventType").default("events").as[QueueName]
renv(s"REDIS_QUEUE_$eventType").as[QueueName]

val retryOnErrorDelay: ConfigValue[Effect, FiniteDuration] =
renv("RETRY_ON_ERROR_DELAY").default("2 seconds").as[FiniteDuration]
renv("RETRY_ON_ERROR_DELAY").default("10 seconds").as[FiniteDuration]

val solrConfig: ConfigValue[Effect, SolrConfig] = {
val url = renv("SOLR_URL").default("http://localhost:8983/solr").as[Uri]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package io.renku.queue.client

import cats.syntax.all.*
import io.renku.events.v1.Header
import org.apache.avro.Schema

import java.time.Instant
import java.time.temporal.ChronoUnit
import cats.effect.Clock
import cats.Functor

final case class MessageHeader(
source: MessageSource,
Expand Down Expand Up @@ -66,13 +69,17 @@ object MessageSource:

/** Version tag of a message schema; a plain `String` at runtime. */
opaque type SchemaVersion = String
object SchemaVersion:
  /** Wraps the given string as-is; no validation is performed. */
  def apply(v: String): SchemaVersion = v

  /** The schema version with the value `"V1"`. */
  val V1: SchemaVersion = "V1"

  extension (self: SchemaVersion)
    /** The underlying string. */
    def value: String = self

/** Creation timestamp of a message; a `java.time.Instant` at runtime. */
opaque type CreationTime = Instant
object CreationTime:
  /** Wraps the given instant as-is (no truncation is applied here). */
  def apply(v: Instant): CreationTime = v

  /** Reads the system clock directly and truncates the result to milliseconds. */
  def now: CreationTime =
    val current = Instant.now()
    current.truncatedTo(ChronoUnit.MILLIS)

  /** Obtains the current instant from `Clock[F]`, truncated to milliseconds. */
  def nowF[F[_]: Clock: Functor]: F[CreationTime] =
    Functor[F].map(Clock[F].realTimeInstant)(_.truncatedTo(ChronoUnit.MILLIS))

  extension (self: CreationTime)
    /** The underlying instant. */
    def value: Instant = self

opaque type RequestId = String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import io.renku.search.api.data.*
import io.renku.search.model.EntityType
import io.renku.search.model.Id
import io.renku.search.solr.client.SearchSolrClient
import io.renku.search.solr.documents.Entity as SolrEntity
import io.renku.search.solr.documents.EntityDocument
import io.renku.search.solr.schema.EntityDocumentSchema.Fields
import io.renku.solr.client.QueryResponse
import io.renku.solr.client.facet.FacetResponse
Expand Down Expand Up @@ -58,7 +58,7 @@ private class SearchApiImpl[F[_]: Async](solrClient: SearchSolrClient[F])
.map(_.asLeft[SearchResult])

private def toApiResult(currentPage: PageDef)(
solrResult: QueryResponse[SolrEntity]
solrResult: QueryResponse[EntityDocument]
): SearchResult =
val hasMore = solrResult.responseBody.docs.size > currentPage.limit
val pageInfo = PageWithTotals(currentPage, solrResult.responseBody.numFound, hasMore)
Expand All @@ -76,6 +76,6 @@ private class SearchApiImpl[F[_]: Async](solrClient: SearchSolrClient[F])
if (hasMore) SearchResult(items.init, facets, pageInfo)
else SearchResult(items, facets, pageInfo)

private lazy val toApiEntity: SolrEntity => SearchEntity =
private lazy val toApiEntity: EntityDocument => SearchEntity =
given Transformer[Id, UserId] = (id: Id) => UserId(id)
_.to[SearchEntity]
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ final case class User(
id: Id,
firstName: Option[users.FirstName] = None,
lastName: Option[users.LastName] = None,
email: Option[users.Email] = None,
score: Option[Double] = None
) extends SearchEntity

Expand All @@ -98,7 +97,6 @@ object User:
Id("1CAF4C73F50D4514A041C9EDDB025A36"),
Some(users.FirstName("Albert")),
Some(users.LastName("Einstein")),
Some(users.Email("[email protected]")),
Some(2.1)
): SearchEntity
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import io.renku.search.model.users.FirstName
import io.renku.search.query.Query
import io.renku.search.solr.client.SearchSolrSpec
import io.renku.search.solr.client.SolrDocumentGenerators.*
import io.renku.search.solr.documents.{Entity as SolrEntity, User as SolrUser}
import io.renku.search.solr.documents.{EntityDocument, User as SolrUser}
import munit.CatsEffectSuite
import scribe.Scribe

Expand Down Expand Up @@ -74,8 +74,8 @@ class SearchApiSpec extends CatsEffectSuite with SearchSolrSpec:
private def mkQuery(phrase: String): QueryInput =
QueryInput.pageOne(Query.parse(s"Fields $phrase").fold(sys.error, identity))

private def toApiEntities(e: SolrEntity*) = e.map(toApiEntity)
private def toApiEntities(e: EntityDocument*) = e.map(toApiEntity)

private def toApiEntity(e: SolrEntity) =
private def toApiEntity(e: EntityDocument) =
given Transformer[Id, UserId] = (id: Id) => UserId(id)
e.to[SearchEntity]
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision

import cats.syntax.all.*
import cats.effect.*
import cats.effect.kernel.Fiber
import cats.effect.kernel.Ref
import scala.concurrent.duration.FiniteDuration

/** Keeps a set of named processes and runs them in the background. */
trait BackgroundProcessManage[F[_]]:

  /** Registers `process` under the given `name`. */
  def register(name: String, process: F[Unit]): F[Unit]

  /** Same as `.background.useForever` */
  def startAll: F[Nothing]

  /** Starts all registered tasks in the background. The resource yields an `F[Unit]`
    * that represents the started tasks.
    */
  def background: Resource[F, F[Unit]]

object BackgroundProcessManage:
  type Process[F[_]] = Fiber[F, Throwable, Unit]

  /** The registered tasks, keyed by name. Registering a name again replaces the task
    * previously stored under it.
    */
  private case class State[F[_]](tasks: Map[String, F[Unit]]):
    def put(name: String, p: F[Unit]): State[F] =
      State(tasks.updated(name, p))

    def getTasks: List[F[Unit]] = tasks.values.toList

  private object State:
    def empty[F[_]]: State[F] = State[F](Map.empty)

  /** Creates a manager with no registered tasks.
    *
    * @param retryDelay
    *   how long to wait before a failed task is run again
    * @param maxRetries
    *   when defined, a task whose failure count reaches this number is not run again
    *   and its last error is raised; `None` retries indefinitely
    */
  def apply[F[_]: Async](
      retryDelay: FiniteDuration,
      maxRetries: Option[Int] = None
  ): F[BackgroundProcessManage[F]] =
    val logger = scribe.cats.effect[F]
    Ref.of[F, State[F]](State.empty[F]).map { state =>
      new BackgroundProcessManage[F] {
        // Tasks are stored already wrapped with the retry logic.
        def register(name: String, task: F[Unit]): F[Unit] =
          state.update(_.put(name, wrapTask(name, task)))

        def startAll: F[Nothing] =
          state.get
            .flatMap(s => logger.info(s"Starting ${s.tasks.size} background tasks")) >>
            background.useForever

        def background: Resource[F, F[Unit]] =
          for {
            ts <- Resource.eval(state.get.map(_.getTasks))
            x <- ts.traverse(t => Async[F].background(t))
            // `flatMap`, not `map`: `embed` yields an effect that must be run. With
            // `map` the result is an `F[F[Unit]]` whose inner effect `traverse_`
            // discards, so task errors were never re-raised and the cancellation
            // message was never logged.
            y = x.traverse_(_.flatMap(_.embed(logger.info(s"Got cancelled"))))
          } yield y

        /** Runs `task` and re-runs it after `retryDelay` whenever it fails, counting
          * the failures in a `Ref` that is shared across the retries of one run.
          */
        def wrapTask(name: String, task: F[Unit]): F[Unit] =
          def run(c: Ref[F, Long]): F[Unit] =
            logger.info(s"Starting process for: ${name}") >>
              task.handleErrorWith { err =>
                c.updateAndGet(_ + 1).flatMap {
                  // The failure count reached the limit: give up and propagate.
                  case n if maxRetries.exists(_ <= n) =>
                    logger.error(
                      s"Max retries ($maxRetries) for process ${name} exceeded"
                    ) >> Async[F].raiseError(err)
                  case n =>
                    val maxRetriesLabel = maxRetries.map(m => s"/$m").getOrElse("")
                    logger.error(
                      s"Starting process for '${name}' failed ($n$maxRetriesLabel), retrying",
                      err
                    ) >> Async[F].delayBy(run(c), retryDelay)
                }
              }
          Ref.of[F, Long](0).flatMap(run)
      }
    }
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.provision

import cats.Show
import cats.effect.*
import fs2.Stream

import io.renku.events.v1.*
import io.renku.redis.client.QueueName
import io.renku.search.provision.handler.*
import io.renku.search.solr.documents.EntityDocument

/** The entry point for defining all message handlers.
  *
  * They are defined as vals to have them automatically added to a collection, to be
  * easier accessed from the main method: constructing an instance evaluates every
  * handler val, and each one registers itself so that it is available from `getAll`.
  *
  * Handlers are keyed by the name of the queue they read from, so two handlers
  * configured with the same queue name replace each other in `getAll`.
  *
  * @param steps
  *   creates the pipeline building blocks for a given queue
  * @param cfg
  *   the queue to read from for each kind of message
  */
final class MessageHandlers[F[_]: Async](
    steps: QueueName => PipelineSteps[F],
    cfg: QueuesConfig
) extends ShowInstances:
  // Must stay above the handler vals: they call `add` during initialisation.
  private var tasks: Map[String, F[Unit]] = Map.empty

  /** Stores the drained `task` under the queue's name and returns `task` unchanged. */
  private def add(queue: QueueName, task: Stream[F, Unit]): Stream[F, Unit] =
    tasks = tasks.updated(queue.name, task.compile.drain)
    task

  /** All handlers defined in this class, keyed by queue name. */
  def getAll: Map[String, F[Unit]] = tasks

  val projectCreated: Stream[F, Unit] =
    add(cfg.projectCreated, makeCreated[ProjectCreated](cfg.projectCreated))

  val projectUpdated: Stream[F, Unit] =
    add(
      cfg.projectUpdated,
      makeUpdated[ProjectUpdated](cfg.projectUpdated, DocumentUpdates.project)
    )

  val projectRemoved: Stream[F, Unit] =
    add(cfg.projectRemoved, makeRemovedSimple[ProjectRemoved](cfg.projectRemoved))

  val projectAuthAdded: Stream[F, Unit] =
    add(
      cfg.projectAuthorizationAdded,
      makeUpdated[ProjectAuthorizationAdded](
        cfg.projectAuthorizationAdded,
        DocumentUpdates.projectAuthAdded
      )
    )

  val projectAuthUpdated: Stream[F, Unit] =
    add(
      cfg.projectAuthorizationUpdated,
      makeUpdated[ProjectAuthorizationUpdated](
        cfg.projectAuthorizationUpdated,
        DocumentUpdates.projectAuthUpdated
      )
    )

  val projectAuthRemoved: Stream[F, Unit] = add(
    cfg.projectAuthorizationRemoved,
    makeUpdated[ProjectAuthorizationRemoved](
      cfg.projectAuthorizationRemoved,
      DocumentUpdates.projectAuthRemoved
    )
  )

  val userAdded: Stream[F, Unit] =
    add(cfg.userAdded, makeCreated[UserAdded](cfg.userAdded))

  val userUpdated: Stream[F, Unit] =
    add(cfg.userUpdated, makeUpdated[UserUpdated](cfg.userUpdated, DocumentUpdates.user))

  /** Passes `UserRemoved` messages through `deleteFromSolr.tryDeleteAll`; the
    * `whenSuccess` callback then feeds the extracted ids to
    * `userUtils.removeFromProjects`.
    */
  val userRemoved: Stream[F, Unit] =
    val ps = steps(cfg.userRemoved)
    add(
      cfg.userRemoved,
      ps.reader
        .read[UserRemoved]
        .through(ps.deleteFromSolr.tryDeleteAll)
        .through(ps.deleteFromSolr.whenSuccess { msg =>
          Stream
            .emit(msg.map(IdExtractor[UserRemoved].getId))
            .through(ps.userUtils.removeFromProjects)
            .compile
            .drain
        })
    )

  /** Reads messages of type `A` from `queue` in chunks, passes them through
    * `converter.convertChunk` and then through `pushToSolr.pushChunk`.
    */
  private def makeCreated[A](queue: QueueName)(using
      QueueMessageDecoder[F, A],
      DocumentConverter[A],
      Show[A]
  ): Stream[F, Unit] =
    val ps = steps(queue)
    ps.reader
      .read[A]
      .chunks
      .through(ps.converter.convertChunk)
      .through(ps.pushToSolr.pushChunk)

  /** Reads messages of type `A` from `queue`, passes each through
    * `fetchFromSolr.fetch1`, applies `docUpdate` via `update` and hands the result to
    * `pushToSolr.push`.
    */
  private def makeUpdated[A](
      queue: QueueName,
      docUpdate: (A, EntityDocument) => Option[EntityDocument]
  )(using
      QueueMessageDecoder[F, A],
      Show[A],
      IdExtractor[A]
  ): Stream[F, Unit] =
    val ps = steps(queue)
    ps.reader
      .read[A]
      .through(ps.fetchFromSolr.fetch1)
      .map(_.update(docUpdate))
      .through(ps.pushToSolr.push)

  /** Reads messages of type `A` from `queue` in chunks and passes them through
    * `deleteFromSolr.deleteAll`.
    */
  private def makeRemovedSimple[A](queue: QueueName)(using
      QueueMessageDecoder[F, A],
      Show[A],
      IdExtractor[A]
  ): Stream[F, Unit] =
    val ps = steps(queue)
    ps.reader
      .read[A]
      .chunks
      .through(ps.deleteFromSolr.deleteAll)
Loading

0 comments on commit a317f3e

Please sign in to comment.