Skip to content

Commit

Permalink
escalate exceptions during parallel population reading
Browse files Browse the repository at this point in the history
before, the reading of the population just continued, even if one of the reader-threads had an exception.
Now, the exception from a reader-thread is propagated to the main-thread.
  • Loading branch information
mrieser committed Nov 18, 2023
1 parent 1068c6c commit ea64047
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@
* the file and creates empty person objects which are added to the population to ensure
* that their order is not changed. Note that this approach is not compatible with
* population streaming. When this feature is activated, the non-parallel reader is used.
*
*
* The parallel threads interpret the xml data for each person.
*
*
* @author cdobler
*/
/* deliberately package */ class ParallelPopulationReaderMatsimV4 extends PopulationReaderMatsimV4 {

static final Logger log = LogManager.getLogger(ParallelPopulationReaderMatsimV4.class);

private final boolean isPopulationStreaming;
Expand All @@ -70,6 +70,7 @@
private List<Tag> currentPersonXmlData;

private final CoordinateTransformation coordinateTransformation;
private Throwable exception = null;

public ParallelPopulationReaderMatsimV4(
final Scenario scenario ) {
Expand All @@ -81,14 +82,14 @@ public ParallelPopulationReaderMatsimV4(
final Scenario scenario) {
super( coordinateTransformation , scenario );
this.coordinateTransformation = coordinateTransformation;

/*
* Check whether population streaming is activated
*/
// if (scenario.getPopulation() instanceof Population && ((Population)scenario.getPopulation()).isStreaming()) {
if ( scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation ) {
log.warn("Population streaming is activated - cannot use " + ParallelPopulationReaderMatsimV4.class.getName() + "!");

this.isPopulationStreaming = true;
this.numThreads = 1;
this.queue = null;
Expand All @@ -98,42 +99,70 @@ public ParallelPopulationReaderMatsimV4(
isPopulationStreaming = false;

if (scenario.getConfig().global().getNumberOfThreads() > 0) {
this.numThreads = scenario.getConfig().global().getNumberOfThreads();
this.numThreads = scenario.getConfig().global().getNumberOfThreads();
} else this.numThreads = 1;

this.queue = new LinkedBlockingQueue<>();
this.collectorPopulation = new CollectorPopulation(this.plans);
this.collectorScenario = new CollectorScenario(scenario, collectorPopulation);
}
}

private void initThreads() {
threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {

ParallelPopulationReaderMatsimV4Runner runner =
new ParallelPopulationReaderMatsimV4Runner(
this.coordinateTransformation,
this.collectorScenario,
this.queue);

Thread thread = new Thread(runner);
thread.setDaemon(true);
thread.setName(ParallelPopulationReaderMatsimV4Runner.class.toString() + i);
thread.setUncaughtExceptionHandler(this::catchReaderException);
threads[i] = thread;
thread.start();
}
}


private void stopThreads() {
// signal the threads that they should end parsing
for (int i = 0; i < this.numThreads; i++) {
List<Tag> list = new ArrayList<>();
list.add(new EndProcessingTag());
this.queue.add(list);
}

// wait for the threads to finish
try {
for (Thread thread : threads) {
thread.join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (this.exception != null) {
throw new RuntimeException(this.exception);
}
}

private void catchReaderException(Thread thread, Throwable throwable) {
log.error("Error parsing XML", throwable);
this.exception = throwable;
}

@Override
public void startTag(String name, Attributes atts, Stack<String> context) {

// if population streaming is activated, use non-parallel reader
if (isPopulationStreaming) {
super.startTag(name, atts, context);
return;
}

if (PLANS.equals(name)) {
log.info("Start parallel population reading...");
initThreads();
Expand All @@ -149,7 +178,7 @@ public void startTag(String name, Attributes atts, Stack<String> context) {
currentPersonXmlData.add(personTag);
this.plans.addPerson(person);
}

// Create a new start tag and add it to the person data.
StartTag tag = new StartTag();
tag.name = name;
Expand All @@ -160,30 +189,15 @@ public void startTag(String name, Attributes atts, Stack<String> context) {

@Override
public void endTag(String name, String content, Stack<String> context) {

// if population streaming is activated, use non-parallel reader
if (isPopulationStreaming) {
super.endTag(name, content, context);
return;
}

if (PLANS.equals(name)) {
// signal the threads that they should end parsing
for (int i = 0; i < this.numThreads; i++) {
List<Tag> list = new ArrayList<>();
list.add(new EndProcessingTag());
this.queue.add(list);
}

// wait for the threads to finish
try {
for (Thread thread : threads) {
thread.join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

this.stopThreads();
super.endTag(name, content, context);
log.info("Finished parallel population reading...");
} else {
Expand All @@ -193,24 +207,24 @@ public void endTag(String name, String content, Stack<String> context) {
tag.content = content;
tag.context = context;
currentPersonXmlData.add(tag);

// if its a person end tag, add the persons xml data to the queue.
if (PERSON.equals(name)) queue.add(currentPersonXmlData);
}
}

private static class CollectorScenario implements Scenario {
// yyyy Why is this necessary at all? Could you please explain your design decisions? The same instance is passed to all threads, so
// yyyy Why is this necessary at all? Could you please explain your design decisions? The same instance is passed to all threads, so
// what is the difference to using the underlying population directly?

private final Scenario delegate;
private final CollectorPopulation population;

public CollectorScenario(Scenario scenario, CollectorPopulation population) {
this.delegate = scenario;
this.population = population;
}

@Override
public Network getNetwork() {
return this.delegate.getNetwork();
Expand All @@ -220,7 +234,7 @@ public Network getNetwork() {
public Population getPopulation() {
return this.population; // return collector population
}

@Override
public ActivityFacilities getActivityFacilities() {
return this.delegate.getActivityFacilities();
Expand Down Expand Up @@ -266,15 +280,15 @@ public Vehicles getVehicles() {
return this.delegate.getVehicles() ;
}
}

private static class CollectorPopulation implements Population {

private final Population population;

public CollectorPopulation(Population population) {
this.population = population;
}

@Override
public PopulationFactory getFactory() {
return population.getFactory();
Expand Down Expand Up @@ -310,24 +324,24 @@ public org.matsim.utils.objectattributes.attributable.Attributes getAttributes()
throw new RuntimeException("Calls to this method are not expected to happen...");
}
}

public abstract static class Tag {
String name;
Stack<String> context = null; // not used by the PopulationReader
}

public final class StartTag extends Tag {
Attributes atts;
}

public final class PersonTag extends Tag {
Person person;
}

public final class EndTag extends Tag {
String content;
}

/*
* Marker Tag to inform the threads that no further data has to be parsed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

private final BlockingQueue<CompletableFuture<Person>> personInsertionQueue = new LinkedBlockingQueue<>();
private Thread personInsertionThread;
private Throwable exception = null;

public ParallelPopulationReaderMatsimV6(
final String inputCRS,
Expand Down Expand Up @@ -108,6 +109,7 @@ private void initThreads() {
Thread thread = new Thread(runner);
thread.setDaemon(true);
thread.setName(ParallelPopulationReaderMatsimV6Runner.class.toString() + i);
thread.setUncaughtExceptionHandler(this::catchReaderException);
threads[i] = thread;
thread.start();
}
Expand All @@ -118,6 +120,44 @@ private void initThreads() {
}
}

private void stopThreads() {
// signal the threads that they should end parsing
for (int i = 0; i < this.numThreads; i++) {
this.tagQueue.add(List.of(new EndProcessingTag()));
}

if (isPopulationStreaming) {
CompletableFuture<Person> finishPerson = new CompletableFuture<>();
finishPerson.complete(null);
try {
this.personInsertionQueue.put(finishPerson);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

// wait for the threads to finish
try {
for (Thread thread : threads) {
thread.join();
}
if(this.isPopulationStreaming) {
this.personInsertionThread.join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (this.exception != null) {
throw new RuntimeException(this.exception);
}
}

private void catchReaderException(Thread thread, Throwable throwable) {
log.error("Error parsing XML", throwable);
this.exception = throwable;
}

@Override
public void startTag(String name, Attributes atts, Stack<String> context) {
//Reached first time a person
Expand All @@ -139,6 +179,11 @@ public void startTag(String name, Attributes atts, Stack<String> context) {

// If it is a new person, create a new person and a list for its attributes.
if (PERSON.equals(name)) {
if (this.exception != null) {
this.stopThreads();
throw new RuntimeException(this.exception);
}

// Just create a person, but do not add it here!
Person person = this.plans.getFactory().createPerson(Id.create(atts.getValue(ATTR_PERSON_ID), Person.class));
currentPersonXmlData = new ArrayList<>();
Expand Down Expand Up @@ -187,34 +232,7 @@ public void endTag(String name, String content, Stack<String> context) {

// End of population reached
if (POPULATION.equals(name)) {
// signal the threads that they should end parsing
for (int i = 0; i < this.numThreads; i++) {
this.tagQueue.add(List.of(new EndProcessingTag()));
}
if(isPopulationStreaming)
{
CompletableFuture<Person> finishPerson = new CompletableFuture<>();
finishPerson.complete(null);
try {
this.personInsertionQueue.put(finishPerson);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

// wait for the threads to finish
try {
for (Thread thread : threads) {
thread.join();
}
if(this.isPopulationStreaming)
{
this.personInsertionThread.join();
}

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.stopThreads();

super.endTag(name, content, context);
log.info("Finished parallel population reading...");
Expand Down
Loading

0 comments on commit ea64047

Please sign in to comment.