diff --git a/lib/communicator/Communicator_base.h b/lib/communicator/Communicator_base.h
index f17f6f53..76a6a2f8 100644
--- a/lib/communicator/Communicator_base.h
+++ b/lib/communicator/Communicator_base.h
@@ -120,12 +120,11 @@ class CartesianCommunicator {
                        int recv_from_rank,
                        int bytes);
-    void RecvFrom(void *recv,
-                  int recv_from_rank,
-                  int bytes);
-    void SendTo(void *xmit,
-                int xmit_to_rank,
-                int bytes);
+    void SendRecvPacket(void *xmit,
+                        void *recv,
+                        int xmit_to_rank,
+                        int recv_from_rank,
+                        int bytes);
 
     void SendToRecvFromBegin(std::vector<CommsRequest_t> &list,
                              void *xmit,
diff --git a/lib/communicator/Communicator_mpi.cc b/lib/communicator/Communicator_mpi.cc
index 95b46cc0..26896d54 100644
--- a/lib/communicator/Communicator_mpi.cc
+++ b/lib/communicator/Communicator_mpi.cc
@@ -117,21 +117,22 @@ void CartesianCommunicator::SendToRecvFrom(void *xmit,
   SendToRecvFromBegin(reqs,xmit,dest,recv,from,bytes);
   SendToRecvFromComplete(reqs);
 }
-void CartesianCommunicator::RecvFrom(void *recv,
-                                     int from,
-                                     int bytes)
+
+void CartesianCommunicator::SendRecvPacket(void *xmit,
+                                           void *recv,
+                                           int sender,
+                                           int receiver,
+                                           int bytes)
 {
   MPI_Status stat;
-  int ierr=MPI_Recv(recv, bytes, MPI_CHAR,from,from,communicator,&stat);
-  assert(ierr==0);
-}
-void CartesianCommunicator::SendTo(void *xmit,
-                                   int dest,
-                                   int bytes)
-{
-  int rank = _processor; // used for tag; must know who it comes from
-  int ierr = MPI_Send(xmit, bytes, MPI_CHAR,dest,_processor,communicator);
-  assert(ierr==0);
+  assert(sender != receiver);
+  int tag = sender;
+  if ( _processor == sender ) {
+    MPI_Send(xmit, bytes, MPI_CHAR,receiver,tag,communicator);
+  }
+  if ( _processor == receiver ) {
+    MPI_Recv(recv, bytes, MPI_CHAR,sender,tag,communicator,&stat);
+  }
 }
 
 // Basic Halo comms primitive
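// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: the gated send/receive pattern
// that SendRecvPacket adopts above, as a standalone MPI program. Every rank
// may call the routine with the same arguments; only the named sender and
// receiver touch the wire, and the tag is the sender's rank, exactly as in
// the patch. The main() harness, ranks, and payload are invented here.
// ---------------------------------------------------------------------------
#include <mpi.h>
#include <cassert>
#include <cstdio>

int main(int argc, char **argv) {
  MPI_Init(&argc, &argv);
  int me, npes;
  MPI_Comm_rank(MPI_COMM_WORLD, &me);
  MPI_Comm_size(MPI_COMM_WORLD, &npes);
  assert(npes >= 2);

  int sender = 0, receiver = 1;
  int tag = sender;              // the sender's rank identifies the packet
  int payload = 42;

  if (me == sender) {            // only the sender transmits
    MPI_Send(&payload, 1, MPI_INT, receiver, tag, MPI_COMM_WORLD);
  }
  if (me == receiver) {          // only the receiver posts the matching recv
    MPI_Status stat;
    MPI_Recv(&payload, 1, MPI_INT, sender, tag, MPI_COMM_WORLD, &stat);
    printf("rank %d got %d\n", me, payload);
  }

  MPI_Finalize();
  return 0;
}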
diff --git a/lib/communicator/Communicator_shmem.cc b/lib/communicator/Communicator_shmem.cc
index b78e2e7c..8db5fcf1 100644
--- a/lib/communicator/Communicator_shmem.cc
+++ b/lib/communicator/Communicator_shmem.cc
@@ -39,11 +39,28 @@ namespace Grid {
     BACKTRACEFILE();  \
   }\
 }
-  int Rank(void) {
-    return shmem_my_pe();
-  }
+int Rank(void) {
+  return shmem_my_pe();
+}
+typedef struct HandShake_t {
+  uint64_t seq_local;
+  uint64_t seq_remote;
+} HandShake;
+
+static Vector< HandShake > XConnections;
+static Vector< HandShake > RConnections;
+
 void CartesianCommunicator::Init(int *argc, char ***argv) {
   shmem_init();
+  XConnections.resize(shmem_n_pes());
+  RConnections.resize(shmem_n_pes());
+  for(int pe =0 ; pe<shmem_n_pes();pe++){
+    XConnections[pe].seq_local = 0;
+    XConnections[pe].seq_remote= 0;
+    RConnections[pe].seq_local = 0;
+    RConnections[pe].seq_remote= 0;
+  }
+  shmem_barrier_all();
 }
 CartesianCommunicator::CartesianCommunicator(const std::vector<int> &processors)
@@ -69,23 +86,29 @@ CartesianCommunicator::CartesianCommunicator(const std::vector<int> &processors)
 }
 void CartesianCommunicator::GlobalSum(uint32_t &u){
-  static long long source = (long long) u;
-  static long long dest   = 0 ;
+  static long long source ;
+  static long long dest   ;
   static long long llwrk[_SHMEM_REDUCE_MIN_WRKDATA_SIZE];
   static long      psync[_SHMEM_REDUCE_SYNC_SIZE];
 
   //  int nreduce=1;
   //  int pestart=0;
   //  int logStride=0;
+
+  source = u;
+  dest   = 0;
   shmem_longlong_sum_to_all(&dest,&source,1,0,0,_Nprocessors,llwrk,psync);
+  shmem_barrier_all(); // necessary?
   u = dest;
 }
 void CartesianCommunicator::GlobalSum(float &f){
-  static float source = f;
-  static float dest   = 0 ;
+  static float source ;
+  static float dest   ;
   static float llwrk[_SHMEM_REDUCE_MIN_WRKDATA_SIZE];
   static long  psync[_SHMEM_REDUCE_SYNC_SIZE];
 
+  source = f;
+  dest   = 0.0;
   shmem_float_sum_to_all(&dest,&source,1,0,0,_Nprocessors,llwrk,psync);
   f = dest;
 }
@@ -96,13 +119,13 @@ void CartesianCommunicator::GlobalSumVector(float *f,int N)
   static float llwrk[_SHMEM_REDUCE_MIN_WRKDATA_SIZE];
   static long  psync[_SHMEM_REDUCE_SYNC_SIZE];
 
-  // Inefficient, but don't want to dynamic alloc
   if ( shmem_addr_accessible(f,_processor) ){
     shmem_float_sum_to_all(f,f,N,0,0,_Nprocessors,llwrk,psync);
     return;
   }
 
   for(int i=0;i<N;i++){
     dest   = 0.0;
     source = f[i];
     shmem_float_sum_to_all(&dest,&source,1,0,0,_Nprocessors,llwrk,psync);
     f[i]   = dest;
   }
 }
@@ ... @@
+void CartesianCommunicator::SendRecvPacket(void *xmit,
+                                           void *recv,
+                                           int sender,
+                                           int receiver,
+                                           int bytes)
+{
+  static uint64_t seq;
+
+  assert(sender != receiver);
+  volatile HandShake *RecvSeq = (volatile HandShake *) & RConnections[sender];
+  volatile HandShake *SendSeq = (volatile HandShake *) & XConnections[receiver];
+
+  if ( _processor == sender ) {
+
+    printf("Sender SHMEM pt2pt %d -> %d\n",sender,receiver);
+    // Check he has posted a receive
+    while(SendSeq->seq_remote == SendSeq->seq_local);
+
+    printf("Sender %d: receive posted by %d\n",sender,receiver);
+
+    // Advance our send count
+    seq = ++(SendSeq->seq_local);
+
+    // Send this packet
+    SHMEM_VET(recv);
+    shmem_putmem(recv,xmit,bytes,receiver);
+    shmem_fence();
+
+    printf("Sender sent payload %llu\n",(unsigned long long)seq);
+    // Notify him we're done
+    shmem_putmem((void *)&(RecvSeq->seq_remote),&seq,sizeof(seq),receiver);
+    shmem_fence();
+    printf("Sender ringing door bell %llu\n",(unsigned long long)seq);
+  }
+  if ( _processor == receiver ) {
+
+    printf("Receiver SHMEM pt2pt %d->%d\n",sender,receiver);
+    // Post a receive
+    seq = ++(RecvSeq->seq_local);
+    shmem_putmem((void *)&(SendSeq->seq_remote),&seq,sizeof(seq),sender);
+
+    printf("Receiver opening letter box %llu\n",(unsigned long long)seq);
+
+    // Now wait until he has advanced our reception counter
+    while(RecvSeq->seq_remote != RecvSeq->seq_local);
+
+    printf("Receiver got the mail %llu\n",(unsigned long long)seq);
+  }
+}
 
 // Basic Halo comms primitive
@@ -217,7 +280,12 @@ void CartesianCommunicator::Broadcast(int root,void* data, int bytes)
   uint32_t *array = (uint32_t *) data;
   assert( (bytes % 4)==0);
   int words = bytes/4;
-
+
+  if ( shmem_addr_accessible(data,_processor) ){
+    shmem_broadcast32(data,data,words,root,0,0,shmem_n_pes(),psync);
+    return;
+  }
+
   for(int w=0;w<words;w++){
     word = array[w];
     shmem_broadcast32((void *)&word,(void *)&word,1,root,0,0,shmem_n_pes(),psync);
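// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: the seq_local/seq_remote
// door-bell handshake that the SHMEM SendRecvPacket above implements, modeled
// with two threads and std::atomic standing in for two PEs and shmem_putmem.
// The HandShake field names mirror the patch; the threading harness and the
// int mailbox are invented for illustration.
// ---------------------------------------------------------------------------
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>

struct HandShake { std::atomic<uint64_t> seq_local{0}, seq_remote{0}; };

static HandShake XConn;  // sender-side connection state
static HandShake RConn;  // receiver-side connection state
static int mailbox;      // stands in for the symmetric destination buffer

int main() {
  std::thread receiver([] {
    // Post a receive: advance our counter and open the letter box
    uint64_t seq = RConn.seq_local.fetch_add(1) + 1;
    XConn.seq_remote.store(seq);
    // Wait for the sender to ring the door bell
    while (RConn.seq_remote.load() != seq) {}
    printf("receiver got the mail: %d\n", mailbox);
  });
  std::thread sender([] {
    // Wait until a receive has been posted
    while (XConn.seq_remote.load() == XConn.seq_local.load()) {}
    uint64_t seq = XConn.seq_local.fetch_add(1) + 1;
    mailbox = 42;                 // "shmem_putmem": deliver the payload
    RConn.seq_remote.store(seq);  // ring the door bell
  });
  sender.join();
  receiver.join();
  return 0;
}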
diff --git a/lib/parallelIO/BinaryIO.h b/lib/parallelIO/BinaryIO.h
--- a/lib/parallelIO/BinaryIO.h
+++ b/lib/parallelIO/BinaryIO.h
@@ -310,7 +311,7 @@ class BinaryIO {
       Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
       fout.write((char *)&saved[0],bytes);
     }
-
+    grid->Broadcast(0,(void *)&csum,sizeof(csum));
     return csum;
   }
   static inline uint32_t readRNGSerial(GridSerialRNG &serial,GridParallelRNG &parallel,std::string file,int offset)
@@ -360,6 +361,8 @@ class BinaryIO {
       Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
     }
 
+    grid->Broadcast(0,(void *)&csum,sizeof(csum));
+
     return csum;
   }
 
@@ -398,7 +401,7 @@ class BinaryIO {
     int IOnode = 1;
     for(int d=0;d<grid->_ndimension;d++) {
 
-      if ( d==0 ) parallel[d] = 0;
+      if ( d == 0 ) parallel[d] = 0;
 
       if (parallel[d]) {
         range[d] = grid->_ldimensions[d];
         start[d] = grid->_processor_coor[d]*range[d];
@@ -439,9 +442,9 @@ class BinaryIO {
     // available (how short sighted is that?)
     //////////////////////////////////////////////////////////
     Umu = zero;
-    uint32_t csum=0;
+    static uint32_t csum=0;
     fobj fileObj;
-    sobj siteObj;
+    static sobj siteObj; // Static to place in symmetric region for SHMEM
 
     // need to implement these loops in Nd independent way with a lexico conversion
     for(int tlex=0;tlex<lsites;tlex++){
 
       std::vector<int> tsite(nd); // temporary mixed up site
       std::vector<int> gsite(nd);
       std::vector<int> lsite(nd);
       std::vector<int> iosite(nd);
 
       Lexicographic::CoorFromIndex(tsite,tlex,range);
 
       for(int d=0;d<nd;d++){
         lsite[d] = tsite[d]%grid->_ldimensions[d];  // local site
         gsite[d] = tsite[d]+start[d];               // global site
       }
 
       /////////////////////////
       // Get the rank of owner of data
       /////////////////////////
       int rank, o_idx,i_idx, g_idx;
       grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gsite);
       grid->GlobalCoorToGlobalIndex(gsite,g_idx);
@@ -479,23 +482,24 @@ class BinaryIO {
       if(ieee64) le64toh_v((void *)&fileObj,sizeof(fileObj));
 
       munge(fileObj,siteObj,csum);
-
-      if ( rank != myrank ) {
-        grid->SendTo((void *)&siteObj,rank,sizeof(siteObj));
-      } else {
-        pokeLocalSite(siteObj,Umu,lsite);
+
+      }
+
+      // Possibly do transport through pt2pt
+      if ( rank != iorank ) {
+        if ( (myrank == rank) || (myrank==iorank) ) {
+          grid->SendRecvPacket((void *)&siteObj,(void *)&siteObj,iorank,rank,sizeof(siteObj));
         }
-
-      } else {
-        if ( myrank == rank ) {
-          grid->RecvFrom((void *)&siteObj,iorank,sizeof(siteObj));
+      }
+      // Poke at destination
+      if ( myrank == rank ) {
           pokeLocalSite(siteObj,Umu,lsite);
-        }
       }
       grid->Barrier(); // necessary?
     }
     grid->GlobalSum(csum);
+    grid->Barrier();
     return csum;
   }
@@ -530,7 +534,7 @@ class BinaryIO {
 
     for(int d=0;d<grid->_ndimension;d++) {
 
-      if ( d==0 ) parallel[d] = 0;
+      if ( d!= grid->_ndimension-1 ) parallel[d] = 0;
 
       if (parallel[d]) {
         range[d] = grid->_ldimensions[d];
@@ -577,10 +581,10 @@ class BinaryIO {
 
     uint32_t csum=0;
     fobj fileObj;
-    sobj siteObj;
+    static sobj siteObj; // static for SHMEM target; otherwise dynamic allocate with AlignedAllocator
 
-
-    // need to implement these loops in Nd independent way with a lexico conversion
+    // should aggregate a whole chunk and then write.
+    // need to implement these loops in Nd independent way with a lexico conversion
     for(int tlex=0;tlex<lsites;tlex++){
 
       std::vector<int> tsite(nd); // temporary mixed up site
@@ -606,13 +610,21 @@ class BinaryIO {
       ////////////////////////////////
       // iorank writes from the seek
       ////////////////////////////////
-      if (myrank == iorank) {
+
+      // Owner of data peeks it
+      peekLocalSite(siteObj,Umu,lsite);
 
-        if ( rank != myrank ) {
-          grid->RecvFrom((void *)&siteObj,rank,sizeof(siteObj));
-        } else {
-          peekLocalSite(siteObj,Umu,lsite);
+      // Pair of nodes may need to do pt2pt send
+      if ( rank != iorank ) { // comms is necessary
+        if ( (myrank == rank) || (myrank==iorank) ) { // and we have to do it
+          // Send to IOrank
+          grid->SendRecvPacket((void *)&siteObj,(void *)&siteObj,rank,iorank,sizeof(siteObj));
         }
+      }
+
+      grid->Barrier(); // necessary?
+
+      if (myrank == iorank) {
 
         munge(siteObj,fileObj,csum);
@@ -624,13 +636,7 @@ class BinaryIO {
         fout.seekp(offset+g_idx*sizeof(fileObj));
         fout.write((char *)&fileObj,sizeof(fileObj));
 
-      } else {
-        if ( myrank == rank ) {
-          peekLocalSite(siteObj,Umu,lsite);
-          grid->SendTo((void *)&siteObj,iorank,sizeof(siteObj));
-        }
       }
-
       grid->Barrier(); // necessary? or every 16 packets to rate throttle??
     }
 
     grid->GlobalSum(csum);
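// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: the per-site routing rule the
// parallel read/write loops above apply. `rank` owns the lattice site and
// `iorank` owns the file handle; a pt2pt packet is needed only when the two
// differ, and only those two ranks make the SendRecvPacket call while every
// other rank falls through to the barrier. The enum and function are
// invented here for illustration.
// ---------------------------------------------------------------------------
#include <cstdio>

enum class Action { LocalOnly, SendRecv, Idle };

static Action siteTransport(int myrank, int rank, int iorank) {
  if (rank == iorank)                          // data already on the IO rank
    return (myrank == rank) ? Action::LocalOnly : Action::Idle;
  if (myrank == rank || myrank == iorank)      // one end of the wire
    return Action::SendRecv;
  return Action::Idle;                         // bystander: barrier only
}

int main() {
  // e.g. four ranks, site owned by rank 2, IO performed by rank 0
  for (int me = 0; me < 4; me++) {
    printf("rank %d -> action %d\n", me, (int)siteTransport(me, 2, 0));
  }
  return 0;
}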
diff --git a/lib/parallelIO/NerscIO.h b/lib/parallelIO/NerscIO.h
index efe23e18..36e4371c 100644
--- a/lib/parallelIO/NerscIO.h
+++ b/lib/parallelIO/NerscIO.h
@@ -345,17 +345,17 @@ static inline void readConfiguration(Lattice<iLorentzColourMatrix<vsimd> > &Umu,
   if ( header.data_type == std::string("4D_SU3_GAUGE") ) {
     if ( ieee32 || ieee32big ) {
       // csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>, LorentzColour2x3F>
-      csum=BinaryIO::readObjectParallel<iLorentzColourMatrix<vsimd>, LorentzColour2x3F>
+      csum=BinaryIO::readObjectParallel<iLorentzColourMatrix<vsimd>, LorentzColour2x3F> 
         (Umu,file,Nersc3x2munger<LorentzColour2x3F,LorentzColourMatrix>(), offset,format);
     }
     if ( ieee64 || ieee64big ) {
-      // csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>, LorentzColour2x3D>
+      //csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>, LorentzColour2x3D>
       csum=BinaryIO::readObjectParallel<iLorentzColourMatrix<vsimd>, LorentzColour2x3D>
-        (Umu,file,Nersc3x2munger<LorentzColour2x3D,LorentzColourMatrix>(),offset,format);
+       (Umu,file,Nersc3x2munger<LorentzColour2x3D,LorentzColourMatrix>(),offset,format);
     }
   } else if ( header.data_type == std::string("4D_SU3_GAUGE_3X3") ) {
     if ( ieee32 || ieee32big ) {
-      // csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>,LorentzColourMatrixF>
+      //csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>,LorentzColourMatrixF>
       csum=BinaryIO::readObjectParallel<iLorentzColourMatrix<vsimd>,LorentzColourMatrixF>
         (Umu,file,NerscSimpleMunger<LorentzColourMatrixF,LorentzColourMatrix>(),offset,format);
     }
@@ -372,6 +372,7 @@ static inline void readConfiguration(Lattice<iLorentzColourMatrix<vsimd> > &Umu,
 
   assert(fabs(clone.plaquette -header.plaquette ) < 1.0e-5 );
   assert(fabs(clone.link_trace-header.link_trace) < 1.0e-6 );
+  assert(csum == header.checksum );
 
   std::cout<<GridLogMessage<<"NERSC Configuration "<<file<<" and plaquette, link trace, and checksum agree"<<std::endl;
 }
@@ ... @@ static inline void writeConfiguration(Lattice<iLorentzColourMatrix<vsimd> > &Umu,
   std::string file1 = file+"para";
   int offset1 = writeHeader(header,file1);
   int csum1=BinaryIO::writeObjectParallel(Umu,file1,munge,offset,header.floating_point);
+  //int csum1=BinaryIO::writeObjectSerial(Umu,file1,munge,offset,header.floating_point);
 
   std::cout << GridLogMessage << " TESTING PARALLEL WRITE offsets " << offset1 << " "<< offset << std::endl;
@@ -433,7 +435,8 @@ static inline void writeConfiguration(Lattice<iLorentzColourMatrix<vsimd> > &Umu,
     NerscSimpleUnmunger munge;
     BinaryIO::Uint32Checksum(Umu, munge,header.checksum);
     offset = writeHeader(header,file);
-    csum=BinaryIO::writeObjectSerial(Umu,file,munge,offset,header.floating_point);
+    // csum=BinaryIO::writeObjectSerial(Umu,file,munge,offset,header.floating_point);
+    csum=BinaryIO::writeObjectParallel(Umu,file,munge,offset,header.floating_point);
   }
   std::cout<<GridLogMessage<<"Written NERSC Configuration "<<file<<std::endl;
@@ ... @@ static inline void readRNGState(GridSerialRNG &serial,GridParallelRNG &parallel,NerscField& header,std::string file)
 
   uint32_t csum=BinaryIO::readRNGSerial(serial,parallel,file,offset);
 
+  std::cerr<<" Csum "<< csum << " "<< header.checksum <<std::endl;
+
   assert(csum == header.checksum );
 }
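// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the patch: the style of 32-bit additive
// checksum the NERSC header records and that the new
// assert(csum == header.checksum) lines verify after a read. The helper name
// and the sample data are invented; overflow wraps mod 2^32 by design.
// ---------------------------------------------------------------------------
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

static uint32_t uint32Checksum(const uint32_t *words, size_t n) {
  uint32_t csum = 0;
  for (size_t i = 0; i < n; i++) csum += words[i];
  return csum;
}

int main() {
  std::vector<uint32_t> data = {0xdeadbeef, 0xcafef00d, 0x00000042};
  uint32_t header_checksum = uint32Checksum(data.data(), data.size()); // writer side
  uint32_t csum = uint32Checksum(data.data(), data.size());            // reader side
  printf("csum %08x header %08x %s\n", csum, header_checksum,
         csum == header_checksum ? "agree" : "MISMATCH");
  return 0;
}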