From 0fd44eabe238ce2f1cf793d2819e62b56de9bd9a Mon Sep 17 00:00:00 2001 From: raychew Date: Tue, 11 Jun 2024 20:37:26 +0200 Subject: [PATCH] changed dask delayed to dask bag --- inputs/icon_regional_run.py | 3 ++- runs/icon_merit_global.py | 17 +++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/inputs/icon_regional_run.py b/inputs/icon_regional_run.py index 5533588..a6abcfb 100644 --- a/inputs/icon_regional_run.py +++ b/inputs/icon_regional_run.py @@ -27,7 +27,8 @@ params.nhi = 24 params.nhj = 48 -params.n_modes = 50 +params.n_modes = 10 +params.padding = 10 params.U, params.V = 10.0, 0.0 diff --git a/runs/icon_merit_global.py b/runs/icon_merit_global.py index 1d9b13d..de1b031 100644 --- a/runs/icon_merit_global.py +++ b/runs/icon_merit_global.py @@ -212,7 +212,8 @@ def parallel_wrapper(grid, params, reader, writer): from pycsam.inputs.icon_regional_run import params from dask.distributed import Client -import dask +import dask.bag as db +# import dask # dask.config.set(scheduler='synchronous') @@ -247,12 +248,16 @@ def parallel_wrapper(grid, params, reader, writer): lazy_results = [] - for c_idx in range(n_cells): - # pw_run(c_idx) - lazy_result = dask.delayed(pw_run)(c_idx) - lazy_results.append(lazy_result) + b = db.from_sequence(range(n_cells), npartitions=10) + results = b.map(pw_run) + results = results.compute() - results = dask.compute(*lazy_results) + # for c_idx in range(n_cells): + # # pw_run(c_idx) + # lazy_result = dask.delayed(pw_run)(c_idx) + # lazy_results.append(lazy_result) + + # results = dask.compute(*lazy_results) for item in results: writer.duplicate(item.c_idx, item)