recovery from hard failures
author Xiang Ni <xiangni2@illinois.edu>
Mon, 10 Dec 2012 22:44:43 +0000 (16:44 -0600)
committer Xiang Ni <xiangni2@illinois.edu>
Mon, 10 Dec 2012 22:44:43 +0000 (16:44 -0600)
src/arch/mpi/machine.c
src/arch/util/machine-common-core.c
src/ck-core/ckarray.C
src/ck-core/ckmemcheckpoint.C
src/ck-core/ckmemcheckpoint.ci
src/ck-core/ckmemcheckpoint.h

index 243ef585bb44bf97303ee96aa4fae1548839e0ca..368e78ea593e2f6f46ee6666c5638b31bc2a48c0 100644 (file)
@@ -1378,7 +1378,7 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID) {
           exit(0);
       }
 
-      CmiPrintf("Charm++> Spare MPI rank %d is activated for PE %d.\n", *myNodeID, newpe);
+      CmiPrintf("Charm++> Spare MPI rank %d is activated for global PE %d.\n", *myNodeID, newpe);
         /* update petorank */
       MPI_Recv(petorank, num_workpes, MPI_INT,MPI_ANY_SOURCE,FAIL_TAG,charmComm, &sts);
       nextrank = *myNodeID + 1;
@@ -1896,7 +1896,7 @@ int CmiBarrierZero() {
 void mpi_restart_crashed(int pe, int rank)
 {
     int vals[2];
-    vals[0] = pe;
+    vals[0] = CmiGetPeGlobal(pe,CmiMyPartition());
     vals[1] = CpvAccess(_curRestartPhase)+1;
     MPI_Send((void *)vals,2,MPI_INT,rank,FAIL_TAG,charmComm);
     MPI_Send(petorank, num_workpes, MPI_INT,rank,FAIL_TAG,charmComm);
@@ -1912,12 +1912,14 @@ void mpi_end_spare()
     }
 }
 
-int find_spare_mpirank(int pe)
+/* Hand out the next unused MPI rank (nextrank) to PE `pe` of `partition`.
+ * The (pe, partition) pair is converted to a global PE number with
+ * CmiGetPeGlobal() and petorank[] is updated at that global index.
+ * Aborts through CmiAbort() when nextrank has reached total_pes.
+ * Returns the rank that was just assigned. */
+int find_spare_mpirank(int pe,int partition)
 {
     if (nextrank == total_pes) {
       CmiAbort("Charm++> No spare processor available.");
     }
-    petorank[pe] = nextrank;
+       /* petorank[] is indexed by global PE here, hence the conversion. */
+       int newpe = CmiGetPeGlobal(pe,partition);
+       /* Trace output: [caller's global PE], remapped global PE, new rank. */
+       CmiPrintf("[%d] set the rank of %d to %d\n",CmiGetPeGlobal(CmiMyPe(),CmiMyPartition()),newpe,nextrank);
+    petorank[newpe] = nextrank;
     nextrank++;
     return nextrank-1;
 }
index 98a959229529e6411391b7339676ca99fa755af4..925b4248d2727e11f7ca8eeec3d178dee174004a 100644 (file)
@@ -518,7 +518,9 @@ CmiCommHandle CmiRemoteSendNetworkFunc(int destPE, int partition, int size, char
 {
         int rank;
         int destLocalNode = CmiNodeOf(destPE); 
-        int destNode = CmiGetNodeGlobal(destLocalNode,partition); 
+        int destNode = CmiGetNodeGlobal(destLocalNode,partition);
+           if(partition!=CmiMyPartition())
+                       CmiPrintf("send to dest node %d in partition %d\n",destNode,partition); 
 #if CMK_USE_PXSHM       //not handled yet correctly
         if (CmiValidPxshm(destLocalNode, size)) {
           CmiSendMessagePxshm(msg, size, destLocalNode, &refcount);
@@ -567,6 +569,8 @@ INLINE_KEYWORD void CmiFreeSendFn(int destPE, int size, char *msg) {
 void CmiRemoteFreeSendFn(int destPE, int partition, int size, char *msg) {
     CMI_SET_BROADCAST_ROOT(msg, 0);
     CQdCreate(CpvAccess(cQdState), 1);
+       if(partition!=CmiMyPartition())
+               CmiPrintf("[%d] on partition %d send to %d on partition %d\n",CmiMyPe(),CmiMyPartition(),destPE,partition);
     if (CmiMyPe()==destPE && partition == CmiMyPartition()) {
         CmiSendSelf(msg);
 #if CMK_PERSISTENT_COMM
@@ -707,7 +711,9 @@ void CmiCreatePartitions(char **argv) {
   //simple partition, this will be made more complex in future
   partitionInfo.partitionSize = _Cmi_numnodes_global / partitionInfo.numPartitions;
   partitionInfo.myPartition = _Cmi_mynode_global / partitionInfo.partitionSize;
+       
 
+  CmiPrintf("in partition %d size %d\n",partitionInfo.myPartition, partitionInfo.partitionSize);
   //reset local variables
   _Cmi_mynode = CmiGetNodeLocal(_Cmi_mynode);
   _Cmi_numnodes = CmiPartitionSize();
index a659bd102dbf23621dfe0b1272fed8b8c7cfb79e..d41ec3cdecee678b0eb547264e53a3f9f95f29c5 100644 (file)
@@ -1315,7 +1315,6 @@ void CProxy_ArrayBase::ckBroadcast(CkArrayMessage *msg, int ep, int opts) const
          if (CkMyPe()==CpvAccess(serializer))
          {
                DEBB((AA"Sending array broadcast\n"AB));
-               printf("Sending array broadcast\n");
                if (skipsched)
                        CProxy_CkArray(_aid).recvExpeditedBroadcast(msg);
                else
index f6e24567f70d4faf04d35be087440985bad6df9a..b3185318b3f831a29aa2383bf1a5917515134dc8 100644 (file)
@@ -71,9 +71,11 @@ void noopck(const char*, ...)
 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
+CpvDeclare(int, _remoteCrashedNode);
 
 // static, so that it is accessible from Converse part
 int CkMemCheckPT::inRestarting = 0;
+int CkMemCheckPT::replicaAlive = 1;
 int CkMemCheckPT::inLoadbalancing = 0;
 double CkMemCheckPT::startTime;
 char *CkMemCheckPT::stage;
@@ -123,6 +125,8 @@ CpvDeclare(CkCheckPTMessage*, buddyBuf);
 CpvDeclare(int, curPointer);
 CpvDeclare(int, recvdRemote);
 CpvDeclare(int, recvdLocal);
+CpvDeclare(int, recvdArrayChkp);
+CpvDeclare(int, recvdProcChkp);
 
 bool compare(char * buf1, char * buf2);
 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
@@ -134,9 +138,27 @@ static int restartBcastHandlerIdx;
 static int recoverProcDataHandlerIdx;
 static int restartBeginHandlerIdx;
 static int recvRemoteChkpHandlerIdx;
+static int replicaDieHandlerIdx;
+static int replicaDieBcastHandlerIdx;
+static int replicaRecoverHandlerIdx;
+static int recoverRemoteProcDataHandlerIdx;
+static int recoverRemoteArrayDataHandlerIdx;
 static int notifyHandlerIdx;
 // compute the backup processor
 // FIXME: avoid crashed processors
+#if CMK_CONVERSE_MPI
+// Converse handler indices used by the buddy ping / failure-detection code (MPI builds only).
+static int pingHandlerIdx;
+static int pingCheckHandlerIdx;
+static int buddyDieHandlerIdx;
+// Wall-clock time at which pingHandler last ran; stays -1 until the first ping is received.
+static double lastPingTime = -1;
+
+// Spare-rank management entry points with C linkage (presumably the ones in src/arch/mpi/machine.c).
+extern "C" void mpi_restart_crashed(int pe, int rank);
+extern "C" int  find_spare_mpirank(int pe,int partition);
+
+// Forward declarations so code earlier in this file can schedule the periodic callbacks.
+void pingBuddy();
+void pingCheckHandler();
+
+#endif
 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
 
 inline int CkMemCheckPT::BuddyPE(int pe)
@@ -345,12 +367,12 @@ CkMemCheckPT::CkMemCheckPT(int w)
   ackCount = 0;
   expectCount = -1;
   where = w;
-
+  replicaAlive = 1;
 #if CMK_CONVERSE_MPI
   void pingBuddy();
   void pingCheckHandler();
   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-  CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+  CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
   chkpTable[0] = NULL;
   chkpTable[1] = NULL;
@@ -380,7 +402,7 @@ void CkMemCheckPT::pup(PUP::er& p)
     void pingBuddy();
     void pingCheckHandler();
     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+    CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
   }
 }
@@ -624,12 +646,13 @@ void CkMemCheckPT::startCheckpoint(){
        pointer = CpvAccess(curPointer);
        if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
                CpvAccess(chkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
-       CpvAccess(recvdLocal) = 1;
-
-       envelope * env = (envelope *)(UsrToEnv(msg));
-       CkPackMessage(&env);
-       CmiSetHandler(env,recvRemoteChkpHandlerIdx);
-       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+       if(CkReplicaAlive()==1){
+               CpvAccess(recvdLocal) = 1;
+               envelope * env = (envelope *)(UsrToEnv(msg));
+               CkPackMessage(&env);
+               CmiSetHandler(env,recvRemoteChkpHandlerIdx);
+               CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+       }
        if(CpvAccess(recvdRemote)==1){
                //compare the checkpoint 
          int size = CpvAccess(chkpBuf)[pointer]->len;
@@ -639,6 +662,32 @@ void CkMemCheckPT::startCheckpoint(){
                thisProxy[CkMyPe()].doneComparison(true);
          }
        }
+       else{
+               if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
+                       if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+                               int pointer = CpvAccess(curPointer);
+                               //send the proc data
+                               CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+                               procMsg->pointer = pointer;
+                               envelope * env = (envelope *)(UsrToEnv(procMsg));
+                               CkPackMessage(&env);
+                               CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+                               CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+                               CkPrintf("[%d] sendProcdata\n",CkMyPe());
+                       }
+                       //send the array checkpoint data
+                       msg->pointer = CpvAccess(curPointer);
+                       envelope * env = (envelope *)(UsrToEnv(msg));
+                       CkPackMessage(&env);
+                       CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
+                       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+                       CkPrintf("[%d] sendArrayData\n",CkMyPe());
+                       //can continue work, no need to wait for my replica
+                       bool ret = true;
+                       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
+                       contribute(sizeof(bool),&ret,CkReduction::logical_and,cb);
+               }
+       }
 #endif
 }
 
@@ -699,6 +748,12 @@ void CkMemCheckPT::RollBack(){
        contribute(cb);
 }
 
+// Entry method: note that the replica is down. Stores `pe` as the remote
+// crashed node and clears the class-wide replicaAlive flag.
+void CkMemCheckPT::notifyReplicaDie(int pe)
+{
+       CkPrintf("[%d] receive replica die\n",CkMyPe());
+       CpvAccess(_remoteCrashedNode) = pe;
+       replicaAlive = 0;
+}
+
 void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
 {
 #if CMK_CHKP_ALL
@@ -972,12 +1027,12 @@ void CkMemCheckPT::restart(int diePe)
 
   CKLOCMGR_LOOP(mgr->startInserting(););
 
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-  barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-/*
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-*/
+
+  if(CmiNumPartition()==1){
+       barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
+  }else{
+       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
+  }
 #endif
 }
 
@@ -1002,9 +1057,6 @@ void CkMemCheckPT::removeArrayElements()
 #else
        CKLOCMGR_LOOP(mgr->flushLocalRecs(););
 #endif
-//  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
-
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
 #endif
 }
@@ -1022,14 +1074,11 @@ void CkMemCheckPT::resetReductionMgr()
   }
   // reset again
   //CpvAccess(_qd)->flushStates();
-
-#if 1
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
-  barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
-#else
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
-#endif
+  if(CmiNumPartition()==1){
+       barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
+  }
+  else
+       barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
 }
 
 // recover the lost buddies
@@ -1086,17 +1135,9 @@ void CkMemCheckPT::recoverBuddies()
   }
 #endif
 
-#if 1
   if (expectCount == 0) {
     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
-    //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
-  }
-#else
-  if (CkMyPe() == 0) {
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
   }
-#endif
-
   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
 }
 
@@ -1172,13 +1213,21 @@ void CkMemCheckPT::recoverArrayElements()
     count ++;
   }
 #else
-       CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+       double * packData;
+       if(CmiNumPartition()==1){
+               CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+               packData = msg->packData;
+       }
+       else{
+               int pointer = CpvAccess(curPointer);
+               CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+               packData = msg->packData;
+       }
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
-       recoverAll(msg,gmap,imap);
+       recoverAll(packData,gmap,imap);
 #else
-       recoverAll(msg);
+       recoverAll(packData);
 #endif
-    CkFreeMsg(msg);
 #endif
   curTime = CmiWallTimer();
   if (CkMyPe() == thisFailedPe)
@@ -1188,7 +1237,7 @@ void CkMemCheckPT::recoverArrayElements()
     if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
        flag++; 
-         }
+       }
   }
   delete [] imap;
   delete [] gmap;
@@ -1215,9 +1264,9 @@ void CkMemCheckPT::gotReply(){
     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
 }
 
-void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
+void CkMemCheckPT::recoverAll(double * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
 #if CMK_CHKP_ALL
-       PUP::fromMem p(msg->packData);
+       PUP::fromMem p(packData);
        int numElements;
        p|numElements;
        CkPrintf("[%d] recover all %d\n",CkMyPe(),numElements);
@@ -1232,7 +1281,15 @@ void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gma
 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
                        mgr->resume(idx,p,CmiTrue,CmiTrue);
 #else
-                       mgr->resume(idx,p,CmiFalse,CmiTrue);
+                       if(CmiNumPartition()==1)
+                               mgr->resume(idx,p,CmiFalse,CmiTrue);    
+                       else{
+                               if(CkMyPe()==thisFailedPe){
+                                       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
+                               }else{
+                                       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
+                               }
+                       }
 #endif
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
                        homePe = mgr->homePe(idx);
@@ -1260,10 +1317,25 @@ void CkMemCheckPT::finishUp()
   if (CkMyPe() == thisFailedPe)
   {
        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
-       //CkStartQD(cpCallback);
        cpCallback.send();
        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
   }
+       
+  if(CmiNumPartition()!=1){
+       CpvAccess(recvdProcChkp) = 0;
+       CpvAccess(recvdArrayChkp) = 0;
+       CpvAccess(curPointer)^=1;
+   if (CmiMyPe() == BuddyPE(thisFailedPe)) {
+        if(CmiMyPartition()==1&&CkMyPe()==2){
+               CmiPrintf("start ping check handler\n");
+        }
+        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+   }
+       //notify my replica, restart is done
+   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+   CmiSetHandler(msg,replicaRecoverHandlerIdx);
+   CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
+  }
 
 #if CK_NO_PROC_POOL
 #if NODE_CHECKPOINT
@@ -1302,6 +1374,7 @@ void CkMemCheckPT::quiescence(CkCallback &cb)
 void CkStartMemCheckpoint(CkCallback &cb)
 {
 #if CMK_MEM_CHECKPOINT
+       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
   if (_memChkptOn == 0) {
     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
     cb.send();
@@ -1363,6 +1436,7 @@ static void restartBeginHandler(char *msg)
   CmiAssert(CkMyPe() == _diePE);
   count ++;
   if (count == CkNumPes()) {
+       printf("restar begin on %d\n",CkMyPe());
     CkRestartCheckPointCallback(NULL, NULL);
     count = 0;
   }
@@ -1444,6 +1518,96 @@ static void recvRemoteChkpHandler(char *msg){
   }  
 }
 
+// Converse handler for the header-only "restart is done" message that
+// CkMemCheckPT::finishUp sends to the other partition: clears the remote
+// crashed-node record and marks the replica as alive again.
+static void replicaRecoverHandler(char *msg){
+       CpvAccess(_remoteCrashedNode) = -1;
+       CkMemCheckPT::replicaAlive = 1;
+       // The handler owns the delivered buffer; release it (as pingHandler does) to avoid a leak.
+       CmiFree(msg);
+}
+
+// Converse handler for the "buddy died" notice that pingCheckHandler sends to
+// every PE of the other partition. The int payload after the Converse header
+// is the crashed PE. Records the crash, marks the replica as not alive and, on
+// MPI builds, reserves a spare rank for the crashed PE of the other partition.
+static void replicaDieHandler(char * msg){
+       CkPrintf("[%d] !!!!remote receive notify %d!!!\n",CkMyPe(), CmiMyPartition());
+       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+       CpvAccess(_remoteCrashedNode) = diePe;
+       CkMemCheckPT::replicaAlive = 0;
+#if CMK_CONVERSE_MPI
+       // find_spare_mpirank is only declared under CMK_CONVERSE_MPI in this file,
+       // while this handler is registered for every build; guard the call so
+       // non-MPI builds still compile.
+       find_spare_mpirank(diePe,CmiMyPartition()^1);
+#endif
+       //broadcast to my partition
+       //CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+       //checkptMgr.notifyReplicaDie(diePe);
+    //CmiSetHandler(msg, replicaDieBcastHandlerIdx);
+    //CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+       // The buffer is not forwarded anywhere (the broadcast above is disabled),
+       // so it must be released here. Drop this call if the ...AndFree broadcast
+       // is ever re-enabled.
+       CmiFree(msg);
+}
+
+
+// Converse handler for an intra-partition relay of the replica-death notice.
+// The int payload after the Converse header is the crashed PE; it is stored as
+// the remote crashed node and the replica is marked as not alive.
+static void replicaDieBcastHandler(char *msg){
+       CkPrintf("[%d] remote receive notify partition %d!!!\n",CkMyPe(),CmiMyPartition());
+       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+       CpvAccess(_remoteCrashedNode) = diePe;
+       CkMemCheckPT::replicaAlive = 0;
+       // The handler owns the delivered buffer; release it to avoid a leak.
+       CmiFree(msg);
+}
+
+// Converse handler for the processor-level checkpoint that startCheckpoint()
+// sends from the healthy partition (from the PE whose number equals the remote
+// crashed node). Keeps a copy of the checkpoint, restores processor data from
+// it and, once the array checkpoint has arrived as well, resumes Charm message
+// processing and reports to _diePE through restartBeginHandler.
+static void recoverRemoteProcDataHandler(char *msg){
+   CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
+   envelope *env = (envelope *)msg;
+   CkUnpackMessage(&env);
+   CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+       
+   //store the checkpoint
+   // NOTE(review): any message already held in this slot is overwritten without being
+   // released, and the received message itself is not freed here -- confirm ownership.
+       int pointer = procMsg->pointer;
+       CpvAccess(localProcChkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&procMsg); 
+
+   // Unpack the processor data carried in the checkpoint.
+   PUP::fromMem p(procMsg->packData);
+   _handleProcData(p,CmiTrue);
+   CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
+   CKLOCMGR_LOOP(mgr->startInserting(););
+   
+   _initDone();
+   CmiPrintf("[%d] ----- recoverRemoteProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
+   CpvAccess(recvdProcChkp) =1;
+   // Only proceed once the array checkpoint is here too; if it is not, this handler
+   // just sets the flag and recoverRemoteArrayDataHandler sends the restart notice.
+       if(CpvAccess(recvdArrayChkp)==1){
+               _resume_charm_message();
+               char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+               CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
+               CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+               CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }
+}
+
+// Converse handler for the array checkpoint that startCheckpoint() sends from
+// the healthy partition while its replica is down. Adopts the sender's
+// checkpoint slot, keeps a copy of the data, flags the restart as in progress
+// and, when this PE is ready, notifies the crashed PE through restartBeginHandler.
+static void recoverRemoteArrayDataHandler(char *msg){
+   CmiPrintf("[%d] ----- recoverRemoteArrayDataHandler  start at %f\n", CkMyPe(), CkWallTimer());
+   envelope *env = (envelope *)msg;
+   CkUnpackMessage(&env);
+   CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+       
+   //store the checkpoint
+   // NOTE(review): any message already held in this slot is overwritten without being
+   // released, and the received message itself is not freed here -- confirm ownership.
+       int pointer = chkpMsg->pointer;
+       CpvAccess(curPointer) = pointer;
+       CpvAccess(chkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&chkpMsg);
+   CpvAccess(recvdArrayChkp) =1;
+       CkMemCheckPT::inRestarting = 1;
+       // Every PE other than the crashed one proceeds right away; the crashed PE
+       // additionally waits until its processor checkpoint has been received.
+       if(CmiMyPe()!=CpvAccess(_crashedNode)||CpvAccess(recvdProcChkp) == 1){
+               _resume_charm_message();
+               _diePE = CpvAccess(_crashedNode);
+               CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
+               char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+               CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+               CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }
+}
+
+// Converse handler for the header-only message broadcast by qd_callback() in
+// the multi-partition case: decrements this PE's restart phase and logs the
+// new value. The message carries no payload that is read here.
+static void recvPhaseHandler(char * msg)
+{
+       CpvAccess(_curRestartPhase)--;
+       CmiPrintf("[%d] ---received phase %d\n",CkMyPe(),CpvAccess(_curRestartPhase));
+       // The handler owns the delivered buffer; release it to avoid a leak.
+       CmiFree(msg);
+  // CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+  // if (CmiMyPe() == obj->BuddyPE(CpvAccess(_crashedNode)))  {
+  //    if(CmiMyPartition()==1&&CkMyPe()==2){
+//             CmiPrintf("start ping check handler\n");
+//      }
+       // CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+  // }
+   //CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+   //CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+
+}
 // called on crashed processor
 static void recoverProcDataHandler(char *msg)
 {
@@ -1454,13 +1618,6 @@ static void recoverProcDataHandler(char *msg)
    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
-   //cur_restart_phase ++;
-     // gzheng ?
-   //CpvAccess(_qd)->flushStates();
-
-   // restore readonly, mainchare, group, nodegroup
-//   int temp = cur_restart_phase;
-//   cur_restart_phase = -1;
    PUP::fromMem p(procMsg->packData);
    _handleProcData(p,CmiTrue);
 
@@ -1478,7 +1635,17 @@ static void recoverProcDataHandler(char *msg)
    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
 #endif
 }
-
+// Replica support: answers a restart-phase query. The request carries the
+// asking PE in its int payload; the same buffer is reused for the reply, with
+// the payload overwritten by this PE's current restart phase and the handler
+// switched to recvPhaseHandlerIdx before it is sent back to the asking PE.
+static void askPhaseHandler(char *msg)
+{
+#if CMK_MEM_CHECKPOINT
+  CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
+  int *payload = (int *)(msg+CmiMsgHeaderSizeBytes);
+  const int requester = *payload;
+  *payload = CpvAccess(_curRestartPhase);
+  CmiSetHandler(msg, recvPhaseHandlerIdx);
+  CmiSyncSendAndFree(requester, CmiMsgHeaderSizeBytes+sizeof(int), msg);
+#endif
+}
 // called on its backup processor
 // get backup message buffer and sent to crashed processor
 static void askProcDataHandler(char *msg)
@@ -1518,13 +1685,19 @@ void qd_callback(void *m)
    }
    return;
 #endif
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-   // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
-   CmiSetHandler(msg, askProcDataHandlerIdx);
-   int pe = ChkptOnPe(CpvAccess(_crashedNode));
-   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-
+   if(CmiNumPartition()==1){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+          // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
+          CmiSetHandler(msg, askProcDataHandlerIdx);
+          int pe = ChkptOnPe(CpvAccess(_crashedNode));
+          CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+       }
+   else{
+               char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+               CmiSetHandler(msg, recvPhaseHandlerIdx);
+               CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
+   }
 }
 
 // on crashed node
@@ -1532,27 +1705,45 @@ void CkMemRestart(const char *dummy, CkArgMsg *args)
 {
 #if CMK_MEM_CHECKPOINT
    _diePE = CmiMyNode();
-   CkMemCheckPT::startTime = restartT = CmiWallTimer();
-   CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
+   CpvAccess( _crashedNode )= CmiMyNode();
    CkMemCheckPT::inRestarting = 1;
-
-  CpvAccess( _crashedNode )= CmiMyNode();
-       
-  _discard_charm_message();
-    restartT = CmiWallTimer();
-   CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
+   _discard_charm_message();
   
   /*if(CmiMyRank()==0){
     CkCallback cb(qd_callback);
     CkStartQD(cb);
     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
   }*/
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-   // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
-   CmiSetHandler(msg, askProcDataHandlerIdx);
-   int pe = ChkptOnPe(CpvAccess(_crashedNode));
-   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+  CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
+   if(CmiNumPartition()==1){
+          CkMemCheckPT::startTime = restartT = CmiWallTimer();
+               restartT = CmiWallTimer();
+          CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+          // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
+          CmiSetHandler(msg, askProcDataHandlerIdx);
+          int pe = ChkptOnPe(CpvAccess(_crashedNode));
+          CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+   }
+   else{
+               CkCallback cb(qd_callback);
+               CkStartQD(cb);
+       //      CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+          //get the restart phase
+          //CmiPrintf("[%d] diePe asks for phase\n",CkMyPe());
+          //char *phaseMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          //*(int *)(phaseMsg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+          //CmiSetHandler(phaseMsg, askPhaseHandlerIdx);
+          //int pe = ChkptOnPe(CpvAccess(_crashedNode));
+          //CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)phaseMsg);
+               //notify the other partition
+          //CkPrintf("[%d] notify remote partition %d\n",CkMyPe(),CmiMyPartition());
+          //char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          //*(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+          //CmiSetHandler(msg,replicaDieHandlerIdx);
+          //CmiRemoteSyncSendAndFree(CkMyPe(),0,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
+   }
 #else
    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
 #endif
@@ -1574,6 +1765,18 @@ int CkInRestarting()
 #endif
 }
 
+// C-linkage accessor for CkMemCheckPT::replicaAlive. Logs the flag on every
+// call and returns it unchanged (startCheckpoint compares the result with 1 and 0).
+extern "C"
+int CkReplicaAlive()
+{
+       const int alive = CkMemCheckPT::replicaAlive;
+       CkPrintf("replicaAlive %d\n",alive);
+       return alive;
+}
+
 extern "C"
 void CkSetInLdb(){
 #if CMK_MEM_CHECKPOINT
@@ -1611,8 +1814,9 @@ void init_memcheckpt(char **argv)
 
        // initiliazing _crashedNode variable
        CpvInitialize(int, _crashedNode);
+       CpvInitialize(int, _remoteCrashedNode);
        CpvAccess(_crashedNode) = -1;
-
+       CpvAccess(_remoteCrashedNode) = -1;
 }
 #endif
 
@@ -1664,19 +1868,9 @@ void notify_crash(int node)
 #endif
 }
 
-extern "C" void (*notify_crash_fn)(int node);
-
 #if CMK_CONVERSE_MPI
-static int pingHandlerIdx;
-static int pingCheckHandlerIdx;
-static int buddyDieHandlerIdx;
-static double lastPingTime = -1;
-
-extern "C" void mpi_restart_crashed(int pe, int rank);
-extern "C" int  find_spare_mpirank(int pe);
+extern "C" void (*notify_crash_fn)(int node);
 
-void pingBuddy();
-void pingCheckHandler();
 
 void buddyDieHandler(char *msg)
 {
@@ -1686,11 +1880,11 @@ void buddyDieHandler(char *msg)
    notify_crash(diepe);
    // send message to crash pe to let it restart
    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
-   int newrank = find_spare_mpirank(diepe);
+   int newrank = find_spare_mpirank(diepe,CmiMyPartition());
    int buddy = obj->BuddyPE(CmiMyPe());
    if (buddy == diepe)  {
+   //if (CmiMyPe() == obj->BuddyPE(diepe))  {
      mpi_restart_crashed(diepe, newrank);
-     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
    }
 #endif
 }
@@ -1698,6 +1892,9 @@ void buddyDieHandler(char *msg)
 void pingHandler(void *msg)
 {
+  // Handler for the ping sent by pingBuddy(): remember when it arrived, then free the message.
   lastPingTime = CmiWallTimer();
+  // Debug hook for PE 2 of partition 1; the trace inside is commented out, so the branch is a no-op.
+  if(CmiMyPartition()==1&&CkMyPe()==2){
+       //CkPrintf("pingHandler lastTime %lf\n",lastPingTime);
+  }
   CmiFree(msg);
 }
 
@@ -1705,7 +1902,10 @@ void pingCheckHandler()
 {
 #if CMK_MEM_CHECKPOINT
   double now = CmiWallTimer();
-  if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
+  if(CmiMyPartition()==1&&CkMyPe()==2){
+       //CkPrintf("lastTime %lf, now %lf\n",lastPingTime,now);
+  }
+  if (lastPingTime > 0 && now - lastPingTime > 1 && !CkInLdb()) {
     int i, pe, buddy;
     // tell everyone the buddy dies
     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
@@ -1726,9 +1926,16 @@ void pingCheckHandler()
     *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
     CmiSetHandler(msg, buddyDieHandlerIdx);
     CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+    //send to everyone in the other world
+       for(int i=0;i<CmiNumPes();i++){
+               char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+       *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
+       CmiSetHandler(rMsg, replicaDieHandlerIdx);
+               CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+       }
   }
   else 
-    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+    CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
 }
 
@@ -1738,7 +1945,6 @@ void pingBuddy()
   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
   if (obj) {
     int buddy = obj->BuddyPE(CkMyPe());
-//printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
     CmiSetHandler(msg, pingHandlerIdx);
@@ -1759,8 +1965,16 @@ void CkRegisterRestartHandler( )
   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
-  restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
+  
+  //for replica
   recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
+  replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
+  replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
+  replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
+  askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
+  recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
+  recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
+  recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
 
 #if CMK_CONVERSE_MPI
   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
@@ -1773,6 +1987,11 @@ void CkRegisterRestartHandler( )
   CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
   CpvInitialize(CkCheckPTMessage *, buddyBuf);
   CpvInitialize(int,curPointer);
+  CpvInitialize(int,recvdLocal);
+  CpvInitialize(int,recvdRemote);
+  CpvInitialize(int,recvdProcChkp);
+  CpvInitialize(int,recvdArrayChkp);
+
   CpvAccess(procChkptBuf) = NULL;
   CpvAccess(buddyBuf) = NULL;
   CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
@@ -1781,9 +2000,12 @@ void CkRegisterRestartHandler( )
   CpvAccess(chkpBuf)[1] = NULL;
   CpvAccess(localProcChkpBuf)[0] = NULL;
   CpvAccess(localProcChkpBuf)[1] = NULL;
+  
   CpvAccess(curPointer) = 0;
   CpvAccess(recvdLocal) = 0;
   CpvAccess(recvdRemote) = 0;
+  CpvAccess(recvdProcChkp) = 0;
+  CpvAccess(recvdArrayChkp) = 0;
 
   notify_crash_fn = notify_crash;
 
index a7214aa8fb31fac2e79f65a9b8dc8aac007522a4..d1eaf9a0ef40d434004ba080dd802fba2ae143ec 100644 (file)
@@ -37,6 +37,7 @@ module CkMemCheckpoint {
        entry [reductiontarget] void finishUp();
        entry [reductiontarget] void doneRComparison(bool);
        entry [reductiontarget] void recoverFromSoftFailure();
+       entry void notifyReplicaDie(int pe);
        entry void doneComparison(bool);
        entry void RollBack();
        entry void gotReply();
index 496146a364c6dec9843d1b1594ce03bc7c99b12e..930273441e116126cbc91e436653150a097c86b5 100644 (file)
@@ -26,6 +26,7 @@ public:
        double *packData;
        int bud1, bud2;
        int len;
+       int pointer;
        int cp_flag;          // 1: from checkpoint 0: from recover
 };
 
@@ -93,17 +94,19 @@ public:
   void pupAllElements(PUP::er &p);
   void startArrayCheckpoint();
   void recvArrayCheckpoint(CkArrayCheckPTMessage *m);
-  void recoverAll(CkArrayCheckPTMessage * msg, CkVec<CkGroupID> * gmap=NULL, CkVec<CkArrayIndex> * imap=NULL);
+  void recoverAll(double * msg, CkVec<CkGroupID> * gmap=NULL, CkVec<CkArrayIndex> * imap=NULL);
   void startCheckpoint();
   void doneComparison(bool);
   void doneRComparison(bool);
   void RollBack();
   void recoverFromSoftFailure();
+  void notifyReplicaDie(int diePe);
 public:
   static CkCallback  cpCallback;
 
   static int inRestarting;
   static int inLoadbalancing;
+  static int replicaAlive;
   static double startTime;
   static char*  stage;
 private:
@@ -140,7 +143,7 @@ extern "C" int CkInRestarting();
 extern "C" int CkInLdb(); 
 extern "C" void CkSetInLdb(); 
 extern "C" void CkResetInLdb();
-
+extern "C" int CkReplicaAlive();
 extern "C" int CkHasCheckpoints();
 
 extern "C" void CkDieNow();