Mirror of https://github.com/paboyle/Grid.git
Commit 757a928f9a (parent 32375aca65)

Improvement to use own SHM_OPEN call to avoid openmpi bug.
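The change replaces the MPI shared-window allocation with shared-memory segments created directly via shm_open, ftruncate and mmap. As a reading aid before the diff, here is a minimal standalone sketch of that POSIX pattern; it is not code from the commit, and the region name, creator flag and helper name are illustrative assumptions (older Linux systems may need linking with -lrt).

// Minimal sketch (assumption, not Grid code) of the shm_open/ftruncate/mmap pattern
// the commit adopts in place of MPI_Win_allocate_shared.
#include <cassert>
#include <cstdio>       // perror
#include <fcntl.h>      // O_CREAT, O_RDWR
#include <sys/mman.h>   // shm_open, shm_unlink, mmap
#include <sys/stat.h>
#include <unistd.h>     // ftruncate, close

void * attach_region(const char *name, size_t bytes, bool creator) {
  if ( creator ) shm_unlink(name);                 // discard any stale segment
  int fd = shm_open(name, O_RDWR | O_CREAT, 0600);
  if ( fd < 0 ) { perror("shm_open"); assert(0); }
  if ( creator && ftruncate(fd, bytes) != 0 ) {    // size it once, on the creating rank
    perror("ftruncate"); assert(0);
  }
  void *ptr = mmap(NULL, bytes, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  if ( ptr == MAP_FAILED ) { perror("mmap"); assert(0); }
  close(fd);                                       // the mapping stays valid after close
  return ptr;
}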
@@ -48,7 +48,7 @@ int main (int argc, char ** argv)
   std::cout<<GridLogMessage << "= Benchmarking concurrent halo exchange in "<<nmu<<" dimensions"<<std::endl;
   std::cout<<GridLogMessage << "===================================================================================================="<<std::endl;
   std::cout<<GridLogMessage << " L "<<"\t\t"<<" Ls "<<"\t\t"<<"bytes"<<"\t\t"<<"MB/s uni"<<"\t\t"<<"MB/s bidi"<<std::endl;
-  int maxlat=32;
+  int maxlat=16;
   for(int lat=4;lat<=maxlat;lat+=2){
     for(int Ls=1;Ls<=16;Ls*=2){
 
@@ -193,7 +193,7 @@ int main (int argc, char ** argv)
     }
   }
 
-
+  Nloop=100;
   std::cout<<GridLogMessage << "===================================================================================================="<<std::endl;
   std::cout<<GridLogMessage << "= Benchmarking concurrent STENCIL halo exchange in "<<nmu<<" dimensions"<<std::endl;
   std::cout<<GridLogMessage << "===================================================================================================="<<std::endl;
@@ -28,23 +28,26 @@ Author: Peter Boyle <paboyle@ph.ed.ac.uk>
 #include "Grid.h"
 #include <mpi.h>
 
-/// bloody mac os doesn't implement unnamed semaphores since it is "optional" posix.
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+/// Workarounds:
+/// i) bloody mac os doesn't implement unnamed semaphores since it is "optional" posix.
 /// darwin dispatch semaphores don't seem to be multiprocess.
-//#ifdef __APPLE__
+///
+/// ii) openmpi under --mca shmem posix works with two squadrons per node;
+/// openmpi under default mca settings (I think --mca shmem mmap) on MacOS makes two squadrons map the SAME
+/// memory as each other, despite their living on different communicators. This appears to be a bug in OpenMPI.
+///
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 #include <semaphore.h>
 typedef sem_t *Grid_semaphore;
-#define SEM_INIT(S) S = sem_open(sem_name,0,00777,0);
-#define SEM_INIT_EXCL(S) sem_unlink(sem_name); S = sem_open(sem_name,O_CREAT|O_EXCL,00777,0);
-#define SEM_POST(S) sem_post(S);
-#define SEM_WAIT(S) sem_wait(S);
-//#else
-//#include <semaphore.h>
-//typedef sem_t Grid_semaphore;
-//#define SEM_INIT(S)
-//#define SEM_INIT_EXCL(S) sem_init(&S,0);
-//#define SEM_POST(S) sem_post(&S);
-//#define SEM_WAIT(S) sem_wait(&S);
-//#endif
+#define SEM_INIT(S) S = sem_open(sem_name,0,0600,0); assert ( S != SEM_FAILED );
+#define SEM_INIT_EXCL(S) sem_unlink(sem_name); S = sem_open(sem_name,O_CREAT|O_EXCL,0600,0); assert ( S != SEM_FAILED );
+#define SEM_POST(S) assert ( sem_post(S) == 0 );
+#define SEM_WAIT(S) assert ( sem_wait(S) == 0 );
+
+#include <sys/mman.h>
+
 
 namespace Grid {
 
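The new SEM_INIT / SEM_INIT_EXCL / SEM_POST / SEM_WAIT macros wrap named POSIX semaphores, which, as the comment block notes, are the only flavour that works across processes on MacOS. A minimal sketch of the pattern the macros expand to; the semaphore name and the creator/attacher split are illustrative assumptions, not commit code:

// Sketch of the named-semaphore handshake used by the macros above.
#include <cassert>
#include <fcntl.h>       // O_CREAT, O_EXCL
#include <semaphore.h>   // sem_open, sem_post, sem_wait, sem_unlink

sem_t * open_sem(const char *name, bool creator) {
  sem_t *S;
  if ( creator ) {
    sem_unlink(name);                              // drop any stale semaphore
    S = sem_open(name, O_CREAT | O_EXCL, 0600, 0); // create with initial count 0
  } else {
    S = sem_open(name, 0);                         // attach to an existing one
  }
  assert(S != SEM_FAILED);
  return S;
}

// Creator side:  sem_t *s = open_sem("/Grid_demo_sem", true);  sem_wait(s);
// Attacher side: sem_t *s = open_sem("/Grid_demo_sem", false); sem_post(s);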
@@ -63,8 +66,6 @@ const int pool = 48;
 
 class SlaveState {
 public:
-  Grid_semaphore sem_head;
-  Grid_semaphore sem_tail;
   volatile int head;
   volatile int start;
   volatile int tail;
@@ -73,6 +74,8 @@ public:
 
 class Slave {
 public:
+  Grid_semaphore sem_head;
+  Grid_semaphore sem_tail;
   SlaveState *state;
   MPI_Comm squadron;
   uint64_t base;
@@ -87,33 +90,38 @@ public:
   void Init(SlaveState * _state,MPI_Comm _squadron,int _universe_rank,int _vertical_rank);
 
   void SemInit(void) {
-    sprintf(sem_name,"/Grid_mpi3_head_%d",universe_rank);
-    SEM_INIT(state->sem_head);
-    sprintf(sem_name,"/Grid_mpi3_tail_%d",universe_rank);
-    SEM_INIT(state->sem_tail);
+    sprintf(sem_name,"/Grid_mpi3_sem_head_%d",universe_rank);
+    printf("SEM_NAME: %s \n",sem_name);
+    SEM_INIT(sem_head);
+    sprintf(sem_name,"/Grid_mpi3_sem_tail_%d",universe_rank);
+    printf("SEM_NAME: %s \n",sem_name);
+    SEM_INIT(sem_tail);
   }
   void SemInitExcl(void) {
-    sprintf(sem_name,"/Grid_mpi3_head_%d",universe_rank);
-    SEM_INIT_EXCL(state->sem_head);
-    sprintf(sem_name,"/Grid_mpi3_tail_%d",universe_rank);
-    SEM_INIT_EXCL(state->sem_tail);
+    sprintf(sem_name,"/Grid_mpi3_sem_head_%d",universe_rank);
+    printf("SEM_INIT_EXCL: %s \n",sem_name);
+    SEM_INIT_EXCL(sem_head);
+    sprintf(sem_name,"/Grid_mpi3_sem_tail_%d",universe_rank);
+    printf("SEM_INIT_EXCL: %s \n",sem_name);
+    SEM_INIT_EXCL(sem_tail);
   }
   void WakeUpDMA(void) {
-    SEM_POST(state->sem_head);
+    SEM_POST(sem_head);
   };
   void WakeUpCompute(void) {
-    SEM_POST(state->sem_tail);
+    SEM_POST(sem_tail);
   };
   void WaitForCommand(void) {
-    SEM_WAIT(state->sem_head);
+    SEM_WAIT(sem_head);
   };
   void WaitForComplete(void) {
-    SEM_WAIT(state->sem_tail);
+    SEM_WAIT(sem_tail);
   };
   void EventLoop (void) {
     std::cout<< " Entering event loop "<<std::endl;
     while(1){
       WaitForCommand();
+      // std::cout << "Getting command "<<std::endl;
       Event();
     }
   }
@@ -123,9 +131,13 @@ public:
   uint64_t QueueCommand(int command,void *buf, int bytes, int hashtag, MPI_Comm comm,int u_rank) ;
 
   void WaitAll() {
+    // std::cout << "Queueing WAIT command "<<std::endl;
     QueueCommand(COMMAND_WAITALL,0,0,0,squadron,0);
+    // std::cout << "Waking up DMA "<<std::endl;
     WakeUpDMA();
+    // std::cout << "Waiting from semaphore "<<std::endl;
     WaitForComplete();
+    // std::cout << "Checking FIFO is empty "<<std::endl;
     assert ( state->tail == state->head );
   }
 };
@@ -176,19 +188,25 @@ public:
   static void QueueSend(int slave,void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
     // std::cout<< " Queueing send "<< bytes<< " slave "<< slave << " to comm "<<rank <<std::endl;
     Slaves[slave].QueueCommand(COMMAND_ISEND,buf,bytes,tag,comm,rank);
+    // std::cout << "Queued send command to rank "<< rank<< " via "<<slave <<std::endl;
     Slaves[slave].WakeUpDMA();
+    // std::cout << "Waking up DMA "<< slave<<std::endl;
   };
 
   static void QueueRecv(int slave, void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
     // std::cout<< " Queueing recv "<< bytes<< " slave "<< slave << " from comm "<<rank <<std::endl;
     Slaves[slave].QueueCommand(COMMAND_IRECV,buf,bytes,tag,comm,rank);
+    // std::cout << "Queued recv command from rank "<< rank<< " via "<<slave <<std::endl;
     Slaves[slave].WakeUpDMA();
+    // std::cout << "Waking up DMA "<< slave<<std::endl;
   };
 
   static void WaitAll() {
     for(int s=1;s<VerticalSize;s++) {
+      // std::cout << "Waiting for slave "<< s<<std::endl;
       Slaves[s].WaitAll();
     }
+    // std::cout << " Wait all Complete "<<std::endl;
   };
 
   static void GetWork(int nwork, int me, int & mywork, int & myoff,int units){
@@ -247,10 +265,9 @@ int MPIoffloadEngine::HorizontalRank;
 int MPIoffloadEngine::HorizontalSize;
 
 MPI_Comm MPIoffloadEngine::VerticalComm;
-MPI_Win MPIoffloadEngine::VerticalWindow;
 int MPIoffloadEngine::VerticalSize;
 int MPIoffloadEngine::VerticalRank;
+MPI_Win MPIoffloadEngine::VerticalWindow;
 std::vector<void *> MPIoffloadEngine::VerticalShmBufs;
 std::vector<std::vector<int> > MPIoffloadEngine::UniverseRanks;
 std::vector<int> MPIoffloadEngine::UserCommunicatorToWorldRanks;
@@ -274,8 +291,12 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
   /////////////////////////////////////////////////////////////////////
   // Split into groups that can share memory (Verticals)
   /////////////////////////////////////////////////////////////////////
-  // MPI_Comm_split_type(communicator_universe, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,&VerticalComm);
-  MPI_Comm_split(communicator_universe,(UniverseRank/2),UniverseRank,&VerticalComm);
+#define MPI_SHARED_MEM_DEBUG
+#ifdef MPI_SHARED_MEM_DEBUG
+  MPI_Comm_split(communicator_universe,(UniverseRank/4),UniverseRank,&VerticalComm);
+#else
+  MPI_Comm_split_type(communicator_universe, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,&VerticalComm);
+#endif
   MPI_Comm_rank(VerticalComm ,&VerticalRank);
   MPI_Comm_size(VerticalComm ,&VerticalSize);
 
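The MPI_SHARED_MEM_DEBUG branch above emulates several "nodes" on a single host by colouring ranks with UniverseRank/4 instead of asking MPI for the real shared-memory grouping, which makes the aliasing problem reproducible on one machine. A compact sketch of the two ways of forming the vertical communicator; the group size of four and the helper name are illustrative assumptions:

// Sketch: split either by real shared-memory node, or by blocks of four ranks to
// fake multiple nodes on one machine for debugging.
#include <mpi.h>

MPI_Comm make_vertical(MPI_Comm universe, bool debug_fake_nodes) {
  int rank;
  MPI_Comm_rank(universe, &rank);
  MPI_Comm vertical;
  if ( debug_fake_nodes ) {
    // every 4 consecutive ranks share one "vertical" communicator
    MPI_Comm_split(universe, rank / 4, rank, &vertical);
  } else {
    // ranks that can genuinely share memory end up together
    MPI_Comm_split_type(universe, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &vertical);
  }
  return vertical;
}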
@@ -308,19 +329,83 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
   //////////////////////////////////////////////////////////////////////////////////////////////////////////
   // allocate the shared window for our group, pass back Shm info to CartesianCommunicator
   //////////////////////////////////////////////////////////////////////////////////////////////////////////
+  VerticalShmBufs.resize(VerticalSize);
+
+#undef MPI_SHARED_MEM
+#ifdef MPI_SHARED_MEM
   ierr = MPI_Win_allocate_shared(CartesianCommunicator::MAX_MPI_SHM_BYTES,1,MPI_INFO_NULL,VerticalComm,&ShmCommBuf,&VerticalWindow);
   ierr|= MPI_Win_lock_all (MPI_MODE_NOCHECK, VerticalWindow);
   assert(ierr==0);
-
-  std::cout<<"SHM "<<ShmCommBuf<<std::endl;
-
-  VerticalShmBufs.resize(VerticalSize);
+  // std::cout<<"SHM "<<ShmCommBuf<<std::endl;
+
   for(int r=0;r<VerticalSize;r++){
     MPI_Aint sz;
     int dsp_unit;
     MPI_Win_shared_query (VerticalWindow, r, &sz, &dsp_unit, &VerticalShmBufs[r]);
-    std::cout<<"SHM "<<r<<" " <<VerticalShmBufs[r]<<std::endl;
+    // std::cout<<"SHM "<<r<<" " <<VerticalShmBufs[r]<<std::endl;
   }
+#else
+  char shm_name [NAME_MAX];
+  MPI_Barrier(VerticalComm);
+
+  if ( VerticalRank == 0 ) {
+    for(int r=0;r<VerticalSize;r++){
+
+      size_t size = CartesianCommunicator::MAX_MPI_SHM_BYTES;
+      if ( r>0 ) size = sizeof(SlaveState);
+
+      sprintf(shm_name,"/Grid_mpi3_shm_%d_%d",WorldRank,r);
+
+      shm_unlink(shm_name);
+
+      int fd=shm_open(shm_name,O_RDWR|O_CREAT,0600);
+      if ( fd < 0 ) {
+        perror("failed shm_open");
+        assert(0);
+      }
+
+      ftruncate(fd, size);
+
+      VerticalShmBufs[r] = mmap(NULL,size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+      if ( VerticalShmBufs[r] == MAP_FAILED ) {
+        perror("failed mmap");
+        assert(0);
+      }
+
+      uint64_t * check = (uint64_t *) VerticalShmBufs[r];
+      check[0] = WorldRank;
+      check[1] = r;
+
+      // std::cout<<"SHM "<<r<<" " <<VerticalShmBufs[r]<<std::endl;
+    }
+  }
+
+  MPI_Barrier(VerticalComm);
+
+  if ( VerticalRank != 0 ) {
+    for(int r=0;r<VerticalSize;r++){
+
+      size_t size = CartesianCommunicator::MAX_MPI_SHM_BYTES ;
+      if ( r>0 ) size = sizeof(SlaveState);
+
+      sprintf(shm_name,"/Grid_mpi3_shm_%d_%d",WorldRank,r);
+
+      int fd=shm_open(shm_name,O_RDWR|O_CREAT,0600);
+      if ( fd<0 ) {
+        perror("failed shm_open");
+        assert(0);
+      }
+      VerticalShmBufs[r] = mmap(NULL,size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+
+      uint64_t * check = (uint64_t *) VerticalShmBufs[r];
+      assert(check[0]== WorldRank);
+      assert(check[1]== r);
+      std::cerr<<"SHM "<<r<<" " <<VerticalShmBufs[r]<<std::endl;
+    }
+  }
+#endif
+  MPI_Barrier(VerticalComm);
 
   //////////////////////////////////////////////////////////////////////
   // Map rank of leader on node in their in new world, to the
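To summarise the long hunk above: the leader rank creates, sizes and stamps every segment, a barrier follows, and the remaining ranks attach and assert the stamp, so two groups silently mapping the same memory (the OpenMPI behaviour described earlier) fails immediately. A condensed, non-verbatim sketch of that stamp-and-verify idea, with illustrative helper names:

// Leader writes a stamp into each freshly created region; attachers verify it.
#include <cassert>
#include <cstdint>

struct Stamp { uint64_t world_rank; uint64_t index; };

void stamp_region(void *buf, uint64_t world_rank, uint64_t index) {
  Stamp *s = (Stamp *)buf;
  s->world_rank = world_rank;   // written only by the creating rank
  s->index      = index;
}

void verify_region(void *buf, uint64_t world_rank, uint64_t index) {
  Stamp *s = (Stamp *)buf;
  // fails if two groups were handed the same underlying segment
  assert(s->world_rank == world_rank && s->index == index);
}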