diff --git a/matsim/src/main/java/org/matsim/core/population/io/ParallelPopulationReaderMatsimV4.java b/matsim/src/main/java/org/matsim/core/population/io/ParallelPopulationReaderMatsimV4.java index 976d040df4a..1ffbe805141 100644 --- a/matsim/src/main/java/org/matsim/core/population/io/ParallelPopulationReaderMatsimV4.java +++ b/matsim/src/main/java/org/matsim/core/population/io/ParallelPopulationReaderMatsimV4.java @@ -51,13 +51,13 @@ * the file and creates empty person objects which are added to the population to ensure * that their order is not changed. Note that this approach is not compatible with * population streaming. When this feature is activated, the non-parallel reader is used. - * + * * The parallel threads interpret the xml data for each person. - * + * * @author cdobler */ /* deliberately package */ class ParallelPopulationReaderMatsimV4 extends PopulationReaderMatsimV4 { - + static final Logger log = LogManager.getLogger(ParallelPopulationReaderMatsimV4.class); private final boolean isPopulationStreaming; @@ -70,6 +70,7 @@ private List currentPersonXmlData; private final CoordinateTransformation coordinateTransformation; + private Throwable exception = null; public ParallelPopulationReaderMatsimV4( final Scenario scenario ) { @@ -81,14 +82,14 @@ public ParallelPopulationReaderMatsimV4( final Scenario scenario) { super( coordinateTransformation , scenario ); this.coordinateTransformation = coordinateTransformation; - + /* * Check whether population streaming is activated */ // if (scenario.getPopulation() instanceof Population && ((Population)scenario.getPopulation()).isStreaming()) { if ( scenario.getPopulation() instanceof StreamingPopulationReader.StreamingPopulation ) { log.warn("Population streaming is activated - cannot use " + ParallelPopulationReaderMatsimV4.class.getName() + "!"); - + this.isPopulationStreaming = true; this.numThreads = 1; this.queue = null; @@ -98,42 +99,70 @@ public ParallelPopulationReaderMatsimV4( isPopulationStreaming = false; if (scenario.getConfig().global().getNumberOfThreads() > 0) { - this.numThreads = scenario.getConfig().global().getNumberOfThreads(); + this.numThreads = scenario.getConfig().global().getNumberOfThreads(); } else this.numThreads = 1; - + this.queue = new LinkedBlockingQueue<>(); this.collectorPopulation = new CollectorPopulation(this.plans); this.collectorScenario = new CollectorScenario(scenario, collectorPopulation); } } - + private void initThreads() { threads = new Thread[numThreads]; for (int i = 0; i < numThreads; i++) { - + ParallelPopulationReaderMatsimV4Runner runner = new ParallelPopulationReaderMatsimV4Runner( this.coordinateTransformation, this.collectorScenario, this.queue); - + Thread thread = new Thread(runner); thread.setDaemon(true); thread.setName(ParallelPopulationReaderMatsimV4Runner.class.toString() + i); + thread.setUncaughtExceptionHandler(this::catchReaderException); threads[i] = thread; thread.start(); } } - + + private void stopThreads() { + // signal the threads that they should end parsing + for (int i = 0; i < this.numThreads; i++) { + List list = new ArrayList<>(); + list.add(new EndProcessingTag()); + this.queue.add(list); + } + + // wait for the threads to finish + try { + for (Thread thread : threads) { + thread.join(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + if (this.exception != null) { + throw new RuntimeException(this.exception); + } + } + + private void catchReaderException(Thread thread, Throwable throwable) { + log.error("Error parsing XML", throwable); + this.exception = throwable; + } + @Override public void startTag(String name, Attributes atts, Stack context) { - + // if population streaming is activated, use non-parallel reader if (isPopulationStreaming) { super.startTag(name, atts, context); return; } - + if (PLANS.equals(name)) { log.info("Start parallel population reading..."); initThreads(); @@ -149,7 +178,7 @@ public void startTag(String name, Attributes atts, Stack context) { currentPersonXmlData.add(personTag); this.plans.addPerson(person); } - + // Create a new start tag and add it to the person data. StartTag tag = new StartTag(); tag.name = name; @@ -160,30 +189,15 @@ public void startTag(String name, Attributes atts, Stack context) { @Override public void endTag(String name, String content, Stack context) { - + // if population streaming is activated, use non-parallel reader if (isPopulationStreaming) { super.endTag(name, content, context); return; } - + if (PLANS.equals(name)) { - // signal the threads that they should end parsing - for (int i = 0; i < this.numThreads; i++) { - List list = new ArrayList<>(); - list.add(new EndProcessingTag()); - this.queue.add(list); - } - - // wait for the threads to finish - try { - for (Thread thread : threads) { - thread.join(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - + this.stopThreads(); super.endTag(name, content, context); log.info("Finished parallel population reading..."); } else { @@ -193,24 +207,24 @@ public void endTag(String name, String content, Stack context) { tag.content = content; tag.context = context; currentPersonXmlData.add(tag); - + // if its a person end tag, add the persons xml data to the queue. if (PERSON.equals(name)) queue.add(currentPersonXmlData); } } - + private static class CollectorScenario implements Scenario { - // yyyy Why is this necessary at all? Could you please explain your design decisions? The same instance is passed to all threads, so + // yyyy Why is this necessary at all? Could you please explain your design decisions? The same instance is passed to all threads, so // what is the difference to using the underlying population directly? - + private final Scenario delegate; private final CollectorPopulation population; - + public CollectorScenario(Scenario scenario, CollectorPopulation population) { this.delegate = scenario; this.population = population; } - + @Override public Network getNetwork() { return this.delegate.getNetwork(); @@ -220,7 +234,7 @@ public Network getNetwork() { public Population getPopulation() { return this.population; // return collector population } - + @Override public ActivityFacilities getActivityFacilities() { return this.delegate.getActivityFacilities(); @@ -266,15 +280,15 @@ public Vehicles getVehicles() { return this.delegate.getVehicles() ; } } - + private static class CollectorPopulation implements Population { private final Population population; - + public CollectorPopulation(Population population) { this.population = population; } - + @Override public PopulationFactory getFactory() { return population.getFactory(); @@ -310,24 +324,24 @@ public org.matsim.utils.objectattributes.attributable.Attributes getAttributes() throw new RuntimeException("Calls to this method are not expected to happen..."); } } - + public abstract static class Tag { String name; Stack context = null; // not used by the PopulationReader } - + public final class StartTag extends Tag { Attributes atts; } - + public final class PersonTag extends Tag { Person person; } - + public final class EndTag extends Tag { String content; } - + /* * Marker Tag to inform the threads that no further data has to be parsed. */ diff --git a/matsim/src/main/java/org/matsim/core/population/io/ParallelPopulationReaderMatsimV6.java b/matsim/src/main/java/org/matsim/core/population/io/ParallelPopulationReaderMatsimV6.java index fb0727493b8..648d6537d10 100644 --- a/matsim/src/main/java/org/matsim/core/population/io/ParallelPopulationReaderMatsimV6.java +++ b/matsim/src/main/java/org/matsim/core/population/io/ParallelPopulationReaderMatsimV6.java @@ -64,6 +64,7 @@ private final BlockingQueue> personInsertionQueue = new LinkedBlockingQueue<>(); private Thread personInsertionThread; + private Throwable exception = null; public ParallelPopulationReaderMatsimV6( final String inputCRS, @@ -108,6 +109,7 @@ private void initThreads() { Thread thread = new Thread(runner); thread.setDaemon(true); thread.setName(ParallelPopulationReaderMatsimV6Runner.class.toString() + i); + thread.setUncaughtExceptionHandler(this::catchReaderException); threads[i] = thread; thread.start(); } @@ -118,6 +120,44 @@ private void initThreads() { } } + private void stopThreads() { + // signal the threads that they should end parsing + for (int i = 0; i < this.numThreads; i++) { + this.tagQueue.add(List.of(new EndProcessingTag())); + } + + if (isPopulationStreaming) { + CompletableFuture finishPerson = new CompletableFuture<>(); + finishPerson.complete(null); + try { + this.personInsertionQueue.put(finishPerson); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + // wait for the threads to finish + try { + for (Thread thread : threads) { + thread.join(); + } + if(this.isPopulationStreaming) { + this.personInsertionThread.join(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + if (this.exception != null) { + throw new RuntimeException(this.exception); + } + } + + private void catchReaderException(Thread thread, Throwable throwable) { + log.error("Error parsing XML", throwable); + this.exception = throwable; + } + @Override public void startTag(String name, Attributes atts, Stack context) { //Reached first time a person @@ -139,6 +179,11 @@ public void startTag(String name, Attributes atts, Stack context) { // If it is a new person, create a new person and a list for its attributes. if (PERSON.equals(name)) { + if (this.exception != null) { + this.stopThreads(); + throw new RuntimeException(this.exception); + } + // Just create a person, but do not add it here! Person person = this.plans.getFactory().createPerson(Id.create(atts.getValue(ATTR_PERSON_ID), Person.class)); currentPersonXmlData = new ArrayList<>(); @@ -187,34 +232,7 @@ public void endTag(String name, String content, Stack context) { // End of population reached if (POPULATION.equals(name)) { - // signal the threads that they should end parsing - for (int i = 0; i < this.numThreads; i++) { - this.tagQueue.add(List.of(new EndProcessingTag())); - } - if(isPopulationStreaming) - { - CompletableFuture finishPerson = new CompletableFuture<>(); - finishPerson.complete(null); - try { - this.personInsertionQueue.put(finishPerson); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - // wait for the threads to finish - try { - for (Thread thread : threads) { - thread.join(); - } - if(this.isPopulationStreaming) - { - this.personInsertionThread.join(); - } - - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + this.stopThreads(); super.endTag(name, content, context); log.info("Finished parallel population reading..."); diff --git a/matsim/src/test/java/org/matsim/core/population/io/ParallelPopulationReaderTest.java b/matsim/src/test/java/org/matsim/core/population/io/ParallelPopulationReaderTest.java new file mode 100644 index 00000000000..ba5efe96823 --- /dev/null +++ b/matsim/src/test/java/org/matsim/core/population/io/ParallelPopulationReaderTest.java @@ -0,0 +1,74 @@ +package org.matsim.core.population.io; + +import org.junit.Assert; +import org.junit.Test; +import org.matsim.api.core.v01.Scenario; +import org.matsim.core.config.ConfigUtils; +import org.matsim.core.scenario.ScenarioUtils; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +public class ParallelPopulationReaderTest { + + @Test + public void testParallelPopulationReaderV4_escalateException() { + String xml = """ + + + + + + + + + + + + + + """; + + InputStream stream = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + + Scenario scenario = ScenarioUtils.createScenario(ConfigUtils.createConfig()); + try { + new ParallelPopulationReaderMatsimV4(scenario).readStream(stream); + Assert.fail("Expected exception"); + } catch (Exception expected) { + expected.printStackTrace(); + } + } + + @Test + public void testParallelPopulationReaderV6_escalateException() { + String xml = """ + + + + + + + + + + + + + + + """; + + InputStream stream = new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)); + + Scenario scenario = ScenarioUtils.createScenario(ConfigUtils.createConfig()); + try { + new ParallelPopulationReaderMatsimV6(null, null, scenario).readStream(stream); + Assert.fail("Expected exception"); + } catch (Exception expected) { + expected.printStackTrace(); + } + } + +}