fix for migration
[charm.git] / src / ck-core / ckmemcheckpoint.C
index f6e24567f70d4faf04d35be087440985bad6df9a..c4dc028c8dd537c689026fdc02c8c05a34bcbdc7 100644 (file)
@@ -1,52 +1,53 @@
 
 /*
-Charm++ support for fault tolerance of
-In memory synchronous checkpointing and restart
+   Charm++ support for fault tolerance of
+   In memory synchronous checkpointing and restart
 
-written by Gengbin Zheng, gzheng@uiuc.edu
-           Lixia Shi,     lixiashi@uiuc.edu
+   written by Gengbin Zheng, gzheng@uiuc.edu
+   Lixia Shi,     lixiashi@uiuc.edu
 
-added 12/18/03:
+   added 12/18/03:
 
-To support fault tolerance while allowing migration, it uses double
-checkpointing scheme for each array element (not a infallible scheme).
-In this version, checkpointing is done based on array elements. 
-Each array element individully sends its checkpoint data to two buddies.
+   To support fault tolerance while allowing migration, it uses double
+   checkpointing scheme for each array element (not a infallible scheme).
+   In this version, checkpointing is done based on array elements. 
+   Each array element individully sends its checkpoint data to two buddies.
 
-In this implementation, assume only one failure happens at a time,
-or two failures on two processors which are not buddy to each other;
-also assume there is no failure during a checkpointing or restarting phase.
+   In this implementation, assume only one failure happens at a time,
+   or two failures on two processors which are not buddy to each other;
+   also assume there is no failure during a checkpointing or restarting phase.
 
-Restart phase contains two steps:
-1. Converse level restart: the newly created process for the failed
+   Restart phase contains two steps:
+   1. Converse level restart: the newly created process for the failed
    processor recover its system data (no array elements) from 
    its backup processor.
-2. Charm++ level restart: CkMemCheckPT gets control and recover array 
+   2. Charm++ level restart: CkMemCheckPT gets control and recover array 
    elements and reset all states of system groups to be consistent.
 
-added 3/14/04:
-1. also support for double in-disk checkpoint/restart
+   added 3/14/04:
+   1. also support for double in-disk checkpoint/restart
    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
 
-added 4/16/04:
-1. also support the case when there is a pool of extra processors.
+   added 4/16/04:
+   1. also support the case when there is a pool of extra processors.
    set CK_NO_PROC_POOL to 0.
 
 TODO:
 1. checkpoint scheme can be reimplemented based on per processor scheme;
- restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
+restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
 
-*/
+ */
 
 #include "unistd.h"
 
 #include "charm++.h"
+#include "ckfaultinjector.h"
 #include "ck.h"
 #include "register.h"
 #include "conv-ccs.h"
 #include <signal.h>
-
+#include <map>
 void noopck(const char*, ...)
 {}
 
@@ -67,13 +68,19 @@ void noopck(const char*, ...)
 
 #define CMK_CHKP_ALL           1
 #define CMK_USE_BARRIER                0
+#define CMK_USE_CHECKSUM               1
 
 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
+CpvDeclare(int, use_checksum);
+CpvDeclare(int, resilience);
+CpvDeclare(int, _remoteCrashedNode);
 
 // static, so that it is accessible from Converse part
 int CkMemCheckPT::inRestarting = 0;
+int CkMemCheckPT::inCheckpointing = 0;
+int CkMemCheckPT::replicaAlive = 1;
 int CkMemCheckPT::inLoadbalancing = 0;
 double CkMemCheckPT::startTime;
 char *CkMemCheckPT::stage;
@@ -110,7 +117,7 @@ double killTime=0.0;
       code     \
     }  \
   }    \
- }
+}
 
 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
@@ -119,31 +126,65 @@ CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
 //store the checkpoint of the buddy to compare
 //do not need the whole msg, can be the checksum
 CpvDeclare(CkCheckPTMessage*, buddyBuf);
+CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
+CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
 //pointer of the checkpoint going to be written
 CpvDeclare(int, curPointer);
 CpvDeclare(int, recvdRemote);
 CpvDeclare(int, recvdLocal);
+CpvDeclare(int, localChkpDone);
+CpvDeclare(int, remoteChkpDone);
+CpvDeclare(int, remoteStarted);
+CpvDeclare(int, localStarted);
+CpvDeclare(int, remoteReady);
+CpvDeclare(int, localReady);
+CpvDeclare(int, recvdArrayChkp);
+CpvDeclare(int, recvdProcChkp);
+CpvDeclare(int, localChecksum);
+CpvDeclare(int, remoteChecksum);
 
 bool compare(char * buf1, char * buf2);
+int getChecksum(char * buf);
 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
 // Converse function handles
 static int askPhaseHandlerIdx;
 static int recvPhaseHandlerIdx;
 static int askProcDataHandlerIdx;
+static int askRecoverDataHandlerIdx;
 static int restartBcastHandlerIdx;
 static int recoverProcDataHandlerIdx;
 static int restartBeginHandlerIdx;
 static int recvRemoteChkpHandlerIdx;
+static int replicaDieHandlerIdx;
+static int replicaChkpStartHandlerIdx;
+static int replicaDieBcastHandlerIdx;
+static int replicaRecoverHandlerIdx;
+static int replicaChkpDoneHandlerIdx;
+static int recoverRemoteProcDataHandlerIdx;
+static int recoverRemoteArrayDataHandlerIdx;
 static int notifyHandlerIdx;
 // compute the backup processor
 // FIXME: avoid crashed processors
+#if CMK_CONVERSE_MPI
+static int pingHandlerIdx;
+static int pingCheckHandlerIdx;
+static int buddyDieHandlerIdx;
+static double lastPingTime = -1;
+
+extern "C" void mpi_restart_crashed(int pe, int rank);
+extern "C" int  find_spare_mpirank(int pe,int partition);
+
+void pingBuddy();
+void pingCheckHandler();
+
+#endif
 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
 
 inline int CkMemCheckPT::BuddyPE(int pe)
 {
   int budpe;
 #if NODE_CHECKPOINT
-    // buddy is the processor with same rank on the next physical node
+  // buddy is the processor with same rank on the next physical node
   int r1 = CmiPhysicalRank(pe);
   int budnode = CmiPhysicalNodeID(pe);
   do {
@@ -159,8 +200,7 @@ inline int CkMemCheckPT::BuddyPE(int pe)
   }
 #else
   budpe = pe;
-  while (budpe == pe || isFailed(budpe)) 
-          budpe = (budpe+1)%CkNumPes();
+  budpe = (budpe+1)%CkNumPes();
 #endif
   return budpe;
 }
@@ -169,49 +209,49 @@ inline int CkMemCheckPT::BuddyPE(int pe)
 // choose and register with 2 buddies for checkpoiting 
 #if CMK_MEM_CHECKPOINT
 void ArrayElement::init_checkpt() {
-       if (_memChkptOn == 0) return;
-       if (CkInRestarting()) {
-         CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
-       }
-       // only master init checkpoint
-        if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
-
-        budPEs[0] = CkMyPe();
-        budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
-       CmiAssert(budPEs[0] != budPEs[1]);
-        // inform checkPTMgr
-        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-       //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
-        checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
-       checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
+  if (_memChkptOn == 0) return;
+  if (CkInRestarting()) {
+    CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
+  }
+  // only master init checkpoint
+  if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
+
+  budPEs[0] = CkMyPe();
+  budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
+  CmiAssert(budPEs[0] != budPEs[1]);
+  // inform checkPTMgr
+  CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+  //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
+  checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
+  checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
 }
 #endif
 
 // entry function invoked by checkpoint mgr asking for checkpoint data
 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
 #if CMK_MEM_CHECKPOINT
-//  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
-//char index[128];   thisIndexMax.sprint(index);
-//printf("[%d] checkpointing %s\n", CkMyPe(), index);
+  //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
+  //char index[128];   thisIndexMax.sprint(index);
+  //printf("[%d] checkpointing %s\n", CkMyPe(), index);
   CkLocMgr *locMgr = thisArray->getLocMgr();
   CmiAssert(myRec!=NULL);
   int size;
   {
-        PUP::sizer p;
-        locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
-        size = p.size();
+    PUP::sizer p;
+    locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
+    size = p.size();
   }
   int packSize = size/sizeof(double) +1;
   CkArrayCheckPTMessage *msg =
-                 new (packSize, 0) CkArrayCheckPTMessage;
+    new (packSize, 0) CkArrayCheckPTMessage;
   msg->len = size;
   msg->index =thisIndexMax;
   msg->aid = thisArrayID;
   msg->locMgr = locMgr->getGroupID();
   msg->cp_flag = 1;
   {
-        PUP::toMem p(msg->packData);
-        locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
+    PUP::toMem p(msg->packData);
+    locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
   }
 
   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
@@ -224,9 +264,9 @@ void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
 class CkMemCheckPTInfo: public CkCheckPTInfo
 {
   CkArrayCheckPTMessage *ckBuffer;
-public:
+  public:
   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
-                   CkCheckPTInfo(a, loc, idx, pno)
+    CkCheckPTInfo(a, loc, idx, pno)
   {
     ckBuffer = NULL;
   }
@@ -249,14 +289,14 @@ public:
     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
   }     
   inline void updateBuddy(int b1, int b2) {
-     CmiAssert(ckBuffer);
-     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
-     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
-     CmiAssert(pNo != CkMyPe());
+    CmiAssert(ckBuffer);
+    ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
+    pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
+    CmiAssert(pNo != CkMyPe());
   }
   inline int getSize() { 
-     CmiAssert(ckBuffer);
-     return ckBuffer->len; 
+    CmiAssert(ckBuffer);
+    return ckBuffer->len; 
   }
 };
 
@@ -266,7 +306,7 @@ class CkDiskCheckPTInfo: public CkCheckPTInfo
   char *fname;
   int bud1, bud2;
   int len;                     // checkpoint size
-public:
+  public:
   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
   {
 #if CMK_USE_MKSTEMP
@@ -294,7 +334,7 @@ public:
     PUP::toDisk p(f);
     CkPupMessage(p, (void **)&data);
     // delay sync to the end because otherwise the messages are blocked
-//    fsync(fileno(f));
+    //    fsync(fileno(f));
     fclose(f);
     bud1 = data->bud1;
     bud2 = data->bud2;
@@ -314,12 +354,12 @@ public:
     return data;
   }
   inline void updateBuddy(int b1, int b2) {
-     bud1 = b1; bud2 = b2;
-     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
-     CmiAssert(pNo != CkMyPe());
+    bud1 = b1; bud2 = b2;
+    pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
+    CmiAssert(pNo != CkMyPe());
   }
   inline int getSize() { 
-     return len; 
+    return len; 
   }
 };
 
@@ -334,26 +374,31 @@ CkMemCheckPT::CkMemCheckPT(int w)
 #if CK_NO_PROC_POOL
   if (numnodes <= 2)
 #else
-  if (numnodes  == 1)
+    if (numnodes  == 1)
 #endif
-  {
-    if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
-    _memChkptOn = 0;
-  }
+    {
+      if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
+      _memChkptOn = 0;
+    }
   inRestarting = 0;
+  inCheckpointing = 0;
   recvCount = peCount = 0;
   ackCount = 0;
   expectCount = -1;
   where = w;
-
+  replicaAlive = 1;
+  notifyReplica = 0;
 #if CMK_CONVERSE_MPI
   void pingBuddy();
   void pingCheckHandler();
   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-  CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+  CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
   chkpTable[0] = NULL;
   chkpTable[1] = NULL;
+  maxIter = -1;
+  recvIterCount = 0;
+  localDecided = false;
 }
 
 CkMemCheckPT::~CkMemCheckPT()
@@ -380,11 +425,59 @@ void CkMemCheckPT::pup(PUP::er& p)
     void pingBuddy();
     void pingCheckHandler();
     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+    CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
+    maxIter = -1;
+    recvIterCount = 0;
+    localDecided = false;
   }
 }
 
+void CkMemCheckPT::getIter(){
+  localDecided = true;
+  localMaxIter = maxIter+1;
+  if(CkMyPe()==0){
+    CkPrintf("local max iter is %d\n",localMaxIter);
+  }
+  contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
+  int elemCount = CkCountChkpSyncElements();
+  if(CkMyPe()==0)
+    startTime = CmiWallTimer();
+  if(elemCount == 0){
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
+  }
+}
+
+//when one replica failes, decide the next chkp iter
+
+void CkMemCheckPT::recvIter(int iter){
+  if(maxIter<iter){
+    maxIter=iter;
+  }
+}
+
+void CkMemCheckPT::recvMaxIter(int iter){
+  localDecided = false;
+  if(CkMyPe()==0)
+    CkPrintf("checkpoint iteration is %d\n",iter);
+  CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
+}
+
+void CkMemCheckPT::reachChkpIter(){
+  recvIterCount++;
+  elemCount = CkCountChkpSyncElements();
+  //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
+  if(recvIterCount == elemCount){
+    recvIterCount = 0;
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
+  }
+}
+
+void CkMemCheckPT::startChkp(){
+  CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
+  CkStartMemCheckpoint(cpCallback);
+}
+
 // called by checkpoint mgr to restore an array element
 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
 {
@@ -450,11 +543,11 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index,
     CkCheckPTInfo *entry = ckTable[idx];
     if (index == entry->index) {
       if (loc == entry->locMgr) {
-         // bindTo array elements
-          return;
+        // bindTo array elements
+        return;
       }
-        // for array inheritance, the following check may fail
-        // because ArrayElement constructor of all superclasses are called
+      // for array inheritance, the following check may fail
+      // because ArrayElement constructor of all superclasses are called
       if (aid == entry->aid) {
         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
@@ -477,7 +570,7 @@ void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
   if (buddy == CkMyPe()) buddy = msg->bud2;
   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
   recvData(msg);
-    // ack
+  // ack
   thisProxy[buddy].gotData();
 #else
   chkpTable[0] = NULL;
@@ -489,14 +582,16 @@ void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
 
 // loop through my checkpoint table and ask checkpointed array elements
 // to send me checkpoint data.
-void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
+//void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
+void CkMemCheckPT::doItNow(int starter)
 {
   checkpointed = 1;
-  cpCallback = cb;
+  //cpCallback = cb;
   cpStarter = starter;
+  inCheckpointing = 1;
   if (CkMyPe() == cpStarter) {
     startTime = CmiWallTimer();
-    CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
+    CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
   }
 #if CMK_CONVERSE_MPI
   if(CmiNumPartition()==1)
@@ -504,241 +599,382 @@ void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
   {
 
 #if !CMK_CHKP_ALL
-  int len = ckTable.length();
-  for (int i=0; i<len; i++) {
-    CkCheckPTInfo *entry = ckTable[i];
+    int len = ckTable.length();
+    for (int i=0; i<len; i++) {
+      CkCheckPTInfo *entry = ckTable[i];
       // always let the bigger number processor send request
-    //if (CkMyPe() < entry->pNo) continue;
+      //if (CkMyPe() < entry->pNo) continue;
       // always let the smaller number processor send request, may on same proc
-    if (!isMaster(entry->pNo)) continue;
+      if (!isMaster(entry->pNo)) continue;
       // call inmem_checkpoint to the array element, ask it to send
       // back checkpoint data via recvData().
-    CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
-    CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
-  }
+      CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
+      CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
+    }
     // if my table is empty, then I am done
-  if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+    if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
 #else
-  startArrayCheckpoint();
+    startArrayCheckpoint();
 #endif
-       sendProcData();
+    sendProcData();
   }
 #if CMK_CONVERSE_MPI
   else
   {
-       startCheckpoint();
+    startCheckpoint();
   }
 #endif
   // pack and send proc level data
 }
 
 class MemElementPacker : public CkLocIterator{
-       private:
-               CkLocMgr *locMgr;
-               PUP::er &p;
-       public:
-               MemElementPacker(CkLocMgr * mgr_,PUP::er &p_):locMgr(mgr_),p(p_){};
-               void addLocation(CkLocation &loc){
-                       CkArrayIndexMax idx = loc.getIndex();
-                       CkGroupID gID = locMgr->ckGetGroupID();
-                       p|gID;
-                       p|idx;
-                       locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
-               }
+  private:
+    //    CkLocMgr *locMgr;
+    PUP::er &p;
+    std::map<CkHashCode,CkLocation> arrayMap;
+  public:
+    MemElementPacker(PUP::er &p_):p(p_){};
+    void addLocation(CkLocation &loc){
+      CkArrayIndexMax idx = loc.getIndex();
+      arrayMap[idx.hash()] = loc;
+    }
+    void writeCheckpoint(){
+      std::map<CkHashCode, CkLocation>::iterator it;
+      for(it = arrayMap.begin();it!=arrayMap.end();it++){
+        CkLocation loc = it->second;
+        CkLocMgr *locMgr = loc.getManager();
+        CkArrayIndexMax idx = loc.getIndex();
+        CkGroupID gID = locMgr->ckGetGroupID();
+        p|gID;
+        p|idx;
+        locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
+      }
+    }
 };
 
-void CkMemCheckPT::pupAllElements(PUP::er &p){
+void pupAllElements(PUP::er &p){
 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
-       int numElements;
-       if(!p.isUnpacking()){
-               numElements = CkCountArrayElements();
-       }
-       p | numElements;
-       if(!p.isUnpacking()){
-               CKLOCMGR_LOOP(MemElementPacker packer(mgr,p);mgr->iterate(packer););
-       }
+  int numElements;
+  if(!p.isUnpacking()){
+    numElements = CkCountArrayElements();
+  }
+  p | numElements;
+  if(!p.isUnpacking()){
+    MemElementPacker packer(p);
+    CKLOCMGR_LOOP(mgr->iterateLocal(packer););
+    packer.writeCheckpoint();
+  }
 #endif
 }
 
 void CkMemCheckPT::startArrayCheckpoint(){
 #if CMK_CHKP_ALL
-       int size;
-       {
-               PUP::sizer psizer;
-               pupAllElements(psizer);
-               size = psizer.size();
-       }
-       int packSize = size/sizeof(double)+1;
- // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
-       CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
-       msg->len = size;
-       msg->cp_flag = 1;
-       int budPEs[2];
-       msg->bud1=CkMyPe();
-       msg->bud2=ChkptOnPe(CkMyPe());
-       {
-               PUP::toMem p(msg->packData);
-               pupAllElements(p);
-       }
-       thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
-       if(chkpTable[0]) delete chkpTable[0];
-       chkpTable[0] = msg;
-       //send the checkpoint to my 
-       recvCount++;
+  int size;
+  {
+    PUP::sizer psizer;
+    pupAllElements(psizer);
+    size = psizer.size();
+  }
+  int packSize = size/sizeof(double)+1;
 // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
+  CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
+  msg->len = size;
+  msg->cp_flag = 1;
+  int budPEs[2];
+  msg->bud1=CkMyPe();
+  msg->bud2=ChkptOnPe(CkMyPe());
+  {
+    PUP::toMem p(msg->packData);
+    pupAllElements(p);
+  }
+  thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
+  if(chkpTable[0]) delete chkpTable[0];
+  chkpTable[0] = msg;
+  //send the checkpoint to my 
+  recvCount++;
 #endif
 }
 
 void CkMemCheckPT::startCheckpoint(){
 #if CMK_CONVERSE_MPI
+  if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+    CkPrintf("in start checkpointing!!!!\n");
   int size;
   {
     PUP::sizer p;
     _handleProcData(p,CmiFalse);
     size = p.size();
   }
-       int packSize = size/sizeof(double)+1;
-       CkCheckPTMessage * procMsg = new (packSize,0) CkCheckPTMessage;
-       procMsg->len = size;
-       procMsg->cp_flag = 1;
-       {
-               PUP::toMem p(procMsg->packData);
-               _handleProcData(p,CmiFalse);
-       }
-       int pointer = CpvAccess(curPointer);
-       if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
-               CpvAccess(localProcChkpBuf)[pointer] = procMsg;
-       
-       {
-               PUP::sizer psizer;
-               pupAllElements(psizer);
-               size = psizer.size();
-       }
-       packSize = size/sizeof(double)+1;
-       CkCheckPTMessage * msg = new (packSize,0) CkCheckPTMessage;
-       msg->len = size;
-       msg->cp_flag = 1;
-       {
-               PUP::toMem p(msg->packData);
-               pupAllElements(p);
-       }
-       pointer = CpvAccess(curPointer);
-       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
-               CpvAccess(chkpBuf)[pointer] = (CkCheckPTMessage *)CkCopyMsg((void **)&msg);
-       CpvAccess(recvdLocal) = 1;
-
-       envelope * env = (envelope *)(UsrToEnv(msg));
-       CkPackMessage(&env);
-       CmiSetHandler(env,recvRemoteChkpHandlerIdx);
-       CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
-       if(CpvAccess(recvdRemote)==1){
-               //compare the checkpoint 
-         int size = CpvAccess(chkpBuf)[pointer]->len;
-         if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
-               thisProxy[CkMyPe()].doneComparison(true);
-         }else{
-               thisProxy[CkMyPe()].doneComparison(true);
-         }
-       }
+  CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
+  procMsg->len = size;
+  procMsg->cp_flag = 1;
+  {
+    PUP::toMem p(procMsg->packData);
+    _handleProcData(p,CmiFalse);
+  }
+  int pointer = CpvAccess(curPointer);
+  if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+  CpvAccess(localProcChkpBuf)[pointer] = procMsg;
+
+  {
+    PUP::sizer psizer;
+    pupAllElements(psizer);
+    size = psizer.size();
+  }
+  CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
+  msg->len = size;
+  msg->cp_flag = 1;
+  int checksum;
+  {
+    if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
+      PUP::checker p(msg->packData);
+      pupAllElements(p);
+      checksum = p.getChecksum();
+    }else{  
+//    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
+      PUP::toMem p(msg->packData);
+      pupAllElements(p);
+    }
+  }
+  pointer = CpvAccess(curPointer);
+  if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
+  CpvAccess(chkpBuf)[pointer] = msg;
+  if(CkMyPe()==0)
+    CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+  if(CkReplicaAlive()==1){
+    CpvAccess(recvdLocal) = 1;
+    if(CpvAccess(use_checksum)){
+      CpvAccess(localChecksum) = checksum;
+      char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
+      //only one reaplica will send
+      if(CmiMyPartition()==0){
+        CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
+      }
+    }else{
+      envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
+      CkPackMessage(&env);
+      if(CmiMyPartition()==0){
+        CmiSetHandler(env,recvRemoteChkpHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+      }
+    }
+    if(CmiMyPartition()==0){
+      notifyReplica = 1;
+      thisProxy[CkMyPe()].doneComparison(true);
+    }
+  }
+  if(CpvAccess(recvdRemote)==1){
+    //only partition 1 will do it
+    //compare the checkpoint 
+    int size = CpvAccess(chkpBuf)[pointer]->len;
+    if(CpvAccess(use_checksum)){
+      if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
+        thisProxy[CkMyPe()].doneComparison(true);
+      }
+      else{
+        thisProxy[CkMyPe()].doneComparison(false);
+      }
+    }else{
+      if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
+        thisProxy[CkMyPe()].doneComparison(true);
+      }
+      else{
+        //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
+        thisProxy[CkMyPe()].doneComparison(false);
+      }
+    }
+    if(CkMyPe()==0)
+      CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+  }
+  else{
+    if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
+      //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
+      if(CpvAccess(remoteReady)==1){
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+        {      
+          int pointer = CpvAccess(curPointer);
+          //send the proc data
+          CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          procMsg->pointer = pointer;
+          envelope * env = (envelope *)(UsrToEnv(procMsg));
+          CkPackMessage(&env);
+          CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+          if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+            CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
+          }
+        }
+        //send the array checkpoint data
+        CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        msg->pointer = CpvAccess(curPointer);
+        envelope * env = (envelope *)(UsrToEnv(msg));
+        CkPackMessage(&env);
+        CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+          CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
+      }else{
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+          int pointer = CpvAccess(curPointer);
+          CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          (CpvAccess(recoverProcBuf))->pointer = pointer;
+        }
+        CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
+        CpvAccess(localReady) = 1;
+      }
+      //can continue work, no need to wait for my replica
+      int _ret = 1;
+      notifyReplica = 1;
+      CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
+      contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
+    }
+  }
 #endif
 }
 
 void CkMemCheckPT::doneComparison(bool ret){
-       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy);
-       contribute(sizeof(bool),&ret,CkReduction::logical_and,cb);
+  int _ret = 1;
+  if(!ret){
+    //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
+    _ret = 0;
+  }else{
+    _ret = 1;
+  }
+  CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
+  contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
+}
+
+void CkMemCheckPT::doneRComparison(int ret){
+  //   if(CpvAccess(curPointer) == 0){
+  //if(ret==CkNumPes()){
+    CpvAccess(localChkpDone) = 1;
+    if(CpvAccess(remoteChkpDone) ==1){
+      thisProxy.doneBothComparison();
+    }else{
+      CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
+    }
+  //}
+  /*else{
+    CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
+    startTime = CmiWallTimer();
+    thisProxy.RollBack();
+  }*/
+  if(notifyReplica == 0){
+    //notify the replica am done
+    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+    *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
+    CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
+    CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
+    notifyReplica = 1;
+  }
 }
 
-void CkMemCheckPT::doneRComparison(bool ret){
-       CpvAccess(recvdRemote) = 0;
-       CpvAccess(recvdLocal) = 0;
-//     if(CpvAccess(curPointer) == 0){
-       if(ret==true){
-       CpvAccess(curPointer)^=1;
-               if(CkMyPe() == 0){
-                       cpCallback.send();
-               }
-       }else{
-               RollBack();
-       }
+void CkMemCheckPT::doneBothComparison(){
+  CpvAccess(recvdRemote) = 0;
+  CpvAccess(remoteReady) = 0;
+  CpvAccess(localReady) = 0;
+  CpvAccess(recvdLocal) = 0;
+  CpvAccess(localChkpDone) = 0;
+  CpvAccess(remoteChkpDone) = 0;
+  CpvAccess(remoteStarted) = 0;
+  CpvAccess(localStarted) = 0;
+  CpvAccess(_remoteCrashedNode) = -1;
+  CkMemCheckPT::replicaAlive = 1;
+  int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
+  CpvAccess(curPointer)^=1;
+  inCheckpointing = 0;
+  notifyReplica = 0;
+  if(CkMyPe() == 0){
+    CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
+  }
+  CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
 }
 
 void CkMemCheckPT::RollBack(){
-       //restore group data
-       CkMemCheckPT::inRestarting = 1;
-       int pointer = CpvAccess(curPointer)^1;//use the previous one
-    CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
-       PUP::fromMem p(chkpMsg->packData);      
-       //_handleProcData(p);
-       
-       //destroy array elements
-       //CKLOCMGR_LOOP(mgr->flushLocalRecs(););
-         int numGroups = CkpvAccess(_groupIDTable)->size();
-         for(int i=0;i<numGroups;i++) {
-               CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
-               IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
-               obj->flushStates();
-               obj->ckJustMigrated();
-         }
-       //restore array elements
-       
-       int numElements;
-       p|numElements;
-       
-       if(p.isUnpacking()){
-               for(int i=0;i<numElements;i++){
-               //for(int i=0;i<1;i++){
-                       CkGroupID gID;
-                       CkArrayIndex idx;
-                       p|gID;
-                       p|idx;
-                       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
-                       CmiAssert(mgr);
-                       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiFalse);
-               }
-       }
-       CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
-       contribute(cb);
-}
+  //restore group data
+  checkpointed = 0;
+  inRestarting = 1;
+  int pointer = CpvAccess(curPointer)^1;//use the previous one
+  CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
+  PUP::fromMem p(chkpMsg->packData);   
 
-void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
-{
+  //destroy array elements
+  CKLOCMGR_LOOP(mgr->flushLocalRecs(););
+  int numGroups = CkpvAccess(_groupIDTable)->size();
+  for(int i=0;i<numGroups;i++) {
+    CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
+    IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
+    obj->flushStates();
+    obj->ckJustMigrated();
+  }
+  //restore array elements
+
+  int numElements;
+  p|numElements;
+
+  if(p.isUnpacking()){
+    for(int i=0;i<numElements;i++){
+      CkGroupID gID;
+      CkArrayIndex idx;
+      p|gID;
+      p|idx;
+      CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+      CmiAssert(mgr);
+      mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
+    }
+    }
+    CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
+    contribute(cb);
+  }
+
+  void CkMemCheckPT::notifyReplicaDie(int pe){
+    replicaAlive = 0;
+    CpvAccess(_remoteCrashedNode) = pe;
+  }
+
+  void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
+  {
 #if CMK_CHKP_ALL
-       int idx = 1;
-       if(msg->bud1 == CkMyPe()){
-               idx = 0;
-       }
-       int isChkpting = msg->cp_flag;
-       if(isChkpting == 1){
-               if(chkpTable[idx]) delete chkpTable[idx];
-       }
-       chkpTable[idx] = msg;
-       if(isChkpting){
-               recvCount++;
-               if(recvCount == 2){
-                 if (where == CkCheckPoint_inMEM) {
-                       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
-                 }
-                 else if (where == CkCheckPoint_inDISK) {
-                       // another barrier for finalize the writing using fsync
-                       CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
-                       contribute(0,NULL,CkReduction::sum_int,localcb);
-                 }
-                 else
-                       CmiAbort("Unknown checkpoint scheme");
-                 recvCount = 0;
-               }
-       }
+    int idx = 1;
+    if(msg->bud1 == CkMyPe()){
+      idx = 0;
+    }
+    int isChkpting = msg->cp_flag;
+    if(isChkpting == 1){
+      if(chkpTable[idx]) delete chkpTable[idx];
+    }
+    chkpTable[idx] = msg;
+    if(isChkpting){
+      recvCount++;
+      if(recvCount == 2){
+        if (where == CkCheckPoint_inMEM) {
+          contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+        }
+        else if (where == CkCheckPoint_inDISK) {
+          // another barrier for finalize the writing using fsync
+          CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
+          contribute(0,NULL,CkReduction::sum_int,localcb);
+        }
+        else
+          CmiAbort("Unknown checkpoint scheme");
+        recvCount = 0;
+      }
+    }
 #endif
-}
+  }
 
-// don't handle array elements
-static inline void _handleProcData(PUP::er &p, CmiBool create)
-{
+  // don't handle array elements
+  static inline void _handleProcData(PUP::er &p, CmiBool create)
+  {
     // save readonlys, and callback BTW
     CkPupROData(p);
 
     // save mainchares 
     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
-       
+
 #ifndef CMK_CHARE_USE_PTR
     // save non-migratable chare
     CkPupChareData(p);
@@ -749,1118 +985,1509 @@ static inline void _handleProcData(PUP::er &p, CmiBool create)
 
     // save nodegroups into NodeGroups.dat
     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
-}
+  }
 
-void CkMemCheckPT::sendProcData()
-{
-  // find out size of buffer
-  int size;
+  void CkMemCheckPT::sendProcData()
   {
-    PUP::sizer p;
-    _handleProcData(p,CmiTrue);
-    size = p.size();
+    // find out size of buffer
+    int size;
+    {
+      PUP::sizer p;
+      _handleProcData(p,CmiTrue);
+      size = p.size();
+    }
+    int packSize = size;
+    CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
+    DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
+    {
+      PUP::toMem p(msg->packData);
+      _handleProcData(p,CmiTrue);
+    }
+    msg->pe = CkMyPe();
+    msg->len = size;
+    msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
+    thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
   }
-  int packSize = size;
-  CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
-  DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
+
+  void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
   {
-    PUP::toMem p(msg->packData);
-    _handleProcData(p,CmiTrue);
+    if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
+    CpvAccess(procChkptBuf) = msg;
+    DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
   }
-  msg->pe = CkMyPe();
-  msg->len = size;
-  msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
-  thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
-}
 
-void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
-{
-  if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
-  CpvAccess(procChkptBuf) = msg;
-  DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
-  contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
-}
-
-// ArrayElement call this function to give us the checkpointed data
-void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
-{
-  int len = ckTable.length();
-  int idx;
-  for (idx=0; idx<len; idx++) {
-    CkCheckPTInfo *entry = ckTable[idx];
-    if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
-  }
-  CkAssert(idx < len);
-  int isChkpting = msg->cp_flag;
-  ckTable[idx]->updateBuffer(msg);
-  if (isChkpting) {
+  // ArrayElement call this function to give us the checkpointed data
+  void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
+  {
+    int len = ckTable.length();
+    int idx;
+    for (idx=0; idx<len; idx++) {
+      CkCheckPTInfo *entry = ckTable[idx];
+      if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
+    }
+    CkAssert(idx < len);
+    int isChkpting = msg->cp_flag;
+    ckTable[idx]->updateBuffer(msg);
+    if (isChkpting) {
       // all my array elements have returned their inmem data
       // inform starter processor that I am done.
-    recvCount ++;
-    if (recvCount == ckTable.length()) {
-      if (where == CkCheckPoint_inMEM) {
-        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
-      }
-      else if (where == CkCheckPoint_inDISK) {
-        // another barrier for finalize the writing using fsync
-        CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
-        contribute(0,NULL,CkReduction::sum_int,localcb);
-      }
-      else
-        CmiAbort("Unknown checkpoint scheme");
-      recvCount = 0;
-    } 
+      recvCount ++;
+      if (recvCount == ckTable.length()) {
+        if (where == CkCheckPoint_inMEM) {
+          contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+        }
+        else if (where == CkCheckPoint_inDISK) {
+          // another barrier for finalize the writing using fsync
+          CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
+          contribute(0,NULL,CkReduction::sum_int,localcb);
+        }
+        else
+          CmiAbort("Unknown checkpoint scheme");
+        recvCount = 0;
+      } 
+    }
   }
-}
 
-// only used in disk checkpointing
-void CkMemCheckPT::syncFiles(CkReductionMsg *m)
-{
-  delete m;
+  // only used in disk checkpointing
+  void CkMemCheckPT::syncFiles(CkReductionMsg *m)
+  {
+    delete m;
 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
-  system("sync");
+    system("sync");
 #endif
-  contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
-}
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+  }
 
-// only is called on cpStarter when checkpoint is done
-void CkMemCheckPT::cpFinish()
-{
-  CmiAssert(CkMyPe() == cpStarter);
-  peCount++;
+  // only is called on cpStarter when checkpoint is done
+  void CkMemCheckPT::cpFinish()
+  {
+    CmiAssert(CkMyPe() == cpStarter);
+    peCount++;
     // now that all processors have finished, activate callback
-  if (peCount == 2) 
-{
-    CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
-    cpCallback.send();
-    peCount = 0;
-    thisProxy.report();
+    if (peCount == 2) 
+    {
+      CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
+      peCount = 0;
+      thisProxy.report();
+    }
   }
-}
 
-// for debugging, report checkpoint info
-void CkMemCheckPT::report()
-{
+  // for debugging, report checkpoint info
+  void CkMemCheckPT::report()
+  {
+    CKLOCMGR_LOOP(mgr->resumeFromChkp(););
+    inCheckpointing = 0;
 #if !CMK_CHKP_ALL
-  int objsize = 0;
-  int len = ckTable.length();
-  for (int i=0; i<len; i++) {
-    CkCheckPTInfo *entry = ckTable[i];
-    CmiAssert(entry);
-    objsize += entry->getSize();
-  }
-  CmiAssert(CpvAccess(procChkptBuf));
-  //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
+    int objsize = 0;
+    int len = ckTable.length();
+    for (int i=0; i<len; i++) {
+      CkCheckPTInfo *entry = ckTable[i];
+      CmiAssert(entry);
+      objsize += entry->getSize();
+    }
+    CmiAssert(CpvAccess(procChkptBuf));
+    //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
 #else
-  if(CkMyPe()==0)
-  CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
+    if(CkMyPe()==0)
+      CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
 #endif
-}
+  }
 
-/*****************************************************************************
-                       RESTART Procedure
-*****************************************************************************/
+  /*****************************************************************************
+    RESTART Procedure
+   *****************************************************************************/
 
-// master processor of two buddies
-inline int CkMemCheckPT::isMaster(int buddype)
-{
+  // master processor of two buddies
+  inline int CkMemCheckPT::isMaster(int buddype)
+  {
 #if 0
-  int mype = CkMyPe();
-//CkPrintf("ismaster: %d %d\n", pe, mype);
-  if (CkNumPes() - totalFailed() == 2) {
-    return mype > buddype;
-  }
-  for (int i=1; i<CkNumPes(); i++) {
-    int me = (buddype+i)%CkNumPes();
-    if (isFailed(me)) continue;
-    if (me == mype) return 1;
-    else return 0;
-  }
-  return 0;
+    int mype = CkMyPe();
+    //CkPrintf("ismaster: %d %d\n", pe, mype);
+    if (CkNumPes() - totalFailed() == 2) {
+      return mype > buddype;
+    }
+    for (int i=1; i<CkNumPes(); i++) {
+      int me = (buddype+i)%CkNumPes();
+      if (isFailed(me)) continue;
+      if (me == mype) return 1;
+      else return 0;
+    }
+    return 0;
 #else
     // smaller one
-  int mype = CkMyPe();
-//CkPrintf("ismaster: %d %d\n", pe, mype);
-  if (CkNumPes() - totalFailed() == 2) {
-    return mype < buddype;
-  }
+    int mype = CkMyPe();
+    //CkPrintf("ismaster: %d %d\n", pe, mype);
+    if (CkNumPes() - totalFailed() == 2) {
+      return mype < buddype;
+    }
 #if NODE_CHECKPOINT
-  int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
-  for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
+    int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
+    for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
 #else
-  for (int i=1; i<CkNumPes(); i++) {
+      for (int i=1; i<CkNumPes(); i++) {
 #endif
-    int me = (mype+i)%CkNumPes();
-    if (isFailed(me)) continue;
-    if (me == buddype) return 1;
-    else return 0;
-  }
-  return 0;
+        int me = (mype+i)%CkNumPes();
+        if (isFailed(me)) continue;
+        if (me == buddype) return 1;
+        else return 0;
+      }
+      return 0;
 #endif
-}
+    }
 
 
 
 #if 0
-// helper class to pup all elements that belong to same ckLocMgr
-class ElementDestoryer : public CkLocIterator {
-private:
+    // helper class to pup all elements that belong to same ckLocMgr
+    class ElementDestoryer : public CkLocIterator {
+      private:
         CkLocMgr *locMgr;
-public:
+      public:
         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
         void addLocation(CkLocation &loc) {
-               CkArrayIndex idx=loc.getIndex();
-               CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
-               loc.destroy();
+          CkArrayIndex idx=loc.getIndex();
+          CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
+          loc.destroy();
         }
-};
+    };
 #endif
 
-// restore the bitmap vector for LB
-void CkMemCheckPT::resetLB(int diepe)
-{
+    // restore the bitmap vector for LB
+    void CkMemCheckPT::resetLB(int diepe)
+    {
 #if CMK_LBDB_ON
-  int i;
-  char *bitmap = new char[CkNumPes()];
-  // set processor available bitmap
-  get_avail_vector(bitmap);
-
-  for (i=0; i<failedPes.length(); i++)
-    bitmap[failedPes[i]] = 0; 
-  bitmap[diepe] = 0;
+      int i;
+      char *bitmap = new char[CkNumPes()];
+      // set processor available bitmap
+      get_avail_vector(bitmap);
+      for (i=0; i<failedPes.length(); i++)
+        bitmap[failedPes[i]] = 0; 
+      bitmap[diepe] = 0;
 
 #if CK_NO_PROC_POOL
-  set_avail_vector(bitmap);
+      set_avail_vector(bitmap);
 #endif
 
-  // if I am the crashed pe, rebuild my failedPEs array
-  if (CkMyNode() == diepe)
-    for (i=0; i<CkNumPes(); i++) 
-      if (bitmap[i]==0) failed(i);
+      // if I am the crashed pe, rebuild my failedPEs array
+      if (CkMyNode() == diepe)
+        for (i=0; i<CkNumPes(); i++) 
+          if (bitmap[i]==0) failed(i);
 
-  delete [] bitmap;
+      delete [] bitmap;
 #endif
-}
+    }
 
-// in case when failedPe dies, everybody go through its checkpoint table:
-// destory all array elements
-// recover lost buddies
-// reconstruct all array elements from check point data
-// called on all processors
-void CkMemCheckPT::restart(int diePe)
-{
+    static double restartT;
+
+    // in case when failedPe dies, everybody go through its checkpoint table:
+    // destory all array elements
+    // recover lost buddies
+    // reconstruct all array elements from check point data
+    // called on all processors
+    void CkMemCheckPT::restart(int diePe)
+    {
 #if CMK_MEM_CHECKPOINT
-  double curTime = CmiWallTimer();
-  if (CkMyPe() == diePe)
-    CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
-  stage = (char*)"resetLB";
-  startTime = curTime;
-  if (CkMyPe() == diePe)
-    CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
+      thisFailedPe = diePe;
+      double curTime = CmiWallTimer();
+      if (CkMyPe() == thisFailedPe){
+        restartT = CmiWallTimer();
+        CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
+      }
+      stage = (char*)"resetLB";
+      startTime = curTime;
+      if (CkMyPe() == thisFailedPe)
+        CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
 
-#if CK_NO_PROC_POOL
-  failed(diePe);       // add into the list of failed pes
-#endif
-  thisFailedPe = diePe;
+//#if CK_NO_PROC_POOL
+//      failed(diePe); // add into the list of failed pes
+//#endif
 
-  if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
+//      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
 
-  inRestarting = 1;
-                                                                                
-  // disable load balancer's barrier
-  if (CkMyPe() != diePe) resetLB(diePe);
+      inRestarting = 1;
 
-  CKLOCMGR_LOOP(mgr->startInserting(););
+      // disable load balancer's barrier
+      //if (CkMyPe() != diePe) resetLB(diePe);
 
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-  barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-/*
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-*/
-#endif
-}
+      CKLOCMGR_LOOP(mgr->startInserting(););
 
-// loally remove all array elements
-void CkMemCheckPT::removeArrayElements()
-{
-#if CMK_MEM_CHECKPOINT
-  int len = ckTable.length();
-  double curTime = CmiWallTimer();
-  if (CkMyPe() == thisFailedPe) 
-    CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
-  stage = (char*)"removeArrayElements";
-  startTime = curTime;
 
-  if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
-  if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
+      if(CmiNumPartition()==1){
+        barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
+      }else{
+        CKLOCMGR_LOOP(mgr->flushLocalRecs(););
+        barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
+      }
+#endif
+    }
 
-  // get rid of all buffering and remote recs
-  // including destorying all array elements
+    // loally remove all array elements
+    void CkMemCheckPT::removeArrayElements()
+    {
+#if CMK_MEM_CHECKPOINT
+      int len = ckTable.length();
+      double curTime = CmiWallTimer();
+      if (CkMyPe() == thisFailedPe) 
+        CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
+      stage = (char*)"removeArrayElements";
+      startTime = curTime;
+
+      //  if (cpCallback.isInvalid()) 
+      //         CkPrintf("invalid pe %d\n",CkMyPe());
+      //         CkAbort("Didn't set restart callback\n");;
+      if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
+
+      // get rid of all buffering and remote recs
+      // including destorying all array elements
 #if CK_NO_PROC_POOL  
-       CKLOCMGR_LOOP(mgr->flushAllRecs(););
+      CKLOCMGR_LOOP(mgr->flushAllRecs(););
 #else
-       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
+      CKLOCMGR_LOOP(mgr->flushLocalRecs(););
 #endif
-//  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
-
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
-  barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
+      barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
 #endif
-}
-
-// flush state in reduction manager
-void CkMemCheckPT::resetReductionMgr()
-{
-  //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
-  int numGroups = CkpvAccess(_groupIDTable)->size();
-  for(int i=0;i<numGroups;i++) {
-    CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
-    IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
-    obj->flushStates();
-    obj->ckJustMigrated();
-  }
-  // reset again
-  //CpvAccess(_qd)->flushStates();
+    }
 
-#if 1
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
-  barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
-#else
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
-#endif
-}
+    // flush state in reduction manager
+    void CkMemCheckPT::resetReductionMgr()
+    {
+      if (CkMyPe() == thisFailedPe) 
+        CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
+      stage = (char *)"resetReductionMgr";
+      int numGroups = CkpvAccess(_groupIDTable)->size();
+      for(int i=0;i<numGroups;i++) {
+        CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
+        IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
+        obj->flushStates();
+        obj->ckJustMigrated();
+      }
+      // reset again
+      //CpvAccess(_qd)->flushStates();
+      if(CmiNumPartition()==1){
+        barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
+      }
+      else{
+       if (CkMyPe() == thisFailedPe) 
+         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
+        barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+      }
+    }
 
-// recover the lost buddies
-void CkMemCheckPT::recoverBuddies()
-{
-  int idx;
-  int len = ckTable.length();
-  // ready to flush reduction manager
-  // cannot be CkMemCheckPT::restart because destory will modify states
-  double curTime = CmiWallTimer();
-  if (CkMyPe() == thisFailedPe)
-  CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
-  stage = (char *)"recoverBuddies";
-  if (CkMyPe() == thisFailedPe)
-  CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
-  startTime = curTime;
-
-  // recover buddies
-  expectCount = 0;
+    // recover the lost buddies
+    void CkMemCheckPT::recoverBuddies()
+    {
+      int idx;
+      int len = ckTable.length();
+      // ready to flush reduction manager
+      // cannot be CkMemCheckPT::restart because destory will modify states
+      double curTime = CmiWallTimer();
+      if (CkMyPe() == thisFailedPe)
+        CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
+      stage = (char *)"recoverBuddies";
+      if (CkMyPe() == thisFailedPe)
+        CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
+      startTime = curTime;
+
+      // recover buddies
+      expectCount = 0;
 #if !CMK_CHKP_ALL
-  for (idx=0; idx<len; idx++) {
-    CkCheckPTInfo *entry = ckTable[idx];
-    if (entry->pNo == thisFailedPe) {
+      for (idx=0; idx<len; idx++) {
+        CkCheckPTInfo *entry = ckTable[idx];
+        if (entry->pNo == thisFailedPe) {
 #if CK_NO_PROC_POOL
-      // find a new buddy
-      int budPe = BuddyPE(CkMyPe());
+          // find a new buddy
+          int budPe = BuddyPE(CkMyPe());
 #else
-      int budPe = thisFailedPe;
-#endif
-      CkArrayCheckPTMessage *msg = entry->getCopy();
-      msg->bud1 = budPe;
-      msg->bud2 = CkMyPe();
-      msg->cp_flag = 0;            // not checkpointing
-      thisProxy[budPe].recoverEntry(msg);
-      expectCount ++;
-    }
-  }
+          int budPe = thisFailedPe;
+#endif
+          CkArrayCheckPTMessage *msg = entry->getCopy();
+          msg->bud1 = budPe;
+          msg->bud2 = CkMyPe();
+          msg->cp_flag = 0;            // not checkpointing
+          thisProxy[budPe].recoverEntry(msg);
+          expectCount ++;
+        }
+      }
 #else
-  //send to failed pe
-  if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
+      //send to failed pe
+      if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
 #if CK_NO_PROC_POOL
-      // find a new buddy
-      int budPe = BuddyPE(CkMyPe());
-#else
-      int budPe = thisFailedPe;
-#endif
-      CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
-      CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
-         msg->cp_flag = 0;            // not checkpointing
-      msg->bud1 = budPe;
-      msg->bud2 = CkMyPe();
-      thisProxy[budPe].recoverEntry(msg);
-      expectCount ++;
-  }
-#endif
-
-#if 1
-  if (expectCount == 0) {
-    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
-    //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
-  }
+        // find a new buddy
+        int budPe = BuddyPE(CkMyPe());
 #else
-  if (CkMyPe() == 0) {
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
-  }
+        int budPe = thisFailedPe;
+#endif
+        CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
+        CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
+        msg->cp_flag = 0;            // not checkpointing
+        msg->bud1 = budPe;
+        msg->bud2 = CkMyPe();
+        thisProxy[budPe].recoverEntry(msg);
+        expectCount ++;
+      }
 #endif
 
-  //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
-}
+      if (expectCount == 0) {
+        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+      }
+      //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
+    }
 
-void CkMemCheckPT::gotData()
-{
-  ackCount ++;
-  if (ackCount == expectCount) {
-    ackCount = 0;
-    expectCount = -1;
-    //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
-    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
-  }
-}
+    void CkMemCheckPT::gotData()
+    {
+      ackCount ++;
+      if (ackCount == expectCount) {
+        ackCount = 0;
+        expectCount = -1;
+        //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
+        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+      }
+    }
 
-void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
-{
+    void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
+    {
 
-  for (int i=0; i<n; i++) {
-    CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
-    mgr->updateLocation(idx[i], nowOnPe);
-  }
-       thisProxy[nowOnPe].gotReply();
-}
+      for (int i=0; i<n; i++) {
+        CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
+        mgr->updateLocation(idx[i], nowOnPe);
+      }
+      thisProxy[nowOnPe].gotReply();
+    }
 
-// restore array elements
-void CkMemCheckPT::recoverArrayElements()
-{
-  double curTime = CmiWallTimer();
-  int len = ckTable.length();
-  //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
-  stage = (char *)"recoverArrayElements";
-  if (CkMyPe() == thisFailedPe)
-  CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
-  startTime = curTime;
- int flag = 0;
-  // recover all array elements
-  int count = 0;
+    // restore array elements
+    void CkMemCheckPT::recoverArrayElements()
+    {
+      double curTime = CmiWallTimer();
+      int len = ckTable.length();
+      CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
+      stage = (char *)"recoverArrayElements";
+      startTime = curTime;
+      int flag = 0;
+      // recover all array elements
+      int count = 0;
 
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
-  CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
-  CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
+      CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
+      CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
 #endif
 
 #if !CMK_CHKP_ALL
-  for (int idx=0; idx<len; idx++)
-  {
-    CkCheckPTInfo *entry = ckTable[idx];
+      for (int idx=0; idx<len; idx++)
+      {
+        CkCheckPTInfo *entry = ckTable[idx];
 #if CK_NO_PROC_POOL
-    // the bigger one will do 
-//    if (CkMyPe() < entry->pNo) continue;
-    if (!isMaster(entry->pNo)) continue;
+        // the bigger one will do 
+        //    if (CkMyPe() < entry->pNo) continue;
+        if (!isMaster(entry->pNo)) continue;
 #else
-    // smaller one do it, which has the original object
-    if (CkMyPe() == entry->pNo+1 || 
-        CkMyPe()+CkNumPes() == entry->pNo+1) continue;
+        // smaller one do it, which has the original object
+        if (CkMyPe() == entry->pNo+1 || 
+            CkMyPe()+CkNumPes() == entry->pNo+1) continue;
 #endif
-//CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
+        //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
 
-    entry->updateBuddy(CkMyPe(), entry->pNo);
-    CkArrayCheckPTMessage *msg = entry->getCopy();
-    // gzheng
-    //thisProxy[CkMyPe()].inmem_restore(msg);
-    inmem_restore(msg);
+        entry->updateBuddy(CkMyPe(), entry->pNo);
+        CkArrayCheckPTMessage *msg = entry->getCopy();
+        // gzheng
+        //thisProxy[CkMyPe()].inmem_restore(msg);
+        inmem_restore(msg);
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
-    CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
-    int homePe = mgr->homePe(msg->index);
-    if (homePe != CkMyPe()) {
-      gmap[homePe].push_back(msg->locMgr);
-      imap[homePe].push_back(msg->index);
-    }
+        CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
+        int homePe = mgr->homePe(msg->index);
+        if (homePe != CkMyPe()) {
+          gmap[homePe].push_back(msg->locMgr);
+          imap[homePe].push_back(msg->index);
+        }
 #endif
-    CkFreeMsg(msg);
-    count ++;
-  }
+        CkFreeMsg(msg);
+        count ++;
+      }
 #else
-       CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+      char * packData;
+      if(CmiNumPartition()==1){
+        CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+        packData = (char *)msg->packData;
+      }
+      else{
+        int pointer = CpvAccess(curPointer);
+        CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        packData = msg->packData;
+      }
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
-       recoverAll(msg,gmap,imap);
+      recoverAll(packData,gmap,imap);
 #else
-       recoverAll(msg);
+      recoverAll(packData);
 #endif
-    CkFreeMsg(msg);
 #endif
-  curTime = CmiWallTimer();
-  if (CkMyPe() == thisFailedPe)
-       CkPrintf("[%d] CkMemCheckPT ----- %s streams at %f \n",CkMyPe(), stage, curTime);
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
-  for (int i=0; i<CkNumPes(); i++) {
-    if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
-      thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
-       flag++; 
-         }
-  }
-  delete [] imap;
-  delete [] gmap;
+      for (int i=0; i<CkNumPes(); i++) {
+        if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
+          thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
+          flag++;      
+        }
+      }
+      delete [] imap;
+      delete [] gmap;
 #endif
-  DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
+      DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
+      CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
+      //if (CkMyPe() == thisFailedPe)
 
-  CKLOCMGR_LOOP(mgr->doneInserting(););
+      CKLOCMGR_LOOP(mgr->doneInserting(););
 
-  // _crashedNode = -1;
-  CpvAccess(_crashedNode) = -1;
-  inRestarting = 0;
+      // _crashedNode = -1;
+      CpvAccess(_crashedNode) = -1;
 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
+      if (CkMyPe() == 0)
+        CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
 #else
-if(flag == 0)
-{
-    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
-}
+      if(flag == 0)
+      {
+        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
+      }
 #endif
-}
+    }
 
-void CkMemCheckPT::gotReply(){
-    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
-}
+    void CkMemCheckPT::gotReply(){
+      contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
+    }
 
-void CkMemCheckPT::recoverAll(CkArrayCheckPTMessage * msg,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
+    void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
 #if CMK_CHKP_ALL
-       PUP::fromMem p(msg->packData);
-       int numElements;
-       p|numElements;
-       CkPrintf("[%d] recover all %d\n",CkMyPe(),numElements);
-       if(p.isUnpacking()){
-               for(int i=0;i<numElements;i++){
-                       CkGroupID gID;
-                       CkArrayIndex idx;
-                       p|gID;
-                       p|idx;
-                       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
-                       int homePe = mgr->homePe(idx);
+      PUP::fromMem p(packData);
+      int numElements;
+      p|numElements;
+      if(p.isUnpacking()){
+        for(int i=0;i<numElements;i++){
+          CkGroupID gID;
+          CkArrayIndex idx;
+          p|gID;
+          p|idx;
+          CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+          int homePe = mgr->homePe(idx);
 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
-                       mgr->resume(idx,p,CmiTrue,CmiTrue);
+          mgr->resume(idx,p,CmiTrue,CmiTrue);
 #else
-                       mgr->resume(idx,p,CmiFalse,CmiTrue);
+          if(CmiNumPartition()==1)
+            mgr->resume(idx,p,CmiFalse,CmiTrue);       
+          else{
+            if(CkMyPe()==thisFailedPe){
+              mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
+            }
+            else{
+              mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
+            }
+          }
 #endif
 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
-                       homePe = mgr->homePe(idx);
-                       if (homePe != CkMyPe()) {
-                         gmap[homePe].push_back(gID);
-                         imap[homePe].push_back(idx);
-                       }
+          homePe = mgr->homePe(idx);
+          if (homePe != CkMyPe()) {
+            gmap[homePe].push_back(gID);
+            imap[homePe].push_back(idx);
+          }
 #endif
-               }
-       }
-       if(CkMyPe()==thisFailedPe)
-       CkPrintf("recover all ends\n"); 
+        }
+      }
 #endif
-}
+    }
 
-static double restartT;
 
-// on every processor
-// turn load balancer back on
-void CkMemCheckPT::finishUp()
-{
-  //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
-  //CKLOCMGR_LOOP(mgr->doneInserting(););
-  
-  if (CkMyPe() == thisFailedPe)
-  {
-       CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
-       //CkStartQD(cpCallback);
-       cpCallback.send();
-       CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
-  }
+    // on every processor
+    // turn load balancer back on
+    void CkMemCheckPT::finishUp()
+    {
+      //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
+      //CKLOCMGR_LOOP(mgr->doneInserting(););
+
+      if (CkMyPe() == thisFailedPe)
+      {
+        CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
+        CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
+        fflush(stdout);
+      }
+      CKLOCMGR_LOOP(mgr->resumeFromChkp(););
+      inRestarting = 0;
+      maxIter = -1;
+#if CMK_CONVERSE_MPI   
+      if(CmiNumPartition()!=1){
+        CpvAccess(recvdProcChkp) = 0;
+        CpvAccess(recvdArrayChkp) = 0;
+        CpvAccess(curPointer)^=1;
+        //notify my replica, restart is done
+        if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+          CmiSetHandler(msg,replicaRecoverHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
+        }
+      }
+      if (CmiMyPe() == BuddyPE(thisFailedPe)) {
+        lastPingTime = CmiWallTimer();
+        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+      }
+#endif
 
 #if CK_NO_PROC_POOL
 #if NODE_CHECKPOINT
-  int numnodes = CmiNumPhysicalNodes();
+      int numnodes = CmiNumPhysicalNodes();
 #else
-  int numnodes = CkNumPes();
+      int numnodes = CkNumPes();
 #endif
-  if (numnodes-totalFailed() <=2) {
-    if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
-    _memChkptOn = 0;
-  }
+      if (numnodes-totalFailed() <=2) {
+        if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
+        _memChkptOn = 0;
+      }
 #endif
-}
+    }
 
-void CkMemCheckPT::recoverFromSoftFailure()
-{
-       inRestarting = 0;
-       if(CkMyPe()==0)
-       cpCallback.send();
-}
-// called only on 0
-void CkMemCheckPT::quiescence(CkCallback &cb)
-{
-  static int pe_count = 0;
-  pe_count ++;
-  CmiAssert(CkMyPe() == 0);
-  //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
-  if (pe_count == CkNumPes()) {
-    pe_count = 0;
-    cb.send();
-  }
-}
+    void CkMemCheckPT::recoverFromSoftFailure()
+    {
+      inRestarting = 0;
+      maxIter = -1;
+      CpvAccess(recvdRemote) = 0;
+      CpvAccess(recvdLocal) = 0;
+      CpvAccess(localChkpDone) = 0;
+      CpvAccess(remoteChkpDone) = 0;
+      CpvAccess(remoteReady) = 0;
+      CpvAccess(localReady) = 0;
+      inCheckpointing = 0;
+      notifyReplica = 0;
+      CpvAccess(remoteStarted) = 0;
+      CpvAccess(localStarted) = 0;
+      CpvAccess(_remoteCrashedNode) = -1;
+      CkMemCheckPT::replicaAlive = 1;
+      inCheckpointing = 0;
+      notifyReplica = 0;
+      if(CkMyPe() == 0){
+        CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
+      }
+      CKLOCMGR_LOOP(mgr->resumeFromChkp(););
+    }
+    // called only on 0
+    void CkMemCheckPT::quiescence(CkCallback &cb)
+    {
+      static int pe_count = 0;
+      pe_count ++;
+      CmiAssert(CkMyPe() == 0);
+      //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
+      if (pe_count == CkNumPes()) {
+        pe_count = 0;
+        cb.send();
+      }
+    }
 
-// User callable function - to start a checkpoint
-// callback cb is used to pass control back
-void CkStartMemCheckpoint(CkCallback &cb)
-{
+    // User callable function - to start a checkpoint
+    // callback cb is used to pass control back
+    void CkStartMemCheckpoint(CkCallback &cb)
+    {
 #if CMK_MEM_CHECKPOINT
-  if (_memChkptOn == 0) {
-    CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
-    cb.send();
-    return;
-  }
-  if (CkInRestarting()) {
-      // trying to checkpointing during restart
-    cb.send();
-    return;
-  }
-    // store user callback and user data
-  CkMemCheckPT::cpCallback = cb;
+      CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
+      /*if (_memChkptOn == 0) {
+        CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
+        cb.send();
+        return;
+        }*/
+      if (CkInRestarting()) {
+        // trying to checkpointing during restart
+        cb.send();
+        return;
+      }
+      // store user callback and user data
+      CkMemCheckPT::cpCallback = cb;
 
-    // broadcast to start check pointing
-  CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-  checkptMgr.doItNow(CkMyPe(), cb);
+
+      //send to my replica that checkpoint begins 
+      if(CkReplicaAlive()==1){
+        char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(msg, replicaChkpStartHandlerIdx);
+        CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
+      }
+      CpvAccess(localStarted) = 1;
+      // broadcast to start check pointing
+      if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
+        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+        checkptMgr.doItNow(CkMyPe());
+      }
 #else
-  // when mem checkpoint is disabled, invike cb immediately
-  CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
-  cb.send();
+      // when mem checkpoint is disabled, invike cb immediately
+      CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
+      cb.send();
 #endif
-}
+    }
 
-void CkRestartCheckPoint(int diePe)
-{
-CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
-  CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-  // broadcast
-  checkptMgr.restart(diePe);
-}
+    void CkRestartCheckPoint(int diePe)
+    {
+      CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
+      CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+      // broadcast
+      checkptMgr.restart(diePe);
+    }
 
-static int _diePE = -1;
+    static int _diePE = -1;
 
-// callback function used locally by ccs handler
-static void CkRestartCheckPointCallback(void *ignore, void *msg)
-{
-CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
-  CkRestartCheckPoint(_diePE);
-}
+    // callback function used locally by ccs handler
+    static void CkRestartCheckPointCallback(void *ignore, void *msg)
+    {
+      CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
+      CkRestartCheckPoint(_diePE);
+    }
 
 
-// called on crashed PE
-static void restartBeginHandler(char *msg)
-{
-  CmiFree(msg);
+    // called on crashed PE
+    static void restartBeginHandler(char *msg)
+    {
+      CmiFree(msg);
 #if CMK_MEM_CHECKPOINT
 #if CMK_USE_BARRIER
-       if(CkMyPe()!=_diePE){
-               printf("restar begin on %d\n",CkMyPe());
-               char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-               CmiSetHandler(restartmsg, restartBeginHandlerIdx);
-               CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
-       }else{
-       CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
-       CkRestartCheckPointCallback(NULL, NULL);
-       }
+      if(CkMyPe()!=_diePE){
+        printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
+        char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+        CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+      }else{
+        CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
+        CkRestartCheckPointCallback(NULL, NULL);
+      }
 #else
-  static int count = 0;
-  CmiAssert(CkMyPe() == _diePE);
-  count ++;
-  if (count == CkNumPes()) {
-    CkRestartCheckPointCallback(NULL, NULL);
-    count = 0;
-  }
+      static int count = 0;
+      CmiAssert(CkMyPe() == _diePE);
+      count ++;
+      if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
+        printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
+        CkRestartCheckPointCallback(NULL, NULL);
+        count = 0;
+      }
 #endif
 #endif
-}
+    }
 
-extern void _discard_charm_message();
-extern void _resume_charm_message();
+    extern void _discard_charm_message();
+    extern void _resume_charm_message();
 
-static void * doNothingMsg(int * size, void * data, void ** remote, int count){
-       return data;
-}
+    static void * doNothingMsg(int * size, void * data, void ** remote, int count){
+      return data;
+    }
 
-static void restartBcastHandler(char *msg)
-{
+    static void restartBcastHandler(char *msg)
+    {
 #if CMK_MEM_CHECKPOINT
-  // advance phase counter
-  CkMemCheckPT::inRestarting = 1;
-  _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
-  // gzheng
-  //if (CkMyPe() != _diePE) cur_restart_phase ++;
-
-  if (CkMyPe()==_diePE)
-    CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
-
-  // reset QD counters
-/*  gzheng
-  if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
-*/
-
-/*  gzheng
-  if (CkMyPe()==_diePE)
-      CkRestartCheckPointCallback(NULL, NULL);
-*/
-  CmiFree(msg);
-
-  _resume_charm_message();
-
-    // reduction
-  char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-  CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+      // advance phase counter
+      CkMemCheckPT::inRestarting = 1;
+      _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      // gzheng
+      //if (CkMyPe() != _diePE) cur_restart_phase ++;
+
+      if (CkMyPe()==_diePE)
+        CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
+
+      // reset QD counters
+      /*  gzheng
+          if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
+       */
+
+      /*  gzheng
+          if (CkMyPe()==_diePE)
+          CkRestartCheckPointCallback(NULL, NULL);
+       */
+      CmiFree(msg);
+
+      _resume_charm_message();
+
+      // reduction
+      char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+      CmiSetHandler(restartmsg, restartBeginHandlerIdx);
 #if CMK_USE_BARRIER
-       //CmiPrintf("before reduce\n"); 
-       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
-       //CmiPrintf("after reduce\n");  
+      //CmiPrintf("before reduce\n");  
+      CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+      //CmiPrintf("after reduce\n");   
 #else
-  CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+      CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
 #endif 
- checkpointed = 0;
     checkpointed = 0;
 #endif
-}
+    }
 
-extern void _initDone();
+    extern void _initDone();
 
-bool compare(char * buf1, char *buf2){
-       return true;
-}
+    bool compare(char * buf1, char *buf2){
+      if(CkMyPe()==0)
+        CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+      PUP::checker pchecker(buf1,buf2);
+      pchecker.skip();
+      int numElements;
+      pchecker|numElements;
+      for(int i=0;i<numElements;i++){
+        CkGroupID gID;
+        CkArrayIndex idx;
 
-static void recvRemoteChkpHandler(char *msg){
-   envelope *env = (envelope *)msg;
-   CkUnpackMessage(&env);
-   CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
-  if(CpvAccess(recvdLocal)==1){
-         int pointer = CpvAccess(curPointer);
-         int size = CpvAccess(chkpBuf)[pointer]->len;
-         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-         if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
-               
-               checkptMgr[CkMyPe()].doneComparison(true);
-               }else
-               {
-                       checkptMgr[CkMyPe()].doneComparison(false);
-               }
-  }else{
-         CpvAccess(recvdRemote) = 1;
-         if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
-         CpvAccess(buddyBuf) = chkpMsg;
-  }  
-}
+        pchecker|gID;
+        pchecker|idx;
 
-// called on crashed processor
-static void recoverProcDataHandler(char *msg)
-{
-#if CMK_MEM_CHECKPOINT
-   int i;
-   envelope *env = (envelope *)msg;
-   CkUnpackMessage(&env);
-   CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
-   CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
-   CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
-   //cur_restart_phase ++;
-     // gzheng ?
-   //CpvAccess(_qd)->flushStates();
-
-   // restore readonly, mainchare, group, nodegroup
-//   int temp = cur_restart_phase;
-//   cur_restart_phase = -1;
-   PUP::fromMem p(procMsg->packData);
-   _handleProcData(p,CmiTrue);
-
-   CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
-   // gzheng
-   CKLOCMGR_LOOP(mgr->startInserting(););
-
-   char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
-   CmiSetHandler(reqmsg, restartBcastHandlerIdx);
-   CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
-
-   _initDone();
-//   CpvAccess(_qd)->flushStates();
-   CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
+        CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+        mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
+      }
+      if(CkMyPe()==0)
+        CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+      //int fault_num = pchecker.getFaultNum();
+      bool result = pchecker.getResult();
+      /*if(!result){
+        CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
+      }*/
+      return result;
+    }
+    int getChecksum(char * buf){
+      PUP::checker pchecker(buf);
+      pchecker.skip();
+      int numElements;
+      pchecker|numElements;
+//      CkPrintf("num %d\n",numElements);
+      for(int i=0;i<numElements;i++){
+        CkGroupID gID;
+        CkArrayIndex idx;
+
+        pchecker|gID;
+        pchecker|idx;
+
+        CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+        mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
+      }
+      return pchecker.getChecksum();
+    }
+
+    static void recvRemoteChkpHandler(char *msg){
+      CpvAccess(remoteChkpDone) = 1;
+      if(CpvAccess(use_checksum)){
+        if(CkMyPe()==0)
+          CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+        CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
+        CpvAccess(recvdRemote) = 1;
+        if(CpvAccess(recvdLocal)==1){
+          if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
+          }
+          else{
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
+          }
+        }
+      }else{
+        envelope *env = (envelope *)msg;
+        CkUnpackMessage(&env);
+        CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+        if(CpvAccess(recvdLocal)==1){
+          int pointer = CpvAccess(curPointer);
+          int size = CpvAccess(chkpBuf)[pointer]->len;
+          if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
+          }else
+          {
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
+          }
+          delete chkpMsg;
+          if(CkMyPe()==0)
+            CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+        }else{
+          CpvAccess(recvdRemote) = 1;
+          if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
+          CpvAccess(buddyBuf) = chkpMsg;
+        }
+      }
+    }
+
+    static void replicaRecoverHandler(char *msg){
+      //fflush(stdout);
+      //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
+      CpvAccess(remoteChkpDone) = 1;
+      if(CpvAccess(localChkpDone) == 1)
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
+      CmiFree(msg);
+    }
+
+    static void replicaChkpDoneHandler(char *msg){
+      CpvAccess(remoteChkpDone) = 1;
+      int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      if(CpvAccess(localChkpDone) == 1)
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
+      CmiFree(msg);
+    }
+
+    static void replicaDieHandler(char * msg){
+#if CMK_CONVERSE_MPI
+      //broadcast to every one in my replica
+      CmiSetHandler(msg, replicaDieBcastHandlerIdx);
+      CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
 #endif
-}
+    }
 
-// called on its backup processor
-// get backup message buffer and sent to crashed processor
-static void askProcDataHandler(char *msg)
-{
+    static void replicaChkpStartHandler(char * msg){
+      CpvAccess(remoteStarted) =1;
+      if(CpvAccess(localStarted)==1){    
+        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+        checkptMgr.doItNow(0);
+      }
+    }
+
+
+    static void replicaDieBcastHandler(char *msg){
+      int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      CpvAccess(_remoteCrashedNode) = diePe;
+      if(CkMyPe()==diePe){
+        CmiPrintf("pe %d in replicad word die\n",diePe);
+        CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
+        fflush(stdout);
+      }
+      if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
+        find_spare_mpirank(diePe,CmiMyPartition()^1);
+      }
+      //broadcast to my partition to get local max iter
+      if(CpvAccess(resilience)!=1){
+        CkMemCheckPT::replicaAlive = 0;
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
+      }
+      CmiFree(msg);
+    }
+
+    static void recoverRemoteProcDataHandler(char *msg){
+      envelope *env = (envelope *)msg;
+      CkUnpackMessage(&env);
+      CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+
+      //store the checkpoint
+      int pointer = procMsg->pointer;
+
+
+      if(CkMyPe()==CpvAccess(_crashedNode)){
+        if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+        CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
+        PUP::fromMem p(procMsg->packData);
+        _handleProcData(p,CmiTrue);
+        _initDone();
+      }
+      else{
+        if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+        CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
+        //_handleProcData(p,CmiFalse);
+      }
+      CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
+      CKLOCMGR_LOOP(mgr->startInserting(););
+
+      CpvAccess(recvdProcChkp) =1;
+      if(CpvAccess(recvdArrayChkp)==1){
+        _resume_charm_message();
+        _diePE = CpvAccess(_crashedNode);
+        char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+        //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#if CMK_USE_BARRIER
+      //CmiPrintf("before reduce\n");  
+       if(CpvAccess(resilience)==1){
+         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }else
+         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+      //CmiPrintf("after reduce\n");   
+#else
+       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#endif 
+      }
+    }
+
+    static void recoverRemoteArrayDataHandler(char *msg){
+      envelope *env = (envelope *)msg;
+      CkUnpackMessage(&env);
+      CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+
+      //store the checkpoint
+      int pointer = chkpMsg->pointer;
+      CpvAccess(curPointer) = pointer;
+      if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
+      CpvAccess(chkpBuf)[pointer] = chkpMsg;
+      CpvAccess(recvdArrayChkp) =1;
+      CkMemCheckPT::inRestarting = 1;
+      if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
+        _resume_charm_message();
+        _diePE = CpvAccess(_crashedNode);
+        //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
+        char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+        //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#if CMK_USE_BARRIER
+      //CmiPrintf("before reduce\n");  
+       if(CpvAccess(resilience)==1){
+         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }else
+         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+      //CmiPrintf("after reduce\n");   
+#else
+       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#endif 
+      }
+    }
+
+    static void recvPhaseHandler(char * msg)
+    {
+      CpvAccess(_curRestartPhase)--;
+      CkMemCheckPT::inRestarting = 1;
+      CmiFree(msg);
+      //notify the buddy in the replica now i can receive the checkpoint msg
+      if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
+        char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
+        //timer start
+        if(CpvAccess(_crashedNode)==CkMyPe())
+          CkMemCheckPT::startTime = restartT = CmiWallTimer();
+        if(CpvAccess(_crashedNode)==CmiMyPe())
+          CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      }else{
+        CpvAccess(curPointer)^=1;
+        CkMemCheckPT::inRestarting = 1;
+        _resume_charm_message();
+        _diePE = CpvAccess(_crashedNode);
+      }
+    }
+    
+    static void askRecoverDataHandler(char * msg){
+      if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+       CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      if(CpvAccess(resilience)!=1){
+        CpvAccess(remoteReady)=1;
+        if(CpvAccess(localReady)==1){
+          if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+          {    
+            envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
+            CkPackMessage(&env);
+            CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+            CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+            CmiPrintf("[%d] sendProcdata after request at \n",CmiMyPe(),CmiWallTimer());
+          }
+          //send the array checkpoint data
+          envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
+          CkPackMessage(&env);
+          CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
+          CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+          if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+            CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
+        }
+      }else{
+        find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
+        int pointer = CpvAccess(curPointer)^1;
+        CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+        procMsg->pointer = pointer;
+        envelope * env = (envelope *)(UsrToEnv(procMsg));
+        CkPackMessage(&env);
+        CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
+        
+        CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        arrayMsg->pointer = pointer;
+        envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
+        CkPackMessage(&env1);
+        CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
+        CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
+      }
+    }
+    // called on crashed processor
+    static void recoverProcDataHandler(char *msg)
+    {
 #if CMK_MEM_CHECKPOINT
-    int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
-    CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
-    if (CpvAccess(procChkptBuf) == NULL)  {
-      CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
-      CkAbort("no checkpoint found");
+      int i;
+      envelope *env = (envelope *)msg;
+      CkUnpackMessage(&env);
+      CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
+      CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
+      CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
+      PUP::fromMem p(procMsg->packData);
+      _handleProcData(p,CmiTrue);
+
+      CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
+      // gzheng
+      CKLOCMGR_LOOP(mgr->startInserting(););
+
+      char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
+      CmiSetHandler(reqmsg, restartBcastHandlerIdx);
+      CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
+
+      _initDone();
+      //   CpvAccess(_qd)->flushStates();
+      CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
+#endif
     }
-    envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
-    CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
+    //for replica, got the phase number from my neighbor processor in the current partition
+    static void askPhaseHandler(char *msg)
+    {
+#if CMK_MEM_CHECKPOINT
+      CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
+      int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
+      CmiSetHandler(msg, recvPhaseHandlerIdx);
+      CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+#endif
+    }
+    // called on its backup processor
+    // get backup message buffer and sent to crashed processor
+    static void askProcDataHandler(char *msg)
+    {
+#if CMK_MEM_CHECKPOINT
+      int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
+      if (CpvAccess(procChkptBuf) == NULL)  {
+        CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
+        CkAbort("no checkpoint found");
+      }
+      envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
+      CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
 
-    CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
+      CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
 
-    CkPackMessage(&env);
-    CmiSetHandler(env, recoverProcDataHandlerIdx);
-    CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
-    CpvAccess(procChkptBuf) = NULL;
-    CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
+      CkPackMessage(&env);
+      CmiSetHandler(env, recoverProcDataHandlerIdx);
+      CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
+      CpvAccess(procChkptBuf) = NULL;
+      CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
 #endif
-}
+    }
 
-// called on PE 0
-void qd_callback(void *m)
-{
-   CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
-   CkFreeMsg(m);
+    // called on PE 0
+    void qd_callback(void *m)
+    {
+      CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
+      fflush(stdout);
+      CkFreeMsg(m);
+      if(CmiNumPartition()==1){
 #ifdef CMK_SMP
-   for(int i=0;i<CmiMyNodeSize();i++){
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
-       CmiSetHandler(msg, askProcDataHandlerIdx);
-       int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
-       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-   }
-   return;
-#endif
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-   // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
-   CmiSetHandler(msg, askProcDataHandlerIdx);
-   int pe = ChkptOnPe(CpvAccess(_crashedNode));
-   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-
-}
+        for(int i=0;i<CmiMyNodeSize();i++){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
+          CmiSetHandler(msg, askProcDataHandlerIdx);
+          int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
+          CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+        }
+        return;
+#endif
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+        *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+        // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
+        CmiSetHandler(msg, askProcDataHandlerIdx);
+        int pe = ChkptOnPe(CpvAccess(_crashedNode));
+        CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+      }
+      else{
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(msg, recvPhaseHandlerIdx);
+        CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
+      }
+    }
 
-// on crashed node
-void CkMemRestart(const char *dummy, CkArgMsg *args)
-{
+    // on crashed node
+    void CkMemRestart(const char *dummy, CkArgMsg *args)
+    {
 #if CMK_MEM_CHECKPOINT
-   _diePE = CmiMyNode();
-   CkMemCheckPT::startTime = restartT = CmiWallTimer();
-   CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
-   CkMemCheckPT::inRestarting = 1;
-
-  CpvAccess( _crashedNode )= CmiMyNode();
-       
-  _discard_charm_message();
-    restartT = CmiWallTimer();
-   CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
-  
-  /*if(CmiMyRank()==0){
-    CkCallback cb(qd_callback);
-    CkStartQD(cb);
-    CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
-  }*/
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-   // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
-   CmiSetHandler(msg, askProcDataHandlerIdx);
-   int pe = ChkptOnPe(CpvAccess(_crashedNode));
-   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+      _diePE = CmiMyNode();
+      CpvAccess( _crashedNode )= CmiMyNode();
+      CkMemCheckPT::inRestarting = 1;
+      _discard_charm_message();
+
+      /*if(CmiMyRank()==0){
+        CkCallback cb(qd_callback);
+        CkStartQD(cb);
+        CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
+        }*/
+      CkMemCheckPT::startTime = restartT = CmiWallTimer();
+      if(CmiNumPartition()==1){
+        CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
+        restartT = CmiWallTimer();
+        CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
+        fflush(stdout);
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+        *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+        // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
+        CmiSetHandler(msg, askProcDataHandlerIdx);
+        int pe = ChkptOnPe(CpvAccess(_crashedNode));
+        CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+      }
+      else{
+        CkCallback cb(qd_callback);
+        CkStartQD(cb);
+      }
 #else
-   CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
+      CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
 #endif
-}
+    }
 
-// can be called in other files
-// return true if it is in restarting
-extern "C"
-int CkInRestarting()
-{
+    // can be called in other files
+    // return true if it is in restarting
+    extern "C"
+      int CkInRestarting()
+      {
 #if CMK_MEM_CHECKPOINT
-  if (CpvAccess( _crashedNode)!=-1) return 1;
-  // gzheng
-  //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
-  //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
-  return CkMemCheckPT::inRestarting;
+        if (CpvAccess( _crashedNode)!=-1) return 1;
+        // gzheng
+        //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
+        //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
+        return CkMemCheckPT::inRestarting;
 #else
-  return 0;
+        return 0;
 #endif
-}
+      }
+
+    extern "C"
+      int CkReplicaAlive()
+      {
+        //     if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+        //       CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
+        return CkMemCheckPT::replicaAlive;
+
+        /*if(CkMemCheckPT::replicaDead==1)
+          return 0;
+          else         
+          return 1;*/
+      }
+
+    extern "C"
+      int CkInCheckpointing()
+      {
+        return CkMemCheckPT::inCheckpointing;
+      }
 
-extern "C"
-void CkSetInLdb(){
+    extern "C"
+      void CkSetInLdb(){
 #if CMK_MEM_CHECKPOINT
-       CkMemCheckPT::inLoadbalancing = 1;
+        CkMemCheckPT::inLoadbalancing = 1;
 #endif
-}
+      }
 
-extern "C"
-int CkInLdb(){
+    extern "C"
+      int CkInLdb(){
 #if CMK_MEM_CHECKPOINT
-       return CkMemCheckPT::inLoadbalancing;
+        return CkMemCheckPT::inLoadbalancing;
 #endif
-       return 0;
-}
+        return 0;
+      }
 
-extern "C"
-void CkResetInLdb(){
+    extern "C"
+      void CkResetInLdb(){
 #if CMK_MEM_CHECKPOINT
-       CkMemCheckPT::inLoadbalancing = 0;
+        CkMemCheckPT::inLoadbalancing = 0;
 #endif
-}
+      }
 
-/*****************************************************************************
-                module initialization
-*****************************************************************************/
+    /*****************************************************************************
+      module initialization
+     *****************************************************************************/
 
-static int arg_where = CkCheckPoint_inMEM;
+    static int arg_where = CkCheckPoint_inMEM;
 
 #if CMK_MEM_CHECKPOINT
-void init_memcheckpt(char **argv)
-{
-    if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
-      arg_where = CkCheckPoint_inDISK;
+    void init_memcheckpt(char **argv)
+    {
+      if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
+        arg_where = CkCheckPoint_inDISK;
+      }
+      CpvInitialize(int, use_checksum);
+      CpvInitialize(int, resilience);
+      CpvAccess(use_checksum)=0;
+      CpvAccess(resilience)=0;
+      if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
+        CpvAccess(use_checksum)=1;
+      }
+      if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
+        CpvAccess(resilience)=1;
+      }
+      if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
+        CpvAccess(resilience)=2;
+      }
+      if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
+        CpvAccess(resilience)=3;
+      }
+      // initiliazing _crashedNode variable
+      CpvInitialize(int, _crashedNode);
+      CpvInitialize(int, _remoteCrashedNode);
+      CpvAccess(_crashedNode) = -1;
+      CpvAccess(_remoteCrashedNode) = -1;
+      init_FI(argv);
     }
-
-       // initiliazing _crashedNode variable
-       CpvInitialize(int, _crashedNode);
-       CpvAccess(_crashedNode) = -1;
-
-}
 #endif
 
-class CkMemCheckPTInit: public Chare {
-public:
-  CkMemCheckPTInit(CkArgMsg *m) {
+    class CkMemCheckPTInit: public Chare {
+      public:
+        CkMemCheckPTInit(CkArgMsg *m) {
 #if CMK_MEM_CHECKPOINT
-    if (arg_where == CkCheckPoint_inDISK) {
-      CkPrintf("Charm++> Double-disk Checkpointing. \n");
-    }
-    ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
-    CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
+          if (arg_where == CkCheckPoint_inDISK) {
+            CkPrintf("Charm++> Double-disk Checkpointing. \n");
+          }
+          ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
+          CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
 #endif
-  }
-};
+        }
+    };
 
-static void notifyHandler(char *msg)
-{
+    static void notifyHandler(char *msg)
+    {
 #if CMK_MEM_CHECKPOINT
-  CmiFree(msg);
+      CmiFree(msg);
       /* immediately increase restart phase to filter old messages */
-  CpvAccess(_curRestartPhase) ++;
-  CpvAccess(_qd)->flushStates();
-  _discard_charm_message();
+      CpvAccess(_curRestartPhase) ++;
+      CpvAccess(_qd)->flushStates();
+      _discard_charm_message();
 
 #endif
-}
+    }
 
-extern "C"
-void notify_crash(int node)
-{
+    extern "C"
+      void notify_crash(int node)
+      {
 #ifdef CMK_MEM_CHECKPOINT
-  CpvAccess( _crashedNode) = node;
+        CpvAccess( _crashedNode) = node;
 #ifdef CMK_SMP
-  for(int i=0;i<CkMyNodeSize();i++){
-       CpvAccessOther(_crashedNode,i)=node;
-  }
+        for(int i=0;i<CkMyNodeSize();i++){
+          CpvAccessOther(_crashedNode,i)=node;
+        }
 #endif
-  CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
-  CkMemCheckPT::inRestarting = 1;
+        //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
+        CkMemCheckPT::inRestarting = 1;
 
-    // this may be in interrupt handler, send a message to reset QD
-  int pe = CmiNodeFirst(CkMyNode());
-  for(int i=0;i<CkMyNodeSize();i++){
-       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-       CmiSetHandler(msg, notifyHandlerIdx);
-       CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
-  }
+        // this may be in interrupt handler, send a message to reset QD
+        int pe = CmiNodeFirst(CkMyNode());
+        for(int i=0;i<CkMyNodeSize();i++){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+          CmiSetHandler(msg, notifyHandlerIdx);
+          CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
+        }
 #endif
-}
-
-extern "C" void (*notify_crash_fn)(int node);
-
-#if CMK_CONVERSE_MPI
-static int pingHandlerIdx;
-static int pingCheckHandlerIdx;
-static int buddyDieHandlerIdx;
-static double lastPingTime = -1;
+      }
 
-extern "C" void mpi_restart_crashed(int pe, int rank);
-extern "C" int  find_spare_mpirank(int pe);
+    extern "C" void (*notify_crash_fn)(int node);
 
-void pingBuddy();
-void pingCheckHandler();
 
-void buddyDieHandler(char *msg)
-{
+#if CMK_CONVERSE_MPI
+    void buddyDieHandler(char *msg)
+    {
 #if CMK_MEM_CHECKPOINT
-   // notify
-   int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
-   notify_crash(diepe);
-   // send message to crash pe to let it restart
-   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
-   int newrank = find_spare_mpirank(diepe);
-   int buddy = obj->BuddyPE(CmiMyPe());
-   if (buddy == diepe)  {
-     mpi_restart_crashed(diepe, newrank);
-     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
-   }
+      // notify
+      CkMemCheckPT::inRestarting = 1;
+      int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      notify_crash(diepe);
+      // send message to crash pe to let it restart
+      CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+      int newrank = find_spare_mpirank(diepe,CmiMyPartition());
+      int buddy = obj->BuddyPE(CmiMyPe());
+      //if (CmiMyPe() == obj->BuddyPE(diepe))  {
+      if (buddy == diepe)  {
+        mpi_restart_crashed(diepe, newrank);
+      }
 #endif
-}
+    }
 
-void pingHandler(void *msg)
-{
-  lastPingTime = CmiWallTimer();
-  CmiFree(msg);
-}
+    void pingHandler(void *msg)
+    {
+      lastPingTime = CmiWallTimer();
+      CmiFree(msg);
+    }
 
-void pingCheckHandler()
-{
+    void pingCheckHandler()
+    {
 #if CMK_MEM_CHECKPOINT
-  double now = CmiWallTimer();
-  if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb()) {
-    int i, pe, buddy;
-    // tell everyone the buddy dies
-    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
-    for (i = 1; i < CmiNumPes(); i++) {
-       pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
-       if (obj->BuddyPE(pe) == CmiMyPe()) break;
-    }
-    buddy = pe;
-    CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
-    /*for (int pe = 0; pe < CmiNumPes(); pe++) {
-      if (obj->isFailed(pe) || pe == buddy) continue;
-      char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-      *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
-      CmiSetHandler(msg, buddyDieHandlerIdx);
-      CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-    }*/
-    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-    *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
-    CmiSetHandler(msg, buddyDieHandlerIdx);
-    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-  }
-  else 
-    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+      double now = CmiWallTimer();
+      if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
+        //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
+        int i, pe, buddy;
+        // tell everyone the buddy dies
+        CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+        for (i = 1; i < CmiNumPes(); i++) {
+          pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
+          if (obj->BuddyPE(pe) == CmiMyPe()) break;
+        }
+        buddy = pe;
+        CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
+        fflush(stdout);
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+        *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
+        CmiSetHandler(msg, buddyDieHandlerIdx);
+        CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+        //send to everyone in the other world
+        if(CmiNumPartition()!=1){
+          //for(int i=0;i<CmiNumPes();i++){
+            char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+            *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
+            CmiSetHandler(rMsg, replicaDieHandlerIdx);
+            CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+          //}
+        }
+      }
+        else 
+          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
-}
+      }
 
-void pingBuddy()
-{
+      void pingBuddy()
+      {
 #if CMK_MEM_CHECKPOINT
-  CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
-  if (obj) {
-    int buddy = obj->BuddyPE(CkMyPe());
-//printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
-    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-    *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
-    CmiSetHandler(msg, pingHandlerIdx);
-    CmiGetRestartPhase(msg) = 9999;
-    CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-  }
-  CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+        CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+        if (obj) {
+          int buddy = obj->BuddyPE(CkMyPe());
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
+          CmiSetHandler(msg, pingHandlerIdx);
+          CmiGetRestartPhase(msg) = 9999;
+          CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+        }
+        CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
 #endif
-}
+      }
 #endif
 
-// initproc
-void CkRegisterRestartHandler( )
-{
+      // initproc
+      void CkRegisterRestartHandler( )
+      {
 #if CMK_MEM_CHECKPOINT
-  notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
-  askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
-  recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
-  restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
-  restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
-  restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
-  recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
+        notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
+        askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
+        askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
+        recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
+        restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
+        restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
+
+        //for replica
+        recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
+        replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
+        replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
+        replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
+        replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
+        replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
+        askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
+        recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
+        recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
+        recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
 
 #if CMK_CONVERSE_MPI
-  pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
-  pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
-  buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
-#endif
-
-  CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
-  CpvInitialize(CkCheckPTMessage **, chkpBuf);
-  CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
-  CpvInitialize(CkCheckPTMessage *, buddyBuf);
-  CpvInitialize(int,curPointer);
-  CpvAccess(procChkptBuf) = NULL;
-  CpvAccess(buddyBuf) = NULL;
-  CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
-  CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
-  CpvAccess(chkpBuf)[0] = NULL;
-  CpvAccess(chkpBuf)[1] = NULL;
-  CpvAccess(localProcChkpBuf)[0] = NULL;
-  CpvAccess(localProcChkpBuf)[1] = NULL;
-  CpvAccess(curPointer) = 0;
-  CpvAccess(recvdLocal) = 0;
-  CpvAccess(recvdRemote) = 0;
-
-  notify_crash_fn = notify_crash;
+        pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
+        pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
+        buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
+#endif
+
+        CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
+        CpvInitialize(CkCheckPTMessage **, chkpBuf);
+        CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
+        CpvInitialize(CkCheckPTMessage *, buddyBuf);
+        CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
+        CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
+        CpvInitialize(int,curPointer);
+        CpvInitialize(int,recvdLocal);
+        CpvInitialize(int,localChkpDone);
+        CpvInitialize(int,remoteChkpDone);
+        CpvInitialize(int,remoteStarted);
+        CpvInitialize(int,localStarted);
+        CpvInitialize(int,localReady);
+        CpvInitialize(int,remoteReady);
+        CpvInitialize(int,recvdRemote);
+        CpvInitialize(int,recvdProcChkp);
+        CpvInitialize(int,localChecksum);
+        CpvInitialize(int,remoteChecksum);
+        CpvInitialize(int,recvdArrayChkp);
+
+        CpvAccess(procChkptBuf) = NULL;
+        CpvAccess(buddyBuf) = NULL;
+        CpvAccess(recoverProcBuf) = NULL;
+        CpvAccess(recoverArrayBuf) = NULL;
+        CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
+        CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
+        CpvAccess(chkpBuf)[0] = NULL;
+        CpvAccess(chkpBuf)[1] = NULL;
+        CpvAccess(localProcChkpBuf)[0] = NULL;
+        CpvAccess(localProcChkpBuf)[1] = NULL;
+
+        CpvAccess(curPointer) = 0;
+        CpvAccess(recvdLocal) = 0;
+        CpvAccess(localChkpDone) = 0;
+        CpvAccess(remoteChkpDone) = 0;
+        CpvAccess(remoteStarted) = 0;
+        CpvAccess(localStarted) = 0;
+        CpvAccess(localReady) = 0;
+        CpvAccess(remoteReady) = 0;
+        CpvAccess(recvdRemote) = 0;
+        CpvAccess(recvdProcChkp) = 0;
+        CpvAccess(localChecksum) = 0;
+        CpvAccess(remoteChecksum) = 0;
+        CpvAccess(recvdArrayChkp) = 0;
+
+        notify_crash_fn = notify_crash;
 
 #if ! CMK_CONVERSE_MPI
-  // print pid to kill
-//  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
-//  sleep(4);
+        // print pid to kill
+        //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
+        //  sleep(4);
 #endif
 #endif
-}
+      }
 
 
-extern "C"
-int CkHasCheckpoints()
-{
-  return checkpointed;
-}
+      extern "C"
+        int CkHasCheckpoints()
+        {
+          return checkpointed;
+        }
 
-/// @todo: the following definitions should be moved to a separate file containing
-// structures and functions about fault tolerance strategies
+      /// @todo: the following definitions should be moved to a separate file containing
+      // structures and functions about fault tolerance strategies
 
-/**
- *  * @brief: function for killing a process                                             
- *   */
+      /**
      *  * @brief: function for killing a process                                             
      *   */
 #ifdef CMK_MEM_CHECKPOINT
 #if CMK_HAS_GETPID
-void killLocal(void *_dummy,double curWallTime){
+      void killLocal(void *_dummy,double curWallTime){
         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
         if(CmiWallTimer()<killTime-1){
-                CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
+          CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
         }else{ 
 #if CMK_CONVERSE_MPI
-                               CkDieNow();
+          CkDieNow();
 #else 
-                kill(getpid(),SIGKILL);                                               
+          kill(getpid(),SIGKILL);                                               
 #endif
         }              
-} 
+      
 #else
-void killLocal(void *_dummy,double curWallTime){
-  CmiAbort("kill() not supported!");
-}
+      void killLocal(void *_dummy,double curWallTime){
+        CmiAbort("kill() not supported!");
+      }
 #endif
 #endif
 
 #ifdef CMK_MEM_CHECKPOINT
-/**
- * @brief: reads the file with the kill information
- */
-void readKillFile(){
+      /**
      * @brief: reads the file with the kill information
      */
+      void readKillFile(){
         FILE *fp=fopen(killFile,"r");
         if(!fp){
-                printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
-                return;
+          printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
+          return;
         }
         int proc;
         double sec;
         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
-                if(proc == CkMyNode() && CkMyRank() == 0){
-                        killTime = CmiWallTimer()+sec;
-                        printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
-                        CcdCallFnAfter(killLocal,NULL,sec*1000);
-                }
+          if(proc == CkMyNode() && CkMyRank() == 0){
+            killTime = CmiWallTimer()+sec;
+            printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
+            CcdCallFnAfter(killLocal,NULL,sec*1000);
+          }
         }
         fclose(fp);
-}
+      }
 
 #if ! CMK_CONVERSE_MPI
-void CkDieNow()
-{
+      void CkDieNow()
+      {
 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
-         // ignored for non-mpi version
+        // ignored for non-mpi version
         CmiPrintf("[%d] die now.\n", CmiMyPe());
         killTime = CmiWallTimer()+0.001;
         CcdCallFnAfter(killLocal,NULL,1);
 #endif
-}
+      }
 #endif
 
 #endif