Added support for MPI failure injection and recovery. It mimics the work done for...
authorEsteban Meneses <emenese2@illinois.edu>
Mon, 3 Sep 2012 15:11:36 +0000 (10:11 -0500)
committerEsteban Meneses <emenese2@illinois.edu>
Mon, 3 Sep 2012 15:11:36 +0000 (10:11 -0500)
src/ck-core/ckmessagelogging.C

index dddf6ec967df70daad137f66086588a2b0dee4fc..efdf53c465b1a19557be486348155580936ba92a 100644 (file)
@@ -158,6 +158,22 @@ double lastRestart=0;
 //update location globals
 int _receiveLocationHandlerIdx;
 
+#if CMK_CONVERSE_MPI
+static int heartBeatHandlerIdx;
+static int heartBeatCheckHandlerIdx;
+static int partnerFailureHandlerIdx;
+static double lastPingTime = -1;
+
+extern "C" void mpi_restart_crashed(int pe, int rank);
+extern "C" int  find_spare_mpirank(int pe);
+
+void heartBeatPartner();
+void heartBeatHandler(void *msg);
+void heartBeatCheckHandler();
+void partnerFailureHandler(char *msg);
+int getReverseCheckPointPE();
+#endif
+
 /***** *****/
 
 /** 
@@ -199,6 +215,13 @@ void _messageLoggingInit(){
        
        //handlers for updating locations
        _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
+
+       // handlers for failure detection in MPI layer
+#if CMK_CONVERSE_MPI
+       heartBeatHandlerIdx = CkRegisterHandler((CmiHandler)heartBeatHandler);
+       heartBeatCheckHandlerIdx = CkRegisterHandler((CmiHandler)heartBeatCheckHandler);
+       partnerFailureHandlerIdx = CkRegisterHandler((CmiHandler)partnerFailureHandler);
+#endif
        
        //Cpv variables for message logging
        CpvInitialize(Queue, _outOfOrderMessageQueue);
@@ -248,6 +271,14 @@ void _messageLoggingInit(){
        lastCompletedAlarm=CmiWallTimer();
        lastRestart = CmiWallTimer();
 
+       // fault detection for MPI layer
+#if CMK_CONVERSE_MPI
+  void heartBeatPartner();
+  void heartBeatCheckHandler();
+  CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
+  CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
+#endif
+
 #if COLLECT_STATS_MSGS
 #if COLLECT_STATS_MSGS_TOTAL
        totalMsgsTarget = 0;
@@ -269,6 +300,72 @@ void _messageLoggingInit(){
 
 }
 
+#if CMK_CONVERSE_MPI
+
+/**
+ * Receives the notification of a failure and updates pe-to-rank mapping.
+ */
+void partnerFailureHandler(char *msg)
+{
+   int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+
+   // send message to crash pe to let it restart
+   int newrank = find_spare_mpirank(diepe);
+   int buddy = getCheckPointPE();
+   if (buddy == diepe)  {
+     mpi_restart_crashed(diepe, newrank);
+     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
+   }
+}
+
+/**
+ * Registers last time it knew about the PE that checkpoints on it.
+ */
+void heartBeatHandler(void *msg)
+{
+       lastPingTime = CmiWallTimer();
+       CmiFree(msg);
+}
+
+/**
+ * Checks whether the PE that checkpoints on it is still alive.
+ */
+void heartBeatCheckHandler()
+{
+       double now = CmiWallTimer();
+       if (lastPingTime > 0 && now - lastPingTime > 4) {
+               int i, pe, buddy;
+               // tell everyone that PE is down
+               buddy = getReverseCheckPointPE();
+               CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
+               
+               for (int pe = 0; pe < CmiNumPes(); pe++) {
+                       if (pe == buddy) continue;
+                       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+                       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
+                       CmiSetHandler(msg, partnerFailureHandlerIdx);
+                       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+               }
+       }
+       else 
+               CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
+}
+
+/**
+ * Pings buddy to let it know this PE is alive. Used for failure detection.
+ */
+void heartBeatPartner()
+{
+       int buddy = getCheckPointPE();
+       //printf("[%d] heartBeatPartner %d\n", CmiMyPe(), buddy);
+       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+       *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
+       CmiSetHandler(msg, heartBeatHandlerIdx);
+       CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+       CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
+}
+#endif
+
 void killLocal(void *_dummy,double curWallTime);       
 
 void readKillFile(){
@@ -312,11 +409,26 @@ void killLocal(void *_dummy,double curWallTime){
        printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
        if(CmiWallTimer()<killTime-1){
                CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
-       }else{  
+       }else{
+#if CMK_CONVERSE_MPI
+               CkDieNow();
+#else
                kill(getpid(),SIGKILL);
+#endif
        }
 }
 
+#if ! CMK_CONVERSE_MPI
+void CkDieNow()
+{
+       // kill -9 for non-mpi version
+       CmiPrintf("[%d] die now.\n", CmiMyPe());
+       killTime = CmiWallTimer()+0.001;
+       CcdCallFnAfter(killLocal,NULL,1);
+}
+#endif
+
+
 /*** Auxiliary Functions ***/
 
 /************************ Message logging methods ****************/
@@ -2277,11 +2389,19 @@ void ChareMlogData::pup(PUP::er &p){
  * Getting the pe number of the current processor's buddy.
  * In the team-based approach each processor might checkpoint in the next team, but currently
  * teams are only meant to reduce memory overhead.
+ * Note: function getReverseCheckPointPE performs the reverse map. It must be changed accordingly.
  */
 int getCheckPointPE(){
        return (CmiMyPe() + 1) % CmiNumPes();
 }
 
+/**
+ * Getting the pe that checkpoints on this pe.
+ */
+int getReverseCheckPointPE(){
+       return (CmiMyPe() - 1 + CmiNumPes()) % CmiNumPes();
+}
+
 //assume it is a packed envelope
 envelope *copyEnvelope(envelope *env){
        envelope *newEnv = (envelope *)CmiAlloc(env->getTotalsize());