From b3c48b66a362800217967f746017d5b2b0f64523 Mon Sep 17 00:00:00 2001 From: Even Solbraa <41290109+EvenSol@users.noreply.github.com> Date: Wed, 1 Jan 2025 20:45:32 +0100 Subject: [PATCH] fixed thread run (#1227) * fixed thread run * update models --- src/main/java/log4j2.properties | 2 +- .../equipment/valve/ThrottlingValve.java | 12 +- .../process/processmodel/ProcessModel.java | 89 ++++---- .../process/processmodel/ProcessSystem.java | 16 +- .../processmodel/LargeCombinedModelsTest.java | 206 ++++++++++++------ 5 files changed, 206 insertions(+), 119 deletions(-) diff --git a/src/main/java/log4j2.properties b/src/main/java/log4j2.properties index d3e118c3f5..d81ec3aae7 100644 --- a/src/main/java/log4j2.properties +++ b/src/main/java/log4j2.properties @@ -12,5 +12,5 @@ appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n #appender.console.filter.threshold.level = DEBUG # Root Logger -rootLogger.level = OFF +rootLogger.level = DEBUG rootLogger.appenderRef.stdout.ref = STDOUT \ No newline at end of file diff --git a/src/main/java/neqsim/process/equipment/valve/ThrottlingValve.java b/src/main/java/neqsim/process/equipment/valve/ThrottlingValve.java index be1c01978d..79152a3984 100644 --- a/src/main/java/neqsim/process/equipment/valve/ThrottlingValve.java +++ b/src/main/java/neqsim/process/equipment/valve/ThrottlingValve.java @@ -160,7 +160,7 @@ && getOutletPressure() == getOutletStream().getPressure()) { * @param percentValveOpening Percentage valve opening (0 to 100). * @return Adjusted flow coefficient (Cv) in US gallons per minute (USG/min). */ - private static double adjustCv(double Cv, double percentValveOpening) { + private double adjustCv(double Cv, double percentValveOpening) { return Cv * (percentValveOpening / 100); } @@ -176,7 +176,7 @@ private static double adjustCv(double Cv, double percentValveOpening) { * @param percentValveOpening Percentage valve opening (0 to 100). * @return Mass flow rate in kilograms per hour (kg/h). */ - public static double liquidValveMassFlow(double P1, double P2, double rho, double Cv, double Fp, + public double liquidValveMassFlow(double P1, double P2, double rho, double Cv, double Fp, double percentValveOpening) { // Equation unit conversion constant final double N1 = 0.0865; @@ -211,8 +211,8 @@ public static double liquidValveMassFlow(double P1, double P2, double rho, doubl * @param Fp The piping geometry factor (dimensionless). * @return The percent valve opening. */ - public static double calcPercentValveOpeningLiquid(double massFlowRate, double P1, double P2, - double rho, double Cv, double Fp) { + public double calcPercentValveOpeningLiquid(double massFlowRate, double P1, double P2, double rho, + double Cv, double Fp) { // Equation unit conversion constant final double N1 = 0.0865; @@ -244,7 +244,7 @@ public static double calcPercentValveOpeningLiquid(double massFlowRate, double P * @param percentValveOpening Percentage valve opening (0 to 100). * @return Downstream pressure in bar. */ - public static double liquidValvePout(double P1, double m, double rho, double Cv, double Fp, + public double liquidValvePout(double P1, double m, double rho, double Cv, double Fp, double percentValveOpening) { // Equation unit conversion constant final double N1 = 0.0865; @@ -281,7 +281,7 @@ public static double liquidValvePout(double P1, double m, double rho, double Cv, * @param percentValveOpening Percentage valve opening (0 to 100). * @return Flow coefficient (Cv) in US gallons per minute (USG/min). */ - public static double liquidValveCv(double P1, double P2, double rho, double m, double Fp, + public double liquidValveCv(double P1, double P2, double rho, double m, double Fp, double percentValveOpening) { // Equation unit conversion constant final double N1 = 0.0865; diff --git a/src/main/java/neqsim/process/processmodel/ProcessModel.java b/src/main/java/neqsim/process/processmodel/ProcessModel.java index 6fbf97a60c..6bf05b1345 100644 --- a/src/main/java/neqsim/process/processmodel/ProcessModel.java +++ b/src/main/java/neqsim/process/processmodel/ProcessModel.java @@ -12,21 +12,18 @@ * * Manages a collection of processes that can be run in steps or continuously. * - * @author Even Solbraa - * @version $Id: $Id + * */ public class ProcessModel implements Runnable { + static Logger logger = LogManager.getLogger(ProcessModel.class); private Map processes = new LinkedHashMap<>(); private boolean runStep = false; private int maxIterations = 50; - private int iterations = 0; /** * Checks if the model is running in step mode. - * - * @return true if running in step mode, false otherwise. */ public boolean isRunStep() { return runStep; @@ -34,8 +31,6 @@ public boolean isRunStep() { /** * Sets the step mode for the process. - * - * @param runStep true to enable step mode, false to disable. */ public void setRunStep(boolean runStep) { this.runStep = runStep; @@ -43,10 +38,6 @@ public void setRunStep(boolean runStep) { /** * Adds a process to the model. - * - * @param name the name of the process. - * @param process the process to add. - * @return true if the process was added successfully. */ public boolean add(String name, ProcessSystem process) { if (name == null || name.isEmpty()) { @@ -65,9 +56,6 @@ public boolean add(String name, ProcessSystem process) { /** * Retrieves a process by its name. - * - * @param name the name of the process. - * @return the corresponding process, or null if not found. */ public ProcessSystem get(String name) { return processes.get(name); @@ -75,45 +63,65 @@ public ProcessSystem get(String name) { /** * Removes a process by its name. - * - * @param name the name of the process to remove. - * @return true if the process was removed, false otherwise. */ public boolean remove(String name) { return processes.remove(name) != null; } /** - * Executes all processes, either continuously or in steps based on mode. + * The core run method. + * + * - If runStep == true, each process is run in "step" mode exactly once. - Otherwise (continuous + * mode), it loops up to maxIterations or until all processes are finished (isFinished() == true). */ @Override public void run() { - for (ProcessSystem process : processes.values()) { - try { - if (runStep) { + if (runStep) { + // Step mode: just run each process once in step mode + for (ProcessSystem process : processes.values()) { + try { + if (Thread.currentThread().isInterrupted()) { + logger.debug("Thread was interrupted, exiting run()..."); + return; + } process.run_step(); - } else { - process.run(); + } catch (Exception e) { + System.err.println("Error running process step: " + e.getMessage()); + e.printStackTrace(); } - } catch (Exception e) { - System.err.println("Error running process: " + e.getMessage()); - e.printStackTrace(); } - } - if (!runStep) { - if (!isFinished() && iterations < maxIterations) { - iterations += 1; - run(); - } else { - iterations = 0; + } else { + int iterations = 0; + while (!Thread.currentThread().isInterrupted() && !isFinished() + && iterations < maxIterations) { + for (ProcessSystem process : processes.values()) { + if (Thread.currentThread().isInterrupted()) { + logger.debug("Thread was interrupted, exiting run()..."); + return; + } + try { + process.run(); // the process's continuous run + } catch (Exception e) { + System.err.println("Error running process: " + e.getMessage()); + e.printStackTrace(); + } + } + iterations++; } } } + /** + * Starts this model in a new thread and returns that thread. + */ + public Thread runAsThread() { + Thread processThread = new Thread(this); + processThread.start(); + return processThread; + } + /** * Checks if all processes are finished. - * - * @return true if all processes are solved, false otherwise. */ public boolean isFinished() { for (ProcessSystem process : processes.values()) { @@ -125,11 +133,15 @@ public boolean isFinished() { } /** - * Executes all processes in a single step. + * Runs all processes in a single step (used outside of the thread model). */ public void runStep() { for (ProcessSystem process : processes.values()) { try { + if (Thread.currentThread().isInterrupted()) { + logger.debug("Thread was interrupted, exiting run()..."); + return; + } process.run_step(); } catch (Exception e) { System.err.println("Error in runStep: " + e.getMessage()); @@ -139,20 +151,19 @@ public void runStep() { } /** - * Runs the model as a separate thread. + * (Optional) Creates separate threads for each process (if you need them). */ public Map getThreads() { Map threads = new LinkedHashMap<>(); try { for (ProcessSystem process : processes.values()) { Thread thread = new Thread(process); + thread.setName(process.getName() + " thread"); threads.put(process.getName(), thread); } - } catch (Exception ex) { logger.debug(ex.getMessage(), ex); } return threads; } - } diff --git a/src/main/java/neqsim/process/processmodel/ProcessSystem.java b/src/main/java/neqsim/process/processmodel/ProcessSystem.java index b20ccc5db0..159342a1b9 100644 --- a/src/main/java/neqsim/process/processmodel/ProcessSystem.java +++ b/src/main/java/neqsim/process/processmodel/ProcessSystem.java @@ -425,6 +425,10 @@ public void run(UUID id) { iter++; isConverged = true; for (int i = 0; i < unitOperations.size(); i++) { + if (Thread.currentThread().isInterrupted()) { + logger.debug("Process simulation was interrupted, exiting run()..." + getName()); + break; + } if (!unitOperations.get(i).getClass().getSimpleName().equals("Recycle")) { try { if (iter == 1 || unitOperations.get(i).needRecalculation()) { @@ -432,7 +436,9 @@ public void run(UUID id) { } } catch (Exception ex) { // String error = ex.getMessage(); - logger.error(ex.getMessage(), ex); + logger.error("error running unit uperation " + unitOperations.get(i).getName() + " " + + ex.getMessage(), ex); + ex.printStackTrace(); } } if (unitOperations.get(i).getClass().getSimpleName().equals("Recycle") @@ -476,7 +482,8 @@ public void run(UUID id) { * signalDB[timeStepNumber][3 * i + 3] = ((MeasurementDeviceInterface) * measurementDevices.get(i)) .getUnit(); } */ - } while (((!isConverged || (iter < 2 && hasRecycle)) && iter < 100) && !runStep); + } while (((!isConverged || (iter < 2 && hasRecycle)) && iter < 100) && !runStep + && !Thread.currentThread().isInterrupted()); for (int i = 0; i < unitOperations.size(); i++) { unitOperations.get(i).setCalculationIdentifier(id); @@ -490,7 +497,10 @@ public void run(UUID id) { public void run_step(UUID id) { for (int i = 0; i < unitOperations.size(); i++) { try { - // if (unitOperations.get(i).needRecalculation()) { + if (Thread.currentThread().isInterrupted()) { + logger.debug("Process simulation was interrupted, exiting run()..." + getName()); + break; + } unitOperations.get(i).run(id); // } } catch (Exception ex) { diff --git a/src/test/java/neqsim/process/processmodel/LargeCombinedModelsTest.java b/src/test/java/neqsim/process/processmodel/LargeCombinedModelsTest.java index 192bf846a2..f669ded363 100644 --- a/src/test/java/neqsim/process/processmodel/LargeCombinedModelsTest.java +++ b/src/test/java/neqsim/process/processmodel/LargeCombinedModelsTest.java @@ -863,6 +863,7 @@ public ProcessModel getCombinedModel() { (Mixer) separationTrainA.getUnit("second Stage mixer"), (Mixer) separationTrainA.getUnit("first stage mixer"), (Mixer) separationTrainA.getUnit("MP liq gas mixer")); + ProcessSystem expanderProcessB = createExpanderProcessModel((Separator) separationTrainB.getUnit("dew point scrubber 2"), (ThreePhaseSeparator) separationTrainB.getUnit("4th stage separator"), @@ -1056,12 +1057,12 @@ public void testCombinedProcess() { .getOilOutStream().getFlowRate("kg/hr"), 100.1); - Assertions.assertEquals(8.3102628472, + Assertions.assertEquals(7.96422158134, ((ThreePhaseSeparator) fullProcess.get("separation train B").getUnit("1st stage separator")) .getGasOutStream().getFlowRate("MSm3/day"), 0.1); - Assertions.assertEquals(6.933692881, + Assertions.assertEquals(8.0048990113, ((ThrottlingValve) fullProcess.get("expander process A").getUnit("gas split valve")) .getOutletStream().getFlowRate("MSm3/day"), 0.1); @@ -1131,99 +1132,164 @@ public void testCombinedProcess() { } - public void testCombinedProcessAsThread() { - // Create and configure the full process model + // @Test + public void testCombinedProcessAsThread2() { ProcessModel fullProcess = getCombinedModel(); - - // Set properties for a specific process - ((Stream) fullProcess.get("well and manifold process").getUnit("HP well stream")) - .setFlowRate(30.0, "MSm3/day"); - - // Retrieve threads for the processes - Map threads = fullProcess.getThreads(); + Thread processThread = fullProcess.runAsThread(); try { - for (Thread thread : threads.values()) { - try { - thread.start(); // Start the thread - thread.join(30000); // Wait for up to 30 seconds for this thread to complete - } catch (InterruptedException e) { - logger.debug("Thread interrupted: " + thread.getName(), e); - Thread.currentThread().interrupt(); // Restore interrupted status - } catch (Exception ex) { - logger.debug("Error with thread: " + thread.getName() + " - " + ex.getMessage(), ex); - } + processThread.join(100000); + if (processThread.isAlive()) { + processThread.interrupt(); + processThread.join(); } - } catch (Exception ex) { - logger.debug("Unexpected error in thread handling: " + ex.getMessage(), ex); + } catch (InterruptedException e) { + logger.debug("Thread was interrupted: " + e.getMessage()); + } catch (Exception e) { + logger.debug("Thread interrupted: " + e.getMessage()); } - // Perform assertions to validate the results - double inputFlowRate = + Assertions.assertEquals(0.0, ((Stream) fullProcess.get("well and manifold process").getUnit("HP well stream")) - .getFlowRate("kg/hr"); - - double outputFlowRate = - ((Stream) fullProcess.get("expander process A").getUnit("export oil")).getFlowRate("kg/hr") - + ((Stream) fullProcess.get("expander process B").getUnit("export oil")) + .getFlowRate("kg/hr") + + ((Stream) fullProcess.get("well and manifold process").getUnit("LP well stream")) .getFlowRate("kg/hr") - + ((Filter) fullProcess.get("compressor process A").getUnit("gas split valve")) + - ((Stream) fullProcess.get("expander process A").getUnit("export oil")) + .getFlowRate("kg/hr") + - ((Stream) fullProcess.get("expander process B").getUnit("export oil")) + .getFlowRate("kg/hr") + - ((Filter) fullProcess.get("compressor process A").getUnit("gas split valve")) .getOutletStream().getFlowRate("kg/hr") - + ((Filter) fullProcess.get("compressor process B").getUnit("gas split valve")) - .getOutletStream().getFlowRate("kg/hr"); + - ((Filter) fullProcess.get("compressor process B").getUnit("gas split valve")) + .getOutletStream().getFlowRate("kg/hr"), + (((Stream) fullProcess.get("well and manifold process").getUnit("HP well stream")) + .getFlowRate("kg/hr") + + ((Stream) fullProcess.get("well and manifold process").getUnit("LP well stream")) + .getFlowRate("kg/hr")) + / 100.0); - double tolerance = inputFlowRate / 1000.0; + Assertions.assertEquals(11.5063800821, + ((Splitter) fullProcess.get("compressor process B").getUnit("TEE-104")).getSplitStream(0) + .getFlowRate("MSm3/day"), + 0.3); - Assertions.assertEquals(0, inputFlowRate - outputFlowRate, tolerance, - "Mismatch between input and output flow rates"); + Assertions.assertEquals(54.88159, + ((Compressor) fullProcess.get("compressor process B").getUnit("KA27841")).getOutletStream() + .getPressure("bara"), + 1.5); - Assertions.assertEquals(5.54017523150, - ((ThreePhaseSeparator) fullProcess.get("separation train A").getUnit("1st stage separator")) - .getGasOutStream().getFlowRate("MSm3/day"), - 0.1); + fullProcess.setRunStep(true); + Thread processThreadRunStep = fullProcess.runAsThread(); + try { + processThreadRunStep.join(100000); + if (processThreadRunStep.isAlive()) { + processThreadRunStep.interrupt(); + processThreadRunStep.join(); + } + } catch (InterruptedException e) { + logger.debug("Thread was interrupted: " + e.getMessage()); - Assertions.assertEquals(1742523.539419, - ((ThreePhaseSeparator) fullProcess.get("separation train A").getUnit("1st stage separator")) - .getOilOutStream().getFlowRate("kg/hr"), - 0.1); + } catch (Exception e) { + logger.debug("Thread interrupted: " + e.getMessage()); + } - Assertions.assertEquals(8.3102628472, - ((ThreePhaseSeparator) fullProcess.get("separation train B").getUnit("1st stage separator")) - .getGasOutStream().getFlowRate("MSm3/day"), - 0.1); - Assertions.assertEquals(6.933692881, - ((ThrottlingValve) fullProcess.get("expander process A").getUnit("gas split valve")) - .getOutletStream().getFlowRate("MSm3/day"), - 0.1); + // ProcessSystem expProcessA = fullProcess.get("expander process A"); + // Thread expProcessThreadA = expProcessA.runAsThread(); - Assertions.assertEquals(10.400539742809, - ((ThrottlingValve) fullProcess.get("expander process B").getUnit("gas split valve")) - .getOutletStream().getFlowRate("MSm3/day"), - 0.1); + ProcessSystem expProcessB = fullProcess.get("expander process B"); + Thread expProcessThreadB = expProcessB.runAsThread(); - Assertions.assertEquals(6.9336928813, - ((Splitter) fullProcess.get("compressor process A").getUnit("TEE-104")).getSplitStream(0) - .getFlowRate("MSm3/day"), - 0.1); + try { + // expProcessThreadA.join(100000); + // expProcessThreadB.join(100000); + + // if (expProcessThreadA.isAlive()) { + // expProcessThreadA.interrupt(); + // expProcessThreadA.join(); + // } + if (expProcessThreadB.isAlive()) { + expProcessThreadB.interrupt(); + expProcessThreadB.join(); + } + } catch (Exception e) { + logger.debug("Thread interrupted: " + e.getMessage()); + } - Assertions.assertEquals(180.168498506, - ((Compressor) fullProcess.get("compressor process A").getUnit("KA27841")).getOutletStream() - .getPressure("bara"), - 0.5); + Assertions.assertEquals(0.0, + ((Stream) fullProcess.get("well and manifold process").getUnit("HP well stream")) + .getFlowRate("kg/hr") + + ((Stream) fullProcess.get("well and manifold process").getUnit("LP well stream")) + .getFlowRate("kg/hr") + - ((Stream) fullProcess.get("expander process A").getUnit("export oil")) + .getFlowRate("kg/hr") + - ((Stream) fullProcess.get("expander process B").getUnit("export oil")) + .getFlowRate("kg/hr") + - ((Filter) fullProcess.get("compressor process A").getUnit("gas split valve")) + .getOutletStream().getFlowRate("kg/hr") + - ((Filter) fullProcess.get("compressor process B").getUnit("gas split valve")) + .getOutletStream().getFlowRate("kg/hr"), + (((Stream) fullProcess.get("well and manifold process").getUnit("HP well stream")) + .getFlowRate("kg/hr") + + ((Stream) fullProcess.get("well and manifold process").getUnit("LP well stream")) + .getFlowRate("kg/hr")) + / 100.0); + } - Assertions.assertEquals(10.4004397428, + // @Test + public void testCombinedProcessAsThread() { + ProcessModel fullProcess = getCombinedModel(); + fullProcess.runStep(); + fullProcess.runStep(); + fullProcess.runStep(); + fullProcess.runStep(); + + Map threads = fullProcess.getThreads(); + + for (Thread thread : threads.values()) { + try { + logger.debug("start thread " + thread.getName()); + thread.start(); + thread.join(30000); + if (thread.isAlive()) { + thread.interrupt(); + thread.join(); + logger.debug("stopped thread " + thread.getName()); + } + logger.debug("ended thread " + thread.getName()); + } catch (Exception e) { + logger.debug("Thread interrupted: " + thread.getName(), e); + } + } + + Assertions.assertEquals(0.0, + ((Stream) fullProcess.get("well and manifold process").getUnit("HP well stream")) + .getFlowRate("kg/hr") + + ((Stream) fullProcess.get("well and manifold process").getUnit("LP well stream")) + .getFlowRate("kg/hr") + - ((Stream) fullProcess.get("expander process A").getUnit("export oil")) + .getFlowRate("kg/hr") + - ((Stream) fullProcess.get("expander process B").getUnit("export oil")) + .getFlowRate("kg/hr") + - ((Filter) fullProcess.get("compressor process A").getUnit("gas split valve")) + .getOutletStream().getFlowRate("kg/hr") + - ((Filter) fullProcess.get("compressor process B").getUnit("gas split valve")) + .getOutletStream().getFlowRate("kg/hr"), + (((Stream) fullProcess.get("well and manifold process").getUnit("HP well stream")) + .getFlowRate("kg/hr") + + ((Stream) fullProcess.get("well and manifold process").getUnit("LP well stream")) + .getFlowRate("kg/hr")) + / 100.0); + + Assertions.assertEquals(11.773116810, ((Splitter) fullProcess.get("compressor process B").getUnit("TEE-104")).getSplitStream(0) .getFlowRate("MSm3/day"), 0.1); - Assertions.assertEquals(182.39393222, + Assertions.assertEquals(48.43514225, ((Compressor) fullProcess.get("compressor process B").getUnit("KA27841")).getOutletStream() .getPressure("bara"), - 0.15); - - ((Stream) (fullProcess.get("well and manifold process")).getUnit("HP well stream")) - .setFlowRate(35.0, "MSm3/day"); + 0.5); }