1
0
mirror of https://github.com/paboyle/Grid.git synced 2025-04-09 21:50:45 +01:00

Stencil comms cleaner

This commit is contained in:
Peter Boyle 2019-07-12 17:12:25 +01:00
parent bd155ca5c0
commit a29b43d755

View File

@ -315,6 +315,7 @@ public:
////////////////////////////////////////// //////////////////////////////////////////
// Comms packet queue for asynch thread // Comms packet queue for asynch thread
// Use OpenMP Tasks for cleaner ???
////////////////////////////////////////// //////////////////////////////////////////
void CommunicateThreaded() void CommunicateThreaded()
{ {
@ -371,6 +372,9 @@ public:
} }
commtime+= last-first; commtime+= last-first;
} }
////////////////////////////////////////////////////////////////////////
// Non blocking send and receive. Necessarily parallel.
////////////////////////////////////////////////////////////////////////
void CommunicateBegin(std::vector<std::vector<CommsRequest_t> > &reqs) void CommunicateBegin(std::vector<std::vector<CommsRequest_t> > &reqs)
{ {
reqs.resize(Packets.size()); reqs.resize(Packets.size());
@ -394,41 +398,44 @@ public:
} }
commtime+=usecond(); commtime+=usecond();
} }
////////////////////////////////////////////////////////////////////////
// Blocking send and receive. Either sequential or parallel.
////////////////////////////////////////////////////////////////////////
void Communicate(void) void Communicate(void)
{ {
thread_region if ( CartesianCommunicator::CommunicatorPolicy == CartesianCommunicator::CommunicatorPolicySequential ){
{ thread_region {
// must be called in parallel region // must be called in parallel region
int mythread = thread_num(); int mythread = thread_num();
int maxthreads= thread_max(); int maxthreads= thread_max();
int nthreads = CartesianCommunicator::nCommThreads; int nthreads = CartesianCommunicator::nCommThreads;
assert(nthreads <= maxthreads); assert(nthreads <= maxthreads);
if (nthreads == -1) nthreads = 1; if (nthreads == -1) nthreads = 1;
if (mythread < nthreads) { if (mythread < nthreads) {
for (int i = mythread; i < Packets.size(); i += nthreads) { for (int i = mythread; i < Packets.size(); i += nthreads) {
double start = usecond(); double start = usecond();
uint64_t bytes= _grid->StencilSendToRecvFrom(Packets[i].send_buf, uint64_t bytes= _grid->StencilSendToRecvFrom(Packets[i].send_buf,
Packets[i].to_rank, Packets[i].to_rank,
Packets[i].recv_buf, Packets[i].recv_buf,
Packets[i].from_rank, Packets[i].from_rank,
Packets[i].bytes,i); Packets[i].bytes,i);
comm_bytes_thr[mythread] += bytes; comm_bytes_thr[mythread] += bytes;
shm_bytes_thr[mythread] += Packets[i].bytes - bytes; shm_bytes_thr[mythread] += Packets[i].bytes - bytes;
comm_time_thr[mythread] += usecond() - start; comm_time_thr[mythread] += usecond() - start;
}
} }
} }
} else { // Concurrent and non-threaded asynch calls to MPI
std::vector<std::vector<CommsRequest_t> > reqs;
this->CommunicateBegin(reqs);
this->CommunicateComplete(reqs);
} }
} }
template<class compressor> void HaloExchange(const Lattice<vobj> &source,compressor &compress) template<class compressor> void HaloExchange(const Lattice<vobj> &source,compressor &compress)
{ {
std::vector<std::vector<CommsRequest_t> > reqs;
Prepare(); Prepare();
HaloGather(source,compress); HaloGather(source,compress);
// Concurrent
//CommunicateBegin(reqs);
//CommunicateComplete(reqs);
// Sequential, possibly threaded
Communicate(); Communicate();
CommsMergeSHM(compress); CommsMergeSHM(compress);
CommsMerge(compress); CommsMerge(compress);