1
0
mirror of https://github.com/paboyle/Grid.git synced 2025-04-27 22:25:56 +01:00

Semaphore sleep/wake up on remote processes.

This commit is contained in:
paboyle 2016-11-02 09:27:20 +00:00
parent bb94ddd0eb
commit 32375aca65

View File

@ -27,7 +27,24 @@ Author: Peter Boyle <paboyle@ph.ed.ac.uk>
/* END LEGAL */ /* END LEGAL */
#include "Grid.h" #include "Grid.h"
#include <mpi.h> #include <mpi.h>
/// bloody mac os doesn't implement unnamed semaphores since it is "optional" posix.
/// darwin dispatch semaphores don't seem to be multiprocess.
//#ifdef __APPLE__
#include <semaphore.h> #include <semaphore.h>
typedef sem_t *Grid_semaphore;
#define SEM_INIT(S) S = sem_open(sem_name,0,00777,0);
#define SEM_INIT_EXCL(S) sem_unlink(sem_name); S = sem_open(sem_name,O_CREAT|O_EXCL,00777,0);
#define SEM_POST(S) sem_post(S);
#define SEM_WAIT(S) sem_wait(S);
//#else
//#include <semaphore.h>
//typedef sem_t Grid_semaphore;
//#define SEM_INIT(S)
//#define SEM_INIT_EXCL(S) sem_init(&S,0);
//#define SEM_POST(S) sem_post(&S);
//#define SEM_WAIT(S) sem_wait(&S);
//#endif
namespace Grid { namespace Grid {
@ -46,7 +63,8 @@ const int pool = 48;
class SlaveState { class SlaveState {
public: public:
sem_t sem; Grid_semaphore sem_head;
Grid_semaphore sem_tail;
volatile int head; volatile int head;
volatile int start; volatile int start;
volatile int tail; volatile int tail;
@ -60,6 +78,7 @@ public:
uint64_t base; uint64_t base;
int universe_rank; int universe_rank;
int vertical_rank; int vertical_rank;
char sem_name [NAME_MAX];
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
// Descriptor circular pointers // Descriptor circular pointers
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
@ -67,11 +86,35 @@ public:
void Init(SlaveState * _state,MPI_Comm _squadron,int _universe_rank,int _vertical_rank); void Init(SlaveState * _state,MPI_Comm _squadron,int _universe_rank,int _vertical_rank);
void SemInit(void) {
sprintf(sem_name,"/Grid_mpi3_head_%d",universe_rank);
SEM_INIT(state->sem_head);
sprintf(sem_name,"/Grid_mpi3_tail_%d",universe_rank);
SEM_INIT(state->sem_tail);
}
void SemInitExcl(void) {
sprintf(sem_name,"/Grid_mpi3_head_%d",universe_rank);
SEM_INIT_EXCL(state->sem_head);
sprintf(sem_name,"/Grid_mpi3_tail_%d",universe_rank);
SEM_INIT_EXCL(state->sem_tail);
}
void WakeUpDMA(void) {
SEM_POST(state->sem_head);
};
void WakeUpCompute(void) {
SEM_POST(state->sem_tail);
};
void WaitForCommand(void) {
SEM_WAIT(state->sem_head);
};
void WaitForComplete(void) {
SEM_WAIT(state->sem_tail);
};
void EventLoop (void) { void EventLoop (void) {
std::cout<< " Entering event loop "<<std::endl; std::cout<< " Entering event loop "<<std::endl;
while(1){ while(1){
WaitForCommand();
Event(); Event();
MPI_Barrier(squadron);
} }
} }
@ -79,11 +122,11 @@ public:
uint64_t QueueCommand(int command,void *buf, int bytes, int hashtag, MPI_Comm comm,int u_rank) ; uint64_t QueueCommand(int command,void *buf, int bytes, int hashtag, MPI_Comm comm,int u_rank) ;
void QueueWaitAll() {
QueueCommand(COMMAND_WAITALL,0,0,0,squadron,0);
}
void WaitAll() { void WaitAll() {
while ( state->tail != state->head ); QueueCommand(COMMAND_WAITALL,0,0,0,squadron,0);
WakeUpDMA();
WaitForComplete();
assert ( state->tail == state->head );
} }
}; };
@ -130,25 +173,22 @@ public:
// routines for master proc must handle any communicator // routines for master proc must handle any communicator
///////////////////////////////////////////////////////// /////////////////////////////////////////////////////////
static uint64_t QueueSend(int slave,void *buf, int bytes, int tag, MPI_Comm comm,int rank) { static void QueueSend(int slave,void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
// std::cout<< " Queueing send "<< bytes<< " slave "<< slave << " to comm "<<rank <<std::endl; // std::cout<< " Queueing send "<< bytes<< " slave "<< slave << " to comm "<<rank <<std::endl;
return Slaves[slave].QueueCommand(COMMAND_ISEND,buf,bytes,tag,comm,rank); Slaves[slave].QueueCommand(COMMAND_ISEND,buf,bytes,tag,comm,rank);
Slaves[slave].WakeUpDMA();
}; };
static uint64_t QueueRecv(int slave, void *buf, int bytes, int tag, MPI_Comm comm,int rank) { static void QueueRecv(int slave, void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
// std::cout<< " Queueing recv "<< bytes<< " slave "<< slave << " from comm "<<rank <<std::endl; // std::cout<< " Queueing recv "<< bytes<< " slave "<< slave << " from comm "<<rank <<std::endl;
return Slaves[slave].QueueCommand(COMMAND_IRECV,buf,bytes,tag,comm,rank); Slaves[slave].QueueCommand(COMMAND_IRECV,buf,bytes,tag,comm,rank);
Slaves[slave].WakeUpDMA();
}; };
static void WaitAll() { static void WaitAll() {
for(int s=1;s<VerticalSize;s++) {
Slaves[s].QueueWaitAll();
}
MPI_Barrier(VerticalComm);
for(int s=1;s<VerticalSize;s++) { for(int s=1;s<VerticalSize;s++) {
Slaves[s].WaitAll(); Slaves[s].WaitAll();
} }
MPI_Barrier(VerticalComm);
}; };
static void GetWork(int nwork, int me, int & mywork, int & myoff,int units){ static void GetWork(int nwork, int me, int & mywork, int & myoff,int units){
@ -173,7 +213,6 @@ public:
GetWork(bytes,s,mywork,myoff,procs); GetWork(bytes,s,mywork,myoff,procs);
QueueSend(s+1,&cbuf[myoff],mywork,tag,comm,rank); QueueSend(s+1,&cbuf[myoff],mywork,tag,comm,rank);
} }
MPI_Barrier(VerticalComm);
}; };
static void QueueMultiplexedRecv(void *buf, int bytes, int tag, MPI_Comm comm,int rank) { static void QueueMultiplexedRecv(void *buf, int bytes, int tag, MPI_Comm comm,int rank) {
@ -184,7 +223,6 @@ public:
GetWork(bytes,s,mywork,myoff,procs); GetWork(bytes,s,mywork,myoff,procs);
QueueRecv(s+1,&cbuf[myoff],mywork,tag,comm,rank); QueueRecv(s+1,&cbuf[myoff],mywork,tag,comm,rank);
} }
MPI_Barrier(VerticalComm);
}; };
}; };
@ -299,6 +337,8 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
if ( VerticalRank != 0 ) { if ( VerticalRank != 0 ) {
Slave indentured; Slave indentured;
indentured.Init( (SlaveState *) VerticalShmBufs[VerticalRank], VerticalComm, UniverseRank,VerticalRank); indentured.Init( (SlaveState *) VerticalShmBufs[VerticalRank], VerticalComm, UniverseRank,VerticalRank);
indentured.SemInitExcl();// init semaphore in shared memory
MPI_Barrier(VerticalComm);
MPI_Barrier(VerticalComm); MPI_Barrier(VerticalComm);
indentured.EventLoop(); indentured.EventLoop();
assert(0); assert(0);
@ -308,6 +348,10 @@ void MPIoffloadEngine::CommunicatorInit (MPI_Comm &communicator_world,
Slaves[i].Init((SlaveState *)VerticalShmBufs[i],VerticalComm, UniverseRanks[HorizontalRank][i],i); Slaves[i].Init((SlaveState *)VerticalShmBufs[i],VerticalComm, UniverseRanks[HorizontalRank][i],i);
} }
MPI_Barrier(VerticalComm); MPI_Barrier(VerticalComm);
for(int i=1;i<VerticalSize;i++){
Slaves[i].SemInit();// init semaphore in shared memory
}
MPI_Barrier(VerticalComm);
} }
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////
@ -464,7 +508,9 @@ int Slave::Event (void) {
s=PERI_PLUS(s); s=PERI_PLUS(s);
state->start = s; state->start = s;
state->tail = s; state->tail = s;
MPI_Barrier(squadron);
WakeUpCompute();
return 1; return 1;
break; break;