Skip to content

Commit

Permalink
Merge pull request #2946 from matsim-org/2929-escalate-exceptions-in-…
Browse files Browse the repository at this point in the history
…population-reader

escalate exceptions during parallel population reading (Fix for #2929)
  • Loading branch information
mrieser authored Nov 18, 2023
2 parents 1068c6c + ea64047 commit 381665d
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,13 @@
* the file and creates empty person objects which are added to the population to ensure
* that their order is not changed. Note that this approach is not compatible with
* population streaming. When this feature is activated, the non-parallel reader is used.
*
*
* The parallel threads interpret the xml data for each person.
*
*
* @author cdobler
*/
/* deliberately package */ class ParallelPopulationReaderMatsimV4 extends PopulationReaderMatsimV4 {

static final Logger log = LogManager.getLogger(ParallelPopulationReaderMatsimV4.class);

private final boolean isPopulationStreaming;
Expand All @@ -70,6 +70,7 @@
private List<Tag> currentPersonXmlData;

private final CoordinateTransformation coordinateTransformation;
private Throwable exception = null;

public ParallelPopulationReaderMatsimV4(
final Scenario scenario ) {
Expand All @@ -81,14 +82,14 @@ public ParallelPopulationReaderMatsimV4(
final Scenario scenario) {
super( coordinateTransformation , scenario );
this.coordinateTransformation = coordinateTransformation;

/*
* Check whether population streaming is activated
*/
// if (scenario.getPopulation() instanceof Population && ((Population)scenario.getPopulation()).isStreaming()) {
if ( scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation ) {
log.warn("Population streaming is activated - cannot use " + ParallelPopulationReaderMatsimV4.class.getName() + "!");

this.isPopulationStreaming = true;
this.numThreads = 1;
this.queue = null;
Expand All @@ -98,42 +99,70 @@ public ParallelPopulationReaderMatsimV4(
isPopulationStreaming = false;

if (scenario.getConfig().global().getNumberOfThreads() > 0) {
this.numThreads = scenario.getConfig().global().getNumberOfThreads();
this.numThreads = scenario.getConfig().global().getNumberOfThreads();
} else this.numThreads = 1;

this.queue = new LinkedBlockingQueue<>();
this.collectorPopulation = new CollectorPopulation(this.plans);
this.collectorScenario = new CollectorScenario(scenario, collectorPopulation);
}
}

private void initThreads() {
threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {

ParallelPopulationReaderMatsimV4Runner runner =
new ParallelPopulationReaderMatsimV4Runner(
this.coordinateTransformation,
this.collectorScenario,
this.queue);

Thread thread = new Thread(runner);
thread.setDaemon(true);
thread.setName(ParallelPopulationReaderMatsimV4Runner.class.toString() + i);
thread.setUncaughtExceptionHandler(this::catchReaderException);
threads[i] = thread;
thread.start();
}
}


private void stopThreads() {
// signal the threads that they should end parsing
for (int i = 0; i < this.numThreads; i++) {
List<Tag> list = new ArrayList<>();
list.add(new EndProcessingTag());
this.queue.add(list);
}

// wait for the threads to finish
try {
for (Thread thread : threads) {
thread.join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (this.exception != null) {
throw new RuntimeException(this.exception);
}
}

private void catchReaderException(Thread thread, Throwable throwable) {
log.error("Error parsing XML", throwable);
this.exception = throwable;
}

@Override
public void startTag(String name, Attributes atts, Stack<String> context) {

// if population streaming is activated, use non-parallel reader
if (isPopulationStreaming) {
super.startTag(name, atts, context);
return;
}

if (PLANS.equals(name)) {
log.info("Start parallel population reading...");
initThreads();
Expand All @@ -149,7 +178,7 @@ public void startTag(String name, Attributes atts, Stack<String> context) {
currentPersonXmlData.add(personTag);
this.plans.addPerson(person);
}

// Create a new start tag and add it to the person data.
StartTag tag = new StartTag();
tag.name = name;
Expand All @@ -160,30 +189,15 @@ public void startTag(String name, Attributes atts, Stack<String> context) {

@Override
public void endTag(String name, String content, Stack<String> context) {

// if population streaming is activated, use non-parallel reader
if (isPopulationStreaming) {
super.endTag(name, content, context);
return;
}

if (PLANS.equals(name)) {
// signal the threads that they should end parsing
for (int i = 0; i < this.numThreads; i++) {
List<Tag> list = new ArrayList<>();
list.add(new EndProcessingTag());
this.queue.add(list);
}

// wait for the threads to finish
try {
for (Thread thread : threads) {
thread.join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

this.stopThreads();
super.endTag(name, content, context);
log.info("Finished parallel population reading...");
} else {
Expand All @@ -193,24 +207,24 @@ public void endTag(String name, String content, Stack<String> context) {
tag.content = content;
tag.context = context;
currentPersonXmlData.add(tag);

// if its a person end tag, add the persons xml data to the queue.
if (PERSON.equals(name)) queue.add(currentPersonXmlData);
}
}

private static class CollectorScenario implements Scenario {
// yyyy Why is this necessary at all? Could you please explain your design decisions? The same instance is passed to all threads, so
// yyyy Why is this necessary at all? Could you please explain your design decisions? The same instance is passed to all threads, so
// what is the difference to using the underlying population directly?

private final Scenario delegate;
private final CollectorPopulation population;

public CollectorScenario(Scenario scenario, CollectorPopulation population) {
this.delegate = scenario;
this.population = population;
}

@Override
public Network getNetwork() {
return this.delegate.getNetwork();
Expand All @@ -220,7 +234,7 @@ public Network getNetwork() {
public Population getPopulation() {
return this.population; // return collector population
}

@Override
public ActivityFacilities getActivityFacilities() {
return this.delegate.getActivityFacilities();
Expand Down Expand Up @@ -266,15 +280,15 @@ public Vehicles getVehicles() {
return this.delegate.getVehicles() ;
}
}

private static class CollectorPopulation implements Population {

private final Population population;

public CollectorPopulation(Population population) {
this.population = population;
}

@Override
public PopulationFactory getFactory() {
return population.getFactory();
Expand Down Expand Up @@ -310,24 +324,24 @@ public org.matsim.utils.objectattributes.attributable.Attributes getAttributes()
throw new RuntimeException("Calls to this method are not expected to happen...");
}
}

public abstract static class Tag {
String name;
Stack<String> context = null; // not used by the PopulationReader
}

public final class StartTag extends Tag {
Attributes atts;
}

public final class PersonTag extends Tag {
Person person;
}

public final class EndTag extends Tag {
String content;
}

/*
* Marker Tag to inform the threads that no further data has to be parsed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

private final BlockingQueue<CompletableFuture<Person>> personInsertionQueue = new LinkedBlockingQueue<>();
private Thread personInsertionThread;
private Throwable exception = null;

public ParallelPopulationReaderMatsimV6(
final String inputCRS,
Expand Down Expand Up @@ -108,6 +109,7 @@ private void initThreads() {
Thread thread = new Thread(runner);
thread.setDaemon(true);
thread.setName(ParallelPopulationReaderMatsimV6Runner.class.toString() + i);
thread.setUncaughtExceptionHandler(this::catchReaderException);
threads[i] = thread;
thread.start();
}
Expand All @@ -118,6 +120,44 @@ private void initThreads() {
}
}

private void stopThreads() {
// signal the threads that they should end parsing
for (int i = 0; i < this.numThreads; i++) {
this.tagQueue.add(List.of(new EndProcessingTag()));
}

if (isPopulationStreaming) {
CompletableFuture<Person> finishPerson = new CompletableFuture<>();
finishPerson.complete(null);
try {
this.personInsertionQueue.put(finishPerson);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

// wait for the threads to finish
try {
for (Thread thread : threads) {
thread.join();
}
if(this.isPopulationStreaming) {
this.personInsertionThread.join();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

if (this.exception != null) {
throw new RuntimeException(this.exception);
}
}

private void catchReaderException(Thread thread, Throwable throwable) {
log.error("Error parsing XML", throwable);
this.exception = throwable;
}

@Override
public void startTag(String name, Attributes atts, Stack<String> context) {
//Reached first time a person
Expand All @@ -139,6 +179,11 @@ public void startTag(String name, Attributes atts, Stack<String> context) {

// If it is a new person, create a new person and a list for its attributes.
if (PERSON.equals(name)) {
if (this.exception != null) {
this.stopThreads();
throw new RuntimeException(this.exception);
}

// Just create a person, but do not add it here!
Person person = this.plans.getFactory().createPerson(Id.create(atts.getValue(ATTR_PERSON_ID), Person.class));
currentPersonXmlData = new ArrayList<>();
Expand Down Expand Up @@ -187,34 +232,7 @@ public void endTag(String name, String content, Stack<String> context) {

// End of population reached
if (POPULATION.equals(name)) {
// signal the threads that they should end parsing
for (int i = 0; i < this.numThreads; i++) {
this.tagQueue.add(List.of(new EndProcessingTag()));
}
if(isPopulationStreaming)
{
CompletableFuture<Person> finishPerson = new CompletableFuture<>();
finishPerson.complete(null);
try {
this.personInsertionQueue.put(finishPerson);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

// wait for the threads to finish
try {
for (Thread thread : threads) {
thread.join();
}
if(this.isPopulationStreaming)
{
this.personInsertionThread.join();
}

} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.stopThreads();

super.endTag(name, content, context);
log.info("Finished parallel population reading...");
Expand Down
Loading

0 comments on commit 381665d

Please sign in to comment.