add pup support
[charm.git] / src / ck-core / ckmemcheckpoint.C
index 948cfdaaccbfdd1df129a0829984fd039857c2d1..92f6506540b249bff0277f37cf73f397179a77c0 100644 (file)
@@ -75,6 +75,7 @@ CpvDeclare(int, _remoteCrashedNode);
 
 // static, so that it is accessible from Converse part
 int CkMemCheckPT::inRestarting = 0;
+int CkMemCheckPT::inCheckpointing = 0;
 int CkMemCheckPT::replicaAlive = 1;
 int CkMemCheckPT::inLoadbalancing = 0;
 double CkMemCheckPT::startTime;
@@ -363,6 +364,7 @@ CkMemCheckPT::CkMemCheckPT(int w)
     _memChkptOn = 0;
   }
   inRestarting = 0;
+  inCheckpointing = 0;
   recvCount = peCount = 0;
   ackCount = 0;
   expectCount = -1;
@@ -516,6 +518,7 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
   checkpointed = 1;
   cpCallback = cb;
   cpStarter = starter;
+  inCheckpointing = 1;
   if (CkMyPe() == cpStarter) {
     startTime = CmiWallTimer();
     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
@@ -569,7 +572,7 @@ class MemElementPacker : public CkLocIterator{
                }
 };
 
-void CkMemCheckPT::pupAllElements(PUP::er &p){
+void pupAllElements(PUP::er &p){
 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
        int numElements;
        if(!p.isUnpacking()){
@@ -663,12 +666,13 @@ void CkMemCheckPT::startCheckpoint(){
          if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
                thisProxy[CkMyPe()].doneComparison(true);
          }else{
-               thisProxy[CkMyPe()].doneComparison(true);
+                 CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
+               thisProxy[CkMyPe()].doneComparison(false);
          }
   }
        else{
                if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
-                       if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+                       {       
                                int pointer = CpvAccess(curPointer);
                                //send the proc data
                                CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
@@ -677,7 +681,9 @@ void CkMemCheckPT::startCheckpoint(){
                                CkPackMessage(&env);
                                CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
                                CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-                               CkPrintf("[%d] sendProcdata\n",CkMyPe());
+                               if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+                                       CkPrintf("[%d] sendProcdata\n",CkMyPe());
+                               }
                        }
                        //send the array checkpoint data
                        CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
@@ -695,27 +701,37 @@ void CkMemCheckPT::startCheckpoint(){
 }
 
 void CkMemCheckPT::doneComparison(bool ret){
+       int _ret;
+       if(!ret){
+       CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
+               _ret = 0;
+       }else{
+               _ret = 1;
+       }
+       inCheckpointing = 0;
        CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
-       contribute(sizeof(bool),&ret,CkReduction::logical_and,cb);
+       contribute(sizeof(int),&_ret,CkReduction::logical_and,cb);
 }
 
-void CkMemCheckPT::doneRComparison(bool ret){
+void CkMemCheckPT::doneRComparison(int ret){
        CpvAccess(recvdRemote) = 0;
        CpvAccess(recvdLocal) = 0;
 //     if(CpvAccess(curPointer) == 0){
-       if(ret==true){
+       if(ret==1){
        CpvAccess(curPointer)^=1;
                if(CkMyPe() == 0){
-      CkPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
+               CkPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
                        cpCallback.send();
                }
        }else{
+       CkPrintf("[%d][%d] going to RollBack \n", CmiMyPartition(),CkMyPe());
                RollBack();
        }
 }
 
 void CkMemCheckPT::RollBack(){
        //restore group data
+       checkpointed = 0;
        CkMemCheckPT::inRestarting = 1;
        int pointer = CpvAccess(curPointer)^1;//use the previous one
     CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
@@ -900,6 +916,7 @@ void CkMemCheckPT::cpFinish()
 // for debugging, report checkpoint info
 void CkMemCheckPT::report()
 {
+       inCheckpointing = 0;
 #if !CMK_CHKP_ALL
   int objsize = 0;
   int len = ckTable.length();
@@ -1031,7 +1048,7 @@ void CkMemCheckPT::restart(int diePe)
   inRestarting = 1;
                                                                                 
   // disable load balancer's barrier
-  if (CkMyPe() != diePe) resetLB(diePe);
+  //if (CkMyPe() != diePe) resetLB(diePe);
 
   CKLOCMGR_LOOP(mgr->startInserting(););
 
@@ -1055,7 +1072,9 @@ void CkMemCheckPT::removeArrayElements()
   stage = (char*)"removeArrayElements";
   startTime = curTime;
 
-  if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
+//  if (cpCallback.isInvalid()) 
+//       CkPrintf("invalid pe %d\n",CkMyPe());
+//       CkAbort("Didn't set restart callback\n");;
   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
 
   // get rid of all buffering and remote recs
@@ -1145,7 +1164,7 @@ void CkMemCheckPT::recoverBuddies()
 #endif
 
   if (expectCount == 0) {
-    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
   }
   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
 }
@@ -1296,7 +1315,7 @@ void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<C
                                if(CkMyPe()==thisFailedPe){
                                        mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
                                }
-        else{
+                       else{
                                        mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
                                }
                        }
@@ -1328,18 +1347,20 @@ void CkMemCheckPT::finishUp()
        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
   }
        
+#if CMK_CONVERSE_MPI   
   if(CmiNumPartition()!=1){
        CpvAccess(recvdProcChkp) = 0;
        CpvAccess(recvdArrayChkp) = 0;
        CpvAccess(curPointer)^=1;
-   if (CmiMyPe() == BuddyPE(thisFailedPe)) {
-        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
-   }
        //notify my replica, restart is done
    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
    CmiSetHandler(msg,replicaRecoverHandlerIdx);
    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
   }
+   if (CmiMyPe() == BuddyPE(thisFailedPe)) {
+        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+   }
+#endif
 
 #if CK_NO_PROC_POOL
 #if NODE_CHECKPOINT
@@ -1497,7 +1518,33 @@ static void restartBcastHandler(char *msg)
 extern void _initDone();
 
 bool compare(char * buf1, char *buf2){
-       return true;
+       // buf1 is my own checkpoint copy; buf2 is the copy received from the other side
+//     CkPrintf("[%d][%d]compare buffer\n",CmiMyPartition(),CkMyPe());
+       PUP::checker pchecker(buf1,buf2);
+       pchecker.skip();
+       
+       int numElements;
+       pchecker|numElements;
+       CkPrintf("[%d][%d]numElements:%d\n",CmiMyPartition(),CkMyPe(),numElements);
+       for(int i=0;i<numElements;i++){
+       //for(int i=0;i<1;i++){
+               CkGroupID gID;
+               CkArrayIndex idx;
+               
+               pchecker|gID;
+               pchecker|idx;
+               
+               CkPrintf("[%d][%d]resume\n",CmiMyPartition(),CkMyPe());
+               CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+               mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
+               CkPrintf("------[%d][%d]finish element %d\n",CmiMyPartition(),CkMyPe(),i);
+       }
+       if(pchecker.getResult()){
+               CkPrintf("------[%d][%d]pass result\n",CmiMyPartition(),CkMyPe());
+       }else{
+               CkPrintf("------[%d][%d] hasn't passed result\n",CmiMyPartition(),CkMyPe());
+       }
+       return pchecker.getResult();
 }
 
 static void recvRemoteChkpHandler(char *msg){
@@ -1509,12 +1556,12 @@ static void recvRemoteChkpHandler(char *msg){
          int size = CpvAccess(chkpBuf)[pointer]->len;
          CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
          if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
-               
-               checkptMgr[CkMyPe()].doneComparison(true);
-               }else
-               {
+                       checkptMgr[CkMyPe()].doneComparison(true);
+         }else
+         {
+                 CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
                        checkptMgr[CkMyPe()].doneComparison(false);
-               }
+         }
   }else{
          CpvAccess(recvdRemote) = 1;
          if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
@@ -1531,6 +1578,7 @@ static void replicaRecoverHandler(char *msg){
 }
 
 static void replicaDieHandler(char * msg){
+#if CMK_CONVERSE_MPI   
        int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
        CpvAccess(_remoteCrashedNode) = diePe;
        CkMemCheckPT::replicaAlive = 0;
@@ -1539,6 +1587,7 @@ static void replicaDieHandler(char * msg){
       CkPrintf("pe %d in replicad word die\n",diePe);
            CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
     }
+#endif
        //broadcast to my partition
        //CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
        //checkptMgr.notifyReplicaDie(diePe);
@@ -1562,19 +1611,27 @@ static void recoverRemoteProcDataHandler(char *msg){
    //store the checkpoint
        int pointer = procMsg->pointer;
 
-       if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
-       CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
 
-   PUP::fromMem p(procMsg->packData);
-   _handleProcData(p,CmiTrue);
+   if(CkMyPe()==CpvAccess(_crashedNode)){
+          if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+          CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
+          PUP::fromMem p(procMsg->packData);
+          _handleProcData(p,CmiTrue);
+          _initDone();
+   }
+   else{
+          if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+          CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
+          //_handleProcData(p,CmiFalse);
+   }
    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
    CKLOCMGR_LOOP(mgr->startInserting(););
    
-   _initDone();
    CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
    CpvAccess(recvdProcChkp) =1;
        if(CpvAccess(recvdArrayChkp)==1){
                _resume_charm_message();
+               _diePE = CpvAccess(_crashedNode);
                char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
                CmiSetHandler(restartmsg, restartBeginHandlerIdx);
                CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
@@ -1595,7 +1652,7 @@ static void recoverRemoteArrayDataHandler(char *msg){
        CpvAccess(chkpBuf)[pointer] = chkpMsg;
    CpvAccess(recvdArrayChkp) =1;
        CkMemCheckPT::inRestarting = 1;
-       if(CmiMyPe()!=CpvAccess(_crashedNode)||CpvAccess(recvdProcChkp) == 1){
+       if(CpvAccess(recvdProcChkp) == 1){
                _resume_charm_message();
                _diePE = CpvAccess(_crashedNode);
                //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
@@ -1608,6 +1665,7 @@ static void recoverRemoteArrayDataHandler(char *msg){
 static void recvPhaseHandler(char * msg)
 {
        CpvAccess(_curRestartPhase)--;
+       CkMemCheckPT::inRestarting = 1;
        //CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
   // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
   // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
@@ -1687,17 +1745,17 @@ void qd_callback(void *m)
 {
    CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
    CkFreeMsg(m);
+   if(CmiNumPartition()==1){
 #ifdef CMK_SMP
-   for(int i=0;i<CmiMyNodeSize();i++){
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
-       CmiSetHandler(msg, askProcDataHandlerIdx);
-       int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
-       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-   }
-   return;
+          for(int i=0;i<CmiMyNodeSize();i++){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
+               CmiSetHandler(msg, askProcDataHandlerIdx);
+               int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
+               CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+          }
+          return;
 #endif
-   if(CmiNumPartition()==1){
           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
           *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
           // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
@@ -1728,7 +1786,7 @@ void CkMemRestart(const char *dummy, CkArgMsg *args)
   }*/
    if(CmiNumPartition()==1){
           CkMemCheckPT::startTime = restartT = CmiWallTimer();
-  CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
+               CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
                restartT = CmiWallTimer();
           CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
@@ -1741,20 +1799,6 @@ void CkMemRestart(const char *dummy, CkArgMsg *args)
    else{
                CkCallback cb(qd_callback);
                CkStartQD(cb);
-       //      CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-          //get the restart phase
-          //CmiPrintf("[%d] diePe asks for phase\n",CkMyPe());
-          //char *phaseMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-          //*(int *)(phaseMsg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-          //CmiSetHandler(phaseMsg, askPhaseHandlerIdx);
-          //int pe = ChkptOnPe(CpvAccess(_crashedNode));
-          //CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)phaseMsg);
-               //notify the other partition
-          //CkPrintf("[%d] notify remote partition %d\n",CkMyPe(),CmiMyPartition());
-          //char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-          //*(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-          //CmiSetHandler(msg,replicaDieHandlerIdx);
-          //CmiRemoteSyncSendAndFree(CkMyPe(),0,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
    }
 #else
    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
@@ -1790,6 +1834,12 @@ int CkReplicaAlive()
                return 1;*/
 }
 
+extern "C"
+int CkInCheckpointing()
+{
+       return CkMemCheckPT::inCheckpointing;
+}
+
 extern "C"
 void CkSetInLdb(){
 #if CMK_MEM_CHECKPOINT
@@ -1881,23 +1931,25 @@ void notify_crash(int node)
 #endif
 }
 
-#if CMK_CONVERSE_MPI
 extern "C" void (*notify_crash_fn)(int node);
 
 
+#if CMK_CONVERSE_MPI
 void buddyDieHandler(char *msg)
 {
 #if CMK_MEM_CHECKPOINT
    // notify
+       CkMemCheckPT::inRestarting = 1;
    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
    notify_crash(diepe);
    // send message to crash pe to let it restart
    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+   CkPrintf("[%d] finding newrank\n",CkMyPe());
    int newrank = find_spare_mpirank(diepe,CmiMyPartition());
+   CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
    int buddy = obj->BuddyPE(CmiMyPe());
-   if (buddy == diepe)  {
    //if (CmiMyPe() == obj->BuddyPE(diepe))  {
-     CkPrintf("[%d] restart crashed node %d newrank %d\n",CkMyPe(),diepe,newrank);
+   if (buddy == diepe)  {
      mpi_restart_crashed(diepe, newrank);
    }
 #endif
@@ -1913,7 +1965,8 @@ void pingCheckHandler()
 {
 #if CMK_MEM_CHECKPOINT
   double now = CmiWallTimer();
-  if (lastPingTime > 0 && now - lastPingTime > 1 && !CkInLdb()) {
+  if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
+  //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
     int i, pe, buddy;
     // tell everyone the buddy dies
     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
@@ -1922,7 +1975,7 @@ void pingCheckHandler()
        if (obj->BuddyPE(pe) == CmiMyPe()) break;
     }
     buddy = pe;
-    CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
+    CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
     /*for (int pe = 0; pe < CmiNumPes(); pe++) {
       if (obj->isFailed(pe) || pe == buddy) continue;
       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
@@ -1935,12 +1988,14 @@ void pingCheckHandler()
     CmiSetHandler(msg, buddyDieHandlerIdx);
     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
     //send to everyone in the other world
-    for(int i=0;i<CmiNumPes();i++){
-      char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-      *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
-      CmiSetHandler(rMsg, replicaDieHandlerIdx);
-      CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
-    }
+       if(CmiNumPartition()!=1){
+               for(int i=0;i<CmiNumPes();i++){
+                 char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+                 *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
+                 CmiSetHandler(rMsg, replicaDieHandlerIdx);
+                 CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+               }
+       }
   }
   else 
     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);