Skip to content

Commit

Permalink
Revert "Filter by type before fetching elems in composite projections (
Browse files Browse the repository at this point in the history
  • Loading branch information
olivergrabinski authored Nov 13, 2023
1 parent 8defd90 commit 80b3b0e
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing

import cats.data.NonEmptyMapImpl.catsDataInstancesForNonEmptyMap
import cats.data.{NonEmptyChain, NonEmptyMap}
import cats.effect.{ContextShift, ExitCase, IO, Timer}
import cats.effect.ExitCase.{Canceled, Completed, Error}
import cats.effect.concurrent.Ref
import cats.effect.{ContextShift, ExitCase, IO, Timer}
import cats.kernel.Semigroup
import cats.syntax.all._
import ch.epfl.bluebrain.nexus.delta.kernel.Logger
Expand Down Expand Up @@ -189,13 +189,7 @@ object CompositeViewDef {
val fetchProgress: IO[CompositeProgress] = compositeProjections.progress(view.indexingRef)

def compileSource =
CompositeViewDef.compileSource(
view.ref.project,
compilePipeChain,
graphStream,
sinks.commonSink(view),
projectionTypes(view)
)(_)
CompositeViewDef.compileSource(view.ref.project, compilePipeChain, graphStream, sinks.commonSink(view))(_)

def compileTarget = CompositeViewDef.compileTarget(compilePipeChain, sinks.projectionSink(view, _))(_)

Expand Down Expand Up @@ -515,15 +509,12 @@ object CompositeViewDef {
* generates the element stream for the source in the context of a branch
* @param sink
* the sink for the common space
* @param projectionTypes
* the view's projection resource types to use to filter the rebuild stream
*/
def compileSource(
project: ProjectRef,
compilePipeChain: PipeChain.Compile,
graphStream: CompositeGraphStream,
sink: Sink,
projectionTypes: Set[Iri]
sink: Sink
)(source: CompositeViewSource): IO[(Iri, Source, Source, Operation)] =
IO.fromEither {
for {
Expand All @@ -536,7 +527,7 @@ object CompositeViewDef {
// The main source produces an infinite stream and waits for new elements
mainSource = graphStream.main(source, project)
// The rebuild one a finite one with only the current elements
rebuildSource = graphStream.rebuild(source, project, projectionTypes)
rebuildSource = graphStream.rebuild(source, project)
} yield (source.id, mainSource, rebuildSource, operation)
}

Expand Down Expand Up @@ -564,11 +555,4 @@ object CompositeViewDef {
} yield target.id -> result
}

/** Union of all resourceTypes specified in the view's projections */
private def projectionTypes(view: ActiveViewDef): Set[Iri] = {
val targets = view.value.projections
if (targets.exists(_.resourceTypes.isEmpty)) Set.empty[Iri]
else targets.foldLeft(Set.empty[Iri])(_ ++ _.resourceTypes)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream
import cats.effect.IO
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.{CrossProjectSource, ProjectSource, RemoteProjectSource}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.sdk.stream.GraphResourceStream
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemPipe, ProjectRef}
import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
Expand Down Expand Up @@ -31,10 +30,8 @@ trait CompositeGraphStream {
* the composite view source
* @param project
* the enclosing project
* @param projectionTypes
* the projection resource types to use to filter the stream
*/
def rebuild(source: CompositeViewSource, project: ProjectRef, projectionTypes: Set[Iri]): Source
def rebuild(source: CompositeViewSource, project: ProjectRef): Source

/**
* Get information about the remaining elements
Expand Down Expand Up @@ -65,20 +62,13 @@ object CompositeGraphStream {
}
}

override def rebuild(
source: CompositeViewSource,
project: ProjectRef,
projectionTypes: Set[Iri]
): Source = {
override def rebuild(source: CompositeViewSource, project: ProjectRef): Source = {
source match {
case p: ProjectSource =>
val filter = p.selectFilter.copy(types = p.selectFilter.types ++ projectionTypes)
Source(local.currents(project, filter, _).through(drainSource))
Source(local.currents(project, p.selectFilter, _).through(drainSource))
case c: CrossProjectSource =>
val filter = c.selectFilter.copy(types = c.selectFilter.types ++ projectionTypes)
Source(local.currents(c.project, filter, _).through(drainSource))
case r: RemoteProjectSource =>
remote.rebuild(r)
Source(local.currents(c.project, c.selectFilter, _).through(drainSource))
case r: RemoteProjectSource => remote.rebuild(r)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConst
Source(_ => s.onFinalize(increment(mainCompleted, p)) ++ Stream.never[IO])
}

override def rebuild(source: CompositeViewSource, project: ProjectRef, projectionTypes: Set[Iri]): Source = {
override def rebuild(source: CompositeViewSource, project: ProjectRef): Source = {
val (p, s) = stream(source, project)
Source(_ => s.onFinalize(increment(rebuildCompleted, p)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.CompositeViewsFixtur
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeView.{Interval, RebuildStrategy}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.stream.CompositeGraphStream
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.graph.NTriples
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import ch.epfl.bluebrain.nexus.delta.sourcing.model.{ElemStream, ProjectRef}
Expand Down Expand Up @@ -37,8 +36,7 @@ class CompositeViewDefSuite extends CatsEffectSuite with CompositeViewsFixture {

val graphStream = new CompositeGraphStream {
override def main(source: CompositeViewSource, project: ProjectRef): Source = makeSource("main")
override def rebuild(source: CompositeViewSource, project: ProjectRef, projectionTypes: Set[Iri]): Source =
makeSource("rebuild")
override def rebuild(source: CompositeViewSource, project: ProjectRef): Source = makeSource("rebuild")
override def remaining(source: CompositeViewSource, project: ProjectRef): Offset => IO[Option[RemainingElems]] =
_ => IO.none
}
Expand All @@ -48,8 +46,7 @@ class CompositeViewDefSuite extends CatsEffectSuite with CompositeViewsFixture {
project.ref,
_ => Right(FilterDeprecated.withConfig(())),
graphStream,
new NoopSink[NTriples](),
Set.empty
new NoopSink[NTriples]()
)(projectSource)
.map { case (id, mainSource, rebuildSource, operation) =>
assertEquals(id, projectSource.id)
Expand Down

0 comments on commit 80b3b0e

Please sign in to comment.