1
0
mirror of https://github.com/paboyle/Grid.git synced 2025-07-12 19:27:05 +01:00

Compressed comms options as Sloppy

This commit is contained in:
Peter Boyle
2025-06-17 16:43:53 +02:00
parent 6ec5cee368
commit 9d6a38c44c
2 changed files with 117 additions and 75 deletions

View File

@ -30,25 +30,26 @@
NAMESPACE_BEGIN(Grid);
uint64_t DslashFullCount;
uint64_t DslashPartialCount;
//uint64_t DslashPartialCount;
uint64_t DslashDirichletCount;
void DslashResetCounts(void)
{
DslashFullCount=0;
DslashPartialCount=0;
// DslashPartialCount=0;
DslashDirichletCount=0;
}
void DslashGetCounts(uint64_t &dirichlet,uint64_t &partial,uint64_t &full)
{
dirichlet = DslashDirichletCount;
partial = DslashPartialCount;
partial = 0;
full = DslashFullCount;
}
void DslashLogFull(void) { DslashFullCount++;}
void DslashLogPartial(void) { DslashPartialCount++;}
//void DslashLogPartial(void) { DslashPartialCount++;}
void DslashLogDirichlet(void){ DslashDirichletCount++;}
deviceVector<unsigned char> StencilBuffer::DeviceCommBuf;
void Gather_plane_table_compute (GridBase *grid,int dimension,int plane,int cbmask,
int off,std::vector<std::pair<int,int> > & table)

View File

@ -69,6 +69,12 @@ struct DefaultImplParams {
void Gather_plane_table_compute (GridBase *grid,int dimension,int plane,int cbmask,
int off,std::vector<std::pair<int,int> > & table);
class StencilBuffer
{
public:
static deviceVector<unsigned char> DeviceCommBuf; // placed in Stencil.cc
};
void DslashResetCounts(void);
void DslashGetCounts(uint64_t &dirichlet,uint64_t &partial,uint64_t &full);
void DslashLogFull(void);
@ -207,10 +213,6 @@ public:
void * recv_buf;
void * compressed_send_buf;
void * compressed_recv_buf;
#ifndef ACCELERATOR_AWARE_MPI
void * host_send_buf; // Allocate this if not MPI_CUDA_AWARE
void * host_recv_buf; // Allocate this if not MPI_CUDA_AWARE
#endif
Integer to_rank;
Integer from_rank;
Integer do_send;
@ -256,6 +258,36 @@ public:
protected:
GridBase * _grid;
///////////////////////////////////////////////////
// Sloppy comms will make a second buffer upon comms
///////////////////////////////////////////////////
size_t device_heap_top; //
size_t device_heap_bytes;//
size_t device_heap_size; //
void *DeviceBufferMalloc(size_t bytes)
{
void *ptr = (void *)device_heap_top;
device_heap_top += bytes;
device_heap_bytes+= bytes;
if ( device_heap_bytes > device_heap_size ) {
std::cout << "DeviceBufferMalloc overflow bytes "<<bytes<<" heap bytes "<<device_heap_bytes<<" heap size "<<device_heap_size<<std::endl;
assert (device_heap_bytes <= device_heap_size);
}
return ptr;
}
void DeviceBufferFreeAll(void)
{
device_heap_size = _unified_buffer_size*sizeof(cobj);
// Resize up if necessary, never down
if ( StencilBuffer::DeviceCommBuf.size() < device_heap_size ) {
StencilBuffer::DeviceCommBuf.resize(device_heap_size);
}
device_heap_top =(size_t) &StencilBuffer::DeviceCommBuf[0];
device_heap_size = StencilBuffer::DeviceCommBuf.size();
device_heap_bytes=0;
}
public:
GridBase *Grid(void) const { return _grid; }
@ -375,7 +407,7 @@ public:
{
if ( !SloppyComms ) return;
if ( packet.do_recv ) {
if ( packet.do_recv && _grid->IsOffNode(packet.from_rank) ) {
typedef typename getPrecision<cobj>::real_scalar_type word;
uint64_t words = packet.rbytes/sizeof(word);
@ -387,39 +419,27 @@ public:
// Can either choose to represent as float vs double and prec change
// OR
// truncate the mantissa bfp16 style
double *dbuf =(double *) packet.recv_buf;
float *fbuf =(float *) packet.compressed_recv_buf;
static deviceVector<uint32_t> compression_buffer;
if(words > compression_buffer.size() ) compression_buffer.resize(words);
uint64_t *fbuf =(uint64_t *) &packet.recv_buf;
uint32_t *fhbuf=(uint32_t *) &packet.recv_buf;
uint32_t *hbuf =(uint32_t *) &compression_buffer[0];
accelerator_for(ss,outer,nsimd,{
hbuf[ss*nsimd+lane] = fhbuf[ss*nsimd+lane]; // copy at half precision
});
accelerator_for(ss,outer,nsimd,{
accelerator_forNB(ss,outer,nsimd,{
int lane = acceleratorSIMTlane(nsimd);
fbuf[ss*nsimd+lane] = ((uint64_t)hbuf[ss*nsimd+lane])<<32; //copy back and pad each word with zeroes
dbuf[ss*nsimd+lane] = fbuf[ss*nsimd+lane]; //conversion
});
} else if ( sizeof(word)==4){
// Can either choose to represent as half vs float and prec change
// OR
// truncate the mantissa bfp16 style
static deviceVector<uint16_t> compression_buffer;
if(words > compression_buffer.size() ) compression_buffer.resize(words);
uint32_t *fbuf =(uint32_t *) packet.recv_buf;
uint16_t *fhbuf=(uint16_t *) packet.recv_buf;
uint16_t *hbuf =(uint16_t *) &compression_buffer[0];
accelerator_for(ss,outer,nsimd,{
hbuf[ss*nsimd+lane] = fhbuf[ss*nsimd+lane];
});
accelerator_for(ss,outer,nsimd,{
uint16_t *hbuf =(uint16_t *) packet.compressed_recv_buf;
accelerator_forNB(ss,outer,nsimd,{
int lane = acceleratorSIMTlane(nsimd);
fbuf[ss*nsimd+lane] = ((uint32_t)hbuf[ss*nsimd+lane])<<16; //copy back and pad each word with zeroes
});
} else {
assert(0 && "unknown floating point precision");
}
@ -427,9 +447,13 @@ public:
}
void CompressPacket(Packet &packet)
{
if ( !SloppyComms ) {
packet.xbytes_compressed = packet.xbytes;
packet.compressed_send_buf = packet.send_buf;
packet.rbytes_compressed = packet.rbytes;
packet.compressed_recv_buf = packet.recv_buf;
if ( !SloppyComms ) {
return;
}
@ -438,70 +462,84 @@ public:
const int nsimd = sizeof(typename cobj::vector_type)/sizeof(word);
const uint64_t outer = words/nsimd;
if (packet.do_send) {
if (packet.do_recv && _grid->IsOffNode(packet.from_rank) ) {
packet.rbytes_compressed = packet.rbytes/2;
packet.compressed_recv_buf = DeviceBufferMalloc(packet.rbytes_compressed);
// std::cout << " CompressPacket recv from "<<packet.from_rank<<" "<<std::hex<<packet.compressed_recv_buf<<std::dec<<std::endl;
}
//else {
// std::cout << " CompressPacket recv is uncompressed from "<<packet.from_rank<<" "<<std::hex<<packet.compressed_recv_buf<<std::dec<<std::endl;
// }
if (packet.do_send && _grid->IsOffNode(packet.to_rank) ) {
packet.xbytes_compressed = packet.xbytes/2;
packet.compressed_send_buf = DeviceBufferMalloc(packet.xbytes_compressed);
// std::cout << " CompressPacket send to "<<packet.to_rank<<" "<<std::hex<<packet.compressed_send_buf<<std::dec<<std::endl;
if(sizeof(word)==8) {
static deviceVector<uint32_t> compression_buffer;
double *dbuf =(double *) packet.send_buf;
float *fbuf =(float *) packet.compressed_send_buf;
if(words > compression_buffer.size() ) compression_buffer.resize(words);
uint64_t *fbuf =(uint64_t *) packet.send_buf;
uint32_t *fhbuf=(uint32_t *) packet.send_buf;
uint32_t *hbuf =(uint32_t *) &compression_buffer[0];
accelerator_for(ss,outer,nsimd,{
accelerator_forNB(ss,outer,nsimd,{
int lane = acceleratorSIMTlane(nsimd);
hbuf[ss*nsimd+lane] = fbuf[ss*nsimd+lane]>>32; // truncate and copy
});
accelerator_for(ss,outer,nsimd,{
fhbuf[ss*nsimd+lane] = hbuf[ss*nsimd+lane]; // copy back
fbuf[ss*nsimd+lane] = dbuf[ss*nsimd+lane]; // convert fp64 to fp32
});
} else if ( sizeof(word)==4){
static deviceVector<uint16_t> compression_buffer;
if(words > compression_buffer.size() ) compression_buffer.resize(words);
uint32_t *fbuf =(uint32_t *) packet.send_buf;
uint16_t *fhbuf=(uint16_t *) packet.send_buf;
uint16_t *hbuf =(uint16_t *) &compression_buffer[0];
accelerator_for(ss,outer,nsimd,{
uint16_t *hbuf =(uint16_t *) packet.compressed_send_buf;
accelerator_forNB(ss,outer,nsimd,{
int lane = acceleratorSIMTlane(nsimd);
hbuf[ss*nsimd+lane] = fbuf[ss*nsimd+lane]>>16;
});
accelerator_for(ss,outer,nsimd,{
fhbuf[ss*nsimd+lane] = hbuf[ss*nsimd+lane];
hbuf[ss*nsimd+lane] = fbuf[ss*nsimd+lane]>>16; // convert as in Bagel/BFM ; bfloat16 ; s7e8 Intel patent
});
} else {
assert(0 && "unknown floating point precision");
}
}
packet.xbytes_compressed = packet.xbytes/2;
packet.rbytes_compressed = packet.rbytes/2;
// else {
// std::cout << " CompressPacket send is uncompressed to "<<packet.to_rank<<" "<<std::hex<<packet.compressed_send_buf<<std::dec<<std::endl;
// }
return;
}
void CommunicateBegin(std::vector<std::vector<CommsRequest_t> > &reqs)
{
// std::cout << "Communicate Begin "<<std::endl;
// _grid->Barrier();
FlightRecorder::StepLog("Communicate begin");
///////////////////////////////////////////////
// All GPU kernel tasks must complete
// accelerator_barrier(); // All kernels should ALREADY be complete
// _grid->StencilBarrier(); // Everyone is here, so noone running slow and still using receive buffer
// accelerator_barrier(); All kernels should ALREADY be complete
//Everyone is here, so noone running slow and still using receive buffer
_grid->StencilBarrier();
// But the HaloGather had a barrier too.
///////////////////////////////////////////////
if (SloppyComms) {
DeviceBufferFreeAll();
}
for(int i=0;i<Packets.size();i++){
this->CompressPacket(Packets[i]);
}
if (SloppyComms) {
accelerator_barrier();
#ifdef NVLINK_GET
_grid->StencilBarrier();
#endif
}
for(int i=0;i<Packets.size();i++){
// std::cout << "Communicate prepare "<<i<<std::endl;
// _grid->Barrier();
_grid->StencilSendToRecvFromPrepare(MpiReqs,
Packets[i].send_buf,
Packets[i].compressed_send_buf,
Packets[i].to_rank,Packets[i].do_send,
Packets[i].recv_buf,
Packets[i].compressed_recv_buf,
Packets[i].from_rank,Packets[i].do_recv,
Packets[i].xbytes_compressed,Packets[i].rbytes_compressed,i);
}
@ -514,19 +552,22 @@ public:
// Starts intranode
for(int i=0;i<Packets.size();i++){
// std::cout << "Communicate Begin "<<i<<std::endl;
// _grid->Barrier();
_grid->StencilSendToRecvFromBegin(MpiReqs,
Packets[i].send_buf,
Packets[i].send_buf,Packets[i].compressed_send_buf,
Packets[i].to_rank,Packets[i].do_send,
Packets[i].recv_buf,
Packets[i].recv_buf,Packets[i].compressed_recv_buf,
Packets[i].from_rank,Packets[i].do_recv,
Packets[i].xbytes_compressed,Packets[i].rbytes_compressed,i);
// std::cout << "Communicate Begin started "<<i<<std::endl;
// _grid->Barrier();
}
FlightRecorder::StepLog("Communicate begin has finished");
// Get comms started then run checksums
// Having this PRIOR to the dslash seems to make Sunspot work... (!)
for(int i=0;i<Packets.size();i++){
if ( Packets[i].do_send )
FlightRecorder::xmitLog(Packets[i].send_buf,Packets[i].xbytes_compressed);
FlightRecorder::xmitLog(Packets[i].compressed_send_buf,Packets[i].xbytes_compressed);
}
}
@ -547,9 +588,9 @@ public:
// acceleratorCopySynchronise();// is in the StencilSendToRecvFromComplete
// accelerator_barrier();
for(int i=0;i<Packets.size();i++){
if ( Packets[i].do_recv )
FlightRecorder::recvLog(Packets[i].recv_buf,Packets[i].rbytes_compressed,Packets[i].from_rank);
this->DecompressPacket(Packets[i]);
if ( Packets[i].do_recv )
FlightRecorder::recvLog(Packets[i].compressed_recv_buf,Packets[i].rbytes_compressed,Packets[i].from_rank);
}
FlightRecorder::StepLog("Finish communicate complete");
}
@ -994,7 +1035,7 @@ public:
/////////////////////////////////////////////////////////////////////////////////
const int Nsimd = grid->Nsimd();
// Allow for multiple stencils to exist simultaneously
// Allow for multiple stencils to be communicated simultaneously
if (!preserve_shm)
_grid->ShmBufferFreeAll();