Skip to content

Commit

Permalink
Migrate ElasticSearch routes and state machine to Cats Effect (#4449)
Browse files Browse the repository at this point in the history
* Migrate ElasticSearch routes/state machine to Cats Effect

* Compile tests

* Migrate ES search routes and queries
  • Loading branch information
dantb authored Oct 31, 2023
1 parent 99128c7 commit 09106ca
Show file tree
Hide file tree
Showing 37 changed files with 431 additions and 416 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.contexts
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, JsonLdContext, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.decoder.Configuration
import monix.bio.Task

private[elasticsearch] object ElasticSearchDecoderConfiguration {

/**
* @return
* a decoder configuration that uses the elasticsearch context
*/
def apply(implicit jsonLdApi: JsonLdApi, rcr: RemoteContextResolution): Task[Configuration] =
def apply(implicit jsonLdApi: JsonLdApi, rcr: RemoteContextResolution): IO[Configuration] =
for {
contextValue <- Task.delay { ContextValue(contexts.elasticsearch) }
jsonLdContext <- JsonLdContext(contextValue)
contextValue <- IO { ContextValue(contexts.elasticsearch) }
jsonLdContext <- JsonLdContext(contextValue).toCatsIO
} yield Configuration(jsonLdContext, "id")

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import akka.actor.typed.ActorSystem
import cats.effect.{Clock, ContextShift, IO}
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.client.ElasticSearchClient
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.config.ElasticSearchViewsConfig
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.deletion.{ElasticSearchDeletionTask, EventMetricsDeletionTask}
Expand Down Expand Up @@ -40,7 +40,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Label
import ch.epfl.bluebrain.nexus.delta.sourcing.projections.{ProjectionErrors, Projections}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{PipeChain, ReferenceRegistry, Supervisor}
import izumi.distage.model.definition.{Id, ModuleDef}
import monix.bio.UIO
import monix.execution.Scheduler

/**
Expand Down Expand Up @@ -88,19 +87,18 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
config: ElasticSearchViewsConfig,
xas: Transactors,
api: JsonLdApi,
clock: Clock[UIO],
clock: Clock[IO],
uuidF: UUIDF
) =>
toCatsIO(
ElasticSearchViews(
fetchContext.mapRejection(ProjectContextRejection),
contextResolution,
validateElasticSearchView,
config.eventLog,
config.prefix,
xas
)(api, clock, uuidF)
)
ElasticSearchViews(
fetchContext.mapRejection(ProjectContextRejection),
contextResolution,
validateElasticSearchView,
config.eventLog,
config.prefix,
xas
)(api, clock, uuidF)

}

make[ElasticSearchCoordinator].fromEffect {
Expand Down Expand Up @@ -183,7 +181,6 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
viewsQuery: ElasticSearchViewsQuery,
shift: ElasticSearchView.Shift,
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
fusionConfig: FusionConfig
Expand All @@ -197,7 +194,6 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
indexingAction(_, _, _)(shift)
)(
baseUri,
s,
cr,
ordering,
fusionConfig
Expand All @@ -211,7 +207,6 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
schemeDirectives: DeltaSchemeDirectives,
defaultViewsQuery: DefaultViewsQuery.Elasticsearch,
baseUri: BaseUri,
s: Scheduler,
cr: RemoteContextResolution @Id("aggregate"),
ordering: JsonKeyOrdering,
resourcesToSchemaSet: Set[ResourceToSchemaMappings],
Expand All @@ -228,7 +223,6 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
)(
baseUri,
esConfig.pagination,
s,
cr,
ordering,
fetchContext.mapRejection(ElasticSearchQueryError.ProjectContextRejection)
Expand All @@ -244,7 +238,6 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
projectionErrors: ProjectionErrors,
schemeDirectives: DeltaSchemeDirectives,
baseUri: BaseUri,
s: Scheduler,
c: ContextShift[IO],
cr: RemoteContextResolution @Id("aggregate"),
esConfig: ElasticSearchViewsConfig,
Expand All @@ -262,7 +255,6 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
)(
baseUri,
esConfig.pagination,
s,
c,
cr,
ordering
Expand All @@ -278,15 +270,13 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
identities: Identities,
aclCheck: AclCheck,
idResolution: IdResolution,
s: Scheduler,
ordering: JsonKeyOrdering,
rcr: RemoteContextResolution @Id("aggregate"),
fusionConfig: FusionConfig,
baseUri: BaseUri
) =>
new IdResolutionRoutes(identities, aclCheck, idResolution)(
baseUri,
s,
ordering,
rcr,
fusionConfig
Expand Down Expand Up @@ -390,12 +380,10 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
}

make[ElasticSearchView.Shift].fromEffect { (views: ElasticSearchViews, base: BaseUri) =>
toCatsIO(
for {
defaultMapping <- defaultElasticsearchMapping
defaultSettings <- defaultElasticsearchSettings
} yield ElasticSearchView.shift(views, defaultMapping, defaultSettings)(base)
)
for {
defaultMapping <- defaultElasticsearchMapping
defaultSettings <- defaultElasticsearchSettings
} yield ElasticSearchView.shift(views, defaultMapping, defaultSettings)(base)
}

many[ResourceShift[_, _, _]].ref[ElasticSearchView.Shift]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DefaultLabelPredicates, SourceAsText}
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration._

/**
* The default creation of the default ElasticSearchView as part of the project initialization.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch

import cats.data.NonEmptySet
import cats.effect.IO
import cats.implicits._
import ch.epfl.bluebrain.nexus.delta.kernel.effect.migration.toCatsIOOps
import ch.epfl.bluebrain.nexus.delta.kernel.utils.UUIDF
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.ElasticSearchViewJsonLdSourceDecoder.{toValue, ElasticSearchViewFields}
import ch.epfl.bluebrain.nexus.delta.plugins.elasticsearch.model.ElasticSearchViewValue.{AggregateElasticSearchViewValue, IndexingElasticSearchViewValue}
Expand All @@ -27,7 +29,6 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.model.Tag.UserTag
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes._
import io.circe.syntax._
import io.circe.{Json, JsonObject}
import monix.bio.{IO, Task}

import scala.annotation.nowarn

Expand All @@ -42,20 +43,20 @@ class ElasticSearchViewJsonLdSourceDecoder private (

def apply(ref: ProjectRef, context: ProjectContext, source: Json)(implicit
caller: Caller
): IO[ElasticSearchViewRejection, (Iri, ElasticSearchViewValue)] =
decoder(ref, context, mapJsonToString(source)).map { case (iri, fields) =>
): IO[(Iri, ElasticSearchViewValue)] =
decoder(ref, context, mapJsonToString(source)).toCatsIO.map { case (iri, fields) =>
iri -> toValue(fields)
}

def apply(ref: ProjectRef, context: ProjectContext, iri: Iri, source: Json)(implicit
caller: Caller
): IO[ElasticSearchViewRejection, ElasticSearchViewValue] =
): IO[ElasticSearchViewValue] =
decoder(
ref,
context,
iri,
mapJsonToString(source)
).map(toValue)
).toCatsIO.map(toValue)

private def mapJsonToString(json: Json): Json = json
.mapAllKeys("mapping", _.noSpaces.asJson)
Expand Down Expand Up @@ -196,7 +197,7 @@ object ElasticSearchViewJsonLdSourceDecoder {

def apply(uuidF: UUIDF, contextResolution: ResolverContextResolution)(implicit
api: JsonLdApi
): Task[ElasticSearchViewJsonLdSourceDecoder] = {
): IO[ElasticSearchViewJsonLdSourceDecoder] = {
implicit val rcr: RemoteContextResolution = contextResolution.rcr

ElasticSearchDecoderConfiguration.apply.map { implicit config =>
Expand Down
Loading

0 comments on commit 09106ca

Please sign in to comment.