Skip to content

Commit

Permalink
feat: partial entity documents (#67)
Browse files Browse the repository at this point in the history
Allow partial- and full entity documents

With this change multiple hierarchies of documents are managed in
solr. They are differentiated by the `_kind` property. This property
separates (possibly) unrelated documents. It will be used to separate
"partial entity" and "full entity" documents. Partial entity documents
are not ready to be returned in search results and are thus hidden.
When completed eventually, they will be moved to a "full entity
document". Within each "kind", further distinctions can be made
depending on the specific encoding used. Currently, while there are
`PartialEntityDocument` and `EntityDocument` separeted by `_kind`,
each has its own ADT hierarchy which will be distnguished by the
`_type` member. Since both adt hierarchies are related, they share
this `_type` member. I.e. there is a `PartialEntityDocument.Project`
which relates to `EntityDocument.Project`, though their structures can
be differnt.

Additionally, configure the http server port per (micro)service to be
able to start both on the same machine.
  • Loading branch information
eikek authored Mar 21, 2024
1 parent 2882b62 commit e4857c7
Show file tree
Hide file tree
Showing 35 changed files with 581 additions and 176 deletions.
4 changes: 4 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
RS_REDIS_PORT = "6379";
RS_CONTAINER = "rsdev";
RS_LOG_LEVEL = "3";
RS_SEARCH_HTTP_SERVER_PORT = "8080";
RS_PROVISION_HTTP_SERVER_PORT = "8082";

#don't start docker container for dbTests
NO_SOLR = "true";
Expand All @@ -96,6 +98,8 @@
RS_REDIS_PORT = "16379";
VM_SSH_PORT = "10022";
RS_LOG_LEVEL = "3";
RS_SEARCH_HTTP_SERVER_PORT = "8080";
RS_PROVISION_HTTP_SERVER_PORT = "8082";

#don't start docker container for dbTests
NO_SOLR = "true";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object ConfigValues extends ConfigDecoders:
(url, core, maybeUser, defaultCommit, logMessageBodies).mapN(SolrConfig.apply)
}

def httpServerConfig(defaultPort: Port): ConfigValue[Effect, HttpServerConfig] =
val bindAddress = renv("HTTP_SERVER_BIND_ADDRESS").default("0.0.0.0").as[Ipv4Address]
val port = renv("HTTP_SERVER_PORT").default(defaultPort.value.toString).as[Port]
def httpServerConfig(prefix: String, defaultPort: Port): ConfigValue[Effect, HttpServerConfig] =
val bindAddress = renv(s"${prefix}_HTTP_SERVER_BIND_ADDRESS").default("0.0.0.0").as[Ipv4Address]
val port = renv(s"${prefix}_HTTP_SERVER_PORT").default(defaultPort.value.toString).as[Port]
(bindAddress, port).mapN(HttpServerConfig.apply)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ object SearchApiConfig:
val config: ConfigValue[Effect, SearchApiConfig] =
(
ConfigValues.solrConfig,
ConfigValues.httpServerConfig(port"8080"),
ConfigValues.httpServerConfig("SEARCH", port"8080"),
ConfigValues.logLevel
).mapN(SearchApiConfig.apply)
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ object SearchProvisionConfig:
ConfigValues.metricsUpdateInterval,
ConfigValues.logLevel,
QueuesConfig.config,
ConfigValues.httpServerConfig(defaultPort = port"8081"),
ConfigValues.httpServerConfig("PROVISION", defaultPort = port"8081"),
ConfigValues.clientId(ClientId("search-provisioner"))
).mapN(SearchProvisionConfig.apply)
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,19 @@ import cats.effect.Sync
import cats.syntax.all.*
import fs2.{Pipe, Stream}

import io.bullet.borer.Decoder
import io.bullet.borer.derivation.MapBasedCodecs
import io.renku.search.model.EntityType
import io.renku.search.model.Id
import io.renku.search.provision.handler.FetchFromSolr.MessageDocument
import io.renku.search.provision.handler.MessageReader.Message
import io.renku.search.solr.client.SearchSolrClient
import io.renku.search.solr.documents.EntityDocument
import io.renku.search.query.Query
import io.bullet.borer.derivation.MapBasedCodecs
import io.bullet.borer.Decoder
import io.renku.search.solr.SearchRole
import io.renku.search.solr.schema.EntityDocumentSchema.Fields
import io.renku.solr.client.QueryString
import io.renku.solr.client.QueryData
import io.renku.search.solr.client.SearchSolrClient
import io.renku.search.solr.documents.EntityDocument
import io.renku.search.solr.query.SolrToken
import io.renku.search.model.EntityType
import io.renku.solr.client.QueryData
import io.renku.solr.client.QueryString

trait FetchFromSolr[F[_]]:
def fetch1[A](using IdExtractor[A]): Pipe[F, Message[A], MessageDocument[A]]
Expand Down Expand Up @@ -76,13 +75,10 @@ object FetchFromSolr:
def fetchProjectForUser(userId: Id): Stream[F, FetchFromSolr.ProjectId] =
val query = QueryString(
List(
SolrToken.fieldIs(
Fields.entityType,
SolrToken.fromEntityType(EntityType.Project)
),
SolrToken.entityTypeIs(EntityType.Project),
List(
SolrToken.fieldIs(Fields.owners, SolrToken.fromId(userId)),
SolrToken.fieldIs(Fields.members, SolrToken.fromId(userId))
SolrToken.ownerIs(userId),
SolrToken.memberIs(userId)
).foldOr
).foldAnd.value
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import io.renku.queue.client.Generators.messageHeaderGen
import io.renku.search.GeneratorSyntax.*
import io.renku.search.model.{Id, projects}
import io.renku.search.provision.ProvisioningSuite
import io.renku.search.solr.documents.{EntityDocument, Project}
import io.renku.search.solr.documents.{CompoundId, EntityDocument, Project}
import munit.CatsEffectSuite

class AuthorizationAddedProvisioningSpec extends ProvisioningSuite:
Expand Down Expand Up @@ -63,7 +63,11 @@ class AuthorizationAddedProvisioningSpec extends ProvisioningSuite:
.evalTap(_ =>
scribe.cats.io.info(s"Looking for project with id ${projectDoc.id}...")
)
.evalMap(_ => solrClient.findById[EntityDocument](projectDoc.id))
.evalMap(_ =>
solrClient.findById[EntityDocument](
CompoundId.projectEntity(projectDoc.id)
)
)
.evalMap(
_.fold(().pure[IO])(e =>
solrDocs.update(_ => Set(e.asInstanceOf[Project]))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ import io.renku.events.v1.*
import io.renku.queue.client.Generators.messageHeaderGen
import io.renku.search.GeneratorSyntax.*
import io.renku.search.model.Id
import io.renku.search.solr.documents.Project
import io.renku.search.solr.documents.{CompoundId, EntityDocument}
import munit.CatsEffectSuite

class AuthorizationRemovedProvisioningSpec extends ProvisioningSuite:

test("can fetch events, decode them, and update docs in Solr"):
withMessageHandlers(queueConfig).use { case (handlers, queueClient, solrClient) =>
for
solrDocs <- SignallingRef.of[IO, Set[Project]](Set.empty)
solrDocs <- SignallingRef.of[IO, Set[EntityDocument]](Set.empty)

provisioningFiber <- handlers.projectAuthRemoved.compile.drain.start

Expand All @@ -60,7 +60,9 @@ class AuthorizationRemovedProvisioningSpec extends ProvisioningSuite:
docsCollectorFiber <-
Stream
.awakeEvery[IO](500 millis)
.evalMap(_ => solrClient.findById[Project](projectDoc.id))
.evalMap(_ =>
solrClient.findById[EntityDocument](CompoundId.projectEntity(projectDoc.id))
)
.evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e))))
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import io.renku.events.v1.{ProjectAuthorizationUpdated, ProjectMemberRole}
import io.renku.queue.client.Generators.messageHeaderGen
import io.renku.search.GeneratorSyntax.*
import io.renku.search.model.{Id, projects}
import io.renku.search.solr.documents.{EntityDocument, Project}
import io.renku.search.solr.documents.{CompoundId, EntityDocument, Project}
import munit.CatsEffectSuite

class AuthorizationUpdatedProvisioningSpec extends ProvisioningSuite:
Expand All @@ -42,7 +42,7 @@ class AuthorizationUpdatedProvisioningSpec extends ProvisioningSuite:
test(s"can fetch events, decode them, and update docs in Solr in case of $name"):
withMessageHandlers(queueConfig).use { case (handlers, queueClient, solrClient) =>
for
solrDocs <- SignallingRef.of[IO, Set[Project]](Set.empty)
solrDocs <- SignallingRef.of[IO, Set[EntityDocument]](Set.empty)

provisioningFiber <- handlers.projectAuthUpdated.compile.drain.start

Expand All @@ -59,7 +59,11 @@ class AuthorizationUpdatedProvisioningSpec extends ProvisioningSuite:
docsCollectorFiber <-
Stream
.awakeEvery[IO](500 millis)
.evalMap(_ => solrClient.findById[Project](projectDoc.id))
.evalMap(_ =>
solrClient.findById[EntityDocument](
CompoundId.projectEntity(projectDoc.id)
)
)
.evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e))))
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@
package io.renku.search.provision.project

import scala.concurrent.duration.*

import cats.effect.{IO, Resource}
import cats.syntax.all.*
import fs2.Stream
import fs2.concurrent.SignallingRef

import io.renku.avro.codec.encoders.all.given
import io.renku.events.EventsGenerators.projectCreatedGen
import io.renku.events.v1.{ProjectCreated, Visibility}
import io.renku.queue.client.Generators.messageHeaderGen
import io.renku.queue.client.DataContentType
import io.renku.queue.client.Generators.messageHeaderGen
import io.renku.search.GeneratorSyntax.*
import io.renku.search.model.Id
import io.renku.search.provision.ProvisioningSuite
import io.renku.search.solr.documents.{EntityDocument, Project}
import io.renku.search.solr.documents.{CompoundId, EntityDocument}
import munit.CatsEffectSuite

class ProjectCreatedProvisioningSpec extends ProvisioningSuite:
Expand All @@ -54,7 +56,11 @@ class ProjectCreatedProvisioningSpec extends ProvisioningSuite:
docsCollectorFiber <-
Stream
.awakeEvery[IO](500 millis)
.evalMap(_ => solrClient.findById[Project](Id(created.id)))
.evalMap(_ =>
solrClient.findById[EntityDocument](
CompoundId.projectEntity(Id(created.id))
)
)
.evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e))))
.compile
.drain
Expand Down Expand Up @@ -84,7 +90,11 @@ class ProjectCreatedProvisioningSpec extends ProvisioningSuite:
docsCollectorFiber <-
Stream
.awakeEvery[IO](500 millis)
.evalMap(_ => solrClient.findById[Project](Id(created.id)))
.evalMap(_ =>
solrClient.findById[EntityDocument](
CompoundId.projectEntity(Id(created.id))
)
)
.evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e))))
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ import io.renku.search.provision.ProvisioningSuite
import io.renku.search.query.Query
import io.renku.search.query.Query.Segment
import io.renku.search.query.Query.Segment.typeIs
import io.renku.search.solr.documents.{EntityDocument, Project}
import io.renku.search.solr.documents.{CompoundId, EntityDocument}
import munit.CatsEffectSuite

class ProjectRemovedProcessSpec extends ProvisioningSuite:

test(s"can fetch events, decode them, and remove Solr"):
withMessageHandlers(queueConfig).use { case (handlers, queueClient, solrClient) =>
for
solrDoc <- SignallingRef.of[IO, Option[Project]](None)
solrDoc <- SignallingRef.of[IO, Option[EntityDocument]](None)

provisioningFiber <- handlers.projectRemoved.compile.drain.start

Expand All @@ -52,7 +52,11 @@ class ProjectRemovedProcessSpec extends ProvisioningSuite:
docsCollectorFiber <-
Stream
.awakeEvery[IO](500 millis)
.evalMap(_ => solrClient.findById[Project](Id(created.id)))
.evalMap(_ =>
solrClient.findById[EntityDocument](
CompoundId.projectEntity(Id(created.id))
)
)
.evalMap(e => solrDoc.update(_ => e))
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package io.renku.search.provision.project

import scala.concurrent.duration.*

import cats.effect.{IO, Resource}
import cats.syntax.all.*
import fs2.Stream
import fs2.concurrent.SignallingRef

import io.github.arainko.ducktape.*
import io.renku.avro.codec.encoders.all.given
import io.renku.events.EventsGenerators.*
Expand All @@ -31,7 +33,7 @@ import io.renku.queue.client.Generators.messageHeaderGen
import io.renku.search.GeneratorSyntax.*
import io.renku.search.model.Id
import io.renku.search.provision.ProvisioningSuite
import io.renku.search.solr.documents.{EntityDocument, Project}
import io.renku.search.solr.documents.{CompoundId, EntityDocument}
import munit.CatsEffectSuite

class ProjectUpdatedProvisioningSpec extends ProvisioningSuite:
Expand All @@ -58,7 +60,11 @@ class ProjectUpdatedProvisioningSpec extends ProvisioningSuite:
docsCollectorFiber <-
Stream
.awakeEvery[IO](500 millis)
.evalMap(_ => solrClient.findById[Project](Id(created.id)))
.evalMap(_ =>
solrClient.findById[EntityDocument](
CompoundId.projectEntity(Id(created.id))
)
)
.evalMap(_.fold(().pure[IO])(e => solrDocs.update(_ => Set(e))))
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,21 @@

package io.renku.search.provision.user

import scala.concurrent.duration.*

import cats.effect.{IO, Resource}
import fs2.Stream
import fs2.concurrent.SignallingRef

import io.renku.avro.codec.encoders.all.given
import io.renku.events.EventsGenerators.userAddedGen
import io.renku.events.v1.UserAdded
import io.renku.queue.client.Generators.messageHeaderGen
import io.renku.search.GeneratorSyntax.*
import io.renku.search.model.Id
import io.renku.search.solr.documents.{EntityDocument, User}
import munit.CatsEffectSuite

import scala.concurrent.duration.*
import io.renku.search.provision.ProvisioningSuite
import io.renku.search.solr.documents.{CompoundId, EntityDocument}
import munit.CatsEffectSuite

class UserAddedProvisioningSpec extends ProvisioningSuite:

Expand All @@ -52,7 +53,9 @@ class UserAddedProvisioningSpec extends ProvisioningSuite:
docsCollectorFiber <-
Stream
.awakeEvery[IO](500 millis)
.evalMap(_ => solrClient.findById[User](Id(userAdded.id)))
.evalMap(_ =>
solrClient.findById[EntityDocument](CompoundId.userEntity(Id(userAdded.id)))
)
.evalMap(e => solrDoc.update(_ => e))
.compile
.drain
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@

package io.renku.search.provision.user

import scala.concurrent.duration.*

import cats.effect.{IO, Resource}
import fs2.Stream
import fs2.concurrent.SignallingRef

import io.renku.avro.codec.encoders.all.given
import io.renku.events.EventsGenerators.*
import io.renku.events.v1.*
import io.renku.queue.client.Generators.messageHeaderGen
import io.renku.search.GeneratorSyntax.*
import io.renku.search.model.ModelGenerators.projectMemberRoleGen
import io.renku.search.provision.ProvisioningSuite
import io.renku.search.provision.QueueMessageDecoder
import io.renku.search.solr.client.SolrDocumentGenerators.*
import io.renku.search.solr.documents.{EntityDocument, User}

import scala.concurrent.duration.*
import io.renku.search.provision.ProvisioningSuite
import io.renku.search.solr.documents.{CompoundId, EntityDocument}

class UserRemovedProcessSpec extends ProvisioningSuite:
test(
Expand Down Expand Up @@ -62,7 +63,9 @@ class UserRemovedProcessSpec extends ProvisioningSuite:
docsCollectorFiber <-
Stream
.awakeEvery[IO](500 millis)
.evalMap(_ => solrClient.findById[User](user.id))
.evalMap(_ =>
solrClient.findById[EntityDocument](CompoundId.userEntity(user.id))
)
.evalMap(e => solrDoc.update(_ => e))
.compile
.drain
Expand Down
Loading

0 comments on commit e4857c7

Please sign in to comment.