Skip to content

Commit

Permalink
parallel run seems to work now
Browse files Browse the repository at this point in the history
however, there is a bug with the REMA IO routine when both the lat and lon extent span multiple files, e.g. with grid cell 47 in this commit
  • Loading branch information
ray-chew committed May 22, 2024
1 parent b53b06c commit 5ee9e01
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 16 deletions.
39 changes: 23 additions & 16 deletions runs/icon_merit_global.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def do_cell(c_idx,
reader,
writer,
):

print(c_idx)

topo = var.topo_cell()
lat_verts = grid.clat_vertices[c_idx]
lon_verts = grid.clon_vertices[c_idx]
Expand Down Expand Up @@ -72,8 +75,6 @@ def do_cell(c_idx,
triangles[i, :, 0] = np.array(clon_vertices[i, :])
triangles[i, :, 1] = np.array(clat_vertices[i, :])

print("--> triangles done")

if params.plot:
cart_plot.lat_lon_icon(topo, triangles, ncells=ncells, clon=clon, clat=clat)

Expand All @@ -95,13 +96,13 @@ def do_cell(c_idx,
tri.tri_lon_verts = triangles[:, :, 0]
tri.tri_lat_verts = triangles[:, :, 1]


simplex_lat = tri.tri_lat_verts[tri_idx]
simplex_lon = tri.tri_lon_verts[tri_idx]

if utils.is_land(cell, simplex_lat, simplex_lon, topo):
writer.output(c_idx, clat_rad[c_idx], clon_rad[c_idx], 0)
return
# writer.output(c_idx, clat_rad[c_idx], clon_rad[c_idx], 0)
print("--> skipping land cell")
return writer.grp_struct(c_idx, clat_rad[c_idx], clon_rad[c_idx], 0)
else:
is_land = 1

Expand Down Expand Up @@ -149,7 +150,8 @@ def do_cell(c_idx,
cell, ampls_sa, uw_sa, dat_2D_sa = sols
v_extent = [dat_2D_sa.min(), dat_2D_sa.max()]

writer.output(c_idx, clat_rad[c_idx], clon_rad[c_idx], is_land, cell.analysis)
# writer.output(c_idx, clat_rad[c_idx], clon_rad[c_idx], is_land, cell.analysis)
result = writer.grp_struct(c_idx, clat_rad[c_idx], clon_rad[c_idx], is_land, cell.analysis)

if params.plot:
if params.dfft_first_guess:
Expand All @@ -160,7 +162,9 @@ def do_cell(c_idx,
else:
dplot.show(tri_idx, sols, v_extent=v_extent, output_fig=False)

return 1
print("--> analysis done")

return result


def parallel_wrapper(grid, params, reader, writer):
Expand All @@ -173,9 +177,10 @@ def parallel_wrapper(grid, params, reader, writer):
# autoreload()
from pycsam.inputs.icon_regional_run import params

# %%
from dask.distributed import Client, progress
import dask
# from dask.distributed import Client, progress
# import dask

# dask.config.set(scheduler='synchronous')

if __name__ == '__main__':
if params.self_test():
Expand Down Expand Up @@ -204,16 +209,18 @@ def parallel_wrapper(grid, params, reader, writer):

pw_run = parallel_wrapper(grid, params, reader, writer)

client = Client(threads_per_worker=2, n_workers=4)
# client = Client(threads_per_worker=1, n_workers=1)

lazy_results = []

for c_idx in range(n_cells)[:12]:
# pw_run(c_idx)
lazy_result = dask.delayed(pw_run)(c_idx)
lazy_results.append(lazy_result)
for c_idx in range(n_cells)[47:48]:
pw_run(c_idx)
# lazy_result = dask.delayed(pw_run)(c_idx)
# lazy_results.append(lazy_result)

results = dask.compute(*lazy_results)
# results = dask.compute(*lazy_results)

# merit_reader.close_all()

# for item in results:
# writer.duplicate(item.c_idx, item)
51 changes: 51 additions & 0 deletions src/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,57 @@ def output(self, id, clat, clon, is_land, analysis=None):
rootgrp.close()


def duplicate(self, id, struct):

rootgrp = nc.Dataset(self.path + self.fn, "a", format="NETCDF4")

grp = rootgrp.createGroup(str(id))

is_land_var = grp.createVariable("is_land","i4")
is_land_var[:] = struct.is_land

clat_var = grp.createVariable("clat","f8")
clat_var[:] = struct.clat
clon_var = grp.createVariable("clon","f8")
clon_var[:] = struct.clon

if struct.is_land:
dk_var = grp.createVariable("dk","f8")
dk_var[:] = struct.dk
dl_var = grp.createVariable("dl","f8")
dl_var[:] = struct.dl

pick_idx = np.where(struct.ampls > 0)

H_spec_var = grp.createVariable("H_spec","f8", ("nspec",))
H_spec_var[:] = self.__pad_zeros(struct.ampls[pick_idx], self.n_modes)

kks_var = grp.createVariable("kks","f8", ("nspec",))
kks_var[:] = self.__pad_zeros(struct.kks[pick_idx], self.n_modes)

lls_var = grp.createVariable("lls","f8", ("nspec",))
lls_var[:] = self.__pad_zeros(struct.lls[pick_idx], self.n_modes)

rootgrp.close()

class grp_struct(object):
def __init__(self, c_idx, clat, clon, is_land, analysis = None):
self.c_idx = c_idx
self.clat = clat
self.clon = clon
self.is_land = is_land

self.dk = None
self.dl = None

self.ampls = None
self.kks = None
self.lls = None

if analysis is not None:
for key, value in vars(analysis).items():
setattr(self, key, value)


@staticmethod
def __pad_zeros(lst, n_modes):
Expand Down

0 comments on commit 5ee9e01

Please sign in to comment.