Changes to handle failures during checkpointing 15/415/4
authorXiang Ni <xiangni2@illinois.edu>
Tue, 28 Oct 2014 04:24:02 +0000 (23:24 -0500)
committerGerrit Code Review <gerrit2@charm.cs.uiuc.edu>
Tue, 4 Nov 2014 17:32:07 +0000 (11:32 -0600)
This commit enables Charm++ RTS to handle failures that occur during checkpointing: this is achieved by keeping the previous checkpoint around till the new checkpoint is generated on all processors. CmiReduce is used to find the last safe checkpoint on all processors when failures happen.

Change-Id: Ib83de2c5d66b6257809d365b1c617298bb89f9c3

src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.h
src/conv-core/convcore.c
src/conv-core/converse.h

index 318df2758869f406281d5d212d6d02447ee677c6..8de0b313186701e86cc8813bf0aa010dad44d549 100644 (file)
@@ -125,7 +125,11 @@ double killTime=0.0;
  }
 
 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
-CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
+//make procChkptBuf an array of two to store both previous and current checkpoint
+CpvDeclare(CkProcCheckPTMessage**, procChkptBuf);
+//point to the checkpoint should be used for recovery
+CpvDeclare(int, chkpPointer);
+CpvDeclare(int, chkpNum);
 
 // compute the backup processor
 // FIXME: avoid crashed processors
@@ -341,6 +345,7 @@ CkMemCheckPT::CkMemCheckPT(int w)
   }
   inRestarting = 0;
   recvCount = peCount = 0;
+  recvChkpCount = 0;
   ackCount = 0;
   expectCount = -1;
   where = w;
@@ -384,7 +389,8 @@ void CkMemCheckPT::pup(PUP::er& p)
   p|peCount;
   if (p.isUnpacking()) {
        recvCount = peCount = 0;
-       ackCount = 0;
+       recvChkpCount = 0;
+       ackCount = 0;
        expectCount = -1;
         inCheckpointing = 0;
 #if CMK_CONVERSE_MPI
@@ -584,7 +590,7 @@ void CkMemCheckPT::startArrayCheckpoint(){
                pupAllElements(p);
        }
        thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
-       chkpTable[0].updateBuffer(msg);
+       chkpTable[0].updateBuffer(CpvAccess(chkpPointer)^1,msg);
         recvCount++;
 #endif
 }
@@ -596,10 +602,26 @@ void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
        if(msg->bud1 == CkMyPe()){
                idx = 0;
        }
+       
        int isChkpting = msg->cp_flag;
-       chkpTable[idx].updateBuffer(msg);
+       int pointer;
+       if(isChkpting)
+         pointer = CpvAccess(chkpPointer)^1;
+       else
+         pointer = CpvAccess(chkpPointer);
+
+       chkpTable[idx].updateBuffer(pointer,msg);
+       
        if(isChkpting){
                recvCount++;
+  
+               recvChkpCount++;
+               if(recvChkpCount==2)
+               {
+                 CpvAccess(chkpNum)++;
+                 recvChkpCount=0;
+               }
+
                if(recvCount == 2){
                  if (where == CkCheckPoint_inMEM) {
                        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
@@ -662,9 +684,18 @@ void CkMemCheckPT::sendProcData()
 
 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
 {
-  if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
-  CpvAccess(procChkptBuf) = msg;
+  int pointer = CpvAccess(chkpPointer)^1;
+  if (CpvAccess(procChkptBuf)[pointer]) delete CpvAccess(procChkptBuf)[pointer];
+  CpvAccess(procChkptBuf)[pointer] = msg;
   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
+  
+  recvChkpCount++;
+  if(recvChkpCount==2)
+  {
+    CpvAccess(chkpNum)++;
+    recvChkpCount=0;
+  }
+
   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
 }
 
@@ -740,11 +771,11 @@ void CkMemCheckPT::report()
     CmiAssert(entry);
     objsize += entry->getSize();
   }
-  CmiAssert(CpvAccess(procChkptBuf));
-  //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
 #else
+  //this checkpoint has finished, update the pointer
+  CpvAccess(chkpPointer) = CpvAccess(chkpPointer)^1;
   if(CkMyPe()==0)
-  CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
+  CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)[CpvAccess(chkpPointer)]->len);
 #endif
 }
 
@@ -967,7 +998,7 @@ void CkMemCheckPT::recoverBuddies()
 #else
       int budPe = thisFailedPe;
 #endif
-      CkArrayCheckPTMessage *msg = chkpTable[1].getCopy();
+      CkArrayCheckPTMessage *msg = chkpTable[1].getCopy(CpvAccess(chkpPointer));
       CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
          msg->cp_flag = 0;            // not checkpointing
       msg->bud1 = budPe;
@@ -1063,7 +1094,7 @@ void CkMemCheckPT::recoverArrayElements()
     count ++;
   }
 #else
-       CkArrayCheckPTMessage * msg = chkpTable[0].getCopy();
+       CkArrayCheckPTMessage * msg = chkpTable[0].getCopy(CpvAccess(chkpPointer));
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
        recoverAll(msg,gmap,imap);
 #else
@@ -1146,7 +1177,8 @@ void CkMemCheckPT::finishUp()
 {
   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
   //CKLOCMGR_LOOP(mgr->doneInserting(););
-  
+  recvCount = peCount = 0;
+  recvChkpCount = 0;
   if (CkMyPe() == thisFailedPe)
   {
        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
@@ -1242,6 +1274,8 @@ static int restartBcastHandlerIdx;
 static int recoverProcDataHandlerIdx;
 static int restartBeginHandlerIdx;
 static int notifyHandlerIdx;
+static int reportChkpSeqHandlerIdx;
+static int getChkpSeqHandlerIdx;
 
 // called on crashed PE
 static void restartBeginHandler(char *msg)
@@ -1277,12 +1311,29 @@ static void * doNothingMsg(int * size, void * data, void ** remote, int count){
        return data;
 }
 
+static void * minChkpNumMsg(int * size, void * data, void ** remote, int count)
+{
+  int minNum = *(int *)((char *)data+CmiMsgHeaderSizeBytes);
+  for(int i = 0; i < count;i++)
+  {
+    int num = *(int *)((char *)(remote[i])+CmiMsgHeaderSizeBytes);
+    if(num != -1 && num < minNum)
+    {
+      minNum = num;
+    }
+  }
+  *(int *)((char *)data+CmiMsgHeaderSizeBytes) = minNum;
+  return data;
+}
+
 static void restartBcastHandler(char *msg)
 {
 #if CMK_MEM_CHECKPOINT
   // advance phase counter
   CkMemCheckPT::inRestarting = 1;
   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
+  CpvAccess(chkpNum) = *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int));
+  CpvAccess(chkpPointer) = CpvAccess(chkpNum)%2;
 
   if (CkMyPe()==_diePE)
     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
@@ -1315,6 +1366,8 @@ static void recoverProcDataHandler(char *msg)
    envelope *env = (envelope *)msg;
    CkUnpackMessage(&env);
    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
+   CpvAccess(chkpNum) = procMsg->pointer;
+   CpvAccess(chkpPointer) = CpvAccess(chkpNum)%2;
    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
    PUP::fromMem p(procMsg->packData);
@@ -1324,10 +1377,11 @@ static void recoverProcDataHandler(char *msg)
    // gzheng
    CKLOCMGR_LOOP(mgr->startInserting(););
 
-   char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+   char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int)*2);
    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
+   *(int *)(reqmsg+CmiMsgHeaderSizeBytes+sizeof(int)) = CpvAccess(chkpNum);
    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
-   CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
+   CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int)*2, (char *)reqmsg);
 
    _initDone();
 //   CpvAccess(_qd)->flushStates();
@@ -1341,20 +1395,24 @@ static void askProcDataHandler(char *msg)
 {
 #if CMK_MEM_CHECKPOINT
     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+    CpvAccess(chkpNum) = *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int));
+    CmiFree(msg);
+    int pointer = CpvAccess(chkpNum)%2;
+    CpvAccess(chkpPointer) = pointer;
     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
-    if (CpvAccess(procChkptBuf) == NULL)  {
+    if (CpvAccess(procChkptBuf)[pointer] == NULL)  {
       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
       CkAbort("no checkpoint found");
     }
-    envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
-    CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
-
-    CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
+    CpvAccess(procChkptBuf)[pointer]->pointer = CpvAccess(chkpNum);
+    envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)[pointer]));
+    CmiAssert(CpvAccess(procChkptBuf)[pointer]->pe == diePe);
 
+    CpvAccess(procChkptBuf)[pointer]->cur_restart_phase = CpvAccess(_curRestartPhase);
     CkPackMessage(&env);
     CmiSetHandler(env, recoverProcDataHandlerIdx);
-    CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
-    CpvAccess(procChkptBuf) = NULL;
+    CmiSyncSendAndFree(CpvAccess(procChkptBuf)[pointer]->pe, env->getTotalsize(), (char *)env);
+    CpvAccess(procChkptBuf)[pointer] = NULL;
     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
 #endif
 }
@@ -1364,22 +1422,50 @@ void qd_callback(void *m)
 {
    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
    CkFreeMsg(m);
+   //broadcast to collect the checkpoint number
+   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+   CmiSetHandler(msg,reportChkpSeqHandlerIdx);
+   CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
+}
+
+static void reportChkpSeqHandler(char * m)
+{
+  CmiFree(m);
+  CmiResetGlobalReduceSeqID();
+  char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+  int num = CpvAccess(chkpNum);
+  if(CkMyNode() == CpvAccess(_crashedNode))
+  {
+    num = -1;
+  }
+  *(int *)(msg+CmiMsgHeaderSizeBytes) = num;
+  CmiSetHandler(msg,getChkpSeqHandlerIdx);
+  CmiReduce(msg,CmiMsgHeaderSizeBytes+sizeof(int),minChkpNumMsg);
+}
+
+static void getChkpSeqHandler(char * m)
+{
+  CpvAccess(chkpNum) = *(int *)(m+CmiMsgHeaderSizeBytes);
+  CpvAccess(chkpPointer) = CpvAccess(chkpNum)%2;
+  CmiFree(m);
 #ifdef CMK_SMP
    for(int i=0;i<CmiMyNodeSize();i++){
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
-       CmiSetHandler(msg, askProcDataHandlerIdx);
-       int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
-       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int)*2);
+    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
+    *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int)) =CpvAccess(chkpNum);
+    CmiSetHandler(msg, askProcDataHandlerIdx);
+    int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
+    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int)*2, (char *)msg);
    }
    return;
 #endif
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int)*2);
    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+    *(int *)(msg+CmiMsgHeaderSizeBytes+sizeof(int)) =CpvAccess(chkpNum);
    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
    CmiSetHandler(msg, askProcDataHandlerIdx);
    int pe = ChkptOnPe(CpvAccess(_crashedNode));
-   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int)*2, (char *)msg);
 }
 
 static void changePhaseHandler(char *msg){
@@ -1416,12 +1502,15 @@ void CkMemRestart(const char *dummy, CkArgMsg *args)
     CmiSetHandler(msg, changePhaseHandlerIdx);
     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
   }else{  
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+   /*char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
    CmiSetHandler(msg, askProcDataHandlerIdx);
    int pe = ChkptOnPe(CpvAccess(_crashedNode));
-   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);*/
+   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+   CmiSetHandler(msg,reportChkpSeqHandlerIdx);
+   CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
   }
 #else
    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
@@ -1669,6 +1758,8 @@ void CkRegisterRestartHandler( )
   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
+  reportChkpSeqHandlerIdx = CkRegisterHandler((CmiHandler)reportChkpSeqHandler);
+  getChkpSeqHandlerIdx = CkRegisterHandler((CmiHandler)getChkpSeqHandler);
 
 #if CMK_CONVERSE_MPI
   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
@@ -1679,8 +1770,15 @@ void CkRegisterRestartHandler( )
 #endif
   changePhaseHandlerIdx = CkRegisterHandler((CmiHandler)changePhaseHandler);
 
-  CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
-  CpvAccess(procChkptBuf) = NULL;
+  CpvInitialize(CkProcCheckPTMessage **, procChkptBuf);
+  CpvAccess(procChkptBuf) = new CkProcCheckPTMessage *[2];
+  CpvAccess(procChkptBuf)[0] = NULL;
+  CpvAccess(procChkptBuf)[1] = NULL;
+
+  CpvInitialize(int, chkpPointer);
+  CpvAccess(chkpPointer) = 0;
+  CpvInitialize(int, chkpNum);
+  CpvAccess(chkpNum) = 0;
 
   notify_crash_fn = notify_crash;
 
index 26888fb17573acd3189dc984bba68c94efc66a53..6533002baafbb13601bdc883678d3507db2c2a03 100644 (file)
@@ -28,6 +28,7 @@ public:
        int failedpe;
        int cur_restart_phase;
        int len;
+       int pointer;
        char *packData;
 };
 
@@ -55,14 +56,16 @@ public:
 #define CkCheckPoint_inDISK  2
 
 class CkCheckPTEntry{
-  CkArrayCheckPTMessage *data;
+  CkArrayCheckPTMessage **data;
   char * fname;
 public:
   int bud1, bud2;
   int where;
   void init(int _where, int idx)
   {
-    data = NULL;
+    data = new CkArrayCheckPTMessage*[2];
+    data[0] = NULL;
+    data[1] = NULL;
     where = _where;
     if(where == CkCheckPoint_inDISK)
     {
@@ -83,13 +86,13 @@ public:
     }
   }
 
-  void updateBuffer(CkArrayCheckPTMessage * msg)
+  void updateBuffer(int pointer, CkArrayCheckPTMessage * msg)
   {
     if(where == CkCheckPoint_inDISK)
     {
       envelope *env = UsrToEnv(msg);
       CkUnpackMessage(&env);
-      data = (CkArrayCheckPTMessage *)EnvToUsr(env);
+      data[pointer] = (CkArrayCheckPTMessage *)EnvToUsr(env);
       FILE *f = fopen(fname,"wb");
       PUP::toDisk p(f);
       CkPupMessage(p, (void **)&msg);
@@ -103,14 +106,14 @@ public:
     {
       CmiAssert(where == CkCheckPoint_inMEM);
       CmiAssert(msg!=NULL);
-      if (data) delete data;
-      data = msg;
+      delete data[pointer];
+      data[pointer] = msg;
       bud1 = msg->bud1;
       bud2 = msg->bud2;
     }
   }
   
-  CkArrayCheckPTMessage * getCopy()
+  CkArrayCheckPTMessage * getCopy(int pointer)
   {
     if(where == CkCheckPoint_inDISK)
     {
@@ -125,11 +128,11 @@ public:
     }else
     {
       CmiAssert(where == CkCheckPoint_inMEM);
-      if (data == NULL) {
+      if (data[pointer] == NULL) {
         CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
         CmiAbort("Abort!");
       }
-      return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&data);
+      return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&data[pointer]);
     }
   }
 };
@@ -175,13 +178,15 @@ public:
   static int inLoadbalancing;
   static double startTime;
   static char*  stage;
+
 private:
   CkVec<CkCheckPTInfo *> ckTable;
   CkCheckPTEntry chkpTable[2];
 
   int recvCount, peCount;
   int expectCount, ackCount;
-    /// the processor who initiate the checkpointing
+  int recvChkpCount;//expect to receive both the processor checkpoint and array checkpoint from buddy PE
+  /// the processor who initiate the checkpointing
   int cpStarter;
   CkVec<int> failedPes;
   int thisFailedPe;
index 59b7bf4c938f905631acef9c50421ae22b60c4dd..192b2e6218f9cdb290fee3636937a6c9b5d6e13e 100644 (file)
@@ -2389,11 +2389,9 @@ void *CmiReduceMergeFn_random(int *size, void *data, void** remote, int n) {
   return data;
 }
 
-#if CMK_MESSAGE_LOGGING
 void CmiResetGlobalReduceSeqID(){
        CpvAccess(_reduce_seqID_global) = 0;
 }
-#endif
 
 static void CmiGlobalReduce(void *msg, int size, CmiReduceMergeFn mergeFn, CmiReduction *red) {
   CmiAssert(red->localContributed == 0);
index b3674d5a6226d6f811d5a0d1ccea35fe60878b17..8eec8ccff6be975a13efe31b33f18089938c255e 100644 (file)
@@ -1185,9 +1185,7 @@ CmiReductionID CmiGetGlobalReduction();
 CmiReductionID CmiGetDynamicReduction();
 void CmiGetDynamicReductionRemote(int handlerIdx, int pe, int dataSize, void *data);
 
-#if CMK_MESSAGE_LOGGING
 void CmiResetGlobalReduceSeqID();
-#endif
 
 /* If the second parameter (the number of chunks to send) is negative, then
  * every message will be started aligned with 8 bytes, and a message header will