Skip to content

Commit

Permalink
Fix Delta startup when applying indexing fix (#3278)
Browse files Browse the repository at this point in the history
Co-authored-by: Simon Dumas <[email protected]>
  • Loading branch information
imsdu and Simon Dumas authored May 23, 2022
1 parent bdaa5f1 commit 69b0a5a
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
scheduler: Scheduler,
uuidF: UUIDF
) =>
-      Task.when(sys.env.getOrElse("FIX_3266", "false").toBoolean) {
+      val fix = Task.when(sys.env.getOrElse("FIX_3266", "false").toBoolean) {
         new BlazegraphIndexing3266(log, views, indexingCleanup, config).run()
-      } >>
-        BlazegraphIndexingCoordinator(views, indexingController, indexingStream, indexingCleanup, config)(
-          uuidF,
-          as,
-          scheduler
-        )
+      }
+      BlazegraphIndexingCoordinator(views, indexingController, indexingStream, indexingCleanup, config, fix)(
+        uuidF,
+        as,
+        scheduler
+      )
     }

make[BlazegraphViewsCache].from { (config: BlazegraphViewsConfig, as: ActorSystem[Nothing]) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ object BlazegraphIndexingCoordinator {
indexingController: BlazegraphIndexingController,
indexingStream: BlazegraphIndexingStream,
indexingCleanup: BlazegraphIndexingCleanup,
-      config: BlazegraphViewsConfig
+      config: BlazegraphViewsConfig,
+      beforeRunning: Task[Unit] = Task.unit
)(implicit
uuidF: UUIDF,
as: ActorSystem[Nothing],
Expand All @@ -65,6 +66,6 @@ object BlazegraphIndexingCoordinator {
retryStrategy
)
}
-      .tapEval(BlazegraphViewsIndexing.startIndexingStreams(config.indexing.retry, views, _))
+      .tapEval(BlazegraphViewsIndexing.startIndexingStreams(config.indexing.retry, views, _, beforeRunning))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,23 @@ object BlazegraphViewsIndexing {
def startIndexingStreams(
retry: RetryStrategyConfig,
views: BlazegraphViews,
-      coordinator: BlazegraphIndexingCoordinator
+      coordinator: BlazegraphIndexingCoordinator,
+      beforeRunning: Task[Unit] = Task.unit
)(implicit uuidF: UUIDF, as: ActorSystem[Nothing], sc: Scheduler): Task[Unit] = {
def onEvent(event: BlazegraphViewEvent) = coordinator.run(event.id, event.project, event.rev)
-    apply("BlazegraphIndexingCoordinatorScan", retry, views, onEvent)
+    apply("BlazegraphIndexingCoordinatorScan", retry, views, onEvent, beforeRunning)
}

private def apply(
name: String,
retry: RetryStrategyConfig,
views: BlazegraphViews,
-      onEvent: BlazegraphViewEvent => Task[Unit]
+      onEvent: BlazegraphViewEvent => Task[Unit],
+      beforeRunning: Task[Unit] = Task.unit
)(implicit uuidF: UUIDF, as: ActorSystem[Nothing], sc: Scheduler): Task[Unit] =
DaemonStreamCoordinator.run(
name,
-      stream = views.events(Offset.noOffset).evalMap { e => onEvent(e.event) },
+      stream = fs2.Stream.eval(beforeRunning) >> views.events(Offset.noOffset).evalMap { e => onEvent(e.event) },
retryStrategy = RetryStrategy.retryOnNonFatal(retry, logger, name)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,14 @@ class ElasticSearchPluginModule(priority: Int) extends ModuleDef {
scheduler: Scheduler,
uuidF: UUIDF
) =>
-      Task.when(sys.env.getOrElse("FIX_3266", "false").toBoolean) {
+      val fix = Task.when(sys.env.getOrElse("FIX_3266", "false").toBoolean) {
         new ElasticsearchIndexing3266(log, views, indexingCleanup, config).run()
-      } >>
-        ElasticSearchIndexingCoordinator(views, indexingController, indexingStream, indexingCleanup, config)(
-          uuidF,
-          as,
-          scheduler
-        )
+      }
+      ElasticSearchIndexingCoordinator(views, indexingController, indexingStream, indexingCleanup, config, fix)(
+        uuidF,
+        as,
+        scheduler
+      )
     }

make[ElasticSearchViewCache].fromEffect { (config: ElasticSearchViewsConfig, as: ActorSystem[Nothing]) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ object ElasticSearchIndexingCoordinator {
indexingController: ElasticSearchIndexingController,
indexingStream: ElasticSearchIndexingStream,
indexingCleanup: ElasticSearchIndexingCleanup,
-      config: ElasticSearchViewsConfig
+      config: ElasticSearchViewsConfig,
+      beforeRunning: Task[Unit] = Task.unit
)(implicit
uuidF: UUIDF,
as: ActorSystem[Nothing],
Expand All @@ -67,6 +68,6 @@ object ElasticSearchIndexingCoordinator {
retryStrategy
)
}
-      .tapEval(ElasticSearchViewsIndexing.startIndexingStreams(config.indexing.retry, views, _))
+      .tapEval(ElasticSearchViewsIndexing.startIndexingStreams(config.indexing.retry, views, _, beforeRunning))

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,23 @@ object ElasticSearchViewsIndexing {
def startIndexingStreams(
retry: RetryStrategyConfig,
views: ElasticSearchViews,
-      coordinator: ElasticSearchIndexingCoordinator
+      coordinator: ElasticSearchIndexingCoordinator,
+      beforeRunning: Task[Unit] = Task.unit
)(implicit uuidF: UUIDF, as: ActorSystem[Nothing], sc: Scheduler): Task[Unit] = {
def onEvent(event: ElasticSearchViewEvent) = coordinator.run(event.id, event.project, event.rev)
-    apply("ElasticSearchIndexingCoordinatorScan", retry, views, onEvent)
+    apply("ElasticSearchIndexingCoordinatorScan", retry, views, onEvent, beforeRunning)
}

private def apply(
name: String,
retry: RetryStrategyConfig,
views: ElasticSearchViews,
-      onEvent: ElasticSearchViewEvent => Task[Unit]
+      onEvent: ElasticSearchViewEvent => Task[Unit],
+      beforeRunning: Task[Unit] = Task.unit
)(implicit uuidF: UUIDF, as: ActorSystem[Nothing], sc: Scheduler): Task[Unit] =
DaemonStreamCoordinator.run(
name,
-      stream = views.events(Offset.noOffset).evalMap { e => onEvent(e.event) },
+      stream = fs2.Stream.eval(beforeRunning) >> views.events(Offset.noOffset).evalMap { e => onEvent(e.event) },
retryStrategy = RetryStrategy.retryOnNonFatal(retry, logger, name)
)

Expand Down

0 comments on commit 69b0a5a

Please sign in to comment.