Skip to content

Commit

Permalink
use shared memory for data communications (#158)
Browse files Browse the repository at this point in the history
Co-authored-by: jerome berryhill <[email protected]>
  • Loading branch information
JeromeBerryhill and jerome berryhill authored Dec 12, 2024
1 parent c5b540f commit 456c7bf
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/Core/schism_glbl.F90
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,9 @@ module schism_glbl
&ath(:,:,:,:),carea(:),clen(:),eta_mean(:),q_block(:),vnth_block(:,:), &
&dir_block(:,:),q_block_lcl(:)
real(4),save,allocatable :: ath2(:,:,:,:,:) !used to read *.nc for b.c. time series
#ifndef SH_MEM_COMM
real(4),save,allocatable :: ath3(:,:,:,:) !used to read source/sink inputs
#endif

! Land boundary segment data
integer,save :: nland_global ! Global number of land bndry segments
Expand Down
18 changes: 18 additions & 0 deletions src/Core/schism_msgp.F90
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ module schism_msgp
integer,public,save,allocatable :: ranknbr_s3(:) ! Mapping from MPI rank to neighbor index (elements)
!<weno

#ifdef SH_MEM_COMM
! variables supporting shared memory communications
integer,public,save :: h_win ! handle for shared window for ath3
integer(KIND=MPI_ADDRESS_KIND),public,save :: win_size
TYPE(C_PTR),public,save :: c_window_ptr
real(4),public,save,pointer :: ath3(:,:,:,:)
integer,public,save :: disp_unit ! displacement stride in shared window
#endif ! SH_MEM_COMM

!-----------------------------------------------------------------------------
! Private data
!-----------------------------------------------------------------------------
Expand Down Expand Up @@ -417,6 +426,15 @@ subroutine parallel_init(communicator)
CALL MPI_Comm_size(comm2, nproc3, ierr)
CALL MPI_Comm_rank(comm2, myrank3,ierr)

#ifdef SH_MEM_COMM
! Set up by-node shared-memory communicator for compute nodes
if(task_id==1) then
CALL MPI_Comm_split_type(comm2,MPI_COMM_TYPE_SHARED,0,MPI_INFO_NULL,comm_node,ierr)
CALL MPI_Comm_size(comm_node, nproc_node, ierr)
CALL MPI_Comm_rank(comm_node, myrank_node,ierr)
endif
#endif ! SH_MEM_COMM

if(task_id==1) then !compute
comm=comm2
nproc=nproc3
Expand Down
12 changes: 11 additions & 1 deletion src/Hydro/misc_subs.F90
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,12 @@ subroutine other_hot_init(time)

ath3(:,1,1,1:2)=0.d0
ath3(:,1,1,3)=-9999.d0
#else
#else ! USE_NWM_BMI
#ifdef SH_MEM_COMM
if(if_source==1.and.myrank_node==0) then !ASCII
#else ! SH_MEM_COMM
if(if_source==1.and.myrank==0) then !ASCII
#endif ! SH_MEM_COMM
if(nsources>0) then
open(63,file=in_dir(1:len_in_dir)//'vsource.th',status='old') !values (>=0) in m^3/s
rewind(63)
Expand Down Expand Up @@ -772,7 +776,11 @@ subroutine other_hot_init(time)
endif !nsinks
endif !if_source=1

#ifdef SH_MEM_COMM
if(if_source==-1.and.myrank_node==0) then !nc
#else ! SH_MEM_COMM
if(if_source==-1.and.myrank==0) then !nc
#endif ! SH_MEM_COMM
if(nsources>0) then
ninv=time/th_dt3(1)
th_time3(1,1)=dble(ninv)*th_dt3(1)
Expand Down Expand Up @@ -813,7 +821,9 @@ subroutine other_hot_init(time)
if(if_source/=0) then
call mpi_bcast(th_dt3,nthfiles3,rtype,0,comm,istat)
call mpi_bcast(th_time3,2*nthfiles3,rtype,0,comm,istat)
#ifndef SH_MEM_COMM
call mpi_bcast(ath3,max(1,nsources,nsinks)*ntracers*2*nthfiles3,MPI_REAL4,0,comm,istat)
#endif ! SH_MEM_COMM
endif
#endif /*USE_NWM_BMI*/

Expand Down
4 changes: 4 additions & 0 deletions src/Hydro/schism_finalize.F90
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ subroutine schism_finalize
!-------------------------------------------------------------------------------
!-------------------------------------------------------------------------------

#ifdef SH_MEM_COMM
call mpi_win_free(h_win,ierr) ! free ath3 shared memory window
#endif ! SH_MEM_COMM

if(ihydraulics/=0) call finalize_hydraulic_structures

#ifdef USE_PETSC
Expand Down
59 changes: 56 additions & 3 deletions src/Hydro/schism_init.F90
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ subroutine schism_init(iorder,indir,iths,ntime)
use netcdf
use misc_modules

#ifdef SH_MEM_COMM
use iso_c_binding, only: c_ptr, c_f_pointer
#endif ! SH_MEM_COMM

#ifdef USE_PAHM
use ParWind, only : ReadCsvBestTrackFile
use PaHM_Utilities, only : ReadControlFile
Expand Down Expand Up @@ -2816,8 +2820,29 @@ subroutine schism_init(iorder,indir,iths,ntime)
call mpi_bcast(nsinks,1,itype,0,comm,istat)

if(iorder==0) then
#ifdef SH_MEM_COMM
allocate(ieg_sink(max(1,nsinks)),stat=istat)
if(istat/=0) call parallel_abort('INIT: ieg_sink failure')
! On each node, rank 0 allocates the storage, other ranks allocate with size zero and get a pointer
disp_unit = 4 ! size of real(4)
If (myrank_node==0) THEN
win_size = disp_unit * max(1,nsources,nsinks) * ntracers * 2 * nthfiles3
call MPI_Win_allocate_shared(win_size, disp_unit, MPI_INFO_NULL, comm_node,
c_window_ptr, h_win, istat)
if(istat/=0) call parallel_abort('INIT: MPI_Win_allocate failed, node rank 0')
ELSE
win_size = 0
call MPI_Win_allocate_shared(win_size, disp_unit, MPI_INFO_NULL, comm_node, c_window_ptr, h_win, istat)
if(istat/=0) call parallel_abort('INIT: MPI_Win_allocate failed, node rank>0')
call MPI_Win_shared_query(h_win, 0, win_size, disp_unit, c_window_ptr, istat)
if(istat/=0) call parallel_abort('INIT: MPI_Win_shared_query failed, node rank>0')
ENDIF
! point ath3 array at the shared buffer
call C_F_POINTER(c_window_ptr, ath3, SHAPE = [max(1,nsources,nsinks),ntracers,2,nthfiles3])
#else ! SH_MEM_COMM
allocate(ieg_sink(max(1,nsinks)),ath3(max(1,nsources,nsinks),ntracers,2,nthfiles3),stat=istat)
if(istat/=0) call parallel_abort('INIT: ieg_sink failure')
#endif ! SH_MEM_COMM
endif

if(myrank==0) then
Expand All @@ -2832,7 +2857,11 @@ subroutine schism_init(iorder,indir,iths,ntime)
endif !if_source

if(if_source==-1) then !nc
#ifdef SH_MEM_COMM
if(myrank_node==0) then
#else ! SH_MEM_COMM
if(myrank==0) then
#endif ! SH_MEM_COMM
j=nf90_open(in_dir(1:len_in_dir)//'source.nc',OR(NF90_NETCDF4,NF90_NOWRITE),ncid_source)
if(j/=NF90_NOERR) call parallel_abort('init: source.nc')
j=nf90_inq_dimid(ncid_source,'nsources',mm)
Expand Down Expand Up @@ -2871,12 +2900,36 @@ subroutine schism_init(iorder,indir,iths,ntime)
call mpi_bcast(th_dt3,nthfiles3,rtype,0,comm,istat)

if(iorder==0) then
allocate(ieg_source(max(1,nsources)),ieg_sink(max(1,nsinks)), &
&ath3(max(1,nsources,nsinks),ntracers,2,nthfiles3),stat=istat)
if(istat/=0) call parallel_abort('INIT: ieg_source failure(3)')
#ifdef SH_MEM_COMM
allocate(ieg_source(max(1,nsources)),ieg_sink(max(1,nsinks)),stat=istat)
if(istat/=0) call parallel_abort('INIT: ieg_sink failure')
! On each node, rank 0 allocates the storage, other ranks allocate with size zero and get a pointer
disp_unit = 4 ! size of real(4)
If (myrank_node==0) THEN
win_size = disp_unit * max(1,nsources,nsinks) * ntracers * 2 * nthfiles3
call MPI_Win_allocate_shared(win_size, disp_unit, MPI_INFO_NULL, comm_node,
c_window_ptr, h_win, istat)
if(istat/=0) call parallel_abort('INIT: MPI_Win_allocate failed, node rank 0')
ELSE
win_size = 0
call MPI_Win_allocate_shared(win_size, disp_unit, MPI_INFO_NULL, comm_node, c_window_ptr, h_win, istat)
if(istat/=0) call parallel_abort('INIT: MPI_Win_allocate failed, node rank>0')
call MPI_Win_shared_query(h_win, 0, win_size, disp_unit, c_window_ptr, istat)
if(istat/=0) call parallel_abort('INIT: MPI_Win_shared_query failed, node rank>0')
ENDIF
! point ath3 array at the shared buffer
call C_F_POINTER(c_window_ptr, ath3, SHAPE = [max(1,nsources,nsinks),ntracers,2,nthfiles3])
#else ! SH_MEM_COMM
allocate(ieg_sink(max(1,nsinks)),ath3(max(1,nsources,nsinks),ntracers,2,nthfiles3),stat=istat)
if(istat/=0) call parallel_abort('INIT: ieg_sink failure')
#endif ! SH_MEM_COMM
endif

#ifdef SH_MEM_COMM
if(myrank_node==0) then
#else ! SH_MEM_COMM
if(myrank==0) then
#endif ! SH_MEM_COMM
if(nsources>0) then
j=nf90_inq_varid(ncid_source, "source_elem",mm)
if(j/=NF90_NOERR) call parallel_abort('init: source_elem')
Expand Down
13 changes: 13 additions & 0 deletions src/Hydro/schism_step.F90
Original file line number Diff line number Diff line change
Expand Up @@ -1622,7 +1622,11 @@ subroutine schism_step(it)

#else
!Reading by rank 0
#ifdef SH_MEM_COMM
if(nsources>0.and.myrank_node==0) then
#else ! SH_MEM_COMM
if(nsources>0.and.myrank==0) then
#endif ! SH_MEM_COMM
if(time>th_time3(2,1)) then !not '>=' to avoid last step
ath3(:,1,1,1)=ath3(:,1,2,1)
th_time3(1,1)=th_time3(2,1)
Expand Down Expand Up @@ -1655,7 +1659,11 @@ subroutine schism_step(it)
endif !time
endif !nsources>0.and.myrank==0

#ifdef SH_MEM_COMM
if(nsinks>0.and.myrank_node==0) then
#else ! SH_MEM_COMM
if(nsinks>0.and.myrank==0) then
#endif ! SH_MEM_COMM
if(time>th_time3(2,2)) then !not '>=' to avoid last step
ath3(:,1,1,2)=ath3(:,1,2,2)
th_time3(1,2)=th_time3(2,2)
Expand All @@ -1673,7 +1681,12 @@ subroutine schism_step(it)
endif !nsinks
! Finished reading; bcast
call mpi_bcast(th_time3,2*nthfiles3,rtype,0,comm,istat)
#ifdef SH_MEM_COMM
! ath3 data now in shared buffer, no longer necessary to broadcast
call mpi_barrier(comm_node, istat)
#else ! SH_MEM_COMM
call mpi_bcast(ath3,max(1,nsources,nsinks)*ntracers*2*nthfiles3,MPI_REAL4,0,comm,istat)
#endif ! SH_MEM_COMM
#endif /*USE_NWM_BMI*/

if(nsources>0) then
Expand Down

0 comments on commit 456c7bf

Please sign in to comment.