/*************************************************************************************

    Grid physics library, www.github.com/paboyle/Grid

    Source file: ./lib/communicator/Communicator_mpi.cc

    Copyright (C) 2015

    Author: Peter Boyle

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software Foundation,
    Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

    See the full license in the file "LICENSE" in the top level distribution directory
*************************************************************************************/
/*  END LEGAL */
#include "Grid.h"
#include <mpi.h>

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Workarounds:
/// i)  bloody mac os doesn't implement unnamed semaphores since it is "optional" posix.
///     darwin dispatch semaphores don't seem to be multiprocess.
///
/// ii) openmpi under --mca shmem posix works with two squadrons per node;
///     openmpi under default mca settings (I think --mca shmem mmap) on MacOS makes two squadrons map the SAME
///     memory as each other, despite their living on different communicators. This appears to be a bug in OpenMPI.
///
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#include <semaphore.h>

typedef sem_t *Grid_semaphore;

#define SEM_INIT(S)      S = sem_open(sem_name,0,0600,0); assert ( S != SEM_FAILED );
#define SEM_INIT_EXCL(S) sem_unlink(sem_name); S = sem_open(sem_name,O_CREAT|O_EXCL,0600,0); assert ( S != SEM_FAILED );
#define SEM_POST(S) assert ( sem_post(S) == 0 );
#define SEM_WAIT(S) assert ( sem_wait(S) == 0 );

// POSIX shared memory and file-control support used below (shm_open, mmap, ftruncate, O_* flags, NAME_MAX)
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <limits.h>

namespace Grid {

enum { COMMAND_ISEND, COMMAND_IRECV, COMMAND_WAITALL };

struct Descriptor {
  uint64_t buf;
  size_t bytes;
  int rank;
  int tag;
  int command;
  MPI_Request request;
};

const int pool = 48;

class SlaveState {
public:
  volatile int head;
  volatile int start;
  volatile int tail;
  volatile Descriptor Descrs[pool];
};

class Slave {
public:
  Grid_semaphore  sem_head;
  Grid_semaphore  sem_tail;
  SlaveState *state;
  MPI_Comm squadron;
  uint64_t base;
  int universe_rank;
  int vertical_rank;
  char sem_name [NAME_MAX];

  ////////////////////////////////////////////////////////////
  // Descriptor circular pointers
  ////////////////////////////////////////////////////////////
  Slave() {};

  void Init(SlaveState * _state,MPI_Comm _squadron,int _universe_rank,int _vertical_rank);

  void SemInit(void) {
    sprintf(sem_name,"/Grid_mpi3_sem_head_%d",universe_rank);
    //    printf("SEM_NAME: %s \n",sem_name);
    SEM_INIT(sem_head);
    sprintf(sem_name,"/Grid_mpi3_sem_tail_%d",universe_rank);
    //    printf("SEM_NAME: %s \n",sem_name);
    SEM_INIT(sem_tail);
  }

  void SemInitExcl(void) {
    sprintf(sem_name,"/Grid_mpi3_sem_head_%d",universe_rank);
    //    printf("SEM_INIT_EXCL: %s \n",sem_name);
    SEM_INIT_EXCL(sem_head);
    sprintf(sem_name,"/Grid_mpi3_sem_tail_%d",universe_rank);
    //    printf("SEM_INIT_EXCL: %s \n",sem_name);
    SEM_INIT_EXCL(sem_tail);
  }

  void WakeUpDMA(void) {
    SEM_POST(sem_head);
  };

  void WakeUpCompute(void) {
    SEM_POST(sem_tail);
  };
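  ////////////////////////////////////////////////////////////////////////
  // Two named semaphores implement the master/slave handshake: the master
  // posts sem_head (WakeUpDMA) to tell the data mover that new descriptors
  // are queued, and the slave posts sem_tail (WakeUpCompute) once a
  // COMMAND_WAITALL has drained the FIFO.  Named semaphores are used
  // because of the macOS restriction noted at the top of this file.
  ////////////////////////////////////////////////////////////////////////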
  void WaitForCommand(void) {
    SEM_WAIT(sem_head);
  };

  void WaitForComplete(void) {
    SEM_WAIT(sem_tail);
  };

  void EventLoop (void) {
    //    std::cout<< " Entering event loop "<<std::endl;
    while(1){
      WaitForCommand();
      Event();
    }
  }

  int Event (void) ;

  uint64_t QueueCommand(int command,void *buf, int bytes, int tag, MPI_Comm comm,int commrank) ;

  void WaitAll() {
    QueueCommand(COMMAND_WAITALL,0,0,0,squadron,0);
    WakeUpDMA();
    WaitForComplete();
    assert ( state->tail == state->head );
  }
};

////////////////////////////////////////////////////////////////////////
// One instance of a data mover.
// Master and Slave must agree on location in shared memory
////////////////////////////////////////////////////////////////////////
class MPIoffloadEngine {
public:

  static std::vector<Slave> Slaves;

  static int ShmSetup;

  static int UniverseRank;
  static int UniverseSize;

  static MPI_Comm communicator_universe;
  static MPI_Comm communicator_cached;

  static MPI_Comm HorizontalComm;
  static int HorizontalRank;
  static int HorizontalSize;

  static MPI_Comm VerticalComm;
  static MPI_Win  VerticalWindow;
  static int VerticalSize;
  static int VerticalRank;

  static std::vector<void *>            VerticalShmBufs;
  static std::vector<std::vector<int> > UniverseRanks;
  static std::vector<int>               UserCommunicatorToWorldRanks;

  static MPI_Group WorldGroup, CachedGroup;

  static void CommunicatorInit (MPI_Comm &communicator_world, MPI_Comm &ShmComm, void * &ShmCommBuf);

  static void MapCommRankToWorldRank(int &hashtag, int & comm_world_peer,int tag, MPI_Comm comm,int commrank);

  /////////////////////////////////////////////////////////
  // routines for master proc must handle any communicator
  /////////////////////////////////////////////////////////
  static void QueueSend(int slave,void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
    //    std::cout<< " Queueing send "<< bytes<< " slave "<< slave << " to comm "<<rank<<std::endl;
    Slaves[slave].QueueCommand(COMMAND_ISEND,buf,bytes,tag,comm,rank);
    Slaves[slave].WakeUpDMA();
  };

  static void QueueRecv(int slave,void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
    Slaves[slave].QueueCommand(COMMAND_IRECV,buf,bytes,tag,comm,rank);
    Slaves[slave].WakeUpDMA();
  };

  static void WaitAll() {
    for(int s=1;s<VerticalSize;s++){
      Slaves[s].WaitAll();
    }
  };

  static void GetWork(int nwork, int me, int & mywork, int & myoff, int units) {
    int basework = nwork/units;
    int backfill = units-(nwork%units);
    if ( me >= units ) {
      mywork = myoff = 0;
    } else {
      mywork = (nwork+me)/units;
      myoff  = basework * me;
      if ( me > backfill ) myoff+= (me-backfill);
    }
    return;
  };

  static void QueueMultiplexedSend(void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
    uint8_t * cbuf = (uint8_t *) buf;
    int mywork, myoff, procs;
    procs = VerticalSize-1;
    for(int s=0;s<procs;s++){
      GetWork(bytes,s,mywork,myoff,procs);
      QueueSend(s+1,&cbuf[myoff],mywork,tag,comm,rank);
    }
  };

  static void QueueMultiplexedRecv(void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
    uint8_t * cbuf = (uint8_t *) buf;
    int mywork, myoff, procs;
    procs = VerticalSize-1;
    for(int s=0;s<procs;s++){
      GetWork(bytes,s,mywork,myoff,procs);
      QueueRecv(s+1,&cbuf[myoff],mywork,tag,comm,rank);
    }
  };

};

//////////////////////////////////////////////////////////////////////
// Static data members of the offload engine
//////////////////////////////////////////////////////////////////////
std::vector<Slave> MPIoffloadEngine::Slaves;

int MPIoffloadEngine::UniverseRank;
int MPIoffloadEngine::UniverseSize;

MPI_Comm  MPIoffloadEngine::communicator_universe;
MPI_Comm  MPIoffloadEngine::communicator_cached;
MPI_Group MPIoffloadEngine::WorldGroup;
MPI_Group MPIoffloadEngine::CachedGroup;

MPI_Comm MPIoffloadEngine::HorizontalComm;
int      MPIoffloadEngine::HorizontalRank;
int      MPIoffloadEngine::HorizontalSize;

MPI_Comm MPIoffloadEngine::VerticalComm;
int      MPIoffloadEngine::VerticalSize;
int      MPIoffloadEngine::VerticalRank;
MPI_Win  MPIoffloadEngine::VerticalWindow;

std::vector<void *>            MPIoffloadEngine::VerticalShmBufs;
std::vector<std::vector<int> > MPIoffloadEngine::UniverseRanks;
std::vector<int>               MPIoffloadEngine::UserCommunicatorToWorldRanks;

int MPIoffloadEngine::ShmSetup = 0;

void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world, MPI_Comm &ShmComm, void * &ShmCommBuf)
{
  int flag;
  assert(ShmSetup==0);

  //////////////////////////////////////////////////////////////////////
  // Universe is all nodes prior to squadron grouping
  //////////////////////////////////////////////////////////////////////
  MPI_Comm_dup (MPI_COMM_WORLD,&communicator_universe);
  MPI_Comm_rank(communicator_universe,&UniverseRank);
  MPI_Comm_size(communicator_universe,&UniverseSize);

  /////////////////////////////////////////////////////////////////////
  // Split into groups that can share memory (Verticals)
  /////////////////////////////////////////////////////////////////////
#undef MPI_SHARED_MEM_DEBUG
#ifdef  MPI_SHARED_MEM_DEBUG
  MPI_Comm_split(communicator_universe,(UniverseRank/4),UniverseRank,&VerticalComm);
#else
  MPI_Comm_split_type(communicator_universe, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,&VerticalComm);
#endif
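  //////////////////////////////////////////////////////////////////////
  // Note on the decomposition: VerticalComm ("squadron") holds the ranks
  // that can share memory on a node; vertical rank 0 acts as the compute
  // leader and the remaining vertical ranks become data-mover slaves.
  // HorizontalComm groups ranks of equal vertical rank across nodes, so
  // the leaders' HorizontalComm is the world the rest of Grid sees.
  //////////////////////////////////////////////////////////////////////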
  MPI_Comm_rank(VerticalComm ,&VerticalRank);
  MPI_Comm_size(VerticalComm ,&VerticalSize);

  //////////////////////////////////////////////////////////////////////
  // Split into horizontal groups by rank in squadron
  //////////////////////////////////////////////////////////////////////
  MPI_Comm_split(communicator_universe,VerticalRank,UniverseRank,&HorizontalComm);
  MPI_Comm_rank(HorizontalComm,&HorizontalRank);
  MPI_Comm_size(HorizontalComm,&HorizontalSize);
  assert(HorizontalSize*VerticalSize==UniverseSize);

  ////////////////////////////////////////////////////////////////////////////////
  // What is my place in the world
  ////////////////////////////////////////////////////////////////////////////////
  int WorldRank=0;
  if(VerticalRank==0) WorldRank = HorizontalRank;
  int ierr=MPI_Allreduce(MPI_IN_PLACE,&WorldRank,1,MPI_INT,MPI_SUM,VerticalComm);
  assert(ierr==0);

  ////////////////////////////////////////////////////////////////////////////////
  // Where is the world in the universe?
  ////////////////////////////////////////////////////////////////////////////////
  UniverseRanks = std::vector<std::vector<int> >(HorizontalSize,std::vector<int>(VerticalSize,0));
  UniverseRanks[WorldRank][VerticalRank] = UniverseRank;
  for(int w=0;w<HorizontalSize;w++){
    ierr=MPI_Allreduce(MPI_IN_PLACE,&UniverseRanks[w][0],VerticalSize,MPI_INT,MPI_SUM,communicator_universe);
    assert(ierr==0);
  }

  //////////////////////////////////////////////////////////////////////////////////////////////////////////
  // Allocate the shared buffers for our squadron; pass Shm info back to CartesianCommunicator
  //////////////////////////////////////////////////////////////////////////////////////////////////////////
  VerticalShmBufs.resize(VerticalSize);
  char shm_name [NAME_MAX];

  if ( VerticalRank == 0 ) {
    for(int r=0;r<VerticalSize;r++){

      size_t size = CartesianCommunicator::MAX_MPI_SHM_BYTES;
      if ( r>0 ) size = sizeof(SlaveState);

      sprintf(shm_name,"/Grid_mpi3_shm_%d_%d",WorldRank,r);

      shm_unlink(shm_name);
      int fd=shm_open(shm_name,O_RDWR|O_CREAT,0600);
      if ( fd < 0 ) {
        perror("failed shm_open");
        assert(0);
      }
      ftruncate(fd, size);

      VerticalShmBufs[r] = mmap(NULL,size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
      if ( VerticalShmBufs[r] == MAP_FAILED ) {
        perror("failed mmap");
        assert(0);
      }

      uint64_t * check = (uint64_t *) VerticalShmBufs[r];
      check[0] = WorldRank;
      check[1] = r;
      //      std::cout<<"SHM "<<r<<" "<<size<<" "<<shm_name<<std::endl;
    }
  }

  MPI_Barrier(VerticalComm);

  if ( VerticalRank != 0 ) {
    for(int r=0;r<VerticalSize;r++){

      size_t size = CartesianCommunicator::MAX_MPI_SHM_BYTES ;
      if ( r>0 ) size = sizeof(SlaveState);

      sprintf(shm_name,"/Grid_mpi3_shm_%d_%d",WorldRank,r);

      int fd=shm_open(shm_name,O_RDWR|O_CREAT,0600);
      if ( fd<0 ) {
        perror("failed shm_open");
        assert(0);
      }

      VerticalShmBufs[r] = mmap(NULL,size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
      uint64_t * check = (uint64_t *) VerticalShmBufs[r];
      assert(check[0]== WorldRank);
      assert(check[1]== r);
      std::cerr<<"SHM "<<r<<" "<<size<<" "<<shm_name<<std::endl;
    }
  }

  MPI_Barrier(VerticalComm);

  //////////////////////////////////////////////////////////////////////
  // The squadron leaders form the world seen by CartesianCommunicator;
  // pass the shared comms buffer back to the caller.
  //////////////////////////////////////////////////////////////////////
  communicator_world = HorizontalComm;
  ShmComm            = VerticalComm;
  ShmCommBuf         = VerticalShmBufs[0];

  MPI_Comm_group (communicator_world, &WorldGroup);

  //////////////////////////////////////////////////////////////////////
  // Start the slave data movers; only the leader returns to the caller
  //////////////////////////////////////////////////////////////////////
  if ( VerticalRank != 0 ) {
    Slave indentured;
    indentured.Init( (SlaveState *) VerticalShmBufs[VerticalRank], VerticalComm, UniverseRank, VerticalRank);
    indentured.SemInitExcl();// create semaphores before the leader opens them
    MPI_Barrier(VerticalComm);
    indentured.EventLoop();
    assert(0);
  } else {
    Slaves.resize(VerticalSize);
    for(int i=1;i<VerticalSize;i++){
      Slaves[i].Init((SlaveState *)VerticalShmBufs[i],VerticalComm,UniverseRanks[WorldRank][i],i);
    }
    MPI_Barrier(VerticalComm);
    for(int i=1;i<VerticalSize;i++){
      Slaves[i].SemInit();
    }
  }

  ShmSetup=1;
}

///////////////////////////////////////////////////////////////////////////////////////////////
// Map a rank within a user communicator to the rank in the leaders' world communicator,
// caching the translation for the most recently used communicator.
///////////////////////////////////////////////////////////////////////////////////////////////
void MPIoffloadEngine::MapCommRankToWorldRank(int &hashtag, int & comm_world_peer,int tag, MPI_Comm comm,int commrank)
{
  if ( comm != communicator_cached ) {

    int size;
    MPI_Comm_size(comm,&size);
    UserCommunicatorToWorldRanks.resize(size);

    int ierr = MPI_Comm_group(comm,&CachedGroup);
    assert(ierr==0);

    std::vector<int> cached_ranks(size);
    for(int r=0;r<size;r++) cached_ranks[r]=r;

    communicator_cached = comm;

    ierr = MPI_Group_translate_ranks(CachedGroup,size,&cached_ranks[0],WorldGroup,&UserCommunicatorToWorldRanks[0]);
    assert(ierr==0);
  }

  comm_world_peer = UserCommunicatorToWorldRanks[commrank];
  assert(comm_world_peer != MPI_UNDEFINED);

  uint64_t icomm = (uint64_t)comm;
  int comm_hash = ((icomm>>0 )&0xFFFF)^((icomm>>16)&0xFFFF)
                ^ ((icomm>>32)&0xFFFF)^((icomm>>48)&0xFFFF);

  //  hashtag = (comm_hash<<15) | tag;
  hashtag = tag;
};

void Slave::Init(SlaveState * _state,MPI_Comm _squadron,int _universe_rank,int _vertical_rank)
{
  squadron=_squadron;
  universe_rank=_universe_rank;
  vertical_rank=_vertical_rank;
  state   =_state;
  //  std::cout << "state "<<_state<<" comm "<<_squadron<<" universe_rank"<<universe_rank<<std::endl;
  state->head = state->tail = state->start = 0;
  base = (uint64_t)MPIoffloadEngine::VerticalShmBufs[0];
  int rank; MPI_Comm_rank(_squadron,&rank);
}

#define PERI_PLUS(A) ( (A+1)%pool )

int Slave::Event (void) {

  static int tail_last;
  static int head_last;
  static int start_last;
  int ierr;

  ////////////////////////////////////////////////////
  // Try to advance the start pointers
  ////////////////////////////////////////////////////
  int s=state->start;
  if ( s != state->head ) {
    switch ( state->Descrs[s].command ) {
    case COMMAND_ISEND:
      /*
      std::cout<< " Send "<< state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
               << " to " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
               << " Comm " << MPIoffloadEngine::communicator_universe<< " me " << universe_rank<<std::endl;
      */
      ierr=MPI_Isend((void *)(state->Descrs[s].buf+base),
                     state->Descrs[s].bytes,
                     MPI_CHAR,
                     state->Descrs[s].rank,
                     state->Descrs[s].tag,
                     MPIoffloadEngine::communicator_universe,
                     (MPI_Request *)&state->Descrs[s].request);
      assert(ierr==0);
      state->start = PERI_PLUS(s);
      return 1;
      break;
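      //////////////////////////////////////////////////////////////////
      // Note: Descrs[].buf holds an offset relative to the shared r==0
      // buffer (see QueueCommand), since master and slave may map the
      // same shm object at different virtual addresses; "+base" rebuilds
      // the slave-local pointer.  The MPI_Request lives in the shared
      // descriptor so a later COMMAND_WAITALL can complete it.
      //////////////////////////////////////////////////////////////////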
    case COMMAND_IRECV:
      /*
      std::cout<< " Recv "<< state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
               << " from " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
               << " Comm " << MPIoffloadEngine::communicator_universe<< " me "<< universe_rank<< std::endl;
      */
      ierr=MPI_Irecv((void *)(state->Descrs[s].buf+base),
                     state->Descrs[s].bytes,
                     MPI_CHAR,
                     state->Descrs[s].rank,
                     state->Descrs[s].tag,
                     MPIoffloadEngine::communicator_universe,
                     (MPI_Request *)&state->Descrs[s].request);

      //      std::cout<< " Request is "<<state->Descrs[s].request<<std::endl;
      //      std::cout<< " Request0 is "<<state->Descrs[0].request<<std::endl;
      assert(ierr==0);
      state->start = PERI_PLUS(s);
      return 1;
      break;

    case COMMAND_WAITALL:
      for(int t=state->tail;t!=s; t=PERI_PLUS(t) ){
        MPI_Wait((MPI_Request *)&state->Descrs[t].request,MPI_STATUS_IGNORE);
      };
      s=PERI_PLUS(s);
      state->start = s;
      state->tail  = s;
      WakeUpCompute();
      return 1;
      break;

    default:
      assert(0);
      break;
    }
  }
  return 0;
}

//////////////////////////////////////////////////////////////////////////////
// External interaction with the queue
//////////////////////////////////////////////////////////////////////////////
uint64_t Slave::QueueCommand(int command,void *buf, int bytes, int tag, MPI_Comm comm,int commrank)
{
  /////////////////////////////////////////
  // Spin; if FIFO is full until not full
  /////////////////////////////////////////
  int head =state->head;
  int next = PERI_PLUS(head);

  // Set up descriptor
  int worldrank;
  int hashtag;
  MPI_Comm communicator;
  MPI_Request request;

  MPIoffloadEngine::MapCommRankToWorldRank(hashtag,worldrank,tag,comm,commrank);

  uint64_t relative= (uint64_t)buf - base;
  state->Descrs[head].buf    = relative;
  state->Descrs[head].bytes  = bytes;
  state->Descrs[head].rank   = MPIoffloadEngine::UniverseRanks[worldrank][vertical_rank];
  state->Descrs[head].tag    = hashtag;
  state->Descrs[head].command= command;

  /*
  if ( command == COMMAND_ISEND ) {
    std::cout << "QueueSend from "<< universe_rank <<" to commrank " << commrank
              << " to worldrank " << worldrank <<std::endl;
  }
  */

  while ( state->tail==next );

  // Msync on weak order architectures
  // Advance pointer
  state->head = next;

  return 0;
}

///////////////////////////////////////////////////////////////////////////////////////////////////
// Info that is setup once and indept of cartesian layout
///////////////////////////////////////////////////////////////////////////////////////////////////

MPI_Comm CartesianCommunicator::communicator_world;

void CartesianCommunicator::Init(int *argc, char ***argv)
{
  int flag;
  MPI_Initialized(&flag); // needed to coexist with other libs apparently
  if ( !flag ) {
    MPI_Init(argc,argv);
  }
  communicator_world = MPI_COMM_WORLD;
  MPI_Comm ShmComm;
  MPIoffloadEngine::CommunicatorInit (communicator_world,ShmComm,ShmCommBuf);
}

void CartesianCommunicator::ShiftedRanks(int dim,int shift,int &source,int &dest)
{
  int ierr=MPI_Cart_shift(communicator,dim,shift,&source,&dest);
  assert(ierr==0);
}

int CartesianCommunicator::RankFromProcessorCoor(std::vector<int> &coor)
{
  int rank;
  int ierr=MPI_Cart_rank  (communicator, &coor[0], &rank);
  assert(ierr==0);
  return rank;
}

void CartesianCommunicator::ProcessorCoorFromRank(int rank, std::vector<int> &coor)
{
  coor.resize(_ndimension);
  int ierr=MPI_Cart_coords  (communicator, rank, _ndimension,&coor[0]);
  assert(ierr==0);
}

CartesianCommunicator::CartesianCommunicator(const std::vector<int> &processors)
{
  _ndimension = processors.size();
  std::vector<int> periodic(_ndimension,1);

  _Nprocessors=1;
  _processors = processors;

  for(int i=0;i<_ndimension;i++){
    _Nprocessors*=_processors[i];
  }

  int Size;
  MPI_Comm_size(communicator_world,&Size);
  assert(Size==_Nprocessors);

  _processor_coor.resize(_ndimension);
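  //////////////////////////////////////////////////////////////////////
  // Build the Cartesian topology over the leaders' world communicator;
  // reorder=1 lets MPI renumber ranks, so _processor is taken from the
  // new communicator rather than from communicator_world.
  //////////////////////////////////////////////////////////////////////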
  MPI_Cart_create(communicator_world, _ndimension,&_processors[0],&periodic[0],1,&communicator);
  MPI_Comm_rank  (communicator,&_processor);
  MPI_Cart_coords(communicator,_processor,_ndimension,&_processor_coor[0]);
};

void CartesianCommunicator::GlobalSum(uint32_t &u){
  int ierr=MPI_Allreduce(MPI_IN_PLACE,&u,1,MPI_UINT32_T,MPI_SUM,communicator);
  assert(ierr==0);
}
void CartesianCommunicator::GlobalSum(uint64_t &u){
  int ierr=MPI_Allreduce(MPI_IN_PLACE,&u,1,MPI_UINT64_T,MPI_SUM,communicator);
  assert(ierr==0);
}
void CartesianCommunicator::GlobalSum(float &f){
  int ierr=MPI_Allreduce(MPI_IN_PLACE,&f,1,MPI_FLOAT,MPI_SUM,communicator);
  assert(ierr==0);
}
void CartesianCommunicator::GlobalSumVector(float *f,int N)
{
  int ierr=MPI_Allreduce(MPI_IN_PLACE,f,N,MPI_FLOAT,MPI_SUM,communicator);
  assert(ierr==0);
}
void CartesianCommunicator::GlobalSum(double &d)
{
  int ierr = MPI_Allreduce(MPI_IN_PLACE,&d,1,MPI_DOUBLE,MPI_SUM,communicator);
  assert(ierr==0);
}
void CartesianCommunicator::GlobalSumVector(double *d,int N)
{
  int ierr = MPI_Allreduce(MPI_IN_PLACE,d,N,MPI_DOUBLE,MPI_SUM,communicator);
  assert(ierr==0);
}

// Basic Halo comms primitive
void CartesianCommunicator::SendToRecvFrom(void *xmit, int dest,
                                           void *recv, int from, int bytes)
{
  std::vector<CommsRequest_t> reqs(0);
  SendToRecvFromBegin(reqs,xmit,dest,recv,from,bytes);
  SendToRecvFromComplete(reqs);
}

void CartesianCommunicator::SendRecvPacket(void *xmit, void *recv,
                                           int sender, int receiver, int bytes)
{
  MPI_Status stat;
  assert(sender != receiver);
  int tag = sender;
  if ( _processor == sender ) {
    MPI_Send(xmit, bytes, MPI_CHAR,receiver,tag,communicator);
  }
  if ( _processor == receiver ) {
    MPI_Recv(recv, bytes, MPI_CHAR,sender,tag,communicator,&stat);
  }
}

// Basic Halo comms primitive
void CartesianCommunicator::SendToRecvFromBegin(std::vector<CommsRequest_t> &list,
                                                void *xmit, int dest,
                                                void *recv, int from, int bytes)
{
  MPI_Request xrq;
  MPI_Request rrq;
  int rank = _processor;
  int ierr;
  ierr =MPI_Isend(xmit, bytes, MPI_CHAR,dest,_processor,communicator,&xrq);
  ierr|=MPI_Irecv(recv, bytes, MPI_CHAR,from,from,communicator,&rrq);
  assert(ierr==0);

  list.push_back(xrq);
  list.push_back(rrq);
}

void CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsRequest_t> &list,
                                                       void *xmit, int dest,
                                                       void *recv, int from, int bytes)
{
  uint64_t xmit_i = (uint64_t) xmit;
  uint64_t recv_i = (uint64_t) recv;
  uint64_t shm    = (uint64_t) ShmCommBuf;

  // assert xmit and recv lie in shared memory region
  assert( (xmit_i >= shm) && (xmit_i+bytes <= shm+MAX_MPI_SHM_BYTES) );
  assert( (recv_i >= shm) && (recv_i+bytes <= shm+MAX_MPI_SHM_BYTES) );
  assert(from!=_processor);
  assert(dest!=_processor);

  MPIoffloadEngine::QueueMultiplexedSend(xmit,bytes,_processor,communicator,dest);
  MPIoffloadEngine::QueueMultiplexedRecv(recv,bytes,from,communicator,from);
}

void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector<CommsRequest_t> &list)
{
  MPIoffloadEngine::WaitAll();
}

void CartesianCommunicator::StencilBarrier(void) { }

void CartesianCommunicator::SendToRecvFromComplete(std::vector<CommsRequest_t> &list)
{
  int nreq=list.size();
  std::vector<MPI_Status> status(nreq);
  int ierr = MPI_Waitall(nreq,&list[0],&status[0]);
  assert(ierr==0);
}

void CartesianCommunicator::Barrier(void)
{
  int ierr = MPI_Barrier(communicator);
  assert(ierr==0);
}

void CartesianCommunicator::Broadcast(int root,void* data, int bytes)
{
  int ierr=MPI_Bcast(data, bytes, MPI_BYTE, root, communicator);
  assert(ierr==0);
}

void CartesianCommunicator::BroadcastWorld(int root,void* data, int bytes)
{
  int ierr= MPI_Bcast(data, bytes, MPI_BYTE, root, communicator_world);
  assert(ierr==0);
}
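////////////////////////////////////////////////////////////////////////
// Shared-memory buffer accessors: only the local base buffer is exposed.
// ShmBuffer/ShmBufferTranslate return NULL here, signalling that no
// direct neighbour access is available in this implementation; stencil
// traffic goes through the MPIoffloadEngine path above instead.
////////////////////////////////////////////////////////////////////////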
void *CartesianCommunicator::ShmBufferSelf(void)
{
  return ShmCommBuf;
}

void *CartesianCommunicator::ShmBuffer(int rank)
{
  return NULL;
}

void *CartesianCommunicator::ShmBufferTranslate(int rank,void * local_p)
{
  return NULL;
}

};