1
0
mirror of https://github.com/paboyle/Grid.git synced 2025-06-16 23:07:05 +01:00

Pipeline mode commit on Aurora. 5+ TF/s on 16^3x32 per tile at 384

nodes.
More concurrency/fine grained scheduling is possible.
This commit is contained in:
2025-02-04 19:27:26 +00:00
parent b50fb34e71
commit 0baaddbe98
9 changed files with 151 additions and 81 deletions

View File

@ -192,6 +192,11 @@ public:
void *recv,
int recv_from_rank,int do_recv,
int xbytes,int rbytes,int dir);
// Could do a PollHtoD and have a CommsMerge dependence
void StencilSendToRecvFromPollDtoH (std::vector<CommsRequest_t> &list);
void StencilSendToRecvFromPollIRecv(std::vector<CommsRequest_t> &list);
double StencilSendToRecvFromBegin(std::vector<CommsRequest_t> &list,
void *xmit,
int xmit_to_rank,int do_xmit,

View File

@ -399,6 +399,8 @@ double CartesianCommunicator::StencilSendToRecvFrom( void *xmit,
#ifdef ACCELERATOR_AWARE_MPI
void CartesianCommunicator::StencilSendToRecvFromPollIRecv(std::vector<CommsRequest_t> &list) {};
void CartesianCommunicator::StencilSendToRecvFromPollDtoH(std::vector<CommsRequest_t> &list) {};
double CartesianCommunicator::StencilSendToRecvFromPrepare(std::vector<CommsRequest_t> &list,
void *xmit,
int dest,int dox,
@ -561,53 +563,105 @@ double CartesianCommunicator::StencilSendToRecvFromPrepare(std::vector<CommsRequ
if (dox) {
if ( (gdest == MPI_UNDEFINED) || Stencil_force_mpi ) {
#undef DEVICE_TO_HOST_CONCURRENT // pipeline
#ifdef DEVICE_TO_HOST_CONCURRENT
tag= dir+_processor*32;
host_xmit = this->HostBufferMalloc(xbytes);
acceleratorCopyFromDeviceAsynch(xmit, host_xmit,xbytes); // Make this Asynch
CommsRequest_t srq;
srq.ev = acceleratorCopyFromDeviceAsynch(xmit, host_xmit,xbytes); // Make this Asynch
// ierr =MPI_Isend(host_xmit, xbytes, MPI_CHAR,dest,tag,communicator_halo[commdir],&xrq);
// assert(ierr==0);
// off_node_bytes+=xbytes;
CommsRequest_t srq;
srq.PacketType = InterNodeXmit;
srq.bytes = xbytes;
// srq.req = xrq;
srq.host_buf = host_xmit;
srq.device_buf = xmit;
srq.tag = tag;
srq.dest = dest;
srq.commdir = commdir;
list.push_back(srq);
#else
tag= dir+_processor*32;
host_xmit = this->HostBufferMalloc(xbytes);
const int chunks=1;
for(int n=0;n<chunks;n++){
void * host_xmitc = (void *)( (uint64_t) host_xmit + n*xbytes/chunks);
void * xmitc = (void *)( (uint64_t) xmit + n*xbytes/chunks);
acceleratorCopyFromDeviceAsynch(xmitc, host_xmitc,xbytes/chunks); // Make this Asynch
}
acceleratorCopySynchronise(); // Complete all pending copy transfers
ierr =MPI_Isend(host_xmit, xbytes, MPI_CHAR,dest,tag,communicator_halo[commdir],&xrq);
assert(ierr==0);
off_node_bytes+=xbytes;
CommsRequest_t srq;
srq.PacketType = InterNodeXmit;
srq.bytes = xbytes;
srq.req = xrq;
srq.host_buf = host_xmit;
srq.device_buf = xmit;
list.push_back(srq);
#endif
}
}
return off_node_bytes;
}
/*
* In the interest of better pipelining, poll for completion on each DtoH and
* start MPI_ISend in the meantime
*/
void CartesianCommunicator::StencilSendToRecvFromPollIRecv(std::vector<CommsRequest_t> &list)
{
int pending = 0;
do {
pending = 0;
for(int idx = 0; idx<list.size();idx++){
if ( list[idx].PacketType==InterNodeRecv ) {
int flag = 0;
MPI_Status status;
int ierr = MPI_Test(&list[idx].req,&flag,&status);
assert(ierr==0);
if ( flag ) {
// std::cout << " PollIrecv "<<idx<<" flag "<<flag<<std::endl;
acceleratorCopyToDeviceAsynch(list[idx].host_buf,list[idx].device_buf,list[idx].bytes);
list[idx].PacketType=InterNodeReceiveHtoD;
} else {
pending ++;
}
}
}
// std::cout << " PollIrecv "<<pending<<" pending requests"<<std::endl;
} while ( pending );
}
void CartesianCommunicator::StencilSendToRecvFromPollDtoH(std::vector<CommsRequest_t> &list)
{
int pending = 0;
do {
pending = 0;
for(int idx = 0; idx<list.size();idx++){
if ( list[idx].PacketType==InterNodeXmit ) {
if ( acceleratorEventIsComplete(list[idx].ev) ) {
void *host_xmit = list[idx].host_buf;
uint32_t xbytes = list[idx].bytes;
int dest = list[idx].dest;
int tag = list[idx].tag;
int commdir = list[idx].commdir;
///////////////////
// Send packet
///////////////////
// std::cout << " DtoH is complete for index "<<idx<<" calling MPI_Isend "<<std::endl;
MPI_Request xrq;
int ierr =MPI_Isend(host_xmit, xbytes, MPI_CHAR,dest,tag,communicator_halo[commdir],&xrq);
assert(ierr==0);
list[idx].req = xrq; // Update the MPI request in the list
list[idx].PacketType=InterNodeXmitISend;
} else {
// not done, so return to polling loop
pending++;
}
}
}
} while (pending);
}
double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsRequest_t> &list,
void *xmit,
@ -644,37 +698,10 @@ double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsReques
* - complete all copies
* - post MPI send asynch
*/
// static int printed;
// if((printed<8) && this->IsBoss() ) {
// printf("dir %d doX %d doR %d Face size %ld %ld\n",dir,dox,dor,xbytes,rbytes);
// printed++;
// }
if (dox) {
if ( (gdest == MPI_UNDEFINED) || Stencil_force_mpi ) {
#ifdef DEVICE_TO_HOST_CONCURRENT
tag= dir+_processor*32;
// Find the send in the prepared list
int list_idx=-1;
for(int idx = 0; idx<list.size();idx++){
if ( (list[idx].device_buf==xmit)
&&(list[idx].PacketType==InterNodeXmit)
&&(list[idx].bytes==xbytes) ) {
list_idx = idx;
host_xmit = list[idx].host_buf;
}
}
assert(list_idx != -1); // found it
ierr =MPI_Isend(host_xmit, xbytes, MPI_CHAR,dest,tag,communicator_halo[commdir],&xrq);
assert(ierr==0);
list[list_idx].req = xrq; // Update the MPI request in the list
off_node_bytes+=xbytes;
#endif
} else {
if ( !( (gdest == MPI_UNDEFINED) || Stencil_force_mpi ) ) {
// Intranode
void *shm = (void *) this->ShmBufferTranslate(dest,recv);
assert(shm!=NULL);
acceleratorCopyDeviceToDeviceAsynch(xmit,shm,xbytes);
@ -686,7 +713,7 @@ void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector<CommsReque
{
int nreq=list.size();
if (nreq==0) return;
// if (nreq==0) return;
std::vector<MPI_Status> status(nreq);
std::vector<MPI_Request> MpiRequests(nreq);
@ -694,16 +721,17 @@ void CartesianCommunicator::StencilSendToRecvFromComplete(std::vector<CommsReque
MpiRequests[r] = list[r].req;
}
int ierr = MPI_Waitall(nreq,&MpiRequests[0],&status[0]);
int ierr = MPI_Waitall(nreq,&MpiRequests[0],&status[0]); // must at least wait for sends
assert(ierr==0);
for(int r=0;r<nreq;r++){
if ( list[r].PacketType==InterNodeRecv ) {
acceleratorCopyToDeviceAsynch(list[r].host_buf,list[r].device_buf,list[r].bytes);
}
}
// for(int r=0;r<nreq;r++){
// if ( list[r].PacketType==InterNodeRecv ) {
// acceleratorCopyToDeviceAsynch(list[r].host_buf,list[r].device_buf,list[r].bytes);
// }
// }
acceleratorCopySynchronise(); // Complete all pending copy transfers D2D
acceleratorCopySynchronise(); // Complete all pending copy transfers
list.resize(0); // Delete the list
this->HostBufferFreeAll(); // Clean up the buffer allocs
this->StencilBarrier();

View File

@ -132,6 +132,8 @@ double CartesianCommunicator::StencilSendToRecvFrom( void *xmit,
{
return 2.0*bytes;
}
void CartesianCommunicator::StencilSendToRecvFromPollIRecv(std::vector<CommsRequest_t> &list) {};
void CartesianCommunicator::StencilSendToRecvFromPollDtoH(std::vector<CommsRequest_t> &list) {};
double CartesianCommunicator::StencilSendToRecvFromPrepare(std::vector<CommsRequest_t> &list,
void *xmit,
int xmit_to_rank,int dox,
@ -139,7 +141,7 @@ double CartesianCommunicator::StencilSendToRecvFromPrepare(std::vector<CommsRequ
int recv_from_rank,int dor,
int xbytes,int rbytes, int dir)
{
return xbytes+rbytes;
return 0.0;
}
double CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsRequest_t> &list,
void *xmit,

View File

@ -50,12 +50,16 @@ typedef MPI_Request MpiCommsRequest_t;
#ifdef ACCELERATOR_AWARE_MPI
typedef MPI_Request CommsRequest_t;
#else
enum PacketType_t { InterNodeXmit, InterNodeRecv, IntraNodeXmit, IntraNodeRecv };
enum PacketType_t { InterNodeXmit, InterNodeRecv, IntraNodeXmit, IntraNodeRecv, InterNodeXmitISend, InterNodeReceiveHtoD };
typedef struct {
PacketType_t PacketType;
void *host_buf;
void *device_buf;
int dest;
int tag;
int commdir;
unsigned long bytes;
acceleratorEvent_t ev;
MpiCommsRequest_t req;
} CommsRequest_t;
#endif