mirror of
https://github.com/paboyle/Grid.git
synced 2025-04-09 21:50:45 +01:00
Commit
This commit is contained in:
parent
135808dcfa
commit
6616d5d090
@ -240,6 +240,19 @@ public:
|
|||||||
cobj * mpi_p;
|
cobj * mpi_p;
|
||||||
Integer buffer_size;
|
Integer buffer_size;
|
||||||
};
|
};
|
||||||
|
struct CopyReceiveBuffer {
|
||||||
|
void * from_p;
|
||||||
|
void * to_p;
|
||||||
|
Integer bytes;
|
||||||
|
};
|
||||||
|
struct CachedTransfer {
|
||||||
|
Integer direction;
|
||||||
|
Integer OrthogPlane;
|
||||||
|
Integer DestProc;
|
||||||
|
Integer bytes;
|
||||||
|
Integer lane;
|
||||||
|
void *recv_buf;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
@ -271,7 +284,8 @@ public:
|
|||||||
std::vector<Merge> MergersSHM;
|
std::vector<Merge> MergersSHM;
|
||||||
std::vector<Decompress> Decompressions;
|
std::vector<Decompress> Decompressions;
|
||||||
std::vector<Decompress> DecompressionsSHM;
|
std::vector<Decompress> DecompressionsSHM;
|
||||||
|
std::vector<CopyReceiveBuffer> CopyReceiveBuffers ;
|
||||||
|
std::vector<CachedTransfer> CachedTransfers;
|
||||||
///////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////
|
||||||
// Unified Comms buffers for all directions
|
// Unified Comms buffers for all directions
|
||||||
///////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////
|
||||||
@ -551,8 +565,57 @@ public:
|
|||||||
Mergers.resize(0);
|
Mergers.resize(0);
|
||||||
MergersSHM.resize(0);
|
MergersSHM.resize(0);
|
||||||
Packets.resize(0);
|
Packets.resize(0);
|
||||||
|
CopyReceiveBuffers.resize(0);
|
||||||
|
CachedTransfers.resize(0);
|
||||||
calls++;
|
calls++;
|
||||||
}
|
}
|
||||||
|
void AddCopy(void *from,void * to, Integer bytes)
|
||||||
|
{
|
||||||
|
CopyReceiveBuffer obj;
|
||||||
|
obj.from_p = from;
|
||||||
|
obj.to_p = to;
|
||||||
|
obj.bytes= bytes;
|
||||||
|
CopyReceiveBuffers.push_back(obj);
|
||||||
|
}
|
||||||
|
void CommsCopy()
|
||||||
|
{
|
||||||
|
// These are device resident MPI buffers.
|
||||||
|
for(int i=0;i<CopyReceiveBuffers.size();i++){
|
||||||
|
cobj *from=(cobj *)CopyReceiveBuffers[i].from_p;
|
||||||
|
cobj *to =(cobj *)CopyReceiveBuffers[i].to_p;
|
||||||
|
Integer words = CopyReceiveBuffers[i].bytes/sizeof(cobj);
|
||||||
|
accelerator_forNB(j, words, cobj::Nsimd(), {
|
||||||
|
to[j] = from [j]; // Start with a debug check that these give the same data.
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer CheckForDuplicate(Integer direction, Integer OrthogPlane, Integer DestProc, void *recv_buf,Integer lane,Integer bytes)
|
||||||
|
{
|
||||||
|
CachedTransfer obj;
|
||||||
|
obj.direction = direction;
|
||||||
|
obj.OrthogPlane = OrthogPlane;
|
||||||
|
obj.DestProc = DestProc;
|
||||||
|
obj.recv_buf = recv_buf;
|
||||||
|
obj.bytes = bytes;
|
||||||
|
obj.lane = lane;
|
||||||
|
for(int i=0;i<CachedTransfers.size();i++){
|
||||||
|
if ( (CachedTransfers[i].direction ==direction)
|
||||||
|
&&(CachedTransfers[i].OrthogPlane==OrthogPlane)
|
||||||
|
&&(CachedTransfers[i].DestProc ==DestProc)
|
||||||
|
&&(CachedTransfers[i].bytes ==bytes)
|
||||||
|
&&(CachedTransfers[i].lane ==lane)
|
||||||
|
){
|
||||||
|
std::cout << "Found duplicate copy plane dir "<<direction<<" plane "<< OrthogPlane<< " simd "<<lane << " relproc "<<DestProc<<std::endl;
|
||||||
|
AddCopy(CachedTransfers[i].recv_buf,recv_buf,bytes);
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
CachedTransfers.push_back(obj);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
void AddPacket(void *xmit,void * rcv, Integer to,Integer from,Integer bytes){
|
void AddPacket(void *xmit,void * rcv, Integer to,Integer from,Integer bytes){
|
||||||
Packet p;
|
Packet p;
|
||||||
p.send_buf = xmit;
|
p.send_buf = xmit;
|
||||||
@ -578,6 +641,7 @@ public:
|
|||||||
mv.push_back(m);
|
mv.push_back(m);
|
||||||
}
|
}
|
||||||
template<class decompressor> void CommsMerge(decompressor decompress) {
|
template<class decompressor> void CommsMerge(decompressor decompress) {
|
||||||
|
CommsCopy();
|
||||||
CommsMerge(decompress,Mergers,Decompressions);
|
CommsMerge(decompress,Mergers,Decompressions);
|
||||||
}
|
}
|
||||||
template<class decompressor> void CommsMergeSHM(decompressor decompress) {
|
template<class decompressor> void CommsMergeSHM(decompressor decompress) {
|
||||||
@ -590,8 +654,8 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
template<class decompressor>
|
template<class decompressor>
|
||||||
void CommsMerge(decompressor decompress,std::vector<Merge> &mm,std::vector<Decompress> &dd) {
|
void CommsMerge(decompressor decompress,std::vector<Merge> &mm,std::vector<Decompress> &dd)
|
||||||
|
{
|
||||||
|
|
||||||
mergetime-=usecond();
|
mergetime-=usecond();
|
||||||
for(int i=0;i<mm.size();i++){
|
for(int i=0;i<mm.size();i++){
|
||||||
@ -1011,9 +1075,11 @@ public:
|
|||||||
|
|
||||||
int sx = (x+sshift)%rd;
|
int sx = (x+sshift)%rd;
|
||||||
int comm_proc = ((x+sshift)/rd)%pd;
|
int comm_proc = ((x+sshift)/rd)%pd;
|
||||||
|
|
||||||
if (comm_proc) {
|
if (comm_proc) {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int words = buffer_size;
|
int words = buffer_size;
|
||||||
if (cbmask != 0x3) words=words>>1;
|
if (cbmask != 0x3) words=words>>1;
|
||||||
|
|
||||||
@ -1045,9 +1111,10 @@ public:
|
|||||||
recv_buf=this->u_recv_buf_p;
|
recv_buf=this->u_recv_buf_p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
cobj *send_buf;
|
cobj *send_buf;
|
||||||
send_buf = this->u_send_buf_p; // Gather locally, must send
|
send_buf = this->u_send_buf_p; // Gather locally, must send
|
||||||
|
|
||||||
////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////
|
||||||
// Gather locally
|
// Gather locally
|
||||||
////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////
|
||||||
@ -1056,23 +1123,27 @@ public:
|
|||||||
Gather_plane_simple_table(face_table[face_idx],rhs,send_buf,compress,u_comm_offset,so); face_idx++;
|
Gather_plane_simple_table(face_table[face_idx],rhs,send_buf,compress,u_comm_offset,so); face_idx++;
|
||||||
gathertime+=usecond();
|
gathertime+=usecond();
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////
|
int duplicate = CheckForDuplicate(dimension,x,comm_proc,(void *)&recv_buf[u_comm_offset],0,bytes);
|
||||||
// Build a list of things to do after we synchronise GPUs
|
if (!duplicate || 1) { // Force comms for now
|
||||||
// Start comms now???
|
|
||||||
///////////////////////////////////////////////////////////
|
|
||||||
AddPacket((void *)&send_buf[u_comm_offset],
|
|
||||||
(void *)&recv_buf[u_comm_offset],
|
|
||||||
xmit_to_rank,
|
|
||||||
recv_from_rank,
|
|
||||||
bytes);
|
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////
|
||||||
|
// Build a list of things to do after we synchronise GPUs
|
||||||
|
// Start comms now???
|
||||||
|
///////////////////////////////////////////////////////////
|
||||||
|
AddPacket((void *)&send_buf[u_comm_offset],
|
||||||
|
(void *)&recv_buf[u_comm_offset],
|
||||||
|
xmit_to_rank,
|
||||||
|
recv_from_rank,
|
||||||
|
bytes);
|
||||||
|
}
|
||||||
|
|
||||||
if ( compress.DecompressionStep() ) {
|
if ( compress.DecompressionStep() ) {
|
||||||
AddDecompress(&this->u_recv_buf_p[u_comm_offset],
|
AddDecompress(&this->u_recv_buf_p[u_comm_offset],
|
||||||
&recv_buf[u_comm_offset],
|
&recv_buf[u_comm_offset],
|
||||||
words,Decompressions);
|
words,Decompressions);
|
||||||
}
|
}
|
||||||
u_comm_offset+=words;
|
u_comm_offset+=words;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1181,8 +1252,10 @@ public:
|
|||||||
|
|
||||||
rpointers[i] = rp;
|
rpointers[i] = rp;
|
||||||
|
|
||||||
AddPacket((void *)sp,(void *)rp,xmit_to_rank,recv_from_rank,bytes);
|
int duplicate = CheckForDuplicate(dimension,x,nbr_proc,(void *)rp,i,bytes);
|
||||||
|
if (!duplicate || 1 ) { // Force comms for now
|
||||||
|
AddPacket((void *)sp,(void *)rp,xmit_to_rank,recv_from_rank,bytes);
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user