1
0
mirror of https://github.com/paboyle/Grid.git synced 2024-11-09 23:45:36 +00:00

Threaded MPI calls patches

This commit is contained in:
Peter Boyle 2017-07-29 13:06:53 -04:00
parent 6f5a5cd9b3
commit 14d53e1c9e
8 changed files with 128 additions and 66 deletions

View File

@ -489,7 +489,7 @@ int main (int argc, char ** argv)
//assert(norm2(src_e)<1.0e-4);
//assert(norm2(src_o)<1.0e-4);
exit(0);
Grid_finalize();
}

View File

@ -92,11 +92,15 @@ public:
size_type bytes = __n*sizeof(_Tp);
_Tp *ptr = (_Tp *) PointerCache::Lookup(bytes);
//////////////////
// Hack 2MB align; could make option probably doesn't need configurability
//////////////////
//define GRID_ALLOC_ALIGN (128)
#define GRID_ALLOC_ALIGN (2*1024*1024)
#ifdef HAVE_MM_MALLOC_H
if ( ptr == (_Tp *) NULL ) ptr = (_Tp *) _mm_malloc(bytes,128);
if ( ptr == (_Tp *) NULL ) ptr = (_Tp *) _mm_malloc(bytes,GRID_ALLOC_ALIGN);
#else
if ( ptr == (_Tp *) NULL ) ptr = (_Tp *) memalign(128,bytes);
if ( ptr == (_Tp *) NULL ) ptr = (_Tp *) memalign(GRID_ALLOC_ALIGN,bytes);
#endif
return ptr;

View File

@ -34,7 +34,9 @@ namespace Grid {
///////////////////////////////////////////////////////////////
void * CartesianCommunicator::ShmCommBuf;
uint64_t CartesianCommunicator::MAX_MPI_SHM_BYTES = 128*1024*1024;
CartesianCommunicator::CommunicatorPolicy_t CartesianCommunicator::CommunicatorPolicy= CartesianCommunicator::CommunicatorPolicyConcurrent;
CartesianCommunicator::CommunicatorPolicy_t
CartesianCommunicator::CommunicatorPolicy= CartesianCommunicator::CommunicatorPolicyConcurrent;
int CartesianCommunicator::nCommThreads = -1;
/////////////////////////////////
// Alloc, free shmem region

View File

@ -56,6 +56,7 @@ class CartesianCommunicator {
static const int MAXLOG2RANKSPERNODE = 16;
static uint64_t MAX_MPI_SHM_BYTES;
static int nCommThreads;
// Communicator should know nothing of the physics grid, only processor grid.
int _Nprocessors; // How many in all
@ -215,6 +216,12 @@ class CartesianCommunicator {
void SendToRecvFromComplete(std::vector<CommsRequest_t> &waitall);
double StencilSendToRecvFrom(void *xmit,
int xmit_to_rank,
void *recv,
int recv_from_rank,
int bytes,int dir);
double StencilSendToRecvFromBegin(std::vector<CommsRequest_t> &list,
void *xmit,
int xmit_to_rank,
@ -222,6 +229,7 @@ class CartesianCommunicator {
int recv_from_rank,
int bytes,int dir);
void StencilSendToRecvFromComplete(std::vector<CommsRequest_t> &waitall,int i);
void StencilBarrier(void);

View File

@ -242,7 +242,8 @@ double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsReques
int recv_from_rank,
int bytes,int dir)
{
assert(false);
/*
int myrank = _processor;
int ierr;
assert(dir < communicator_halo.size());
@ -254,6 +255,28 @@ double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsReques
communicator_halo[dir],MPI_STATUS_IGNORE);
assert(ierr==0);
return 2.0*bytes;
*/
}
double CartesianCommunicator::StencilSendToRecvFrom(void *xmit,
int xmit_to_rank,
void *recv,
int recv_from_rank,
int bytes,int dir)
{
int myrank = _processor;
int ierr;
assert(dir < communicator_halo.size());
// std::cout << " sending on communicator "<<dir<<" " <<communicator_halo[dir]<<std::endl;
// Give the CPU to MPI immediately; can use threads to overlap optionally
MPI_Request req[2];
MPI_Irecv(recv,bytes,MPI_CHAR,recv_from_rank,recv_from_rank,
communicator_halo[dir],&req[1]);
MPI_Isend(xmit,bytes,MPI_CHAR,xmit_to_rank,myrank,
communicator_halo[dir], &req[0]);
MPI_Waitall(2, req, MPI_STATUSES_IGNORE);
return 2.0*bytes;
}
void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector<CommsRequest_t> &waitall,int dir)
{

View File

@ -391,37 +391,57 @@ void WilsonFermion5D<Impl>::DhopInternalOverlappedComms(StencilImpl & st, Lebesg
DhopFaceTime+=usecond();
// Rely on async comms; start comms before merge of local data
DhopComputeTime-=usecond();
DhopCommTime-=usecond();
#pragma omp parallel
double ctime=0;
double ptime=0;
// DhopComputeTime-=usecond();
// DhopCommTime-=usecond();
#pragma omp parallel reduction(max:ctime) reduction(max:ptime)
{
// Should time this somehow; hard as the threads fork nowait
st.CommunicateThreaded();
int tid = omp_get_thread_num();
int nthreads = omp_get_num_threads();
int ncomms = CartesianCommunicator::nCommThreads;
if (ncomms == -1) ncomms = st.Packets.size();
assert(nthreads > ncomms);
if (tid >= ncomms) {
double start = usecond();
nthreads -= ncomms;
int ttid = tid - ncomms;
int n = U._grid->oSites();
int chunk = n / nthreads;
int rem = n % nthreads;
int myblock, myn;
if (ttid < rem) {
myblock = ttid * chunk + ttid;
myn = chunk+1;
} else {
myblock = ttid*chunk + rem;
myn = chunk;
}
// do the compute
if (dag == DaggerYes) {
#pragma omp for
for (int ss = 0; ss < U._grid->oSites(); ss++) {
for (int ss = myblock; ss < myblock+myn; ++ss) {
int sU = ss;
int sF = LLs * sU;
Kernels::DhopSiteDag(st,lo,U,st.CommBuf(),sF,sU,LLs,1,in,out,1,0);
}
} else {
#pragma omp for
for (int ss = 0; ss < U._grid->oSites(); ss++) {
for (int ss = myblock; ss < myblock+myn; ++ss) {
int sU = ss;
int sF = LLs * sU;
Kernels::DhopSite(st,lo,U,st.CommBuf(),sF,sU,LLs,1,in,out,1,0);
}
}
#pragma omp single
DhopComputeTime+=usecond();
#pragma omp taskwait
#pragma omp single
DhopCommTime+=usecond();
} // Closes parallel region and waits the comms (I hope)
ptime = usecond() - start;
}
{
double start = usecond();
st.CommunicateThreaded();
ctime = usecond() - start;
}
}
DhopCommTime += ctime;
DhopComputeTime+=ptime;
DhopFaceTime-=usecond();
st.CommsMerge(compressor);

View File

@ -185,6 +185,8 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal
double splicetime;
double nosplicetime;
double calls;
std::vector<double> comms_bytesthr;
std::vector<double> commtimethr;
////////////////////////////////////////
// Stencil query
@ -250,36 +252,22 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal
//////////////////////////////////////////
void CommunicateThreaded()
{
for(int i=0;i<Packets.size();i++){
#pragma omp task
{
double start;
double stop;
start = usecond();
uint64_t bytes;
std::vector<CommsRequest_t> reqs;
bytes=_grid->StencilSendToRecvFromBegin(reqs,
Packets[i].send_buf,
// must be called in parallel region
int mythread = omp_get_thread_num();
int nthreads = CartesianCommunicator::nCommThreads;
if (nthreads == -1) nthreads = Packets.size();
if (mythread < nthreads) {
for (int i = mythread; i < Packets.size(); i += nthreads) {
double start = usecond();
uint64_t bytes = _grid->StencilSendToRecvFrom(Packets[i].send_buf,
Packets[i].to_rank,
Packets[i].recv_buf,
Packets[i].from_rank,
Packets[i].bytes,i);
_grid->StencilSendToRecvFromComplete(reqs,i);
// Last task logged; this is approximate but hard to catch
// the last to complete
stop = usecond();
stop = stop - start;
if ( i==0 ) commtime+=stop;
#pragma omp critical
{
comms_bytes+=bytes;
}
comms_bytesthr[mythread] += bytes;
commtimethr[mythread] += usecond() - start;
}
}
}
void CommunicateBegin(std::vector<std::vector<CommsRequest_t> > &reqs)
{
@ -475,7 +463,10 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal
int checkerboard,
const std::vector<int> &directions,
const std::vector<int> &distances)
: _permute_type(npoints), _comm_buf_size(npoints)
: _permute_type(npoints),
_comm_buf_size(npoints),
comms_bytesthr(npoints),
commtimethr(npoints)
{
face_table_computed=0;
_npoints = npoints;
@ -1029,6 +1020,8 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal
void ZeroCounters(void) {
gathertime = 0.;
commtime = 0.;
memset(&commtimethr[0], 0, sizeof(commtimethr));
memset(&comms_bytesthr[0], 0, sizeof(comms_bytesthr));
halogtime = 0.;
mergetime = 0.;
decompresstime = 0.;
@ -1044,6 +1037,14 @@ class CartesianStencil { // Stencil runs along coordinate axes only; NO diagonal
#define PRINTIT(A) AVERAGE(A); std::cout << GridLogMessage << " Stencil " << #A << " "<< A/calls<<std::endl;
RealD NP = _grid->_Nprocessors;
RealD NN = _grid->NodeCount();
double t = 0;
// if commtimethr is set they were all done in parallel so take the max
// but add up the bytes
for (int i = 0; i < 8; ++i) {
comms_bytes += comms_bytesthr[i];
if (t < commtimethr[i]) t = commtimethr[i];
}
commtime += t;
_grid->GlobalSum(commtime); commtime/=NP;
if ( calls > 0. ) {

View File

@ -359,7 +359,11 @@ void Grid_init(int *argc,char ***argv)
if( GridCmdOptionExists(*argv,*argv+*argc,"--lebesgue") ){
LebesgueOrder::UseLebesgueOrder=1;
}
CartesianCommunicator::nCommThreads = -1;
if( GridCmdOptionExists(*argv,*argv+*argc,"--commthreads") ){
arg= GridCmdOptionPayload(*argv,*argv+*argc,"--commthreads");
GridCmdOptionInt(arg,CartesianCommunicator::nCommThreads);
}
if( GridCmdOptionExists(*argv,*argv+*argc,"--cacheblocking") ){
arg= GridCmdOptionPayload(*argv,*argv+*argc,"--cacheblocking");
GridCmdOptionIntVector(arg,LebesgueOrder::Block);