Skip to content

Commit

Permalink
Handling for pathos parallel processing package; comparison example t…
Browse files Browse the repository at this point in the history
…o multiprocessing.
  • Loading branch information
misi9170 committed Sep 26, 2024
1 parent 4d552a9 commit b80527f
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 8 deletions.
6 changes: 4 additions & 2 deletions examples/examples_parallel/000_parallel_timing.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from floris.parallel_floris_model import ParallelFlorisModel as ParallelFlorisModel_orig
from floris.parallel_floris_model_2 import ParallelFlorisModel as ParallelFlorisModel_new


DEBUG = True

if __name__ == "__main__":
Expand Down Expand Up @@ -73,7 +74,7 @@ def set_up_and_run_models(n_turbs, n_findex, max_workers):
"../inputs/gch.yaml",
max_workers=max_workers,
n_wind_condition_splits=max_workers,
interface=parallel_interface,
interface="pathos",
print_timings=True,
)

Expand Down Expand Up @@ -119,7 +120,8 @@ def set_up_and_run_models(n_turbs, n_findex, max_workers):

# Save the data
df = pd.DataFrame({
"model": ["FlorisModel", "ParallelFlorisModel_orig", "ParallelFlorisModel_new", "ParallelFlorisModel_new_poweronly"],
"model": ["FlorisModel", "ParallelFlorisModel_orig", "ParallelFlorisModel_new",
"ParallelFlorisModel_new_poweronly"],
"AEP": [aep_fmodel, aep_pfmodel_orig, aep_pfmodel_new, aep_pfmodel_new_p],
"time": [t_fmodel, t_pfmodel_orig, t_pfmodel_new, t_pfmodel_new_p],
})
Expand Down
151 changes: 151 additions & 0 deletions examples/examples_parallel/001_parallel_timing_output.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
"""Example: Timing tests for parallel computation interfaces.
Tests:
- max_workers specified, small.
- max_workers specified, large.
- max_workers unspecified.
- various n_findex
- various n_turbines
- return_turbine_powers_only=True
- return_turbine_powers_only=False
"""

from time import perf_counter as timerpc

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


max_workers_options = [2, 16, -1]
n_findex_options = [100, 1000, 10000]
n_turbines_options = [5, 10, 15] # Will be squared!
# Parallelization parameters

DEBUG = True

# First run max_workers tests
timing_data = []
for mw in max_workers_options:
# Set up models
n_turbs = 2 if DEBUG else 10 # Will be squared
n_findex = 1000

df = pd.read_csv(f"outputs/comptime_maxworkers{mw}_nturbs{n_turbs}_nfindex{n_findex}.csv")

timing_data.append(df.time.values)

timing_data = np.array(timing_data).T

x = np.arange(len(max_workers_options))
width = 0.2
multiplier = 0

fig, ax = plt.subplots(1,1)

for dat, lab in zip(timing_data.tolist(), df.model.values):
offset = width * multiplier
rects = ax.bar(x + offset, dat, width, label=lab)
ax.bar_label(rects, padding=3, fmt='%.1f')
multiplier += 1

# Add some text for labels, title and custom x-axis tick labels, etc.
ax.set_xlabel('Max. workers [-]')
ax.set_xticks(x + width, max_workers_options)
ax.set_ylabel('Time [s]')
ax.legend(loc='upper left', ncols=2)
ax.set_yscale('log')
fig.savefig("outputs/max_workers_timing.png", format='png', dpi=300)


# Similar now for n_turbs
timing_data = []
for nt in n_turbines_options:
# Set up models
n_findex = 10 if DEBUG else 1000
max_workers = -1
df = pd.read_csv(f"outputs/comptime_maxworkers{max_workers}_nturbs{nt}_nfindex{n_findex}.csv")
timing_data.append(df.time.values)

timing_data = np.array(timing_data).T

x = np.arange(len(n_turbines_options))
width = 0.2
multiplier = 0

fig, ax = plt.subplots(1,1)

for dat, lab in zip(timing_data.tolist(), df.model.values):
offset = width * multiplier
rects = ax.bar(x + offset, dat, width, label=lab)
ax.bar_label(rects, padding=3, fmt='%.1f')
multiplier += 1

# Add some text for labels, title and custom x-axis tick labels, etc.
ax.set_xlabel('n_turbines [-]')
ax.set_xticks(x + width, np.array(n_turbines_options)**2)
ax.set_ylabel('Time [s]')
ax.legend(loc='upper left', ncols=2)
ax.set_yscale('log')
fig.savefig("outputs/n_turbines_timing.png", format='png', dpi=300)


# Similar now for n_findex
timing_data = []
for nf in n_findex_options:
# Set up models
n_turbs = 2 if DEBUG else 10 # Will be squared
max_workers = -1
df = pd.read_csv(f"outputs/comptime_maxworkers{max_workers}_nturbs{n_turbs}_nfindex{nf}.csv")
timing_data.append(df.time.values)

timing_data = np.array(timing_data).T

x = np.arange(len(n_findex_options))
width = 0.2
multiplier = 0

fig, ax = plt.subplots(1,1)

for dat, lab in zip(timing_data.tolist(), df.model.values):
offset = width * multiplier
rects = ax.bar(x + offset, dat, width, label=lab)
ax.bar_label(rects, padding=3, fmt='%.1f')
multiplier += 1

# Add some text for labels, title and custom x-axis tick labels, etc.
ax.set_xlabel('n_findex [-]')
ax.set_xticks(x + width, n_findex_options)
ax.set_ylabel('Time [s]')
ax.legend(loc='upper left', ncols=2)
ax.set_yscale('log')
fig.savefig("outputs/n_findex_timing.png", format='png', dpi=300)


plt.show()




# # Then run n_turbines tests
# for nt in n_turbines_options:
# # Set up models
# n_findex = 10 if DEBUG else 1000
# max_workers = 16

# set_up_and_run_models(
# n_turbs=nt, n_findex=n_findex, max_workers=max_workers
# )

# # Then run n_findex tests
# for nf in n_findex_options:
# # Set up models
# n_turbs = 2 if DEBUG else 10 # Will be squared
# max_workers = 16

# set_up_and_run_models(
# n_turbs=n_turbs, n_findex=nf, max_workers=max_workers
# )

107 changes: 107 additions & 0 deletions examples/examples_parallel/002_worker_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
"""Example: Timing tests for parallel computation interfaces.
Tests:
- max_workers specified, small.
- max_workers specified, large.
- max_workers unspecified.
- various n_findex
- various n_turbines
- return_turbine_powers_only=True
- return_turbine_powers_only=False
"""

from time import perf_counter as timerpc

import numpy as np
import pandas as pd

from floris import (
FlorisModel,
TimeSeries,
)
from floris.parallel_floris_model import ParallelFlorisModel as ParallelFlorisModel_orig
from floris.parallel_floris_model_2 import ParallelFlorisModel as ParallelFlorisModel_new


DEBUG = True

if __name__ == "__main__":
# Create random wind data
np.random.seed(0)
n_findex = 10 if DEBUG else 1000
wind_speeds = np.random.normal(loc=8.0, scale=2.0, size=n_findex)
wind_directions = np.random.normal(loc=270.0, scale=15.0, size=n_findex)
turbulence_intensities = 0.06*np.ones_like(wind_speeds)

time_series = TimeSeries(
wind_directions=wind_directions,
wind_speeds=wind_speeds,
turbulence_intensities=turbulence_intensities,
)

# Clip wind_rose to specified n_findex

fmodel = FlorisModel("../inputs/gch.yaml")

# Specify wind farm layout and update in the floris object
N = 20 if DEBUG else 100

X, Y = np.meshgrid(
5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1),
5.0 * fmodel.core.farm.rotor_diameters_sorted[0][0] * np.arange(0, N, 1),
)

# Set up new parallel Floris model
print("Beginning multiprocessing test")
t0 = timerpc()
pfmodel_mp = ParallelFlorisModel_new(
"../inputs/gch.yaml",
max_workers=-1,
n_wind_condition_splits=-1,
interface="multiprocessing",
print_timings=True,
)
pfmodel_mp.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=time_series)
t1 = timerpc()
pfmodel_mp.run()
aep1 = pfmodel_mp.get_farm_AEP()
t2 = timerpc()
pfmodel_mp.set(layout_x=X.flatten()+10, layout_y=Y.flatten())
pfmodel_mp.run()
aep2 = pfmodel_mp.get_farm_AEP()
t3 = timerpc()

print(f"Multiprocessing (max_workers={pfmodel_mp.max_workers})")
print(f"Time to set up: {t1-t0}")
print(f"Time to run first: {t2-t1}")
print(f"Time to run second: {t3-t2}")

# When is the worker pool released, though??
print("Beginning pathos test")
t0 = timerpc()
pfmodel_pathos = ParallelFlorisModel_new(
"../inputs/gch.yaml",
max_workers=-1,
n_wind_condition_splits=-1,
interface="pathos",
print_timings=True,
)
pfmodel_pathos.set(layout_x=X.flatten(), layout_y=Y.flatten(), wind_data=time_series)
t1 = timerpc()
pfmodel_pathos.run()
aep3 = pfmodel_pathos.get_farm_AEP()
t2 = timerpc()
pfmodel_pathos.set(layout_x=X.flatten()+10, layout_y=Y.flatten())
pfmodel_pathos.run()
aep4 = pfmodel_pathos.get_farm_AEP()
t3 = timerpc()

print(f"Pathos (max_workers={pfmodel_pathos.max_workers})")
print(f"Time to set up: {t1-t0}")
print(f"Time to run first: {t2-t1}")
print(f"Time to run second: {t3-t2}")

if np.isclose(aep1 + aep2 + aep3 + aep4, 4*aep4):
print("AEPs are equal!")
51 changes: 45 additions & 6 deletions floris/parallel_floris_model_2.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
from __future__ import annotations

import copy
import warnings
from pathlib import Path
from time import perf_counter as timerpc

import numpy as np
import pandas as pd

from floris.core import State
from floris.floris_model import FlorisModel
from floris.optimization.yaw_optimization.yaw_optimizer_sr import YawOptimizationSR


class ParallelFlorisModel(FlorisModel):
Expand Down Expand Up @@ -40,7 +37,7 @@ def __init__(
- **wake**: See `floris.simulation.wake.WakeManager` for more details.
- **logging**: See `floris.simulation.core.Core` for more details.
interface: The parallelization interface to use. Options are "multiprocessing",
with possible future support for "mpi4py" and "concurrent"
and "pathos", with possible future support for "mpi4py" and "concurrent"
max_workers: The maximum number of workers to use. Defaults to -1, which then
takes the number of CPUs available.
n_wind_condition_splits: The number of wind conditions to split the simulation over.
Expand All @@ -65,6 +62,11 @@ def __init__(
if max_workers == -1:
max_workers = mp.cpu_count()
# TODO: test spinning up the worker pool at this point
elif interface == "pathos":
import pathos
if max_workers == -1:
max_workers = pathos.helpers.cpu_count()
self.pathos_pool = pathos.pools.ProcessPool(nodes=max_workers)
elif interface in ["mpi4py", "concurrent"]:
raise NotImplementedError(
f"Parallelization interface {interface} not yet supported."
Expand All @@ -80,10 +82,10 @@ def __init__(
else:
raise ValueError(
f"Invalid parallelization interface {interface}. "
"Options are 'multiprocessing', 'mpi4py' or 'concurrent'."
"Options are 'multiprocessing' or 'pathos'."
)

self.interface = interface
self._interface = interface
self.max_workers = max_workers
if n_wind_condition_splits == -1:
self.n_wind_condition_splits = max_workers
Expand Down Expand Up @@ -128,6 +130,28 @@ def run(self) -> None:
self.core.farm.finalize(self.core.grid.unsorted_indices)
self.core.state = State.USED
t3 = timerpc()
elif self.interface == "pathos":
t0 = timerpc()
self.core.initialize_domain()
parallel_run_inputs = self._preprocessing()
t1 = timerpc()
if self.return_turbine_powers_only:
self._turbine_powers_split = self.pathos_pool.map(
_parallel_run_powers_only,
parallel_run_inputs
)
else:
def pr(x):
return _parallel_run(*x)
self._fmodels_split = self.pathos_pool.map(
pr,
parallel_run_inputs
)
t2 = timerpc()
self._postprocessing()
self.core.farm.finalize(self.core.grid.unsorted_indices)
self.core.state = State.USED
t3 = timerpc()
if self.print_timings:
print("===============================================================================")
if self.interface is None:
Expand Down Expand Up @@ -249,6 +273,21 @@ def fmodel(self):
"attributes and methods of FlorisModel directly."
)

@property
def interface(self):
"""
The parallelization interface used.
"""
return self._interface

@interface.setter
def interface(self, value):
"""
Raise error regarding setting the interface.
"""
raise AttributeError(
"The parallelization interface cannot be changed after instantiation."
)

def _parallel_run(fmodel_dict, set_kwargs) -> FlorisModel:
"""
Expand Down

0 comments on commit b80527f

Please sign in to comment.