demonstrate inmem checkpoint restart on MPI layer:
authorGengbin Zheng <gzheng@illinois.edu>
Fri, 28 Oct 2011 22:04:08 +0000 (17:04 -0500)
committerGengbin Zheng <gzheng@illinois.edu>
Fri, 28 Oct 2011 22:04:08 +0000 (17:04 -0500)
1. CkDieNow()  to fake die a processor
2. +wp <numpe> to specify the number of charm processors,  +p fires with spared processors
3. a spared processor will be activated to replace the crashed one.

src/arch/mpi/machine.c
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.h
src/conv-core/conv-conds.c
src/conv-core/converse.h

index c3b632933486ae153d82aa0bc7e187d41c61e9d7..184064c1184c44c19c9fa199028d846996e8cc74 100644 (file)
@@ -204,6 +204,14 @@ static CmiNodeLock  sendMsgBufLock = NULL;        /* for sendMsgBuf */
 #endif
 /* =====End of Declarations of Machine Specific Variables===== */
 
+#if CMK_MEM_CHECKPOINT
+#define FAIL_TAG   1200
+int num_workpes, total_pes;
+int *petorank = NULL;
+int *ranktope = NULL;
+int  nextrank;
+void mpi_end_crashed();
+#endif
 
 /* =====Beginning of Declarations of Machine Specific Functions===== */
 /* Utility functions */
@@ -321,7 +329,12 @@ static CmiCommHandle MPISendOneMsg(SMSG_LIST *smsg) {
     }
 #else
     START_EVENT();
-    if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,node,TAG,MPI_COMM_WORLD,&(smsg->req)))
+#if CMK_MEM_CHECKPOINT
+    int dstrank = petorank[node];
+#else
+    int dstrank = node;
+#endif
+    if (MPI_SUCCESS != MPI_Isend((void *)msg,size,MPI_BYTE,dstrank,TAG,MPI_COMM_WORLD,&(smsg->req)))
         CmiAbort("MPISendOneMsg: MPI_Isend failed!\n");
     /*END_EVENT(40);*/
 #endif
@@ -929,6 +942,9 @@ void LrtsExit() {
 
 #if ! CMK_AUTOBUILD
     signal(SIGINT, signal_int);
+#if CMK_MEM_CHECKPOINT
+    if (CmiMyPe() == 1) mpi_end_crashed();
+#endif
     MPI_Finalize();
 #endif
     exit(0);
@@ -1061,6 +1077,75 @@ static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNod
         printf("Charm++> Running on MPI version: %d.%d multi-thread support: %s (max supported: %s)\n", ver, subver, thread_level_tostring(thread_level), thread_level_tostring(provided));
     }
 
+    {
+        int debug = CmiGetArgFlag(largv,"++debug");
+        int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
+        if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
+#if CMK_HAS_GETPID
+            printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
+            fflush(stdout);
+            if (!debug_no_pause)
+                sleep(15);
+#else
+            printf("++debug ignored.\n");
+#endif
+        }
+    }
+
+
+#if CMK_MEM_CHECKPOINT
+    if (CmiGetArgInt(largv,"+wp",&num_workpes)) {
+       CmiAssert(num_workpes <= *numNodes);
+       total_pes = *numNodes;
+       *numNodes = num_workpes;
+    }
+    else
+       num_workpes = *numNodes;
+    petorank = (int *)malloc(sizeof(int) * num_workpes);
+    ranktope = (int *)malloc(sizeof(int) * num_workpes);
+    for (i=0; i<num_workpes; i++)  petorank[i] = ranktope[i] = i;
+    nextrank = num_workpes;
+
+    char msg[1];
+    MPI_Status sts;
+
+    if (*myNodeID >= num_workpes) {
+      int vals[2];
+      MPI_Recv(vals,2,MPI_INT,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
+      int newpe = vals[0];
+      CpvAccess(_curRestartPhase) = vals[1];
+
+      ranktope[*myNodeID] = newpe;
+      petorank[newpe] = *myNodeID;
+      *myNodeID = newpe;
+      myNID = newpe;
+
+      // set identity
+      // add +restartaftercrash to argv
+      char *phase_str;
+      char **restart_argv;
+      int i=0;
+      while(largv[i]!= NULL) i++;
+      restart_argv = (char **)malloc(sizeof(char *)*(i+3));
+      i=0;
+      while(largv[i]!= NULL){
+                restart_argv[i] = largv[i];
+                i++;
+      }
+      restart_argv[i] = "+restartaftercrash";
+      phase_str = (char*)malloc(10);
+      sprintf(phase_str,"%d", CpvAccess(_curRestartPhase));
+      restart_argv[i+1]=phase_str;
+      restart_argv[i+2]=NULL;
+      *argv = restart_argv;
+      *argc = i+2;
+      largc = *argc;
+      largv = *argv;
+i=0;
+      while(largv[i]!= NULL) { printf("%s\n", largv[i]); i++; }
+    }
+#endif
+
     idleblock = CmiGetArgFlag(largv, "+idleblocking");
     if (idleblock && _Cmi_mynode == 0) {
         printf("Charm++: Running in idle blocking mode.\n");
@@ -1127,21 +1212,6 @@ static void MachineInitForMPI(int *argc, char ***argv, int *numNodes, int *myNod
 #endif
     }
 
-    {
-        int debug = CmiGetArgFlag(largv,"++debug");
-        int debug_no_pause = CmiGetArgFlag(largv,"++debug-no-pause");
-        if (debug || debug_no_pause) {  /*Pause so user has a chance to start and attach debugger*/
-#if CMK_HAS_GETPID
-            printf("CHARMDEBUG> Processor %d has PID %d\n",myNID,getpid());
-            fflush(stdout);
-            if (!debug_no_pause)
-                sleep(15);
-#else
-            printf("++debug ignored.\n");
-#endif
-        }
-    }
-
     procState = (ProcState *)malloc((_Cmi_mynodesize+1) * sizeof(ProcState));
     for (i=0; i<_Cmi_mynodesize+1; i++) {
 #if MULTI_SENDQUEUE
@@ -1329,9 +1399,11 @@ void CmiTimerInit(char **argv) {
             starttimer = minTimer;
         }
     } else { /* we don't have a synchronous timer, set our own start time */
+#if ! CMK_MEM_CHECKPOINT
         CmiBarrier();
         CmiBarrier();
         CmiBarrier();
+#endif
 #if CMK_TIMER_USE_XT3_DCLOCK
         starttimer = dclock();
 #else
@@ -1406,7 +1478,7 @@ double CmiCpuTimer(void) {
     return t;
 }
 
-#endif
+#endif     /* CMK_TIMER_USE_SPECIAL */
 
 /************Barrier Related Functions****************/
 /* must be called on all ranks including comm thread in SMP */
@@ -1468,5 +1540,74 @@ int CmiBarrierZero() {
     return 0;
 }
 
+#if CMK_MEM_CHECKPOINT
+
+extern void mpi_restart(int argc, char **argv);
+
+void mpi_restart_crashed(int pe, int rank)
+{
+    int vals[2];
+    vals[0] = pe;
+    vals[1] = CpvAccess(_curRestartPhase)+1;
+    MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,MPI_COMM_WORLD);
+}
+
+void mpi_end_crashed()
+{
+    int i;
+    for (i=0; i<total_pes; i++) {
+      if (ranktope[i] == -1) {
+        char msg[1];
+printf("[%d] end crash: %d\n", CmiMyPe(), i);
+        MPI_Send((void *)msg,1,MPI_BYTE,i,FAIL_TAG,MPI_COMM_WORLD);
+      }
+    }
+}
+
+int find_spare_mpirank(int pe)
+{
+    if (nextrank == total_pes) {
+      CmiAbort("Charm++> ran out of spared processors");
+    }
+    ranktope[petorank[pe]] = -1;
+    petorank[pe] = nextrank;
+    ranktope[nextrank] = pe;
+    nextrank++;
+    return nextrank-1;
+}
+
+void CkDieNow()
+{
+    char msg[1];
+    char phase_str[10];
+    MPI_Status sts;
+
+    CmiPrintf("[%d] die now\n", CmiMyPe());
+
+    MPI_Recv(msg,1,MPI_BYTE,MPI_ANY_SOURCE,FAIL_TAG, MPI_COMM_WORLD,&sts);
+    MPI_Finalize();
+
+#if 0
+    CmiPrintf("[%d] Restarted\n", CmiMyPe());
+
+    char **restart_argv;
+    int i=0;
+    while(Cmi_argvcopy[i]!= NULL) i++;
+    restart_argv = (char **)malloc(sizeof(char *)*(i+3));
+    i=0;
+    while(Cmi_argvcopy[i]!= NULL){
+                restart_argv[i] = Cmi_argvcopy[i];
+                i++;
+    }
+    restart_argv[i] = "+restartaftercrash";
+    sprintf(phase_str,"%d", ++CpvAccess(_curRestartPhase));
+    restart_argv[i+1]=phase_str;
+    restart_argv[i+2]=NULL;
+
+    mpi_restart(CmiGetArgc(restart_argv), restart_argv);
+#endif
+}
+#endif
+
 /*@}*/
 
index 9c07bb44c32cfacf7f014eb93002cc7e1b78acd5..ea44ebbbde4d6054a6e83210ca43d90e72b2eb46 100644 (file)
@@ -60,7 +60,11 @@ void noopck(const char*, ...)
 
 // assume NO extra processors--1
 // assume extra processors--0
+#if CMK_CONVERSE_MPI
+#define CK_NO_PROC_POOL                                0
+#else
 #define CK_NO_PROC_POOL                                1
+#endif
 
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
@@ -301,6 +305,13 @@ CkMemCheckPT::CkMemCheckPT(int w)
   ackCount = 0;
   expectCount = -1;
   where = w;
+
+#if CMK_CONVERSE_MPI
+  void pingBuddy();
+  void pingCheckHandler();
+  CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+  CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+#endif
 }
 
 CkMemCheckPT::~CkMemCheckPT()
@@ -322,6 +333,12 @@ void CkMemCheckPT::pup(PUP::er& p)
   p|where;                     // where to checkpoint
   if (p.isUnpacking()) {
     recvCount = peCount = 0;
+#if CMK_CONVERSE_MPI
+    void pingBuddy();
+    void pingCheckHandler();
+    CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+#endif
   }
 }
 
@@ -1222,12 +1239,16 @@ void notify_crash(int node)
   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
   CkMemCheckPT::inRestarting = 1;
 
+/*
 #ifdef CMK_SMP
+*/
+  int pe = CmiNodeFirst(CkMyNode());
   for(int i=0;i<CkMyNodeSize();i++){
        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
        CmiSetHandler(msg, notifyHandlerIdx);
-       CmiSyncSendAndFree(CkMyNode()*CkMyNodeSize()+i, CmiMsgHeaderSizeBytes, (char *)msg);
+       CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
   }
+/*
  return;
 #else 
     // this may be in interrupt handler, send a message to reset QD
@@ -1235,11 +1256,103 @@ void notify_crash(int node)
   CmiSetHandler(msg, notifyHandlerIdx);
   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
 #endif
+*/
 #endif
 }
 
 extern "C" void (*notify_crash_fn)(int node);
 
+#if CMK_CONVERSE_MPI
+static int pingHandlerIdx;
+static int pingBackHandlerIdx;
+static int pingCheckHandlerIdx;
+static int buddyDieHandlerIdx;
+static double lastPingTime = -1;
+static int died = 0;
+
+extern void _initCharm(int argc, char **argv);
+extern "C" void mpi_restart_crashed(int pe, int rank);
+extern "C" int  find_spare_mpirank(int pe);
+extern void CkDeleteChares();
+
+void pingCheckHandler();
+
+#if 0
+extern "C" void mpi_restart(int argc, char **argv)
+{
+   died = 1;
+
+   CkDeleteChares();
+
+   _initCharm(argc, argv);
+   CsdScheduler(-1);
+   ConverseExit();
+}
+#endif
+
+void buddyDieHandler(char *msg)
+{
+   // notify
+   int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+   notify_crash(diepe);
+   // send message to crash pe to let it restart
+   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+   int newrank = find_spare_mpirank(diepe);
+   int buddy = obj->BuddyPE(CmiMyPe());
+   if (buddy == diepe)  {
+     mpi_restart_crashed(diepe, newrank);
+     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+   }
+}
+
+void pingHandler(void *msg)
+{
+  lastPingTime = CmiWallTimer();
+  CmiFree(msg);
+}
+
+void pingBackHandler(char *msg)
+{
+  CmiSetHandler(msg, pingHandlerIdx);
+  int from = *(int *)(msg+CmiMsgHeaderSizeBytes);
+  CmiSyncSendAndFree(from, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+}
+
+void pingCheckHandler()
+{
+  double now = CmiWallTimer();
+  if (lastPingTime > 0 && now - lastPingTime > 4) {
+    // tell everyone the buddy dies
+    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+    int buddy = obj->BuddyPE(CkMyPe());
+    CmiPrintf("[%d] detected buddy processor %d died. \n", CmiMyPe(), buddy);
+    for (int pe = 0; pe < CmiNumPes(); pe++) {
+      if (obj->isFailed(pe) || pe == buddy) continue;
+      char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
+      CmiSetHandler(msg, buddyDieHandlerIdx);
+      CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+    }
+  }
+  else 
+    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+}
+
+void pingBuddy()
+{
+  CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+  if (obj) {
+    int buddy = obj->BuddyPE(CkMyPe());
+//printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
+    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+    *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
+    CmiSetHandler(msg, pingBackHandlerIdx);
+    CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+    CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+  }
+}
+#endif
+
 // initproc
 void CkRegisterRestartHandler( )
 {
@@ -1250,10 +1363,19 @@ void CkRegisterRestartHandler( )
   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
 
+#if CMK_CONVERSE_MPI
+  pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
+  pingBackHandlerIdx = CkRegisterHandler((CmiHandler)pingBackHandler);
+  pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
+  buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
+
+#endif
+
   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
   CpvAccess(procChkptBuf) = NULL;
 
   notify_crash_fn = notify_crash;
+
 #if 1
   // for debugging
   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
index 47029fe96b9e1ea30d61ab11d0df71792fcfa5da..3ca66a18976c18e5f9f8d55ebe49189393b8dd2b 100644 (file)
@@ -57,7 +57,7 @@ public:
 class CkMemCheckPT: public CBase_CkMemCheckPT {
 public:
   CkMemCheckPT(int w);
-  CkMemCheckPT(CkMigrateMessage *m):CBase_CkMemCheckPT(m) {}
+  CkMemCheckPT(CkMigrateMessage *m):CBase_CkMemCheckPT(m) {};
   virtual ~CkMemCheckPT();
   void pup(PUP::er& p);
   inline int BuddyPE(int pe);
@@ -80,6 +80,7 @@ public:
   void inmem_restore(CkArrayCheckPTMessage *m);
   void updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe);
   void resetLB(int diepe);
+  int  isFailed(int pe);
 public:
   static CkCallback  cpCallback;
 
@@ -101,7 +102,6 @@ private:
 private:
   inline int isMaster(int pe);
 
-  int  isFailed(int pe);
   void failed(int pe);
   int  totalFailed();
 
@@ -118,4 +118,8 @@ void CkStartMemCheckpoint(CkCallback &cb);
 // true if inside a restarting phase
 extern "C" int CkInRestarting(); 
 
+#if CMK_CONVERSE_MPI
+extern "C" void CkDieNow();
+#endif
+
 #endif
index b97ed3780aa3ffb378a8a1b51b0fc3ebbf8dec0d..9663a2fd14b24e2ec7102b6127e718439de3d37d 100644 (file)
@@ -224,9 +224,9 @@ CpvStaticDeclare(ccd_cond_callbacks, conds);
 
 
 /*Make sure this matches the CcdPERIODIC_* list in converse.h*/
-#define CCD_PERIODIC_MAX 11
+#define CCD_PERIODIC_MAX 12
 const static double periodicCallInterval[CCD_PERIODIC_MAX]=
-{0.001, 0.010, 0.100, 1.0, 10.0, 60.0, 5*60.0, 10*60.0, 3600.0, 12*3600.0, 24*3600.0};
+{0.001, 0.010, 0.100, 1.0, 5.0, 10.0, 60.0, 5*60.0, 10*60.0, 3600.0, 12*3600.0, 24*3600.0};
 
 /**
  * List of periodic callbacks maintained by the scheduler
@@ -510,7 +510,6 @@ void CcdCallFnAfterOnPE(CcdVoidFn fnp, void *arg, double deltaT, int pe)
     ccd_heap_insert(tcall, fnp, arg, pe);
 } 
 
-
 /**
  * Register a callback function that will be triggered after a minimum 
  * delay of deltaT
index 21c53506d9e73dc28b5748e4cebd38cc4dbf1970..d1da74dd75acf86c04ef59640fcf762ddacf6c4a 100644 (file)
@@ -1516,15 +1516,17 @@ typedef void (*CcdVoidFn)(void *userParam,double curWallTime);
 #define CcdPERIODIC_100ms 18 /*every 100ms (10Hz)*/
 #define CcdPERIODIC_1second  19 /*every second*/
 #define CcdPERIODIC_1s       19 /*every second*/
-#define CcdPERIODIC_10second 20 /*every 10 seconds*/
-#define CcdPERIODIC_10seconds 20 /*every 10 seconds*/
-#define CcdPERIODIC_10s      20 /*every 10 seconds*/
-#define CcdPERIODIC_1minute  21 /*every minute*/
-#define CcdPERIODIC_5minute  22 /*every 5 minute*/
-#define CcdPERIODIC_10minute 23 /*every 10 minutes*/
-#define CcdPERIODIC_1hour    24 /*every hour*/
-#define CcdPERIODIC_12hour   25 /*every 12 hours*/
-#define CcdPERIODIC_1day     26 /*every day*/
+#define CcdPERIODIC_5s       20 /*every second*/
+#define CcdPERIODIC_5seconds 20 /*every second*/
+#define CcdPERIODIC_10second 21 /*every 10 seconds*/
+#define CcdPERIODIC_10seconds 21 /*every 10 seconds*/
+#define CcdPERIODIC_10s      21 /*every 10 seconds*/
+#define CcdPERIODIC_1minute  22 /*every minute*/
+#define CcdPERIODIC_5minute  23 /*every 5 minute*/
+#define CcdPERIODIC_10minute 24 /*every 10 minutes*/
+#define CcdPERIODIC_1hour    25 /*every hour*/
+#define CcdPERIODIC_12hour   26 /*every 12 hours*/
+#define CcdPERIODIC_1day     27 /*every day*/
 
 /*Other conditions*/
 #define CcdQUIESCENCE 30