Skip to content

Commit

Permalink
Display processed rows / total rows in process view
Browse files Browse the repository at this point in the history
This helps check that processes are making progress for large
datasets, where the progress percentage may stay at 0% for a long time.
  • Loading branch information
wetneb committed Jan 30, 2024
1 parent d56475b commit c94d97f
Show file tree
Hide file tree
Showing 21 changed files with 152 additions and 43 deletions.
2 changes: 2 additions & 0 deletions main/webapp/modules/core/langs/translation-en.json
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@
"core-project/paste-json": "Select a file or paste an extracted JSON history of operations to perform:",
"core-project/some-operations-applied-but-error": "$1 {{plural:$1|operation|operations}} ran successfully but an error was encountered in the next one: $2. The following operations were not executed.",
"core-project/percent-complete": "$1% complete",
"core-project/percent-complete-rows": "$1% complete ($2 of $3 rows)",
"core-project/percent-complete-records": "$1% complete ($2 of $3 records)",
"core-project/other-processes": "($1 other pending {{plural:$1|process|processes}})",
"core-project/cancel-all": "{{plural:$1|Cancel|Cancel All}}",
"core-project/canceling": "Canceling…",
Expand Down
13 changes: 12 additions & 1 deletion main/webapp/modules/core/scripts/project/process-panel.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,19 @@ ProcessPanel.prototype._renderPanel = function(newData) {
});
}

var locale = $.i18n().locale;
var progressText = $.i18n('core-project/percent-complete', process.progress);
if (process.processedElements >= 0 && process.totalElements > 0) {
var processedElements = process.processedElements.toLocaleString(locale);
var totalElements = process.totalElements.toLocaleString(locale);
if (process.engineMode === "row-based") {
progressText = $.i18n('core-project/percent-complete-rows', process.progress, processedElements, totalElements);
} else {
progressText = $.i18n('core-project/percent-complete-records', process.progress, processedElements, totalElements);
}
}
li.find('.process-progress-container span')
.text($.i18n('core-project/percent-complete', process.progress));
.text(progressText);
li.find('.pause-button')
.data('paused', process.state === 'paused')
.text(process.state === 'paused' ? $.i18n('core-processes/resume') : $.i18n('core-processes/pause'));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ public boolean hasNext() {
while (!nextLineAttempted && nextLine == null && !endMarkerFound) {
long currentPosition = textPartition.start + (countingIs == null ? 0 : countingIs.getCount());
try {
if (!nextLineAttempted && ((currentPosition <= textPartition.getEnd() || textPartition.getEnd() < 0) || synchronous)) {
if (!nextLineAttempted
&& ((currentPosition <= textPartition.getEnd() || textPartition.getEnd() < 0) || synchronous)) {
if (synchronous) {
lineNumberReader.mark(4096);
// TODO add logic to bump this readAheadLimit (restart from the beginning of the
Expand Down Expand Up @@ -392,8 +393,7 @@ protected static class TextFilePartition implements Partition {
* @param start
* starting byte where to read from in the file
* @param end
* first byte not to be read after the end of the partition,
* or -1 if the entire file should be read
* first byte not to be read after the end of the partition, or -1 if the entire file should be read
*/
protected TextFilePartition(File path, int index, long start, long end) {
this.path = path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ public class TaskSignalling {

private boolean paused = false;
private int progress = 0;
private long elementsToProcess;
private long totalElements;
private long processedElements = 0;
private List<ProgressReporter> progressReporters = new ArrayList<>();

public TaskSignalling(long elementsToProcess) {
this.elementsToProcess = elementsToProcess;
/**
* Constructor.
*
* @param totalElements
* the number of elements expected to be processed by the underlying task (used to compute progress percentages)
*/
public TaskSignalling(long totalElements) {
this.totalElements = totalElements;
}

/**
Expand Down Expand Up @@ -53,7 +59,7 @@ public synchronized boolean isPaused() {
*/
public synchronized void registerProgressReporter(ProgressReporter reporter) {
progressReporters.add(reporter);
reporter.reportProgress(progress);
reporter.reportProgress(progress, processedElements, totalElements);
}

/**
Expand All @@ -80,27 +86,26 @@ public synchronized void yield() throws InterruptedException {
* Method to be called by the computing thread after processing a bunch of elements, so that progress is updated.
*/
public synchronized void addProcessedElements(long processedElements) {
if (elementsToProcess < 0 || progress >= 100) {
if (totalElements <= 0 || progress >= 100) {
return;
}
this.processedElements += processedElements;
setProgress((int) ((100 * this.processedElements) / elementsToProcess));
progress = (int) ((100 * this.processedElements) / totalElements);
broadcastProgress();
}

/**
* Mark this task as fully done.
*/
public synchronized void setFullProgress() {
setProgress(100);
processedElements = totalElements;
progress = 100;
broadcastProgress();
}

private void setProgress(int newProgress) {
int oldProgress = progress;
progress = newProgress;
if (progress != oldProgress) {
for (ProgressReporter reporter : progressReporters) {
reporter.reportProgress(progress);
}
private void broadcastProgress() {
for (ProgressReporter reporter : progressReporters) {
reporter.reportProgress(progress, processedElements, totalElements);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ static public ProgressReporter createProgressReporterForProjectSave(final Import
return new ProgressReporter() {

@Override
public void reportProgress(int percentage) {
public void reportProgress(int percentage, long processedRows, long totalRows) {
job.setProgress(50 + percentage / 2, "Saving project in the workspace");
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public <T> ChangeData<T> getChangeData(
grid,
partialChangeData -> completionProcess.apply(grid, partialChangeData),
_changeDescription,
_history, applicationIndex);
_history, applicationIndex, mode);
} catch (OperationException e) {
// unreachable since earliestStepContainingDependencies must return a grid that is already computed
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.Optional;
import java.util.function.Function;

import org.openrefine.browsing.Engine;
import org.openrefine.history.History;
import org.openrefine.model.Grid;
import org.openrefine.process.ProcessManager;
Expand Down Expand Up @@ -87,14 +88,22 @@ public <T> ChangeData<T> retrieve(
* a function taking the existing state of the change data and returning the complete version
* @param description
* a description of the completion process, to be reported to the user
* @param history
* the history of the project
* @param requiredStepIndex
* the index of the step in the history from which this change data should be computed
* @param engineMode
* whether this change data should be computed row- or record-wise
*/
public <T> ChangeData<T> retrieveOrCompute(
ChangeDataId changeDataId,
ChangeDataSerializer<T> serializer,
Grid baseGrid,
Function<Optional<ChangeData<T>>, ChangeData<T>> completionProcess,
String description,
History history, int requiredStepIndex)
History history,
int requiredStepIndex,
Engine.Mode engineMode)
throws IOException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.openrefine.browsing.Engine;
import org.openrefine.history.History;
import org.openrefine.model.Grid;
import org.openrefine.model.Runner;
Expand Down Expand Up @@ -127,7 +128,9 @@ public <T> ChangeData<T> retrieveOrCompute(
Grid baseGrid,
Function<Optional<ChangeData<T>>, ChangeData<T>> completionProcess,
String description,
History history, int requiredStepIndex) throws IOException {
History history,
int requiredStepIndex,
Engine.Mode engineMode) throws IOException {
File changeDataDir = idsToFile(changeDataId, false);
registerId(changeDataId);
File incompleteDir = null;
Expand Down Expand Up @@ -179,7 +182,8 @@ public <T> ChangeData<T> retrieveOrCompute(
completionProcess,
incompleteDir,
history,
requiredStepIndex);
requiredStepIndex,
engineMode);
processManager.queueProcess(process);
_toRefresh.add(changeDataId);
} else if (storedChangeDataIsComplete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import org.openrefine.browsing.Engine;
import org.openrefine.history.History;
import org.openrefine.model.Grid;
import org.openrefine.model.Runner;
Expand Down Expand Up @@ -67,7 +68,9 @@ public <T> ChangeData<T> retrieveOrCompute(
Grid baseGrid,
Function<Optional<ChangeData<T>>, ChangeData<T>> completionProcess,
String description,
History history, int requiredStepIndex) throws IOException {
History history,
int requiredStepIndex,
Engine.Mode engineMode) throws IOException {
if (!_changeData.containsKey(changeDataId)) {
ChangeData<T> computed = completionProcess.apply(Optional.empty());
_changeData.put(changeDataId, computed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.commons.io.FileUtils;

import org.openrefine.browsing.Engine;
import org.openrefine.history.History;
import org.openrefine.model.changes.ChangeData;
import org.openrefine.model.changes.ChangeDataId;
Expand Down Expand Up @@ -53,6 +54,8 @@ public class ChangeDataStoringProcess<T> extends Process {
* the history of the project this computation pertains to
* @param requiredStepIndex
* the step in the history which must be fully computed as a prerequisite for this process to run
* @param engineMode
* whether this change data should be computed row- or record-wise
*/
public ChangeDataStoringProcess(
String description,
Expand All @@ -62,8 +65,9 @@ public ChangeDataStoringProcess(
ChangeDataSerializer<T> serializer, Function<Optional<ChangeData<T>>, ChangeData<T>> completionProcess,
File temporaryDirToDelete,
History history,
int requiredStepIndex) {
super(description);
int requiredStepIndex,
Engine.Mode engineMode) {
super(description, engineMode);
this.storedChangeData = storedChangeData;
this.changeDataId = changeDataId;
this.changeDataStore = changeDataStore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public int getProgress() {

@Override
public void onProgress(ProgressReporter reporter) {
reporter.reportProgress(100);
reporter.reportProgress(100, 0L, 0L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public int getProgress() {

@Override
public void onProgress(ProgressReporter reporter) {
reporter.reportProgress(0);
reporter.reportProgress(0, 0L, 0L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;

import org.openrefine.browsing.Engine;
import org.openrefine.browsing.Engine.Mode;
import org.openrefine.model.changes.ChangeDataId;
import org.openrefine.operations.exceptions.ChangeDataFetchingException;

Expand All @@ -63,15 +65,22 @@ public enum State {
@JsonIgnore
protected int _progress; // out of 100
@JsonIgnore
protected long _processedElements = 0;
@JsonIgnore
protected long _totalElements = 0;
@JsonIgnore
protected Engine.Mode _engineMode;
@JsonIgnore
protected boolean _canceled;
@JsonIgnore
protected Exception _exception;
@JsonIgnore
protected ProgressReporter _reporter;

public Process(String description) {
public Process(String description, Mode engineMode) {
_description = description;
_reporter = new Reporter();
_engineMode = engineMode;
}

@JsonProperty("state")
Expand Down Expand Up @@ -153,6 +162,21 @@ public int getProgress() {
return _progress;
}

@JsonProperty("processedElements")
public long getProcessedElements() {
return _processedElements;
}

@JsonProperty("totalElements")
public long getTotalElements() {
return _totalElements;
}

@JsonProperty("engineMode")
public Mode getEngineMode() {
return _engineMode;
}

/**
* Cancels the process. Note that calling {@link ProcessManager#update()} after this cancellation is required for
* any other process in the queue to start.
Expand Down Expand Up @@ -196,8 +220,10 @@ public long getId() {
protected class Reporter implements ProgressReporter {

@Override
public void reportProgress(int percentage) {
public void reportProgress(int percentage, long processedElements, long totalElements) {
_progress = percentage;
_processedElements = processedElements;
_totalElements = totalElements;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,25 @@
public interface ProgressReporter {

/**
* Reports progress of the current long-running operation.
* Reports progress of the current long-running operation, with additional information about how many rows or records
* have been processed, out of the total number of rows or records to process. Both counts are set to 0 if they are unknown.
*
* @param percentage
* a number from 0 to 100
* @param processedElements
* the number of rows or records processed so far by the operation, or 0 if unknown
* @param totalElements
* the total number of rows or records to be processed by the operation, or 0 if unknown
*/
public void reportProgress(int percentage);
public void reportProgress(int percentage, long processedElements, long totalElements);

/**
* Convenience method to report progress without known row counts.
*
* @param percentage
* a number from 0 to 100
*/
public default void reportProgress(int percentage) {
reportProgress(percentage, 0L, 0L);
}
}
Loading

0 comments on commit c94d97f

Please sign in to comment.