/************************************************************************************* Grid physics library, www.github.com/paboyle/Grid Source file: ./lib/communicator/Communicator_mpi.cc Copyright (C) 2015 Author: Peter Boyle This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. See the full license in the file "LICENSE" in the top level distribution directory *************************************************************************************/ /* END LEGAL */ /* NOTE(review): this copy of the file has lost its line breaks and every span that sat between angle brackets (header names, template arguments, parts of some loops). The two include targets below are among the missing pieces; restore them from upstream before building. */ #include #include /* Aborts the whole MPI job: calls MPI_Abort on MPI_COMM_WORLD with SIGABRT as the error code. */ void GridAbort(void) { MPI_Abort(MPI_COMM_WORLD,SIGABRT); } extern void * Grid_backtrace_buffer[_NBACKTRACE]; NAMESPACE_BEGIN(Grid); Grid_MPI_Comm CartesianCommunicator::communicator_world; #ifdef GRID_CHECKSUM_COMMS uint64_t checksum_index = 1; #endif //////////////////////////////////////////// // First initialise of comms system //////////////////////////////////////////// /* Init: start-up of the comms layer. If MPI_Initialized reports that MPI is not yet up, calls MPI_Init_thread asking for MPI_THREAD_SERIALIZED (and forces nCommThreads=1) unless GRID_COMMS_THREADS is defined, in which case MPI_THREAD_MULTIPLE is requested; it then asserts that the provided level is not SINGLE for one comms thread and is MULTIPLE for several. These checks are skipped when MPI was already initialised by someone else. Afterwards MPI_COMM_WORLD is duplicated into communicator_world and GlobalSharedMemory is initialised and allocated between Grid_quiesce_nodes() and Grid_unquiesce_nodes(). argc/argv are passed straight through to MPI_Init_thread. */ void CartesianCommunicator::Init(int *argc, char ***argv) { int flag; int provided; MPI_Initialized(&flag); // needed to coexist with other libs apparently if ( !flag ) { #ifndef GRID_COMMS_THREADS nCommThreads=1; // wrong results here too // For now: comms-overlap leads to wrong results in Benchmark_wilson even on single node MPI runs // other comms schemes are ok MPI_Init_thread(argc,argv,MPI_THREAD_SERIALIZED,&provided); #else MPI_Init_thread(argc,argv,MPI_THREAD_MULTIPLE,&provided); #endif //If only 1 
comms thread we require any threading mode other than SINGLE, but for multiple comms threads we need MULTIPLE if( (nCommThreads == 1) && (provided == MPI_THREAD_SINGLE) ) { GRID_ASSERT(0); } if( (nCommThreads > 1) && (provided != MPI_THREAD_MULTIPLE) ) { GRID_ASSERT(0); } } // Never clean up as done once. MPI_Comm_dup (MPI_COMM_WORLD,&communicator_world); Grid_quiesce_nodes(); GlobalSharedMemory::Init(communicator_world); GlobalSharedMemory::SharedMemoryAllocate( GlobalSharedMemory::MAX_MPI_SHM_BYTES, GlobalSharedMemory::Hugepages); Grid_unquiesce_nodes(); } /////////////////////////////////////////////////////////////////////////// // Use cartesian communicators now even in MPI3 /////////////////////////////////////////////////////////////////////////// /* ShiftedRanks: wraps MPI_Cart_shift on `communicator`; on return `source` and `dest` hold the ranks MPI reports for a displacement of `shift` along dimension `dim`. Asserts on a non-zero MPI return code. */ void CartesianCommunicator::ShiftedRanks(int dim,int shift,int &source,int &dest) { int ierr=MPI_Cart_shift(communicator,dim,shift,&source,&dest); GRID_ASSERT(ierr==0); } /* RankFromProcessorCoor: returns the rank MPI_Cart_rank gives for the coordinate `coor` in `communicator`; asserts on MPI error. */ int CartesianCommunicator::RankFromProcessorCoor(Coordinate &coor) { int rank; int ierr=MPI_Cart_rank (communicator, &coor[0], &rank); GRID_ASSERT(ierr==0); return rank; } /* ProcessorCoorFromRank: resizes `coor` to _ndimension and fills it with the coordinates MPI_Cart_coords gives for `rank`; asserts on MPI error. */ void CartesianCommunicator::ProcessorCoorFromRank(int rank, Coordinate &coor) { coor.resize(_ndimension); int ierr=MPI_Cart_coords (communicator, rank, _ndimension,&coor[0]); GRID_ASSERT(ierr==0); } //////////////////////////////////////////////////////////////////////////////////////////////////////// // Initialises from communicator_world //////////////////////////////////////////////////////////////////////////////////////////////////////// /* Constructor for a processor grid `processors`: asks GlobalSharedMemory::OptimalCommunicator for a remapped communicator (which also fills _shm_processors), runs InitFromMPICommunicator and SetCommunicator on it, then frees that temporary communicator. */ CartesianCommunicator::CartesianCommunicator(const Coordinate &processors) { MPI_Comm optimal_comm; //////////////////////////////////////////////////// // Remap using the shared memory optimising routine // The remap creates a comm which must be freed //////////////////////////////////////////////////// GlobalSharedMemory::OptimalCommunicator (processors,optimal_comm,_shm_processors); InitFromMPICommunicator(processors,optimal_comm); 
SetCommunicator(optimal_comm); /////////////////////////////////////////////////// // Free the temp communicator /////////////////////////////////////////////////// MPI_Comm_free(&optimal_comm); } ////////////////////////////////// // Try to subdivide communicator ////////////////////////////////// /* Constructor deriving a communicator from `parent`. Requires 1 <= processors.size() and at least as many dimensions as the parent; `pad` is the number of extra leading dimensions. In the visible code one branch calls MPI_Comm_split on parent.communicator with srank as colour and crank as key, the other sets srank = 0 and MPI_Comm_dup()s the parent; the result is passed to InitFromMPICommunicator and SetCommunicator and then freed. `srank` is an output. NOTE(review): the text from the first `for(int d=0;d` up to ` 1 ) {` (padding loop, computation of crank/srank, declaration of comm_split and the branch condition) is missing from this copy, as is everything after the debug `if(0)` block up to the parameter list of the next function - restore from upstream. */ CartesianCommunicator::CartesianCommunicator(const Coordinate &processors,const CartesianCommunicator &parent,int &srank) { _ndimension = processors.size(); GRID_ASSERT(_ndimension>=1); int parent_ndimension = parent._ndimension; GRID_ASSERT(_ndimension >= parent._ndimension); Coordinate parent_processor_coor(_ndimension,0); Coordinate parent_processors (_ndimension,1); Coordinate shm_processors (_ndimension,1); // Can make 5d grid from 4d etc... int pad = _ndimension-parent_ndimension; for(int d=0;d 1 ) { //////////////////////////////////////////////////////////////// // Split the communicator //////////////////////////////////////////////////////////////// int ierr= MPI_Comm_split(parent.communicator,srank,crank,&comm_split); GRID_ASSERT(ierr==0); } else { srank = 0; int ierr = MPI_Comm_dup (parent.communicator,&comm_split); GRID_ASSERT(ierr==0); } ////////////////////////////////////////////////////////////////////////////////////////////////////// // Set up from the new split communicator ////////////////////////////////////////////////////////////////////////////////////////////////////// InitFromMPICommunicator(processors,comm_split); ////////////////////////////////////////////////////////////////////////////////////////////////////// // Take the right SHM buffers ////////////////////////////////////////////////////////////////////////////////////////////////////// SetCommunicator(comm_split); /////////////////////////////////////////////// // Free the temp communicator /////////////////////////////////////////////// MPI_Comm_free(&comm_split); if(0){ std::cout << " ndim " <<_ndimension<<" " << parent._ndimension << std::endl; for(int d=0;d &list, void *xmit, int 
dest, void *recv, int from, int bytes,int dir) { /* Body of a non-blocking exchange whose name and leading parameters were lost from this copy (NOTE(review): restore the signature from upstream). It posts MPI_Irecv of `bytes` chars from `from` with tag dir+from*32, then MPI_Isend of `bytes` chars to `dest` with tag dir+_processor*32, both on `communicator`, and appends the receive request followed by the send request to `list`. Neither peer may be this rank. */ MPI_Request xrq; MPI_Request rrq; GRID_ASSERT(dest != _processor); GRID_ASSERT(from != _processor); int tag; tag= dir+from*32; int ierr=MPI_Irecv(recv, bytes, MPI_CHAR,from,tag,communicator,&rrq); GRID_ASSERT(ierr==0); list.push_back(rrq); tag= dir+_processor*32; ierr =MPI_Isend(xmit, bytes, MPI_CHAR,dest,tag,communicator,&xrq); GRID_ASSERT(ierr==0); list.push_back(xrq); } /* CommsComplete: returns immediately for an empty list; otherwise MPI_Waitall on every request in `list`, asserts success and empties the list. */ void CartesianCommunicator::CommsComplete(std::vector &list) { int nreq=list.size(); if (nreq==0) return; std::vector status(nreq); int ierr = MPI_Waitall(nreq,&list[0],&status[0]); GRID_ASSERT(ierr==0); list.resize(0); } // Basic Halo comms primitive /* SendToRecvFrom: blocking exchange through a single MPI_Sendrecv on `communicator`. Sends `bytes` chars from `xmit` to `dest` tagged with this rank (_processor) and receives `bytes` chars into `recv` from `from` tagged with `from`. Both buffers must satisfy acceleratorIsCommunicable. NOTE(review): the local `reqs` is never used. */ void CartesianCommunicator::SendToRecvFrom(void *xmit, int dest, void *recv, int from, int bytes) { std::vector reqs(0); int myrank = _processor; int ierr; // Enforce no UVM in comms, device or host OK GRID_ASSERT(acceleratorIsCommunicable(xmit)); GRID_ASSERT(acceleratorIsCommunicable(recv)); // Give the CPU to MPI immediately; can use threads to overlap optionally // printf("proc %d SendToRecvFrom %d bytes Sendrecv \n",_processor,bytes); ierr=MPI_Sendrecv(xmit,bytes,MPI_CHAR,dest,myrank, recv,bytes,MPI_CHAR,from, from, communicator,MPI_STATUS_IGNORE); GRID_ASSERT(ierr==0); } // Basic Halo comms primitive /* StencilSendToRecvFrom: runs StencilSendToRecvFromPrepare, ...Begin and ...Complete back to back on a local request list, using `bytes` for both the send and receive sizes and the same pointers for the plain and `_comp` buffer arguments. Returns the sum of the values returned by Prepare and Begin. */ double CartesianCommunicator::StencilSendToRecvFrom( void *xmit, int dest, int dox, void *recv, int from, int dor, int bytes,int dir) { std::vector list; double offbytes = StencilSendToRecvFromPrepare(list,xmit,dest,dox,recv,from,dor,bytes,bytes,dir); offbytes += StencilSendToRecvFromBegin(list,xmit,xmit,dest,dox,recv,recv,from,dor,bytes,bytes,dir); StencilSendToRecvFromComplete(list,dir); return offbytes; } /* IsOffNode: true exactly when ShmRanks[rank] equals MPI_UNDEFINED. */ int CartesianCommunicator::IsOffNode(int rank) { int grank = ShmRanks[rank]; if ( grank == MPI_UNDEFINED ) return true; else return false; } #ifdef ACCELERATOR_AWARE_MPI /* In the ACCELERATOR_AWARE_MPI build the two Poll routines that follow have empty bodies. */ void CartesianCommunicator::StencilSendToRecvFromPollIRecv(std::vector &list) {}; void 
CartesianCommunicator::StencilSendToRecvFromPollDtoH(std::vector &list) {}; /* Prepare (ACCELERATOR_AWARE_MPI build): nothing to stage; always returns 0.0 and leaves `list` untouched. */ double CartesianCommunicator::StencilSendToRecvFromPrepare(std::vector &list, void *xmit, int dest,int dox, void *recv, int from,int dor, int xbytes,int rbytes,int dir) { return 0.0; // Do nothing -- no preparation required } /* Begin (ACCELERATOR_AWARE_MPI build): picks commdir = dir modulo the number of halo communicators, looks up the ShmRanks entries of dest, from and this rank, and asserts that neither peer is this rank and that gme == ShmRank. For a receive (dor) from a peer whose ShmRanks entry is MPI_UNDEFINED, or when Stencil_force_mpi is set, the tag is dir+from*32. The visible tail copies `xmit` into the buffer returned by ShmBufferTranslate(dest,recv) with acceleratorCopyDeviceToDeviceAsynch. Returns off_node_bytes. NOTE(review): most of this body (the MPI_Irecv/MPI_Isend calls, byte accounting and the preprocessor branch that selects between the two ShmBufferTranslate uses) is missing from this copy - restore from upstream before relying on this description. */ double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector &list, void *xmit,void *xmit_comp, int dest,int dox, void *recv,void *recv_comp, int from,int dor, int xbytes,int rbytes,int dir) { int ncomm =communicator_halo.size(); int commdir=dir%ncomm; MPI_Request xrq; MPI_Request rrq; int ierr; int gdest = ShmRanks[dest]; int gfrom = ShmRanks[from]; int gme = ShmRanks[_processor]; GRID_ASSERT(dest != _processor); GRID_ASSERT(from != _processor); GRID_ASSERT(gme == ShmRank); double off_node_bytes=0.0; int tag; if ( dor ) { if ( (gfrom ==MPI_UNDEFINED) || Stencil_force_mpi ) { tag= dir+from*32; // std::cout << " StencilSendToRecvFrom "<ShmBufferTranslate(from,xmit); GRID_ASSERT(shm!=NULL); // std::cout << " StencilSendToRecvFrom "<ShmBufferTranslate(dest,recv); GRID_ASSERT(shm!=NULL); acceleratorCopyDeviceToDeviceAsynch(xmit,shm,xbytes); #endif } } return off_node_bytes; } /* Complete (ACCELERATOR_AWARE_MPI build): calls acceleratorCopySynchronise first; if `list` is empty it returns at that point, otherwise it MPI_Waitall()s every entry, asserts success, empties the list and calls StencilBarrier. `dir` is not used. NOTE(review): the early return means StencilBarrier is skipped when this rank has no MPI requests - confirm that every rank sharing ShmComm takes the same path. */ void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector &list,int dir) { int nreq=list.size(); /*finishes Get/Put*/ acceleratorCopySynchronise(); if (nreq==0) return; std::vector status(nreq); int ierr = MPI_Waitall(nreq,&list[0],&status[0]); GRID_ASSERT(ierr==0); list.resize(0); this->StencilBarrier(); } #else /* NOT ... 
ACCELERATOR_AWARE_MPI */ /////////////////////////////////////////// // Pipeline mode through host memory /////////////////////////////////////////// /* * In prepare (phase 1): * PHASE 1: (prepare) * - post MPI receive buffers asynch * - post device - host send buffer transfer asynch * PHASE 2: (Begin) * - complete all copies * - post MPI send asynch * - post device - device transfers * PHASE 3: (Complete) * - MPI_waitall * - host-device transfers * ********************************* * NB could split this further: *-------------------------------- * PHASE 1: (Prepare) * - post MPI receive buffers asynch * - post device - host send buffer transfer asynch * PHASE 2: (BeginInterNode) * - complete all copies * - post MPI send asynch * PHASE 3: (BeginIntraNode) * - post device - device transfers * PHASE 4: (Complete) * - MPI_waitall * - host-device transfers asynch * - (complete all copies) */ /* Prepare (host-staged build). Receive side: when `dor` is set and the peer `from` has ShmRanks entry MPI_UNDEFINED (or Stencil_force_mpi is set), allocates a host buffer with HostBufferMalloc, posts MPI_Irecv into it on communicator_halo[commdir] with tag dir+from*32, and queues an InterNodeRecv entry that remembers both the host buffer and the device buffer `recv`. Send side: when `dox` is set and `dest` is off node in the same sense, allocates a host buffer, starts acceleratorCopyFromDeviceAsynch from `xmit` into it and queues an InterNodeXmit entry carrying dest, tag and commdir; the MPI_Isend itself is not posted here (that call is commented out). With GRID_CHECKSUM_COMMS both byte counts are increased by 8 and a checksum word is stored after the payload in the host send buffer. Returns the number of bytes of posted receives only; sends are not counted here. */ double CartesianCommunicator::StencilSendToRecvFromPrepare(std::vector &list, void *xmit, int dest,int dox, void *recv, int from,int dor, int xbytes,int rbytes,int dir) { /* * Bring sequence from Stencil.h down to lower level. 
* Assume using XeLink is ok */ int ncomm =communicator_halo.size(); int commdir=dir%ncomm; MPI_Request xrq; MPI_Request rrq; int ierr; int gdest = ShmRanks[dest]; int gfrom = ShmRanks[from]; int gme = ShmRanks[_processor]; GRID_ASSERT(dest != _processor); GRID_ASSERT(from != _processor); GRID_ASSERT(gme == ShmRank); double off_node_bytes=0.0; int tag; void * host_recv = NULL; void * host_xmit = NULL; /* * PHASE 1: (Prepare) * - post MPI receive buffers asynch * - post device - host send buffer transfer asynch */ #ifdef GRID_CHECKSUM_COMMS rbytes += 8; xbytes += 8; #endif if ( dor ) { if ( (gfrom ==MPI_UNDEFINED) || Stencil_force_mpi ) { tag= dir+from*32; host_recv = this->HostBufferMalloc(rbytes); ierr=MPI_Irecv(host_recv, rbytes, MPI_CHAR,from,tag,communicator_halo[commdir],&rrq); GRID_ASSERT(ierr==0); CommsRequest_t srq; srq.PacketType = InterNodeRecv; srq.bytes = rbytes; srq.req = rrq; srq.host_buf = host_recv; srq.device_buf = recv; srq.tag = tag; list.push_back(srq); off_node_bytes+=rbytes; } } if (dox) { if ( (gdest == MPI_UNDEFINED) || Stencil_force_mpi ) { tag= dir+_processor*32; host_xmit = this->HostBufferMalloc(xbytes); CommsRequest_t srq; #ifdef GRID_CHECKSUM_COMMS uint64_t xbytes_data = xbytes - 8; srq.ev = acceleratorCopyFromDeviceAsynch(xmit, host_xmit,xbytes_data); // Make this Asynch GRID_ASSERT(xbytes % 8 == 0); // flip one bit so that a zero buffer is not consistent uint64_t xsum = checksum_gpu((uint64_t*)xmit, xbytes_data / 8) ^ (checksum_index + 1 + 1000 * tag); *(uint64_t*)(((char*)host_xmit) + xbytes_data) = xsum; #else srq.ev = acceleratorCopyFromDeviceAsynch(xmit, host_xmit,xbytes); // Make this Asynch #endif // ierr =MPI_Isend(host_xmit, xbytes, MPI_CHAR,dest,tag,communicator_halo[commdir],&xrq); // GRID_ASSERT(ierr==0); // off_node_bytes+=xbytes; srq.PacketType = InterNodeXmit; srq.bytes = xbytes; // srq.req = xrq; srq.host_buf = host_xmit; srq.device_buf = xmit; srq.tag = tag; srq.dest = dest; srq.commdir = commdir; list.push_back(srq); 
} } return off_node_bytes; } /* * In the interest of better pipelining, poll for completion on each DtoH and * start MPI_ISend in the meantime */ /* NOTE(review): the bodies of the host-staged StencilSendToRecvFromPollIRecv and StencilSendToRecvFromPollDtoH, and the start of the StencilSendToRecvFromBegin signature, are missing from this copy (each is cut at `for(int idx = 0; idx`). Only the `pending` do-loop openings survive; restore from upstream. */ void CartesianCommunicator::StencilSendToRecvFromPollIRecv(std::vector &list) { int pending = 0; do { pending = 0; for(int idx = 0; idx &list) { int pending = 0; do { pending = 0; for(int idx = 0; idx &list, void *xmit,void *xmit_comp, int dest,int dox, void *recv,void *recv_comp, int from,int dor, int xbytes,int rbytes,int dir) { /* Begin (host-staged build), body as visible here: only intranode traffic is started. With NVLINK_GET defined and `dor` set, an on-node peer's buffer (ShmBufferTranslate(from,xmit)) is copied into `recv` with acceleratorCopyDeviceToDeviceAsynch and an IntraNodeRecv entry is queued; otherwise, with `dox` set, `xmit` is copied into the on-node peer's buffer (ShmBufferTranslate(dest,recv)) and an IntraNodeXmit entry is queued. off_node_bytes is never incremented in this body, so the function returns 0.0. NOTE(review): on the NVLINK_GET path the queued entry records srq.bytes = xbytes and srq.device_buf = xmit although the copy moves rbytes into recv - confirm whether that is intended. */ int ncomm =communicator_halo.size(); int commdir=dir%ncomm; MPI_Request xrq; MPI_Request rrq; int ierr; int gdest = ShmRanks[dest]; int gfrom = ShmRanks[from]; int gme = ShmRanks[_processor]; GRID_ASSERT(dest != _processor); GRID_ASSERT(from != _processor); GRID_ASSERT(gme == ShmRank); double off_node_bytes=0.0; int tag; void * host_xmit = NULL; //////////////////////////////// // Receives already posted // Copies already started //////////////////////////////// /* * PHASE 2: (Begin) * - complete all copies * - post MPI send asynch */ #ifdef NVLINK_GET if ( dor ) { if ( ! 
( (gfrom ==MPI_UNDEFINED) || Stencil_force_mpi ) ) { // Intranode void *shm = (void *) this->ShmBufferTranslate(from,xmit); GRID_ASSERT(shm!=NULL); CommsRequest_t srq; srq.ev = acceleratorCopyDeviceToDeviceAsynch(shm,recv,rbytes); srq.PacketType = IntraNodeRecv; srq.bytes = xbytes; // srq.req = xrq; srq.host_buf = NULL; srq.device_buf = xmit; srq.tag = -1; srq.dest = dest; srq.commdir = dir; list.push_back(srq); } } #else if (dox) { if ( !( (gdest == MPI_UNDEFINED) || Stencil_force_mpi ) ) { // Intranode void *shm = (void *) this->ShmBufferTranslate(dest,recv); GRID_ASSERT(shm!=NULL); CommsRequest_t srq; srq.ev = acceleratorCopyDeviceToDeviceAsynch(xmit,shm,xbytes); srq.PacketType = IntraNodeXmit; srq.bytes = xbytes; // srq.req = xrq; srq.host_buf = NULL; srq.device_buf = xmit; srq.tag = -1; srq.dest = dest; srq.commdir = dir; list.push_back(srq); } } #endif return off_node_bytes; } /* Complete (host-staged build): calls acceleratorCopySynchronise, gathers MPI requests into MpiRequests and, when there are any, MPI_Waitall()s them and asserts success; the visible tail frees the host staging buffers via HostBufferFreeAll and, unless NVLINK_GET is defined, ends with StencilBarrier. NOTE(review): the loop that fills MpiRequests and the code between the Waitall and HostBufferFreeAll are missing from this copy (cut at `for(int r=0;r`) - restore from upstream. */ void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector &list,int dir) { acceleratorCopySynchronise(); // Complete all pending copy transfers D2D std::vector status; std::vector MpiRequests; for(int r=0;r0) { status.resize(MpiRequests.size()); int ierr = MPI_Waitall(MpiRequests.size(),&MpiRequests[0],&status[0]); // Sends are guaranteed in order. No harm in not completing. GRID_ASSERT(ierr==0); } // for(int r=0;rHostBufferFreeAll(); // Clean up the buffer allocs #ifndef NVLINK_GET this->StencilBarrier(); // if PUT must check our nbrs have filled our receive buffers. 
#endif } #endif //////////////////////////////////////////// // END PIPELINE MODE / NO CUDA AWARE MPI //////////////////////////////////////////// /* StencilBarrier: logs the step "NodeBarrier" and performs MPI_Barrier on ShmComm. NOTE(review): the MPI return code is not checked here, unlike Barrier() below. */ void CartesianCommunicator::StencilBarrier(void) { FlightRecorder::StepLog("NodeBarrier"); MPI_Barrier (ShmComm); } //void CartesianCommunicator::SendToRecvFromComplete(std::vector &list) //{ //} /* Barrier: logs "GridBarrier" and performs MPI_Barrier on `communicator`, asserting success. */ void CartesianCommunicator::Barrier(void) { FlightRecorder::StepLog("GridBarrier"); int ierr = MPI_Barrier(communicator); GRID_ASSERT(ierr==0); } /* Broadcast: logs "Broadcast" and MPI_Bcast()s `bytes` bytes at `data` from rank `root` over `communicator`, asserting success. */ void CartesianCommunicator::Broadcast(int root,void* data, int bytes) { FlightRecorder::StepLog("Broadcast"); int ierr=MPI_Bcast(data, bytes, MPI_BYTE, root, communicator); GRID_ASSERT(ierr==0); } /* RankWorld: returns this process's rank in communicator_world (MPI return code not checked). */ int CartesianCommunicator::RankWorld(void){ int r; MPI_Comm_rank(communicator_world,&r); return r; } /* BarrierWorld: logs "BarrierWorld" and performs MPI_Barrier on communicator_world, asserting success. */ void CartesianCommunicator::BarrierWorld(void){ FlightRecorder::StepLog("BarrierWorld"); int ierr = MPI_Barrier(communicator_world); GRID_ASSERT(ierr==0); } /* BroadcastWorld: logs "BroadcastWorld" and MPI_Bcast()s `bytes` bytes at `data` from rank `root` over communicator_world, asserting success. */ void CartesianCommunicator::BroadcastWorld(int root,void* data, int bytes) { FlightRecorder::StepLog("BroadcastWorld"); int ierr= MPI_Bcast(data, bytes, MPI_BYTE, root, communicator_world); GRID_ASSERT(ierr==0); } /* AllToAll along one dimension: builds a processor layout that is 1 everywhere except _processors[dim] in slot `dim`, constructs a sub-communicator of *this with that layout, and runs the plain AllToAll on it. `dim` must lie in [0,_ndimension). The split rank returned in `me` is not used. */ void CartesianCommunicator::AllToAll(int dim,void *in,void *out,uint64_t words,uint64_t bytes) { Coordinate row(_ndimension,1); GRID_ASSERT(dim>=0 && dim<_ndimension); // Split the communicator row[dim] = _processors[dim]; int me; CartesianCommunicator Comm(row,*this,me); Comm.AllToAll(in,out,words,bytes); } /* AllToAll over `communicator`: logs "AllToAll", asserts that `words` and `bytes` both survive conversion to int, wraps `bytes` contiguous MPI_BYTEs in a temporary MPI datatype and calls MPI_Alltoall with `words` such elements as both the send and the receive count, then frees the datatype. NOTE(review): the return codes of the MPI_Type_* and MPI_Alltoall calls are not checked. */ void CartesianCommunicator::AllToAll(void *in,void *out,uint64_t words,uint64_t bytes) { FlightRecorder::StepLog("AllToAll"); // MPI is a pain and uses "int" arguments // 64*64*64*128*16 == 500Million elements of data. // When 24*4 bytes multiples get 50x 10^9 >>> 2x10^9 Y2K bug. // (Turns up on 32^3 x 64 Gparity too) MPI_Datatype object; int iwords; int ibytes; iwords = words; ibytes = bytes; GRID_ASSERT(words == iwords); // safe to cast to int ? GRID_ASSERT(bytes == ibytes); // safe to cast to int ? 
MPI_Type_contiguous(ibytes,MPI_BYTE,&object); MPI_Type_commit(&object); MPI_Alltoall(in,iwords,object,out,iwords,object,communicator); MPI_Type_free(&object); } NAMESPACE_END(Grid);