fix for migration
[charm.git] / src / ck-core / ckmemcheckpoint.C
index 627acf9cc231670e51abeb6b2a7842ae74fe066e..c4dc028c8dd537c689026fdc02c8c05a34bcbdc7 100644 (file)
@@ -1,52 +1,53 @@
 
 /*
 
 /*
-Charm++ support for fault tolerance of
-In memory synchronous checkpointing and restart
+   Charm++ support for fault tolerance of
+   In memory synchronous checkpointing and restart
 
 
-written by Gengbin Zheng, gzheng@uiuc.edu
-           Lixia Shi,     lixiashi@uiuc.edu
+   written by Gengbin Zheng, gzheng@uiuc.edu
+   Lixia Shi,     lixiashi@uiuc.edu
 
 
-added 12/18/03:
+   added 12/18/03:
 
 
-To support fault tolerance while allowing migration, it uses double
-checkpointing scheme for each array element (not a infallible scheme).
-In this version, checkpointing is done based on array elements. 
-Each array element individully sends its checkpoint data to two buddies.
+   To support fault tolerance while allowing migration, it uses double
+   checkpointing scheme for each array element (not a infallible scheme).
+   In this version, checkpointing is done based on array elements. 
+   Each array element individully sends its checkpoint data to two buddies.
 
 
-In this implementation, assume only one failure happens at a time,
-or two failures on two processors which are not buddy to each other;
-also assume there is no failure during a checkpointing or restarting phase.
+   In this implementation, assume only one failure happens at a time,
+   or two failures on two processors which are not buddy to each other;
+   also assume there is no failure during a checkpointing or restarting phase.
 
 
-Restart phase contains two steps:
-1. Converse level restart: the newly created process for the failed
+   Restart phase contains two steps:
+   1. Converse level restart: the newly created process for the failed
    processor recover its system data (no array elements) from 
    its backup processor.
    processor recover its system data (no array elements) from 
    its backup processor.
-2. Charm++ level restart: CkMemCheckPT gets control and recover array 
+   2. Charm++ level restart: CkMemCheckPT gets control and recover array 
    elements and reset all states of system groups to be consistent.
 
    elements and reset all states of system groups to be consistent.
 
-added 3/14/04:
-1. also support for double in-disk checkpoint/restart
+   added 3/14/04:
+   1. also support for double in-disk checkpoint/restart
    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
 
    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
 
-added 4/16/04:
-1. also support the case when there is a pool of extra processors.
+   added 4/16/04:
+   1. also support the case when there is a pool of extra processors.
    set CK_NO_PROC_POOL to 0.
 
 TODO:
 1. checkpoint scheme can be reimplemented based on per processor scheme;
    set CK_NO_PROC_POOL to 0.
 
 TODO:
 1. checkpoint scheme can be reimplemented based on per processor scheme;
- restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
+restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
 
 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
 
-*/
+ */
 
 #include "unistd.h"
 
 #include "charm++.h"
 
 #include "unistd.h"
 
 #include "charm++.h"
+#include "ckfaultinjector.h"
 #include "ck.h"
 #include "register.h"
 #include "conv-ccs.h"
 #include <signal.h>
 #include "ck.h"
 #include "register.h"
 #include "conv-ccs.h"
 #include <signal.h>
-
+#include <map>
 void noopck(const char*, ...)
 {}
 
 void noopck(const char*, ...)
 {}
 
@@ -65,11 +66,22 @@ void noopck(const char*, ...)
 #define CK_NO_PROC_POOL                                0
 #endif
 
 #define CK_NO_PROC_POOL                                0
 #endif
 
+#define CMK_CHKP_ALL           1
+#define CMK_USE_BARRIER                0
+#define CMK_USE_CHECKSUM               1
+
+//stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
 #define STREAMING_INFORMHOME                    1
 CpvDeclare(int, _crashedNode);
+CpvDeclare(int, use_checksum);
+CpvDeclare(int, resilience);
+CpvDeclare(int, _remoteCrashedNode);
 
 // static, so that it is accessible from Converse part
 int CkMemCheckPT::inRestarting = 0;
 
 // static, so that it is accessible from Converse part
 int CkMemCheckPT::inRestarting = 0;
+int CkMemCheckPT::inCheckpointing = 0;
+int CkMemCheckPT::replicaAlive = 1;
+int CkMemCheckPT::inLoadbalancing = 0;
 double CkMemCheckPT::startTime;
 char *CkMemCheckPT::stage;
 CkCallback CkMemCheckPT::cpCallback;
 double CkMemCheckPT::startTime;
 char *CkMemCheckPT::stage;
 CkCallback CkMemCheckPT::cpCallback;
@@ -92,18 +104,87 @@ int killFlag=0;
 double killTime=0.0;
 #endif
 
 double killTime=0.0;
 #endif
 
+#ifdef CKLOCMGR_LOOP
+#undef CKLOCMGR_LOOP
+#endif
+// loop over all CkLocMgr and do "code"
+#define  CKLOCMGR_LOOP(code)   {       \
+  int numGroups = CkpvAccess(_groupIDTable)->size();   \
+  for(int i=0;i<numGroups;i++) {       \
+    IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();   \
+    if(obj->isLocMgr())  {     \
+      CkLocMgr *mgr = (CkLocMgr*)obj;  \
+      code     \
+    }  \
+  }    \
+}
+
 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
-
+CpvDeclare(CkCheckPTMessage**, chkpBuf);
+CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
+//store the checkpoint of the buddy to compare
+//do not need the whole msg, can be the checksum
+CpvDeclare(CkCheckPTMessage*, buddyBuf);
+CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
+CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
+//pointer of the checkpoint going to be written
+CpvDeclare(int, curPointer);
+CpvDeclare(int, recvdRemote);
+CpvDeclare(int, recvdLocal);
+CpvDeclare(int, localChkpDone);
+CpvDeclare(int, remoteChkpDone);
+CpvDeclare(int, remoteStarted);
+CpvDeclare(int, localStarted);
+CpvDeclare(int, remoteReady);
+CpvDeclare(int, localReady);
+CpvDeclare(int, recvdArrayChkp);
+CpvDeclare(int, recvdProcChkp);
+CpvDeclare(int, localChecksum);
+CpvDeclare(int, remoteChecksum);
+
+bool compare(char * buf1, char * buf2);
+int getChecksum(char * buf);
+static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
+// Converse function handles
+static int askPhaseHandlerIdx;
+static int recvPhaseHandlerIdx;
+static int askProcDataHandlerIdx;
+static int askRecoverDataHandlerIdx;
+static int restartBcastHandlerIdx;
+static int recoverProcDataHandlerIdx;
+static int restartBeginHandlerIdx;
+static int recvRemoteChkpHandlerIdx;
+static int replicaDieHandlerIdx;
+static int replicaChkpStartHandlerIdx;
+static int replicaDieBcastHandlerIdx;
+static int replicaRecoverHandlerIdx;
+static int replicaChkpDoneHandlerIdx;
+static int recoverRemoteProcDataHandlerIdx;
+static int recoverRemoteArrayDataHandlerIdx;
+static int notifyHandlerIdx;
 // compute the backup processor
 // FIXME: avoid crashed processors
 // compute the backup processor
 // FIXME: avoid crashed processors
+#if CMK_CONVERSE_MPI
+static int pingHandlerIdx;
+static int pingCheckHandlerIdx;
+static int buddyDieHandlerIdx;
+static double lastPingTime = -1;
+
+extern "C" void mpi_restart_crashed(int pe, int rank);
+extern "C" int  find_spare_mpirank(int pe,int partition);
+
+void pingBuddy();
+void pingCheckHandler();
+
+#endif
 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
 
 inline int CkMemCheckPT::BuddyPE(int pe)
 {
   int budpe;
 #if NODE_CHECKPOINT
 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
 
 inline int CkMemCheckPT::BuddyPE(int pe)
 {
   int budpe;
 #if NODE_CHECKPOINT
-    // buddy is the processor with same rank on the next physical node
+  // buddy is the processor with same rank on the next physical node
   int r1 = CmiPhysicalRank(pe);
   int budnode = CmiPhysicalNodeID(pe);
   do {
   int r1 = CmiPhysicalRank(pe);
   int budnode = CmiPhysicalNodeID(pe);
   do {
@@ -119,8 +200,7 @@ inline int CkMemCheckPT::BuddyPE(int pe)
   }
 #else
   budpe = pe;
   }
 #else
   budpe = pe;
-  while (budpe == pe || isFailed(budpe)) 
-          budpe = (budpe+1)%CkNumPes();
+  budpe = (budpe+1)%CkNumPes();
 #endif
   return budpe;
 }
 #endif
   return budpe;
 }
@@ -129,49 +209,49 @@ inline int CkMemCheckPT::BuddyPE(int pe)
 // choose and register with 2 buddies for checkpoiting 
 #if CMK_MEM_CHECKPOINT
 void ArrayElement::init_checkpt() {
 // choose and register with 2 buddies for checkpoiting 
 #if CMK_MEM_CHECKPOINT
 void ArrayElement::init_checkpt() {
-       if (_memChkptOn == 0) return;
-       if (CkInRestarting()) {
-         CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
-       }
-       // only master init checkpoint
-        if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
-
-        budPEs[0] = CkMyPe();
-        budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
-       CmiAssert(budPEs[0] != budPEs[1]);
-        // inform checkPTMgr
-        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-       //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
-        checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
-       checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
+  if (_memChkptOn == 0) return;
+  if (CkInRestarting()) {
+    CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
+  }
+  // only master init checkpoint
+  if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
+
+  budPEs[0] = CkMyPe();
+  budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
+  CmiAssert(budPEs[0] != budPEs[1]);
+  // inform checkPTMgr
+  CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+  //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
+  checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
+  checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
 }
 #endif
 
 // entry function invoked by checkpoint mgr asking for checkpoint data
 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
 #if CMK_MEM_CHECKPOINT
 }
 #endif
 
 // entry function invoked by checkpoint mgr asking for checkpoint data
 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
 #if CMK_MEM_CHECKPOINT
-//  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
-//char index[128];   thisIndexMax.sprint(index);
-//printf("[%d] checkpointing %s\n", CkMyPe(), index);
+  //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
+  //char index[128];   thisIndexMax.sprint(index);
+  //printf("[%d] checkpointing %s\n", CkMyPe(), index);
   CkLocMgr *locMgr = thisArray->getLocMgr();
   CmiAssert(myRec!=NULL);
   int size;
   {
   CkLocMgr *locMgr = thisArray->getLocMgr();
   CmiAssert(myRec!=NULL);
   int size;
   {
-        PUP::sizer p;
-        locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
-        size = p.size();
+    PUP::sizer p;
+    locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
+    size = p.size();
   }
   int packSize = size/sizeof(double) +1;
   CkArrayCheckPTMessage *msg =
   }
   int packSize = size/sizeof(double) +1;
   CkArrayCheckPTMessage *msg =
-                 new (packSize, 0) CkArrayCheckPTMessage;
+    new (packSize, 0) CkArrayCheckPTMessage;
   msg->len = size;
   msg->index =thisIndexMax;
   msg->aid = thisArrayID;
   msg->locMgr = locMgr->getGroupID();
   msg->cp_flag = 1;
   {
   msg->len = size;
   msg->index =thisIndexMax;
   msg->aid = thisArrayID;
   msg->locMgr = locMgr->getGroupID();
   msg->cp_flag = 1;
   {
-        PUP::toMem p(msg->packData);
-        locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
+    PUP::toMem p(msg->packData);
+    locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
   }
 
   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
   }
 
   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
@@ -184,9 +264,9 @@ void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
 class CkMemCheckPTInfo: public CkCheckPTInfo
 {
   CkArrayCheckPTMessage *ckBuffer;
 class CkMemCheckPTInfo: public CkCheckPTInfo
 {
   CkArrayCheckPTMessage *ckBuffer;
-public:
+  public:
   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
-                   CkCheckPTInfo(a, loc, idx, pno)
+    CkCheckPTInfo(a, loc, idx, pno)
   {
     ckBuffer = NULL;
   }
   {
     ckBuffer = NULL;
   }
@@ -209,14 +289,14 @@ public:
     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
   }     
   inline void updateBuddy(int b1, int b2) {
     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
   }     
   inline void updateBuddy(int b1, int b2) {
-     CmiAssert(ckBuffer);
-     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
-     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
-     CmiAssert(pNo != CkMyPe());
+    CmiAssert(ckBuffer);
+    ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
+    pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
+    CmiAssert(pNo != CkMyPe());
   }
   inline int getSize() { 
   }
   inline int getSize() { 
-     CmiAssert(ckBuffer);
-     return ckBuffer->len; 
+    CmiAssert(ckBuffer);
+    return ckBuffer->len; 
   }
 };
 
   }
 };
 
@@ -226,7 +306,7 @@ class CkDiskCheckPTInfo: public CkCheckPTInfo
   char *fname;
   int bud1, bud2;
   int len;                     // checkpoint size
   char *fname;
   int bud1, bud2;
   int len;                     // checkpoint size
-public:
+  public:
   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
   {
 #if CMK_USE_MKSTEMP
   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
   {
 #if CMK_USE_MKSTEMP
@@ -254,7 +334,7 @@ public:
     PUP::toDisk p(f);
     CkPupMessage(p, (void **)&data);
     // delay sync to the end because otherwise the messages are blocked
     PUP::toDisk p(f);
     CkPupMessage(p, (void **)&data);
     // delay sync to the end because otherwise the messages are blocked
-//    fsync(fileno(f));
+    //    fsync(fileno(f));
     fclose(f);
     bud1 = data->bud1;
     bud2 = data->bud2;
     fclose(f);
     bud1 = data->bud1;
     bud2 = data->bud2;
@@ -274,12 +354,12 @@ public:
     return data;
   }
   inline void updateBuddy(int b1, int b2) {
     return data;
   }
   inline void updateBuddy(int b1, int b2) {
-     bud1 = b1; bud2 = b2;
-     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
-     CmiAssert(pNo != CkMyPe());
+    bud1 = b1; bud2 = b2;
+    pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
+    CmiAssert(pNo != CkMyPe());
   }
   inline int getSize() { 
   }
   inline int getSize() { 
-     return len; 
+    return len; 
   }
 };
 
   }
 };
 
@@ -294,24 +374,31 @@ CkMemCheckPT::CkMemCheckPT(int w)
 #if CK_NO_PROC_POOL
   if (numnodes <= 2)
 #else
 #if CK_NO_PROC_POOL
   if (numnodes <= 2)
 #else
-  if (numnodes  == 1)
+    if (numnodes  == 1)
 #endif
 #endif
-  {
-    if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
-    _memChkptOn = 0;
-  }
+    {
+      if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
+      _memChkptOn = 0;
+    }
   inRestarting = 0;
   inRestarting = 0;
+  inCheckpointing = 0;
   recvCount = peCount = 0;
   ackCount = 0;
   expectCount = -1;
   where = w;
   recvCount = peCount = 0;
   ackCount = 0;
   expectCount = -1;
   where = w;
-
+  replicaAlive = 1;
+  notifyReplica = 0;
 #if CMK_CONVERSE_MPI
   void pingBuddy();
   void pingCheckHandler();
   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
 #if CMK_CONVERSE_MPI
   void pingBuddy();
   void pingCheckHandler();
   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-  CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+  CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
 #endif
+  chkpTable[0] = NULL;
+  chkpTable[1] = NULL;
+  maxIter = -1;
+  recvIterCount = 0;
+  localDecided = false;
 }
 
 CkMemCheckPT::~CkMemCheckPT()
 }
 
 CkMemCheckPT::~CkMemCheckPT()
@@ -338,11 +425,59 @@ void CkMemCheckPT::pup(PUP::er& p)
     void pingBuddy();
     void pingCheckHandler();
     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
     void pingBuddy();
     void pingCheckHandler();
     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
-    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+    CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
 #endif
+    maxIter = -1;
+    recvIterCount = 0;
+    localDecided = false;
+  }
+}
+
+void CkMemCheckPT::getIter(){
+  localDecided = true;
+  localMaxIter = maxIter+1;
+  if(CkMyPe()==0){
+    CkPrintf("local max iter is %d\n",localMaxIter);
+  }
+  contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
+  int elemCount = CkCountChkpSyncElements();
+  if(CkMyPe()==0)
+    startTime = CmiWallTimer();
+  if(elemCount == 0){
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
+  }
+}
+
+//when one replica failes, decide the next chkp iter
+
+void CkMemCheckPT::recvIter(int iter){
+  if(maxIter<iter){
+    maxIter=iter;
+  }
+}
+
+void CkMemCheckPT::recvMaxIter(int iter){
+  localDecided = false;
+  if(CkMyPe()==0)
+    CkPrintf("checkpoint iteration is %d\n",iter);
+  CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
+}
+
+void CkMemCheckPT::reachChkpIter(){
+  recvIterCount++;
+  elemCount = CkCountChkpSyncElements();
+  //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
+  if(recvIterCount == elemCount){
+    recvIterCount = 0;
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
   }
 }
 
   }
 }
 
+void CkMemCheckPT::startChkp(){
+  CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
+  CkStartMemCheckpoint(cpCallback);
+}
+
 // called by checkpoint mgr to restore an array element
 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
 {
 // called by checkpoint mgr to restore an array element
 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
 {
@@ -352,10 +487,10 @@ void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m)
   PUP::fromMem p(m->packData);
   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
   CmiAssert(mgr);
   PUP::fromMem p(m->packData);
   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
   CmiAssert(mgr);
-#if STREAMING_INFORMHOME
-  mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
-#else
+#if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
+#else
+  mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
 #endif
 
   // find a list of array elements bound together
 #endif
 
   // find a list of array elements bound together
@@ -408,11 +543,11 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index,
     CkCheckPTInfo *entry = ckTable[idx];
     if (index == entry->index) {
       if (loc == entry->locMgr) {
     CkCheckPTInfo *entry = ckTable[idx];
     if (index == entry->index) {
       if (loc == entry->locMgr) {
-         // bindTo array elements
-          return;
+        // bindTo array elements
+        return;
       }
       }
-        // for array inheritance, the following check may fail
-        // because ArrayElement constructor of all superclasses are called
+      // for array inheritance, the following check may fail
+      // because ArrayElement constructor of all superclasses are called
       if (aid == entry->aid) {
         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
       if (aid == entry->aid) {
         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
@@ -430,1022 +565,1929 @@ void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index,
 
 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
 {
 
 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
 {
+#if !CMK_CHKP_ALL      
   int buddy = msg->bud1;
   if (buddy == CkMyPe()) buddy = msg->bud2;
   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
   recvData(msg);
   int buddy = msg->bud1;
   if (buddy == CkMyPe()) buddy = msg->bud2;
   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
   recvData(msg);
-    // ack
+  // ack
   thisProxy[buddy].gotData();
   thisProxy[buddy].gotData();
+#else
+  chkpTable[0] = NULL;
+  chkpTable[1] = NULL;
+  recvArrayCheckpoint(msg);
+  thisProxy[msg->bud2].gotData();
+#endif
 }
 
 // loop through my checkpoint table and ask checkpointed array elements
 // to send me checkpoint data.
 }
 
 // loop through my checkpoint table and ask checkpointed array elements
 // to send me checkpoint data.
-void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
+//void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
+void CkMemCheckPT::doItNow(int starter)
 {
   checkpointed = 1;
 {
   checkpointed = 1;
-  cpCallback = cb;
+  //cpCallback = cb;
   cpStarter = starter;
   cpStarter = starter;
+  inCheckpointing = 1;
   if (CkMyPe() == cpStarter) {
     startTime = CmiWallTimer();
   if (CkMyPe() == cpStarter) {
     startTime = CmiWallTimer();
-    CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
+    CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
   }
   }
+#if CMK_CONVERSE_MPI
+  if(CmiNumPartition()==1)
+#endif   
+  {
 
 
-  int len = ckTable.length();
-  for (int i=0; i<len; i++) {
-    CkCheckPTInfo *entry = ckTable[i];
+#if !CMK_CHKP_ALL
+    int len = ckTable.length();
+    for (int i=0; i<len; i++) {
+      CkCheckPTInfo *entry = ckTable[i];
       // always let the bigger number processor send request
       // always let the bigger number processor send request
-    //if (CkMyPe() < entry->pNo) continue;
+      //if (CkMyPe() < entry->pNo) continue;
       // always let the smaller number processor send request, may on same proc
       // always let the smaller number processor send request, may on same proc
-    if (!isMaster(entry->pNo)) continue;
+      if (!isMaster(entry->pNo)) continue;
       // call inmem_checkpoint to the array element, ask it to send
       // back checkpoint data via recvData().
       // call inmem_checkpoint to the array element, ask it to send
       // back checkpoint data via recvData().
-    CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
-    CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
-  }
+      CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
+      CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
+    }
     // if my table is empty, then I am done
     // if my table is empty, then I am done
-  if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
-
+    if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+#else
+    startArrayCheckpoint();
+#endif
+    sendProcData();
+  }
+#if CMK_CONVERSE_MPI
+  else
+  {
+    startCheckpoint();
+  }
+#endif
   // pack and send proc level data
   // pack and send proc level data
-  sendProcData();
 }
 
 }
 
-// don't handle array elements
-static inline void _handleProcData(PUP::er &p)
-{
+class MemElementPacker : public CkLocIterator{
+  private:
+    //    CkLocMgr *locMgr;
+    PUP::er &p;
+    std::map<CkHashCode,CkLocation> arrayMap;
+  public:
+    MemElementPacker(PUP::er &p_):p(p_){};
+    void addLocation(CkLocation &loc){
+      CkArrayIndexMax idx = loc.getIndex();
+      arrayMap[idx.hash()] = loc;
+    }
+    void writeCheckpoint(){
+      std::map<CkHashCode, CkLocation>::iterator it;
+      for(it = arrayMap.begin();it!=arrayMap.end();it++){
+        CkLocation loc = it->second;
+        CkLocMgr *locMgr = loc.getManager();
+        CkArrayIndexMax idx = loc.getIndex();
+        CkGroupID gID = locMgr->ckGetGroupID();
+        p|gID;
+        p|idx;
+        locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
+      }
+    }
+};
+
+void pupAllElements(PUP::er &p){
+#if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
+  int numElements;
+  if(!p.isUnpacking()){
+    numElements = CkCountArrayElements();
+  }
+  p | numElements;
+  if(!p.isUnpacking()){
+    MemElementPacker packer(p);
+    CKLOCMGR_LOOP(mgr->iterateLocal(packer););
+    packer.writeCheckpoint();
+  }
+#endif
+}
+
+void CkMemCheckPT::startArrayCheckpoint(){
+#if CMK_CHKP_ALL
+  int size;
+  {
+    PUP::sizer psizer;
+    pupAllElements(psizer);
+    size = psizer.size();
+  }
+  int packSize = size/sizeof(double)+1;
+  // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
+  CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
+  msg->len = size;
+  msg->cp_flag = 1;
+  int budPEs[2];
+  msg->bud1=CkMyPe();
+  msg->bud2=ChkptOnPe(CkMyPe());
+  {
+    PUP::toMem p(msg->packData);
+    pupAllElements(p);
+  }
+  thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
+  if(chkpTable[0]) delete chkpTable[0];
+  chkpTable[0] = msg;
+  //send the checkpoint to my 
+  recvCount++;
+#endif
+}
+
+void CkMemCheckPT::startCheckpoint(){
+#if CMK_CONVERSE_MPI
+  if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+    CkPrintf("in start checkpointing!!!!\n");
+  int size;
+  {
+    PUP::sizer p;
+    _handleProcData(p,CmiFalse);
+    size = p.size();
+  }
+  CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
+  procMsg->len = size;
+  procMsg->cp_flag = 1;
+  {
+    PUP::toMem p(procMsg->packData);
+    _handleProcData(p,CmiFalse);
+  }
+  int pointer = CpvAccess(curPointer);
+  if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+  CpvAccess(localProcChkpBuf)[pointer] = procMsg;
+
+  {
+    PUP::sizer psizer;
+    pupAllElements(psizer);
+    size = psizer.size();
+  }
+  CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
+  msg->len = size;
+  msg->cp_flag = 1;
+  int checksum;
+  {
+    if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
+      PUP::checker p(msg->packData);
+      pupAllElements(p);
+      checksum = p.getChecksum();
+    }else{  
+//    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
+      PUP::toMem p(msg->packData);
+      pupAllElements(p);
+    }
+  }
+  pointer = CpvAccess(curPointer);
+  if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
+  CpvAccess(chkpBuf)[pointer] = msg;
+  if(CkMyPe()==0)
+    CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+  if(CkReplicaAlive()==1){
+    CpvAccess(recvdLocal) = 1;
+    if(CpvAccess(use_checksum)){
+      CpvAccess(localChecksum) = checksum;
+      char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
+      //only one reaplica will send
+      if(CmiMyPartition()==0){
+        CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
+      }
+    }else{
+      envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
+      CkPackMessage(&env);
+      if(CmiMyPartition()==0){
+        CmiSetHandler(env,recvRemoteChkpHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+      }
+    }
+    if(CmiMyPartition()==0){
+      notifyReplica = 1;
+      thisProxy[CkMyPe()].doneComparison(true);
+    }
+  }
+  if(CpvAccess(recvdRemote)==1){
+    //only partition 1 will do it
+    //compare the checkpoint 
+    int size = CpvAccess(chkpBuf)[pointer]->len;
+    if(CpvAccess(use_checksum)){
+      if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
+        thisProxy[CkMyPe()].doneComparison(true);
+      }
+      else{
+        thisProxy[CkMyPe()].doneComparison(false);
+      }
+    }else{
+      if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
+        thisProxy[CkMyPe()].doneComparison(true);
+      }
+      else{
+        //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
+        thisProxy[CkMyPe()].doneComparison(false);
+      }
+    }
+    if(CkMyPe()==0)
+      CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+  }
+  else{
+    if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
+      //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
+      if(CpvAccess(remoteReady)==1){
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+        {      
+          int pointer = CpvAccess(curPointer);
+          //send the proc data
+          CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          procMsg->pointer = pointer;
+          envelope * env = (envelope *)(UsrToEnv(procMsg));
+          CkPackMessage(&env);
+          CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+          if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+            CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
+          }
+        }
+        //send the array checkpoint data
+        CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        msg->pointer = CpvAccess(curPointer);
+        envelope * env = (envelope *)(UsrToEnv(msg));
+        CkPackMessage(&env);
+        CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+          CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
+      }else{
+        if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
+          int pointer = CpvAccess(curPointer);
+          CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+          (CpvAccess(recoverProcBuf))->pointer = pointer;
+        }
+        CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
+        CpvAccess(localReady) = 1;
+      }
+      //can continue work, no need to wait for my replica
+      int _ret = 1;
+      notifyReplica = 1;
+      CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
+      contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
+    }
+  }
+#endif
+}
+
+void CkMemCheckPT::doneComparison(bool ret){
+  int _ret = 1;
+  if(!ret){
+    //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
+    _ret = 0;
+  }else{
+    _ret = 1;
+  }
+  CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
+  contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
+}
+
+void CkMemCheckPT::doneRComparison(int ret){
+  //   if(CpvAccess(curPointer) == 0){
+  //if(ret==CkNumPes()){
+    CpvAccess(localChkpDone) = 1;
+    if(CpvAccess(remoteChkpDone) ==1){
+      thisProxy.doneBothComparison();
+    }else{
+      CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
+    }
+  //}
+  /*else{
+    CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
+    startTime = CmiWallTimer();
+    thisProxy.RollBack();
+  }*/
+  if(notifyReplica == 0){
+    //notify the replica am done
+    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+    *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
+    CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
+    CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
+    notifyReplica = 1;
+  }
+}
+
+void CkMemCheckPT::doneBothComparison(){
+  CpvAccess(recvdRemote) = 0;
+  CpvAccess(remoteReady) = 0;
+  CpvAccess(localReady) = 0;
+  CpvAccess(recvdLocal) = 0;
+  CpvAccess(localChkpDone) = 0;
+  CpvAccess(remoteChkpDone) = 0;
+  CpvAccess(remoteStarted) = 0;
+  CpvAccess(localStarted) = 0;
+  CpvAccess(_remoteCrashedNode) = -1;
+  CkMemCheckPT::replicaAlive = 1;
+  int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
+  CpvAccess(curPointer)^=1;
+  inCheckpointing = 0;
+  notifyReplica = 0;
+  if(CkMyPe() == 0){
+    CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
+  }
+  CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
+}
+
+void CkMemCheckPT::RollBack(){
+  //restore group data
+  checkpointed = 0;
+  inRestarting = 1;
+  int pointer = CpvAccess(curPointer)^1;//use the previous one
+  CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
+  PUP::fromMem p(chkpMsg->packData);   
+
+  //destroy array elements
+  CKLOCMGR_LOOP(mgr->flushLocalRecs(););
+  int numGroups = CkpvAccess(_groupIDTable)->size();
+  for(int i=0;i<numGroups;i++) {
+    CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
+    IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
+    obj->flushStates();
+    obj->ckJustMigrated();
+  }
+  //restore array elements
+
+  int numElements;
+  p|numElements;
+
+  if(p.isUnpacking()){
+    for(int i=0;i<numElements;i++){
+      CkGroupID gID;
+      CkArrayIndex idx;
+      p|gID;
+      p|idx;
+      CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+      CmiAssert(mgr);
+      mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
+    }
+    }
+    CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
+    contribute(cb);
+  }
+
+  void CkMemCheckPT::notifyReplicaDie(int pe){
+    replicaAlive = 0;
+    CpvAccess(_remoteCrashedNode) = pe;
+  }
+
+  void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
+  {
+#if CMK_CHKP_ALL
+    int idx = 1;
+    if(msg->bud1 == CkMyPe()){
+      idx = 0;
+    }
+    int isChkpting = msg->cp_flag;
+    if(isChkpting == 1){
+      if(chkpTable[idx]) delete chkpTable[idx];
+    }
+    chkpTable[idx] = msg;
+    if(isChkpting){
+      recvCount++;
+      if(recvCount == 2){
+        if (where == CkCheckPoint_inMEM) {
+          contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+        }
+        else if (where == CkCheckPoint_inDISK) {
+          // another barrier for finalize the writing using fsync
+          CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
+          contribute(0,NULL,CkReduction::sum_int,localcb);
+        }
+        else
+          CmiAbort("Unknown checkpoint scheme");
+        recvCount = 0;
+      }
+    }
+#endif
+  }
+
+  // don't handle array elements
+  static inline void _handleProcData(PUP::er &p, CmiBool create)
+  {
     // save readonlys, and callback BTW
     CkPupROData(p);
 
     // save mainchares 
     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
     // save readonlys, and callback BTW
     CkPupROData(p);
 
     // save mainchares 
     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
-       
+
 #ifndef CMK_CHARE_USE_PTR
     // save non-migratable chare
     CkPupChareData(p);
 #endif
 
     // save groups into Groups.dat
 #ifndef CMK_CHARE_USE_PTR
     // save non-migratable chare
     CkPupChareData(p);
 #endif
 
     // save groups into Groups.dat
-    CkPupGroupData(p);
+    CkPupGroupData(p,create);
 
     // save nodegroups into NodeGroups.dat
 
     // save nodegroups into NodeGroups.dat
-    if(CkMyRank()==0) CkPupNodeGroupData(p);
-}
+    if(CkMyRank()==0) CkPupNodeGroupData(p,create);
+  }
 
 
-void CkMemCheckPT::sendProcData()
-{
-  // find out size of buffer
-  int size;
+  void CkMemCheckPT::sendProcData()
   {
   {
-    PUP::sizer p;
-    _handleProcData(p);
-    size = p.size();
+    // find out size of buffer
+    int size;
+    {
+      PUP::sizer p;
+      _handleProcData(p,CmiTrue);
+      size = p.size();
+    }
+    int packSize = size;
+    CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
+    DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
+    {
+      PUP::toMem p(msg->packData);
+      _handleProcData(p,CmiTrue);
+    }
+    msg->pe = CkMyPe();
+    msg->len = size;
+    msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
+    thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
   }
   }
-  int packSize = size;
-  CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
-  DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
+
+  void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
   {
   {
-    PUP::toMem p(msg->packData);
-    _handleProcData(p);
+    if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
+    CpvAccess(procChkptBuf) = msg;
+    DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
   }
   }
-  msg->pe = CkMyPe();
-  msg->len = size;
-  msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
-  thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
-}
-
-void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
-{
-  if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
-  CpvAccess(procChkptBuf) = msg;
-  DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
-  contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
-}
 
 
-// ArrayElement call this function to give us the checkpointed data
-void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
-{
-  int len = ckTable.length();
-  int idx;
-  for (idx=0; idx<len; idx++) {
-    CkCheckPTInfo *entry = ckTable[idx];
-    if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
-  }
-  CkAssert(idx < len);
-  int isChkpting = msg->cp_flag;
-  ckTable[idx]->updateBuffer(msg);
-  if (isChkpting) {
+  // ArrayElement call this function to give us the checkpointed data
+  void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
+  {
+    int len = ckTable.length();
+    int idx;
+    for (idx=0; idx<len; idx++) {
+      CkCheckPTInfo *entry = ckTable[idx];
+      if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
+    }
+    CkAssert(idx < len);
+    int isChkpting = msg->cp_flag;
+    ckTable[idx]->updateBuffer(msg);
+    if (isChkpting) {
       // all my array elements have returned their inmem data
       // inform starter processor that I am done.
       // all my array elements have returned their inmem data
       // inform starter processor that I am done.
-    recvCount ++;
-    if (recvCount == ckTable.length()) {
-      if (where == CkCheckPoint_inMEM) {
-        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
-      }
-      else if (where == CkCheckPoint_inDISK) {
-        // another barrier for finalize the writing using fsync
-        CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
-        contribute(0,NULL,CkReduction::sum_int,localcb);
-      }
-      else
-        CmiAbort("Unknown checkpoint scheme");
-      recvCount = 0;
-    } 
+      recvCount ++;
+      if (recvCount == ckTable.length()) {
+        if (where == CkCheckPoint_inMEM) {
+          contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+        }
+        else if (where == CkCheckPoint_inDISK) {
+          // another barrier for finalize the writing using fsync
+          CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
+          contribute(0,NULL,CkReduction::sum_int,localcb);
+        }
+        else
+          CmiAbort("Unknown checkpoint scheme");
+        recvCount = 0;
+      } 
+    }
   }
   }
-}
 
 
-// only used in disk checkpointing
-void CkMemCheckPT::syncFiles(CkReductionMsg *m)
-{
-  delete m;
+  // only used in disk checkpointing
+  void CkMemCheckPT::syncFiles(CkReductionMsg *m)
+  {
+    delete m;
 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
-  system("sync");
+    system("sync");
 #endif
 #endif
-  contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
-}
+    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
+  }
 
 
-// only is called on cpStarter when checkpoint is done
-void CkMemCheckPT::cpFinish()
-{
-  CmiAssert(CkMyPe() == cpStarter);
-  peCount++;
+  // only is called on cpStarter when checkpoint is done
+  void CkMemCheckPT::cpFinish()
+  {
+    CmiAssert(CkMyPe() == cpStarter);
+    peCount++;
     // now that all processors have finished, activate callback
     // now that all processors have finished, activate callback
-  if (peCount == 2) {
-    CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
-    cpCallback.send();
-    peCount = 0;
-    thisProxy.report();
+    if (peCount == 2) 
+    {
+      CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
+      peCount = 0;
+      thisProxy.report();
+    }
   }
   }
-}
 
 
-// for debugging, report checkpoint info
-void CkMemCheckPT::report()
-{
-  int objsize = 0;
-  int len = ckTable.length();
-  for (int i=0; i<len; i++) {
-    CkCheckPTInfo *entry = ckTable[i];
-    CmiAssert(entry);
-    objsize += entry->getSize();
+  // for debugging, report checkpoint info
+  void CkMemCheckPT::report()
+  {
+    CKLOCMGR_LOOP(mgr->resumeFromChkp(););
+    inCheckpointing = 0;
+#if !CMK_CHKP_ALL
+    int objsize = 0;
+    int len = ckTable.length();
+    for (int i=0; i<len; i++) {
+      CkCheckPTInfo *entry = ckTable[i];
+      CmiAssert(entry);
+      objsize += entry->getSize();
+    }
+    CmiAssert(CpvAccess(procChkptBuf));
+    //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
+#else
+    if(CkMyPe()==0)
+      CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
+#endif
   }
   }
-  CmiAssert(CpvAccess(procChkptBuf));
-//  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
-}
 
 
-/*****************************************************************************
-                       RESTART Procedure
-*****************************************************************************/
+  /*****************************************************************************
+    RESTART Procedure
+   *****************************************************************************/
 
 
-// master processor of two buddies
-inline int CkMemCheckPT::isMaster(int buddype)
-{
+  // master processor of two buddies
+  inline int CkMemCheckPT::isMaster(int buddype)
+  {
 #if 0
 #if 0
-  int mype = CkMyPe();
-//CkPrintf("ismaster: %d %d\n", pe, mype);
-  if (CkNumPes() - totalFailed() == 2) {
-    return mype > buddype;
-  }
-  for (int i=1; i<CkNumPes(); i++) {
-    int me = (buddype+i)%CkNumPes();
-    if (isFailed(me)) continue;
-    if (me == mype) return 1;
-    else return 0;
-  }
-  return 0;
+    int mype = CkMyPe();
+    //CkPrintf("ismaster: %d %d\n", pe, mype);
+    if (CkNumPes() - totalFailed() == 2) {
+      return mype > buddype;
+    }
+    for (int i=1; i<CkNumPes(); i++) {
+      int me = (buddype+i)%CkNumPes();
+      if (isFailed(me)) continue;
+      if (me == mype) return 1;
+      else return 0;
+    }
+    return 0;
 #else
     // smaller one
 #else
     // smaller one
-  int mype = CkMyPe();
-//CkPrintf("ismaster: %d %d\n", pe, mype);
-  if (CkNumPes() - totalFailed() == 2) {
-    return mype < buddype;
-  }
+    int mype = CkMyPe();
+    //CkPrintf("ismaster: %d %d\n", pe, mype);
+    if (CkNumPes() - totalFailed() == 2) {
+      return mype < buddype;
+    }
 #if NODE_CHECKPOINT
 #if NODE_CHECKPOINT
-  int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
-  for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
+    int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
+    for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
 #else
 #else
-  for (int i=1; i<CkNumPes(); i++) {
+      for (int i=1; i<CkNumPes(); i++) {
 #endif
 #endif
-    int me = (mype+i)%CkNumPes();
-    if (isFailed(me)) continue;
-    if (me == buddype) return 1;
-    else return 0;
-  }
-  return 0;
+        int me = (mype+i)%CkNumPes();
+        if (isFailed(me)) continue;
+        if (me == buddype) return 1;
+        else return 0;
+      }
+      return 0;
 #endif
 #endif
-}
+    }
 
 
-#ifdef CKLOCMGR_LOOP
-#undef CKLOCMGR_LOOP
-#endif
 
 
-// loop over all CkLocMgr and do "code"
-#define  CKLOCMGR_LOOP(code)   {       \
-  int numGroups = CkpvAccess(_groupIDTable)->size();   \
-  for(int i=0;i<numGroups;i++) {       \
-    IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();   \
-    if(obj->isLocMgr())  {     \
-      CkLocMgr *mgr = (CkLocMgr*)obj;  \
-      code     \
-    }  \
-  }    \
- }
 
 #if 0
 
 #if 0
-// helper class to pup all elements that belong to same ckLocMgr
-class ElementDestoryer : public CkLocIterator {
-private:
+    // helper class to pup all elements that belong to same ckLocMgr
+    class ElementDestoryer : public CkLocIterator {
+      private:
         CkLocMgr *locMgr;
         CkLocMgr *locMgr;
-public:
+      public:
         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
         void addLocation(CkLocation &loc) {
         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
         void addLocation(CkLocation &loc) {
-               CkArrayIndex idx=loc.getIndex();
-               CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
-               loc.destroy();
+          CkArrayIndex idx=loc.getIndex();
+          CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
+          loc.destroy();
         }
         }
-};
+    };
 #endif
 
 #endif
 
-// restore the bitmap vector for LB
-void CkMemCheckPT::resetLB(int diepe)
-{
+    // restore the bitmap vector for LB
+    void CkMemCheckPT::resetLB(int diepe)
+    {
 #if CMK_LBDB_ON
 #if CMK_LBDB_ON
-  int i;
-  char *bitmap = new char[CkNumPes()];
-  // set processor available bitmap
-  get_avail_vector(bitmap);
-
-  for (i=0; i<failedPes.length(); i++)
-    bitmap[failedPes[i]] = 0; 
-  bitmap[diepe] = 0;
+      int i;
+      char *bitmap = new char[CkNumPes()];
+      // set processor available bitmap
+      get_avail_vector(bitmap);
+      for (i=0; i<failedPes.length(); i++)
+        bitmap[failedPes[i]] = 0; 
+      bitmap[diepe] = 0;
 
 #if CK_NO_PROC_POOL
 
 #if CK_NO_PROC_POOL
-  set_avail_vector(bitmap);
+      set_avail_vector(bitmap);
 #endif
 
 #endif
 
-  // if I am the crashed pe, rebuild my failedPEs array
-  if (CkMyNode() == diepe)
-    for (i=0; i<CkNumPes(); i++) 
-      if (bitmap[i]==0) failed(i);
+      // if I am the crashed pe, rebuild my failedPEs array
+      if (CkMyNode() == diepe)
+        for (i=0; i<CkNumPes(); i++) 
+          if (bitmap[i]==0) failed(i);
 
 
-  delete [] bitmap;
+      delete [] bitmap;
 #endif
 #endif
-}
-
-// in case when failedPe dies, everybody go through its checkpoint table:
-// destory all array elements
-// recover lost buddies
-// reconstruct all array elements from check point data
-// called on all processors
-void CkMemCheckPT::restart(int diePe)
-{
-#if CMK_MEM_CHECKPOINT
-  double curTime = CmiWallTimer();
-  if (CkMyPe() == diePe)
-    CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
-  stage = (char*)"resetLB";
-  startTime = curTime;
-  if (CkMyPe() == diePe)
-    CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
-
-#if CK_NO_PROC_POOL
-  failed(diePe);       // add into the list of failed pes
-#endif
-  thisFailedPe = diePe;
+    }
 
 
-  if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
+    static double restartT;
 
 
-  inRestarting = 1;
-                                                                                
-  // disable load balancer's barrier
-  if (CkMyPe() != diePe) resetLB(diePe);
+    // in case when failedPe dies, everybody go through its checkpoint table:
+    // destory all array elements
+    // recover lost buddies
+    // reconstruct all array elements from check point data
+    // called on all processors
+    void CkMemCheckPT::restart(int diePe)
+    {
+#if CMK_MEM_CHECKPOINT
+      thisFailedPe = diePe;
+      double curTime = CmiWallTimer();
+      if (CkMyPe() == thisFailedPe){
+        restartT = CmiWallTimer();
+        CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
+      }
+      stage = (char*)"resetLB";
+      startTime = curTime;
+      if (CkMyPe() == thisFailedPe)
+        CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
 
 
-  CKLOCMGR_LOOP(mgr->startInserting(););
+//#if CK_NO_PROC_POOL
+//      failed(diePe); // add into the list of failed pes
+//#endif
 
 
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-  barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-/*
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
-*/
-#endif
-}
+//      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
 
 
-// loally remove all array elements
-void CkMemCheckPT::removeArrayElements()
-{
-#if CMK_MEM_CHECKPOINT
-  int len = ckTable.length();
-  double curTime = CmiWallTimer();
-  if (CkMyPe() == thisFailedPe) 
-    CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
-  stage = (char*)"removeArrayElements";
-  startTime = curTime;
+      inRestarting = 1;
 
 
-  if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
-  if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
+      // disable load balancer's barrier
+      //if (CkMyPe() != diePe) resetLB(diePe);
 
 
-  // get rid of all buffering and remote recs
-  // including destorying all array elements
-  CKLOCMGR_LOOP(mgr->flushAllRecs(););
+      CKLOCMGR_LOOP(mgr->startInserting(););
 
 
-//  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
 
 
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
-  barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
+      if(CmiNumPartition()==1){
+        barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
+      }else{
+        CKLOCMGR_LOOP(mgr->flushLocalRecs(););
+        barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
+      }
 #endif
 #endif
-}
-
-// flush state in reduction manager
-void CkMemCheckPT::resetReductionMgr()
-{
-  //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
-  int numGroups = CkpvAccess(_groupIDTable)->size();
-  for(int i=0;i<numGroups;i++) {
-    CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
-    IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
-    obj->flushStates();
-    obj->ckJustMigrated();
-  }
-  // reset again
-  //CpvAccess(_qd)->flushStates();
+    }
 
 
-#if 1
-  //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
-  barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
+    // loally remove all array elements
+    void CkMemCheckPT::removeArrayElements()
+    {
+#if CMK_MEM_CHECKPOINT
+      int len = ckTable.length();
+      double curTime = CmiWallTimer();
+      if (CkMyPe() == thisFailedPe) 
+        CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
+      stage = (char*)"removeArrayElements";
+      startTime = curTime;
+
+      //  if (cpCallback.isInvalid()) 
+      //         CkPrintf("invalid pe %d\n",CkMyPe());
+      //         CkAbort("Didn't set restart callback\n");;
+      if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
+
+      // get rid of all buffering and remote recs
+      // including destorying all array elements
+#if CK_NO_PROC_POOL  
+      CKLOCMGR_LOOP(mgr->flushAllRecs(););
 #else
 #else
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
+      CKLOCMGR_LOOP(mgr->flushLocalRecs(););
 #endif
 #endif
-}
+      barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
+#endif
+    }
 
 
-// recover the lost buddies
-void CkMemCheckPT::recoverBuddies()
-{
-  int idx;
-  int len = ckTable.length();
-  // ready to flush reduction manager
-  // cannot be CkMemCheckPT::restart because destory will modify states
-  double curTime = CmiWallTimer();
-  if (CkMyPe() == thisFailedPe)
-  CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
-  stage = (char *)"recoverBuddies";
-  if (CkMyPe() == thisFailedPe)
-  CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
-  startTime = curTime;
-
-  // recover buddies
-  expectCount = 0;
-  for (idx=0; idx<len; idx++) {
-    CkCheckPTInfo *entry = ckTable[idx];
-    if (entry->pNo == thisFailedPe) {
+    // flush state in reduction manager
+    void CkMemCheckPT::resetReductionMgr()
+    {
+      if (CkMyPe() == thisFailedPe) 
+        CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
+      stage = (char *)"resetReductionMgr";
+      int numGroups = CkpvAccess(_groupIDTable)->size();
+      for(int i=0;i<numGroups;i++) {
+        CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
+        IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
+        obj->flushStates();
+        obj->ckJustMigrated();
+      }
+      // reset again
+      //CpvAccess(_qd)->flushStates();
+      if(CmiNumPartition()==1){
+        barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
+      }
+      else{
+       if (CkMyPe() == thisFailedPe) 
+         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
+        barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+      }
+    }
+
+    // recover the lost buddies
+    void CkMemCheckPT::recoverBuddies()
+    {
+      int idx;
+      int len = ckTable.length();
+      // ready to flush reduction manager
+      // cannot be CkMemCheckPT::restart because destory will modify states
+      double curTime = CmiWallTimer();
+      if (CkMyPe() == thisFailedPe)
+        CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
+      stage = (char *)"recoverBuddies";
+      if (CkMyPe() == thisFailedPe)
+        CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
+      startTime = curTime;
+
+      // recover buddies
+      expectCount = 0;
+#if !CMK_CHKP_ALL
+      for (idx=0; idx<len; idx++) {
+        CkCheckPTInfo *entry = ckTable[idx];
+        if (entry->pNo == thisFailedPe) {
 #if CK_NO_PROC_POOL
 #if CK_NO_PROC_POOL
-      // find a new buddy
-/*
-      int budPe = CkMyPe();
-//      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
-      while (budPe == CkMyPe() || isFailed(budPe)) 
-          budPe = (budPe+1)%CkNumPes();
-*/
-      int budPe = BuddyPE(CkMyPe());
-      //entry->pNo = budPe;
+          // find a new buddy
+          int budPe = BuddyPE(CkMyPe());
 #else
 #else
-      int budPe = thisFailedPe;
+          int budPe = thisFailedPe;
 #endif
 #endif
-      entry->updateBuddy(CkMyPe(), budPe);
-#if 0
-      thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
-      CkArrayCheckPTMessage *msg = entry->getCopy();
-      msg->cp_flag = 0;            // not checkpointing
-      thisProxy[budPe].recvData(msg);
+          CkArrayCheckPTMessage *msg = entry->getCopy();
+          msg->bud1 = budPe;
+          msg->bud2 = CkMyPe();
+          msg->cp_flag = 0;            // not checkpointing
+          thisProxy[budPe].recoverEntry(msg);
+          expectCount ++;
+        }
+      }
+#else
+      //send to failed pe
+      if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
+#if CK_NO_PROC_POOL
+        // find a new buddy
+        int budPe = BuddyPE(CkMyPe());
 #else
 #else
-      CkArrayCheckPTMessage *msg = entry->getCopy();
-      msg->bud1 = budPe;
-      msg->bud2 = CkMyPe();
-      msg->cp_flag = 0;            // not checkpointing
-      thisProxy[budPe].recoverEntry(msg);
+        int budPe = thisFailedPe;
+#endif
+        CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
+        CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
+        msg->cp_flag = 0;            // not checkpointing
+        msg->bud1 = budPe;
+        msg->bud2 = CkMyPe();
+        thisProxy[budPe].recoverEntry(msg);
+        expectCount ++;
+      }
 #endif
 #endif
-      expectCount ++;
+
+      if (expectCount == 0) {
+        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+      }
+      //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
     }
     }
-  }
 
 
-#if 1
-  if (expectCount == 0) {
-    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
-    //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
-  }
-#else
-  if (CkMyPe() == 0) {
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
-  }
-#endif
+    void CkMemCheckPT::gotData()
+    {
+      ackCount ++;
+      if (ackCount == expectCount) {
+        ackCount = 0;
+        expectCount = -1;
+        //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
+        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
+      }
+    }
 
 
-  //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
-}
+    void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
+    {
 
 
-void CkMemCheckPT::gotData()
-{
-  ackCount ++;
-  if (ackCount == expectCount) {
-    ackCount = 0;
-    expectCount = -1;
-    //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
-    contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
-  }
-}
+      for (int i=0; i<n; i++) {
+        CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
+        mgr->updateLocation(idx[i], nowOnPe);
+      }
+      thisProxy[nowOnPe].gotReply();
+    }
 
 
-void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
-{
-  for (int i=0; i<n; i++) {
-    CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
-    mgr->updateLocation(idx[i], nowOnPe);
-  }
-}
+    // restore array elements
+    void CkMemCheckPT::recoverArrayElements()
+    {
+      double curTime = CmiWallTimer();
+      int len = ckTable.length();
+      CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
+      stage = (char *)"recoverArrayElements";
+      startTime = curTime;
+      int flag = 0;
+      // recover all array elements
+      int count = 0;
+
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
+      CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
+      CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
+#endif
 
 
-// restore array elements
-void CkMemCheckPT::recoverArrayElements()
-{
-  double curTime = CmiWallTimer();
-  int len = ckTable.length();
-  //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
-  stage = (char *)"recoverArrayElements";
-  if (CkMyPe() == thisFailedPe)
-  CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
-  startTime = curTime;
-
-  // recover all array elements
-  int count = 0;
-#if STREAMING_INFORMHOME
-  CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
-  CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
-#endif
-  for (int idx=0; idx<len; idx++)
-  {
-    CkCheckPTInfo *entry = ckTable[idx];
+#if !CMK_CHKP_ALL
+      for (int idx=0; idx<len; idx++)
+      {
+        CkCheckPTInfo *entry = ckTable[idx];
 #if CK_NO_PROC_POOL
 #if CK_NO_PROC_POOL
-    // the bigger one will do 
-//    if (CkMyPe() < entry->pNo) continue;
-    if (!isMaster(entry->pNo)) continue;
+        // the bigger one will do 
+        //    if (CkMyPe() < entry->pNo) continue;
+        if (!isMaster(entry->pNo)) continue;
 #else
 #else
-    // smaller one do it, which has the original object
-    if (CkMyPe() == entry->pNo+1 || 
-        CkMyPe()+CkNumPes() == entry->pNo+1) continue;
-#endif
-//CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
-
-    entry->updateBuddy(CkMyPe(), entry->pNo);
-    CkArrayCheckPTMessage *msg = entry->getCopy();
-    // gzheng
-    //thisProxy[CkMyPe()].inmem_restore(msg);
-    inmem_restore(msg);
-#if STREAMING_INFORMHOME
-    CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
-    int homePe = mgr->homePe(msg->index);
-    if (homePe != CkMyPe()) {
-      gmap[homePe].push_back(msg->locMgr);
-      imap[homePe].push_back(msg->index);
-    }
-#endif
-    CkFreeMsg(msg);
-    count ++;
-  }
-#if STREAMING_INFORMHOME
-  for (int i=0; i<CkNumPes(); i++) {
-    if (gmap[i].size() && i!=CkMyPe()) {
-      thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
-    }
-  }
-  delete [] imap;
-  delete [] gmap;
+        // smaller one do it, which has the original object
+        if (CkMyPe() == entry->pNo+1 || 
+            CkMyPe()+CkNumPes() == entry->pNo+1) continue;
+#endif
+        //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
+
+        entry->updateBuddy(CkMyPe(), entry->pNo);
+        CkArrayCheckPTMessage *msg = entry->getCopy();
+        // gzheng
+        //thisProxy[CkMyPe()].inmem_restore(msg);
+        inmem_restore(msg);
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
+        CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
+        int homePe = mgr->homePe(msg->index);
+        if (homePe != CkMyPe()) {
+          gmap[homePe].push_back(msg->locMgr);
+          imap[homePe].push_back(msg->index);
+        }
 #endif
 #endif
-  DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
+        CkFreeMsg(msg);
+        count ++;
+      }
+#else
+      char * packData;
+      if(CmiNumPartition()==1){
+        CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
+        packData = (char *)msg->packData;
+      }
+      else{
+        int pointer = CpvAccess(curPointer);
+        CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        packData = msg->packData;
+      }
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
+      recoverAll(packData,gmap,imap);
+#else
+      recoverAll(packData);
+#endif
+#endif
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
+      for (int i=0; i<CkNumPes(); i++) {
+        if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
+          thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
+          flag++;      
+        }
+      }
+      delete [] imap;
+      delete [] gmap;
+#endif
+      DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
+      CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
+      //if (CkMyPe() == thisFailedPe)
 
 
-  CKLOCMGR_LOOP(mgr->doneInserting(););
+      CKLOCMGR_LOOP(mgr->doneInserting(););
 
 
-  inRestarting = 0;
-  // _crashedNode = -1;
-  CpvAccess(_crashedNode) = -1;
+      // _crashedNode = -1;
+      CpvAccess(_crashedNode) = -1;
+#if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
+      if (CkMyPe() == 0)
+        CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
+#else
+      if(flag == 0)
+      {
+        contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
+      }
+#endif
+    }
 
 
-  if (CkMyPe() == 0)
-    CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
-}
+    void CkMemCheckPT::gotReply(){
+      contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
+    }
 
 
-static double restartT;
+    void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
+#if CMK_CHKP_ALL
+      PUP::fromMem p(packData);
+      int numElements;
+      p|numElements;
+      if(p.isUnpacking()){
+        for(int i=0;i<numElements;i++){
+          CkGroupID gID;
+          CkArrayIndex idx;
+          p|gID;
+          p|idx;
+          CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+          int homePe = mgr->homePe(idx);
+#if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
+          mgr->resume(idx,p,CmiTrue,CmiTrue);
+#else
+          if(CmiNumPartition()==1)
+            mgr->resume(idx,p,CmiFalse,CmiTrue);       
+          else{
+            if(CkMyPe()==thisFailedPe){
+              mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
+            }
+            else{
+              mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
+            }
+          }
+#endif
+#if STREAMING_INFORMHOME && CK_NO_PROC_POOL
+          homePe = mgr->homePe(idx);
+          if (homePe != CkMyPe()) {
+            gmap[homePe].push_back(gID);
+            imap[homePe].push_back(idx);
+          }
+#endif
+        }
+      }
+#endif
+    }
 
 
-// on every processor
-// turn load balancer back on
-void CkMemCheckPT::finishUp()
-{
-  //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
-  //CKLOCMGR_LOOP(mgr->doneInserting(););
-  
-  if (CkMyPe() == thisFailedPe)
-  {
-       CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
-       //CkStartQD(cpCallback);
-       cpCallback.send();
-       CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
-  }
+
+    // on every processor
+    // turn load balancer back on
+    void CkMemCheckPT::finishUp()
+    {
+      //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
+      //CKLOCMGR_LOOP(mgr->doneInserting(););
+
+      if (CkMyPe() == thisFailedPe)
+      {
+        CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
+        CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
+        fflush(stdout);
+      }
+      CKLOCMGR_LOOP(mgr->resumeFromChkp(););
+      inRestarting = 0;
+      maxIter = -1;
+#if CMK_CONVERSE_MPI   
+      if(CmiNumPartition()!=1){
+        CpvAccess(recvdProcChkp) = 0;
+        CpvAccess(recvdArrayChkp) = 0;
+        CpvAccess(curPointer)^=1;
+        //notify my replica, restart is done
+        if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+          CmiSetHandler(msg,replicaRecoverHandlerIdx);
+          CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
+        }
+      }
+      if (CmiMyPe() == BuddyPE(thisFailedPe)) {
+        lastPingTime = CmiWallTimer();
+        CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
+      }
+#endif
 
 #if CK_NO_PROC_POOL
 #if NODE_CHECKPOINT
 
 #if CK_NO_PROC_POOL
 #if NODE_CHECKPOINT
-  int numnodes = CmiNumPhysicalNodes();
+      int numnodes = CmiNumPhysicalNodes();
 #else
 #else
-  int numnodes = CkNumPes();
+      int numnodes = CkNumPes();
 #endif
 #endif
-  if (numnodes-totalFailed() <=2) {
-    if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
-    _memChkptOn = 0;
-  }
+      if (numnodes-totalFailed() <=2) {
+        if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
+        _memChkptOn = 0;
+      }
 #endif
 #endif
-}
+    }
 
 
-// called only on 0
-void CkMemCheckPT::quiescence(CkCallback &cb)
-{
-  static int pe_count = 0;
-  pe_count ++;
-  CmiAssert(CkMyPe() == 0);
-  //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
-  if (pe_count == CkNumPes()) {
-    pe_count = 0;
-    cb.send();
-  }
-}
+    void CkMemCheckPT::recoverFromSoftFailure()
+    {
+      inRestarting = 0;
+      maxIter = -1;
+      CpvAccess(recvdRemote) = 0;
+      CpvAccess(recvdLocal) = 0;
+      CpvAccess(localChkpDone) = 0;
+      CpvAccess(remoteChkpDone) = 0;
+      CpvAccess(remoteReady) = 0;
+      CpvAccess(localReady) = 0;
+      inCheckpointing = 0;
+      notifyReplica = 0;
+      CpvAccess(remoteStarted) = 0;
+      CpvAccess(localStarted) = 0;
+      CpvAccess(_remoteCrashedNode) = -1;
+      CkMemCheckPT::replicaAlive = 1;
+      inCheckpointing = 0;
+      notifyReplica = 0;
+      if(CkMyPe() == 0){
+        CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
+      }
+      CKLOCMGR_LOOP(mgr->resumeFromChkp(););
+    }
+    // called only on 0
+    void CkMemCheckPT::quiescence(CkCallback &cb)
+    {
+      static int pe_count = 0;
+      pe_count ++;
+      CmiAssert(CkMyPe() == 0);
+      //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
+      if (pe_count == CkNumPes()) {
+        pe_count = 0;
+        cb.send();
+      }
+    }
 
 
-// User callable function - to start a checkpoint
-// callback cb is used to pass control back
-void CkStartMemCheckpoint(CkCallback &cb)
-{
+    // User callable function - to start a checkpoint
+    // callback cb is used to pass control back
+    void CkStartMemCheckpoint(CkCallback &cb)
+    {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-  if (_memChkptOn == 0) {
-    CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
-    cb.send();
-    return;
-  }
-  if (CkInRestarting()) {
-      // trying to checkpointing during restart
-    cb.send();
-    return;
-  }
-    // store user callback and user data
-  CkMemCheckPT::cpCallback = cb;
+      CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
+      /*if (_memChkptOn == 0) {
+        CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
+        cb.send();
+        return;
+        }*/
+      if (CkInRestarting()) {
+        // trying to checkpointing during restart
+        cb.send();
+        return;
+      }
+      // store user callback and user data
+      CkMemCheckPT::cpCallback = cb;
 
 
-    // broadcast to start check pointing
-  CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-  checkptMgr.doItNow(CkMyPe(), cb);
+
+      //send to my replica that checkpoint begins 
+      if(CkReplicaAlive()==1){
+        char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(msg, replicaChkpStartHandlerIdx);
+        CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
+      }
+      CpvAccess(localStarted) = 1;
+      // broadcast to start check pointing
+      if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
+        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+        checkptMgr.doItNow(CkMyPe());
+      }
 #else
 #else
-  // when mem checkpoint is disabled, invike cb immediately
-  CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
-  cb.send();
+      // when mem checkpoint is disabled, invike cb immediately
+      CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
+      cb.send();
 #endif
 #endif
-}
+    }
 
 
-void CkRestartCheckPoint(int diePe)
-{
-CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
-  CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
-  // broadcast
-  checkptMgr.restart(diePe);
-}
+    void CkRestartCheckPoint(int diePe)
+    {
+      CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
+      CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+      // broadcast
+      checkptMgr.restart(diePe);
+    }
 
 
-static int _diePE = -1;
+    static int _diePE = -1;
 
 
-// callback function used locally by ccs handler
-static void CkRestartCheckPointCallback(void *ignore, void *msg)
-{
-CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
-  CkRestartCheckPoint(_diePE);
-}
+    // callback function used locally by ccs handler
+    static void CkRestartCheckPointCallback(void *ignore, void *msg)
+    {
+      CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
+      CkRestartCheckPoint(_diePE);
+    }
 
 
-// Converse function handles
-static int askPhaseHandlerIdx;
-static int recvPhaseHandlerIdx;
-static int askProcDataHandlerIdx;
-static int restartBcastHandlerIdx;
-static int recoverProcDataHandlerIdx;
-static int restartBeginHandlerIdx;
-static int notifyHandlerIdx;
 
 
-// called on crashed PE
-static void restartBeginHandler(char *msg)
-{
+    // called on crashed PE
+    static void restartBeginHandler(char *msg)
+    {
+      CmiFree(msg);
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-  CmiFree(msg);
-  static int count = 0;
-  CmiAssert(CkMyPe() == _diePE);
-  count ++;
-  if (count == CkNumPes()) {
-    CkRestartCheckPointCallback(NULL, NULL);
-    count = 0;
-  }
+#if CMK_USE_BARRIER
+      if(CkMyPe()!=_diePE){
+        printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
+        char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+        CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+      }else{
+        CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
+        CkRestartCheckPointCallback(NULL, NULL);
+      }
+#else
+      static int count = 0;
+      CmiAssert(CkMyPe() == _diePE);
+      count ++;
+      if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
+        printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
+        CkRestartCheckPointCallback(NULL, NULL);
+        count = 0;
+      }
 #endif
 #endif
-}
+#endif
+    }
 
 
-extern void _discard_charm_message();
-extern void _resume_charm_message();
+    extern void _discard_charm_message();
+    extern void _resume_charm_message();
 
 
-static void * doNothingMsg(int * size, void * data, void ** remote, int count){
-       return data;
-}
+    static void * doNothingMsg(int * size, void * data, void ** remote, int count){
+      return data;
+    }
 
 
-static void restartBcastHandler(char *msg)
-{
+    static void restartBcastHandler(char *msg)
+    {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-  // advance phase counter
-  CkMemCheckPT::inRestarting = 1;
-  _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
-  // gzheng
-  //if (CkMyPe() != _diePE) cur_restart_phase ++;
-
-  if (CkMyPe()==_diePE)
-    CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
-
-  // reset QD counters
-/*  gzheng
-  if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
-*/
-
-/*  gzheng
-  if (CkMyPe()==_diePE)
-      CkRestartCheckPointCallback(NULL, NULL);
-*/
-  CmiFree(msg);
-
-  _resume_charm_message();
-
-    // reduction
-  char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-  CmiSetHandler(restartmsg, restartBeginHandlerIdx);
-  CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
-  checkpointed = 0;
+      // advance phase counter
+      CkMemCheckPT::inRestarting = 1;
+      _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      // gzheng
+      //if (CkMyPe() != _diePE) cur_restart_phase ++;
+
+      if (CkMyPe()==_diePE)
+        CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
+
+      // reset QD counters
+      /*  gzheng
+          if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
+       */
+
+      /*  gzheng
+          if (CkMyPe()==_diePE)
+          CkRestartCheckPointCallback(NULL, NULL);
+       */
+      CmiFree(msg);
+
+      _resume_charm_message();
+
+      // reduction
+      char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+      CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+#if CMK_USE_BARRIER
+      //CmiPrintf("before reduce\n");  
+      CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+      //CmiPrintf("after reduce\n");   
+#else
+      CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#endif 
+      checkpointed = 0;
 #endif
 #endif
-}
+    }
 
 
-extern void _initDone();
+    extern void _initDone();
 
 
-// called on crashed processor
-static void recoverProcDataHandler(char *msg)
-{
-#if CMK_MEM_CHECKPOINT
-   int i;
-   envelope *env = (envelope *)msg;
-   CkUnpackMessage(&env);
-   CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
-   CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
-   CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
-   //cur_restart_phase ++;
-     // gzheng ?
-   //CpvAccess(_qd)->flushStates();
-
-   // restore readonly, mainchare, group, nodegroup
-//   int temp = cur_restart_phase;
-//   cur_restart_phase = -1;
-   PUP::fromMem p(procMsg->packData);
-   _handleProcData(p);
-
-   CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
-   // gzheng
-   CKLOCMGR_LOOP(mgr->startInserting(););
-
-   char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
-   CmiSetHandler(reqmsg, restartBcastHandlerIdx);
-   CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
-
-   _initDone();
-//   CpvAccess(_qd)->flushStates();
-   CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
+    bool compare(char * buf1, char *buf2){
+      if(CkMyPe()==0)
+        CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+      PUP::checker pchecker(buf1,buf2);
+      pchecker.skip();
+      int numElements;
+      pchecker|numElements;
+      for(int i=0;i<numElements;i++){
+        CkGroupID gID;
+        CkArrayIndex idx;
+
+        pchecker|gID;
+        pchecker|idx;
+
+        CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+        mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
+      }
+      if(CkMyPe()==0)
+        CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+      //int fault_num = pchecker.getFaultNum();
+      bool result = pchecker.getResult();
+      /*if(!result){
+        CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
+      }*/
+      return result;
+    }
+    int getChecksum(char * buf){
+      PUP::checker pchecker(buf);
+      pchecker.skip();
+      int numElements;
+      pchecker|numElements;
+//      CkPrintf("num %d\n",numElements);
+      for(int i=0;i<numElements;i++){
+        CkGroupID gID;
+        CkArrayIndex idx;
+
+        pchecker|gID;
+        pchecker|idx;
+
+        CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
+        mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
+      }
+      return pchecker.getChecksum();
+    }
+
+    static void recvRemoteChkpHandler(char *msg){
+      CpvAccess(remoteChkpDone) = 1;
+      if(CpvAccess(use_checksum)){
+        if(CkMyPe()==0)
+          CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+        CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
+        CpvAccess(recvdRemote) = 1;
+        if(CpvAccess(recvdLocal)==1){
+          if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
+          }
+          else{
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
+          }
+        }
+      }else{
+        envelope *env = (envelope *)msg;
+        CkUnpackMessage(&env);
+        CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+        if(CpvAccess(recvdLocal)==1){
+          int pointer = CpvAccess(curPointer);
+          int size = CpvAccess(chkpBuf)[pointer]->len;
+          if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
+          }else
+          {
+            CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
+          }
+          delete chkpMsg;
+          if(CkMyPe()==0)
+            CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
+        }else{
+          CpvAccess(recvdRemote) = 1;
+          if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
+          CpvAccess(buddyBuf) = chkpMsg;
+        }
+      }
+    }
+
+    static void replicaRecoverHandler(char *msg){
+      //fflush(stdout);
+      //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
+      CpvAccess(remoteChkpDone) = 1;
+      if(CpvAccess(localChkpDone) == 1)
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
+      CmiFree(msg);
+    }
+
+    static void replicaChkpDoneHandler(char *msg){
+      CpvAccess(remoteChkpDone) = 1;
+      int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      if(CpvAccess(localChkpDone) == 1)
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
+      CmiFree(msg);
+    }
+
+    static void replicaDieHandler(char * msg){
+#if CMK_CONVERSE_MPI
+      //broadcast to every one in my replica
+      CmiSetHandler(msg, replicaDieBcastHandlerIdx);
+      CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
 #endif
 #endif
-}
+    }
 
 
-// called on its backup processor
-// get backup message buffer and sent to crashed processor
-static void askProcDataHandler(char *msg)
-{
+    static void replicaChkpStartHandler(char * msg){
+      CpvAccess(remoteStarted) =1;
+      if(CpvAccess(localStarted)==1){    
+        CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
+        checkptMgr.doItNow(0);
+      }
+    }
+
+
+    static void replicaDieBcastHandler(char *msg){
+      int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      CpvAccess(_remoteCrashedNode) = diePe;
+      if(CkMyPe()==diePe){
+        CmiPrintf("pe %d in replicad word die\n",diePe);
+        CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
+        fflush(stdout);
+      }
+      if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
+        find_spare_mpirank(diePe,CmiMyPartition()^1);
+      }
+      //broadcast to my partition to get local max iter
+      if(CpvAccess(resilience)!=1){
+        CkMemCheckPT::replicaAlive = 0;
+        CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
+      }
+      CmiFree(msg);
+    }
+
+    static void recoverRemoteProcDataHandler(char *msg){
+      envelope *env = (envelope *)msg;
+      CkUnpackMessage(&env);
+      CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+
+      //store the checkpoint
+      int pointer = procMsg->pointer;
+
+
+      if(CkMyPe()==CpvAccess(_crashedNode)){
+        if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+        CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
+        PUP::fromMem p(procMsg->packData);
+        _handleProcData(p,CmiTrue);
+        _initDone();
+      }
+      else{
+        if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
+        CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
+        //_handleProcData(p,CmiFalse);
+      }
+      CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
+      CKLOCMGR_LOOP(mgr->startInserting(););
+
+      CpvAccess(recvdProcChkp) =1;
+      if(CpvAccess(recvdArrayChkp)==1){
+        _resume_charm_message();
+        _diePE = CpvAccess(_crashedNode);
+        char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+        //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#if CMK_USE_BARRIER
+      //CmiPrintf("before reduce\n");  
+       if(CpvAccess(resilience)==1){
+         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }else
+         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+      //CmiPrintf("after reduce\n");   
+#else
+       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#endif 
+      }
+    }
+
+    static void recoverRemoteArrayDataHandler(char *msg){
+      envelope *env = (envelope *)msg;
+      CkUnpackMessage(&env);
+      CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
+
+      //store the checkpoint
+      int pointer = chkpMsg->pointer;
+      CpvAccess(curPointer) = pointer;
+      if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
+      CpvAccess(chkpBuf)[pointer] = chkpMsg;
+      CpvAccess(recvdArrayChkp) =1;
+      CkMemCheckPT::inRestarting = 1;
+      if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
+        _resume_charm_message();
+        _diePE = CpvAccess(_crashedNode);
+        //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
+        char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(restartmsg, restartBeginHandlerIdx);
+        //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#if CMK_USE_BARRIER
+      //CmiPrintf("before reduce\n");  
+       if(CpvAccess(resilience)==1){
+         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+       }else
+         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
+      //CmiPrintf("after reduce\n");   
+#else
+       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
+#endif 
+      }
+    }
+
+    static void recvPhaseHandler(char * msg)
+    {
+      CpvAccess(_curRestartPhase)--;
+      CkMemCheckPT::inRestarting = 1;
+      CmiFree(msg);
+      //notify the buddy in the replica now i can receive the checkpoint msg
+      if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
+        char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
+        //timer start
+        if(CpvAccess(_crashedNode)==CkMyPe())
+          CkMemCheckPT::startTime = restartT = CmiWallTimer();
+        if(CpvAccess(_crashedNode)==CmiMyPe())
+          CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      }else{
+        CpvAccess(curPointer)^=1;
+        CkMemCheckPT::inRestarting = 1;
+        _resume_charm_message();
+        _diePE = CpvAccess(_crashedNode);
+      }
+    }
+    
+    static void askRecoverDataHandler(char * msg){
+      if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+       CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
+      if(CpvAccess(resilience)!=1){
+        CpvAccess(remoteReady)=1;
+        if(CpvAccess(localReady)==1){
+          if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+          {    
+            envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
+            CkPackMessage(&env);
+            CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+            CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+            CmiPrintf("[%d] sendProcdata after request at \n",CmiMyPe(),CmiWallTimer());
+          }
+          //send the array checkpoint data
+          envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
+          CkPackMessage(&env);
+          CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
+          CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+          if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
+            CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
+        }
+      }else{
+        find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
+        int pointer = CpvAccess(curPointer)^1;
+        CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
+        procMsg->pointer = pointer;
+        envelope * env = (envelope *)(UsrToEnv(procMsg));
+        CkPackMessage(&env);
+        CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
+        CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
+        
+        CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
+        arrayMsg->pointer = pointer;
+        envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
+        CkPackMessage(&env1);
+        CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
+        CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
+        CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
+      }
+    }
+    // called on crashed processor
+    static void recoverProcDataHandler(char *msg)
+    {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-    int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
-    CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
-    if (CpvAccess(procChkptBuf) == NULL)  {
-      CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
-      CkAbort("no checkpoint found");
+      int i;
+      envelope *env = (envelope *)msg;
+      CkUnpackMessage(&env);
+      CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
+      CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
+      CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
+      PUP::fromMem p(procMsg->packData);
+      _handleProcData(p,CmiTrue);
+
+      CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
+      // gzheng
+      CKLOCMGR_LOOP(mgr->startInserting(););
+
+      char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+      *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
+      CmiSetHandler(reqmsg, restartBcastHandlerIdx);
+      CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
+
+      _initDone();
+      //   CpvAccess(_qd)->flushStates();
+      CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
+#endif
     }
     }
-    envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
-    CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
+    //for replica, got the phase number from my neighbor processor in the current partition
+    static void askPhaseHandler(char *msg)
+    {
+#if CMK_MEM_CHECKPOINT
+      CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
+      int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
+      CmiSetHandler(msg, recvPhaseHandlerIdx);
+      CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+#endif
+    }
+    // called on its backup processor
+    // get backup message buffer and sent to crashed processor
+    static void askProcDataHandler(char *msg)
+    {
+#if CMK_MEM_CHECKPOINT
+      int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
+      if (CpvAccess(procChkptBuf) == NULL)  {
+        CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
+        CkAbort("no checkpoint found");
+      }
+      envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
+      CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
 
 
-    CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
+      CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
 
 
-    CkPackMessage(&env);
-    CmiSetHandler(env, recoverProcDataHandlerIdx);
-    CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
-    CpvAccess(procChkptBuf) = NULL;
+      CkPackMessage(&env);
+      CmiSetHandler(env, recoverProcDataHandlerIdx);
+      CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
+      CpvAccess(procChkptBuf) = NULL;
+      CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
 #endif
 #endif
-}
+    }
 
 
-// called on PE 0
-void qd_callback(void *m)
-{
-   CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
-   CkFreeMsg(m);
+    // called on PE 0
+    void qd_callback(void *m)
+    {
+      CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
+      fflush(stdout);
+      CkFreeMsg(m);
+      if(CmiNumPartition()==1){
 #ifdef CMK_SMP
 #ifdef CMK_SMP
-   for(int i=0;i<CmiMyNodeSize();i++){
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
-       CmiSetHandler(msg, askProcDataHandlerIdx);
-       int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
-       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-   }
-   return;
-#endif
-   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-   *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
-   // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
-   CmiSetHandler(msg, askProcDataHandlerIdx);
-   int pe = ChkptOnPe(CpvAccess(_crashedNode));
-   CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-
-}
+        for(int i=0;i<CmiMyNodeSize();i++){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
+          CmiSetHandler(msg, askProcDataHandlerIdx);
+          int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
+          CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+        }
+        return;
+#endif
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+        *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+        // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
+        CmiSetHandler(msg, askProcDataHandlerIdx);
+        int pe = ChkptOnPe(CpvAccess(_crashedNode));
+        CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+      }
+      else{
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+        CmiSetHandler(msg, recvPhaseHandlerIdx);
+        CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
+      }
+    }
 
 
-// on crashed node
-void CkMemRestart(const char *dummy, CkArgMsg *args)
-{
+    // on crashed node
+    void CkMemRestart(const char *dummy, CkArgMsg *args)
+    {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-   _diePE = CmiMyNode();
-   CkMemCheckPT::startTime = restartT = CmiWallTimer();
-   CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
-   CkMemCheckPT::inRestarting = 1;
-
-  CpvAccess( _crashedNode )= CmiMyNode();
-       
-  _discard_charm_message();
-  
-  if(CmiMyRank()==0){
-    CkCallback cb(qd_callback);
-    CkStartQD(cb);
-    CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
-  }
+      _diePE = CmiMyNode();
+      CpvAccess( _crashedNode )= CmiMyNode();
+      CkMemCheckPT::inRestarting = 1;
+      _discard_charm_message();
+
+      /*if(CmiMyRank()==0){
+        CkCallback cb(qd_callback);
+        CkStartQD(cb);
+        CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
+        }*/
+      CkMemCheckPT::startTime = restartT = CmiWallTimer();
+      if(CmiNumPartition()==1){
+        CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
+        restartT = CmiWallTimer();
+        CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
+        fflush(stdout);
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+        *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
+        // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
+        CmiSetHandler(msg, askProcDataHandlerIdx);
+        int pe = ChkptOnPe(CpvAccess(_crashedNode));
+        CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+      }
+      else{
+        CkCallback cb(qd_callback);
+        CkStartQD(cb);
+      }
 #else
 #else
-   CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
+      CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
 #endif
 #endif
-}
+    }
 
 
-// can be called in other files
-// return true if it is in restarting
-extern "C"
-int CkInRestarting()
-{
+    // can be called in other files
+    // return true if it is in restarting
+    extern "C"
+      int CkInRestarting()
+      {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-  if (CpvAccess( _crashedNode)!=-1) return 1;
-  // gzheng
-  //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
-  //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
-  return CkMemCheckPT::inRestarting;
+        if (CpvAccess( _crashedNode)!=-1) return 1;
+        // gzheng
+        //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
+        //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
+        return CkMemCheckPT::inRestarting;
 #else
 #else
-  return 0;
+        return 0;
 #endif
 #endif
-}
+      }
 
 
-/*****************************************************************************
-                module initialization
-*****************************************************************************/
+    extern "C"
+      int CkReplicaAlive()
+      {
+        //     if(CkMyPe() == CpvAccess(_remoteCrashedNode))
+        //       CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
+        return CkMemCheckPT::replicaAlive;
+
+        /*if(CkMemCheckPT::replicaDead==1)
+          return 0;
+          else         
+          return 1;*/
+      }
 
 
-static int arg_where = CkCheckPoint_inMEM;
+    extern "C"
+      int CkInCheckpointing()
+      {
+        return CkMemCheckPT::inCheckpointing;
+      }
 
 
+    extern "C"
+      void CkSetInLdb(){
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-void init_memcheckpt(char **argv)
-{
-    if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
-      arg_where = CkCheckPoint_inDISK;
-    }
+        CkMemCheckPT::inLoadbalancing = 1;
+#endif
+      }
 
 
-       // initiliazing _crashedNode variable
-       CpvInitialize(int, _crashedNode);
-       CpvAccess(_crashedNode) = -1;
+    extern "C"
+      int CkInLdb(){
+#if CMK_MEM_CHECKPOINT
+        return CkMemCheckPT::inLoadbalancing;
+#endif
+        return 0;
+      }
 
 
-}
+    extern "C"
+      void CkResetInLdb(){
+#if CMK_MEM_CHECKPOINT
+        CkMemCheckPT::inLoadbalancing = 0;
 #endif
 #endif
+      }
+
+    /*****************************************************************************
+      module initialization
+     *****************************************************************************/
+
+    static int arg_where = CkCheckPoint_inMEM;
 
 
-class CkMemCheckPTInit: public Chare {
-public:
-  CkMemCheckPTInit(CkArgMsg *m) {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-    if (arg_where == CkCheckPoint_inDISK) {
-      CkPrintf("Charm++> Double-disk Checkpointing. \n");
+    void init_memcheckpt(char **argv)
+    {
+      if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
+        arg_where = CkCheckPoint_inDISK;
+      }
+      CpvInitialize(int, use_checksum);
+      CpvInitialize(int, resilience);
+      CpvAccess(use_checksum)=0;
+      CpvAccess(resilience)=0;
+      if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
+        CpvAccess(use_checksum)=1;
+      }
+      if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
+        CpvAccess(resilience)=1;
+      }
+      if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
+        CpvAccess(resilience)=2;
+      }
+      if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
+        CpvAccess(resilience)=3;
+      }
+      // initiliazing _crashedNode variable
+      CpvInitialize(int, _crashedNode);
+      CpvInitialize(int, _remoteCrashedNode);
+      CpvAccess(_crashedNode) = -1;
+      CpvAccess(_remoteCrashedNode) = -1;
+      init_FI(argv);
     }
     }
-    ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
-    CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
 #endif
 #endif
-  }
-};
 
 
-static void notifyHandler(char *msg)
-{
+    class CkMemCheckPTInit: public Chare {
+      public:
+        CkMemCheckPTInit(CkArgMsg *m) {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-  CmiFree(msg);
+          if (arg_where == CkCheckPoint_inDISK) {
+            CkPrintf("Charm++> Double-disk Checkpointing. \n");
+          }
+          ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
+          CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
+#endif
+        }
+    };
+
+    static void notifyHandler(char *msg)
+    {
+#if CMK_MEM_CHECKPOINT
+      CmiFree(msg);
       /* immediately increase restart phase to filter old messages */
       /* immediately increase restart phase to filter old messages */
-  CpvAccess(_curRestartPhase) ++;
-  CpvAccess(_qd)->flushStates();
-  _discard_charm_message();
+      CpvAccess(_curRestartPhase) ++;
+      CpvAccess(_qd)->flushStates();
+      _discard_charm_message();
+
 #endif
 #endif
-}
+    }
 
 
-extern "C"
-void notify_crash(int node)
-{
+    extern "C"
+      void notify_crash(int node)
+      {
 #ifdef CMK_MEM_CHECKPOINT
 #ifdef CMK_MEM_CHECKPOINT
-  CpvAccess( _crashedNode) = node;
+        CpvAccess( _crashedNode) = node;
 #ifdef CMK_SMP
 #ifdef CMK_SMP
-  for(int i=0;i<CkMyNodeSize();i++){
-       CpvAccessOther(_crashedNode,i)=node;
-  }
+        for(int i=0;i<CkMyNodeSize();i++){
+          CpvAccessOther(_crashedNode,i)=node;
+        }
 #endif
 #endif
-  CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
-  CkMemCheckPT::inRestarting = 1;
-
-    // this may be in interrupt handler, send a message to reset QD
-  int pe = CmiNodeFirst(CkMyNode());
-  for(int i=0;i<CkMyNodeSize();i++){
-       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
-       CmiSetHandler(msg, notifyHandlerIdx);
-       CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
-  }
+        //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
+        CkMemCheckPT::inRestarting = 1;
+
+        // this may be in interrupt handler, send a message to reset QD
+        int pe = CmiNodeFirst(CkMyNode());
+        for(int i=0;i<CkMyNodeSize();i++){
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
+          CmiSetHandler(msg, notifyHandlerIdx);
+          CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
+        }
 #endif
 #endif
-}
-
-extern "C" void (*notify_crash_fn)(int node);
-
-#if CMK_CONVERSE_MPI
-static int pingHandlerIdx;
-static int pingCheckHandlerIdx;
-static int buddyDieHandlerIdx;
-static double lastPingTime = -1;
+      }
 
 
-extern "C" void mpi_restart_crashed(int pe, int rank);
-extern "C" int  find_spare_mpirank(int pe);
+    extern "C" void (*notify_crash_fn)(int node);
 
 
-void pingBuddy();
-void pingCheckHandler();
 
 
-void buddyDieHandler(char *msg)
-{
+#if CMK_CONVERSE_MPI
+    void buddyDieHandler(char *msg)
+    {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-   // notify
-   int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
-   notify_crash(diepe);
-   // send message to crash pe to let it restart
-   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
-   int newrank = find_spare_mpirank(diepe);
-   int buddy = obj->BuddyPE(CmiMyPe());
-   if (buddy == diepe)  {
-     mpi_restart_crashed(diepe, newrank);
-     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
-   }
+      // notify
+      CkMemCheckPT::inRestarting = 1;
+      int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
+      notify_crash(diepe);
+      // send message to crash pe to let it restart
+      CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+      int newrank = find_spare_mpirank(diepe,CmiMyPartition());
+      int buddy = obj->BuddyPE(CmiMyPe());
+      //if (CmiMyPe() == obj->BuddyPE(diepe))  {
+      if (buddy == diepe)  {
+        mpi_restart_crashed(diepe, newrank);
+      }
 #endif
 #endif
-}
+    }
 
 
-void pingHandler(void *msg)
-{
-  lastPingTime = CmiWallTimer();
-  CmiFree(msg);
-}
+    void pingHandler(void *msg)
+    {
+      lastPingTime = CmiWallTimer();
+      CmiFree(msg);
+    }
 
 
-void pingCheckHandler()
-{
+    void pingCheckHandler()
+    {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-  double now = CmiWallTimer();
-  if (lastPingTime > 0 && now - lastPingTime > 4) {
-    int i, pe, buddy;
-    // tell everyone the buddy dies
-    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
-    for (i = 1; i < CmiNumPes(); i++) {
-       pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
-       if (obj->BuddyPE(pe) == CmiMyPe()) break;
-    }
-    buddy = pe;
-    CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
-    for (int pe = 0; pe < CmiNumPes(); pe++) {
-      if (obj->isFailed(pe) || pe == buddy) continue;
-      char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-      *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
-      CmiSetHandler(msg, buddyDieHandlerIdx);
-      CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-    }
-  }
-  else 
-    CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
+      double now = CmiWallTimer();
+      if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
+        //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
+        int i, pe, buddy;
+        // tell everyone the buddy dies
+        CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+        for (i = 1; i < CmiNumPes(); i++) {
+          pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
+          if (obj->BuddyPE(pe) == CmiMyPe()) break;
+        }
+        buddy = pe;
+        CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
+        fflush(stdout);
+        char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+        *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
+        CmiSetHandler(msg, buddyDieHandlerIdx);
+        CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+        //send to everyone in the other world
+        if(CmiNumPartition()!=1){
+          //for(int i=0;i<CmiNumPes();i++){
+            char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+            *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
+            CmiSetHandler(rMsg, replicaDieHandlerIdx);
+            CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
+          //}
+        }
+      }
+        else 
+          CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
 #endif
 #endif
-}
+      }
 
 
-void pingBuddy()
-{
+      void pingBuddy()
+      {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-  CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
-  if (obj) {
-    int buddy = obj->BuddyPE(CkMyPe());
-//printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
-    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
-    *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
-    CmiSetHandler(msg, pingHandlerIdx);
-    CmiGetRestartPhase(msg) = 9999;
-    CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
-  }
-  CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
+        CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
+        if (obj) {
+          int buddy = obj->BuddyPE(CkMyPe());
+          char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
+          *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
+          CmiSetHandler(msg, pingHandlerIdx);
+          CmiGetRestartPhase(msg) = 9999;
+          CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
+        }
+        CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
 #endif
 #endif
-}
+      }
 #endif
 
 #endif
 
-// initproc
-void CkRegisterRestartHandler( )
-{
+      // initproc
+      void CkRegisterRestartHandler( )
+      {
 #if CMK_MEM_CHECKPOINT
 #if CMK_MEM_CHECKPOINT
-  notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
-  askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
-  recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
-  restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
-  restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
+        notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
+        askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
+        askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
+        recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
+        restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
+        restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
+
+        //for replica
+        recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
+        replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
+        replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
+        replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
+        replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
+        replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
+        askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
+        recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
+        recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
+        recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
 
 #if CMK_CONVERSE_MPI
 
 #if CMK_CONVERSE_MPI
-  pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
-  pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
-  buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
+        pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
+        pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
+        buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
 #endif
 
 #endif
 
-  CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
-  CpvAccess(procChkptBuf) = NULL;
-
-  notify_crash_fn = notify_crash;
+        CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
+        CpvInitialize(CkCheckPTMessage **, chkpBuf);
+        CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
+        CpvInitialize(CkCheckPTMessage *, buddyBuf);
+        CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
+        CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
+        CpvInitialize(int,curPointer);
+        CpvInitialize(int,recvdLocal);
+        CpvInitialize(int,localChkpDone);
+        CpvInitialize(int,remoteChkpDone);
+        CpvInitialize(int,remoteStarted);
+        CpvInitialize(int,localStarted);
+        CpvInitialize(int,localReady);
+        CpvInitialize(int,remoteReady);
+        CpvInitialize(int,recvdRemote);
+        CpvInitialize(int,recvdProcChkp);
+        CpvInitialize(int,localChecksum);
+        CpvInitialize(int,remoteChecksum);
+        CpvInitialize(int,recvdArrayChkp);
+
+        CpvAccess(procChkptBuf) = NULL;
+        CpvAccess(buddyBuf) = NULL;
+        CpvAccess(recoverProcBuf) = NULL;
+        CpvAccess(recoverArrayBuf) = NULL;
+        CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
+        CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
+        CpvAccess(chkpBuf)[0] = NULL;
+        CpvAccess(chkpBuf)[1] = NULL;
+        CpvAccess(localProcChkpBuf)[0] = NULL;
+        CpvAccess(localProcChkpBuf)[1] = NULL;
+
+        CpvAccess(curPointer) = 0;
+        CpvAccess(recvdLocal) = 0;
+        CpvAccess(localChkpDone) = 0;
+        CpvAccess(remoteChkpDone) = 0;
+        CpvAccess(remoteStarted) = 0;
+        CpvAccess(localStarted) = 0;
+        CpvAccess(localReady) = 0;
+        CpvAccess(remoteReady) = 0;
+        CpvAccess(recvdRemote) = 0;
+        CpvAccess(recvdProcChkp) = 0;
+        CpvAccess(localChecksum) = 0;
+        CpvAccess(remoteChecksum) = 0;
+        CpvAccess(recvdArrayChkp) = 0;
+
+        notify_crash_fn = notify_crash;
 
 #if ! CMK_CONVERSE_MPI
 
 #if ! CMK_CONVERSE_MPI
-  // print pid to kill
-//  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
-//  sleep(4);
+        // print pid to kill
+        //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
+        //  sleep(4);
 #endif
 #endif
 #endif
 #endif
-}
+      }
 
 
 
 
-extern "C"
-int CkHasCheckpoints()
-{
-  return checkpointed;
-}
+      extern "C"
+        int CkHasCheckpoints()
+        {
+          return checkpointed;
+        }
 
 
-/// @todo: the following definitions should be moved to a separate file containing
-// structures and functions about fault tolerance strategies
+      /// @todo: the following definitions should be moved to a separate file containing
+      // structures and functions about fault tolerance strategies
 
 
-/**
- *  * @brief: function for killing a process                                             
- *   */
+      /**
      *  * @brief: function for killing a process                                             
      *   */
 #ifdef CMK_MEM_CHECKPOINT
 #if CMK_HAS_GETPID
 #ifdef CMK_MEM_CHECKPOINT
 #if CMK_HAS_GETPID
-void killLocal(void *_dummy,double curWallTime){
+      void killLocal(void *_dummy,double curWallTime){
         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
         if(CmiWallTimer()<killTime-1){
         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
         if(CmiWallTimer()<killTime-1){
-                CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
+          CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
         }else{ 
 #if CMK_CONVERSE_MPI
         }else{ 
 #if CMK_CONVERSE_MPI
-                               CkDieNow();
+          CkDieNow();
 #else 
 #else 
-                kill(getpid(),SIGKILL);                                               
+          kill(getpid(),SIGKILL);                                               
 #endif
         }              
 #endif
         }              
-} 
+      
 #else
 #else
-void killLocal(void *_dummy,double curWallTime){
-  CmiAbort("kill() not supported!");
-}
+      void killLocal(void *_dummy,double curWallTime){
+        CmiAbort("kill() not supported!");
+      }
 #endif
 #endif
 
 #ifdef CMK_MEM_CHECKPOINT
 #endif
 #endif
 
 #ifdef CMK_MEM_CHECKPOINT
-/**
- * @brief: reads the file with the kill information
- */
-void readKillFile(){
+      /**
      * @brief: reads the file with the kill information
      */
+      void readKillFile(){
         FILE *fp=fopen(killFile,"r");
         if(!fp){
         FILE *fp=fopen(killFile,"r");
         if(!fp){
-                printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
-                return;
+          printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
+          return;
         }
         int proc;
         double sec;
         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
         }
         int proc;
         double sec;
         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
-                if(proc == CkMyNode() && CkMyRank() == 0){
-                        killTime = CmiWallTimer()+sec;
-                        printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
-                        CcdCallFnAfter(killLocal,NULL,sec*1000);
-                }
+          if(proc == CkMyNode() && CkMyRank() == 0){
+            killTime = CmiWallTimer()+sec;
+            printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
+            CcdCallFnAfter(killLocal,NULL,sec*1000);
+          }
         }
         fclose(fp);
         }
         fclose(fp);
-}
+      }
 
 #if ! CMK_CONVERSE_MPI
 
 #if ! CMK_CONVERSE_MPI
-void CkDieNow()
-{
+      void CkDieNow()
+      {
 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
-         // ignored for non-mpi version
+        // ignored for non-mpi version
         CmiPrintf("[%d] die now.\n", CmiMyPe());
         killTime = CmiWallTimer()+0.001;
         CcdCallFnAfter(killLocal,NULL,1);
 #endif
         CmiPrintf("[%d] die now.\n", CmiMyPe());
         killTime = CmiWallTimer()+0.001;
         CcdCallFnAfter(killLocal,NULL,1);
 #endif
-}
+      }
 #endif
 
 #endif
 #endif
 
 #endif