diff --git a/lib/communicator/Communicator_mpi3_leader.cc b/lib/communicator/Communicator_mpi3_leader.cc
index 71f1a913..6e26bd3e 100644
--- a/lib/communicator/Communicator_mpi3_leader.cc
+++ b/lib/communicator/Communicator_mpi3_leader.cc
@@ -27,6 +27,7 @@ Author: Peter Boyle
 /* END LEGAL */
 #include "Grid.h"
 #include <mpi.h>
+//#include <numaif.h>
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 /// Workarounds:
@@ -42,19 +43,27 @@ Author: Peter Boyle
 #include <semaphore.h>
 #include <fcntl.h>
 #include <unistd.h>
-
 typedef sem_t *Grid_semaphore;
+
+#error /*This is deprecated*/
+
+#if 0
 #define SEM_INIT(S)      S = sem_open(sem_name,0,0600,0); assert ( S != SEM_FAILED );
 #define SEM_INIT_EXCL(S) sem_unlink(sem_name); S = sem_open(sem_name,O_CREAT|O_EXCL,0600,0); assert ( S != SEM_FAILED );
 #define SEM_POST(S) assert ( sem_post(S) == 0 );
 #define SEM_WAIT(S) assert ( sem_wait(S) == 0 );
-
+#else
+#define SEM_INIT(S) ;
+#define SEM_INIT_EXCL(S) ;
+#define SEM_POST(S) ;
+#define SEM_WAIT(S) ;
+#endif
 #include <sys/mman.h>
 
 namespace Grid {
 
-enum { COMMAND_ISEND, COMMAND_IRECV, COMMAND_WAITALL };
+enum { COMMAND_ISEND, COMMAND_IRECV, COMMAND_WAITALL, COMMAND_SENDRECV };
 
 struct Descriptor {
   uint64_t buf;
@@ -62,6 +71,12 @@ struct Descriptor {
   int rank;
   int tag;
   int command;
+  uint64_t xbuf;
+  uint64_t rbuf;
+  int xtag;
+  int rtag;
+  int src;
+  int dest;
   MPI_Request request;
 };
 
@@ -94,18 +109,14 @@ public:
   void SemInit(void) {
     sprintf(sem_name,"/Grid_mpi3_sem_head_%d",universe_rank);
-    // printf("SEM_NAME: %s \n",sem_name);
     SEM_INIT(sem_head);
     sprintf(sem_name,"/Grid_mpi3_sem_tail_%d",universe_rank);
-    // printf("SEM_NAME: %s \n",sem_name);
     SEM_INIT(sem_tail);
   }
   void SemInitExcl(void) {
     sprintf(sem_name,"/Grid_mpi3_sem_head_%d",universe_rank);
-    // printf("SEM_INIT_EXCL: %s \n",sem_name);
     SEM_INIT_EXCL(sem_head);
     sprintf(sem_name,"/Grid_mpi3_sem_tail_%d",universe_rank);
-    // printf("SEM_INIT_EXCL: %s \n",sem_name);
     SEM_INIT_EXCL(sem_tail);
   }
   void WakeUpDMA(void) {
@@ -125,6 +136,13 @@ public:
     while(1){
       WaitForCommand();
       //  std::cout << "Getting command "<<universe_rank<<std::endl;
+#if 0
+      _mm_monitor((void *)&state->head,0,0);
+      int s=state->start;
+      if ( s != state->head ) {
+        _mm_mwait(0,0);
+      }
+#endif
       Event();
     }
   }
@@ -132,6 +150,7 @@ public:
   int Event (void) ;
 
   uint64_t QueueCommand(int command,void *buf, int bytes, int hashtag, MPI_Comm comm,int u_rank) ;
+  void QueueSendRecv(void *xbuf, void *rbuf, int bytes, int xtag, int rtag, MPI_Comm comm,int dest,int src) ;
 
   void WaitAll() {
     //  std::cout << "Queueing WAIT command "<<universe_rank<<std::endl;
@@ ... @@
-    assert ( state->tail == state->head );
+    while ( state->tail != state->head );
   }
 };
 
@@ -196,6 +215,12 @@ public:
     //  std::cout << "Waking up DMA "<< slave<<std::endl;
@@ ... @@
 std::vector<void *>            MPIoffloadEngine::VerticalShmBufs;
 std::vector<std::vector<int> > MPIoffloadEngine::UniverseRanks;
 std::vector<int>               MPIoffloadEngine::UserCommunicatorToWorldRanks;
+int CartesianCommunicator::NodeCount(void)    { return HorizontalSize;};
 int MPIoffloadEngine::ShmSetup = 0;
 
 void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
@@ -370,12 +418,22 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
 	ftruncate(fd, size);
 
 	VerticalShmBufs[r] = mmap(NULL,size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
-	if ( VerticalShmBufs[r] == MAP_FAILED ) { perror("failed mmap"); assert(0); }
+	/*
+	for(uint64_t page=0;page<size;page+=4096){
+	  ...
+	}
+	*/
@@ ... @@ int Slave::Event (void) {
   if ( s != state->head ) {
     switch ( state->Descrs[s].command ) {
     case COMMAND_ISEND:
-      /*
-      std::cout<< " Send "<<s<<" ptr "<<state<<" "<<state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
-               << " to " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
-               << " Comm " << MPIoffloadEngine::communicator_universe<< " me " <<universe_rank<<std::endl;
-      */
       ierr=MPI_Isend((void *)(state->Descrs[s].buf+base),
                      state->Descrs[s].bytes,
                      MPI_CHAR,
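
The hunks above extend the slave's command Descriptor and keep the head/start/tail ring buffer through which the leader hands work to the communication slaves. A minimal sketch of that single-producer/single-consumer protocol follows; the names (RingState, push, pop) and the ring size are illustrative assumptions, not Grid's actual types.

// Sketch (not Grid's code) of the Descrs[] ring protocol: the leader
// publishes commands at 'head', the slave issues them from 'start',
// and completion at WAITALL advances 'tail'.
const int pool = 48;                       // assumed ring size
#define PERI_PLUS(A) (((A) + 1) % pool)    // wrap-around increment, as in the diff

struct RingState {
  volatile int head;    // next free slot; advanced by the producer
  volatile int start;   // next command to issue; advanced by the consumer
  volatile int tail;    // oldest incomplete command
  int Descrs[pool];     // stand-in for the Descriptor structs
};

// Producer side, mirroring QueueCommand/QueueSendRecv.
void push(RingState *state, int cmd) {
  int head = state->head;
  int next = PERI_PLUS(head);
  state->Descrs[head] = cmd;
  while (state->tail == next) ;  // block until the FIFO has space
  state->head = next;            // publish (needs a fence on weakly ordered CPUs)
}

// Consumer side, mirroring the start-pointer advance in Slave::Event.
bool pop(RingState *state, int &cmd) {
  int s = state->start;
  if (s == state->head) return false;  // ring empty
  cmd = state->Descrs[s];
  state->start = PERI_PLUS(s);
  return true;
}

Note that WaitAll's change from an assert to a spin (while ( state->tail != state->head );) makes the leader tolerate a slave that has not yet drained the ring, instead of aborting.
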
@@ -568,11 +623,6 @@ int Slave::Event (void) {
       break;
 
     case COMMAND_IRECV:
-      /*
-      std::cout<< " Recv "<<s<<" ptr "<<state<<" "<<state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
-               << " from " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
-               << " Comm " << MPIoffloadEngine::communicator_universe<< " me "<< universe_rank<< std::endl;
-      */
       ierr=MPI_Irecv((void *)(state->Descrs[s].buf+base),
                      state->Descrs[s].bytes,
                      MPI_CHAR,
@@ -588,10 +638,32 @@ int Slave::Event (void) {
       return 1;
       break;
 
+    case COMMAND_SENDRECV:
+
+      // fprintf(stderr,"Sendrecv ->%d %d : <-%d %d \n",state->Descrs[s].dest, state->Descrs[s].xtag+i*10,state->Descrs[s].src, state->Descrs[s].rtag+i*10);
+
+      ierr=MPI_Sendrecv((void *)(state->Descrs[s].xbuf+base), state->Descrs[s].bytes, MPI_CHAR, state->Descrs[s].dest, state->Descrs[s].xtag+i*10,
+                        (void *)(state->Descrs[s].rbuf+base), state->Descrs[s].bytes, MPI_CHAR, state->Descrs[s].src , state->Descrs[s].rtag+i*10,
+                        MPIoffloadEngine::communicator_universe,MPI_STATUS_IGNORE);
+
+      assert(ierr==0);
+
+      // fprintf(stderr,"Sendrecv done %d %d\n",ierr,i);
+      // MPI_Barrier(MPIoffloadEngine::HorizontalComm);
+      // fprintf(stderr,"Barrier\n");
+      i++;
+
+      state->start = PERI_PLUS(s);
+
+      return 1;
+      break;
+
     case COMMAND_WAITALL:
 
       for(int t=state->tail;t!=s; t=PERI_PLUS(t) ){
-        MPI_Wait((MPI_Request *)&state->Descrs[t].request,MPI_STATUS_IGNORE);
+        if ( state->Descrs[t].command != COMMAND_SENDRECV ) {
+          MPI_Wait((MPI_Request *)&state->Descrs[t].request,MPI_STATUS_IGNORE);
+        }
       };
       s=PERI_PLUS(s);
       state->start = s;
@@ -613,6 +685,45 @@ int Slave::Event (void) {
 // External interaction with the queue
 //////////////////////////////////////////////////////////////////////////////
 
+void Slave::QueueSendRecv(void *xbuf, void *rbuf, int bytes, int xtag, int rtag, MPI_Comm comm,int dest,int src)
+{
+  int head =state->head;
+  int next = PERI_PLUS(head);
+
+  // Set up descriptor
+  int worldrank;
+  int hashtag;
+  MPI_Comm communicator;
+  MPI_Request request;
+  uint64_t relative;
+
+  relative = (uint64_t)xbuf - base;
+  state->Descrs[head].xbuf = relative;
+
+  relative= (uint64_t)rbuf - base;
+  state->Descrs[head].rbuf = relative;
+
+  state->Descrs[head].bytes = bytes;
+
+  MPIoffloadEngine::MapCommRankToWorldRank(hashtag,worldrank,xtag,comm,dest);
+  state->Descrs[head].dest = MPIoffloadEngine::UniverseRanks[worldrank][vertical_rank];
+  state->Descrs[head].xtag = hashtag;
+
+  MPIoffloadEngine::MapCommRankToWorldRank(hashtag,worldrank,rtag,comm,src);
+  state->Descrs[head].src  = MPIoffloadEngine::UniverseRanks[worldrank][vertical_rank];
+  state->Descrs[head].rtag = hashtag;
+
+  state->Descrs[head].command= COMMAND_SENDRECV;
+
+  // Block until FIFO has space
+  while( state->tail==next );
+
+  // Msync on weak order architectures
+
+  // Advance pointer
+  state->head = next;
+
+};
+
 uint64_t Slave::QueueCommand(int command,void *buf, int bytes, int tag, MPI_Comm comm,int commrank)
 {
   /////////////////////////////////////////
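
COMMAND_SENDRECV folds a paired Isend/Irecv into a single blocking MPI_Sendrecv executed by the slave, which is why the WAITALL sweep above skips such descriptors: the call returns only once both transfers are complete, so no MPI_Request is left to wait on. A standalone sketch of that exchange pattern follows; the ring topology, tags, and buffer size are illustrative, not Grid's.

#include <mpi.h>
#include <vector>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  int dest = (rank + 1) % size;          // right neighbour
  int src  = (rank - 1 + size) % size;   // left neighbour

  std::vector<char> xbuf(1024, (char)rank), rbuf(1024);

  // Send and receive both complete before the call returns: there is
  // no request, hence nothing for a later MPI_Wait to do.
  MPI_Sendrecv(xbuf.data(), (int)xbuf.size(), MPI_CHAR, dest, 0,
               rbuf.data(), (int)rbuf.size(), MPI_CHAR, src,  0,
               MPI_COMM_WORLD, MPI_STATUS_IGNORE);

  MPI_Finalize();
  return 0;
}
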
@@ -812,19 +923,22 @@ void CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsRequest_t> &list,
   uint64_t xmit_i = (uint64_t) xmit;
   uint64_t recv_i = (uint64_t) recv;
   uint64_t shm    = (uint64_t) ShmCommBuf;
   assert( (xmit_i >= shm) && (xmit_i+bytes <= shm+MAX_MPI_SHM_BYTES) );
   assert( (recv_i >= shm) && (recv_i+bytes <= shm+MAX_MPI_SHM_BYTES) );
   assert(from!=_processor);
   assert(dest!=_processor);
 
-  MPIoffloadEngine::QueueMultiplexedSend(xmit,bytes,_processor,communicator,dest);
-  MPIoffloadEngine::QueueMultiplexedRecv(recv,bytes,from,communicator,from);
-}
+  MPIoffloadEngine::QueueMultiplexedSendRecv(xmit,recv,bytes,_processor,from,communicator,dest,from);
+
+  //MPIoffloadEngine::QueueRoundRobinSendRecv(xmit,recv,bytes,_processor,from,communicator,dest,from);
+
+  //MPIoffloadEngine::QueueMultiplexedSend(xmit,bytes,_processor,communicator,dest);
+  //MPIoffloadEngine::QueueMultiplexedRecv(recv,bytes,from,communicator,from);
+}
 
 void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector<CommsRequest_t> &list)
 {
   MPIoffloadEngine::WaitAll();
+  //this->Barrier();
 }
 
-void CartesianCommunicator::StencilBarrier(void)
-{
-}
+void CartesianCommunicator::StencilBarrier(void) { }
 
 void CartesianCommunicator::SendToRecvFromComplete(std::vector<CommsRequest_t> &list)
 {
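
StencilSendToRecvFromBegin still requires both buffers to sit inside the node's MPI3 shared-memory window, as the retained asserts enforce, and now queues one multiplexed sendrecv per neighbour pair instead of independent send and receive commands. A minimal sketch of such a POSIX shared-memory window and the range check follows, mirroring the shm_open/ftruncate/mmap sequence in CommunicatorInit; the segment name and sizes are illustrative.

#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include <cstdint>

int main() {
  const char *name = "/Grid_sketch_shm";   // illustrative segment name
  size_t size = 1 << 20;                   // illustrative window size

  shm_unlink(name);                        // start from a fresh segment
  int fd = shm_open(name, O_CREAT | O_RDWR, 0600);
  assert(fd >= 0);
  assert(ftruncate(fd, size) == 0);

  void *shm = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  assert(shm != MAP_FAILED);

  // A buffer inside the window passes the stencil's range check.
  uint64_t shm_i  = (uint64_t)shm;
  uint64_t xmit_i = shm_i + 4096;          // some offset into the window
  assert((xmit_i >= shm_i) && (xmit_i + 1024 <= shm_i + size));

  munmap(shm, size);
  close(fd);
  shm_unlink(name);
  return 0;
}
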