	Don't use MPI3_leader any more. No real gain and complex
@@ -27,6 +27,7 @@ Author: Peter Boyle <paboyle@ph.ed.ac.uk>
    /*  END LEGAL */
#include "Grid.h"
#include <mpi.h>
//#include <numaif.h>

////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// Workarounds:
@@ -42,19 +43,27 @@ Author: Peter Boyle <paboyle@ph.ed.ac.uk>
#include <fcntl.h>
#include <unistd.h>
#include <limits.h>

typedef sem_t *Grid_semaphore;

#error  /*This is deprecated*/

#if 0
#define SEM_INIT(S)      S = sem_open(sem_name,0,0600,0); assert ( S != SEM_FAILED );
#define SEM_INIT_EXCL(S) sem_unlink(sem_name); S = sem_open(sem_name,O_CREAT|O_EXCL,0600,0); assert ( S != SEM_FAILED );
#define SEM_POST(S) assert ( sem_post(S) == 0 );
#define SEM_WAIT(S) assert ( sem_wait(S) == 0 );

#else
#define SEM_INIT(S)      ;
#define SEM_INIT_EXCL(S) ;
#define SEM_POST(S) ;
#define SEM_WAIT(S) ;
#endif
#include <sys/mman.h>

namespace Grid {

enum { COMMAND_ISEND, COMMAND_IRECV, COMMAND_WAITALL };
enum { COMMAND_ISEND, COMMAND_IRECV, COMMAND_WAITALL, COMMAND_SENDRECV };

struct Descriptor {
  uint64_t buf;
@@ -62,6 +71,12 @@ struct Descriptor {
  int rank;
  int tag;
  int command;
  uint64_t xbuf;
  uint64_t rbuf;
  int xtag;
  int rtag;
  int src;
  int dest;
  MPI_Request request;
};

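The new COMMAND_SENDRECV path widens the descriptor so a single entry can carry both halves of an exchange. For orientation, here is a minimal sketch of the descriptor-ring idea this file is built around; names and the ring size are illustrative, not Grid's actual declarations:

#include <cstdint>
#include <mpi.h>

// Illustrative sketch: a fixed-size single-producer/single-consumer FIFO of
// command descriptors. The producer fills Descrs[head] and advances head;
// the communication thread issues from start and retires entries to tail.
const int nDescSketch = 64;

struct DescriptorSketch {
  uint64_t xbuf, rbuf;   // send/recv offsets into the shared window
  int bytes;
  int xtag, rtag;        // hashed tags for the two directions
  int src, dest;         // universe ranks for the exchange
  int command;           // COMMAND_ISEND / COMMAND_IRECV / COMMAND_SENDRECV / COMMAND_WAITALL
  MPI_Request request;   // only used by the nonblocking commands
};

struct FifoSketch {
  volatile int head, start, tail;
  DescriptorSketch Descrs[nDescSketch];
};

inline int peri_plus_sketch(int i) { return (i + 1) % nDescSketch; }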
@@ -94,18 +109,14 @@ public:

  void SemInit(void) {
    sprintf(sem_name,"/Grid_mpi3_sem_head_%d",universe_rank);
    //    printf("SEM_NAME: %s \n",sem_name);
    SEM_INIT(sem_head);
    sprintf(sem_name,"/Grid_mpi3_sem_tail_%d",universe_rank);
    //    printf("SEM_NAME: %s \n",sem_name);
    SEM_INIT(sem_tail);
  }
  void SemInitExcl(void) {
    sprintf(sem_name,"/Grid_mpi3_sem_head_%d",universe_rank);
    //    printf("SEM_INIT_EXCL: %s \n",sem_name);
    SEM_INIT_EXCL(sem_head);
    sprintf(sem_name,"/Grid_mpi3_sem_tail_%d",universe_rank);
    //    printf("SEM_INIT_EXCL: %s \n",sem_name);
    SEM_INIT_EXCL(sem_tail);
  }
  void WakeUpDMA(void) {
@@ -125,6 +136,13 @@ public:
    while(1){
      WaitForCommand();
      //      std::cout << "Getting command "<<std::endl;
#if 0
      _mm_monitor((void *)&state->head,0,0);
      int s=state->start;
      if ( s != state->head ) {
	_mm_mwait(0,0);
      }
#endif
      Event();
    }
  }
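The #if 0 block added above experiments with MONITOR/MWAIT to sleep until the head pointer changes; those instructions are not generally available to user code, which is presumably why the block is left disabled. A portable sketch of the same intent spins with a pause hint instead (an assumption, not part of this commit):

#include <immintrin.h>

// Spin until the producer moves `head` away from `last`, hinting the CPU
// that this is a busy-wait so it can throttle the hardware thread.
inline void WaitForHeadSketch(volatile int &head, int last) {
  while (head == last) _mm_pause();
}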
@@ -132,6 +150,7 @@ public:
  int Event (void) ;

  uint64_t QueueCommand(int command,void *buf, int bytes, int hashtag, MPI_Comm comm,int u_rank) ;
  void QueueSendRecv(void *xbuf, void *rbuf, int bytes, int xtag, int rtag, MPI_Comm comm,int dest,int src) ;

  void WaitAll() {
    //    std::cout << "Queueing WAIT command  "<<std::endl;
@@ -141,7 +160,7 @@ public:
    //    std::cout << "Waiting from semaphore "<<std::endl;
    WaitForComplete();
    //    std::cout << "Checking FIFO is empty "<<std::endl;
    assert ( state->tail == state->head );
    while ( state->tail != state->head );
  }
};
@@ -196,6 +215,12 @@ public:
    //    std::cout << "Waking up DMA "<< slave<<std::endl;
  };

  static void QueueSendRecv(int slave,void *xbuf, void *rbuf, int bytes, int xtag, int rtag, MPI_Comm comm,int dest,int src)
  {
    Slaves[slave].QueueSendRecv(xbuf,rbuf,bytes,xtag,rtag,comm,dest,src);
    Slaves[slave].WakeUpDMA();
  }

  static void QueueRecv(int slave, void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
    //    std::cout<< " Queueing recv "<< bytes<< " slave "<< slave << " from comm "<<rank  <<std::endl;
    Slaves[slave].QueueCommand(COMMAND_IRECV,buf,bytes,tag,comm,rank);
@@ -226,6 +251,28 @@ public:
    return;
  };

  static void QueueRoundRobinSendRecv(void *xbuf, void *rbuf, int bytes, int xtag, int rtag, MPI_Comm comm,int dest,int src) {
    uint8_t * cxbuf = (uint8_t *) xbuf;
    uint8_t * crbuf = (uint8_t *) rbuf;
    static int rrp=0;
    int procs = VerticalSize-1;
    int myoff=0;
    int mywork=bytes;
    QueueSendRecv(rrp+1,&cxbuf[myoff],&crbuf[myoff],mywork,xtag,rtag,comm,dest,src);
    rrp = rrp+1;
    if ( rrp == (VerticalSize-1) ) rrp = 0;
  }

  static void QueueMultiplexedSendRecv(void *xbuf, void *rbuf, int bytes, int xtag, int rtag, MPI_Comm comm,int dest,int src) {
    uint8_t * cxbuf = (uint8_t *) xbuf;
    uint8_t * crbuf = (uint8_t *) rbuf;
    int mywork, myoff, procs;
    procs = VerticalSize-1;
    for(int s=0;s<procs;s++) {
      GetWork(bytes,s,mywork,myoff,procs);
      QueueSendRecv(s+1,&cxbuf[myoff],&crbuf[myoff],mywork,xtag,rtag,comm,dest,src);
    }
  };
  static void QueueMultiplexedSend(void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
    uint8_t * cbuf = (uint8_t *) buf;
    int mywork, myoff, procs;
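QueueMultiplexedSendRecv splits one message into contiguous chunks, one per communication slave, using GetWork (defined elsewhere in this file) to compute each slave's share. A plausible block decomposition in that spirit, shown here as an assumption rather than the file's actual helper:

// Hypothetical GetWork-style decomposition: split nwork bytes over `units`
// workers; the higher-numbered workers absorb the remainder, one byte each.
inline void GetWorkSketch(int nwork, int me, int &mywork, int &myoff, int units) {
  int basework = nwork / units;
  int backfill = units - (nwork % units);
  mywork = (nwork + me) / units;
  myoff  = basework * me;
  if (me > backfill) myoff += (me - backfill);
}

With bytes=10 and procs=4 this yields chunks of 2,2,3,3 at offsets 0,2,4,7, so the pieces tile the buffer exactly.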
@@ -275,6 +322,7 @@ std::vector<void *>            MPIoffloadEngine::VerticalShmBufs;
std::vector<std::vector<int> > MPIoffloadEngine::UniverseRanks;
std::vector<int>               MPIoffloadEngine::UserCommunicatorToWorldRanks;

int CartesianCommunicator::NodeCount(void)    { return HorizontalSize;};
int MPIoffloadEngine::ShmSetup = 0;

void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
@@ -370,12 +418,22 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
      ftruncate(fd, size);

      VerticalShmBufs[r] = mmap(NULL,size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

      if ( VerticalShmBufs[r] == MAP_FAILED ) {
	perror("failed mmap");
	assert(0);
      }

      /*
      for(uint64_t page=0;page<size;page+=4096){
	void *pages = (void *) ( page + (uint64_t)VerticalShmBufs[r] );
	int status;
	int flags=MPOL_MF_MOVE_ALL;
	int nodes=1; // numa domain == MCDRAM
	unsigned long count=1;
	ierr= move_pages(0,count, &pages,&nodes,&status,flags);
	if (ierr && (page==0)) perror("numa relocate command failed");
      }
      */
      uint64_t * check = (uint64_t *) VerticalShmBufs[r];
      check[0] = WorldRank;
      check[1] = r;
@@ -404,7 +462,7 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
    uint64_t * check = (uint64_t *) VerticalShmBufs[r];
    assert(check[0]== WorldRank);
    assert(check[1]== r);
    std::cerr<<"SHM "<<r<<" " <<VerticalShmBufs[r]<<std::endl;
    //    std::cerr<<"SHM "<<r<<" " <<VerticalShmBufs[r]<<std::endl;
  }
  }
#endif
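For reference, the mapping pattern used in the hunk above, condensed into a self-contained sketch (the object name and error handling are illustrative; the commented-out move_pages loop would follow the mmap to migrate pages to MCDRAM on KNL):

#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>
#include <cassert>
#include <cstdio>

// Create-or-open a POSIX shared memory object, size it, and map it in.
static void *MapShmSketch(const char *name, size_t size) {
  int fd = shm_open(name, O_CREAT | O_RDWR, 0600);
  assert(fd >= 0);
  int ierr = ftruncate(fd, size);   // size the object before mapping
  assert(ierr == 0);
  void *buf = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if (buf == MAP_FAILED) { perror("failed mmap"); assert(0); }
  return buf;
}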
@@ -542,6 +600,8 @@ int Slave::Event (void) {
  static int head_last;
  static int start_last;
  int ierr;
  MPI_Status stat;
  static int i=0;

  ////////////////////////////////////////////////////
  // Try to advance the start pointers
@@ -550,11 +610,6 @@ int Slave::Event (void) {
  if ( s != state->head ) {
    switch ( state->Descrs[s].command ) {
    case COMMAND_ISEND:
      /*
            std::cout<< " Send "<<s << " ptr "<< state<<" "<< state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
      	       << " to " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
       << " Comm " << MPIoffloadEngine::communicator_universe<< " me " <<universe_rank<< std::endl;
      */
      ierr = MPI_Isend((void *)(state->Descrs[s].buf+base),
		       state->Descrs[s].bytes,
		       MPI_CHAR,
@@ -568,11 +623,6 @@ int Slave::Event (void) {
      break;

    case COMMAND_IRECV:
      /*
      std::cout<< " Recv "<<s << " ptr "<< state<<" "<< state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
	       << " from " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
	       << " Comm " << MPIoffloadEngine::communicator_universe<< " me "<< universe_rank<< std::endl;
      */
      ierr=MPI_Irecv((void *)(state->Descrs[s].buf+base),
		     state->Descrs[s].bytes,
		     MPI_CHAR,
@@ -588,10 +638,32 @@ int Slave::Event (void) {
      return 1;
      break;

    case COMMAND_SENDRECV:

      //      fprintf(stderr,"Sendrecv ->%d %d : <-%d %d \n",state->Descrs[s].dest, state->Descrs[s].xtag+i*10,state->Descrs[s].src, state->Descrs[s].rtag+i*10);

      ierr=MPI_Sendrecv((void *)(state->Descrs[s].xbuf+base), state->Descrs[s].bytes, MPI_CHAR, state->Descrs[s].dest, state->Descrs[s].xtag+i*10,
			(void *)(state->Descrs[s].rbuf+base), state->Descrs[s].bytes, MPI_CHAR, state->Descrs[s].src , state->Descrs[s].rtag+i*10,
			MPIoffloadEngine::communicator_universe,MPI_STATUS_IGNORE);

      assert(ierr==0);

      //      fprintf(stderr,"Sendrecv done %d %d\n",ierr,i);
      //      MPI_Barrier(MPIoffloadEngine::HorizontalComm);
      //      fprintf(stderr,"Barrier\n");
      i++;

      state->start = PERI_PLUS(s);

      return 1;
      break;

    case COMMAND_WAITALL:

      for(int t=state->tail;t!=s; t=PERI_PLUS(t) ){
	MPI_Wait((MPI_Request *)&state->Descrs[t].request,MPI_STATUS_IGNORE);
	if ( state->Descrs[t].command != COMMAND_SENDRECV ) {
	  MPI_Wait((MPI_Request *)&state->Descrs[t].request,MPI_STATUS_IGNORE);
	}
      };
      s=PERI_PLUS(s);
      state->start = s;
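The new COMMAND_SENDRECV case replaces a paired Isend/Irecv with a single blocking MPI_Sendrecv, so there is no request to park in the descriptor; that is why the WAITALL loop above now skips SENDRECV entries. The call shape in isolation, as a sketch with illustrative names:

#include <mpi.h>
#include <cassert>

// One blocking exchange: send xbuf to `dest` while receiving rbuf from
// `src`. Both halves complete before the call returns.
static void SendRecvSketch(void *xbuf, void *rbuf, int bytes,
                           int dest, int src, int tag, MPI_Comm comm) {
  int ierr = MPI_Sendrecv(xbuf, bytes, MPI_CHAR, dest, tag,
                          rbuf, bytes, MPI_CHAR, src,  tag,
                          comm, MPI_STATUS_IGNORE);
  assert(ierr == MPI_SUCCESS);
}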
@@ -613,6 +685,45 @@ int Slave::Event (void) {
  // External interaction with the queue
  //////////////////////////////////////////////////////////////////////////////

void Slave::QueueSendRecv(void *xbuf, void *rbuf, int bytes, int xtag, int rtag, MPI_Comm comm,int dest,int src)
{
  int head =state->head;
  int next = PERI_PLUS(head);

  // Set up descriptor
  int worldrank;
  int hashtag;
  MPI_Comm    communicator;
  MPI_Request request;
  uint64_t relative;

  relative = (uint64_t)xbuf - base;
  state->Descrs[head].xbuf    = relative;

  relative= (uint64_t)rbuf - base;
  state->Descrs[head].rbuf    = relative;

  state->Descrs[head].bytes  = bytes;

  MPIoffloadEngine::MapCommRankToWorldRank(hashtag,worldrank,xtag,comm,dest);
  state->Descrs[head].dest   = MPIoffloadEngine::UniverseRanks[worldrank][vertical_rank];
  state->Descrs[head].xtag    = hashtag;

  MPIoffloadEngine::MapCommRankToWorldRank(hashtag,worldrank,rtag,comm,src);
  state->Descrs[head].src    = MPIoffloadEngine::UniverseRanks[worldrank][vertical_rank];
  state->Descrs[head].rtag    = hashtag;

  state->Descrs[head].command= COMMAND_SENDRECV;

  // Block until FIFO has space
  while( state->tail==next );

  // Msync on weak order architectures

  // Advance pointer
  state->head = next;

};
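QueueSendRecv follows the usual single-producer publish protocol: write the descriptor, wait while the ring is full, then expose the entry by advancing head. The "Msync on weak order architectures" comment marks where a release barrier belongs; with std::atomic the step would look like this (a sketch, not the file's volatile-based code):

#include <atomic>

// Publish a descriptor written at slot `head`: wait for space, then make the
// descriptor stores visible before the consumer can observe the new head.
inline void PublishSketch(std::atomic<int> &head, std::atomic<int> &tail, int next) {
  while (tail.load(std::memory_order_acquire) == next) {
    // FIFO full: wait for the consumer to retire an entry
  }
  head.store(next, std::memory_order_release);
}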
uint64_t Slave::QueueCommand(int command,void *buf, int bytes, int tag, MPI_Comm comm,int commrank)
{
  /////////////////////////////////////////
@@ -812,19 +923,22 @@ void CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsRequest_
  assert( (recv_i >= shm) && (recv_i+bytes <= shm+MAX_MPI_SHM_BYTES) );
  assert(from!=_processor);
  assert(dest!=_processor);
  MPIoffloadEngine::QueueMultiplexedSend(xmit,bytes,_processor,communicator,dest);
  MPIoffloadEngine::QueueMultiplexedRecv(recv,bytes,from,communicator,from);
}

  MPIoffloadEngine::QueueMultiplexedSendRecv(xmit,recv,bytes,_processor,from,communicator,dest,from);

  //MPIoffloadEngine::QueueRoundRobinSendRecv(xmit,recv,bytes,_processor,from,communicator,dest,from);

  //MPIoffloadEngine::QueueMultiplexedSend(xmit,bytes,_processor,communicator,dest);
  //MPIoffloadEngine::QueueMultiplexedRecv(recv,bytes,from,communicator,from);
}

void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector<CommsRequest_t> &list)
{
  MPIoffloadEngine::WaitAll();
  //this->Barrier();
}

void CartesianCommunicator::StencilBarrier(void)
{
}
void CartesianCommunicator::StencilBarrier(void) { }

void CartesianCommunicator::SendToRecvFromComplete(std::vector<CommsRequest_t> &list)
{