From 6aeaf6f568a391e34b913f08be6a11beb28d8842 Mon Sep 17 00:00:00 2001
From: Peter Boyle
Date: Sun, 21 Feb 2016 08:03:21 -0600
Subject: [PATCH] Parallel IO worked on.

I'm puzzled, because I thought I had already shaken this out on MacOS +
OpenMPI, and then turned up problems on the BlueWaters Cray.

Gets 75MB/s from the home filesystem on a parallel configuration read; not
sure what that home filesystem actually is. Still need to make the RNG IO
parallel, and to look at aggregating bigger writes for the parallel write.
---
 lib/communicator/Communicator_base.h      |  11 +-
 lib/communicator/Communicator_mpi.cc      |  27 ++---
 lib/communicator/Communicator_shmem.cc    | 116 +++++++++++++++++-----
 lib/parallelIO/BinaryIO.h                 |  66 ++++++------
 lib/parallelIO/NerscIO.h                  |  15 ++-
 lib/qcd/action/fermion/WilsonFermion5D.cc |   5 +-
 6 files changed, 160 insertions(+), 80 deletions(-)

diff --git a/lib/communicator/Communicator_base.h b/lib/communicator/Communicator_base.h
index f17f6f53..76a6a2f8 100644
--- a/lib/communicator/Communicator_base.h
+++ b/lib/communicator/Communicator_base.h
@@ -120,12 +120,11 @@ class CartesianCommunicator {
                    int recv_from_rank,
                    int bytes);
 
-    void RecvFrom(void *recv,
-                  int recv_from_rank,
-                  int bytes);
-    void SendTo(void *xmit,
-                int xmit_to_rank,
-                int bytes);
+    void SendRecvPacket(void *xmit,
+                        void *recv,
+                        int xmit_to_rank,
+                        int recv_from_rank,
+                        int bytes);
 
     void SendToRecvFromBegin(std::vector<CommsRequest_t> &list,
                              void *xmit,
diff --git a/lib/communicator/Communicator_mpi.cc b/lib/communicator/Communicator_mpi.cc
index 95b46cc0..26896d54 100644
--- a/lib/communicator/Communicator_mpi.cc
+++ b/lib/communicator/Communicator_mpi.cc
@@ -117,21 +117,22 @@ void CartesianCommunicator::SendToRecvFrom(void *xmit,
   SendToRecvFromBegin(reqs,xmit,dest,recv,from,bytes);
   SendToRecvFromComplete(reqs);
 }
-void CartesianCommunicator::RecvFrom(void *recv,
-                                     int from,
-                                     int bytes)
+
+void CartesianCommunicator::SendRecvPacket(void *xmit,
+                                           void *recv,
+                                           int sender,
+                                           int receiver,
+                                           int bytes)
 {
   MPI_Status stat;
-  int ierr=MPI_Recv(recv, bytes, MPI_CHAR,from,from,communicator,&stat);
-  assert(ierr==0);
-}
-void CartesianCommunicator::SendTo(void *xmit,
-                                   int dest,
-                                   int bytes)
-{
-  int rank = _processor; // used for tag; must know who it comes from
-  int ierr = MPI_Send(xmit, bytes, MPI_CHAR,dest,_processor,communicator);
-  assert(ierr==0);
+  assert(sender != receiver);
+  int tag = sender;
+  if ( _processor == sender ) {
+    MPI_Send(xmit, bytes, MPI_CHAR,receiver,tag,communicator);
+  }
+  if ( _processor == receiver ) {
+    MPI_Recv(recv, bytes, MPI_CHAR,sender,tag,communicator,&stat);
+  }
 }
 
 // Basic Halo comms primitive
diff --git a/lib/communicator/Communicator_shmem.cc b/lib/communicator/Communicator_shmem.cc
index b78e2e7c..8db5fcf1 100644
--- a/lib/communicator/Communicator_shmem.cc
+++ b/lib/communicator/Communicator_shmem.cc
@@ -39,11 +39,28 @@ namespace Grid {
     BACKTRACEFILE();   \
   }\
 }
-  int Rank(void) {
-    return shmem_my_pe();
-  }
+int Rank(void) {
+  return shmem_my_pe();
+}
+typedef struct HandShake_t {
+  uint64_t seq_local;
+  uint64_t seq_remote;
+} HandShake;
+
+static Vector< HandShake > XConnections;
+static Vector< HandShake > RConnections;
+
 void CartesianCommunicator::Init(int *argc, char ***argv) {
   shmem_init();
+  XConnections.resize(shmem_n_pes());
+  RConnections.resize(shmem_n_pes());
+  for(int pe =0 ; pe<shmem_n_pes();pe++){
+    XConnections[pe].seq_local  = 0;
+    XConnections[pe].seq_remote = 0;
+    RConnections[pe].seq_local  = 0;
+    RConnections[pe].seq_remote = 0;
+  }
+  shmem_barrier_all();
 }
 CartesianCommunicator::CartesianCommunicator(const std::vector<int> &processors)
 {
@@ -69,23 +86,29 @@ CartesianCommunicator::CartesianCommunicator(const std::vector<int> &processors)
 }
 void CartesianCommunicator::GlobalSum(uint32_t &u){
-  static long long source = (long long) u;
-  static long long dest   = 0 ;
+  static long long source ;
+  static long long dest ;
   static long long llwrk[_SHMEM_REDUCE_MIN_WRKDATA_SIZE];
   static long      psync[_SHMEM_REDUCE_SYNC_SIZE];
   //  int nreduce=1;
   //  int pestart=0;
   //  int logStride=0;
+
+  source = u;
+  dest   = 0;
   shmem_longlong_sum_to_all(&dest,&source,1,0,0,_Nprocessors,llwrk,psync);
+  shmem_barrier_all(); // necessary?
   u = dest;
 }
 void CartesianCommunicator::GlobalSum(float &f){
-  static float source = f;
-  static float dest   = 0 ;
+  static float source ;
+  static float dest   ;
   static float llwrk[_SHMEM_REDUCE_MIN_WRKDATA_SIZE];
   static long  psync[_SHMEM_REDUCE_SYNC_SIZE];
 
+  source = f;
+  dest   =0.0;
   shmem_float_sum_to_all(&dest,&source,1,0,0,_Nprocessors,llwrk,psync);
   f = dest;
 }
@@ -96,13 +119,13 @@ void CartesianCommunicator::GlobalSumVector(float *f,int N)
   static float llwrk[_SHMEM_REDUCE_MIN_WRKDATA_SIZE];
   static long  psync[_SHMEM_REDUCE_SYNC_SIZE];
 
-  // Inefficient, but don't want to dynamic alloc
   if ( shmem_addr_accessible(f,_processor) ){
     shmem_float_sum_to_all(f,f,N,0,0,_Nprocessors,llwrk,psync);
     return;
   }
+
   for(int i=0;i<N;i++){
     GlobalSum(f[i]);
   }
 }
@@ ... @@
+void CartesianCommunicator::SendRecvPacket(void *xmit,
+                                           void *recv,
+                                           int sender,
+                                           int receiver,
+                                           int bytes)
+{
+  static uint64_t seq;
+
+  HandShake *RecvSeq = (HandShake *) & RConnections[sender];
+  HandShake *SendSeq = (HandShake *) & XConnections[receiver];
+
+  if ( _processor == sender ) {
+
+    printf("Sender SHMEM pt2pt %d->%d\n",sender,receiver);
+    // Check he has posted a receive
+    while(SendSeq->seq_remote == SendSeq->seq_local);
+
+    printf("Sender sees receive posted %d->%d\n",sender,receiver);
+
+    // Advance our send count
+    seq = ++(SendSeq->seq_local);
+
+    // Send this packet
+    SHMEM_VET(recv);
+    shmem_putmem(recv,xmit,bytes,receiver);
+    shmem_fence();
+
+    printf("Sender sent payload %d\n",seq);
+    //Notify him we're done
+    shmem_putmem((void *)&(RecvSeq->seq_remote),&seq,sizeof(seq),receiver);
+    shmem_fence();
+    printf("Sender ringing door bell %d\n",seq);
+  }
+  if ( _processor == receiver ) {
+
+    printf("Receiver SHMEM pt2pt %d->%d\n",sender,receiver);
+    // Post a receive
+    seq = ++(RecvSeq->seq_local);
+    shmem_putmem((void *)&(SendSeq->seq_remote),&seq,sizeof(seq),sender);
+
+    printf("Receiver Opening letter box %d\n",seq);
+
+    // Now wait until he has advanced our reception counter
+    while(RecvSeq->seq_remote != RecvSeq->seq_local);
+
+    printf("Receiver Got the mail %d\n",seq);
+  }
 }
 
 // Basic Halo comms primitive
@@ -217,7 +280,12 @@ void CartesianCommunicator::Broadcast(int root,void* data, int bytes)
   uint32_t *array = (uint32_t *) data;
   assert( (bytes % 4)==0);
   int words = bytes/4;
-
+
+  if ( shmem_addr_accessible(data,_processor) ){
+    shmem_broadcast32(data,data,words,root,0,0,shmem_n_pes(),psync);
+    return;
+  }
+
   for(int w=0;w<words;w++){
     word = array[w];
     shmem_broadcast32((void *)&word,(void *)&word,1,root,0,0,shmem_n_pes(),psync);
diff --git a/lib/parallelIO/BinaryIO.h b/lib/parallelIO/BinaryIO.h
--- a/lib/parallelIO/BinaryIO.h
+++ b/lib/parallelIO/BinaryIO.h
@@ ... @@ class BinaryIO {
       int gsites = grid->_gsites;
@@ -310,7 +311,7 @@ class BinaryIO {
        Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
        fout.write((char *)&saved[0],bytes);
       }
-
+      grid->Broadcast(0,(void *)&csum,sizeof(csum));
       return csum;
     }
     static inline uint32_t readRNGSerial(GridSerialRNG &serial,GridParallelRNG &parallel,std::string file,int offset)
@@ -360,6 +361,8 @@ class BinaryIO {
        Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
       }
 
+      grid->Broadcast(0,(void *)&csum,sizeof(csum));
+
       return csum;
     }
 
@@ -398,7 +401,7 @@ class BinaryIO {
       int IOnode = 1;
 
       for(int d=0;d<grid->_ndimension;d++) {
-       if ( d==0 ) parallel[d] = 0;
+       if ( d == 0 ) parallel[d] = 0;
        if (parallel[d]) {
          range[d] = grid->_ldimensions[d];
          start[d] = grid->_processor_coor[d]*range[d];
@@ -439,9 +442,9 @@ class BinaryIO {
       // available (how short sighted is that?)
       //////////////////////////////////////////////////////////
       Umu = zero;
-      uint32_t csum=0;
+      static uint32_t csum=0;
       fobj fileObj;
-      sobj siteObj;
+      static sobj siteObj; // Static to place in symmetric region for SHMEM
 
       // need to implement these loops in Nd independent way with a lexico conversion
       for(int tlex=0;tlex<lsites;tlex++){
@@ ... @@ class BinaryIO {
       grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gsite);
       grid->GlobalCoorToGlobalIndex(gsite,g_idx);
@@ -479,23 +482,24 @@ class BinaryIO {
        if(ieee64) le64toh_v((void *)&fileObj,sizeof(fileObj));
 
        munge(fileObj,siteObj,csum);
-
-       if ( rank != myrank ) {
-         grid->SendTo((void *)&siteObj,rank,sizeof(siteObj));
-       } else {
-         pokeLocalSite(siteObj,Umu,lsite);
+
+      }
+
+      // Possibly do transport through pt2pt
+      if ( rank != iorank ) {
+       if ( (myrank == rank) || (myrank==iorank) ) {
+         grid->SendRecvPacket((void *)&siteObj,(void *)&siteObj,iorank,rank,sizeof(siteObj));
        }
-
-      } else {
-       if ( myrank == rank ) {
-         grid->RecvFrom((void *)&siteObj,iorank,sizeof(siteObj));
+      }
+      // Poke at destination
+      if ( myrank == rank ) {
          pokeLocalSite(siteObj,Umu,lsite);
-       }
       }
       grid->Barrier(); // necessary?
     }
 
     grid->GlobalSum(csum);
+    grid->Barrier();
 
     return csum;
   }
 
@@ -530,7 +534,7 @@ class BinaryIO {
 
       for(int d=0;d<grid->_ndimension;d++) {
 
-       if ( d==0 ) parallel[d] = 0;
+       if ( d!= grid->_ndimension-1 ) parallel[d] = 0;
 
        if (parallel[d]) {
          range[d] = grid->_ldimensions[d];
          start[d] = grid->_processor_coor[d]*range[d];
@@ -577,10 +581,10 @@ class BinaryIO {
       uint32_t csum=0;
       fobj fileObj;
-      sobj siteObj;
+      static sobj siteObj; // static for SHMEM target; otherwise dynamic allocate with AlignedAllocator
 
-
-      // need to implement these loops in Nd independent way with a lexico conversion
+      // should aggregate a whole chunk and then write.
+      // need to implement these loops in Nd independent way with a lexico conversion
       for(int tlex=0;tlex<lsites;tlex++){
 
        std::vector<int> tsite(nd); // temporary mixed up site
@@ -606,13 +610,21 @@ class BinaryIO {
       ////////////////////////////////
       // iorank writes from the seek
       ////////////////////////////////
-      if (myrank == iorank) {
+
+      // Owner of data peeks it
+      peekLocalSite(siteObj,Umu,lsite);
 
-       if ( rank != myrank ) {
-         grid->RecvFrom((void *)&siteObj,rank,sizeof(siteObj));
-       } else {
-         peekLocalSite(siteObj,Umu,lsite);
+      // Pair of nodes may need to do pt2pt send
+      if ( rank != iorank ) { // comms is necessary
+       if ( (myrank == rank) || (myrank==iorank) ) { // and we have to do it
+         // Send to IOrank
+         grid->SendRecvPacket((void *)&siteObj,(void *)&siteObj,rank,iorank,sizeof(siteObj));
        }
+      }
+
+      grid->Barrier(); // necessary?
+
+      if (myrank == iorank) {
 
        munge(siteObj,fileObj,csum);
 
@@ -624,13 +636,7 @@ class BinaryIO {
 
        fout.seekp(offset+g_idx*sizeof(fileObj));
        fout.write((char *)&fileObj,sizeof(fileObj));
-      } else {
-       if ( myrank == rank ) {
-         peekLocalSite(siteObj,Umu,lsite);
-         grid->SendTo((void *)&siteObj,iorank,sizeof(siteObj));
-       }
       }
-      grid->Barrier(); // necessary // or every 16 packets to rate throttle??
 
      }
      grid->GlobalSum(csum);
diff --git a/lib/parallelIO/NerscIO.h b/lib/parallelIO/NerscIO.h
index efe23e18..36e4371c 100644
--- a/lib/parallelIO/NerscIO.h
+++ b/lib/parallelIO/NerscIO.h
@@ -345,17 +345,17 @@ static inline void readConfiguration(Lattice<iLorentzColourMatrix<vsimd> > &Umu,
   if ( header.data_type == std::string("4D_SU3_GAUGE") ) {
     if ( ieee32 || ieee32big ) {
       // csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>, LorentzColour2x3F>
-      csum=BinaryIO::readObjectParallel<iLorentzColourMatrix<vsimd>, LorentzColour2x3F>
+      csum=BinaryIO::readObjectParallel<iLorentzColourMatrix<vsimd>, LorentzColour2x3F>
        (Umu,file,Nersc3x2munger<LorentzColour2x3F,LorentzColourMatrix>(), offset,format);
     }
     if ( ieee64 || ieee64big ) {
-      // csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>, LorentzColour2x3D>
+      //csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>, LorentzColour2x3D>
       csum=BinaryIO::readObjectParallel<iLorentzColourMatrix<vsimd>, LorentzColour2x3D>
-       (Umu,file,Nersc3x2munger<LorentzColour2x3D,LorentzColourMatrix>(),offset,format);
+       (Umu,file,Nersc3x2munger<LorentzColour2x3D,LorentzColourMatrix>(),offset,format);
     }
   } else if ( header.data_type == std::string("4D_SU3_GAUGE_3X3") ) {
     if ( ieee32 || ieee32big ) {
-      // csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>,LorentzColourMatrixF>
+      //csum=BinaryIO::readObjectSerial<iLorentzColourMatrix<vsimd>,LorentzColourMatrixF>
       csum=BinaryIO::readObjectParallel<iLorentzColourMatrix<vsimd>,LorentzColourMatrixF>
        (Umu,file,NerscSimpleMunger<LorentzColourMatrixF,LorentzColourMatrix>(),offset,format);
     }
@@ -372,6 +372,7 @@ static inline void readConfiguration(Lattice<iLorentzColourMatrix<vsimd> > &Umu,
 
   assert(fabs(clone.plaquette -header.plaquette ) < 1.0e-5 );
   assert(fabs(clone.link_trace-header.link_trace) < 1.0e-6 );
+  assert(csum == header.checksum );
 
   std::cout<<GridLogMessage <<"NERSC Configuration "<<file<<" and plaquette, link trace, and checksum agree"<<std::endl;
@@ ... @@ static inline void writeConfiguration(Lattice<iLorentzColourMatrix<vsimd> > &Umu,
   std::string file1 = file+"para";
   int offset1 = writeHeader(header,file1);
   int csum1=BinaryIO::writeObjectParallel(Umu,file1,munge,offset,header.floating_point);
+  //int csum1=BinaryIO::writeObjectSerial(Umu,file1,munge,offset,header.floating_point);
 
   std::cout << GridLogMessage << " TESTING PARALLEL WRITE offsets " << offset1 << " "<< offset << std::endl;
@@ -433,7 +435,8 @@ static inline void writeConfiguration(Lattice<iLorentzColourMatrix<vsimd> > &Umu,
     NerscSimpleUnmunger munge;
     BinaryIO::Uint32Checksum(Umu, munge,header.checksum);
     offset = writeHeader(header,file);
-    csum=BinaryIO::writeObjectSerial(Umu,file,munge,offset,header.floating_point);
+    // csum=BinaryIO::writeObjectSerial(Umu,file,munge,offset,header.floating_point);
+    csum=BinaryIO::writeObjectParallel(Umu,file,munge,offset,header.floating_point);
   }
   std::cout<<GridLogMessage <<"Written NERSC Configuration"<<std::endl;
@@ ... @@
   uint32_t csum=BinaryIO::readRNGSerial(serial,parallel,file,offset);
+  std::cerr<<" Csum "<< csum << " "<< header.checksum <<std::endl;
diff --git a/lib/qcd/action/fermion/WilsonFermion5D.cc b/lib/qcd/action/fermion/WilsonFermion5D.cc
--- a/lib/qcd/action/fermion/WilsonFermion5D.cc
+++ b/lib/qcd/action/fermion/WilsonFermion5D.cc
@@ ... @@ void WilsonFermion5D<Impl>::DhopInternalCommsThenCompute(StencilImpl & st, Lebes
   int nwork = U._grid->oSites();
 
   commtime -=usecond();
-  auto handle = st.HaloExchangeBegin(in,compressor);
-  st.HaloExchangeComplete(handle);
+  auto handle = st.HaloExchangeOptBegin(in,compressor);
+  st.HaloExchangeOptComplete(handle);
   commtime +=usecond();
 
   jointime -=usecond();
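
Note on the transport pattern (not part of the patch): readObjectParallel above
has the IO rank read each site-sized record and forward it to the rank that
owns the site via SendRecvPacket, with every rank making the call so that only
the matched pair actually communicates. The stand-alone MPI sketch below
illustrates that pattern; the record size, the rank-0 IO node, and the main()
driver are illustrative assumptions, not Grid code.

// Hypothetical illustration of the SendRecvPacket transport used by the
// parallel reader: IO rank "reads" a record and ships it to the owner rank.
#include <mpi.h>
#include <cassert>
#include <cstdio>
#include <vector>

// Same shape as CartesianCommunicator::SendRecvPacket in the MPI backend:
// a tagged blocking send/recv that only the named pair acts on.
static void SendRecvPacket(void *xmit, void *recv,
                           int sender, int receiver, int bytes,
                           MPI_Comm comm, int me)
{
  assert(sender != receiver);
  int tag = sender;
  if (me == sender)   MPI_Send(xmit, bytes, MPI_CHAR, receiver, tag, comm);
  if (me == receiver) {
    MPI_Status stat;
    MPI_Recv(recv, bytes, MPI_CHAR, sender, tag, comm, &stat);
  }
}

int main(int argc, char **argv)
{
  MPI_Init(&argc, &argv);
  int me, ranks;
  MPI_Comm_rank(MPI_COMM_WORLD, &me);
  MPI_Comm_size(MPI_COMM_WORLD, &ranks);

  const int iorank = 0;                 // rank that "reads the file"
  std::vector<double> site(18, 0.0);    // one record; size is an assumption

  // Loop over destination ranks the way the reader loops over sites: the IO
  // rank fills the buffer (stands in for fread + byte swap + munge) and ships it.
  for (int owner = 0; owner < ranks; owner++) {
    if (me == iorank) site.assign(18, double(owner));   // "read" the record
    if (owner != iorank) {
      SendRecvPacket(site.data(), site.data(), iorank, owner,
                     int(site.size() * sizeof(double)), MPI_COMM_WORLD, me);
    }
    if (me == owner) printf("rank %d owns record starting %g\n", me, site[0]);
    MPI_Barrier(MPI_COMM_WORLD);        // mirrors grid->Barrier() in the patch
  }

  MPI_Finalize();
  return 0;
}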
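A second note, on the SHMEM side (again not part of the patch): the
seq_local/seq_remote handshake that SendRecvPacket spins on can be exercised
with a minimal two-PE OpenSHMEM program. This is a sketch under assumptions of
my own: a single HandShake record per PE instead of Grid's per-peer
XConnections/RConnections tables, a shmem_malloc'd payload buffer, and
volatile counters so the busy-wait re-reads memory (production code would more
likely use shmem_wait_until).

// Minimal doorbell handshake between PE 0 (sender) and PE 1 (receiver);
// run with two PEs, e.g. via oshrun -np 2.
#include <shmem.h>
#include <stdint.h>
#include <stdio.h>

// One connection record per PE; the patch keeps one per peer, split into
// send-side (XConnections) and receive-side (RConnections) tables.
typedef struct {
  volatile uint64_t seq_local;   // how far we have progressed
  volatile uint64_t seq_remote;  // how far the peer says it has progressed
} HandShake;

int main(void) {
  shmem_init();
  int me = shmem_my_pe();
  int sender = 0, receiver = 1;

  // Symmetric (remotely writable) handshake record and payload buffer.
  HandShake *hs  = (HandShake *) shmem_malloc(sizeof(HandShake));
  char      *buf = (char *)      shmem_malloc(64);
  hs->seq_local = 0; hs->seq_remote = 0;
  shmem_barrier_all();

  uint64_t seq;
  if (me == receiver) {
    // Post a receive: bump our local count and mirror it to the sender.
    seq = ++(hs->seq_local);
    shmem_putmem((void *)&hs->seq_remote, (void *)&seq, sizeof(seq), sender);
    // Wait until the sender rings our doorbell.
    while (hs->seq_remote != hs->seq_local) ;
    printf("PE %d got \"%s\"\n", me, buf);
  }
  if (me == sender) {
    // Wait until the receiver has posted.
    while (hs->seq_remote == hs->seq_local) ;
    seq = ++(hs->seq_local);
    const char msg[] = "payload";
    shmem_putmem(buf, msg, sizeof(msg), receiver);   // deliver payload
    shmem_fence();                                   // payload ordered before doorbell
    shmem_putmem((void *)&hs->seq_remote, (void *)&seq, sizeof(seq), receiver);
  }

  shmem_barrier_all();
  shmem_free(buf);
  shmem_free(hs);
  shmem_finalize();
  return 0;
}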