/*************************************************************************************

    Grid physics library, www.github.com/paboyle/Grid

    Source file: ./lib/parallelIO/BinaryIO.h

    Copyright (C) 2015

    Author: Peter Boyle
    Author: Guido Cossu

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License along
    with this program; if not, write to the Free Software Foundation, Inc.,
    51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

    See the full license in the file "LICENSE" in the top level distribution directory
*************************************************************************************/
/*  END LEGAL */
#ifndef GRID_BINARY_IO_H
#define GRID_BINARY_IO_H

#include "IldgIOtypes.h"

#ifdef HAVE_ENDIAN_H
#include <endian.h>
#endif

#include <arpa/inet.h>
#include <algorithm>

inline uint32_t byte_reverse32(uint32_t f) {
  f = ((f&0xFF)<<24) | ((f&0xFF00)<<8) | ((f&0xFF0000)>>8) | ((f&0xFF000000UL)>>24) ;
  return f;
}
inline uint64_t byte_reverse64(uint64_t f) {
  uint64_t g;
  g = ((f&0xFF)<<24) | ((f&0xFF00)<<8) | ((f&0xFF0000)>>8) | ((f&0xFF000000UL)>>24) ;
  g = g << 32;
  f = f >> 32;
  g|= ((f&0xFF)<<24) | ((f&0xFF00)<<8) | ((f&0xFF0000)>>8) | ((f&0xFF000000UL)>>24) ;
  return g;
}

#if BYTE_ORDER == BIG_ENDIAN
inline uint64_t Grid_ntohll(uint64_t A) { return A; }
#else
inline uint64_t Grid_ntohll(uint64_t A) { return byte_reverse64(A); }
#endif

namespace Grid {

  // A little helper
  inline void removeWhitespace(std::string &key)
  {
    key.erase(std::remove_if(key.begin(), key.end(), ::isspace), key.end());
  }

class BinaryIO {
 public:

  // Network is big endian
  static inline void htobe32_v(void *file_object,uint32_t bytes){ be32toh_v(file_object,bytes);}
  static inline void htobe64_v(void *file_object,uint32_t bytes){ be64toh_v(file_object,bytes);}
  static inline void htole32_v(void *file_object,uint32_t bytes){ le32toh_v(file_object,bytes);}
  static inline void htole64_v(void *file_object,uint32_t bytes){ le64toh_v(file_object,bytes);}

  // BE is the same as network order
  static inline void be32toh_v(void *file_object,uint32_t bytes)
  {
    uint32_t *f = (uint32_t *)file_object;
    for(int i=0; i*sizeof(uint32_t)<bytes; i++){
      f[i] = ntohl(f[i]);
    }
  }

  // LE must byte swap, then go network (big endian) to host
  static inline void le32toh_v(void *file_object,uint32_t bytes)
  {
    uint32_t *fp = (uint32_t *)file_object;
    uint32_t f;
    for(int i=0; i*sizeof(uint32_t)<bytes; i++){
      f = fp[i];
      f = ((f&0xFF)<<24) | ((f&0xFF00)<<8) | ((f&0xFF0000)>>8) | ((f&0xFF000000UL)>>24) ;
      fp[i] = ntohl(f);
    }
  }

  // BE is same as network
  static inline void be64toh_v(void *file_object,uint32_t bytes)
  {
    uint64_t *f = (uint64_t *)file_object;
    for(int i=0; i*sizeof(uint64_t)<bytes; i++){
      f[i] = Grid_ntohll(f[i]);
    }
  }

  static inline void le64toh_v(void *file_object,uint32_t bytes)
  {
    uint64_t *fp = (uint64_t *)file_object;
    uint64_t f,g;
    for(int i=0; i*sizeof(uint64_t)<bytes; i++){
      f = fp[i];
      g = ((f&0xFF)<<24) | ((f&0xFF00)<<8) | ((f&0xFF0000)>>8) | ((f&0xFF000000UL)>>24) ;
      g = g << 32;
      f = f >> 32;
      g|= ((f&0xFF)<<24) | ((f&0xFF00)<<8) | ((f&0xFF0000)>>8) | ((f&0xFF000000UL)>>24) ;
      fp[i] = Grid_ntohll(g);
    }
  }

  template<class vobj,class fobj,class munger>
  static inline void Uint32Checksum(Lattice<vobj> &lat,munger munge,uint32_t &csum)
  {
    typedef typename vobj::scalar_object sobj;
    GridBase *grid = lat._grid ;
    std::cout << GridLogMessage << "Uint32Checksum" << std::endl;

    sobj siteObj;
    fobj fileObj;

    csum = 0;
    std::vector<int> lcoor(grid->_ndimension);
    for(int l=0; l<grid->lSites(); l++){
      Lexicographic::CoorFromIndex(lcoor,l,grid->_ldimensions);
      peekLocalSite(siteObj,lat,lcoor);
      munge(siteObj,fileObj,csum);
    }
    grid->GlobalSum(csum);
  }

  static inline void Uint32Checksum(uint32_t *buf,uint32_t buf_size_bytes,uint32_t &csum)
  {
    for(int i=0; i*sizeof(uint32_t)<buf_size_bytes; i++){
      csum = csum + buf[i];
    }
  }

  // Simple classes for precision conversion
  template <class fobj, class sobj>
  struct BinarySimpleUnmunger {
    typedef typename getPrecision<fobj>::real_scalar_type fobj_stype;
    typedef typename getPrecision<sobj>::real_scalar_type sobj_stype;

    void operator()(sobj &in, fobj &out, uint32_t &csum) {
      // take word by word and transform according to the status
      fobj_stype *out_buffer = (fobj_stype *)&out;
      sobj_stype *in_buffer  = (sobj_stype *)&in;
      size_t fobj_words = sizeof(out) / sizeof(fobj_stype);
      size_t sobj_words = sizeof(in)  / sizeof(sobj_stype);
      assert(fobj_words == sobj_words);

      for (unsigned int word = 0; word < sobj_words; word++)
        out_buffer[word] = in_buffer[word];  // type conversion on the fly

      BinaryIO::Uint32Checksum((uint32_t *)&out, sizeof(out), csum);
    }
  };

  template <class fobj, class sobj>
  struct BinarySimpleMunger {
    typedef typename getPrecision<fobj>::real_scalar_type fobj_stype;
    typedef typename getPrecision<sobj>::real_scalar_type sobj_stype;

    void operator()(fobj &in, sobj &out, uint32_t &csum) {
      // take word by word and transform according to the status
      fobj_stype *in_buffer  = (fobj_stype *)&in;
      sobj_stype *out_buffer = (sobj_stype *)&out;
      size_t fobj_words = sizeof(in)  / sizeof(fobj_stype);
      size_t sobj_words = sizeof(out) / sizeof(sobj_stype);
      assert(fobj_words == sobj_words);

      for (unsigned int word = 0; word < sobj_words; word++)
        out_buffer[word] = in_buffer[word];  // type conversion on the fly

      BinaryIO::Uint32Checksum((uint32_t *)&in, sizeof(in), csum);
    }
  };

  template<class vobj,class fobj,class munger>
  static inline uint32_t readObjectSerial(Lattice<vobj> &Umu,std::string file,munger munge,int offset,const std::string &format)
  {
    typedef typename vobj::scalar_object sobj;

    GridBase *grid = Umu._grid;

    std::cout<< GridLogMessage<< "Serial read I/O "<< file<< std::endl;
    GridStopWatch timer; timer.Start();

    int ieee32big = (format == std::string("IEEE32BIG"));
    int ieee32    = (format == std::string("IEEE32"));
    int ieee64big = (format == std::string("IEEE64BIG"));
    int ieee64    = (format == std::string("IEEE64"));

    // Find the location of each site and send to primary node
    // Take loop order from Chroma; defines loop order now that NERSC doc no longer
    // available (how short sighted is that?)
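    // Strategy: only the boss rank touches the file. It reads one x-strip of
    // fobj at a time, byte-swaps it according to the requested IEEE format,
    // and munges it into the in-memory sobj type; pokeSite then broadcasts
    // each site value from the boss to the rank that owns it.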
    std::ifstream fin(file,std::ios::binary|std::ios::in);
    fin.seekg(offset);

    Umu = zero;
    uint32_t csum=0;
    uint64_t bytes=0;

    int lx = grid->_fdimensions[0];
    std::vector<fobj> file_object(lx);
    std::vector<sobj> munged(lx);
    for(int t=0; t<grid->_fdimensions[3]; t++){
    for(int z=0; z<grid->_fdimensions[2]; z++){
    for(int y=0; y<grid->_fdimensions[1]; y++){
    {
      bytes += sizeof(fobj)*lx;
      if (grid->IsBoss()) {
        fin.read((char *)&file_object[0], sizeof(fobj)*lx); assert( fin.fail()==0);

        if (ieee32big) be32toh_v((void *)&file_object[0], sizeof(fobj)*lx);
        if (ieee32)    le32toh_v((void *)&file_object[0], sizeof(fobj)*lx);
        if (ieee64big) be64toh_v((void *)&file_object[0], sizeof(fobj)*lx);
        if (ieee64)    le64toh_v((void *)&file_object[0], sizeof(fobj)*lx);

        for(int x=0; x<lx; x++){
          munge(file_object[x],munged[x],csum);
        }
      }
      for(int x=0; x<lx; x++){
        std::vector<int> site({x,y,z,t});
        // The boss who read the file has their value poked
        pokeSite(munged[x],Umu,site);
      }
    }}}}
    timer.Stop();
    std::cout<< GridLogPerformance<< "readObjectSerial: read "<< bytes
             <<" bytes in "<< timer.Elapsed() <<" "
             << (double)bytes/timer.useconds() <<" MB/s " << std::endl;

    grid->Broadcast(0,(void *)&csum,sizeof(csum));
    return csum;
  }

  template<class vobj,class fobj,class munger>
  static inline uint32_t readObjectMPI(Lattice<vobj> &Umu,std::string file,munger munge,int offset,const std::string &format)
  {
    typedef typename vobj::scalar_object sobj;

    GridBase *grid = Umu._grid;

    std::cout<< GridLogMessage<< "MPI read I/O "<< file<< std::endl;
    GridStopWatch timer; timer.Start();

    Umu = zero;
    uint32_t csum=0;
    uint64_t bytes=0;

    int ndim   = grid->Dimensions();
    int nrank  = grid->ProcessorCount();
    int myrank = grid->ThisRank();

    std::vector<int> psizes   = grid->ProcessorGrid();
    std::vector<int> pcoor    = grid->ThisProcessorCoor();
    std::vector<int> gLattice = grid->GlobalDimensions();
    std::vector<int> lLattice = grid->LocalDimensions();

    std::vector<int> distribs(ndim,MPI_DISTRIBUTE_BLOCK);
    std::vector<int> dargs   (ndim,MPI_DISTRIBUTE_DFLT_DARG);

    std::vector<int> lStart(ndim);
    std::vector<int> gStart(ndim);

    // Flatten the file
    int lsites = grid->lSites();
    std::vector<sobj> scalardata(lsites);
    std::vector<fobj> iodata(lsites); // Munge, checksum, byte order in here

    // Offset of this rank's local block within the global file array
    for(int d=0; d<ndim; d++){
      gStart[d] = lLattice[d]*pcoor[d];
      lStart[d] = 0;
    }

    // Build the MPI datatypes: one site object, the global file array view,
    // and the local in-memory array, all in Fortran (x fastest) ordering.
    MPI_Datatype mpiObject;
    MPI_Datatype fileArray;
    MPI_Datatype localArray;
    MPI_Datatype mpiword;
    MPI_Offset   disp = offset;
    MPI_File     fh;
    MPI_Status   status;
    int numword;

    if ( sizeof(typename vobj::scalar_type) == sizeof(float) ) {
      numword = sizeof(fobj)/sizeof(float);
      mpiword = MPI_FLOAT;
    } else {
      numword = sizeof(fobj)/sizeof(double);
      mpiword = MPI_DOUBLE;
    }

    int ierr;
    ierr = MPI_Type_contiguous(numword,mpiword,&mpiObject); assert(ierr==0);
    ierr = MPI_Type_commit(&mpiObject);                     assert(ierr==0);

    ierr = MPI_Type_create_subarray(ndim,&gLattice[0],&lLattice[0],&gStart[0],
                                    MPI_ORDER_FORTRAN,mpiObject,&fileArray);  assert(ierr==0);
    ierr = MPI_Type_commit(&fileArray);                                       assert(ierr==0);

    ierr = MPI_Type_create_subarray(ndim,&lLattice[0],&lLattice[0],&lStart[0],
                                    MPI_ORDER_FORTRAN,mpiObject,&localArray); assert(ierr==0);
    ierr = MPI_Type_commit(&localArray);                                      assert(ierr==0);

    ierr=MPI_File_open(grid->communicator, file.c_str(), MPI_MODE_RDONLY, MPI_INFO_NULL, &fh); assert(ierr==0);
    ierr=MPI_File_set_view(fh, disp, mpiObject, fileArray, "native", MPI_INFO_NULL);
    // std::cout<< "MPI File set view returned " << ierr << std::endl;
    assert(ierr==0);

    ierr=MPI_File_read_all(fh, &iodata[0], 1, localArray, &status); assert(ierr==0);
    bytes += sizeof(fobj)*lsites;

    // Byte order; swap to host if needed
    if (ieee32big) be32toh_v((void *)&iodata[0], sizeof(fobj)*lsites);
    if (ieee32)    le32toh_v((void *)&iodata[0], sizeof(fobj)*lsites);
    if (ieee64big) be64toh_v((void *)&iodata[0], sizeof(fobj)*lsites);
    if (ieee64)    le64toh_v((void *)&iodata[0], sizeof(fobj)*lsites);

    // Munge (e.g. precision change, 3rd row reconstruction) and checksum
    for(int x=0; x<lsites; x++) munge(iodata[x],scalardata[x],csum);

    MPI_File_close(&fh);

    grid->GlobalSum(csum);
    grid->Barrier();

    vectorizeFromLexOrdArray(scalardata,Umu);
    timer.Stop();
    std::cout<< GridLogPerformance<< "readObjectMPI: read "<< bytes
             <<" bytes in "<< timer.Elapsed() <<" "
             << (double)bytes/timer.useconds() <<" MB/s " << std::endl;

    return csum;
  }

  template<class vobj,class fobj,class munger>
  static inline uint32_t writeObjectSerial(Lattice<vobj> &Umu,std::string file,munger munge,int offset,const std::string &format)
  {
    typedef typename vobj::scalar_object sobj;

    GridBase *grid = Umu._grid;

    int ieee32big = (format == std::string("IEEE32BIG"));
    int ieee32    = (format == std::string("IEEE32"));
    int ieee64big = (format == std::string("IEEE64BIG"));
    int ieee64    = (format == std::string("IEEE64"));

    //////////////////////////////////////////////////
    // Serialise through node zero
    //////////////////////////////////////////////////
    std::cout<< GridLogMessage<< "Serial write I/O "<< file<< std::endl;
    GridStopWatch timer; timer.Start();

    std::ofstream fout;
    if ( grid->IsBoss() ) {
      fout.open(file,std::ios::binary|std::ios::out|std::ios::in);
      fout.seekp(offset);
    }

    uint64_t bytes=0;
    uint32_t csum=0;
    int lx = grid->_fdimensions[0];
    std::vector<fobj> file_object(lx);
    std::vector<sobj> unmunged(lx);
    for(int t=0; t<grid->_fdimensions[3]; t++){
    for(int z=0; z<grid->_fdimensions[2]; z++){
    for(int y=0; y<grid->_fdimensions[1]; y++){
    {
      std::vector<int> site({0,y,z,t});
      // peek & write
      for(int x=0; x<lx; x++){
        site[0]=x;
        peekSite(unmunged[x],Umu,site);
      }

      if ( grid->IsBoss() ) {
        for(int x=0; x<lx; x++){
          munge(unmunged[x],file_object[x],csum);
        }

        if (ieee32big) htobe32_v((void *)&file_object[0], sizeof(fobj)*lx);
        if (ieee32)    htole32_v((void *)&file_object[0], sizeof(fobj)*lx);
        if (ieee64big) htobe64_v((void *)&file_object[0], sizeof(fobj)*lx);
        if (ieee64)    htole64_v((void *)&file_object[0], sizeof(fobj)*lx);

        fout.write((char *)&file_object[0], sizeof(fobj)*lx); assert( fout.fail()==0);
        bytes += sizeof(fobj)*lx;
      }
    }}}}
    timer.Stop();
    std::cout<< GridLogPerformance<< "writeObjectSerial: wrote "<< bytes
             <<" bytes in "<< timer.Elapsed() <<" "
             << (double)bytes/timer.useconds() <<" MB/s " << std::endl;

    grid->Broadcast(0,(void *)&csum,sizeof(csum));
    return csum;
  }

  static inline uint32_t writeRNGSerial(GridSerialRNG &serial,GridParallelRNG &parallel,std::string file,int offset)
  {
    typedef typename GridSerialRNG::RngStateType RngStateType;
    const int RngStateCount = GridSerialRNG::RngStateCount;

    GridBase *grid = parallel._grid;
    int gsites = grid->_gsites;

    GridStopWatch timer; timer.Start();
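    // Strategy: walk the global sites in lexicographic order; the owning rank
    // broadcasts its generator state, and the boss checksums it with
    // Uint32Checksum and writes it to the file. The serial RNG state is
    // appended once at the end, after all of the parallel states.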
    //////////////////////////////////////////////////
    // Serialise through node zero
    //////////////////////////////////////////////////
    std::ofstream fout;
    if (grid->IsBoss()) {
      fout.open(file, std::ios::binary | std::ios::out);
      if (!fout.is_open()) {
        std::cout << GridLogMessage << "writeRNGSerial: Error opening file " << file << std::endl;
        exit(0); // write better error handling
      }
      fout.seekp(offset);
    }

    std::cout << GridLogMessage << "Serial RNG write I/O on file " << file << std::endl;

    uint32_t csum = 0;
    std::vector<RngStateType> saved(RngStateCount);
    int bytes = sizeof(RngStateType) * saved.size();
    std::cout << GridLogDebug << "RngStateCount: " << RngStateCount << std::endl;
    std::cout << GridLogDebug << "Type has " << bytes << " bytes" << std::endl;
    std::vector<int> gcoor;

    for(int gidx=0; gidx<gsites; gidx++){

      int rank,o_idx,i_idx;
      grid->GlobalIndexToGlobalCoor(gidx,gcoor);
      grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gcoor);
      int l_idx=parallel.generator_idx(o_idx,i_idx);

      if( rank == grid->ThisRank() ){
        parallel.GetState(saved,l_idx);
      }

      if ( rank != 0 ) {
        grid->Broadcast(rank, (void *)&saved[0], bytes);
      }
      grid->Barrier();

      if ( grid->IsBoss() ) {
        Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
        fout.write((char *)&saved[0],bytes); assert( fout.fail()==0);
      }
    }

    if ( grid->IsBoss() ) {
      serial.GetState(saved,0);
      Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
      fout.write((char *)&saved[0],bytes); assert( fout.fail()==0);
    }

    grid->Broadcast(0, (void *)&csum, sizeof(csum));

    if (grid->IsBoss()) {
      fout.close();
    }

    timer.Stop();

    std::cout << GridLogMessage << "RNG file checksum " << std::hex << csum << std::dec << std::endl;
    std::cout << GridLogMessage << "RNG state saved in " << timer.Elapsed() << std::endl;
    return csum;
  }

  static inline uint32_t readRNGSerial(GridSerialRNG &serial,GridParallelRNG &parallel,std::string file,int offset)
  {
    typedef typename GridSerialRNG::RngStateType RngStateType;
    const int RngStateCount = GridSerialRNG::RngStateCount;

    GridBase *grid = parallel._grid;
    int gsites = grid->_gsites;

    //////////////////////////////////////////////////
    // Serialise through node zero
    //////////////////////////////////////////////////
    std::cout<< GridLogMessage<< "Serial RNG read I/O of file "<< file<< std::endl;

    std::ifstream fin;
    if (grid->IsBoss()) {
      fin.open(file, std::ios::binary | std::ios::in);
      if (!fin.is_open()) {
        std::cout << GridLogMessage << "readRNGSerial: Error opening file " << file << std::endl;
        exit(0); // write better error handling
      }
      fin.seekg(offset);
    }

    uint32_t csum=0;
    std::vector<RngStateType> saved(RngStateCount);
    int bytes = sizeof(RngStateType)*saved.size();
    std::cout << GridLogDebug << "RngStateCount: " << RngStateCount << std::endl;
    std::cout << GridLogDebug << "Type has " << bytes << " bytes" << std::endl;
    std::vector<int> gcoor;

    std::cout << GridLogDebug << "gsites: " << gsites << " loop" << std::endl;
    for(int gidx=0; gidx<gsites; gidx++){

      int rank,o_idx,i_idx;
      grid->GlobalIndexToGlobalCoor(gidx,gcoor);
      grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gcoor);
      int l_idx=parallel.generator_idx(o_idx,i_idx);

      if ( grid->IsBoss() ) {
        fin.read((char *)&saved[0],bytes); assert( fin.fail()==0);
        Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
      }

      grid->Broadcast(0,(void *)&saved[0],bytes);
      grid->Barrier();

      if( rank == grid->ThisRank() ){
        parallel.SetState(saved,l_idx);
      }
    }

    if ( grid->IsBoss() ) {
      fin.read((char *)&saved[0],bytes); assert( fin.fail()==0);
      Uint32Checksum((uint32_t *)&saved[0],bytes,csum);
      serial.SetState(saved,0);
    }

    std::cout << GridLogMessage << "RNG file checksum " << std::hex << csum << std::dec << std::endl;

    grid->Broadcast(0,(void *)&csum,sizeof(csum));

    return csum;
  }

  template<class vobj,class fobj,class munger>
  static inline uint32_t readObjectParallel(Lattice<vobj> &Umu,
                                            std::string file,
                                            munger munge,
                                            int offset,
                                            const std::string &format,
                                            ILDGtype ILDG = ILDGtype())
  {
    typedef typename vobj::scalar_object sobj;
    GridBase *grid = Umu._grid;

    int ieee32big = (format == std::string("IEEE32BIG"));
    int ieee32    = (format == std::string("IEEE32"));
    int ieee64big = (format == std::string("IEEE64BIG"));
    int ieee64    = (format == std::string("IEEE64"));

    // Ideally one reader/writer per xy plane and read these contiguously
    // with comms from nominated I/O nodes.
    std::ifstream fin;

    int nd = grid->_ndimension;
    std::vector<int> parallel(nd,1); parallel[0] = 0;
    std::vector<int> ioproc(nd);
    std::vector<int> start(nd);
    std::vector<int> range(nd);

    for(int d=0; d<nd; d++){
      assert(grid->CheckerBoarded(d) == 0);
    }

    uint64_t slice_vol = 1;

    int IOnode = 1;
    int gstrip = grid->_gdimensions[0];
    int lstrip = grid->_ldimensions[0];

    int chunk;
    if ( nd==1 ) chunk = gstrip;
    else         chunk = gstrip*grid->_ldimensions[1];

    for(int d=0; d<grid->_ndimension; d++) {

      if (parallel[d]) {
        range[d] = grid->_ldimensions[d];
        start[d] = grid->_processor_coor[d]*range[d];
        ioproc[d]= grid->_processor_coor[d];
      } else {
        range[d] = grid->_gdimensions[d];
        start[d] = 0;
        ioproc[d]= 0;

        if ( grid->_processor_coor[d] != 0 ) IOnode = 0;
      }
      slice_vol = slice_vol * range[d];
    }

    {
      uint32_t tmp = IOnode;
      grid->GlobalSum(tmp);
      std::cout<< std::dec ;
      std::cout<< GridLogMessage<< "Parallel read I/O from "<< file << " with " << tmp << " IOnodes for subslice ";
      for(int d=0; d<grid->_ndimension; d++){
        std::cout<< range[d];
        if( d < grid->_ndimension-1 ) std::cout<< " x ";
      }
      std::cout << std::endl;
      std::cout<< GridLogMessage<< "Parallel I/O local strip size is "<< lstrip << std::endl;
    }

    GridStopWatch timer; timer.Start();
    uint64_t bytes=0;

    int myrank = grid->ThisRank();
    int iorank = grid->RankFromProcessorCoor(ioproc);

    if (!ILDG.is_ILDG) {
      if ( IOnode ) {
        fin.open(file,std::ios::binary|std::ios::in);
        if ( !fin.is_open() ) {
          std::cout << GridLogMessage << "readObjectParallel: Error opening file " << file << std::endl;
          exit(0);
        }
      }
    }

    //////////////////////////////////////////////////////////
    // Find the location of each site and send to primary node
    // Take loop order from Chroma; defines loop order now that NERSC doc no longer
    // available (how short sighted is that?)
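    //
    // Strategy: each I/O node (the processor column with x-coordinate zero)
    // reads a chunk of gx*ly contiguous sites from the file, byte-swaps and
    // munges it, then forwards it strip by strip (lstrip = local x extent)
    // with point-to-point messages to the rank that owns each strip.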
    //////////////////////////////////////////////////////////
    Umu = zero;
    static uint32_t csum; csum=0; // static for SHMEM

    std::vector<fobj> fileObj(chunk); // FIXME
    std::vector<sobj> siteObj(chunk); // Use alignedAllocator to place in symmetric region for SHMEM

    // need to implement these loops in Nd independent way with a lexico conversion
    for(int tlex=0; tlex<slice_vol; tlex+=chunk){

      std::vector<int> tsite(nd); // temporary mixed up site
      std::vector<int> gsite(nd);
      std::vector<int> lsite(nd);

      int rank, o_idx, i_idx, g_idx;

      ///////////////////////////////////////////
      // Get the global lexico base of the chunk
      ///////////////////////////////////////////
      Lexicographic::CoorFromIndex(tsite,tlex,range);
      for(int d=0; d<nd; d++){
        gsite[d] = tsite[d]+start[d];
      }
      grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gsite);
      grid->GlobalCoorToGlobalIndex(gsite,g_idx);

      ////////////////////////////////
      // iorank reads from the seek
      ////////////////////////////////
      if (myrank == iorank) {

        if (ILDG.is_ILDG){
#ifdef HAVE_LIME
          // use C-LIME to populate the record
          uint64_t sizeFO   = sizeof(fobj);
          uint64_t sizeChunk= sizeFO*chunk;
          limeReaderSeek(ILDG.LR, g_idx*sizeFO, SEEK_SET);
          int status = limeReaderReadData((void *)&fileObj[0], &sizeChunk, ILDG.LR);
#else
          assert(0);
#endif
        } else {
          fin.seekg(offset+g_idx*sizeof(fobj));
          fin.read((char *)&fileObj[0],sizeof(fobj)*chunk);
        }
        bytes+=sizeof(fobj)*chunk;

        if (ieee32big) be32toh_v((void *)&fileObj[0],sizeof(fobj)*chunk);
        if (ieee32)    le32toh_v((void *)&fileObj[0],sizeof(fobj)*chunk);
        if (ieee64big) be64toh_v((void *)&fileObj[0],sizeof(fobj)*chunk);
        if (ieee64)    le64toh_v((void *)&fileObj[0],sizeof(fobj)*chunk);

        for(int c=0; c<chunk; c++){
          munge(fileObj[c],siteObj[c],csum);
        }
      }

      // Possibly do transport through pt2pt, one local strip at a time
      for(int cc=0; cc<chunk; cc+=lstrip){

        // Get the local base of this strip and the rank that owns it
        Lexicographic::CoorFromIndex(tsite,tlex+cc,range);
        for(int d=0; d<nd; d++){
          lsite[d] = tsite[d]%grid->_ldimensions[d]; // local site
          gsite[d] = tsite[d]+start[d];              // global site
        }
        grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gsite);

        if ( rank != iorank ) {
          if ( (myrank == rank) || (myrank == iorank) ) {
            grid->SendRecvPacket((void *)&siteObj[cc],(void *)&siteObj[cc],iorank,rank,sizeof(sobj)*lstrip);
          }
        }
        // Poke at destination
        if ( myrank == rank ) {
          for(int x=0; x<lstrip; x++){
            lsite[0]=x;
            pokeLocalSite(siteObj[cc+x],Umu,lsite);
          }
        }
        grid->Barrier(); // necessary?
      }
    }

    grid->GlobalSum(csum);
    grid->GlobalSum(bytes);
    grid->Barrier();

    timer.Stop();
    std::cout<< GridLogPerformance<< "readObjectParallel: read "<< bytes
             <<" bytes in "<< timer.Elapsed() <<" "
             << (double)bytes/timer.useconds() <<" MB/s " << std::endl;
    return csum;
  }

  template<class vobj,class fobj,class munger>
  static inline uint32_t writeObjectParallel(Lattice<vobj> &Umu,
                                             std::string file,
                                             munger munge,
                                             int offset,
                                             const std::string &format,
                                             ILDGtype ILDG = ILDGtype())
  {
    typedef typename vobj::scalar_object sobj;
    GridBase *grid = Umu._grid;

    int ieee32big = (format == std::string("IEEE32BIG"));
    int ieee32    = (format == std::string("IEEE32"));
    int ieee64big = (format == std::string("IEEE64BIG"));
    int ieee64    = (format == std::string("IEEE64"));

    if (!(ieee32big || ieee32 || ieee64big || ieee64)) {
      std::cout << GridLogError << "Unrecognized file format " << format << std::endl;
      std::cout << GridLogError << "Allowed: IEEE32BIG | IEEE32 | IEEE64BIG | IEEE64" << std::endl;
      exit(0);
    }

    int nd = grid->_ndimension;
    for (int d = 0; d < nd; d++) {
      assert(grid->CheckerBoarded(d) == 0);
    }

    // Parallel in yzt, serial funnelled in "x".
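    // Only the processor column with x-coordinate zero acts as an I/O node;
    // the full global x extent is funnelled through it, keeping the file
    // layout x-fastest and identical to the serial code path.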
    // gx x ly chunk size
    std::vector<int> parallel(nd, 1); parallel[0] = 0;
    std::vector<int> ioproc(nd);
    std::vector<int> start(nd);
    std::vector<int> range(nd);

    uint64_t slice_vol = 1;

    int IOnode = 1;
    int gstrip = grid->_gdimensions[0];
    int lstrip = grid->_ldimensions[0];

    int chunk;
    if ( nd==1 ) chunk = gstrip;
    else         chunk = gstrip*grid->_ldimensions[1];

    for (int d = 0; d < grid->_ndimension; d++) {

      if (parallel[d]) {
        range[d] = grid->_ldimensions[d];
        start[d] = grid->_processor_coor[d]*range[d];
        ioproc[d]= grid->_processor_coor[d];
      } else {
        range[d] = grid->_gdimensions[d];
        start[d] = 0;
        ioproc[d]= 0;

        if ( grid->_processor_coor[d] != 0 ) IOnode = 0;
      }
      slice_vol = slice_vol * range[d];
    }

    {
      uint32_t tmp = IOnode;
      grid->GlobalSum(tmp);
      std::cout<< GridLogMessage<< "Parallel write I/O from "<< file << " with " << tmp << " IOnodes for subslice ";
      for(int d=0; d<grid->_ndimension; d++){
        std::cout<< range[d];
        if( d < grid->_ndimension-1 ) std::cout<< " x ";
      }
      std::cout << std::endl;
      std::cout<< GridLogMessage<< "Parallel I/O local strip size is "<< lstrip << std::endl;
    }

    GridStopWatch timer; timer.Start();
    uint64_t bytes=0;

    int myrank = grid->ThisRank();
    int iorank = grid->RankFromProcessorCoor(ioproc);

    // Take into account the block size of parallel file systems.
    // Ideally one reader/writer per xy plane and read these contiguously
    // with comms from nominated I/O nodes.
    std::ofstream fout;
    if (!ILDG.is_ILDG) {
      if (IOnode) {
        fout.open(file, std::ios::binary | std::ios::in | std::ios::out);
        if (!fout.is_open()) {
          std::cout << GridLogMessage << "writeObjectParallel: Error opening file " << file << std::endl;
          exit(0);
        }
      }
    }

    //////////////////////////////////////////////////////////
    // Find the location of each site and send to primary node
    // Take loop order from Chroma; defines loop order now that NERSC doc no longer
    // available (how short sighted is that?)
    //////////////////////////////////////////////////////////

    uint32_t csum = 0;
    std::vector<fobj> fileObj(chunk);
    std::vector<sobj> siteObj(chunk); // should aggregate a whole chunk and then write.

    // need to implement these loops in Nd independent way with a lexico conversion
    for (int tlex = 0; tlex < slice_vol; tlex += chunk) {

      std::vector<int> tsite(nd); // temporary mixed up site
      std::vector<int> gsite(nd);
      std::vector<int> lsite(nd);

      int rank, o_idx, i_idx, g_idx;

      // Possibly do transport through pt2pt, one local strip at a time
      for(int cc=0; cc<chunk; cc+=lstrip){

        // Get the local base of this strip and the rank that owns it
        Lexicographic::CoorFromIndex(tsite,tlex+cc,range);
        for(int d=0; d<nd; d++){
          lsite[d] = tsite[d]%grid->_ldimensions[d]; // local site
          gsite[d] = tsite[d]+start[d];              // global site
        }
        grid->GlobalCoorToRankIndex(rank,o_idx,i_idx,gsite);

        // Owner of data peeks it over lstrip
        if ( myrank == rank ) {
          for(int x=0; x<lstrip; x++){
            lsite[0]=x;
            peekLocalSite(siteObj[cc+x],Umu,lsite);
          }
        }

        // Pair of nodes may need to do a pt2pt send to the I/O rank
        if ( rank != iorank ) {
          if ( (myrank == rank) || (myrank == iorank) ) {
            grid->SendRecvPacket((void *)&siteObj[cc],(void *)&siteObj[cc],rank,iorank,sizeof(sobj)*lstrip);
          }
        }
      }

      grid->Barrier(); // necessary?

      /////////////////////////
      // Get the global lexico base of the chunk
      /////////////////////////
      Lexicographic::CoorFromIndex(tsite, tlex, range);
      for(int d = 0; d < nd; d++){ gsite[d] = tsite[d] + start[d]; }
      grid->GlobalCoorToRankIndex(rank, o_idx, i_idx, gsite);
      grid->GlobalCoorToGlobalIndex(gsite, g_idx);

      if (myrank == iorank) {

        for(int c=0; c<chunk; c++){
          munge(siteObj[c],fileObj[c],csum);
        }

        if (ieee32big) htobe32_v((void *)&fileObj[0], sizeof(fobj)*chunk);
        if (ieee32)    htole32_v((void *)&fileObj[0], sizeof(fobj)*chunk);
        if (ieee64big) htobe64_v((void *)&fileObj[0], sizeof(fobj)*chunk);
        if (ieee64)    htole64_v((void *)&fileObj[0], sizeof(fobj)*chunk);

        if (ILDG.is_ILDG) {
#ifdef HAVE_LIME
          // use C-LIME to populate the record
          uint64_t sizeFO   = sizeof(fobj);
          uint64_t sizeChunk= sizeFO*chunk;
          limeWriterSeek(ILDG.LW, g_idx*sizeFO, SEEK_SET);
          int status = limeWriteRecordData((void *)&fileObj[0], &sizeChunk, ILDG.LW);
#else
          assert(0);
#endif
        } else {
          fout.seekp(offset + g_idx*sizeof(fobj));
          fout.write((char *)&fileObj[0], sizeof(fobj)*chunk); assert( fout.fail()==0);
        }
        bytes += sizeof(fobj)*chunk;
      }
    }

    grid->GlobalSum(csum);
    grid->GlobalSum(bytes);

    timer.Stop();
    std::cout << GridLogPerformance << "writeObjectParallel: wrote " << bytes
              << " bytes in " << timer.Elapsed() << " "
              << (double)bytes / timer.useconds() << " MB/s " << std::endl;

    grid->Barrier(); // necessary?

    if (!ILDG.is_ILDG) {
      if (IOnode) {
        fout.close();
      }
    }

    return csum;
  }
};
}
#endif
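
// ---------------------------------------------------------------------------
// Usage sketch (illustrative comment only, not compiled): a typical caller,
// e.g. a NERSC-format reader, pairs one of the simple mungers with the serial
// or parallel path and compares the returned checksum against the file header.
// The concrete field types below (LatticeGaugeField, vLorentzColourMatrixD,
// LorentzColourMatrixD) are assumed example types from Grid's QCD layer;
// substitute whatever vobj/fobj pair the file actually stores.
//
//   LatticeGaugeField Umu(&grid);   // grid: the GridCartesian of the run
//   BinarySimpleMunger<LorentzColourMatrixD, LorentzColourMatrixD> munge;
//   uint32_t csum =
//     BinaryIO::readObjectParallel<vLorentzColourMatrixD, LorentzColourMatrixD>
//       (Umu, file, munge, offset, std::string("IEEE64BIG"));
// ---------------------------------------------------------------------------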