Skip to content

Commit

Permalink
feat: Use solr optimistic locking when updating (#87)
Browse files Browse the repository at this point in the history
- removes useless file `api.avdl` probably left from the beginnings
- moved a little away from using ducktape everywhere, because we had
  to write most things manually anyways (there were more
  `Field.default`, `Field.computed` additionals than what could be
  mapped automatically). It also makes it a bit less work to deal with
  specific cases when bringing in one value from an argument
- renamed `insert` to `upsert` for solr updates, because one never
  knows what it will be :-)
- put `DocVersion` into solr-client module, as this is very specific to solr
- refactor `MessageHandlers` to use the same basic `upsert` based
  operation, where `DocumentMerger` decides how to update/create
  entities in case they already exists or not. This has been splitted
  into multiple places before which have been removed
- refactor `MessageHandlers` so tests don't need to use multiple
  threads (events can be send and afterwards the handler stream can be
  run)
- The upsert in `MessageHandler` will retry on version conflict
- enhance `EncoderSupport` to also support borers `key` annotation for
  changing field names. this is used to have a nice named field
  `version` and map it to solrs required name `_version_`
- when a `ProjectUpdated` event comes first, it is stored as a
  `PartialEntityDocument` as well
  • Loading branch information
eikek authored Apr 16, 2024
1 parent d85de0f commit b697989
Show file tree
Hide file tree
Showing 59 changed files with 1,370 additions and 681 deletions.
10 changes: 0 additions & 10 deletions modules/events/src/main/avro/api.avdl

This file was deleted.

48 changes: 48 additions & 0 deletions modules/events/src/main/scala/io/renku/search/events/syntax.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2024 Swiss Data Science Center (SDSC)
* A partnership between École Polytechnique Fédérale de Lausanne (EPFL) and
* Eidgenössische Technische Hochschule Zürich (ETHZ).
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.renku.search.events

import io.renku.events.v1.{Visibility as EventVisibility, *}
import io.renku.search.model.{Id, Name}
import io.renku.search.model.projects.*
import java.time.Instant
import io.renku.search.model.users.FirstName
import io.renku.search.model.users.LastName

trait syntax:
extension (self: EventVisibility)
def toModel: Visibility =
Visibility.unsafeFromString(self.name())

extension (self: ProjectMemberRole)
def toModel: MemberRole =
MemberRole.unsafeFromString(self.name())

extension (self: String)
def toId: Id = Id(self)
def toName: Name = Name(self)
def toSlug: Slug = Slug(self)
def toRepository: Repository = Repository(self)
def toDescription: Description = Description(self)
def toFirstName: FirstName = FirstName(self)
def toLastName: LastName = LastName(self)

extension (self: Instant) def toCreationDate: CreationDate = CreationDate(self)

object syntax extends syntax
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,28 @@ object EventsGenerators:
repositories,
visibility,
maybeDesc,
Seq.empty,
creator,
Instant.now().truncatedTo(ChronoUnit.MILLIS)
)

def projectUpdatedGen(prefix: String): Gen[ProjectUpdated] =
for
id <- Gen.uuid.map(_.toString)
name <- stringGen(max = 5).map(v => s"$prefix-$v")
repositoriesCount <- Gen.choose(1, 3)
repositories <- Gen.listOfN(repositoriesCount, stringGen(10))
visibility <- projectVisibilityGen
maybeDesc <- Gen.option(stringGen(20))
yield ProjectUpdated(
id,
name,
name,
repositories,
visibility,
maybeDesc
)

def projectAuthorizationAddedGen(
projectIdGen: Gen[String] = Gen.uuid.map(_.toString),
roleGen: Gen[ProjectMemberRole] = projectMemberRoleGen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class SerializeDeserializeSpec extends FunSuite {
Seq.empty,
Visibility.PUBLIC,
Some("a description for it"),
Seq.empty,
"created-by-me",
Instant.now().truncatedTo(ChronoUnit.MILLIS)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@

package io.renku.queue.client

import java.time.Instant

import io.renku.avro.codec.AvroEncoder
import io.renku.avro.codec.AvroWriter
import io.renku.redis.client.RedisClientGenerators
import org.apache.avro.Schema
import org.scalacheck.Gen
import java.time.Instant
import scodec.bits.ByteVector

object Generators:

Expand Down Expand Up @@ -54,3 +59,14 @@ object Generators:
creationTime,
requestId
)

def queueMessageGen[A: AvroEncoder](schema: Schema, payload: A): Gen[QueueMessage] =
for
id <- RedisClientGenerators.messageIdGen
header <- messageHeaderGen(schema)
encodedPayload = header.dataContentType match {
case DataContentType.Binary => AvroWriter(schema).write(Seq(payload))
case DataContentType.Json => AvroWriter(schema).writeJson(Seq(payload))
}
h = header.toSchemaHeader(payload)
yield QueueMessage(id, h, encodedPayload)
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ import io.github.arainko.ducktape.*
import io.renku.search.GeneratorSyntax.*
import io.renku.search.api.data.*
import io.renku.search.model.Id
import io.renku.search.model.projects.Visibility
import io.renku.search.model.users.FirstName
import io.renku.search.query.Query
import io.renku.search.solr.client.SearchSolrSuite
import io.renku.search.solr.client.SolrDocumentGenerators.*
import io.renku.search.solr.documents.{EntityDocument, User as SolrUser}
import scribe.Scribe
import io.renku.solr.client.DocVersion
import org.scalacheck.Gen
import io.renku.search.model.projects.Visibility
import scribe.Scribe

class SearchApiSpec extends SearchSolrSuite:

Expand All @@ -52,7 +53,7 @@ class SearchApiSpec extends SearchSolrSuite:
).generateOne
val searchApi = new SearchApiImpl[IO](client)
for {
_ <- client.insert((project1 :: project2 :: Nil).map(_.widen))
_ <- client.upsert((project1 :: project2 :: Nil).map(_.widen))
results <- searchApi
.query(AuthContext.anonymous)(mkQuery("matching"))
.map(_.fold(err => fail(s"Calling Search API failed with $err"), identity))
Expand All @@ -72,10 +73,11 @@ class SearchApiSpec extends SearchSolrSuite:
"exclusive description",
Gen.const(Visibility.Public)
).generateOne
val user = SolrUser(project.createdBy, FirstName("exclusive").some)
val user =
SolrUser(project.createdBy, DocVersion.NotExists, FirstName("exclusive").some)
val searchApi = new SearchApiImpl[IO](client)
for {
_ <- client.insert(project :: user :: Nil)
_ <- client.upsert(project :: user :: Nil)
results <- searchApi
.query(AuthContext.anonymous)(mkQuery("exclusive"))
.map(_.fold(err => fail(s"Calling Search API failed with $err"), identity))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.http4s.client.Client
import org.http4s.ember.client.EmberClientBuilder
import org.http4s.headers.Accept
import org.http4s.{Header, MediaType, Method, Uri}
import io.renku.solr.client.DocVersion

private object GitLabDocsCreator:
def make[F[_]: Async: Network: ModelTypesGenerators](
Expand Down Expand Up @@ -78,6 +79,7 @@ private class GitLabDocsCreator[F[_]: Async: ModelTypesGenerators](
(glProj
.into[Project]
.transform(
Field.default(_.version),
Field.computed(_.id, s => Id(s"gl_proj_${s.id}")),
Field.computed(_.slug, s => projects.Slug(s.path_with_namespace)),
Field
Expand Down Expand Up @@ -115,6 +117,7 @@ private class GitLabDocsCreator[F[_]: Async: ModelTypesGenerators](
glUser
.into[User]
.transform(
Field.default(_.version),
Field.computed(_.id, s => Id(s"gl_user_${s.id}")),
Field.computed(
_.firstName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ private class ProjectEventsGeneratorImpl[F[_]: MonadThrow: ModelTypesGenerators]
project.repositories.map(_.value),
Visibility.valueOf(project.visibility.name.toUpperCase),
project.description.map(_.value),
Seq.empty,
creator.value,
project.creationDate.value
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.http4s.headers.Accept
import org.http4s.implicits.*
import org.http4s.{Header, MediaType, Method, Uri}
import org.typelevel.ci.*
import io.renku.solr.client.DocVersion

/** For the API go here: https://randommer.io/api/swagger-docs/index.html */
object RandommerIoDocsCreator:
Expand Down Expand Up @@ -69,7 +70,7 @@ private class RandommerIoDocsCreator[F[_]: Async: ModelTypesGenerators](
private lazy val toUser: ((users.FirstName, users.LastName)) => F[User] = {
case (first, last) =>
gens.generateId.map(id =>
User(id, first.some, last.some, Name(s"$first $last").some)
User(id, DocVersion.NotExists, first.some, last.some, Name(s"$first $last").some)
)
}

Expand All @@ -88,6 +89,7 @@ private class RandommerIoDocsCreator[F[_]: Async: ModelTypesGenerators](
val slug = createSlug(name, user)
Project(
id,
DocVersion.NotExists,
name,
slug,
Seq(createRepo(slug)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package io.renku.search.provision

import cats.Show
import cats.data.OptionT
import cats.effect.*
import fs2.Stream

import io.renku.events.v1.*
import io.renku.redis.client.QueueName
import io.renku.search.provision.handler.*
import io.renku.search.solr.documents.EntityDocument
import io.renku.solr.client.UpsertResponse

/** The entry point for defining all message handlers.
*
Expand All @@ -34,22 +35,29 @@ import io.renku.search.solr.documents.EntityDocument
*/
final class MessageHandlers[F[_]: Async](
steps: QueueName => PipelineSteps[F],
cfg: QueuesConfig
cfg: QueuesConfig,
maxConflictRetries: Int = 20
) extends ShowInstances:
assert(maxConflictRetries >= 0, "maxConflictRetries must be >= 0")

private val logger = scribe.cats.effect[F]
private var tasks: Map[String, F[Unit]] = Map.empty
private def add(queue: QueueName, task: Stream[F, Unit]): Stream[F, Unit] =
tasks = tasks.updated(queue.name, task.compile.drain)
task

private[provision] def withMaxConflictRetries(n: Int): MessageHandlers[F] =
new MessageHandlers[F](steps, cfg, n)

def getAll: Map[String, F[Unit]] = tasks

val projectCreated: Stream[F, Unit] =
add(cfg.projectCreated, makeUpsert[ProjectCreated](cfg.projectCreated))
add(cfg.projectCreated, makeUpsert[ProjectCreated](cfg.projectCreated).drain)

val projectUpdated: Stream[F, Unit] =
add(
cfg.projectUpdated,
makeUpdated[ProjectUpdated](cfg.projectUpdated, DocumentUpdates.project)
makeUpsert[ProjectUpdated](cfg.projectUpdated).drain
)

val projectRemoved: Stream[F, Unit] =
Expand All @@ -58,25 +66,25 @@ final class MessageHandlers[F[_]: Async](
val projectAuthAdded: Stream[F, Unit] =
add(
cfg.projectAuthorizationAdded,
makeUpsert[ProjectAuthorizationAdded](cfg.projectAuthorizationAdded)
makeUpsert[ProjectAuthorizationAdded](cfg.projectAuthorizationAdded).drain
)

val projectAuthUpdated: Stream[F, Unit] =
add(
cfg.projectAuthorizationUpdated,
makeUpsert[ProjectAuthorizationUpdated](cfg.projectAuthorizationUpdated)
makeUpsert[ProjectAuthorizationUpdated](cfg.projectAuthorizationUpdated).drain
)

val projectAuthRemoved: Stream[F, Unit] = add(
cfg.projectAuthorizationRemoved,
makeUpsert[ProjectAuthorizationRemoved](cfg.projectAuthorizationRemoved)
makeUpsert[ProjectAuthorizationRemoved](cfg.projectAuthorizationRemoved).drain
)

val userAdded: Stream[F, Unit] =
add(cfg.userAdded, makeCreated[UserAdded](cfg.userAdded))
add(cfg.userAdded, makeUpsert[UserAdded](cfg.userAdded).drain)

val userUpdated: Stream[F, Unit] =
add(cfg.userUpdated, makeUpdated[UserUpdated](cfg.userUpdated, DocumentUpdates.user))
add(cfg.userUpdated, makeUpsert[UserUpdated](cfg.userUpdated).drain)

val userRemoved: Stream[F, Unit] =
val ps = steps(cfg.userRemoved)
Expand All @@ -94,50 +102,30 @@ final class MessageHandlers[F[_]: Async](
})
)

private def makeUpsert[A](queue: QueueName)(using
private[provision] def makeUpsert[A](queue: QueueName)(using
QueueMessageDecoder[F, A],
DocumentMerger[A],
IdExtractor[A],
Show[A]
): Stream[F, Unit] =
val ps = steps(queue)
ps.reader
.read[A]
.through(ps.fetchFromSolr.fetchEntityOrPartial)
.map { msg =>
val merger = DocumentMerger[A]
msg.merge(merger.create, merger.merge)
}
.through(ps.pushToSolr.push1)

private def makeCreated[A](queue: QueueName)(using
QueueMessageDecoder[F, A],
DocumentConverter[A],
Show[A]
): Stream[F, Unit] =
val ps = steps(queue)
ps.reader
.read[A]
.chunks
.through(ps.converter.convertChunk)
.map(_.map(_.map(e => e: EntityOrPartial)))
.through(ps.pushToSolr.pushChunk)

private def makeUpdated[A](
queue: QueueName,
docUpdate: (A, EntityDocument) => Option[EntityDocument]
)(using
QueueMessageDecoder[F, A],
Show[A],
IdExtractor[A]
): Stream[F, Unit] =
): Stream[F, UpsertResponse] =
val ps = steps(queue)
def processMsg(
msg: MessageReader.Message[A],
retries: Int
): Stream[F, UpsertResponse] =
lazy val retry = OptionT.when(retries > 0)(processMsg(msg, retries - 1))
Stream
.emit(msg)
.through(ps.fetchFromSolr.fetchEntityOrPartial)
.map { m =>
val merger = DocumentMerger[A]
m.merge(merger.create, merger.merge)
}
.through(ps.pushToSolr.push(onConflict = retry))

ps.reader
.read[A]
.through(ps.fetchFromSolr.fetchEntity)
.map(_.update(docUpdate))
.map(_.map(e => e: EntityOrPartial))
.through(ps.pushToSolr.push)
.flatMap(processMsg(_, maxConflictRetries))

private def makeRemovedSimple[A](queue: QueueName)(using
QueueMessageDecoder[F, A],
Expand Down
Loading

0 comments on commit b697989

Please sign in to comment.