demonstrate inmem checkpoint restart on MPI layer:
[charm.git] / src / ck-core / ckmemcheckpoint.C
index 9c07bb44c32cfacf7f014eb93002cc7e1b78acd5..ea44ebbbde4d6054a6e83210ca43d90e72b2eb46 100644 (file)
@@ -60,7 +60,11 @@ void noopck(const char*, ...)
 
 // assume NO extra processors--1
 // assume extra processors--0
+#if CMK_CONVERSE_MPI
+#define CK_NO_PROC_POOL                                0
+#else
 #define CK_NO_PROC_POOL                                1
+#endif
 
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
@@ -301,6 +305,13 @@ CkMemCheckPT::CkMemCheckPT(int w)
   ackCount = 0;
   expectCount = -1;
   where = w;
+
+#if CMK_CONVERSE_MPI
+  void pingBuddy();
+  void pingCheckHandler();
+  CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+  CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+#endif
 }
 
 CkMemCheckPT::~CkMemCheckPT()
@@ -322,6 +333,12 @@ void CkMemCheckPT::pup(PUP::er& p)
   p|where;                     // where to checkpoint
   if (p.isUnpacking()) {
     recvCount = peCount = 0;
+#if CMK_CONVERSE_MPI
+    void pingBuddy();
+    void pingCheckHandler();
+    CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+#endif
   }
 }
 
@@ -1222,12 +1239,16 @@ void notify_crash(int node)
   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
   CkMemCheckPT::inRestarting = 1;
 
+/*
 #ifdef CMK_SMP
+*/
+  int pe = CmiNodeFirst(CkMyNode());
   for(int i=0;i<CkMyNodeSize();i++){
        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
        CmiSetHandler(msg, notifyHandlerIdx);
-       CmiSyncSendAndFree(CkMyNode()*CkMyNodeSize()+i, CmiMsgHeaderSizeBytes, (char *)msg);
+       CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
   }
+/*
  return;
 #else 
     // this may be in interrupt handler, send a message to reset QD
@@ -1235,11 +1256,103 @@ void notify_crash(int node)
   CmiSetHandler(msg, notifyHandlerIdx);
   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
 #endif
+*/
 #endif
 }
 
 extern "C" void (*notify_crash_fn)(int node);
 
+#if CMK_CONVERSE_MPI
+static int pingHandlerIdx;
+static int pingBackHandlerIdx;
+static int pingCheckHandlerIdx;
+static int buddyDieHandlerIdx;
+static double lastPingTime = -1;
+static int died = 0;
+
+extern void _initCharm(int argc, char **argv);
+extern "C" void mpi_restart_crashed(int pe, int rank);
+extern "C" int  find_spare_mpirank(int pe);
+extern void CkDeleteChares();
+
+void pingCheckHandler();
+
+#if 0
+extern "C" void mpi_restart(int argc, char **argv)
+{
+   died = 1;
+
+   CkDeleteChares();
+
+   _initCharm(argc, argv);
+   CsdScheduler(-1);
+   ConverseExit();
+}
+#endif
+
+void buddyDieHandler(char *msg)
+{
+   // notify
+   int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+   notify_crash(diepe);
+   // send message to crash pe to let it restart
+   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+   int newrank = find_spare_mpirank(diepe);
+   int buddy = obj->BuddyPE(CmiMyPe());
+   if (buddy == diepe)  {
+     mpi_restart_crashed(diepe, newrank);
+     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+   }
+}
+
+void pingHandler(void *msg)
+{
+  lastPingTime = CmiWallTimer();
+  CmiFree(msg);
+}
+
+void pingBackHandler(char *msg)
+{
+  CmiSetHandler(msg, pingHandlerIdx);
+  int from = *(int *)(msg+CmiMsgHeaderSizeBytes);
+  CmiSyncSendAndFree(from, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+}
+
+void pingCheckHandler()
+{
+  double now = CmiWallTimer();
+  if (lastPingTime > 0 && now - lastPingTime > 4) {
+    // tell everyone the buddy dies
+    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+    int buddy = obj->BuddyPE(CkMyPe());
+    CmiPrintf("[%d] detected buddy processor %d died. \n", CmiMyPe(), buddy);
+    for (int pe = 0; pe < CmiNumPes(); pe++) {
+      if (obj->isFailed(pe) || pe == buddy) continue;
+      char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
+      CmiSetHandler(msg, buddyDieHandlerIdx);
+      CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+    }
+  }
+  else 
+    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+}
+
+void pingBuddy()
+{
+  CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+  if (obj) {
+    int buddy = obj->BuddyPE(CkMyPe());
+//printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
+    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+    *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
+    CmiSetHandler(msg, pingBackHandlerIdx);
+    CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+    CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+  }
+}
+#endif
+
 // initproc
 void CkRegisterRestartHandler( )
 {
@@ -1250,10 +1363,19 @@ void CkRegisterRestartHandler( )
   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
 
+#if CMK_CONVERSE_MPI
+  pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
+  pingBackHandlerIdx = CkRegisterHandler((CmiHandler)pingBackHandler);
+  pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
+  buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
+
+#endif
+
   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
   CpvAccess(procChkptBuf) = NULL;
 
   notify_crash_fn = notify_crash;
+
 #if 1
   // for debugging
   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());