Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conditionally wait for previous polling task #6401

Open
wants to merge 2 commits into
base: dev-2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void runPolling() {
final boolean markPrimed = !moreData;
List<EstimatedTimetableDeliveryStructure> etds = serviceDelivery.getEstimatedTimetableDeliveries();
if (etds != null) {
saveResultOnGraph.execute(context -> {
updateGraph(context -> {
var result = estimatedTimetableHandler.applyUpdate(etds, incrementality, context);
ResultLogger.logUpdateResult(feedId, "siri-et", result);
metricsConsumer.accept(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ private void updateSiri() {
// All that said, out of all the update types, Alerts (and SIRI SX) are probably the ones
// that would be most tolerant of non-versioned application-wide storage since they don't
// participate in routing and are tacked on to already-completed routing responses.
saveResultOnGraph.execute(context -> {

updateGraph(context -> {
updateHandler.update(serviceDelivery, context);
if (markPrimed) {
primed = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.opentripplanner.framework.application.OTPFeature;
import org.opentripplanner.updater.GraphWriterRunnable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -40,7 +41,15 @@ public abstract class PollingGraphUpdater implements GraphUpdater {
/**
* Parent update manager. Is used to execute graph writer runnables.
*/
protected WriteToGraphCallback saveResultOnGraph;
private WriteToGraphCallback saveResultOnGraph;

/**
* Handle on the task posted during the previous polling execution.
* If the updater posts several tasks during one polling cycle, the handle will point to the
* latest posted task.
* Initially null when the polling updater starts.
*/
private Future<?> previousTask;

/** Shared configuration code for all polling graph updaters. */
protected PollingGraphUpdater(PollingGraphUpdaterParameters config) {
Expand All @@ -55,6 +64,10 @@ public Duration pollingPeriod() {
@Override
public final void run() {
try {
if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
waitForPreviousTask();
}

// Run concrete polling graph updater's implementation method.
runPolling();
if (runOnlyOnce()) {
Expand Down Expand Up @@ -113,11 +126,30 @@ public final void setup(WriteToGraphCallback writeToGraphCallback) {
*/
protected abstract void runPolling() throws Exception;

protected final void updateGraph(GraphWriterRunnable task)
throws ExecutionException, InterruptedException {
var result = saveResultOnGraph.execute(task);
if (OTPFeature.WaitForGraphUpdateInPollingUpdaters.isOn()) {
result.get();
/**
* Post an update task to the GraphWriter queue.
* This is non-blocking.
* This can be called several times during one polling cycle.
*/
protected final void updateGraph(GraphWriterRunnable task) {
previousTask = saveResultOnGraph.execute(task);
}

/**
* If the previous task takes longer than the polling interval,
* we delay the next polling cycle until the task is complete.
* This prevents tasks from piling up.
* If the updater sends several tasks during a polling cycle, we wait on the latest posted task.
* */
private void waitForPreviousTask() throws InterruptedException, ExecutionException {
if (previousTask != null && !previousTask.isDone()) {
LOG.info("Delaying polling until the previous task is complete");
long startBlockingWait = System.currentTimeMillis();
previousTask.get();
LOG.info(
"Resuming polling after waiting an additional {}s",
(System.currentTimeMillis() - startBlockingWait) / 1000
);
}
}
}

This file was deleted.

Loading