Skip to content

Commit

Permalink
Conditional waiting on the previous polling task
Browse files Browse the repository at this point in the history
  • Loading branch information
vpaturet committed Jan 22, 2025
1 parent 6fdb022 commit cf8acfe
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.opentripplanner.framework.application.OTPFeature;
import org.opentripplanner.updater.GraphWriterRunnable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -42,6 +43,12 @@ public abstract class PollingGraphUpdater implements GraphUpdater {
*/
protected WriteToGraphCallback saveResultOnGraph;

/**
* Handle on the previous polling execution.
* Initially null when the polling updater starts.
*/
private Future<?> previousTask;

/** Shared configuration code for all polling graph updaters. */
protected PollingGraphUpdater(PollingGraphUpdaterParameters config) {
this.pollingPeriod = config.frequency();
Expand All @@ -55,6 +62,10 @@ public Duration pollingPeriod() {
@Override
public final void run() {
try {
if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
waitForPreviousTask();
}

// Run concrete polling graph updater's implementation method.
runPolling();
if (runOnlyOnce()) {
Expand Down Expand Up @@ -113,11 +124,28 @@ public final void setup(WriteToGraphCallback writeToGraphCallback) {
*/
protected abstract void runPolling() throws Exception;

protected final void updateGraph(GraphWriterRunnable task)
throws ExecutionException, InterruptedException {
var result = saveResultOnGraph.execute(task);
if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
result.get();
/**
* Post the update task in the GraphWriter queue.
* This is non-blocking.
*/
protected final void updateGraph(GraphWriterRunnable task) {
previousTask = saveResultOnGraph.execute(task);
}

/**
* If the previous task takes longer than the polling interval,
* we delay the next polling cycle until the task is complete.
* This prevents tasks from piling up.
* */
private void waitForPreviousTask() throws InterruptedException, ExecutionException {
if (previousTask != null && !previousTask.isDone()) {
LOG.info("Delaying polling until the previous task is complete");
long startBlockingWait = System.currentTimeMillis();
previousTask.get();
LOG.info(
"Resuming polling after waiting an additional {}s",
(System.currentTimeMillis() - startBlockingWait) / 1000
);
}
}
}

This file was deleted.

0 comments on commit cf8acfe

Please sign in to comment.