fix for migration
[charm.git] / src / ck-core / ckmemcheckpoint.C
index 96351c9e0ec8568f30a0bf9e35cda6945a93255b..c4dc028c8dd537c689026fdc02c8c05a34bcbdc7 100644 (file)
@@ -47,7 +47,7 @@ restart phase should restore/reset group table, etc on all processors, thus flus
 #include "register.h"
 #include "conv-ccs.h"
 #include <signal.h>
-
+#include <map>
 void noopck(const char*, ...)
 {}
 
@@ -68,10 +68,13 @@ void noopck(const char*, ...)
 
 #define CMK_CHKP_ALL           1
 #define CMK_USE_BARRIER                0
+#define CMK_USE_CHECKSUM               1
 
 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
+CpvDeclare(int, use_checksum);
+CpvDeclare(int, resilience);
 CpvDeclare(int, _remoteCrashedNode);
 
 // static, so that it is accessible from Converse part
@@ -123,26 +126,37 @@ CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
 //store the checkpoint of the buddy to compare
 //do not need the whole msg, can be the checksum
 CpvDeclare(CkCheckPTMessage*, buddyBuf);
+CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
+CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
 //pointer of the checkpoint going to be written
 CpvDeclare(int, curPointer);
 CpvDeclare(int, recvdRemote);
 CpvDeclare(int, recvdLocal);
 CpvDeclare(int, localChkpDone);
 CpvDeclare(int, remoteChkpDone);
+CpvDeclare(int, remoteStarted);
+CpvDeclare(int, localStarted);
+CpvDeclare(int, remoteReady);
+CpvDeclare(int, localReady);
 CpvDeclare(int, recvdArrayChkp);
 CpvDeclare(int, recvdProcChkp);
+CpvDeclare(int, localChecksum);
+CpvDeclare(int, remoteChecksum);
 
 bool compare(char * buf1, char * buf2);
+int getChecksum(char * buf);
 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
 // Converse function handles
 static int askPhaseHandlerIdx;
 static int recvPhaseHandlerIdx;
 static int askProcDataHandlerIdx;
+static int askRecoverDataHandlerIdx;
 static int restartBcastHandlerIdx;
 static int recoverProcDataHandlerIdx;
 static int restartBeginHandlerIdx;
 static int recvRemoteChkpHandlerIdx;
 static int replicaDieHandlerIdx;
+static int replicaChkpStartHandlerIdx;
 static int replicaDieBcastHandlerIdx;
 static int replicaRecoverHandlerIdx;
 static int replicaChkpDoneHandlerIdx;
@@ -422,8 +436,13 @@ void CkMemCheckPT::pup(PUP::er& p)
 void CkMemCheckPT::getIter(){
   localDecided = true;
   localMaxIter = maxIter+1;
+  if(CkMyPe()==0){
+    CkPrintf("local max iter is %d\n",localMaxIter);
+  }
   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
   int elemCount = CkCountChkpSyncElements();
+  if(CkMyPe()==0)
+    startTime = CmiWallTimer();
   if(elemCount == 0){
     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
   }
@@ -439,12 +458,15 @@ void CkMemCheckPT::recvIter(int iter){
 
 void CkMemCheckPT::recvMaxIter(int iter){
   localDecided = false;
+  if(CkMyPe()==0)
+    CkPrintf("checkpoint iteration is %d\n",iter);
   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
 }
 
 void CkMemCheckPT::reachChkpIter(){
   recvIterCount++;
   elemCount = CkCountChkpSyncElements();
+  //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
   if(recvIterCount == elemCount){
     recvIterCount = 0;
     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
@@ -452,7 +474,7 @@ void CkMemCheckPT::reachChkpIter(){
 }
 
 void CkMemCheckPT::startChkp(){
-  CkPrintf("start checkpoint\n");
+  CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
   CkStartMemCheckpoint(cpCallback);
 }
 
@@ -560,10 +582,11 @@ void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
 
 // loop through my checkpoint table and ask checkpointed array elements
 // to send me checkpoint data.
-void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
+//void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
+void CkMemCheckPT::doItNow(int starter)
 {
   checkpointed = 1;
-  cpCallback = cb;
+  //cpCallback = cb;
   cpStarter = starter;
   inCheckpointing = 1;
   if (CkMyPe() == cpStarter) {
@@ -606,16 +629,26 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
 
 class MemElementPacker : public CkLocIterator{
   private:
-    CkLocMgr *locMgr;
+    //    CkLocMgr *locMgr;
     PUP::er &p;
+    std::map<CkHashCode,CkLocation> arrayMap;
   public:
-    MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
+    MemElementPacker(PUP::er &p_):p(p_){};
     void addLocation(CkLocation &loc){
       CkArrayIndexMax idx = loc.getIndex();
-      CkGroupID gID = locMgr->ckGetGroupID();
-      p|gID;
-      p|idx;
-      locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
+      arrayMap[idx.hash()] = loc;
+    }
+    void writeCheckpoint(){
+      std::map<CkHashCode, CkLocation>::iterator it;
+      for(it = arrayMap.begin();it!=arrayMap.end();it++){
+        CkLocation loc = it->second;
+        CkLocMgr *locMgr = loc.getManager();
+        CkArrayIndexMax idx = loc.getIndex();
+        CkGroupID gID = locMgr->ckGetGroupID();
+        p|gID;
+        p|idx;
+        locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
+      }
     }
 };
 
@@ -627,7 +660,9 @@ void pupAllElements(PUP::er &p){
   }
   p | numElements;
   if(!p.isUnpacking()){
-    CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
+    MemElementPacker packer(p);
+    CKLOCMGR_LOOP(mgr->iterateLocal(packer););
+    packer.writeCheckpoint();
   }
 #endif
 }
@@ -689,9 +724,17 @@ void CkMemCheckPT::startCheckpoint(){
   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
   msg->len = size;
   msg->cp_flag = 1;
+  int checksum;
   {
-    PUP::toMem p(msg->packData);
-    pupAllElements(p);
+    if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
+      PUP::checker p(msg->packData);
+      pupAllElements(p);
+      checksum = p.getChecksum();
+    }else{  
+//    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
+      PUP::toMem p(msg->packData);
+      pupAllElements(p);
+    }
   }
   pointer = CpvAccess(curPointer);
   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
@@ -700,48 +743,93 @@ void CkMemCheckPT::startCheckpoint(){
     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
   if(CkReplicaAlive()==1){
     CpvAccess(recvdLocal) = 1;
-    envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
-    CkPackMessage(&env);
-    CmiSetHandler(env,recvRemoteChkpHandlerIdx);
-    CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+    if(CpvAccess(use_checksum)){
+      CpvAccess(localChecksum) = checksum;
+      char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
+      //only one reaplica will send
+      if(CmiMyPartition()==0){
+        CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
+      }
+    }else{
+      envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
+      CkPackMessage(&env);
+      if(CmiMyPartition()==0){
+        CmiSetHandler(env,recvRemoteChkpHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+      }
+    }
+    if(CmiMyPartition()==0){
+      notifyReplica = 1;
+      thisProxy[CkMyPe()].doneComparison(true);
+    }
   }
   if(CpvAccess(recvdRemote)==1){
+    //only partition 1 will do it
     //compare the checkpoint 
     int size = CpvAccess(chkpBuf)[pointer]->len;
-    if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
-      thisProxy[CkMyPe()].doneComparison(true);
+    if(CpvAccess(use_checksum)){
+      if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
+        thisProxy[CkMyPe()].doneComparison(true);
+      }
+      else{
+        thisProxy[CkMyPe()].doneComparison(false);
+      }
     }else{
-      CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
-      thisProxy[CkMyPe()].doneComparison(false);
+      if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
+        thisProxy[CkMyPe()].doneComparison(true);
+      }
+      else{
+        //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
+        thisProxy[CkMyPe()].doneComparison(false);
+      }
     }
     if(CkMyPe()==0)
       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
   }
   else{
     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
-      {        
-        int pointer = CpvAccess(curPointer);
-        //send the proc data
-        CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
-        procMsg->pointer = pointer;
-        envelope * env = (envelope *)(UsrToEnv(procMsg));
+      //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
+      if(CpvAccess(remoteReady)==1){
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+        {      
+          int pointer = CpvAccess(curPointer);
+          //send the proc data
+          CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          procMsg->pointer = pointer;
+          envelope * env = (envelope *)(UsrToEnv(procMsg));
+          CkPackMessage(&env);
+          CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+          if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+            CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
+          }
+        }
+        //send the array checkpoint data
+        CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        msg->pointer = CpvAccess(curPointer);
+        envelope * env = (envelope *)(UsrToEnv(msg));
         CkPackMessage(&env);
-        CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+        CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+          CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
+      }else{
         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
-          CkPrintf("[%d] sendProcdata\n",CkMyPe());
+          int pointer = CpvAccess(curPointer);
+          CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          (CpvAccess(recoverProcBuf))->pointer = pointer;
         }
+        CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
+        CpvAccess(localReady) = 1;
       }
-      //send the array checkpoint data
-      CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
-      msg->pointer = CpvAccess(curPointer);
-      envelope * env = (envelope *)(UsrToEnv(msg));
-      CkPackMessage(&env);
-      CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
-      CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
-        CkPrintf("[%d] sendArraydata\n",CkMyPe());
       //can continue work, no need to wait for my replica
+      int _ret = 1;
+      notifyReplica = 1;
+      CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
+      contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
     }
   }
 #endif
@@ -750,7 +838,7 @@ void CkMemCheckPT::startCheckpoint(){
 void CkMemCheckPT::doneComparison(bool ret){
   int _ret = 1;
   if(!ret){
-    CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
+    //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
     _ret = 0;
   }else{
     _ret = 1;
@@ -761,35 +849,46 @@ void CkMemCheckPT::doneComparison(bool ret){
 
 void CkMemCheckPT::doneRComparison(int ret){
   //   if(CpvAccess(curPointer) == 0){
-  if(ret==CkNumPes()){
+  //if(ret==CkNumPes()){
     CpvAccess(localChkpDone) = 1;
     if(CpvAccess(remoteChkpDone) ==1){
       thisProxy.doneBothComparison();
+    }else{
+      CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
     }
-    if(notifyReplica == 0){
-      //notify the replica am done
-      char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-      CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
-      CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
-      notifyReplica = 1;
-    }
-  }
-  else{
-    CkPrintf("[%d][%d] going to RollBack %d \n", CmiMyPartition(),CkMyPe(),ret);
+  //}
+  /*else{
+    CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
+    startTime = CmiWallTimer();
     thisProxy.RollBack();
+  }*/
+  if(notifyReplica == 0){
+    //notify the replica am done
+    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+    *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
+    CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
+    CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
+    notifyReplica = 1;
   }
 }
 
 void CkMemCheckPT::doneBothComparison(){
   CpvAccess(recvdRemote) = 0;
+  CpvAccess(remoteReady) = 0;
+  CpvAccess(localReady) = 0;
   CpvAccess(recvdLocal) = 0;
   CpvAccess(localChkpDone) = 0;
   CpvAccess(remoteChkpDone) = 0;
+  CpvAccess(remoteStarted) = 0;
+  CpvAccess(localStarted) = 0;
+  CpvAccess(_remoteCrashedNode) = -1;
+  CkMemCheckPT::replicaAlive = 1;
+  int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
   CpvAccess(curPointer)^=1;
   inCheckpointing = 0;
   notifyReplica = 0;
   if(CkMyPe() == 0){
-    CmiPrintf("[%d][%d] Checkpoint finished in %f seconds, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime);
+    CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
   }
   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
 }
@@ -797,7 +896,7 @@ void CkMemCheckPT::doneBothComparison(){
 void CkMemCheckPT::RollBack(){
   //restore group data
   checkpointed = 0;
-  CkMemCheckPT::inRestarting = 1;
+  inRestarting = 1;
   int pointer = CpvAccess(curPointer)^1;//use the previous one
   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
   PUP::fromMem p(chkpMsg->packData);   
@@ -818,7 +917,6 @@ void CkMemCheckPT::RollBack(){
 
   if(p.isUnpacking()){
     for(int i=0;i<numElements;i++){
-      //for(int i=0;i<1;i++){
       CkGroupID gID;
       CkArrayIndex idx;
       p|gID;
@@ -833,7 +931,6 @@ void CkMemCheckPT::RollBack(){
   }
 
   void CkMemCheckPT::notifyReplicaDie(int pe){
-    //CkPrintf("[%d] receive replica die\n",CkMyPe());
     replicaAlive = 0;
     CpvAccess(_remoteCrashedNode) = pe;
   }
@@ -1091,22 +1188,22 @@ void CkMemCheckPT::RollBack(){
     void CkMemCheckPT::restart(int diePe)
     {
 #if CMK_MEM_CHECKPOINT
+      thisFailedPe = diePe;
       double curTime = CmiWallTimer();
-      if (CkMyPe() == diePe){
+      if (CkMyPe() == thisFailedPe){
         restartT = CmiWallTimer();
         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
       }
       stage = (char*)"resetLB";
       startTime = curTime;
-      if (CkMyPe() == diePe)
+      if (CkMyPe() == thisFailedPe)
         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
 
-#if CK_NO_PROC_POOL
-      failed(diePe);   // add into the list of failed pes
-#endif
-      thisFailedPe = diePe;
+//#if CK_NO_PROC_POOL
+//      failed(diePe); // add into the list of failed pes
+//#endif
 
-      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
+//      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
 
       inRestarting = 1;
 
@@ -1156,7 +1253,8 @@ void CkMemCheckPT::RollBack(){
     void CkMemCheckPT::resetReductionMgr()
     {
       if (CkMyPe() == thisFailedPe) 
-        CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
+        CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
+      stage = (char *)"resetReductionMgr";
       int numGroups = CkpvAccess(_groupIDTable)->size();
       for(int i=0;i<numGroups;i++) {
         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
@@ -1169,8 +1267,11 @@ void CkMemCheckPT::RollBack(){
       if(CmiNumPartition()==1){
         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
       }
-      else
+      else{
+       if (CkMyPe() == thisFailedPe) 
+         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+      }
     }
 
     // recover the lost buddies
@@ -1257,16 +1358,10 @@ void CkMemCheckPT::RollBack(){
     // restore array elements
     void CkMemCheckPT::recoverArrayElements()
     {
-      if(CmiMyPartition()==1&&CkMyPe()==0){
-        CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
-        fflush(stdout);
-      }
       double curTime = CmiWallTimer();
       int len = ckTable.length();
-      //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
+      CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
       stage = (char *)"recoverArrayElements";
-      if (CkMyPe() == thisFailedPe)
-        CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
       startTime = curTime;
       int flag = 0;
       // recover all array elements
@@ -1325,7 +1420,6 @@ void CkMemCheckPT::RollBack(){
       recoverAll(packData);
 #endif
 #endif
-      curTime = CmiWallTimer();
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
       for (int i=0; i<CkNumPes(); i++) {
         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
@@ -1337,6 +1431,8 @@ void CkMemCheckPT::RollBack(){
       delete [] gmap;
 #endif
       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
+      CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
+      //if (CkMyPe() == thisFailedPe)
 
       CKLOCMGR_LOOP(mgr->doneInserting(););
 
@@ -1351,10 +1447,6 @@ void CkMemCheckPT::RollBack(){
         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
       }
 #endif
-      if(CmiMyPartition()==1&&CkMyPe()==0){
-        CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
-        fflush(stdout);
-      }
     }
 
     void CkMemCheckPT::gotReply(){
@@ -1362,10 +1454,6 @@ void CkMemCheckPT::RollBack(){
     }
 
     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
-      if(CmiMyPartition()==1&&CkMyPe()==0){
-        CmiPrintf("[%d][%d]before recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
-        fflush(stdout);
-      }
 #if CMK_CHKP_ALL
       PUP::fromMem p(packData);
       int numElements;
@@ -1402,10 +1490,6 @@ void CkMemCheckPT::RollBack(){
         }
       }
 #endif
-      if(CmiMyPartition()==1&&CkMyPe()==0){
-        CmiPrintf("[%d][%d]after recover memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
-        fflush(stdout);
-      }
     }
 
 
@@ -1424,18 +1508,21 @@ void CkMemCheckPT::RollBack(){
       }
       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
       inRestarting = 0;
-
+      maxIter = -1;
 #if CMK_CONVERSE_MPI   
       if(CmiNumPartition()!=1){
         CpvAccess(recvdProcChkp) = 0;
         CpvAccess(recvdArrayChkp) = 0;
         CpvAccess(curPointer)^=1;
         //notify my replica, restart is done
-        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-        CmiSetHandler(msg,replicaRecoverHandlerIdx);
-        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
+        if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+          CmiSetHandler(msg,replicaRecoverHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
+        }
       }
       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
+        lastPingTime = CmiWallTimer();
         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
       }
 #endif
@@ -1456,6 +1543,24 @@ void CkMemCheckPT::RollBack(){
     void CkMemCheckPT::recoverFromSoftFailure()
     {
       inRestarting = 0;
+      maxIter = -1;
+      CpvAccess(recvdRemote) = 0;
+      CpvAccess(recvdLocal) = 0;
+      CpvAccess(localChkpDone) = 0;
+      CpvAccess(remoteChkpDone) = 0;
+      CpvAccess(remoteReady) = 0;
+      CpvAccess(localReady) = 0;
+      inCheckpointing = 0;
+      notifyReplica = 0;
+      CpvAccess(remoteStarted) = 0;
+      CpvAccess(localStarted) = 0;
+      CpvAccess(_remoteCrashedNode) = -1;
+      CkMemCheckPT::replicaAlive = 1;
+      inCheckpointing = 0;
+      notifyReplica = 0;
+      if(CkMyPe() == 0){
+        CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
+      }
       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
     }
     // called only on 0
@@ -1477,11 +1582,11 @@ void CkMemCheckPT::RollBack(){
     {
 #if CMK_MEM_CHECKPOINT
       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
-      if (_memChkptOn == 0) {
+      /*if (_memChkptOn == 0) {
         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
         cb.send();
         return;
-      }
+        }*/
       if (CkInRestarting()) {
         // trying to checkpointing during restart
         cb.send();
@@ -1490,9 +1595,19 @@ void CkMemCheckPT::RollBack(){
       // store user callback and user data
       CkMemCheckPT::cpCallback = cb;
 
+
+      //send to my replica that checkpoint begins 
+      if(CkReplicaAlive()==1){
+        char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(msg, replicaChkpStartHandlerIdx);
+        CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
+      }
+      CpvAccess(localStarted) = 1;
       // broadcast to start check pointing
-      CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-      checkptMgr.doItNow(CkMyPe(), cb);
+      if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
+        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+        checkptMgr.doItNow(CkMyPe());
+      }
 #else
       // when mem checkpoint is disabled, invike cb immediately
       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
@@ -1525,7 +1640,7 @@ void CkMemCheckPT::RollBack(){
 #if CMK_MEM_CHECKPOINT
 #if CMK_USE_BARRIER
       if(CkMyPe()!=_diePE){
-        printf("restar begin on %d\n",CkMyPe());
+        printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
@@ -1537,8 +1652,8 @@ void CkMemCheckPT::RollBack(){
       static int count = 0;
       CmiAssert(CkMyPe() == _diePE);
       count ++;
-      if (count == CkNumPes()) {
-        printf("restart begin on %d\n",CkMyPe());
+      if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
+        printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
         CkRestartCheckPointCallback(NULL, NULL);
         count = 0;
       }
@@ -1595,98 +1710,143 @@ void CkMemCheckPT::RollBack(){
     extern void _initDone();
 
     bool compare(char * buf1, char *buf2){
-       PUP::checker pchecker(buf1,buf2);
-                pchecker.skip();
-
-                int numElements;
-                pchecker|numElements;
-                for(int i=0;i<numElements;i++){
-      //for(int i=0;i<1;i++){
-      CkGroupID gID;
-      CkArrayIndex idx;
+      if(CkMyPe()==0)
+        CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+      PUP::checker pchecker(buf1,buf2);
+      pchecker.skip();
+      int numElements;
+      pchecker|numElements;
+      for(int i=0;i<numElements;i++){
+        CkGroupID gID;
+        CkArrayIndex idx;
+
+        pchecker|gID;
+        pchecker|idx;
+
+        CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+        mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
+      }
+      if(CkMyPe()==0)
+        CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+      //int fault_num = pchecker.getFaultNum();
+      bool result = pchecker.getResult();
+      /*if(!result){
+        CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
+      }*/
+      return result;
+    }
+    int getChecksum(char * buf){
+      PUP::checker pchecker(buf);
+      pchecker.skip();
+      int numElements;
+      pchecker|numElements;
+//      CkPrintf("num %d\n",numElements);
+      for(int i=0;i<numElements;i++){
+        CkGroupID gID;
+        CkArrayIndex idx;
 
-      pchecker|gID;
-      pchecker|idx;
+        pchecker|gID;
+        pchecker|idx;
 
-      CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
-      mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
+        CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+        mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
       }
-      //return pchecker.getResult();
-      return true;
+      return pchecker.getChecksum();
     }
 
     static void recvRemoteChkpHandler(char *msg){
-      envelope *env = (envelope *)msg;
-      CkUnpackMessage(&env);
-      CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
-      if(CpvAccess(recvdLocal)==1){
-        int pointer = CpvAccess(curPointer);
-        int size = CpvAccess(chkpBuf)[pointer]->len;
-        if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
-          CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
-        }else
-        {
-          CkPrintf("[%d][%d] failed the test\n",CmiMyPartition(),CkMyPe());
-          CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
-        }
-        delete chkpMsg;
+      CpvAccess(remoteChkpDone) = 1;
+      if(CpvAccess(use_checksum)){
         if(CkMyPe()==0)
-          CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
-      }else{
+          CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+        CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
         CpvAccess(recvdRemote) = 1;
-        if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
-        CpvAccess(buddyBuf) = chkpMsg;
-      }  
+        if(CpvAccess(recvdLocal)==1){
+          if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
+          }
+          else{
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
+          }
+        }
+      }else{
+        envelope *env = (envelope *)msg;
+        CkUnpackMessage(&env);
+        CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+        if(CpvAccess(recvdLocal)==1){
+          int pointer = CpvAccess(curPointer);
+          int size = CpvAccess(chkpBuf)[pointer]->len;
+          if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
+          }else
+          {
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
+          }
+          delete chkpMsg;
+          if(CkMyPe()==0)
+            CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+        }else{
+          CpvAccess(recvdRemote) = 1;
+          if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
+          CpvAccess(buddyBuf) = chkpMsg;
+        }
+      }
     }
 
     static void replicaRecoverHandler(char *msg){
-      CpvAccess(_remoteCrashedNode) = -1;
-      CkMemCheckPT::replicaAlive = 1;
       //fflush(stdout);
       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
-      bool ret = true;
       CpvAccess(remoteChkpDone) = 1;
-      CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(ret);
+      if(CpvAccess(localChkpDone) == 1)
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
       CmiFree(msg);
-  
     }
+
     static void replicaChkpDoneHandler(char *msg){
       CpvAccess(remoteChkpDone) = 1;
+      int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
       if(CpvAccess(localChkpDone) == 1)
-        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
       CmiFree(msg);
     }
 
     static void replicaDieHandler(char * msg){
-#if CMK_CONVERSE_MPI   
+#if CMK_CONVERSE_MPI
+      //broadcast to every one in my replica
+      CmiSetHandler(msg, replicaDieBcastHandlerIdx);
+      CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+#endif
+    }
+
+    static void replicaChkpStartHandler(char * msg){
+      CpvAccess(remoteStarted) =1;
+      if(CpvAccess(localStarted)==1){    
+        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+        checkptMgr.doItNow(0);
+      }
+    }
+
+
+    static void replicaDieBcastHandler(char *msg){
       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
       CpvAccess(_remoteCrashedNode) = diePe;
-      CkMemCheckPT::replicaAlive = 0;
       if(CkMyPe()==diePe){
         CmiPrintf("pe %d in replicad word die\n",diePe);
         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
         fflush(stdout);
       }
-      find_spare_mpirank(diePe,CmiMyPartition()^1);
-#endif
+      if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
+        find_spare_mpirank(diePe,CmiMyPartition()^1);
+      }
       //broadcast to my partition to get local max iter
-      CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
-      CmiFree(msg);
-    }
-
-
-    static void replicaDieBcastHandler(char *msg){
-      int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
-      CpvAccess(_remoteCrashedNode) = diePe;
-      CkMemCheckPT::replicaAlive = 0;
+      if(CpvAccess(resilience)!=1){
+        CkMemCheckPT::replicaAlive = 0;
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
+      }
       CmiFree(msg);
     }
 
     static void recoverRemoteProcDataHandler(char *msg){
-      if(CmiMyPartition()==1&&CkMyPe()==0){
-        CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
-        fflush(stdout);
-      }
       envelope *env = (envelope *)msg;
       CkUnpackMessage(&env);
       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
@@ -1716,11 +1876,17 @@ void CkMemCheckPT::RollBack(){
         _diePE = CpvAccess(_crashedNode);
         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
-        CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
-      }
-      if(CmiMyPartition()==1&&CkMyPe()==0){
-        CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
-        fflush(stdout);
+        //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#if CMK_USE_BARRIER
+      //CmiPrintf("before reduce\n");  
+       if(CpvAccess(resilience)==1){
+         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }else
+         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+      //CmiPrintf("after reduce\n");   
+#else
+       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#endif 
       }
     }
 
@@ -1728,10 +1894,6 @@ void CkMemCheckPT::RollBack(){
       envelope *env = (envelope *)msg;
       CkUnpackMessage(&env);
       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
-      if(CmiMyPartition()==1&&CkMyPe()==0){
-        CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
-        fflush(stdout);
-      }
 
       //store the checkpoint
       int pointer = chkpMsg->pointer;
@@ -1740,17 +1902,23 @@ void CkMemCheckPT::RollBack(){
       CpvAccess(chkpBuf)[pointer] = chkpMsg;
       CpvAccess(recvdArrayChkp) =1;
       CkMemCheckPT::inRestarting = 1;
-      if(CpvAccess(recvdProcChkp) == 1){
+      if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
         _resume_charm_message();
         _diePE = CpvAccess(_crashedNode);
         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
-        CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
-      }
-      if(CmiMyPartition()==1&&CkMyPe()==0){
-        CmiPrintf("[%d][%d] memory %lf\n",CmiMyPartition(),CkMyPe(),CmiMemoryUsage()/1048576.0);
-        fflush(stdout);
+        //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#if CMK_USE_BARRIER
+      //CmiPrintf("before reduce\n");  
+       if(CpvAccess(resilience)==1){
+         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }else
+         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+      //CmiPrintf("after reduce\n");   
+#else
+       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#endif 
       }
     }
 
@@ -1759,6 +1927,65 @@ void CkMemCheckPT::RollBack(){
       CpvAccess(_curRestartPhase)--;
       CkMemCheckPT::inRestarting = 1;
       CmiFree(msg);
+      //notify the buddy in the replica now i can receive the checkpoint msg
+      if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
+        char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
+        //timer start
+        if(CpvAccess(_crashedNode)==CkMyPe())
+          CkMemCheckPT::startTime = restartT = CmiWallTimer();
+        if(CpvAccess(_crashedNode)==CmiMyPe())
+          CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      }else{
+        CpvAccess(curPointer)^=1;
+        CkMemCheckPT::inRestarting = 1;
+        _resume_charm_message();
+        _diePE = CpvAccess(_crashedNode);
+      }
+    }
+    
+    static void askRecoverDataHandler(char * msg){
+      if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+       CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      if(CpvAccess(resilience)!=1){
+        CpvAccess(remoteReady)=1;
+        if(CpvAccess(localReady)==1){
+          if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+          {    
+            envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
+            CkPackMessage(&env);
+            CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+            CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+            CmiPrintf("[%d] sendProcdata after request at \n",CmiMyPe(),CmiWallTimer());
+          }
+          //send the array checkpoint data
+          envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
+          CkPackMessage(&env);
+          CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
+          CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+          if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+            CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
+        }
+      }else{
+        find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
+        int pointer = CpvAccess(curPointer)^1;
+        CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+        procMsg->pointer = pointer;
+        envelope * env = (envelope *)(UsrToEnv(procMsg));
+        CkPackMessage(&env);
+        CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
+        
+        CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        arrayMsg->pointer = pointer;
+        envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
+        CkPackMessage(&env1);
+        CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
+        CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
+      }
     }
     // called on crashed processor
     static void recoverProcDataHandler(char *msg)
@@ -1826,7 +2053,7 @@ void CkMemCheckPT::RollBack(){
     void qd_callback(void *m)
     {
       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
-        fflush(stdout);
+      fflush(stdout);
       CkFreeMsg(m);
       if(CmiNumPartition()==1){
 #ifdef CMK_SMP
@@ -1867,8 +2094,8 @@ void CkMemCheckPT::RollBack(){
         CkStartQD(cb);
         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
         }*/
+      CkMemCheckPT::startTime = restartT = CmiWallTimer();
       if(CmiNumPartition()==1){
-        CkMemCheckPT::startTime = restartT = CmiWallTimer();
         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
         restartT = CmiWallTimer();
         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
@@ -1958,13 +2185,28 @@ void CkMemCheckPT::RollBack(){
       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
         arg_where = CkCheckPoint_inDISK;
       }
+      CpvInitialize(int, use_checksum);
+      CpvInitialize(int, resilience);
+      CpvAccess(use_checksum)=0;
+      CpvAccess(resilience)=0;
+      if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
+        CpvAccess(use_checksum)=1;
+      }
+      if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
+        CpvAccess(resilience)=1;
+      }
+      if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
+        CpvAccess(resilience)=2;
+      }
+      if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
+        CpvAccess(resilience)=3;
+      }
       // initiliazing _crashedNode variable
       CpvInitialize(int, _crashedNode);
       CpvInitialize(int, _remoteCrashedNode);
       CpvAccess(_crashedNode) = -1;
       CpvAccess(_remoteCrashedNode) = -1;
       init_FI(argv);
-
     }
 #endif
 
@@ -2066,14 +2308,12 @@ void CkMemCheckPT::RollBack(){
         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
         //send to everyone in the other world
         if(CmiNumPartition()!=1){
-          for(int i=0;i<CmiNumPes();i++){
+          //for(int i=0;i<CmiNumPes();i++){
             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
-            //CmiPrintf("[%d][%d] send to processor %d in replica. \n",CmiMyPartition(), CmiMyPe(), i);
-            //fflush(stdout);
             CmiSetHandler(rMsg, replicaDieHandlerIdx);
-            CmiRemoteSyncSendAndFree(i,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
-          }
+            CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+          //}
         }
       }
         else 
@@ -2104,6 +2344,7 @@ void CkMemCheckPT::RollBack(){
 #if CMK_MEM_CHECKPOINT
         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
+        askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
@@ -2111,6 +2352,7 @@ void CkMemCheckPT::RollBack(){
         //for replica
         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
+        replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
@@ -2129,16 +2371,26 @@ void CkMemCheckPT::RollBack(){
         CpvInitialize(CkCheckPTMessage **, chkpBuf);
         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
         CpvInitialize(CkCheckPTMessage *, buddyBuf);
+        CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
+        CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
         CpvInitialize(int,curPointer);
         CpvInitialize(int,recvdLocal);
         CpvInitialize(int,localChkpDone);
         CpvInitialize(int,remoteChkpDone);
+        CpvInitialize(int,remoteStarted);
+        CpvInitialize(int,localStarted);
+        CpvInitialize(int,localReady);
+        CpvInitialize(int,remoteReady);
         CpvInitialize(int,recvdRemote);
         CpvInitialize(int,recvdProcChkp);
+        CpvInitialize(int,localChecksum);
+        CpvInitialize(int,remoteChecksum);
         CpvInitialize(int,recvdArrayChkp);
 
         CpvAccess(procChkptBuf) = NULL;
         CpvAccess(buddyBuf) = NULL;
+        CpvAccess(recoverProcBuf) = NULL;
+        CpvAccess(recoverArrayBuf) = NULL;
         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
         CpvAccess(chkpBuf)[0] = NULL;
@@ -2150,8 +2402,14 @@ void CkMemCheckPT::RollBack(){
         CpvAccess(recvdLocal) = 0;
         CpvAccess(localChkpDone) = 0;
         CpvAccess(remoteChkpDone) = 0;
+        CpvAccess(remoteStarted) = 0;
+        CpvAccess(localStarted) = 0;
+        CpvAccess(localReady) = 0;
+        CpvAccess(remoteReady) = 0;
         CpvAccess(recvdRemote) = 0;
         CpvAccess(recvdProcChkp) = 0;
+        CpvAccess(localChecksum) = 0;
+        CpvAccess(remoteChecksum) = 0;
         CpvAccess(recvdArrayChkp) = 0;
 
         notify_crash_fn = notify_crash;