From 5ee9e0139fd9e152663849427fee09f415a2cc22 Mon Sep 17 00:00:00 2001 From: raychew Date: Wed, 22 May 2024 08:40:54 +0200 Subject: [PATCH] parallel run seems to work now however, there is a bug with the REMA IO routine when both the lat and lon extent span multiple files, e.g. with grid cell 47 in this commit --- runs/icon_merit_global.py | 39 ++++++++++++++++++------------ src/io.py | 51 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 16 deletions(-) diff --git a/runs/icon_merit_global.py b/runs/icon_merit_global.py index be8fd30..be520b7 100644 --- a/runs/icon_merit_global.py +++ b/runs/icon_merit_global.py @@ -31,6 +31,9 @@ def do_cell(c_idx, reader, writer, ): + + print(c_idx) + topo = var.topo_cell() lat_verts = grid.clat_vertices[c_idx] lon_verts = grid.clon_vertices[c_idx] @@ -72,8 +75,6 @@ def do_cell(c_idx, triangles[i, :, 0] = np.array(clon_vertices[i, :]) triangles[i, :, 1] = np.array(clat_vertices[i, :]) - print("--> triangles done") - if params.plot: cart_plot.lat_lon_icon(topo, triangles, ncells=ncells, clon=clon, clat=clat) @@ -95,13 +96,13 @@ def do_cell(c_idx, tri.tri_lon_verts = triangles[:, :, 0] tri.tri_lat_verts = triangles[:, :, 1] - simplex_lat = tri.tri_lat_verts[tri_idx] simplex_lon = tri.tri_lon_verts[tri_idx] if utils.is_land(cell, simplex_lat, simplex_lon, topo): - writer.output(c_idx, clat_rad[c_idx], clon_rad[c_idx], 0) - return + # writer.output(c_idx, clat_rad[c_idx], clon_rad[c_idx], 0) + print("--> skipping land cell") + return writer.grp_struct(c_idx, clat_rad[c_idx], clon_rad[c_idx], 0) else: is_land = 1 @@ -149,7 +150,8 @@ def do_cell(c_idx, cell, ampls_sa, uw_sa, dat_2D_sa = sols v_extent = [dat_2D_sa.min(), dat_2D_sa.max()] - writer.output(c_idx, clat_rad[c_idx], clon_rad[c_idx], is_land, cell.analysis) + # writer.output(c_idx, clat_rad[c_idx], clon_rad[c_idx], is_land, cell.analysis) + result = writer.grp_struct(c_idx, clat_rad[c_idx], clon_rad[c_idx], is_land, cell.analysis) if params.plot: if params.dfft_first_guess: @@ -160,7 +162,9 @@ def do_cell(c_idx, else: dplot.show(tri_idx, sols, v_extent=v_extent, output_fig=False) - return 1 + print("--> analysis done") + + return result def parallel_wrapper(grid, params, reader, writer): @@ -173,9 +177,10 @@ def parallel_wrapper(grid, params, reader, writer): # autoreload() from pycsam.inputs.icon_regional_run import params -# %% -from dask.distributed import Client, progress -import dask +# from dask.distributed import Client, progress +# import dask + +# dask.config.set(scheduler='synchronous') if __name__ == '__main__': if params.self_test(): @@ -204,16 +209,18 @@ def parallel_wrapper(grid, params, reader, writer): pw_run = parallel_wrapper(grid, params, reader, writer) - client = Client(threads_per_worker=2, n_workers=4) + # client = Client(threads_per_worker=1, n_workers=1) lazy_results = [] - for c_idx in range(n_cells)[:12]: - # pw_run(c_idx) - lazy_result = dask.delayed(pw_run)(c_idx) - lazy_results.append(lazy_result) + for c_idx in range(n_cells)[47:48]: + pw_run(c_idx) + # lazy_result = dask.delayed(pw_run)(c_idx) + # lazy_results.append(lazy_result) - results = dask.compute(*lazy_results) + # results = dask.compute(*lazy_results) # merit_reader.close_all() + # for item in results: + # writer.duplicate(item.c_idx, item) diff --git a/src/io.py b/src/io.py index 66dfad1..3cf0448 100644 --- a/src/io.py +++ b/src/io.py @@ -646,6 +646,57 @@ def output(self, id, clat, clon, is_land, analysis=None): rootgrp.close() + def duplicate(self, id, struct): + + rootgrp = nc.Dataset(self.path + self.fn, "a", format="NETCDF4") + + grp = rootgrp.createGroup(str(id)) + + is_land_var = grp.createVariable("is_land","i4") + is_land_var[:] = struct.is_land + + clat_var = grp.createVariable("clat","f8") + clat_var[:] = struct.clat + clon_var = grp.createVariable("clon","f8") + clon_var[:] = struct.clon + + if struct.is_land: + dk_var = grp.createVariable("dk","f8") + dk_var[:] = struct.dk + dl_var = grp.createVariable("dl","f8") + dl_var[:] = struct.dl + + pick_idx = np.where(struct.ampls > 0) + + H_spec_var = grp.createVariable("H_spec","f8", ("nspec",)) + H_spec_var[:] = self.__pad_zeros(struct.ampls[pick_idx], self.n_modes) + + kks_var = grp.createVariable("kks","f8", ("nspec",)) + kks_var[:] = self.__pad_zeros(struct.kks[pick_idx], self.n_modes) + + lls_var = grp.createVariable("lls","f8", ("nspec",)) + lls_var[:] = self.__pad_zeros(struct.lls[pick_idx], self.n_modes) + + rootgrp.close() + + class grp_struct(object): + def __init__(self, c_idx, clat, clon, is_land, analysis = None): + self.c_idx = c_idx + self.clat = clat + self.clon = clon + self.is_land = is_land + + self.dk = None + self.dl = None + + self.ampls = None + self.kks = None + self.lls = None + + if analysis is not None: + for key, value in vars(analysis).items(): + setattr(self, key, value) + @staticmethod def __pad_zeros(lst, n_modes):