From 14d53e1c9eb8eb1ef684148728c075813814612e Mon Sep 17 00:00:00 2001 From: Peter Boyle Date: Sat, 29 Jul 2017 13:06:53 -0400 Subject: [PATCH] Threaded MPI calls patches --- benchmarks/Benchmark_dwf.cc | 2 +- lib/allocator/AlignedAllocator.h | 10 ++- lib/communicator/Communicator_base.cc | 4 +- lib/communicator/Communicator_base.h | 14 ++++- lib/communicator/Communicator_mpit.cc | 25 +++++++- lib/qcd/action/fermion/WilsonFermion5D.cc | 74 ++++++++++++++--------- lib/stencil/Stencil.h | 59 +++++++++--------- lib/util/Init.cc | 6 +- 8 files changed, 128 insertions(+), 66 deletions(-) diff --git a/benchmarks/Benchmark_dwf.cc b/benchmarks/Benchmark_dwf.cc index a071c050..0264905c 100644 --- a/benchmarks/Benchmark_dwf.cc +++ b/benchmarks/Benchmark_dwf.cc @@ -489,7 +489,7 @@ int main (int argc, char ** argv) //assert(norm2(src_e)<1.0e-4); //assert(norm2(src_o)<1.0e-4); - + exit(0); Grid_finalize(); } diff --git a/lib/allocator/AlignedAllocator.h b/lib/allocator/AlignedAllocator.h index 6e85ab27..7fd9496f 100644 --- a/lib/allocator/AlignedAllocator.h +++ b/lib/allocator/AlignedAllocator.h @@ -92,11 +92,15 @@ public: size_type bytes = __n*sizeof(_Tp); _Tp *ptr = (_Tp *) PointerCache::Lookup(bytes); - + ////////////////// + // Hack 2MB align; could make option probably doesn't need configurability + ////////////////// +//define GRID_ALLOC_ALIGN (128) +#define GRID_ALLOC_ALIGN (2*1024*1024) #ifdef HAVE_MM_MALLOC_H - if ( ptr == (_Tp *) NULL ) ptr = (_Tp *) _mm_malloc(bytes,128); + if ( ptr == (_Tp *) NULL ) ptr = (_Tp *) _mm_malloc(bytes,GRID_ALLOC_ALIGN); #else - if ( ptr == (_Tp *) NULL ) ptr = (_Tp *) memalign(128,bytes); + if ( ptr == (_Tp *) NULL ) ptr = (_Tp *) memalign(GRID_ALLOC_ALIGN,bytes); #endif return ptr; diff --git a/lib/communicator/Communicator_base.cc b/lib/communicator/Communicator_base.cc index a5edf8e9..67bfaed0 100644 --- a/lib/communicator/Communicator_base.cc +++ b/lib/communicator/Communicator_base.cc @@ -34,7 +34,9 @@ namespace Grid { /////////////////////////////////////////////////////////////// void * CartesianCommunicator::ShmCommBuf; uint64_t CartesianCommunicator::MAX_MPI_SHM_BYTES = 128*1024*1024; -CartesianCommunicator::CommunicatorPolicy_t CartesianCommunicator::CommunicatorPolicy= CartesianCommunicator::CommunicatorPolicyConcurrent; +CartesianCommunicator::CommunicatorPolicy_t +CartesianCommunicator::CommunicatorPolicy= CartesianCommunicator::CommunicatorPolicyConcurrent; +int CartesianCommunicator::nCommThreads = -1; ///////////////////////////////// // Alloc, free shmem region diff --git a/lib/communicator/Communicator_base.h b/lib/communicator/Communicator_base.h index 4e471b43..84dbedb4 100644 --- a/lib/communicator/Communicator_base.h +++ b/lib/communicator/Communicator_base.h @@ -54,8 +54,9 @@ class CartesianCommunicator { // 128MB shared memory for comms enought for 48^4 local vol comms // Give external control (command line override?) of this - static const int MAXLOG2RANKSPERNODE = 16; - static uint64_t MAX_MPI_SHM_BYTES; + static const int MAXLOG2RANKSPERNODE = 16; + static uint64_t MAX_MPI_SHM_BYTES; + static int nCommThreads; // Communicator should know nothing of the physics grid, only processor grid. int _Nprocessors; // How many in all @@ -125,7 +126,7 @@ class CartesianCommunicator { enum CommunicatorPolicy_t { CommunicatorPolicyConcurrent, CommunicatorPolicySequential }; static CommunicatorPolicy_t CommunicatorPolicy; static void SetCommunicatorPolicy(CommunicatorPolicy_t policy ) { CommunicatorPolicy = policy; } - + size_t heap_top; size_t heap_bytes; @@ -215,6 +216,12 @@ class CartesianCommunicator { void SendToRecvFromComplete(std::vector &waitall); + double StencilSendToRecvFrom(void *xmit, + int xmit_to_rank, + void *recv, + int recv_from_rank, + int bytes,int dir); + double StencilSendToRecvFromBegin(std::vector &list, void *xmit, int xmit_to_rank, @@ -222,6 +229,7 @@ class CartesianCommunicator { int recv_from_rank, int bytes,int dir); + void StencilSendToRecvFromComplete(std::vector &waitall,int i); void StencilBarrier(void); diff --git a/lib/communicator/Communicator_mpit.cc b/lib/communicator/Communicator_mpit.cc index 24a518ec..f522701c 100644 --- a/lib/communicator/Communicator_mpit.cc +++ b/lib/communicator/Communicator_mpit.cc @@ -242,7 +242,8 @@ double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector &waitall,int dir) { diff --git a/lib/qcd/action/fermion/WilsonFermion5D.cc b/lib/qcd/action/fermion/WilsonFermion5D.cc index 6a6bc1f8..0b6c9e3d 100644 --- a/lib/qcd/action/fermion/WilsonFermion5D.cc +++ b/lib/qcd/action/fermion/WilsonFermion5D.cc @@ -391,37 +391,57 @@ void WilsonFermion5D::DhopInternalOverlappedComms(StencilImpl & st, Lebesg DhopFaceTime+=usecond(); // Rely on async comms; start comms before merge of local data - DhopComputeTime-=usecond(); - DhopCommTime-=usecond(); -#pragma omp parallel + double ctime=0; + double ptime=0; + // DhopComputeTime-=usecond(); + // DhopCommTime-=usecond(); +#pragma omp parallel reduction(max:ctime) reduction(max:ptime) { - // Should time this somehow; hard as the threads fork nowait - st.CommunicateThreaded(); - - if (dag == DaggerYes) { -#pragma omp for - for (int ss = 0; ss < U._grid->oSites(); ss++) { - int sU = ss; - int sF = LLs * sU; - Kernels::DhopSiteDag(st,lo,U,st.CommBuf(),sF,sU,LLs,1,in,out,1,0); + int tid = omp_get_thread_num(); + int nthreads = omp_get_num_threads(); + int ncomms = CartesianCommunicator::nCommThreads; + if (ncomms == -1) ncomms = st.Packets.size(); + assert(nthreads > ncomms); + if (tid >= ncomms) { + double start = usecond(); + nthreads -= ncomms; + int ttid = tid - ncomms; + int n = U._grid->oSites(); + int chunk = n / nthreads; + int rem = n % nthreads; + int myblock, myn; + if (ttid < rem) { + myblock = ttid * chunk + ttid; + myn = chunk+1; + } else { + myblock = ttid*chunk + rem; + myn = chunk; + } + + // do the compute + if (dag == DaggerYes) { + for (int ss = myblock; ss < myblock+myn; ++ss) { + int sU = ss; + int sF = LLs * sU; + Kernels::DhopSiteDag(st,lo,U,st.CommBuf(),sF,sU,LLs,1,in,out,1,0); + } + } else { + for (int ss = myblock; ss < myblock+myn; ++ss) { + int sU = ss; + int sF = LLs * sU; + Kernels::DhopSite(st,lo,U,st.CommBuf(),sF,sU,LLs,1,in,out,1,0); + } + } + ptime = usecond() - start; } - } else { -#pragma omp for - for (int ss = 0; ss < U._grid->oSites(); ss++) { - int sU = ss; - int sF = LLs * sU; - Kernels::DhopSite(st,lo,U,st.CommBuf(),sF,sU,LLs,1,in,out,1,0); + { + double start = usecond(); + st.CommunicateThreaded(); + ctime = usecond() - start; } } -#pragma omp single - DhopComputeTime+=usecond(); - -#pragma omp taskwait - -#pragma omp single - DhopCommTime+=usecond(); - } // Closes parallel region and waits the comms (I hope) - + DhopCommTime += ctime; + DhopComputeTime+=ptime; DhopFaceTime-=usecond(); st.CommsMerge(compressor); diff --git a/lib/stencil/Stencil.h b/lib/stencil/Stencil.h index 17db64d8..d1d7a7e0 100644 --- a/lib/stencil/Stencil.h +++ b/lib/stencil/Stencil.h @@ -185,6 +185,8 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal double splicetime; double nosplicetime; double calls; + std::vector comms_bytesthr; + std::vector commtimethr; //////////////////////////////////////// // Stencil query @@ -250,36 +252,22 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal ////////////////////////////////////////// void CommunicateThreaded() { - for(int i=0;i reqs; - bytes=_grid->StencilSendToRecvFromBegin(reqs, - Packets[i].send_buf, - Packets[i].to_rank, - Packets[i].recv_buf, - Packets[i].from_rank, - Packets[i].bytes,i); - _grid->StencilSendToRecvFromComplete(reqs,i); - // Last task logged; this is approximate but hard to catch - // the last to complete - stop = usecond(); - stop = stop - start; - - if ( i==0 ) commtime+=stop; - -#pragma omp critical - { - comms_bytes+=bytes; - } - + // must be called in parallel region + int mythread = omp_get_thread_num(); + int nthreads = CartesianCommunicator::nCommThreads; + if (nthreads == -1) nthreads = Packets.size(); + if (mythread < nthreads) { + for (int i = mythread; i < Packets.size(); i += nthreads) { + double start = usecond(); + uint64_t bytes = _grid->StencilSendToRecvFrom(Packets[i].send_buf, + Packets[i].to_rank, + Packets[i].recv_buf, + Packets[i].from_rank, + Packets[i].bytes,i); + comms_bytesthr[mythread] += bytes; + commtimethr[mythread] += usecond() - start; } } - } void CommunicateBegin(std::vector > &reqs) { @@ -475,7 +463,10 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal int checkerboard, const std::vector &directions, const std::vector &distances) - : _permute_type(npoints), _comm_buf_size(npoints) + : _permute_type(npoints), + _comm_buf_size(npoints), + comms_bytesthr(npoints), + commtimethr(npoints) { face_table_computed=0; _npoints = npoints; @@ -1029,6 +1020,8 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal void ZeroCounters(void) { gathertime = 0.; commtime = 0.; + memset(&commtimethr[0], 0, sizeof(commtimethr)); + memset(&comms_bytesthr[0], 0, sizeof(comms_bytesthr)); halogtime = 0.; mergetime = 0.; decompresstime = 0.; @@ -1044,6 +1037,14 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal #define PRINTIT(A) AVERAGE(A); std::cout << GridLogMessage << " Stencil " << #A << " "<< A/calls<_Nprocessors; RealD NN = _grid->NodeCount(); + double t = 0; + // if commtimethr is set they were all done in parallel so take the max + // but add up the bytes + for (int i = 0; i < 8; ++i) { + comms_bytes += comms_bytesthr[i]; + if (t < commtimethr[i]) t = commtimethr[i]; + } + commtime += t; _grid->GlobalSum(commtime); commtime/=NP; if ( calls > 0. ) { diff --git a/lib/util/Init.cc b/lib/util/Init.cc index fc701ac1..ef875429 100644 --- a/lib/util/Init.cc +++ b/lib/util/Init.cc @@ -359,7 +359,11 @@ void Grid_init(int *argc,char ***argv) if( GridCmdOptionExists(*argv,*argv+*argc,"--lebesgue") ){ LebesgueOrder::UseLebesgueOrder=1; } - + CartesianCommunicator::nCommThreads = -1; + if( GridCmdOptionExists(*argv,*argv+*argc,"--commthreads") ){ + arg= GridCmdOptionPayload(*argv,*argv+*argc,"--commthreads"); + GridCmdOptionInt(arg,CartesianCommunicator::nCommThreads); + } if( GridCmdOptionExists(*argv,*argv+*argc,"--cacheblocking") ){ arg= GridCmdOptionPayload(*argv,*argv+*argc,"--cacheblocking"); GridCmdOptionIntVector(arg,LebesgueOrder::Block);