From e4a68f54602a2e87db027b2f9d3afdbda130f953 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Tue, 22 Oct 2024 23:07:49 -0700 Subject: [PATCH 01/10] first test of aggregation --- .../core/reaktoro_block_builder.py | 48 ++-- src/reaktoro_pse/core/reaktoro_gray_box.py | 31 ++- src/reaktoro_pse/core/reaktoro_solver.py | 4 + .../parallel_tools/reaktoro_block_manager.py | 208 ++++++++++++++++++ .../parallel_tools/tests/test_manager.py | 119 ++++++++++ src/reaktoro_pse/reaktoro_block.py | 27 ++- .../reaktoro_block_config/jacobian_options.py | 2 +- 7 files changed, 416 insertions(+), 23 deletions(-) create mode 100644 src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py create mode 100644 src/reaktoro_pse/parallel_tools/tests/test_manager.py diff --git a/src/reaktoro_pse/core/reaktoro_block_builder.py b/src/reaktoro_pse/core/reaktoro_block_builder.py index e9dfa1c..c1c70c8 100644 --- a/src/reaktoro_pse/core/reaktoro_block_builder.py +++ b/src/reaktoro_pse/core/reaktoro_block_builder.py @@ -54,14 +54,25 @@ def __init__(self, block, reaktoro_solver, build_on_init=True): if isinstance(self.solver, ReaktoroSolver) == False: raise TypeError("Reaktoro block builder requires a ReaktoroSolver class") self.configure_jacobian_scaling() + self.reaktoro_initialize_function = None # used to provide external solve call if build_on_init: # option to support legacy implementation self.build_reaktoro_block() + self.build_output_vars() - def build_reaktoro_block(self): + def build_reaktoro_block( + self, gray_box_model=None, reaktoro_initialize_function=None + ): """build reaktoro model""" - external_model = ReaktoroGrayBox() - external_model.configure(self.solver) - self.block.reaktoro_model = ExternalGreyBoxBlock(external_model=external_model) + if gray_box_model is None: + external_model = ReaktoroGrayBox() + external_model.configure(self.solver) + self.block.reaktoro_model = ExternalGreyBoxBlock( + external_model=external_model + ) + else: + self.block.reaktoro_model = gray_box_model + if reaktoro_initialize_function is None: + self.reaktoro_initialize_function = reaktoro_initialize_function self.build_input_constraints() self.build_output_constraints() @@ -157,6 +168,19 @@ def input_constraints(fs, key): ].get_pyomo_with_required_units() ) + def build_output_vars(self): + new_output_vars = {} + for key, obj in self.solver.output_specs.user_outputs.items(): + # NOTE: We do not set rkt_outputs to reaktoro_model outputs as they + # same as user inputs - we want RKt model to update "user provided vars" + # rather then pyomo vars in reaktoro model (e.g. reaktor_block.outputs) + if obj.get_pyomo_var() is None: + new_output_vars[key] = obj + if new_output_vars != {}: + self.block.outputs = Var(new_output_vars.keys(), initialize=1) + for key, obj in new_output_vars.items(): + obj.set_pyomo_var(self.block.outputs[key]) + def build_output_constraints(self): """first update rktOuptutObjects for pyomoBuildProperties with reaktoro pyomo variables as they will be used in construction of constraints @@ -173,15 +197,6 @@ def build_output_constraints(self): pyoPropObj.set_pyomo_var( self.block.reaktoro_model.outputs[pyoPropKey] ) - # NOTE: We do not set rkt_outputs to reaktoro_model outputs as they - # same as user inputs - we want RKt model to update "user provided vars" - # rather then pyomo vars in reaktoro model (e.g. 
reaktor_block.outputs) - if obj.get_pyomo_var() is None: - new_output_vars[key] = obj - if new_output_vars != {}: - self.block.outputs = Var(new_output_vars.keys(), initialize=1) - for key, obj in new_output_vars.items(): - obj.set_pyomo_var(self.block.outputs[key]) @self.block.Constraint(self.solver.output_specs.user_outputs) def output_constraints(fs, prop, prop_index): @@ -198,8 +213,11 @@ def output_constraints(fs, prop, prop_index): def initialize(self, presolve_during_initialization=False): self.initialize_input_variables_and_constraints() - self.solver.state.equilibrate_state() - self.solver.solve_reaktoro_block(presolve=presolve_during_initialization) + if self.reaktoro_initialize_function is None: + self.solver.state.equilibrate_state() + self.solver.solve_reaktoro_block(presolve=presolve_during_initialization) + else: + self.reaktoro_initialize_function(presolve=presolve_during_initialization) self.initialize_output_variables_and_constraints() _log.info(f"Initialized rkt block") diff --git a/src/reaktoro_pse/core/reaktoro_gray_box.py b/src/reaktoro_pse/core/reaktoro_gray_box.py index eb44582..8600d26 100644 --- a/src/reaktoro_pse/core/reaktoro_gray_box.py +++ b/src/reaktoro_pse/core/reaktoro_gray_box.py @@ -34,13 +34,32 @@ class HessTypes: class ReaktoroGrayBox(ExternalGreyBoxModel): ######################################################################################## # custom Grey Box functions - def configure(self, reaktoro_solver): + def configure( + self, + reaktoro_solver=None, + inputs=None, + input_dict=None, + outputs=None, + hessian_type=None, + ): # assign a Reaktoro state object to instance self.reaktoro_solver = reaktoro_solver - self.hess_type = reaktoro_solver.hessian_type - self.inputs = reaktoro_solver.input_specs.rkt_inputs.rkt_input_list - self.input_dict = reaktoro_solver.input_specs.rkt_inputs - self.outputs = list(reaktoro_solver.output_specs.rkt_outputs.keys()) + if hessian_type is None: + self.hess_type = reaktoro_solver.hessian_type + else: + self.hess_type = hessian_type + if inputs is None: + self.inputs = reaktoro_solver.input_specs.rkt_inputs.rkt_input_list + else: + self.inputs = inputs + if input_dict is None: + self.input_dict = reaktoro_solver.input_specs.rkt_inputs + else: + self.input_dict = input_dict + if outputs is None: + self.outputs = list(reaktoro_solver.output_specs.rkt_outputs.keys()) + else: + self.outputs = outputs self._outputs_dual_multipliers = np.ones(len(self.outputs)) self._hess = np.zeros((len(self.inputs), len(self.inputs))) self.header_saved = False @@ -111,7 +130,7 @@ def set_output_constraint_multipliers(self, _outputs_dual_multipliers): np.copyto(self._outputs_dual_multipliers, _outputs_dual_multipliers) def get_output_constraint_scaling_factors(self): - return self.reaktoro_solver.jacobian_scaling_values + return self.reaktoro_solver.get_jacobian_scaling() def hessian_gauss_newton_version(self, sparse_jac, threshold=7): diff --git a/src/reaktoro_pse/core/reaktoro_solver.py b/src/reaktoro_pse/core/reaktoro_solver.py index 6b076d3..f60ea12 100644 --- a/src/reaktoro_pse/core/reaktoro_solver.py +++ b/src/reaktoro_pse/core/reaktoro_solver.py @@ -87,6 +87,7 @@ def __init__( self._sequential_fails = 0 self._max_fails = 30 self._input_params = {} + self.jacobian_scaling_values = None def export_config(self): export_object = ReaktoroSolverExport() @@ -259,3 +260,6 @@ def try_solve(self, presolve=False): ) self.output_specs.update_supported_props() return result + + def get_jacobian_scaling(self): + return 
self.jacobian_scaling_values diff --git a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py new file mode 100644 index 0000000..530a0b3 --- /dev/null +++ b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py @@ -0,0 +1,208 @@ +################################################################################# +# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California, +# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory, +# National Renewable Energy Laboratory, and National Energy Technology +# Laboratory (subject to receipt of any required approvals from the U.S. Dept. +# of Energy). All rights reserved. +# +# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license +# information, respectively. These files are also available online at the URL +# "https://github.com/watertap-org/reaktoro-pse/" +################################################################################# +import multiprocessing as mp + +from matplotlib.pyplot import sca +from pytest import param +from reaktoro_pse.core.reaktoro_gray_box import ( + ReaktoroGrayBox, +) +from pyomo.contrib.pynumero.interfaces.external_grey_box import ( + ExternalGreyBoxBlock, +) +import numpy as np +from idaes.core.base.process_base import declare_process_block_class, ProcessBlockData + + +class ReaktoroBlockData: + def __init__(self): + self.state = None + self.inputs = None + self.outputs = None + self.solver = None + self.jacobian = None + self.builder = None + self.pseudo_gray_box = None + + +class AggregateSolverState: + def __init__(self): + self.hessian_type = None + self.inputs = [] + self.input_dict = {} + self.input_blk_indexes = [] + self.registered_blocks = None + self.outputs = [] + self.output_blk_indexes = [] + self.jacobian_scaling_obj = [] + self.solver_functions = {} + self.output_matrix = [] + self.jacobian_matrix = [] + self.input_windows = {} + self.output_windows = {} + + def register_solve_function(self, block_index, solver_function): + self.solver_functions[block_index] = solver_function + + def register_input(self, block_index, input_key, input_obj): + self.inputs.append((block_index, input_key)) + self.input_dict[(block_index, input_key)] = input_obj + self.input_dict[(block_index, input_key)].original_key = input_key + self.input_blk_indexes.append(block_index) + + def register_output(self, block_index, output_key): + self.outputs.append((block_index, output_key)) + self.output_blk_indexes.append(block_index) + + if len(self.output_blk_indexes) > 1: + self.jacobian_matrix = np.zeros((len(self.outputs), len(self.inputs))) + self.output_matrix = np.zeros(len(self.outputs)) + self.get_windows(block_index) + + def get_windows(self, block_idx): + _, output_unique_sets = np.unique(self.output_blk_indexes, return_inverse=True) + self.registered_blocks, input_unique_sets = np.unique( + self.input_blk_indexes, return_inverse=True + ) + input_idx = np.arange(len(self.inputs)) + output_idx = np.arange(len(self.outputs)) + output_start, output_end = min( + output_idx[output_unique_sets == block_idx] + ), max(output_idx[output_unique_sets == block_idx]) + input_start, input_end = min(input_idx[input_unique_sets == block_idx]), max( + input_idx[input_unique_sets == block_idx] + ) + self.input_windows[block_idx] = ( + input_start, + input_end + 1, + ) # need to offset by 1 + self.output_windows[block_idx] = ( + output_start, + output_end + 1, + ) # need to offset by 1 + + def 
register_scaling_vals(self, scaling_values): + self.jacobian_scaling_obj.append(scaling_values) + + def get_jacobian_scaling(self): + scaling_array = None + for obj in self.jacobian_scaling_obj: + if scaling_array is None: + scaling_array = obj + else: + np.hstack(scaling_array, obj) + return scaling_array + + def get_params(self, block_idx, params): + param_set = {} + for (idx, key), item in params.items(): + if block_idx == idx: + param_set[key] = item + print(param_set) + return param_set # np.array(params)[ + # self.input_windows[block_idx][0] : self.input_windows[block_idx][1] + # ] + + def update_solution(self, block_idx, output, jacobian): + print(self.input_windows[block_idx][0], self.input_windows[block_idx][1]) + self.output_matrix[ + self.output_windows[block_idx][0] : self.output_windows[block_idx][1] + ] = output + self.jacobian_matrix[ + self.output_windows[block_idx][0] : self.output_windows[block_idx][1], + self.input_windows[block_idx][0] : self.input_windows[block_idx][1], + ] = jacobian + + def solve_reaktoro_block(self, params): + for blk in self.registered_blocks: + param_set = self.get_params(blk, params) + jacobian, output = self.solver_functions[blk](param_set) + self.update_solution(blk, output, jacobian) + return ( + self.jacobian_matrix, + self.output_matrix, + ) + + +class PseudoGrayBox: + def __init__(self): + self.inputs = {} + self.outputs = {} + + def register_input(self, aggregate_inputs, input_keys, block_idx): + for input_key in input_keys: + self.inputs[input_key] = aggregate_inputs[(block_idx, input_key)] + + def register_output(self, aggregate_outputs, output_keys, block_idx): + for output_keys in output_keys: + self.outputs[output_keys] = aggregate_outputs[(block_idx, output_keys)] + + +@declare_process_block_class("ReaktoroBlockManager") +class ReaktoroBlockManagerData(ProcessBlockData): + def build(self): + super().build() + self.registered_blocks = [] + self.aggregate_solver_state = AggregateSolverState() + + def register_block(self, state, inputs, outputs, jacobian, solver, builder): + blk = ReaktoroBlockData() + blk.state = state + blk.inputs = inputs + blk.outputs = outputs + blk.jacobian = jacobian + blk.solver = solver + blk.builder = builder + + self.registered_blocks.append(blk) + return blk + + def aggregate_inputs_and_outputs(self): + for block_idx, block in enumerate(self.registered_blocks): + for key, obj in block.inputs.rkt_inputs.items(): + self.aggregate_solver_state.register_input(block_idx, key, obj) + for output in block.outputs.rkt_outputs.keys(): + self.aggregate_solver_state.register_output(block_idx, output) + self.aggregate_solver_state.register_solve_function( + block_idx, block.solver.solve_reaktoro_block + ) + + def build_reaktoro_blocks(self): + self.aggregate_inputs_and_outputs() + external_model = ReaktoroGrayBox() + external_model.configure( + self.aggregate_solver_state, + inputs=self.aggregate_solver_state.inputs, + input_dict=self.aggregate_solver_state.input_dict, + outputs=self.aggregate_solver_state.outputs, + hessian_type="BGFS", # TODO make it a config option + ) + self.reaktoro_model = ExternalGreyBoxBlock(external_model=external_model) + for block_idx, block in enumerate(self.registered_blocks): + pseudo_gray_box_model = PseudoGrayBox() + pseudo_gray_box_model.register_input( + self.reaktoro_model.inputs, + block.inputs.rkt_inputs.keys(), + block_idx, + ) + pseudo_gray_box_model.register_output( + self.reaktoro_model.outputs, + block.outputs.rkt_outputs.keys(), + block_idx, + ) + 
block.builder.build_reaktoro_block( + gray_box_model=pseudo_gray_box_model, + reaktoro_initialize_function=self.aggregate_solver_state.solver_functions[ + block_idx + ], + ) + block.pseudo_gray_box = pseudo_gray_box_model diff --git a/src/reaktoro_pse/parallel_tools/tests/test_manager.py b/src/reaktoro_pse/parallel_tools/tests/test_manager.py new file mode 100644 index 0000000..1c4841e --- /dev/null +++ b/src/reaktoro_pse/parallel_tools/tests/test_manager.py @@ -0,0 +1,119 @@ +################################################################################# +# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California, +# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory, +# National Renewable Energy Laboratory, and National Energy Technology +# Laboratory (subject to receipt of any required approvals from the U.S. Dept. +# of Energy). All rights reserved. +# +# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license +# information, respectively. These files are also available online at the URL +# "https://github.com/watertap-org/reaktoro-pse/" +################################################################################# + +import pytest + +from reaktoro_pse.parallel_tools.reaktoro_block_manager import ReaktoroBlockManager + +from reaktoro_pse.reaktoro_block import ReaktoroBlock +from reaktoro_pse.tests.test_reaktoro_block import ( + build_rkt_state_with_species, +) +from pyomo.environ import ( + ConcreteModel, + Var, + assert_optimal_termination, + units as pyunits, +) +from watertap.core.solvers import get_solver + + +def test_blockBuild_with_speciation_block(build_rkt_state_with_species): + m = build_rkt_state_with_species + m.CaO = Var(["CaO"], initialize=0.001, units=pyunits.mol / pyunits.s) + m.CaO.fix() + m.reaktoro_manager = ReaktoroBlockManager() + m.property_block = ReaktoroBlock( + aqueous_phase={ + "composition": m.composition, + "convert_to_rkt_species": True, + }, + system_state={ + "temperature": m.temp, + "pressure": m.pressure, + "pH": m.pH, + }, + database="PhreeqcDatabase", + database_file="pitzer.dat", + jacobian_options={ + "numerical_type": "average", + "numerical_order": 2, + "numerical_step": 1e-8, + }, + chemistry_modifier=m.CaO, + outputs=m.outputs, + build_speciation_block=True, + reaktoro_block_manager=m.reaktoro_manager, + ) + m.reaktoro_manager.build_reaktoro_blocks() + m.property_block.initialize() + cy_solver = get_solver(solver="cyipopt-watertap") + cy_solver.options["max_iter"] = 20 + m.pH.unfix() + m.outputs[("scalingTendency", "Calcite")].fix(5) + result = cy_solver.solve(m, tee=True) + assert_optimal_termination(result) + m.display() + assert pytest.approx(m.outputs[("pH", None)].value, 1e-2) == 6.7496301 + assert pytest.approx(m.pH.value, 1e-2) == 6.401 + + m.property_block.display_jacobian_outputs() + + scaling_result = m.property_block.display_jacobian_scaling() + expected_scaling = { + "speciation_block": { + ("speciesAmount", "H+"): 9.007999999999993e-08, + ("speciesAmount", "H2O"): 50.0, + ("speciesAmount", "CO3-2"): 3.2175702176273733e-06, + ("speciesAmount", "CO2"): 0.00189035577659813, + ("speciesAmount", "Ca+2"): 0.01, + ("speciesAmount", "Cl-"): 0.7116050981506346, + ("speciesAmount", "HCO3-"): 0.007825323588838813, + ("speciesAmount", "Mg+2"): 0.09971792990850152, + ("speciesAmount", "MgCO3"): 0.0002811030643454316, + ("speciesAmount", "MgOH+"): 9.670271530541402e-07, + ("speciesAmount", "Na+"): 0.5, + ("speciesAmount", "OH-"): 6.004424745615723e-08, + }, + 
"property_block": { + ("saturationIndex", "Calcite"): 1.554873983061197, + ("pH", None): 7.520409745594153, + }, + } + assert "speciation_block" in scaling_result + assert "property_block" in scaling_result + new_scaling = {} + for key in scaling_result["speciation_block"]: + new_scaling[key] = 1 + assert ( + pytest.approx(scaling_result["speciation_block"][key], 1e-3) + == expected_scaling["speciation_block"][key] + ) + m.property_block.update_jacobian_scaling(new_scaling) + scaling_result = m.property_block.display_jacobian_scaling() + + assert "speciation_block" in scaling_result + for key in scaling_result["speciation_block"]: + assert scaling_result["speciation_block"][key] == 1 + new_scaling = {} + for key in scaling_result["property_block"]: + new_scaling[key] = 1 + assert ( + pytest.approx(scaling_result["property_block"][key], 1e-3) + == expected_scaling["property_block"][key] + ) + m.property_block.update_jacobian_scaling(new_scaling) + scaling_result = m.property_block.display_jacobian_scaling() + + assert "property_block" in scaling_result + for key in scaling_result["property_block"]: + assert scaling_result["property_block"][key] == 1 diff --git a/src/reaktoro_pse/reaktoro_block.py b/src/reaktoro_pse/reaktoro_block.py index c505854..1582ba9 100644 --- a/src/reaktoro_pse/reaktoro_block.py +++ b/src/reaktoro_pse/reaktoro_block.py @@ -31,6 +31,7 @@ from reaktoro_pse.core.reaktoro_solver import ReaktoroSolver from reaktoro_pse.core.reaktoro_block_builder import ReaktoroBlockBuilder +from reaktoro_pse.parallel_tools.reaktoro_block_manager import ReaktoroBlockManager from reaktoro_pse.reaktoro_block_config.jacobian_options import JacobianOptions from reaktoro_pse.reaktoro_block_config.reaktoro_solver_options import ( @@ -245,6 +246,19 @@ class ReaktoroBlockData(ProcessBlockData): """, ), ) + CONFIG.declare( + "reaktoro_block_manager", + ConfigValue( + default=None, + domain=IsInstance(ReaktoroBlockManager), + description="Reaktoro block manager for parallelizing reaktoro solves", + doc=""" + Option to provide a reaktoro block manager which would manage all blocks built on a model and + allow their parallel execution. When using ReaktorBlockManager, make sure to run + ReaktoroBlockManager.build_reaktoro_blocks() after constructing all of ReaktoroBlocks and + before doing any initialization calls, or interacting with ReaktoroBlocks themself .""", + ), + ) CONFIG.declare("jacobian_options", JacobianOptions().get_dict()) CONFIG.declare( @@ -703,7 +717,18 @@ def build_gray_box(self, block): block.rkt_block_builder.configure_jacobian_scaling( jacobian_scaling_type=scaling_type, user_scaling=scaling ) - block.rkt_block_builder.build_reaktoro_block() + if self.config.reaktoro_block_manager is not None: + managed_block = self.config.reaktoro_block_manager.register_block( + state=block.rkt_state, + inputs=block.rkt_inputs, + outputs=block.rkt_outputs, + jacobian=block.rkt_jacobian, + solver=block.rkt_solver, + builder=block.rkt_block_builder, + ) + block.managed_block = managed_block + else: + block.rkt_block_builder.build_reaktoro_block() # TODO: Update to provide output location (e.g. 
StringIO) def display_jacobian_outputs(self): diff --git a/src/reaktoro_pse/reaktoro_block_config/jacobian_options.py b/src/reaktoro_pse/reaktoro_block_config/jacobian_options.py index f2e46e8..f4a3890 100644 --- a/src/reaktoro_pse/reaktoro_block_config/jacobian_options.py +++ b/src/reaktoro_pse/reaktoro_block_config/jacobian_options.py @@ -43,7 +43,7 @@ def get_dict(self): CONFIG.declare( "numerical_step", ConfigValue( - default=5e-3, + default=1e-4, domain=float, description="Defines the step to use for numerical descritiazaiton", doc="""This will define how small of a step to use for numerical derivative propagation which takes From 1cc84a9c19ce26a9ea7f903a9edc3d0f4b0a4b7a Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Wed, 23 Oct 2024 21:34:23 -0700 Subject: [PATCH 02/10] first ocmmit of paralle lmanager --- .../core/reaktoro_block_builder.py | 9 +- src/reaktoro_pse/core/reaktoro_outputs.py | 3 + .../core/tests/test_reaktoro_builder.py | 5 +- .../parallel_tools/parallel_manager.py | 309 ++++++++++++++++++ .../parallel_tools/reaktoro_block_manager.py | 91 +++++- .../parallel_tools/tests/test_manager.py | 1 + 6 files changed, 402 insertions(+), 16 deletions(-) create mode 100644 src/reaktoro_pse/parallel_tools/parallel_manager.py diff --git a/src/reaktoro_pse/core/reaktoro_block_builder.py b/src/reaktoro_pse/core/reaktoro_block_builder.py index c1c70c8..f6ac70f 100644 --- a/src/reaktoro_pse/core/reaktoro_block_builder.py +++ b/src/reaktoro_pse/core/reaktoro_block_builder.py @@ -55,9 +55,9 @@ def __init__(self, block, reaktoro_solver, build_on_init=True): raise TypeError("Reaktoro block builder requires a ReaktoroSolver class") self.configure_jacobian_scaling() self.reaktoro_initialize_function = None # used to provide external solve call + self.build_output_vars() if build_on_init: # option to support legacy implementation self.build_reaktoro_block() - self.build_output_vars() def build_reaktoro_block( self, gray_box_model=None, reaktoro_initialize_function=None @@ -71,7 +71,7 @@ def build_reaktoro_block( ) else: self.block.reaktoro_model = gray_box_model - if reaktoro_initialize_function is None: + if reaktoro_initialize_function is not None: self.reaktoro_initialize_function = reaktoro_initialize_function self.build_input_constraints() self.build_output_constraints() @@ -187,8 +187,8 @@ def build_output_constraints(self): The is will also check if user provided an output pyomo var and if not will add them to new_output_var dict, which will be used to create new output variables on the block """ - new_output_vars = {} for key, obj in self.solver.output_specs.user_outputs.items(): + if PropTypes.pyomo_built_prop == obj.property_type: for ( pyoPropKey, @@ -213,13 +213,14 @@ def output_constraints(fs, prop, prop_index): def initialize(self, presolve_during_initialization=False): self.initialize_input_variables_and_constraints() + if self.reaktoro_initialize_function is None: self.solver.state.equilibrate_state() self.solver.solve_reaktoro_block(presolve=presolve_during_initialization) else: self.reaktoro_initialize_function(presolve=presolve_during_initialization) self.initialize_output_variables_and_constraints() - _log.info(f"Initialized rkt block") + _log.info(f"Initialized rkt block. 
{self.reaktoro_initialize_function}") def get_sf(self, pyo_var): diff --git a/src/reaktoro_pse/core/reaktoro_outputs.py b/src/reaktoro_pse/core/reaktoro_outputs.py index 53ff443..e3b69c3 100644 --- a/src/reaktoro_pse/core/reaktoro_outputs.py +++ b/src/reaktoro_pse/core/reaktoro_outputs.py @@ -89,6 +89,9 @@ def set_option_function(self, property_type, get_function): def set_poyomo_build_option(self, func): self.pyomo_build_options = func + def set_value(self, value): + self.pyomo_var.set_value(value) + def set_get_function(self, func): self.get_function = func diff --git a/src/reaktoro_pse/core/tests/test_reaktoro_builder.py b/src/reaktoro_pse/core/tests/test_reaktoro_builder.py index bae5cd1..a246e06 100644 --- a/src/reaktoro_pse/core/tests/test_reaktoro_builder.py +++ b/src/reaktoro_pse/core/tests/test_reaktoro_builder.py @@ -141,6 +141,8 @@ def test_build_with_rkt_dissolution(build_with_dissolve_in_rkt): m.rkt_block = Block() builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) builder.initialize() + m.display() + m.rkt_block.reaktoro_model.display() # will have as many DOFs as outputs due to pyomo not # knowing tha graybox exists. assert len(m.rkt_block.reaktoro_model.inputs) == len( @@ -154,7 +156,7 @@ def test_build_with_rkt_dissolution(build_with_dissolve_in_rkt): cy_solver = get_solver(solver="cyipopt-watertap") cy_solver.options["max_iter"] = 20 m.pH.unfix() - m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) + # m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) result = cy_solver.solve(m, tee=True) assert_optimal_termination(result) assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 @@ -172,6 +174,7 @@ def test_build_with_pyomo_dissolution(build_with_dissolve_in_pyomo): builder.initialize() # will have as many DOFs as outputs due to pyomo not # knowing tha graybox exists. + print(rkt_solver.output_specs.rkt_outputs) assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) cy_solver = get_solver(solver="cyipopt-watertap") cy_solver.options["max_iter"] = 20 diff --git a/src/reaktoro_pse/parallel_tools/parallel_manager.py b/src/reaktoro_pse/parallel_tools/parallel_manager.py new file mode 100644 index 0000000..f6aace1 --- /dev/null +++ b/src/reaktoro_pse/parallel_tools/parallel_manager.py @@ -0,0 +1,309 @@ +################################################################################# +# WaterTAP Copyright (c) 2020-2024, The Regents of the University of California, +# through Lawrence Berkeley National Laboratory, Oak Ridge National Laboratory, +# National Renewable Energy Laboratory, and National Energy Technology +# Laboratory (subject to receipt of any required approvals from the U.S. Dept. +# of Energy). All rights reserved. +# +# Please see the files COPYRIGHT.md and LICENSE.md for full copyright and license +# information, respectively. 
These files are also available online at the URL +# "https://github.com/watertap-org/reaktoro-pse/" +################################################################################# + +from calendar import c +import multiprocessing as mp +from multiprocessing import shared_memory +from multiprocessing import Pipe +from reaktoro_pse.core.reaktoro_state import ( + ReaktoroState, +) +from reaktoro_pse.core.reaktoro_jacobian import ( + ReaktoroJacobianSpec, +) +from reaktoro_pse.core.reaktoro_outputs import ( + ReaktoroOutputSpec, +) + +from reaktoro_pse.core.reaktoro_inputs import ( + ReaktoroInputSpec, +) +from reaktoro_pse.core.reaktoro_solver import ( + ReaktoroSolver, +) +import numpy as np +import time + +import cyipopt +import idaes.logger as idaeslog + +_log = idaeslog.getLogger(__name__) + + +class RemoteWorker: + def __init__( + self, + config_data, + input_reference, + output_reference, + jacobian_reference, + ): + """build remote instance of reaktoro state and solver for execution in + its own process""" + state_config, input_config, output_config, jacobian_config, solver_config = ( + config_data + ) + self.state = ReaktoroState() + self.state.load_from_export_object(state_config) + self.state.build_state() + self.inputs = ReaktoroInputSpec(self.state) + self.inputs.load_from_export_object(input_config) + self.inputs.build_input_specs() + self.outputs = ReaktoroOutputSpec(self.state) + self.outputs.load_from_export_object(output_config) + self.jacobian = ReaktoroJacobianSpec(self.state, self.outputs) + self.jacobian.load_from_export_object(jacobian_config) + self.solver = ReaktoroSolver( + self.state, self.inputs, self.outputs, self.jacobian + ) + self.solver.load_from_export_object(solver_config) + self.param_dict = {} + self.input_reference = shared_memory.SharedMemory(name=input_reference) + self.output_reference = shared_memory.SharedMemory(name=output_reference) + self.jacobian_reference = shared_memory.SharedMemory(name=jacobian_reference) + self.input_matrix = np.ndarray( + (3, len(self.inputs.rkt_inputs.keys())), + dtype=float, + buffer=self.input_reference.buf, + ) + self.jacobian_matrix = np.ndarray( + ( + len(self.outputs.rkt_outputs.keys()), + len(self.inputs.rkt_inputs.keys()), + ), + dtype=float, + buffer=self.jacobian_reference.buf, + ) + + self.output_matrix = np.ndarray( + len(self.outputs.rkt_outputs.keys()), + dtype=float, + buffer=self.output_reference.buf, + ) + self.params = {} + + def initialize(self, presolve=False): + _log.info("Initialized in remote worker") + + self.update_inputs() + self.state.equilibrate_state() + jacobian, outputs = self.solver.solve_reaktoro_block(presolve=presolve) + self.update_output_matrix(outputs, jacobian) + + def solve(self): + self.get_params() + try: + jacobian, outputs = self.solver.solve_reaktoro_block(self.params) + self.update_output_matrix(outputs, jacobian) + return WorkerMessages.success + except cyipopt.CyIpoptEvaluationError: + return WorkerMessages.CyIpoptEvaluationError + + def update_output_matrix(self, outputs, jacobian): + np.copyto(self.output_matrix, outputs) + np.copyto(self.jacobian_matrix, jacobian) + + def get_params(self): + for i, key in enumerate(self.inputs.rkt_inputs.keys()): + self.params[key] = self.input_matrix[2][i] + + def update_inputs(self): + for i, key in enumerate(self.inputs.rkt_inputs.keys()): + self.inputs.rkt_inputs[key].value = self.input_matrix[0][i] + self.inputs.rkt_inputs[key].converted_value = self.input_matrix[1][i] + + +class WorkerMessages: + initialize = "initialize" + 
update_values = "update_values" + solve = "solve" + success = "success" + failed_solve = "failed_solve" + CyIpoptEvaluationError = "CyIpoptEvaluationError" + terminate = "terminate" + + +class LocalWorker: + def __init__(self, worker_data): + """defines local instance of worker to provide + direct access to function execution""" + self.worker_data = worker_data + self.local_pipe, self.remote_pipe = Pipe() + self.get_input_and_output_sizes() + + def get_input_and_output_sizes(self): + # for storing raw and converted values + # index 0 for values for init + # index 1 for converted values for init + # index 3 for ipopt solver calls + self.input_keys = self.worker_data.inputs.rkt_inputs.keys() + input_matrix = np.zeros( + (3, len(self.worker_data.inputs.rkt_inputs.keys())), dtype=float + ) + self.input_reference = shared_memory.SharedMemory( + create=True, size=input_matrix.nbytes + ) + self.input_matrix = np.ndarray( + (3, len(self.worker_data.inputs.rkt_inputs.keys())), + dtype=float, + buffer=self.input_reference.buf, + ) + # for storing output matrix and jacobian + jacobian_matrix = np.zeros( + ( + len(self.worker_data.outputs.rkt_outputs.keys()), + len(self.input_keys), + ), + dtype=float, + ) + output_matrix = np.zeros(len(self.worker_data.outputs.rkt_outputs.keys())) + self.jacobian_reference = shared_memory.SharedMemory( + create=True, size=jacobian_matrix.nbytes + ) + self.jacobian_matrix = np.ndarray( + ( + len(self.worker_data.outputs.rkt_outputs.keys()), + len(self.input_keys), + ), + dtype=float, + buffer=self.jacobian_reference.buf, + ) + self.output_reference = shared_memory.SharedMemory( + create=True, size=output_matrix.nbytes + ) + self.output_matrix = np.ndarray( + len(self.worker_data.outputs.rkt_outputs.keys()), + dtype=float, + buffer=self.output_reference.buf, + ) + + def initialize(self, presolve): + + for i, key in enumerate(self.input_keys): + self.input_matrix[0][i] = self.worker_data.inputs.rkt_inputs[ + key + ].get_value() + self.input_matrix[1][i] = self.worker_data.inputs.rkt_inputs[key].get_value( + apply_conversion=True + ) + self.local_pipe.send((WorkerMessages.initialize, presolve)) + + result = self.local_pipe.recv() + # we want to block here. 
+ + if result == WorkerMessages.success: + self.update_outputs() + _log.warning("Worker initialized") + else: + raise RuntimeError("Worker failed to initialize") + + def solve(self, params): + self.update_params(params) + self.local_pipe.send(WorkerMessages.solve) + + def update_outputs(self): + for i, key in enumerate(self.worker_data.outputs.rkt_outputs): + self.worker_data.outputs.rkt_outputs[key].value = self.output_matrix[i] + + def update_params(self, params): + for i, key in enumerate(self.input_keys): + self.input_matrix[2][i] = params[key] + + def get_solution(self): + if self.local_pipe.poll: + result = self.local_pipe.recv() + if result == WorkerMessages.success: + return self.jacobian_matrix.copy(), self.output_matrix.copy() + elif result == WorkerMessages.CyIpoptEvaluationError: + raise cyipopt.CyIpoptEvaluationError + else: + raise ValueError("The worker failed and did not return a solution") + + def terminate(self): + self.local_pipe.send(WorkerMessages.terminate) + + +class ReaktoroParallelManager: + def __init__(self): + self.registered_workers = {} + self.processes = {} + + def register_block(self, block_idx, block_data): + self.registered_workers[block_idx] = LocalWorker(block_data) + + def get_solve_and_get_function(self, block_idx): + return ( + self.registered_workers[block_idx].solve, + self.registered_workers[block_idx].get_solution, + ) + + def get_initialize_function(self, block_idx): + return self.registered_workers[block_idx].initialize + + def start_workers(self): + for idx, local_worker in self.registered_workers.items(): + worker_config = local_worker.worker_data.get_configs() + process = mp.Process( + target=ReaktoroActor, + args=( + local_worker.remote_pipe, + worker_config, + local_worker.input_reference.name, + local_worker.output_reference.name, + local_worker.jacobian_reference.name, + ), + ) + process.start() + _log.info(f"Started parallel worker {idx}") + self.processes[idx] = process + + def terminate_workers(self): + for idx, local_worker in self.registered_workers.items(): + local_worker.terminate() + self.processes[idx].join() + + +def ReaktoroActor( + pipe, reaktoro_block_data, input_matrix, output_matrix, jacobian_matrix +): + reaktoro_worker = RemoteWorker( + reaktoro_block_data, input_matrix, output_matrix, jacobian_matrix + ) + dog_watch = time.time() + max_time = ( + 300 # five min time out should be enough for waiting, this will kill worker + ) + while True: + if pipe.poll(): + msg = pipe.recv() + if isinstance(msg, tuple): + command = msg[0] + option = msg[1] + else: + command = msg + + if command == WorkerMessages.update_values: + reaktoro_worker.update_inputs() + result = WorkerMessages.success + if command == WorkerMessages.initialize: + reaktoro_worker.initialize(presolve=option) + result = WorkerMessages.success + if command == WorkerMessages.solve: + reaktoro_worker.solve() + result = WorkerMessages.success + if command == WorkerMessages.terminate: + return + pipe.send(result) + dog_watch = time.time() + if abs(time.time() - dog_watch) > max_time: + return + time.sleep(1e-3) # 1ms sleep time to reduce load diff --git a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py index 530a0b3..49f5b51 100644 --- a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py +++ b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py @@ -9,18 +9,22 @@ # information, respectively. 
These files are also available online at the URL # "https://github.com/watertap-org/reaktoro-pse/" ################################################################################# -import multiprocessing as mp from matplotlib.pyplot import sca from pytest import param from reaktoro_pse.core.reaktoro_gray_box import ( ReaktoroGrayBox, ) +from reaktoro_pse.parallel_tools.parallel_manager import ReaktoroParallelManager + +from reaktoro_pse.core.reaktoro_gray_box import HessTypes + from pyomo.contrib.pynumero.interfaces.external_grey_box import ( ExternalGreyBoxBlock, ) import numpy as np from idaes.core.base.process_base import declare_process_block_class, ProcessBlockData +from pyomo.common.config import ConfigValue, IsInstance, ConfigDict class ReaktoroBlockData: @@ -33,6 +37,15 @@ def __init__(self): self.builder = None self.pseudo_gray_box = None + def get_configs(self): + configs = [] + configs.append(self.state.export_config()) + configs.append(self.inputs.export_config()) + configs.append(self.outputs.export_config()) + configs.append(self.jacobian.export_config()) + configs.append(self.solver.export_config()) + return configs + class AggregateSolverState: def __init__(self): @@ -45,6 +58,7 @@ def __init__(self): self.output_blk_indexes = [] self.jacobian_scaling_obj = [] self.solver_functions = {} + self.get_solution_function = {} self.output_matrix = [] self.jacobian_matrix = [] self.input_windows = {} @@ -53,6 +67,9 @@ def __init__(self): def register_solve_function(self, block_index, solver_function): self.solver_functions[block_index] = solver_function + def register_get_function(self, block_index, get_function): + self.get_solution_function[block_index] = get_function + def register_input(self, block_index, input_key, input_obj): self.inputs.append((block_index, input_key)) self.input_dict[(block_index, input_key)] = input_obj @@ -107,13 +124,11 @@ def get_params(self, block_idx, params): for (idx, key), item in params.items(): if block_idx == idx: param_set[key] = item - print(param_set) return param_set # np.array(params)[ # self.input_windows[block_idx][0] : self.input_windows[block_idx][1] # ] def update_solution(self, block_idx, output, jacobian): - print(self.input_windows[block_idx][0], self.input_windows[block_idx][1]) self.output_matrix[ self.output_windows[block_idx][0] : self.output_windows[block_idx][1] ] = output @@ -123,10 +138,20 @@ def update_solution(self, block_idx, output, jacobian): ] = jacobian def solve_reaktoro_block(self, params): + results = [] for blk in self.registered_blocks: param_set = self.get_params(blk, params) - jacobian, output = self.solver_functions[blk](param_set) + result = self.solver_functions[blk](param_set) + results.append(result) + + for i, blk in enumerate(self.registered_blocks): + if results[i] is None: + jacobian, output = self.get_solution_function[blk]() + else: + jacobian, output = results[i] + self.update_solution(blk, output, jacobian) + return ( self.jacobian_matrix, self.output_matrix, @@ -149,10 +174,33 @@ def register_output(self, aggregate_outputs, output_keys, block_idx): @declare_process_block_class("ReaktoroBlockManager") class ReaktoroBlockManagerData(ProcessBlockData): + CONFIG = ProcessBlockData.CONFIG() + CONFIG.declare( + "hessian_type", + ConfigValue( + default="BFGS", + domain=IsInstance((str, HessTypes)), + description="Hessian type to use for reaktor gray box", + doc="""Hessian type to use, some might provide better stability + options (Jt.J, BFGS, BFGS-mod, BFGS-damp, BFGS-ipopt""", + ), + ) + 
CONFIG.declare( + "use_parallel_mode", + ConfigValue( + default=True, + domain=bool, + description="Enables use of parallel workers", + doc="""If true, will parallelize all rekatoro solver calls using multiprocessing""", + ), + ) + def build(self): super().build() self.registered_blocks = [] self.aggregate_solver_state = AggregateSolverState() + if self.config.use_parallel_mode: + self.parallel_manager = ReaktoroParallelManager() def register_block(self, state, inputs, outputs, jacobian, solver, builder): blk = ReaktoroBlockData() @@ -168,13 +216,28 @@ def register_block(self, state, inputs, outputs, jacobian, solver, builder): def aggregate_inputs_and_outputs(self): for block_idx, block in enumerate(self.registered_blocks): + if self.config.use_parallel_mode: + self.parallel_manager.register_block(block_idx, block) for key, obj in block.inputs.rkt_inputs.items(): self.aggregate_solver_state.register_input(block_idx, key, obj) for output in block.outputs.rkt_outputs.keys(): self.aggregate_solver_state.register_output(block_idx, output) - self.aggregate_solver_state.register_solve_function( - block_idx, block.solver.solve_reaktoro_block - ) + if self.config.use_parallel_mode: + solve_func, get_func = self.parallel_manager.get_solve_and_get_function( + block_idx + ) + self.aggregate_solver_state.register_solve_function( + block_idx, solve_func + ) + self.aggregate_solver_state.register_get_function(block_idx, get_func) + else: + self.aggregate_solver_state.register_solve_function( + block_idx, block.solver.solve_reaktoro_block + ) + # assert False + if self.config.use_parallel_mode: + self.parallel_manager.start_workers() + # assert False def build_reaktoro_blocks(self): self.aggregate_inputs_and_outputs() @@ -184,7 +247,7 @@ def build_reaktoro_blocks(self): inputs=self.aggregate_solver_state.inputs, input_dict=self.aggregate_solver_state.input_dict, outputs=self.aggregate_solver_state.outputs, - hessian_type="BGFS", # TODO make it a config option + hessian_type=self.config.hessian_type, ) self.reaktoro_model = ExternalGreyBoxBlock(external_model=external_model) for block_idx, block in enumerate(self.registered_blocks): @@ -199,10 +262,16 @@ def build_reaktoro_blocks(self): block.outputs.rkt_outputs.keys(), block_idx, ) + if self.config.use_parallel_mode: + init_func = self.parallel_manager.get_initialize_function(block_idx) + else: + init_func = self.aggregate_solver_state.solver_functions[block_idx] block.builder.build_reaktoro_block( gray_box_model=pseudo_gray_box_model, - reaktoro_initialize_function=self.aggregate_solver_state.solver_functions[ - block_idx - ], + reaktoro_initialize_function=init_func, ) block.pseudo_gray_box = pseudo_gray_box_model + + def terminate_workers(self): + if self.config.use_parallel_mode: + self.parallel_manager.terminate_workers() diff --git a/src/reaktoro_pse/parallel_tools/tests/test_manager.py b/src/reaktoro_pse/parallel_tools/tests/test_manager.py index 1c4841e..bc4a544 100644 --- a/src/reaktoro_pse/parallel_tools/tests/test_manager.py +++ b/src/reaktoro_pse/parallel_tools/tests/test_manager.py @@ -117,3 +117,4 @@ def test_blockBuild_with_speciation_block(build_rkt_state_with_species): assert "property_block" in scaling_result for key in scaling_result["property_block"]: assert scaling_result["property_block"][key] == 1 + m.reaktoro_manager.terminate_workers() From 98b3921c02882925a88dff3e446b44fb96f12d73 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Wed, 23 Oct 2024 22:30:06 -0700 Subject: [PATCH 
03/10] fixed tests --- .../core/reaktoro_block_builder.py | 16 +- src/reaktoro_pse/core/reaktoro_state.py | 15 +- .../core/tests/test_reaktoro_builder.py | 156 +++++++++--------- .../parallel_tools/tests/test_manager.py | 50 +++++- 4 files changed, 145 insertions(+), 92 deletions(-) diff --git a/src/reaktoro_pse/core/reaktoro_block_builder.py b/src/reaktoro_pse/core/reaktoro_block_builder.py index f6ac70f..3136915 100644 --- a/src/reaktoro_pse/core/reaktoro_block_builder.py +++ b/src/reaktoro_pse/core/reaktoro_block_builder.py @@ -170,6 +170,7 @@ def input_constraints(fs, key): def build_output_vars(self): new_output_vars = {} + for key, obj in self.solver.output_specs.user_outputs.items(): # NOTE: We do not set rkt_outputs to reaktoro_model outputs as they # same as user inputs - we want RKt model to update "user provided vars" @@ -180,6 +181,7 @@ def build_output_vars(self): self.block.outputs = Var(new_output_vars.keys(), initialize=1) for key, obj in new_output_vars.items(): obj.set_pyomo_var(self.block.outputs[key]) + self.new_output_vars = new_output_vars def build_output_constraints(self): """first update rktOuptutObjects for pyomoBuildProperties with reaktoro pyomo variables as @@ -188,15 +190,15 @@ def build_output_constraints(self): add them to new_output_var dict, which will be used to create new output variables on the block """ for key, obj in self.solver.output_specs.user_outputs.items(): - if PropTypes.pyomo_built_prop == obj.property_type: for ( pyoPropKey, pyoPropObj, ) in obj.pyomo_build_options.properties.items(): - pyoPropObj.set_pyomo_var( - self.block.reaktoro_model.outputs[pyoPropKey] - ) + if pyoPropObj.get_pyomo_var() is None: + pyoPropObj.set_pyomo_var( + self.block.reaktoro_model.outputs[pyoPropKey] + ) @self.block.Constraint(self.solver.output_specs.user_outputs) def output_constraints(fs, prop, prop_index): @@ -206,6 +208,10 @@ def output_constraints(fs, prop, prop_index): prop_object ) else: + print( + prop_object.get_pyomo_var(), + self.block.reaktoro_model.outputs[(prop, prop_index)], + ) return ( prop_object.get_pyomo_var() == self.block.reaktoro_model.outputs[(prop, prop_index)] @@ -220,7 +226,7 @@ def initialize(self, presolve_during_initialization=False): else: self.reaktoro_initialize_function(presolve=presolve_during_initialization) self.initialize_output_variables_and_constraints() - _log.info(f"Initialized rkt block. 
{self.reaktoro_initialize_function}") + _log.info(f"Initialized rkt block") def get_sf(self, pyo_var): diff --git a/src/reaktoro_pse/core/reaktoro_state.py b/src/reaktoro_pse/core/reaktoro_state.py index 6c6933d..0a17c0a 100644 --- a/src/reaktoro_pse/core/reaktoro_state.py +++ b/src/reaktoro_pse/core/reaktoro_state.py @@ -124,7 +124,7 @@ def set_activity_model( activity_model, state_of_matter ) - def get_registered_phases(self, activate_database): + def get_registered_phases(self, active_database): """ creates listof phases with applied activity models for generation of reaktoro state args: @@ -139,7 +139,7 @@ def get_registered_phases(self, activate_database): or phase_object.non_speciate_phase_list is not None ): rkt_phase_object = self.create_rkt_phase( - activate_database, + active_database, phase_object.phase_function, phase_object.phase_list, phase_object.non_speciate_phase_list, @@ -149,6 +149,7 @@ def get_registered_phases(self, activate_database): for rpo in rkt_phase_object: self.apply_activity_model( rpo, + active_database, phase_object.activity_model, phase_object.state_of_matter, ) @@ -156,6 +157,7 @@ def get_registered_phases(self, activate_database): else: self.apply_activity_model( rkt_phase_object, + active_database, phase_object.activity_model, phase_object.state_of_matter, ) @@ -163,13 +165,13 @@ def get_registered_phases(self, activate_database): return activate_phase_list def apply_activity_model( - self, rkt_phase_object, activity_model, state_of_matter=None + self, rkt_phase_object, active_database, activity_model, state_of_matter=None ): """sets activity mode""" if activity_model is None: raise TypeError(f"Activity model for {rkt_phase_object} is not set.") rkt_activity_model_object = self._process_activity( - activity_model, activity_model, state_of_matter + active_database, activity_model, state_of_matter ) rkt_phase_object.set(rkt_activity_model_object) @@ -235,10 +237,11 @@ def get_func(activity_model, state_of_matter): if state_of_matter is None: try: return getattr(rkt, activity_model)() - except RuntimeError: + except TypeError: + print(active_database) + print(getattr(rkt, activity_model)) # might require database as input (e.g ActivityModelPhreeqc) return getattr(rkt, activity_model)(active_database) - else: return getattr(rkt, activity_model)(state_of_matter) elif isinstance(activity_model, tuple): diff --git a/src/reaktoro_pse/core/tests/test_reaktoro_builder.py b/src/reaktoro_pse/core/tests/test_reaktoro_builder.py index a246e06..f1e0d59 100644 --- a/src/reaktoro_pse/core/tests/test_reaktoro_builder.py +++ b/src/reaktoro_pse/core/tests/test_reaktoro_builder.py @@ -140,61 +140,13 @@ def test_build_with_rkt_dissolution(build_with_dissolve_in_rkt): m, rkt_solver = build_with_dissolve_in_rkt m.rkt_block = Block() builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) - builder.initialize() - m.display() - m.rkt_block.reaktoro_model.display() - # will have as many DOFs as outputs due to pyomo not - # knowing tha graybox exists. 
- assert len(m.rkt_block.reaktoro_model.inputs) == len( - rkt_solver.input_specs.rkt_inputs - ) - assert len(m.rkt_block.outputs) == len(rkt_solver.output_specs.user_outputs) - assert len(m.rkt_block.reaktoro_model.outputs) == len( - rkt_solver.output_specs.rkt_outputs - ) - assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) - cy_solver = get_solver(solver="cyipopt-watertap") - cy_solver.options["max_iter"] = 20 - m.pH.unfix() - # m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) - result = cy_solver.solve(m, tee=True) - assert_optimal_termination(result) - assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 - assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 - assert ( - pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) - == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value - ) + # m.display() + # m.rkt_block.reaktoro_model.display() -def test_build_with_pyomo_dissolution(build_with_dissolve_in_pyomo): - m, rkt_solver = build_with_dissolve_in_pyomo - m.rkt_block = Block() - builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) builder.initialize() - # will have as many DOFs as outputs due to pyomo not - # knowing tha graybox exists. - print(rkt_solver.output_specs.rkt_outputs) - assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) - cy_solver = get_solver(solver="cyipopt-watertap") - cy_solver.options["max_iter"] = 20 - m.pH.unfix() m.display() - m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) - result = cy_solver.solve(m, tee=True) - assert_optimal_termination(result) - assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 - assert ( - pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) - == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value - ) - - -def test_build_with_rkt_dissolution_mass_basis(build_with_dissolve_in_rkt_mass_basis): - m, rkt_solver = build_with_dissolve_in_rkt_mass_basis - m.rkt_block = Block() - builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) - builder.initialize() + m.rkt_block.reaktoro_model.display() # will have as many DOFs as outputs due to pyomo not # knowing tha graybox exists. assert len(m.rkt_block.reaktoro_model.inputs) == len( @@ -209,37 +161,89 @@ def test_build_with_rkt_dissolution_mass_basis(build_with_dissolve_in_rkt_mass_b cy_solver.options["max_iter"] = 20 m.pH.unfix() m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) - - # m.display() result = cy_solver.solve(m, tee=True) assert_optimal_termination(result) assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 + assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 assert ( pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value ) -def test_build_with_pyomo_dissolution_mass_basis( - build_with_dissolve_in_pyomo_mass_basis, -): - m, rkt_solver = build_with_dissolve_in_pyomo_mass_basis - m.rkt_block = Block() - builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) - builder.initialize() - # will have as many DOFs as outputs due to pyomo not - # knowing tha graybox exists. 
- assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) - cy_solver = get_solver(solver="cyipopt-watertap") - cy_solver.options["max_iter"] = 20 - m.pH.unfix() - m.display() - m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) - - result = cy_solver.solve(m, tee=True) - assert_optimal_termination(result) - assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 - assert ( - pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) - == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value - ) +# def test_build_with_pyomo_dissolution(build_with_dissolve_in_pyomo): +# m, rkt_solver = build_with_dissolve_in_pyomo +# m.rkt_block = Block() +# builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) +# builder.initialize() +# # will have as many DOFs as outputs due to pyomo not +# # knowing tha graybox exists. +# print(rkt_solver.output_specs.rkt_outputs) +# assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) +# cy_solver = get_solver(solver="cyipopt-watertap") +# cy_solver.options["max_iter"] = 20 +# m.pH.unfix() +# m.display() +# m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) +# result = cy_solver.solve(m, tee=True) +# assert_optimal_termination(result) +# assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 +# assert ( +# pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) +# == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value +# ) + + +# def test_build_with_rkt_dissolution_mass_basis(build_with_dissolve_in_rkt_mass_basis): +# m, rkt_solver = build_with_dissolve_in_rkt_mass_basis +# m.rkt_block = Block() +# builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) +# builder.initialize() +# # will have as many DOFs as outputs due to pyomo not +# # knowing tha graybox exists. +# assert len(m.rkt_block.reaktoro_model.inputs) == len( +# rkt_solver.input_specs.rkt_inputs +# ) +# assert len(m.rkt_block.outputs) == len(rkt_solver.output_specs.user_outputs) +# assert len(m.rkt_block.reaktoro_model.outputs) == len( +# rkt_solver.output_specs.rkt_outputs +# ) +# assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) +# cy_solver = get_solver(solver="cyipopt-watertap") +# cy_solver.options["max_iter"] = 20 +# m.pH.unfix() +# m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) + +# # m.display() +# result = cy_solver.solve(m, tee=True) +# assert_optimal_termination(result) +# assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 +# assert ( +# pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) +# == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value +# ) + + +# def test_build_with_pyomo_dissolution_mass_basis( +# build_with_dissolve_in_pyomo_mass_basis, +# ): +# m, rkt_solver = build_with_dissolve_in_pyomo_mass_basis +# m.rkt_block = Block() +# builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) +# builder.initialize() +# # will have as many DOFs as outputs due to pyomo not +# # knowing tha graybox exists. 
+# assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) +# cy_solver = get_solver(solver="cyipopt-watertap") +# cy_solver.options["max_iter"] = 20 +# m.pH.unfix() +# m.display() +# m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) + +# result = cy_solver.solve(m, tee=True) +# assert_optimal_termination(result) +# assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 +# assert ( +# pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) +# == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value +# ) diff --git a/src/reaktoro_pse/parallel_tools/tests/test_manager.py b/src/reaktoro_pse/parallel_tools/tests/test_manager.py index bc4a544..1a87cb0 100644 --- a/src/reaktoro_pse/parallel_tools/tests/test_manager.py +++ b/src/reaktoro_pse/parallel_tools/tests/test_manager.py @@ -44,11 +44,6 @@ def test_blockBuild_with_speciation_block(build_rkt_state_with_species): }, database="PhreeqcDatabase", database_file="pitzer.dat", - jacobian_options={ - "numerical_type": "average", - "numerical_order": 2, - "numerical_step": 1e-8, - }, chemistry_modifier=m.CaO, outputs=m.outputs, build_speciation_block=True, @@ -118,3 +113,48 @@ def test_blockBuild_with_speciation_block(build_rkt_state_with_species): for key in scaling_result["property_block"]: assert scaling_result["property_block"][key] == 1 m.reaktoro_manager.terminate_workers() + + +def test_blockBuild_with_wateqf_data_base(build_rkt_state_with_species): + m = build_rkt_state_with_species + m.CaO = Var(["CaO"], initialize=0.001, units=pyunits.mol / pyunits.s) + m.CaO.fix() + m.reaktoro_manager = ReaktoroBlockManager() + m.property_block = ReaktoroBlock( + aqueous_phase={ + "composition": m.composition, + "convert_to_rkt_species": True, + "activity_model": "ActivityModelPhreeqc", + }, + system_state={ + "temperature": m.temp, + "pressure": m.pressure, + "pH": m.pH, + }, + database="PhreeqcDatabase", + database_file="wateq4f.dat", + chemistry_modifier=m.CaO, + outputs=m.outputs, + build_speciation_block=True, + reaktoro_block_manager=m.reaktoro_manager, + exclude_species_list=[ + "CH4", + "O2", + "S2-2", + "S4-2", + "S3-2", + "S5-2", + "S6-2", + ], + ) + m.reaktoro_manager.build_reaktoro_blocks() + m.property_block.initialize() + cy_solver = get_solver(solver="cyipopt-watertap") + cy_solver.options["max_iter"] = 20 + m.pH.unfix() + m.outputs[("scalingTendency", "Calcite")].fix(5) + result = cy_solver.solve(m, tee=True) + assert_optimal_termination(result) + m.display() + assert pytest.approx(m.outputs[("pH", None)].value, 1e-2) == 7.49301431889365 + assert pytest.approx(m.pH.value, 1e-2) == 6.515501990042 From 77bb49b08e44769d9caa8aa16373294cb53c47b1 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Thu, 24 Oct 2024 01:25:06 -0700 Subject: [PATCH 04/10] fixes to exports --- .../core/reaktoro_block_builder.py | 4 --- src/reaktoro_pse/core/reaktoro_inputs.py | 29 +++++++++++++--- src/reaktoro_pse/core/reaktoro_state.py | 27 +++++++++++---- .../parallel_tools/parallel_manager.py | 34 +++++++++---------- .../parallel_tools/reaktoro_block_manager.py | 6 ++-- .../parallel_tools/tests/test_manager.py | 1 + src/reaktoro_pse/reaktoro_block.py | 4 +-- 7 files changed, 69 insertions(+), 36 deletions(-) diff --git a/src/reaktoro_pse/core/reaktoro_block_builder.py b/src/reaktoro_pse/core/reaktoro_block_builder.py index 3136915..45f1ecb 100644 --- a/src/reaktoro_pse/core/reaktoro_block_builder.py +++ b/src/reaktoro_pse/core/reaktoro_block_builder.py @@ -208,10 
+208,6 @@ def output_constraints(fs, prop, prop_index): prop_object ) else: - print( - prop_object.get_pyomo_var(), - self.block.reaktoro_model.outputs[(prop, prop_index)], - ) return ( prop_object.get_pyomo_var() == self.block.reaktoro_model.outputs[(prop, prop_index)] diff --git a/src/reaktoro_pse/core/reaktoro_inputs.py b/src/reaktoro_pse/core/reaktoro_inputs.py index a635f14..5186796 100644 --- a/src/reaktoro_pse/core/reaktoro_inputs.py +++ b/src/reaktoro_pse/core/reaktoro_inputs.py @@ -35,10 +35,30 @@ def __init__(self): self.exact_speciation = None def copy_chem_inputs(self, chem_inputs): - self.rkt_chemical_inputs = copy.deepcopy(chem_inputs) - for key, obj in self.rkt_chemical_inputs.items(): - # self.inputs[key] = copy.deepcopy(obj) - self.rkt_chemical_inputs[key].delete_pyomo_var() + self.rkt_chemical_inputs = RktInputs() + for key, obj in chem_inputs.items(): + obj.update_values(True) + self.rkt_chemical_inputs[key] = None + self.rkt_chemical_inputs[key].time_unit = obj.time_unit + self.rkt_chemical_inputs[key].main_unit = obj.main_unit + self.rkt_chemical_inputs[key].conversion_unit = obj.conversion_unit + self.rkt_chemical_inputs[key].conversion_value = obj.conversion_value + self.rkt_chemical_inputs[key].required_unit = obj.required_unit + self.rkt_chemical_inputs[key].lower_bound = obj.lower_bound + self.rkt_chemical_inputs[key].input_type = obj.input_type + self.rkt_chemical_inputs[key].value = obj.value + self.rkt_chemical_inputs[key].converted_value = obj.converted_value + self.rkt_chemical_inputs.registered_phases = chem_inputs.registered_phases + self.rkt_chemical_inputs.all_species = chem_inputs.all_species + self.rkt_chemical_inputs.species_list = chem_inputs.species_list + self.rkt_chemical_inputs.convert_to_rkt_species = ( + chem_inputs.convert_to_rkt_species + ) + self.rkt_chemical_inputs.composition_is_elements = ( + chem_inputs.composition_is_elements + ) + self.rkt_chemical_inputs.conversion_method = chem_inputs.conversion_method + self.rkt_chemical_inputs.rkt_input_list = chem_inputs.rkt_input_list class ReaktoroInputSpec: @@ -146,7 +166,6 @@ def build_input_specs(self): self.assert_charge_neutrality, self.dissolve_species_in_rkt, ) - # get input name order! 
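
Editor's aside (hypothetical record type, not the reaktoro-pse API): the field-by-field copy introduced above replaces copy.deepcopy so that Pyomo variable references are never carried into the copied input set. A minimal sketch of the same idea:

    from dataclasses import dataclass

    @dataclass
    class InputRecord:  # hypothetical stand-in for one RktInputs entry
        value: float
        lower_bound: float
        pyomo_var: object = None  # reference that must not survive the copy

    def copy_without_model_refs(records):
        # copy the plain numeric/config fields, deliberately dropping the model reference
        return {
            key: InputRecord(value=rec.value, lower_bound=rec.lower_bound, pyomo_var=None)
            for key, rec in records.items()
        }

    originals = {"H2O": InputRecord(value=55.5, lower_bound=0.0, pyomo_var=object())}
    copies = copy_without_model_refs(originals)
    assert copies["H2O"].pyomo_var is None
    assert originals["H2O"].pyomo_var is not None
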
for idx, spec in enumerate(self.equilibrium_specs.namesInputs()): if spec == "T": diff --git a/src/reaktoro_pse/core/reaktoro_state.py b/src/reaktoro_pse/core/reaktoro_state.py index 0a17c0a..a740485 100644 --- a/src/reaktoro_pse/core/reaktoro_state.py +++ b/src/reaktoro_pse/core/reaktoro_state.py @@ -238,8 +238,6 @@ def get_func(activity_model, state_of_matter): try: return getattr(rkt, activity_model)() except TypeError: - print(active_database) - print(getattr(rkt, activity_model)) # might require database as input (e.g ActivityModelPhreeqc) return getattr(rkt, activity_model)(active_database) else: @@ -279,10 +277,26 @@ def __init__(self): self.database_file = None def copy_rkt_inputs(self, inputs): - self.inputs = copy.deepcopy(inputs) - for key, obj in self.inputs.items(): - # self.inputs[key] = copy.deepcopy(obj) - self.inputs[key].delete_pyomo_var() + self.inputs = RktInputs.RktInputs() + for key, obj in inputs.items(): + obj.update_values(True) + self.inputs[key] = None + self.inputs[key].time_unit = obj.time_unit + self.inputs[key].main_unit = obj.main_unit + self.inputs[key].conversion_unit = obj.conversion_unit + self.inputs[key].conversion_value = obj.conversion_value + self.inputs[key].required_unit = obj.required_unit + self.inputs[key].lower_bound = obj.lower_bound + self.inputs[key].input_type = obj.input_type + self.inputs[key].value = obj.value + self.inputs[key].converted_value = obj.converted_value + self.inputs.registered_phases = inputs.registered_phases + self.inputs.all_species = inputs.all_species + self.inputs.species_list = inputs.species_list + self.inputs.convert_to_rkt_species = inputs.convert_to_rkt_species + self.inputs.composition_is_elements = inputs.composition_is_elements + self.inputs.conversion_method = inputs.conversion_method + self.inputs.rkt_input_list = inputs.rkt_input_list # base class for configuring reaktoro states and solver @@ -795,3 +809,4 @@ def load_from_export_object(self, export_object): self.phase_manager = export_object.phase_manager self.database_file = export_object.database_file self.database_type = export_object.database_type + self.exclude_species_list = export_object.exclude_species_list diff --git a/src/reaktoro_pse/parallel_tools/parallel_manager.py b/src/reaktoro_pse/parallel_tools/parallel_manager.py index f6aace1..5b604cf 100644 --- a/src/reaktoro_pse/parallel_tools/parallel_manager.py +++ b/src/reaktoro_pse/parallel_tools/parallel_manager.py @@ -10,6 +10,7 @@ # "https://github.com/watertap-org/reaktoro-pse/" ################################################################################# +import array from calendar import c import multiprocessing as mp from multiprocessing import shared_memory @@ -72,7 +73,7 @@ def __init__( self.jacobian_reference = shared_memory.SharedMemory(name=jacobian_reference) self.input_matrix = np.ndarray( (3, len(self.inputs.rkt_inputs.keys())), - dtype=float, + dtype=np.float64, buffer=self.input_reference.buf, ) self.jacobian_matrix = np.ndarray( @@ -80,13 +81,13 @@ def __init__( len(self.outputs.rkt_outputs.keys()), len(self.inputs.rkt_inputs.keys()), ), - dtype=float, + dtype=np.float64, buffer=self.jacobian_reference.buf, ) self.output_matrix = np.ndarray( len(self.outputs.rkt_outputs.keys()), - dtype=float, + dtype=np.float64, buffer=self.output_reference.buf, ) self.params = {} @@ -147,14 +148,14 @@ def get_input_and_output_sizes(self): # index 3 for ipopt solver calls self.input_keys = self.worker_data.inputs.rkt_inputs.keys() input_matrix = np.zeros( - (3, 
len(self.worker_data.inputs.rkt_inputs.keys())), dtype=float + (3, len(self.worker_data.inputs.rkt_inputs.keys())), dtype=np.float64 ) self.input_reference = shared_memory.SharedMemory( create=True, size=input_matrix.nbytes ) self.input_matrix = np.ndarray( (3, len(self.worker_data.inputs.rkt_inputs.keys())), - dtype=float, + dtype=np.float64, buffer=self.input_reference.buf, ) # for storing output matrix and jacobian @@ -163,7 +164,7 @@ def get_input_and_output_sizes(self): len(self.worker_data.outputs.rkt_outputs.keys()), len(self.input_keys), ), - dtype=float, + dtype=np.float64, ) output_matrix = np.zeros(len(self.worker_data.outputs.rkt_outputs.keys())) self.jacobian_reference = shared_memory.SharedMemory( @@ -174,7 +175,7 @@ def get_input_and_output_sizes(self): len(self.worker_data.outputs.rkt_outputs.keys()), len(self.input_keys), ), - dtype=float, + dtype=np.float64, buffer=self.jacobian_reference.buf, ) self.output_reference = shared_memory.SharedMemory( @@ -182,18 +183,18 @@ def get_input_and_output_sizes(self): ) self.output_matrix = np.ndarray( len(self.worker_data.outputs.rkt_outputs.keys()), - dtype=float, + dtype=np.float64, buffer=self.output_reference.buf, ) def initialize(self, presolve): - for i, key in enumerate(self.input_keys): - self.input_matrix[0][i] = self.worker_data.inputs.rkt_inputs[ - key - ].get_value() - self.input_matrix[1][i] = self.worker_data.inputs.rkt_inputs[key].get_value( - apply_conversion=True + + self.input_matrix[0][i] = np.float64( + self.worker_data.inputs.rkt_inputs[key].get_value() + ) + self.input_matrix[1][i] = np.float64( + self.worker_data.inputs.rkt_inputs[key].get_value(apply_conversion=True) ) self.local_pipe.send((WorkerMessages.initialize, presolve)) @@ -216,7 +217,7 @@ def update_outputs(self): def update_params(self, params): for i, key in enumerate(self.input_keys): - self.input_matrix[2][i] = params[key] + self.input_matrix[2][i] = np.float64(params[key]) def get_solution(self): if self.local_pipe.poll: @@ -251,12 +252,11 @@ def get_initialize_function(self, block_idx): def start_workers(self): for idx, local_worker in self.registered_workers.items(): - worker_config = local_worker.worker_data.get_configs() process = mp.Process( target=ReaktoroActor, args=( local_worker.remote_pipe, - worker_config, + local_worker.worker_data.frozen_state, local_worker.input_reference.name, local_worker.output_reference.name, local_worker.jacobian_reference.name, diff --git a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py index 49f5b51..626eec7 100644 --- a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py +++ b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py @@ -46,6 +46,9 @@ def get_configs(self): configs.append(self.solver.export_config()) return configs + def freeze_state(self): + self.frozen_state = self.get_configs() + class AggregateSolverState: def __init__(self): @@ -210,6 +213,7 @@ def register_block(self, state, inputs, outputs, jacobian, solver, builder): blk.jacobian = jacobian blk.solver = solver blk.builder = builder + blk.freeze_state() self.registered_blocks.append(blk) return blk @@ -234,10 +238,8 @@ def aggregate_inputs_and_outputs(self): self.aggregate_solver_state.register_solve_function( block_idx, block.solver.solve_reaktoro_block ) - # assert False if self.config.use_parallel_mode: self.parallel_manager.start_workers() - # assert False def build_reaktoro_blocks(self): self.aggregate_inputs_and_outputs() diff --git 
a/src/reaktoro_pse/parallel_tools/tests/test_manager.py b/src/reaktoro_pse/parallel_tools/tests/test_manager.py index 1a87cb0..023c798 100644 --- a/src/reaktoro_pse/parallel_tools/tests/test_manager.py +++ b/src/reaktoro_pse/parallel_tools/tests/test_manager.py @@ -158,3 +158,4 @@ def test_blockBuild_with_wateqf_data_base(build_rkt_state_with_species): m.display() assert pytest.approx(m.outputs[("pH", None)].value, 1e-2) == 7.49301431889365 assert pytest.approx(m.pH.value, 1e-2) == 6.515501990042 + m.reaktoro_manager.terminate_workers() diff --git a/src/reaktoro_pse/reaktoro_block.py b/src/reaktoro_pse/reaktoro_block.py index 1582ba9..6d7cdcf 100644 --- a/src/reaktoro_pse/reaktoro_block.py +++ b/src/reaktoro_pse/reaktoro_block.py @@ -414,9 +414,9 @@ def get_phases(phase_type): # these value swill be overwritten during intilization anyway for ion, obj in self.speciation_block.outputs.items(): if self.config.aqueous_phase.fixed_solvent_specie in ion: - obj.value = obj.value * 10 + obj.set_value(obj.value * 10) else: - obj.value = obj.value * 0.001 + obj.set_value(obj.value / 1000) if aqueous_input_composition is not {}: aqueous_input_composition = self.speciation_block.outputs From b0b37c9b2e5ea2cf29f685d132ee51af9f8d27fb Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Thu, 24 Oct 2024 10:34:11 -0700 Subject: [PATCH 05/10] Update reaktoro_outputs.py --- src/reaktoro_pse/core/reaktoro_outputs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reaktoro_pse/core/reaktoro_outputs.py b/src/reaktoro_pse/core/reaktoro_outputs.py index e3b69c3..32569e9 100644 --- a/src/reaktoro_pse/core/reaktoro_outputs.py +++ b/src/reaktoro_pse/core/reaktoro_outputs.py @@ -89,7 +89,7 @@ def set_option_function(self, property_type, get_function): def set_poyomo_build_option(self, func): self.pyomo_build_options = func - def set_value(self, value): + def set_pyo_value(self, value): self.pyomo_var.set_value(value) def set_get_function(self, func): From d20fb7d8ca53874c1affdaacea3a9a87d7ae9a25 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Thu, 24 Oct 2024 13:44:33 -0700 Subject: [PATCH 06/10] added async and fixes --- src/reaktoro_pse/core/reaktoro_solver.py | 4 +- .../parallel_tools/parallel_manager.py | 64 ++++++++++++--- .../parallel_tools/reaktoro_block_manager.py | 80 +++++++++++++++---- 3 files changed, 119 insertions(+), 29 deletions(-) diff --git a/src/reaktoro_pse/core/reaktoro_solver.py b/src/reaktoro_pse/core/reaktoro_solver.py index f60ea12..2272e06 100644 --- a/src/reaktoro_pse/core/reaktoro_solver.py +++ b/src/reaktoro_pse/core/reaktoro_solver.py @@ -235,7 +235,9 @@ def solve_reaktoro_block( _log.warning(f"{key}: {value}") self._sequential_fails += 1 if self._sequential_fails > self._max_fails: - assert False + raise RuntimeError( + "Number of failed solves exceed maximum allowed number" + ) raise cyipopt.CyIpoptEvaluationError else: self._sequential_fails = 0 diff --git a/src/reaktoro_pse/parallel_tools/parallel_manager.py b/src/reaktoro_pse/parallel_tools/parallel_manager.py index 5b604cf..1248ed0 100644 --- a/src/reaktoro_pse/parallel_tools/parallel_manager.py +++ b/src/reaktoro_pse/parallel_tools/parallel_manager.py @@ -76,6 +76,7 @@ def __init__( dtype=np.float64, buffer=self.input_reference.buf, ) + self.old_matrix = None # np.zeros(len(self.inputs.rkt_inputs.keys())) self.jacobian_matrix = np.ndarray( ( len(self.outputs.rkt_outputs.keys()), @@ -91,6 +92,7 @@ def __init__( 
buffer=self.output_reference.buf, ) self.params = {} + self.old_params = {} def initialize(self, presolve=False): _log.info("Initialized in remote worker") @@ -101,13 +103,16 @@ def initialize(self, presolve=False): self.update_output_matrix(outputs, jacobian) def solve(self): - self.get_params() try: - jacobian, outputs = self.solver.solve_reaktoro_block(self.params) - self.update_output_matrix(outputs, jacobian) + if self.check_solve(): + self.get_params() + jacobian, outputs = self.solver.solve_reaktoro_block(self.params) + self.update_output_matrix(outputs, jacobian) return WorkerMessages.success except cyipopt.CyIpoptEvaluationError: return WorkerMessages.CyIpoptEvaluationError + except RuntimeError: + return WorkerMessages.failed def update_output_matrix(self, outputs, jacobian): np.copyto(self.output_matrix, outputs) @@ -117,11 +122,34 @@ def get_params(self): for i, key in enumerate(self.inputs.rkt_inputs.keys()): self.params[key] = self.input_matrix[2][i] + def check_solve(self): + if self.old_matrix is None: + self.old_matrix = self.input_matrix[2].copy() + return True + else: + if any( + self.old_matrix[i] != self.input_matrix[2][i] + for i in range(self.input_matrix[2].size) + ): + self.old_matrix = self.input_matrix[2].copy() + return True + else: + return False + def update_inputs(self): for i, key in enumerate(self.inputs.rkt_inputs.keys()): self.inputs.rkt_inputs[key].value = self.input_matrix[0][i] self.inputs.rkt_inputs[key].converted_value = self.input_matrix[1][i] + def close_shared_memory(self): + # clean up memory on termination + self.input_reference.close() + self.input_reference.unlink() + self.output_reference.close() + self.output_reference.unlink() + self.jacobian_reference.close() + self.jacobian_reference.unlink() + class WorkerMessages: initialize = "initialize" @@ -131,6 +159,7 @@ class WorkerMessages: failed_solve = "failed_solve" CyIpoptEvaluationError = "CyIpoptEvaluationError" terminate = "terminate" + failed = "failed" class LocalWorker: @@ -227,16 +256,20 @@ def get_solution(self): elif result == WorkerMessages.CyIpoptEvaluationError: raise cyipopt.CyIpoptEvaluationError else: - raise ValueError("The worker failed and did not return a solution") + raise RuntimeError( + "The worker failed and did not return a solution terminated" + ) def terminate(self): self.local_pipe.send(WorkerMessages.terminate) + _log.info("Worker terminated") class ReaktoroParallelManager: - def __init__(self): + def __init__(self, time_out): self.registered_workers = {} self.processes = {} + self.time_out = time_out def register_block(self, block_idx, block_data): self.registered_workers[block_idx] = LocalWorker(block_data) @@ -260,6 +293,7 @@ def start_workers(self): local_worker.input_reference.name, local_worker.output_reference.name, local_worker.jacobian_reference.name, + self.time_out, ), ) process.start() @@ -273,15 +307,12 @@ def terminate_workers(self): def ReaktoroActor( - pipe, reaktoro_block_data, input_matrix, output_matrix, jacobian_matrix + pipe, reaktoro_block_data, input_matrix, output_matrix, jacobian_matrix, time_out=20 ): reaktoro_worker = RemoteWorker( reaktoro_block_data, input_matrix, output_matrix, jacobian_matrix ) dog_watch = time.time() - max_time = ( - 300 # five min time out should be enough for waiting, this will kill worker - ) while True: if pipe.poll(): msg = pipe.recv() @@ -298,12 +329,19 @@ def ReaktoroActor( reaktoro_worker.initialize(presolve=option) result = WorkerMessages.success if command == WorkerMessages.solve: - reaktoro_worker.solve() 
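
Editor's aside (standard-library pattern, not the reaktoro-pse API): the worker above exchanges inputs, outputs, and the Jacobian through numpy views over multiprocessing shared memory, and the close()/unlink() pair in close_shared_memory() mirrors the cleanup shown here. A minimal sketch:

    import numpy as np
    from multiprocessing import shared_memory

    template = np.zeros((3, 4), dtype=np.float64)
    shm = shared_memory.SharedMemory(create=True, size=template.nbytes)
    try:
        writer_view = np.ndarray(template.shape, dtype=np.float64, buffer=shm.buf)
        writer_view[0, :] = 1.5  # visible to any process that attaches by shm.name

        attached = shared_memory.SharedMemory(name=shm.name)  # what a worker would do
        reader_view = np.ndarray(template.shape, dtype=np.float64, buffer=attached.buf)
        assert reader_view[0, 0] == 1.5
        attached.close()   # each attaching process closes its own handle
    finally:
        shm.close()
        shm.unlink()       # only the creating process unlinks the block

As a side note on the check_solve comparison above, np.array_equal(old, new) would express the element-wise loop over the cached input row in a single call.
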
- result = WorkerMessages.success + result = reaktoro_worker.solve() if command == WorkerMessages.terminate: + reaktoro_worker.close_shared_memory() return pipe.send(result) dog_watch = time.time() - if abs(time.time() - dog_watch) > max_time: + if abs(time.time() - dog_watch) > time_out: + # make sure we kill worker if it does not receive command with in time out + # this is to handle scenario where main flowsheet crashes or is rebuilt + _log.warning( + f"""Worker timed out, shutting down worker. + The time out was set to {time_out}, increase it if necessary in ReaktoroBlockManager options""" + ) + reaktoro_worker.close_shared_memory() return - time.sleep(1e-3) # 1ms sleep time to reduce load + time.sleep(1e-3) # 1ms sleep time to reduce load when idle diff --git a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py index 626eec7..b0815ed 100644 --- a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py +++ b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py @@ -10,6 +10,7 @@ # "https://github.com/watertap-org/reaktoro-pse/" ################################################################################# +import multiprocessing from matplotlib.pyplot import sca from pytest import param from reaktoro_pse.core.reaktoro_gray_box import ( @@ -51,7 +52,7 @@ def freeze_state(self): class AggregateSolverState: - def __init__(self): + def __init__(self, parallel_mode=True, maximum_number_of_parallel_solves=None): self.hessian_type = None self.inputs = [] self.input_dict = {} @@ -66,6 +67,10 @@ def __init__(self): self.jacobian_matrix = [] self.input_windows = {} self.output_windows = {} + self.parallel_mode = parallel_mode + if maximum_number_of_parallel_solves is None: + maximum_number_of_parallel_solves = multiprocessing.cpu_count() + self.maximum_number_of_parallel_solves = maximum_number_of_parallel_solves def register_solve_function(self, block_index, solver_function): self.solver_functions[block_index] = solver_function @@ -127,9 +132,7 @@ def get_params(self, block_idx, params): for (idx, key), item in params.items(): if block_idx == idx: param_set[key] = item - return param_set # np.array(params)[ - # self.input_windows[block_idx][0] : self.input_windows[block_idx][1] - # ] + return param_set def update_solution(self, block_idx, output, jacobian): self.output_matrix[ @@ -141,20 +144,42 @@ def update_solution(self, block_idx, output, jacobian): ] = jacobian def solve_reaktoro_block(self, params): - results = [] + if self.parallel_mode: + return self.parallel_solver(params) + else: + return self.serial_solver(params) + + def parallel_solver(self, params): + active_workers = [] for blk in self.registered_blocks: param_set = self.get_params(blk, params) - result = self.solver_functions[blk](param_set) - results.append(result) + self.solver_functions[blk](param_set) + active_workers.append(blk) + # if we have more than max workers, + # collect any results that are ready first + while len(active_workers) >= self.maximum_number_of_parallel_solves: + for blk in active_workers: + result = self.get_solution_function[active_workers[0]]() + if result is not None: + jacobian, output = result + self.update_solution(blk, output, jacobian) + active_workers.pop(0) # remove first worker + break + # collect any results that are still not collected + for blk in active_workers: + jacobian, output = self.get_solution_function[blk]() + self.update_solution(blk, output, jacobian) - for i, blk in enumerate(self.registered_blocks): - 
if results[i] is None: - jacobian, output = self.get_solution_function[blk]() - else: - jacobian, output = results[i] + return ( + self.jacobian_matrix, + self.output_matrix, + ) + def serial_solver(self, params): + for blk in self.registered_blocks: + param_set = self.get_params(blk, params) + jacobian, output = self.solver_functions[blk](param_set) self.update_solution(blk, output, jacobian) - return ( self.jacobian_matrix, self.output_matrix, @@ -197,13 +222,38 @@ class ReaktoroBlockManagerData(ProcessBlockData): doc="""If true, will parallelize all rekatoro solver calls using multiprocessing""", ), ) + CONFIG.declare( + "worker_timeout", + ConfigValue( + default=20, + domain=int, + description="Defines time in seconds for worker time out", + doc="""This is time out for parallel workers to time out and shut down if they receive no + commands from main process (e.g. flowsheet)""", + ), + ) + CONFIG.declare( + "maximum_number_of_parallel_solves", + ConfigValue( + default=None, + domain=int, + description="Maximum number of parallel solves", + doc="""This will limit how many parallel solves would be run, this will not reduce number of + spawned processes""", + ), + ) def build(self): super().build() self.registered_blocks = [] - self.aggregate_solver_state = AggregateSolverState() + self.aggregate_solver_state = AggregateSolverState( + parallel_mode=self.config.use_parallel_mode, + maximum_number_of_parallel_solves=self.config.maximum_number_of_parallel_solves, + ) if self.config.use_parallel_mode: - self.parallel_manager = ReaktoroParallelManager() + self.parallel_manager = ReaktoroParallelManager( + self.config.worker_timeout, + ) def register_block(self, state, inputs, outputs, jacobian, solver, builder): blk = ReaktoroBlockData() From 784e8b9941683b58c7aa28db85187a6c6300d19a Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Thu, 24 Oct 2024 13:55:26 -0700 Subject: [PATCH 07/10] Update reaktoro_block_manager.py --- src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py index b0815ed..8a4ae8d 100644 --- a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py +++ b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py @@ -69,7 +69,7 @@ def __init__(self, parallel_mode=True, maximum_number_of_parallel_solves=None): self.output_windows = {} self.parallel_mode = parallel_mode if maximum_number_of_parallel_solves is None: - maximum_number_of_parallel_solves = multiprocessing.cpu_count() + maximum_number_of_parallel_solves = multiprocessing.cpu_count() - 1 self.maximum_number_of_parallel_solves = maximum_number_of_parallel_solves def register_solve_function(self, block_index, solver_function): From 15581a75d114c6bfd96343e0fa62fb5172997175 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Thu, 24 Oct 2024 15:13:25 -0700 Subject: [PATCH 08/10] tested accesing reaktoro_model outputs --- src/reaktoro_pse/parallel_tools/tests/test_manager.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/reaktoro_pse/parallel_tools/tests/test_manager.py b/src/reaktoro_pse/parallel_tools/tests/test_manager.py index 023c798..a884d56 100644 --- a/src/reaktoro_pse/parallel_tools/tests/test_manager.py +++ b/src/reaktoro_pse/parallel_tools/tests/test_manager.py @@ -96,6 +96,9 @@ def 
test_blockBuild_with_speciation_block(build_rkt_state_with_species): m.property_block.update_jacobian_scaling(new_scaling) scaling_result = m.property_block.display_jacobian_scaling() + print(m.property_block.reaktoro_model.outputs) + assert len(m.property_block.reaktoro_model.outputs) == 2 + assert "speciation_block" in scaling_result for key in scaling_result["speciation_block"]: assert scaling_result["speciation_block"][key] == 1 From 7473262f070a7a37a1d946b402ab411b3f584416 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Mon, 28 Oct 2024 20:09:31 -0700 Subject: [PATCH 09/10] Update test_reaktoro_builder.py --- .../core/tests/test_reaktoro_builder.py | 156 +++++++++--------- 1 file changed, 76 insertions(+), 80 deletions(-) diff --git a/src/reaktoro_pse/core/tests/test_reaktoro_builder.py b/src/reaktoro_pse/core/tests/test_reaktoro_builder.py index f1e0d59..c9c4d42 100644 --- a/src/reaktoro_pse/core/tests/test_reaktoro_builder.py +++ b/src/reaktoro_pse/core/tests/test_reaktoro_builder.py @@ -140,10 +140,6 @@ def test_build_with_rkt_dissolution(build_with_dissolve_in_rkt): m, rkt_solver = build_with_dissolve_in_rkt m.rkt_block = Block() builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) - - # m.display() - # m.rkt_block.reaktoro_model.display() - builder.initialize() m.display() m.rkt_block.reaktoro_model.display() @@ -171,79 +167,79 @@ def test_build_with_rkt_dissolution(build_with_dissolve_in_rkt): ) -# def test_build_with_pyomo_dissolution(build_with_dissolve_in_pyomo): -# m, rkt_solver = build_with_dissolve_in_pyomo -# m.rkt_block = Block() -# builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) -# builder.initialize() -# # will have as many DOFs as outputs due to pyomo not -# # knowing tha graybox exists. -# print(rkt_solver.output_specs.rkt_outputs) -# assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) -# cy_solver = get_solver(solver="cyipopt-watertap") -# cy_solver.options["max_iter"] = 20 -# m.pH.unfix() -# m.display() -# m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) -# result = cy_solver.solve(m, tee=True) -# assert_optimal_termination(result) -# assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 -# assert ( -# pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) -# == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value -# ) - - -# def test_build_with_rkt_dissolution_mass_basis(build_with_dissolve_in_rkt_mass_basis): -# m, rkt_solver = build_with_dissolve_in_rkt_mass_basis -# m.rkt_block = Block() -# builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) -# builder.initialize() -# # will have as many DOFs as outputs due to pyomo not -# # knowing tha graybox exists. 
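
Editor's aside (illustration only; the manager itself uses long-lived workers, pipes, and shared memory rather than an executor): the default cap of multiprocessing.cpu_count() - 1 concurrent solves set above is the usual "leave one core for the main process" sizing rule, which looks like this with the standard-library executor and a made-up work function.

    import multiprocessing
    from concurrent.futures import ProcessPoolExecutor

    def _fake_block_solve(x):  # hypothetical stand-in for one block's solve call
        return x * x

    if __name__ == "__main__":
        # leave one core free for the main (flowsheet) process
        max_workers = max(1, multiprocessing.cpu_count() - 1)
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            print(list(pool.map(_fake_block_solve, range(8))))
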
-# assert len(m.rkt_block.reaktoro_model.inputs) == len( -# rkt_solver.input_specs.rkt_inputs -# ) -# assert len(m.rkt_block.outputs) == len(rkt_solver.output_specs.user_outputs) -# assert len(m.rkt_block.reaktoro_model.outputs) == len( -# rkt_solver.output_specs.rkt_outputs -# ) -# assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) -# cy_solver = get_solver(solver="cyipopt-watertap") -# cy_solver.options["max_iter"] = 20 -# m.pH.unfix() -# m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) - -# # m.display() -# result = cy_solver.solve(m, tee=True) -# assert_optimal_termination(result) -# assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 -# assert ( -# pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) -# == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value -# ) - - -# def test_build_with_pyomo_dissolution_mass_basis( -# build_with_dissolve_in_pyomo_mass_basis, -# ): -# m, rkt_solver = build_with_dissolve_in_pyomo_mass_basis -# m.rkt_block = Block() -# builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) -# builder.initialize() -# # will have as many DOFs as outputs due to pyomo not -# # knowing tha graybox exists. -# assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) -# cy_solver = get_solver(solver="cyipopt-watertap") -# cy_solver.options["max_iter"] = 20 -# m.pH.unfix() -# m.display() -# m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) - -# result = cy_solver.solve(m, tee=True) -# assert_optimal_termination(result) -# assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 -# assert ( -# pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) -# == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value -# ) +def test_build_with_pyomo_dissolution(build_with_dissolve_in_pyomo): + m, rkt_solver = build_with_dissolve_in_pyomo + m.rkt_block = Block() + builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) + builder.initialize() + # will have as many DOFs as outputs due to pyomo not + # knowing tha graybox exists. + print(rkt_solver.output_specs.rkt_outputs) + assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) + cy_solver = get_solver(solver="cyipopt-watertap") + cy_solver.options["max_iter"] = 20 + m.pH.unfix() + m.display() + m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) + result = cy_solver.solve(m, tee=True) + assert_optimal_termination(result) + assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 + assert ( + pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) + == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value + ) + + +def test_build_with_rkt_dissolution_mass_basis(build_with_dissolve_in_rkt_mass_basis): + m, rkt_solver = build_with_dissolve_in_rkt_mass_basis + m.rkt_block = Block() + builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) + builder.initialize() + # will have as many DOFs as outputs due to pyomo not + # knowing tha graybox exists. 
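
Editor's aside on the assertion idiom used throughout these tests: the second positional argument to pytest.approx is the relative tolerance, so the two forms below are equivalent (the numbers are arbitrary examples, not values from the tests).

    import pytest

    assert pytest.approx(6.5257440, 1e-3) == 6.53
    assert pytest.approx(6.5257440, rel=1e-3) == 6.53
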
+ assert len(m.rkt_block.reaktoro_model.inputs) == len( + rkt_solver.input_specs.rkt_inputs + ) + assert len(m.rkt_block.outputs) == len(rkt_solver.output_specs.user_outputs) + assert len(m.rkt_block.reaktoro_model.outputs) == len( + rkt_solver.output_specs.rkt_outputs + ) + assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) + cy_solver = get_solver(solver="cyipopt-watertap") + cy_solver.options["max_iter"] = 20 + m.pH.unfix() + m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) + + # m.display() + result = cy_solver.solve(m, tee=True) + assert_optimal_termination(result) + assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 + assert ( + pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) + == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value + ) + + +def test_build_with_pyomo_dissolution_mass_basis( + build_with_dissolve_in_pyomo_mass_basis, +): + m, rkt_solver = build_with_dissolve_in_pyomo_mass_basis + m.rkt_block = Block() + builder = ReaktoroBlockBuilder(m.rkt_block, rkt_solver) + builder.initialize() + # will have as many DOFs as outputs due to pyomo not + # knowing tha graybox exists. + assert degrees_of_freedom(m) == len(rkt_solver.output_specs.rkt_outputs) + cy_solver = get_solver(solver="cyipopt-watertap") + cy_solver.options["max_iter"] = 20 + m.pH.unfix() + m.display() + m.rkt_block.outputs[("scalingTendency", "Calcite")].fix(5) + + result = cy_solver.solve(m, tee=True) + assert_optimal_termination(result) + assert pytest.approx(m.pH.value, 1e-3) == 6.5257440 + assert ( + pytest.approx(m.rkt_block.outputs[("scalingTendency", "Calcite")].value, 1e-3) + == m.rkt_block.outputs[("scalingTendencyDirect", "Calcite")].value + ) From 2145ea678105fd6c815ca762dd433940cb60c8b4 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Mon, 28 Oct 2024 20:33:29 -0700 Subject: [PATCH 10/10] added function to view rkt state --- src/reaktoro_pse/core/reaktoro_block_builder.py | 16 +++++++++++++++- .../parallel_tools/parallel_manager.py | 13 +++++++++++++ .../parallel_tools/reaktoro_block_manager.py | 3 +++ .../parallel_tools/tests/test_manager.py | 7 +++++++ src/reaktoro_pse/reaktoro_block.py | 4 ++-- 5 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/reaktoro_pse/core/reaktoro_block_builder.py b/src/reaktoro_pse/core/reaktoro_block_builder.py index 45f1ecb..4823567 100644 --- a/src/reaktoro_pse/core/reaktoro_block_builder.py +++ b/src/reaktoro_pse/core/reaktoro_block_builder.py @@ -55,12 +55,18 @@ def __init__(self, block, reaktoro_solver, build_on_init=True): raise TypeError("Reaktoro block builder requires a ReaktoroSolver class") self.configure_jacobian_scaling() self.reaktoro_initialize_function = None # used to provide external solve call + self.display_reaktoro_state_function = ( + None # used to specifying external function to display rkt state + ) self.build_output_vars() if build_on_init: # option to support legacy implementation self.build_reaktoro_block() def build_reaktoro_block( - self, gray_box_model=None, reaktoro_initialize_function=None + self, + gray_box_model=None, + reaktoro_initialize_function=None, + display_reaktoro_state_function=None, ): """build reaktoro model""" if gray_box_model is None: @@ -73,6 +79,8 @@ def build_reaktoro_block( self.block.reaktoro_model = gray_box_model if reaktoro_initialize_function is not None: self.reaktoro_initialize_function = reaktoro_initialize_function + if display_reaktoro_state_function is not None: + 
self.display_reaktoro_state_function = display_reaktoro_state_function self.build_input_constraints() self.build_output_constraints() @@ -350,3 +358,9 @@ def initialize_input_variables_and_constraints(self): sf = self.get_sf(self.block.reaktoro_model.inputs[key]) iscale.set_scaling_factor(self.block.reaktoro_model.inputs[key], sf) iscale.constraint_scaling_transform(self.block.input_constraints[key], sf) + + def display_state(self): + if self.display_reaktoro_state_function is None: + print(self.solver.state) + else: + self.display_reaktoro_state_function() diff --git a/src/reaktoro_pse/parallel_tools/parallel_manager.py b/src/reaktoro_pse/parallel_tools/parallel_manager.py index 1248ed0..8527f77 100644 --- a/src/reaktoro_pse/parallel_tools/parallel_manager.py +++ b/src/reaktoro_pse/parallel_tools/parallel_manager.py @@ -122,6 +122,9 @@ def get_params(self): for i, key in enumerate(self.inputs.rkt_inputs.keys()): self.params[key] = self.input_matrix[2][i] + def display_state(self): + print(self.state.state) + def check_solve(self): if self.old_matrix is None: self.old_matrix = self.input_matrix[2].copy() @@ -160,6 +163,7 @@ class WorkerMessages: CyIpoptEvaluationError = "CyIpoptEvaluationError" terminate = "terminate" failed = "failed" + display_state = "display_state" class LocalWorker: @@ -260,6 +264,9 @@ def get_solution(self): "The worker failed and did not return a solution terminated" ) + def display_state(self): + self.local_pipe.send(WorkerMessages.display_state) + def terminate(self): self.local_pipe.send(WorkerMessages.terminate) _log.info("Worker terminated") @@ -283,6 +290,9 @@ def get_solve_and_get_function(self, block_idx): def get_initialize_function(self, block_idx): return self.registered_workers[block_idx].initialize + def get_display_function(self, block_idx): + return self.registered_workers[block_idx].display_state + def start_workers(self): for idx, local_worker in self.registered_workers.items(): process = mp.Process( @@ -330,6 +340,9 @@ def ReaktoroActor( result = WorkerMessages.success if command == WorkerMessages.solve: result = reaktoro_worker.solve() + if command == WorkerMessages.display_state: + result = reaktoro_worker.display_state() + result = WorkerMessages.success if command == WorkerMessages.terminate: reaktoro_worker.close_shared_memory() return diff --git a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py index 8a4ae8d..084ae02 100644 --- a/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py +++ b/src/reaktoro_pse/parallel_tools/reaktoro_block_manager.py @@ -316,11 +316,14 @@ def build_reaktoro_blocks(self): ) if self.config.use_parallel_mode: init_func = self.parallel_manager.get_initialize_function(block_idx) + disp_func = self.parallel_manager.get_display_function(block_idx) else: init_func = self.aggregate_solver_state.solver_functions[block_idx] + disp_func = None block.builder.build_reaktoro_block( gray_box_model=pseudo_gray_box_model, reaktoro_initialize_function=init_func, + display_reaktoro_state_function=disp_func, ) block.pseudo_gray_box = pseudo_gray_box_model diff --git a/src/reaktoro_pse/parallel_tools/tests/test_manager.py b/src/reaktoro_pse/parallel_tools/tests/test_manager.py index a884d56..1a34170 100644 --- a/src/reaktoro_pse/parallel_tools/tests/test_manager.py +++ b/src/reaktoro_pse/parallel_tools/tests/test_manager.py @@ -42,6 +42,11 @@ def test_blockBuild_with_speciation_block(build_rkt_state_with_species): "pressure": m.pressure, "pH": m.pH, }, + 
jacobian_options={ + "numerical_type": "average", + "numerical_order": 2, + "numerical_step": 1e-8, + }, database="PhreeqcDatabase", database_file="pitzer.dat", chemistry_modifier=m.CaO, @@ -51,6 +56,7 @@ def test_blockBuild_with_speciation_block(build_rkt_state_with_species): ) m.reaktoro_manager.build_reaktoro_blocks() m.property_block.initialize() + cy_solver = get_solver(solver="cyipopt-watertap") cy_solver.options["max_iter"] = 20 m.pH.unfix() @@ -115,6 +121,7 @@ def test_blockBuild_with_speciation_block(build_rkt_state_with_species): assert "property_block" in scaling_result for key in scaling_result["property_block"]: assert scaling_result["property_block"][key] == 1 + m.property_block.display_reaktoro_state() m.reaktoro_manager.terminate_workers() diff --git a/src/reaktoro_pse/reaktoro_block.py b/src/reaktoro_pse/reaktoro_block.py index 6d7cdcf..e46922d 100644 --- a/src/reaktoro_pse/reaktoro_block.py +++ b/src/reaktoro_pse/reaktoro_block.py @@ -759,9 +759,9 @@ def display_reaktoro_state(self): """Displays reaktoro state""" if self.config.build_speciation_block: _log.info("-----Displaying information for speciation block ------") - _log.info(self.speciation_block.rkt_state.state) + self.speciation_block.rkt_block_builder.display_state() _log.info("-----Displaying information for property block ------") - _log.info(self.rkt_state.state) + self.rkt_block_builder.display_state() def update_jacobian_scaling( self, user_scaling_dict=None, set_on_speciation_block=True