1
0
mirror of https://github.com/paboyle/Grid.git synced 2024-11-10 07:55:35 +00:00

Attempts to speed up the parallel IO

This commit is contained in:
paboyle 2017-05-25 13:32:24 +01:00
parent a8c10b1933
commit b8b5934193
3 changed files with 133 additions and 91 deletions

View File

@ -228,11 +228,11 @@ class BinaryIO {
bytes += sizeof(fobj)*lx;
if (grid->IsBoss()) {
fin.read((char *)&file_object[0], sizeof(fobj)*lx); assert( fin.fail()==0);
if (ieee32big) be32toh_v((void *)&file_object[0], sizeof(fobj)*lx);
if (ieee32) le32toh_v((void *)&file_object[0], sizeof(fobj)*lx);
if (ieee64big) be64toh_v((void *)&file_object[0], sizeof(fobj)*lx);
if (ieee64) le64toh_v((void *)&file_object[0], sizeof(fobj)*lx);
for(int x=0;x<lx;x++){
if (ieee32big) be32toh_v((void *)&file_object[x], sizeof(fobj));
if (ieee32) le32toh_v((void *)&file_object[x], sizeof(fobj));
if (ieee64big) be64toh_v((void *)&file_object[x], sizeof(fobj));
if (ieee64) le64toh_v((void *)&file_object[x], sizeof(fobj));
munge(file_object[x], munged[x], csum);
}
}
@ -294,11 +294,12 @@ class BinaryIO {
if ( grid->IsBoss() ) {
for(int x=0;x<lx;x++){
munge(unmunged[x],file_object[x],csum);
if(ieee32big) htobe32_v((void *)&file_object[x],sizeof(fobj));
if(ieee32) htole32_v((void *)&file_object[x],sizeof(fobj));
if(ieee64big) htobe64_v((void *)&file_object[x],sizeof(fobj));
if(ieee64) htole64_v((void *)&file_object[x],sizeof(fobj));
}
if(ieee32big) htobe32_v((void *)&file_object[0],sizeof(fobj)*lx);
if(ieee32) htole32_v((void *)&file_object[0],sizeof(fobj)*lx);
if(ieee64big) htobe64_v((void *)&file_object[0],sizeof(fobj)*lx);
if(ieee64) htole64_v((void *)&file_object[0],sizeof(fobj)*lx);
fout.write((char *)&file_object[0],sizeof(fobj)*lx);assert( fout.fail()==0);
bytes+=sizeof(fobj)*lx;
}
@ -350,10 +351,14 @@ class BinaryIO {
int l_idx=parallel.generator_idx(o_idx,i_idx);
if( rank == grid->ThisRank() ){
// std::cout << "rank" << rank<<" Getting state for index "<<l_idx<<std::endl;
parallel.GetState(saved,l_idx);
}
grid->Broadcast(rank, (void *)&saved[0], bytes);
if ( rank != 0 ) {
grid->Broadcast(rank, (void *)&saved[0], bytes);
}
grid->Barrier();
if ( grid->IsBoss() ) {
Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
@ -370,8 +375,9 @@ class BinaryIO {
grid->Broadcast(0, (void *)&csum, sizeof(csum));
if (grid->IsBoss())
if (grid->IsBoss()) {
fout.close();
}
timer.Stop();
@ -426,6 +432,7 @@ class BinaryIO {
}
grid->Broadcast(0,(void *)&saved[0],bytes);
grid->Barrier();
if( rank == grid->ThisRank() ){
parallel.SetState(saved,l_idx);
@ -434,8 +441,8 @@ class BinaryIO {
if ( grid->IsBoss() ) {
fin.read((char *)&saved[0],bytes);assert( fin.fail()==0);
serial.SetState(saved,0);
Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
serial.SetState(saved,0);
}
std::cout << GridLogMessage << "RNG file checksum " << std::hex << csum << std::dec << std::endl;
@ -445,7 +452,6 @@ class BinaryIO {
return csum;
}
template <class vobj, class fobj, class munger>
static inline uint32_t readObjectParallel(Lattice<vobj> &Umu,
std::string file,
@ -528,6 +534,10 @@ class BinaryIO {
if (!ILDG.is_ILDG) {
if ( IOnode ) {
fin.open(file,std::ios::binary|std::ios::in);
if ( !fin.is_open() ) {
std::cout << GridLogMessage << "readObjectParallel: Error opening file " << file << std::endl;
exit(0);
}
}
}
@ -540,7 +550,7 @@ class BinaryIO {
static uint32_t csum; csum=0;//static for SHMEM
std::vector<fobj> fileObj(chunk); // FIXME
std::vector<sobj> siteObj(chunk); // Use comm allocator to place in symmetric region for SHMEM
std::vector<sobj> siteObj(chunk); // Use alignedAllocator to place in symmetric region for SHMEM
// need to implement these loops in Nd independent way with a lexico conversion
for(int tlex=0;tlex<slice_vol;tlex+=chunk){
@ -549,17 +559,13 @@ class BinaryIO {
std::vector<int> gsite(nd);
std::vector<int> lsite(nd);
Lexicographic::CoorFromIndex(tsite,tlex,range);
for(int d=0;d<nd;d++){
lsite[d] = tsite[d]%grid->_ldimensions[d]; // local site
gsite[d] = tsite[d]+start[d]; // global site
}
int rank, o_idx,i_idx, g_idx;
///////////////////////////////////////////
// Get the global lexico base of the chunk
///////////////////////////////////////////
int rank, o_idx,i_idx, g_idx;
Lexicographic::CoorFromIndex(tsite,tlex,range);
for(int d=0;d<nd;d++) gsite[d] = tsite[d]+start[d];
grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gsite);
grid->GlobalCoorToGlobalIndex(gsite,g_idx);
@ -571,11 +577,14 @@ class BinaryIO {
if (ILDG.is_ILDG){
#ifdef HAVE_LIME
// use C-LIME to populate the record
uint64_t sizeFO = sizeof(fobj)*chunk;
uint64_t sizeFO = sizeof(fobj);
uint64_t sizeChunk= sizeFO*chunk;
limeReaderSeek(ILDG.LR, g_idx*sizeFO, SEEK_SET);
int status = limeReaderReadData((void *)&fileObj[0], &sizeFO, ILDG.LR);
int status = limeReaderReadData((void *)&fileObj[0], &sizeChunk, ILDG.LR);
#else
assert(0);
#endif
} else{
} else {
fin.seekg(offset+g_idx*sizeof(fobj));
fin.read((char *)&fileObj[0],sizeof(fobj)*chunk);
}
@ -630,6 +639,7 @@ class BinaryIO {
return csum;
}
//////////////////////////////////////////////////////////
// Parallel writer
//////////////////////////////////////////////////////////
@ -643,9 +653,9 @@ class BinaryIO {
GridBase *grid = Umu._grid;
int ieee32big = (format == std::string("IEEE32BIG"));
int ieee32 = (format == std::string("IEEE32"));
int ieee32 = (format == std::string("IEEE32"));
int ieee64big = (format == std::string("IEEE64BIG"));
int ieee64 = (format == std::string("IEEE64"));
int ieee64 = (format == std::string("IEEE64"));
if (!(ieee32big || ieee32 || ieee64big || ieee64)) {
std::cout << GridLogError << "Unrecognized file format " << format << std::endl;
@ -658,7 +668,9 @@ class BinaryIO {
assert(grid->CheckerBoarded(d) == 0);
}
std::vector<int> parallel(nd, 1);
// Parallel in yzt, serial funnelled in "x".
// gx x ly chunk size
std::vector<int> parallel(nd, 1); parallel[0] = 0;
std::vector<int> ioproc(nd);
std::vector<int> start(nd);
std::vector<int> range(nd);
@ -666,9 +678,13 @@ class BinaryIO {
uint64_t slice_vol = 1;
int IOnode = 1;
int gstrip = grid->_gdimensions[0];
int lstrip = grid->_ldimensions[0];
int chunk;
if ( nd==1) chunk = gstrip;
else chunk = gstrip*grid->_ldimensions[1];
for (int d = 0; d < grid->_ndimension; d++) {
if (d != grid->_ndimension - 1) parallel[d] = 0;
if (parallel[d]) {
range[d] = grid->_ldimensions[d];
@ -688,14 +704,16 @@ class BinaryIO {
{
uint32_t tmp = IOnode;
grid->GlobalSum(tmp);
std::cout<< GridLogMessage<< "Parallel write I/O from "<< file
<< " with " <<tmp<< " IOnodes for subslice ";
std::cout<< GridLogMessage<< "Parallel write I/O from "<< file << " with " <<tmp<< " IOnodes for subslice ";
for(int d=0;d<grid->_ndimension;d++){
std::cout<< range[d];
if( d< grid->_ndimension-1 )
std::cout<< " x ";
}
std::cout << std::endl;
std::cout<< GridLogMessage<< "Parallel I/O local strip size is "<< lstrip <<std::endl;
std::cout<< GridLogMessage<< "Parallel I/O global strip size is "<< gstrip <<std::endl;
std::cout<< GridLogMessage<< "Parallel I/O chunk size is "<< chunk <<std::endl;
}
GridStopWatch timer;
@ -706,20 +724,18 @@ class BinaryIO {
int iorank = grid->RankFromProcessorCoor(ioproc);
// Take into account block size of parallel file systems want about
// 4-16MB chunks.
// Ideally one reader/writer per xy plane and read these contiguously
// with comms from nominated I/O nodes.
std::ofstream fout;
if (!ILDG.is_ILDG)
if (IOnode){
fout.open(file, std::ios::binary | std::ios::in | std::ios::out);
if (!fout.is_open()) {
std::cout << GridLogMessage << "writeObjectParallel: Error opening file " << file
<< std::endl;
exit(0);
}
}
if (!ILDG.is_ILDG) {
if (IOnode){
fout.open(file, std::ios::binary | std::ios::in | std::ios::out);
if (!fout.is_open()) {
std::cout << GridLogMessage << "writeObjectParallel: Error opening file " << file << std::endl;
exit(0);
}
}
}
//////////////////////////////////////////////////////////
// Find the location of each site and send to primary node
@ -729,72 +745,82 @@ class BinaryIO {
//////////////////////////////////////////////////////////
uint32_t csum = 0;
fobj fileObj;
static sobj siteObj; // static for SHMEM target; otherwise dynamic allocate
// with AlignedAllocator
std::vector<fobj> fileObj(chunk);
std::vector<sobj> siteObj(chunk);
// should aggregate a whole chunk and then write.
// need to implement these loops in Nd independent way with a lexico
// conversion
for (int tlex = 0; tlex < slice_vol; tlex++) {
for (int tlex = 0; tlex < slice_vol; tlex+=chunk) {
std::vector<int> tsite(nd); // temporary mixed up site
std::vector<int> gsite(nd);
std::vector<int> lsite(nd);
Lexicographic::CoorFromIndex(tsite, tlex, range);
for(int d = 0;d < nd; d++){
lsite[d] = tsite[d] % grid->_ldimensions[d]; // local site
gsite[d] = tsite[d] + start[d]; // global site
}
/////////////////////////
// Get the rank of owner of data
/////////////////////////
int rank, o_idx, i_idx, g_idx;
grid->GlobalCoorToRankIndex(rank, o_idx, i_idx, gsite);
grid->GlobalCoorToGlobalIndex(gsite, g_idx);
////////////////////////////////
// iorank writes from the seek
////////////////////////////////
// Possibly do transport through pt2pt
for(int cc=0;cc<chunk;cc+=lstrip){
// Owner of data peeks it
peekLocalSite(siteObj, Umu, lsite);
// Get the rank of owner of strip
Lexicographic::CoorFromIndex(tsite,tlex+cc,range);
// Pair of nodes may need to do pt2pt send
if ( rank != iorank ) { // comms is necessary
if ( (myrank == rank) || (myrank==iorank) ) { // and we have to do it
// Send to IOrank
grid->SendRecvPacket((void *)&siteObj,(void *)&siteObj,rank,iorank,sizeof(siteObj));
for(int d=0;d<nd;d++){
lsite[d] = tsite[d]%grid->_ldimensions[d]; // local site
gsite[d] = tsite[d]+start[d]; // global site
}
grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gsite);
// Owner of data peeks it over lstrip
if ( myrank == rank ) {
for(int x=0;x<lstrip;x++){
lsite[0]=x;
peekLocalSite(siteObj[cc+x],Umu,lsite);
}
}
// Pair of nodes may need to do pt2pt send
if ( rank != iorank ) { // comms is necessary
if ( (myrank == rank) || (myrank==iorank) ) { // and we have to do it
// Send to IOrank
grid->SendRecvPacket((void *)&siteObj[cc],(void *)&siteObj[cc],rank,iorank,sizeof(sobj)*lstrip);
}
}
}
grid->Barrier(); // necessary?
/////////////////////////
// Get the global lexico base of the chunk
/////////////////////////
Lexicographic::CoorFromIndex(tsite, tlex, range);
for(int d = 0;d < nd; d++){ gsite[d] = tsite[d] + start[d];}
grid->GlobalCoorToRankIndex(rank, o_idx, i_idx, gsite);
grid->GlobalCoorToGlobalIndex(gsite, g_idx);
if (myrank == iorank) {
munge(siteObj, fileObj, csum);
if (ieee32big) htobe32_v((void *)&fileObj, sizeof(fileObj));
if (ieee32) htole32_v((void *)&fileObj, sizeof(fileObj));
if (ieee64big) htobe64_v((void *)&fileObj, sizeof(fileObj));
if (ieee64) htole64_v((void *)&fileObj, sizeof(fileObj));
for(int c=0;c<chunk;c++) munge(siteObj[c],fileObj[c],csum);
if (ieee32big) htobe32_v((void *)&fileObj[0], sizeof(fobj)*chunk);
if (ieee32 ) htole32_v((void *)&fileObj[0], sizeof(fobj)*chunk);
if (ieee64big) htobe64_v((void *)&fileObj[0], sizeof(fobj)*chunk);
if (ieee64 ) htole64_v((void *)&fileObj[0], sizeof(fobj)*chunk);
if (ILDG.is_ILDG) {
#ifdef HAVE_LIME
uint64_t sizeFO = sizeof(fileObj);
limeWriterSeek(ILDG.LW, g_idx*sizeFO, SEEK_SET);
int status = limeWriteRecordData((void *)&fileObj, &sizeFO, ILDG.LW);
#endif
#ifdef HAVE_LIME
uint64_t sizeFO = sizeof(fobj);
uint64_t sizeChunk= sizeof(fobj)*chunk;
limeWriterSeek(ILDG.LW, g_idx*sizeFO, SEEK_SET);
int status = limeWriteRecordData((void *)&fileObj[0], &sizeChunk, ILDG.LW);
#else
assert(0);
#endif
} else {
fout.seekp(offset + g_idx * sizeof(fobj));
fout.write((char *)&fileObj[0], sizeof(fobj)*chunk);assert( fout.fail()==0);
}
else {
fout.seekp(offset + g_idx * sizeof(fileObj));
fout.write((char *)&fileObj, sizeof(fileObj));assert( fout.fail()==0);
}
bytes += sizeof(fileObj);
bytes += sizeof(fobj)*chunk;
}
}
@ -806,12 +832,12 @@ class BinaryIO {
<< " bytes in " << timer.Elapsed() << " "
<< (double)bytes / timer.useconds() << " MB/s " << std::endl;
grid->Barrier(); // necessary?
if (IOnode)
fout.close();
if (!ILDG.is_ILDG) {
if (IOnode) {
fout.close();
}
}
return csum;
}

View File

@ -31,7 +31,7 @@
#define GRID_NERSC_IO_H
#define PARALLEL_READ
#undef PARALLEL_WRITE
#define PARALLEL_WRITE
#include <algorithm>
#include <iostream>
@ -401,6 +401,18 @@ namespace Grid {
std::cout<<GridLogMessage <<"NERSC Configuration "<<file<<" link_trace "<<clone.link_trace
<<" header "<<header.link_trace<<std::endl;
if ( fabs(clone.plaquette -header.plaquette ) >= 1.0e-5 ) {
std::cout << " Plaquette mismatch "<<std::endl;
std::cout << Umu[0]<<std::endl;
std::cout << Umu[1]<<std::endl;
}
if ( csum != header.checksum ) {
std::cerr << " checksum mismatch " << std::endl;
std::cerr << " plaqs " << clone.plaquette << " " << header.plaquette << std::endl;
std::cerr << " trace " << clone.link_trace<< " " << header.link_trace<< std::endl;
std::cerr << " csum " <<std::hex<< csum << " " << header.checksum<< std::dec<< std::endl;
exit(0);
}
assert(fabs(clone.plaquette -header.plaquette ) < 1.0e-5 );
assert(fabs(clone.link_trace-header.link_trace) < 1.0e-6 );
assert(csum == header.checksum );
@ -542,6 +554,10 @@ namespace Grid {
// munger is a function of <floating point, Real, data_type>
uint32_t csum=BinaryIO::readRNGSerial(serial,parallel,file,offset);
if ( csum != header.checksum ) {
std::cerr << "checksum mismatch "<<std::hex<< csum <<" "<<header.checksum<<std::dec<<std::endl;
exit(0);
}
assert(csum == header.checksum );
std::cout<<GridLogMessage <<"Read NERSC RNG file "<<file<< " format "<< data_type <<std::endl;

View File

@ -80,7 +80,7 @@ int main (int argc, char ** argv)
std::vector<LatticeColourMatrix> U(4,&Fine);
SU3::ColdConfiguration(pRNGa,Umu);
SU3::HotConfiguration(pRNGa,Umu);
NerscField header;
std::string file("./ckpoint_lat.4000");