Mirror of https://github.com/paboyle/Grid.git, synced 2025-06-10 19:36:56 +01:00
Tidy up of mpi3; also some cleaning of the dslash controls.
@@ -32,6 +32,7 @@ namespace Grid {
// Info that is setup once and indept of cartesian layout
///////////////////////////////////////////////////////////////
void * CartesianCommunicator::ShmCommBuf;
uint64_t CartesianCommunicator::MAX_MPI_SHM_BYTES = 128*1024*1024;

/////////////////////////////////
// Alloc, free shmem region
@@ -41,8 +42,12 @@ void *CartesianCommunicator::ShmBufferMalloc(size_t bytes){
void *ptr = (void *)heap_top;
heap_top += bytes;
heap_bytes+= bytes;
std::cout <<"Shm alloc "<<ptr<<std::endl;
assert(heap_bytes < MAX_MPI_SHM_BYTES);
if (heap_bytes >= MAX_MPI_SHM_BYTES) {
std::cout<< " ShmBufferMalloc exceeded shared heap size -- try increasing with --shm <MB> flag" <<std::endl;
std::cout<< " Parameter specified in units of MB (megabytes) " <<std::endl;
std::cout<< " Current value is " << (MAX_MPI_SHM_BYTES/(1024*1024)) <<std::endl;
assert(heap_bytes<MAX_MPI_SHM_BYTES);
}
return ptr;
}
void CartesianCommunicator::ShmBufferFreeAll(void) {
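The ShmBufferMalloc change above replaces a bare assert with a diagnostic before failing: the routine is a simple bump allocator over a fixed shared heap, so running past MAX_MPI_SHM_BYTES can only be cured by re-running with a larger --shm setting. A minimal, self-contained sketch of the same bump-allocation pattern (the ShmHeap type and its members are illustrative, not Grid's actual data structures):

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <iostream>

    // Illustrative stand-in for the shared-memory heap behind ShmBufferMalloc.
    struct ShmHeap {
      uint8_t *heap_base;       // start of the shared region
      size_t   heap_bytes = 0;  // bytes handed out so far
      size_t   max_bytes;       // plays the role of MAX_MPI_SHM_BYTES

      void *Malloc(size_t bytes) {
        if (heap_bytes + bytes > max_bytes) {
          std::cout << "shared heap exhausted -- increase the limit (e.g. --shm <MB>)" << std::endl;
          std::cout << "current limit is " << (max_bytes / (1024 * 1024)) << " MB" << std::endl;
          assert(heap_bytes + bytes <= max_bytes);
        }
        void *ptr = heap_base + heap_bytes;  // bump the top of the heap
        heap_bytes += bytes;
        return ptr;
      }
      void FreeAll(void) { heap_bytes = 0; }  // whole-heap reset; no per-block free
    };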
@@ -55,7 +55,7 @@ class CartesianCommunicator {
// Give external control (command line override?) of this

static const int MAXLOG2RANKSPERNODE = 16;
static const uint64_t MAX_MPI_SHM_BYTES = 128*1024*1024;
static uint64_t MAX_MPI_SHM_BYTES;

// Communicator should know nothing of the physics grid, only processor grid.
int _Nprocessors; // How many in all
@@ -27,6 +27,7 @@ Author: Peter Boyle <paboyle@ph.ed.ac.uk>
/* END LEGAL */
#include "Grid.h"
#include <mpi.h>
#include <semaphore.h>

namespace Grid {

@@ -45,6 +46,7 @@ const int pool = 48;

class SlaveState {
public:
sem_t sem;
volatile int head;
volatile int start;
volatile int tail;
@@ -56,29 +58,32 @@ public:
SlaveState *state;
MPI_Comm squadron;
uint64_t base;
int universe_rank;
int vertical_rank;
////////////////////////////////////////////////////////////
// Descriptor circular pointers
////////////////////////////////////////////////////////////
Slave() {};

void Init(SlaveState * _state,MPI_Comm _squadron);
void Init(SlaveState * _state,MPI_Comm _squadron,int _universe_rank,int _vertical_rank);

void EventLoop (void) {
std::cerr<< " Entering even loop "<<std::endl;
while(1) {
std::cout<< " Entering event loop "<<std::endl;
while(1){
Event();
MPI_Barrier(squadron);
}
}

void Event (void) ;
int Event (void) ;

uint64_t QueueCommand(int command,void *buf, int bytes, int hashtag, MPI_Comm comm,int rank) ;
uint64_t QueueCommand(int command,void *buf, int bytes, int hashtag, MPI_Comm comm,int u_rank) ;

void WaitAll() {
void QueueWaitAll() {
QueueCommand(COMMAND_WAITALL,0,0,0,squadron,0);
std::cerr<< " Waiting on FIFO drain "<<std::endl;
}
void WaitAll() {
while ( state->tail != state->head );
std::cerr<< " FIFO drained "<< state->tail <<std::endl;
}
};
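The Slave interface above is one half of a single-producer/single-consumer descriptor ring held in shared memory: the master advances head as it enqueues commands, the slave advances start as it hands each descriptor to MPI, and tail advances when requests are retired; QueueWaitAll posts a drain command while WaitAll simply spins until tail catches up with head. A minimal sketch of that three-pointer ring, with an int payload standing in for the real descriptors (DescriptorRing and its methods are illustrative names, not Grid's):

    constexpr int pool = 48;                 // ring capacity, as in the source above
    #define PERI_PLUS(A) (((A) + 1) % pool)  // circular increment

    struct DescriptorRing {
      volatile int head = 0, start = 0, tail = 0;
      int slots[pool];

      // Producer: block while the ring is full, then publish one entry.
      void Enqueue(int payload) {
        int next = PERI_PLUS(head);
        while (tail == next) { /* spin: FIFO full */ }
        slots[head] = payload;
        head = next;
      }

      // Consumer: issue the next pending entry, if any, and advance start.
      bool IssueNext(int &payload) {
        if (start == head) return false;  // nothing queued
        payload = slots[start];
        start = PERI_PLUS(start);
        return true;
      }

      // Completion: retire everything issued so far, unblocking the producer.
      void RetireIssued(void) { tail = start; }
    };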
@@ -119,26 +124,31 @@ public:
MPI_Comm &ShmComm,
void * &ShmCommBuf);

static void MapCommRankToWorldRank(int &hashtag, int & comm_world_peer,int tag, MPI_Comm comm,int rank);
static void MapCommRankToWorldRank(int &hashtag, int & comm_world_peer,int tag, MPI_Comm comm,int commrank);

/////////////////////////////////////////////////////////
// routines for master proc must handle any communicator
/////////////////////////////////////////////////////////

static uint64_t QueueSend(int slave,void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
std::cerr<< " Queueing send "<< bytes<<std::endl;
// std::cout<< " Queueing send "<< bytes<< " slave "<< slave << " to comm "<<rank <<std::endl;
return Slaves[slave].QueueCommand(COMMAND_ISEND,buf,bytes,tag,comm,rank);
};

static uint64_t QueueRecv(int slave, void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
std::cerr<< " Queueing receive "<< bytes<<std::endl;
// std::cout<< " Queueing recv "<< bytes<< " slave "<< slave << " from comm "<<rank <<std::endl;
return Slaves[slave].QueueCommand(COMMAND_IRECV,buf,bytes,tag,comm,rank);
};

static void WaitAll() {
for(int s=1;s<VerticalSize;s++) {
Slaves[s].QueueWaitAll();
}
MPI_Barrier(VerticalComm);
for(int s=1;s<VerticalSize;s++) {
Slaves[s].WaitAll();
}
MPI_Barrier(VerticalComm);
};

static void GetWork(int nwork, int me, int & mywork, int & myoff,int units){
@@ -163,6 +173,7 @@ public:
GetWork(bytes,s,mywork,myoff,procs);
QueueSend(s+1,&cbuf[myoff],mywork,tag,comm,rank);
}
MPI_Barrier(VerticalComm);
};

static void QueueMultiplexedRecv(void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
@@ -173,6 +184,7 @@ public:
GetWork(bytes,s,mywork,myoff,procs);
QueueRecv(s+1,&cbuf[myoff],mywork,tag,comm,rank);
}
MPI_Barrier(VerticalComm);
};

};
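QueueMultiplexedSend and QueueMultiplexedRecv above split a single large transfer into contiguous chunks, one per slave: GetWork(bytes,s,mywork,myoff,procs) gives slave s a chunk of mywork bytes at offset myoff, which it posts from &cbuf[myoff], and the added MPI_Barrier(VerticalComm) keeps master and slaves in step once everything is queued. GetWork's body is not shown in this hunk; the sketch below is one conventional way to carve nwork items into near-equal contiguous shares for 'units' workers (GetWorkSketch is an illustrative name, not Grid's routine):

    // Illustrative partition: worker 'me' receives 'mywork' items starting at 'myoff'.
    static void GetWorkSketch(int nwork, int me, int &mywork, int &myoff, int units) {
      int chunk = nwork / units;  // base share per worker
      int rem   = nwork % units;  // first 'rem' workers take one extra item
      mywork = chunk + (me < rem ? 1 : 0);
      myoff  = me * chunk + (me < rem ? me : rem);
    }

For example, 10 bytes over 3 workers gives shares of 4, 3 and 3 at offsets 0, 4 and 7, so the per-slave transfers cover the buffer exactly once with no overlap.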
@@ -225,7 +237,7 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
// Split into groups that can share memory (Verticals)
/////////////////////////////////////////////////////////////////////
// MPI_Comm_split_type(communicator_universe, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,&VerticalComm);
MPI_Comm_split(communicator_universe,(UniverseRank&0x1),UniverseRank,&VerticalComm);
MPI_Comm_split(communicator_universe,(UniverseRank/2),UniverseRank,&VerticalComm);
MPI_Comm_rank(VerticalComm ,&VerticalRank);
MPI_Comm_size(VerticalComm ,&VerticalSize);

@@ -262,14 +274,14 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
ierr|= MPI_Win_lock_all (MPI_MODE_NOCHECK, VerticalWindow);
assert(ierr==0);

std::cerr<<"SHM "<<ShmCommBuf<<std::endl;
std::cout<<"SHM "<<ShmCommBuf<<std::endl;

VerticalShmBufs.resize(VerticalSize);
for(int r=0;r<VerticalSize;r++){
MPI_Aint sz;
int dsp_unit;
MPI_Win_shared_query (VerticalWindow, r, &sz, &dsp_unit, &VerticalShmBufs[r]);
std::cerr<<"SHM "<<r<<" " <<VerticalShmBufs[r]<<std::endl;
std::cout<<"SHM "<<r<<" " <<VerticalShmBufs[r]<<std::endl;
}
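The window setup above is the standard MPI-3 shared-memory pattern: ranks that can share memory allocate slices of a single window, lock it with MPI_Win_lock_all, and then use MPI_Win_shared_query to obtain a directly addressable base pointer for every peer's slice (collected here into VerticalShmBufs). The allocation call itself sits outside this hunk, and note that the commit keeps MPI_Comm_split_type commented out and instead splits communicator_universe by UniverseRank/2. A self-contained sketch of the general pattern, independent of Grid (sizes and variable names are illustrative):

    #include <mpi.h>
    #include <iostream>
    #include <vector>

    int main(int argc, char **argv) {
      MPI_Init(&argc, &argv);

      // Group ranks that can genuinely share memory (one communicator per node).
      MPI_Comm node_comm;
      MPI_Comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &node_comm);

      int rank, size;
      MPI_Comm_rank(node_comm, &rank);
      MPI_Comm_size(node_comm, &size);

      // Each rank contributes one slice to a single shared window.
      MPI_Aint bytes = 1 << 20;  // 1 MB per rank, illustrative
      void *my_base = nullptr;
      MPI_Win win;
      MPI_Win_allocate_shared(bytes, 1, MPI_INFO_NULL, node_comm, &my_base, &win);
      MPI_Win_lock_all(MPI_MODE_NOCHECK, win);

      // Query every peer's base address; these are ordinary load/store pointers.
      std::vector<void *> peer_base(size);
      for (int r = 0; r < size; r++) {
        MPI_Aint sz; int dsp_unit;
        MPI_Win_shared_query(win, r, &sz, &dsp_unit, &peer_base[r]);
      }
      std::cout << "rank " << rank << " slice at " << my_base << std::endl;

      MPI_Win_unlock_all(win);
      MPI_Win_free(&win);
      MPI_Finalize();
      return 0;
    }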

//////////////////////////////////////////////////////////////////////
@@ -286,14 +298,16 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
///////////////////////////////////////////////////////////
if ( VerticalRank != 0 ) {
Slave indentured;
indentured.Init( (SlaveState *) VerticalShmBufs[VerticalRank], VerticalComm);
indentured.Init( (SlaveState *) VerticalShmBufs[VerticalRank], VerticalComm, UniverseRank,VerticalRank);
MPI_Barrier(VerticalComm);
indentured.EventLoop();
assert(0);
} else {
Slaves.resize(VerticalSize);
for(int i=1;i<VerticalSize;i++){
Slaves[i].Init((SlaveState *)VerticalShmBufs[i],VerticalComm);
Slaves[i].Init((SlaveState *)VerticalShmBufs[i],VerticalComm, UniverseRanks[HorizontalRank][i],i);
}
MPI_Barrier(VerticalComm);
}

///////////////////////////////////////////////////////////
@@ -337,8 +351,10 @@ void MPIoffloadEngine::MapCommRankToWorldRank(int &hashtag, int & comm_world_pee

if ( comm == HorizontalComm ) {
comm_world_peer = rank;
// std::cout << " MapCommRankToWorldRank horiz " <<rank<<"->"<<comm_world_peer<<std::endl;
} else if ( comm == communicator_cached ) {
comm_world_peer = UserCommunicatorToWorldRanks[rank];
// std::cout << " MapCommRankToWorldRank cached " <<rank<<"->"<<comm_world_peer<<std::endl;
} else {

int size;
@@ -360,6 +376,7 @@ void MPIoffloadEngine::MapCommRankToWorldRank(int &hashtag, int & comm_world_pee
MPI_Group_translate_ranks(CachedGroup,size,&cached_ranks[0],WorldGroup, &UserCommunicatorToWorldRanks[0]);

comm_world_peer = UserCommunicatorToWorldRanks[rank];
// std::cout << " MapCommRankToWorldRank cache miss " <<rank<<"->"<<comm_world_peer<<std::endl;

assert(comm_world_peer != MPI_UNDEFINED);
}
@@ -370,55 +387,30 @@ void MPIoffloadEngine::MapCommRankToWorldRank(int &hashtag, int & comm_world_pee
int comm_hash = ((icomm>>0 )&0xFFFF)^((icomm>>16)&0xFFFF)
^ ((icomm>>32)&0xFFFF)^((icomm>>48)&0xFFFF);

hashtag = (comm_hash<<15) | tag;
// hashtag = (comm_hash<<15) | tag;
hashtag = tag;

};
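The hunk above computes a 16-bit hash of the communicator by XOR-folding the four 16-bit fields of its handle, and originally packed it next to the user tag as (comm_hash<<15)|tag; the commit comments that packing out and uses the bare tag. A small sketch of the arithmetic (helper names are illustrative), which also shows why the packed form is awkward: a 16-bit hash shifted left by 15 bits produces tag values far above 32767, the minimum value MPI guarantees for MPI_TAG_UB:

    #include <cstdint>

    // Fold a 64-bit communicator handle down to 16 bits, as in the code above.
    static inline int CommHash16(uint64_t icomm) {
      return (int)(((icomm >>  0) & 0xFFFF) ^ ((icomm >> 16) & 0xFFFF) ^
                   ((icomm >> 32) & 0xFFFF) ^ ((icomm >> 48) & 0xFFFF));
    }

    // The disabled packing: with comm_hash up to 0xFFFF this can approach 2^31,
    // which an MPI implementation is not obliged to accept as a message tag.
    static inline int PackHashTag(int comm_hash, int tag) {
      return (comm_hash << 15) | tag;
    }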

void Slave::Init(SlaveState * _state,MPI_Comm _squadron)
void Slave::Init(SlaveState * _state,MPI_Comm _squadron,int _universe_rank,int _vertical_rank)
{
squadron=_squadron;
universe_rank=_universe_rank;
vertical_rank=_vertical_rank;
state =_state;
std::cout << "state "<<_state<<" comm "<<_squadron<<" universe_rank"<<universe_rank <<std::endl;
state->head = state->tail = state->start = 0;
MPI_Barrier(squadron);
base = (uint64_t)MPIoffloadEngine::VerticalShmBufs[0];
int rank; MPI_Comm_rank(_squadron,&rank);
}
#define PERI_PLUS(A) ( (A+1)%pool )
void Slave::Event (void) {
int Slave::Event (void) {

static int tail_last;
static int head_last;
static int start_last;
int ierr;

if ( (state->tail != tail_last)
||(state->head != head_last)
||(state->start != start_last)
) {
std::cerr<< " Event loop "<< state->tail << " "<< state->start<< " "<< state->head <<std::endl;
}

////////////////////////////////////////////////////
// Try to advance the tail pointers
////////////////////////////////////////////////////
/*
int t=state->tail;
if ( t != state->start ) {
int flag=0;

std::cerr<< " Testing tail "<< t<<" "<< (void *)&state->Descrs[t].request
<< " "<<state->Descrs[t].request<<std::endl;
// ierr=MPI_Test((MPI_Request *)&state->Descrs[t].request,&flag,MPI_STATUS_IGNORE);
// ierr=MPI_Test((MPI_Request *)&state->Descrs[t].request,&flag,MPI_STATUS_IGNORE);
assert(ierr==0);
if ( flag ) {
state->tail = PERI_PLUS(t);
std::cerr<< " Tail advanced from "<< t<<std::endl;
return;
}
}
*/

////////////////////////////////////////////////////
// Try to advance the start pointers
////////////////////////////////////////////////////
@@ -426,11 +418,11 @@ void Slave::Event (void) {
if ( s != state->head ) {
switch ( state->Descrs[s].command ) {
case COMMAND_ISEND:
std::cerr<< " Send "<<s << " ptr "<< state<<" "<< state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
<< " to " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
<< " Comm " << MPIoffloadEngine::communicator_universe<< std::endl;

std::cerr<< " Request was "<<state->Descrs[s].request<<std::endl;
/*
std::cout<< " Send "<<s << " ptr "<< state<<" "<< state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
<< " to " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
<< " Comm " << MPIoffloadEngine::communicator_universe<< " me " <<universe_rank<< std::endl;
*/
ierr = MPI_Isend((void *)(state->Descrs[s].buf+base),
state->Descrs[s].bytes,
MPI_CHAR,
@@ -438,18 +430,17 @@ void Slave::Event (void) {
state->Descrs[s].tag,
MPIoffloadEngine::communicator_universe,
(MPI_Request *)&state->Descrs[s].request);
std::cerr<< " Request is "<<state->Descrs[s].request<<std::endl;
std::cerr<< " Request0 is "<<state->Descrs[0].request<<std::endl;
assert(ierr==0);
state->start = PERI_PLUS(s);
return 1;
break;

case COMMAND_IRECV:
std::cerr<< " Recv "<<s << " ptr "<< state<<" "<< state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
<< " to " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
<< " Comm " << MPIoffloadEngine::communicator_universe<< std::endl;

std::cerr<< " Request was "<<state->Descrs[s].request<<std::endl;
/*
std::cout<< " Recv "<<s << " ptr "<< state<<" "<< state->Descrs[s].buf<< "["<<state->Descrs[s].bytes<<"]"
<< " from " << state->Descrs[s].rank<< " tag" << state->Descrs[s].tag
<< " Comm " << MPIoffloadEngine::communicator_universe<< " me "<< universe_rank<< std::endl;
*/
ierr=MPI_Irecv((void *)(state->Descrs[s].buf+base),
state->Descrs[s].bytes,
MPI_CHAR,
@@ -457,30 +448,32 @@ void Slave::Event (void) {
state->Descrs[s].tag,
MPIoffloadEngine::communicator_universe,
(MPI_Request *)&state->Descrs[s].request);
std::cerr<< " Request is "<<state->Descrs[s].request<<std::endl;
std::cerr<< " Request0 is "<<state->Descrs[0].request<<std::endl;

// std::cout<< " Request is "<<state->Descrs[s].request<<std::endl;
// std::cout<< " Request0 is "<<state->Descrs[0].request<<std::endl;
assert(ierr==0);
state->start = PERI_PLUS(s);
return 1;
break;

case COMMAND_WAITALL:
std::cerr<< " Wait all "<<std::endl;

for(int t=state->tail;t!=s; t=PERI_PLUS(t) ){
std::cerr<< " Wait ["<<t<<"] "<<state->Descrs[t].request <<std::endl;
std::cerr<< " Request0 is "<<state->Descrs[0].request<<std::endl;
MPI_Wait((MPI_Request *)&state->Descrs[t].request,MPI_STATUS_IGNORE);
};
s=PERI_PLUS(s);
state->start = s;
state->tail = s;
MPI_Barrier(squadron);
return 1;
break;

default:
assert(0);
break;
}
return;
}
return 0;
}
//////////////////////////////////////////////////////////////////////////////
// External interaction with the queue
@@ -500,17 +493,29 @@ uint64_t Slave::QueueCommand(int command,void *buf, int bytes, int tag, MPI_Comm
MPI_Comm communicator;
MPI_Request request;

MPIoffloadEngine::MapCommRankToWorldRank(hashtag,commrank,tag,comm,worldrank);
MPIoffloadEngine::MapCommRankToWorldRank(hashtag,worldrank,tag,comm,commrank);

int VerticalRank = MPIoffloadEngine::VerticalRank;
uint64_t relative= (uint64_t)buf - base;
state->Descrs[head].buf = relative;
state->Descrs[head].bytes = bytes;
state->Descrs[head].rank = MPIoffloadEngine::UniverseRanks[worldrank][VerticalRank];
state->Descrs[head].rank = MPIoffloadEngine::UniverseRanks[worldrank][vertical_rank];
state->Descrs[head].tag = hashtag;
state->Descrs[head].command= command;
std::cerr<< " QueueCommand "<<buf<<"["<<bytes<<"]" << std::endl;

/*
if ( command == COMMAND_ISEND ) {
std::cout << "QueueSend from "<< universe_rank <<" to commrank " << commrank
<< " to worldrank " << worldrank <<std::endl;
std::cout << " via VerticalRank "<< vertical_rank <<" to universerank " << MPIoffloadEngine::UniverseRanks[worldrank][vertical_rank]<<std::endl;
std::cout << " QueueCommand "<<buf<<"["<<bytes<<"]" << std::endl;
}
if ( command == COMMAND_IRECV ) {
std::cout << "QueueRecv on "<< universe_rank <<" from commrank " << commrank
<< " from worldrank " << worldrank <<std::endl;
std::cout << " via VerticalRank "<< vertical_rank <<" from universerank " << MPIoffloadEngine::UniverseRanks[worldrank][vertical_rank]<<std::endl;
std::cout << " QueueSend "<<buf<<"["<<bytes<<"]" << std::endl;
}
*/
// Block until FIFO has space
while( state->tail==next );
@@ -671,6 +676,8 @@ void CartesianCommunicator::StencilSendToRecvFromBegin(std::vector<CommsRequest_
// assert xmit and recv lie in shared memory region
assert( (xmit_i >= shm) && (xmit_i+bytes <= shm+MAX_MPI_SHM_BYTES) );
assert( (recv_i >= shm) && (recv_i+bytes <= shm+MAX_MPI_SHM_BYTES) );
assert(from!=_processor);
assert(dest!=_processor);
MPIoffloadEngine::QueueMultiplexedSend(xmit,bytes,_processor,communicator,dest);
MPIoffloadEngine::QueueMultiplexedRecv(recv,bytes,from,communicator,from);
}
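StencilSendToRecvFromBegin now checks that both halo buffers lie inside the shared window before queueing them, since the slaves address buffers as offsets from the window base (buf+base in Slave::Event). A one-line containment check of the same shape (InShmWindow is an illustrative helper, not Grid's API):

    #include <cstddef>
    #include <cstdint>

    // True when [ptr, ptr+bytes) lies inside the shared window [shm, shm+window_bytes).
    static inline bool InShmWindow(const void *ptr, size_t bytes,
                                   uintptr_t shm, size_t window_bytes) {
      uintptr_t p = (uintptr_t)ptr;
      return (p >= shm) && (p + bytes <= shm + window_bytes);
    }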