1d3138b30f7bc9a7f5a1a11a640782f706cf7a0b
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         1
71 #define CMK_USE_CHECKSUM                0
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92 unsigned int failureSeed;
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // flag for failure distributiona
104 char * failureDist;
105 // Meantimr between failure
106 int MTBF;
107 // variable for storing the killing time
108 double killTime=0.0;
109 extern void killLocal(void *_dummy,double curWallTime);
110 #endif
111
112 #ifdef CKLOCMGR_LOOP
113 #undef CKLOCMGR_LOOP
114 #endif
115 // loop over all CkLocMgr and do "code"
116 #define  CKLOCMGR_LOOP(code)    {       \
117   int numGroups = CkpvAccess(_groupIDTable)->size();    \
118   for(int i=0;i<numGroups;i++) {        \
119     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
120     if(obj->isLocMgr())  {      \
121       CkLocMgr *mgr = (CkLocMgr*)obj;   \
122       code      \
123     }   \
124   }     \
125 }
126
127 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
128 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
129 CpvDeclare(CkCheckPTMessage**, chkpBuf);
130 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
131 //store the checkpoint of the buddy to compare
132 //do not need the whole msg, can be the checksum
133 CpvDeclare(CkCheckPTMessage*, buddyBuf);
134 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
135 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
136 //pointer of the checkpoint going to be written
137 CpvDeclare(int, curPointer);
138 CpvDeclare(int, recvdRemote);
139 CpvDeclare(int, recvdLocal);
140 CpvDeclare(int, localChkpDone);
141 CpvDeclare(int, remoteChkpDone);
142 CpvDeclare(int, remoteStarted);
143 CpvDeclare(int, localStarted);
144 CpvDeclare(int, remoteReady);
145 CpvDeclare(int, localReady);
146 CpvDeclare(int, recvdArrayChkp);
147 CpvDeclare(int, recvdProcChkp);
148 CpvDeclare(int, localChecksum);
149 CpvDeclare(int, remoteChecksum);
150
151 bool compare(char * buf1, char * buf2);
152 int getChecksum(char * buf);
153 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
154 // Converse function handles
155 static int askPhaseHandlerIdx;
156 static int recvPhaseHandlerIdx;
157 static int askProcDataHandlerIdx;
158 static int askRecoverDataHandlerIdx;
159 static int restartBcastHandlerIdx;
160 static int recoverProcDataHandlerIdx;
161 static int restartBeginHandlerIdx;
162 static int recvRemoteChkpHandlerIdx;
163 static int replicaDieHandlerIdx;
164 static int replicaChkpStartHandlerIdx;
165 static int replicaDieBcastHandlerIdx;
166 static int replicaRecoverHandlerIdx;
167 static int replicaChkpDoneHandlerIdx;
168 static int replicaBeginFailureInjectionHandlerIdx;
169 static int recoverRemoteProcDataHandlerIdx;
170 static int recoverRemoteArrayDataHandlerIdx;
171 static int notifyHandlerIdx;
172 // compute the backup processor
173 // FIXME: avoid crashed processors
174 #if CMK_CONVERSE_MPI
175 static int pingHandlerIdx;
176 static int pingCheckHandlerIdx;
177 static int buddyDieHandlerIdx;
178 static double lastPingTime = -1;
179
180 extern "C" void mpi_restart_crashed(int pe, int rank);
181 extern "C" int  find_spare_mpirank(int pe,int partition);
182
183 void pingBuddy();
184 void pingCheckHandler();
185
186 #endif
187 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
188
189 inline int CkMemCheckPT::BuddyPE(int pe)
190 {
191   int budpe;
192 #if NODE_CHECKPOINT
193   // buddy is the processor with same rank on the next physical node
194   int r1 = CmiPhysicalRank(pe);
195   int budnode = CmiPhysicalNodeID(pe);
196   do {
197     budnode = (budnode+1)%CmiNumPhysicalNodes();
198     int *pelist;
199     int num;
200     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
201     budpe = pelist[r1 % num];
202   } while (isFailed(budpe));
203   if (budpe == pe) {
204     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
205     CmiAbort("Failed to find a buddy processor");
206   }
207 #else
208   budpe = pe;
209   budpe = (budpe+1)%CkNumPes();
210 #endif
211   return budpe;
212 }
213
214 // called in array element constructor
215 // choose and register with 2 buddies for checkpoiting 
216 #if CMK_MEM_CHECKPOINT
217 void ArrayElement::init_checkpt() {
218   if (_memChkptOn == 0) return;
219   if (CkInRestarting()) {
220     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
221   }
222   // only master init checkpoint
223   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
224
225   budPEs[0] = CkMyPe();
226   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
227   CmiAssert(budPEs[0] != budPEs[1]);
228   // inform checkPTMgr
229   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
230   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
231   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
232 }
233 #endif
234
235 // entry function invoked by checkpoint mgr asking for checkpoint data
236 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
237 #if CMK_MEM_CHECKPOINT
238   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
239   //char index[128];   thisIndexMax.sprint(index);
240   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
241   CkLocMgr *locMgr = thisArray->getLocMgr();
242   CmiAssert(myRec!=NULL);
243   int size;
244   {
245     PUP::sizer p;
246     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
247     size = p.size();
248   }
249   int packSize = size/sizeof(double) +1;
250   CkArrayCheckPTMessage *msg =
251     new (packSize, 0) CkArrayCheckPTMessage;
252   msg->len = size;
253   msg->index =thisIndexMax;
254   msg->aid = thisArrayID;
255   msg->locMgr = locMgr->getGroupID();
256   msg->cp_flag = 1;
257   {
258     PUP::toMem p(msg->packData);
259     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
260   }
261
262   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
263   checkptMgr.recvData(msg, 2, budPEs);
264   delete m;
265 #endif
266 }
267
268 // checkpoint holder class - for memory checkpointing
269 class CkMemCheckPTInfo: public CkCheckPTInfo
270 {
271   CkArrayCheckPTMessage *ckBuffer;
272   public:
273   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
274     CkCheckPTInfo(a, loc, idx, pno)
275   {
276     ckBuffer = NULL;
277   }
278   ~CkMemCheckPTInfo() 
279   {
280     if (ckBuffer) delete ckBuffer; 
281   }
282   inline void updateBuffer(CkArrayCheckPTMessage *data) 
283   {
284     CmiAssert(data!=NULL);
285     if (ckBuffer) delete ckBuffer;
286     ckBuffer = data;
287   }    
288   inline CkArrayCheckPTMessage * getCopy()
289   {
290     if (ckBuffer == NULL) {
291       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
292       CmiAbort("Abort!");
293     }
294     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
295   }     
296   inline void updateBuddy(int b1, int b2) {
297     CmiAssert(ckBuffer);
298     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
299     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
300     CmiAssert(pNo != CkMyPe());
301   }
302   inline int getSize() { 
303     CmiAssert(ckBuffer);
304     return ckBuffer->len; 
305   }
306 };
307
308 // checkpoint holder class - for in-disk checkpointing
309 class CkDiskCheckPTInfo: public CkCheckPTInfo 
310 {
311   char *fname;
312   int bud1, bud2;
313   int len;                      // checkpoint size
314   public:
315   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
316   {
317 #if CMK_USE_MKSTEMP
318     fname = new char[64];
319     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
320     mkstemp(fname);
321 #else
322     fname=tmpnam(NULL);
323 #endif
324     bud1 = bud2 = -1;
325     len = 0;
326   }
327   ~CkDiskCheckPTInfo() 
328   {
329     remove(fname);
330   }
331   inline void updateBuffer(CkArrayCheckPTMessage *data) 
332   {
333     double t = CmiWallTimer();
334     // unpack it
335     envelope *env = UsrToEnv(data);
336     CkUnpackMessage(&env);
337     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
338     FILE *f = fopen(fname,"wb");
339     PUP::toDisk p(f);
340     CkPupMessage(p, (void **)&data);
341     // delay sync to the end because otherwise the messages are blocked
342     //    fsync(fileno(f));
343     fclose(f);
344     bud1 = data->bud1;
345     bud2 = data->bud2;
346     len = data->len;
347     delete data;
348     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
349   }
350   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
351   {
352     CkArrayCheckPTMessage *data;
353     FILE *f = fopen(fname,"rb");
354     PUP::fromDisk p(f);
355     CkPupMessage(p, (void **)&data);
356     fclose(f);
357     data->bud1 = bud1;                          // update the buddies
358     data->bud2 = bud2;
359     return data;
360   }
361   inline void updateBuddy(int b1, int b2) {
362     bud1 = b1; bud2 = b2;
363     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
364     CmiAssert(pNo != CkMyPe());
365   }
366   inline int getSize() { 
367     return len; 
368   }
369 };
370
371 CkMemCheckPT::CkMemCheckPT(int w)
372 {
373   int numnodes = 0;
374 #if NODE_CHECKPOINT
375   numnodes = CmiNumPhysicalNodes();
376 #else
377   numnodes = CkNumPes();
378 #endif
379 #if CK_NO_PROC_POOL
380   if (numnodes <= 2)
381 #else
382     if (numnodes  == 1)
383 #endif
384     {
385       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
386       _memChkptOn = 0;
387     }
388   inRestarting = 0;
389   inCheckpointing = 0;
390   recvCount = peCount = 0;
391   ackCount = 0;
392   expectCount = -1;
393   where = w;
394   replicaAlive = 1;
395   notifyReplica = 0;
396 #if CMK_CONVERSE_MPI
397   void pingBuddy();
398   void pingCheckHandler();
399   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
400   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
401 #endif
402   chkpTable[0] = NULL;
403   chkpTable[1] = NULL;
404   maxIter = -1;
405   recvIterCount = 0;
406   localDecided = false;
407   if(killFlag == 2){
408     localSeed = failureSeed;
409     thisProxy[CkMyPe()].generateFailure();
410   }
411 }
412
413 void CkMemCheckPT::replicaInjectFailure(){
414   char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
415   CmiSetHandler(msg, replicaBeginFailureInjectionHandlerIdx);
416   CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(unsigned int),msg);
417 }
418
419 void CkMemCheckPT::generateFailure(){
420   if(strcmp(failureDist,"E")==0){
421     int rand1 = rand_r(&localSeed);
422     int rand2 = rand_r(&localSeed);
423     int rand3 = rand_r(&localSeed);
424     CkPrintf("randome %d %d %d\n", rand1, rand2, rand3);
425     int next_pe = (rand1)%CkNumPes();
426     int next_partition = (rand2)%2;
427     double sec = -log(1.0f - ((double)rand3)/(long long int)(RAND_MAX))*MTBF;
428     if(next_pe == CmiMyPe()&& next_partition == CmiMyPartition()){
429       killTime = CmiWallTimer()+sec;
430       printf("[%d][%d] To be killed after %.6lf s (MEMCKPT) %lf\n",CmiMyPartition(),CkMyPe(),sec, killTime);
431       CcdCallFnAfter(killLocal,NULL,sec*1000);
432     }
433   }
434 }
435
436 CkMemCheckPT::~CkMemCheckPT()
437 {
438   int len = ckTable.length();
439   for (int i=0; i<len; i++) {
440     delete ckTable[i];
441   }
442 }
443
444 void CkMemCheckPT::pup(PUP::er& p) 
445
446   CBase_CkMemCheckPT::pup(p); 
447   p|cpStarter;
448   p|thisFailedPe;
449   p|failedPes;
450   p|ckCheckPTGroupID;           // recover global variable
451   p|cpCallback;                 // store callback
452   p|where;                      // where to checkpoint
453   p|peCount;
454   p|localSeed;
455   if (p.isUnpacking()) {
456     recvCount = 0;
457 #if CMK_CONVERSE_MPI
458     void pingBuddy();
459     void pingCheckHandler();
460     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
461     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
462 #endif
463     maxIter = -1;
464     recvIterCount = 0;
465     localDecided = false;
466   }
467 }
468
469 void CkMemCheckPT::getIter(){
470   localDecided = true;
471   localMaxIter = maxIter+1;
472   if(CkMyPe()==0){
473     CkPrintf("local max iter is %d\n",localMaxIter);
474   }
475   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
476   int elemCount = CkCountChkpSyncElements();
477   if(CkMyPe()==0)
478     startTime = CmiWallTimer();
479   if(elemCount == 0){
480     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
481   }
482 }
483
484 //when one replica failes, decide the next chkp iter
485
486 void CkMemCheckPT::recvIter(int iter){
487   if(maxIter<iter){
488     maxIter=iter;
489   }
490 }
491
492 void CkMemCheckPT::recvMaxIter(int iter){
493   localDecided = false;
494   if(CkMyPe()==0)
495     CkPrintf("checkpoint iteration is %d\n",iter);
496   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
497 }
498
499 void CkMemCheckPT::reachChkpIter(){
500   recvIterCount++;
501   elemCount = CkCountChkpSyncElements();
502   //CkPrintf("[%d] received %d local %d\n",CkMyPe(),recvIterCount, elemCount);
503   if(recvIterCount == elemCount){
504     recvIterCount = 0;
505     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
506   }
507 }
508
509 void CkMemCheckPT::startChkp(){
510   if(CkInCheckpointing()){
511     return;
512   }
513   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
514   CkStartMemCheckpoint(cpCallback);
515 }
516
517 // called by checkpoint mgr to restore an array element
518 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
519 {
520 #if CMK_MEM_CHECKPOINT
521   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
522   // m->index.print();
523   PUP::fromMem p(m->packData);
524   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
525   CmiAssert(mgr);
526 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
527   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
528 #else
529   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
530 #endif
531
532   // find a list of array elements bound together
533   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
534   CmiAssert(elt);
535   CkLocRec_local *rec = elt->myRec;
536   CkVec<CkMigratable *> list;
537   mgr->migratableList(rec, list);
538   CmiAssert(list.length() > 0);
539   for (int l=0; l<list.length(); l++) {
540     elt = (ArrayElement *)list[l];
541     elt->budPEs[0] = m->bud1;
542     elt->budPEs[1] = m->bud2;
543     //    reset, may not needed now
544     // for now.
545     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
546       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
547       if (c) c->redNo = 0;
548     }
549   }
550 #endif
551 }
552
553 // return 1 if pe is a crashed processor
554 int CkMemCheckPT::isFailed(int pe)
555 {
556   for (int i=0; i<failedPes.length(); i++)
557     if (failedPes[i] == pe) return 1;
558   return 0;
559 }
560
561 // add pe into history list of all failed processors
562 void CkMemCheckPT::failed(int pe)
563 {
564   if (isFailed(pe)) return;
565   failedPes.push_back(pe);
566 }
567
568 int CkMemCheckPT::totalFailed()
569 {
570   return failedPes.length();
571 }
572
573 // create an checkpoint entry for array element of aid with index.
574 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
575 {
576   // error check, no duplicate
577   int idx, len = ckTable.size();
578   for (idx=0; idx<len; idx++) {
579     CkCheckPTInfo *entry = ckTable[idx];
580     if (index == entry->index) {
581       if (loc == entry->locMgr) {
582         // bindTo array elements
583         return;
584       }
585       // for array inheritance, the following check may fail
586       // because ArrayElement constructor of all superclasses are called
587       if (aid == entry->aid) {
588         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
589         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
590       }
591     }
592   }
593   CkCheckPTInfo *newEntry;
594   if (where == CkCheckPoint_inMEM)
595     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
596   else
597     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
598   ckTable.push_back(newEntry);
599   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
600 }
601
602 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
603 {
604 #if !CMK_CHKP_ALL       
605   int buddy = msg->bud1;
606   if (buddy == CkMyPe()) buddy = msg->bud2;
607   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
608   recvData(msg);
609   // ack
610   thisProxy[buddy].gotData();
611 #else
612   chkpTable[0] = NULL;
613   chkpTable[1] = NULL;
614   recvArrayCheckpoint(msg);
615   thisProxy[msg->bud2].gotData();
616 #endif
617 }
618
619 void CkMemCheckPT::chkpLocalStart()
620 {
621   CpvAccess(localStarted) = 1;
622 }
623
624 // loop through my checkpoint table and ask checkpointed array elements
625 // to send me checkpoint data.
626 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
627 void CkMemCheckPT::doItNow(int starter)
628 {
629   checkpointed = 1;
630   //cpCallback = cb;
631   cpStarter = starter;
632   inCheckpointing = 1;
633   if (CkMyPe() == cpStarter) {
634     startTime = CmiWallTimer();
635     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
636   }
637 #if CMK_CONVERSE_MPI
638   if(CmiNumPartition()==1)
639 #endif    
640   {
641 #if !CMK_CHKP_ALL
642     int len = ckTable.length();
643     for (int i=0; i<len; i++) {
644       CkCheckPTInfo *entry = ckTable[i];
645       // always let the bigger number processor send request
646       //if (CkMyPe() < entry->pNo) continue;
647       // always let the smaller number processor send request, may on same proc
648       if (!isMaster(entry->pNo)) continue;
649       // call inmem_checkpoint to the array element, ask it to send
650       // back checkpoint data via recvData().
651       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
652       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
653     }
654     // if my table is empty, then I am done
655     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
656 #else
657     startArrayCheckpoint();
658 #endif
659     // pack and send proc level data
660     sendProcData();
661   }
662 #if CMK_CONVERSE_MPI
663   else
664   {
665     startCheckpoint();
666   }
667 #endif
668 }
669
670 class MemElementPacker : public CkLocIterator{
671   private:
672     //    CkLocMgr *locMgr;
673     PUP::er &p;
674     std::map<CkHashCode,CkLocation> arrayMap;
675   public:
676     MemElementPacker(PUP::er &p_):p(p_){};
677     void addLocation(CkLocation &loc){
678       CkArrayIndexMax idx = loc.getIndex();
679       arrayMap[idx.hash()] = loc;
680     }
681     void writeCheckpoint(){
682       std::map<CkHashCode, CkLocation>::iterator it;
683       for(it = arrayMap.begin();it!=arrayMap.end();it++){
684         CkLocation loc = it->second;
685         CkLocMgr *locMgr = loc.getManager();
686         CkArrayIndexMax idx = loc.getIndex();
687         CkGroupID gID = locMgr->ckGetGroupID();
688         p|gID;
689         p|idx;
690         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
691       }
692     }
693 };
694
695 void pupAllElements(PUP::er &p){
696 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
697   int numElements;
698   if(!p.isUnpacking()){
699     numElements = CkCountArrayElements();
700   }
701   p | numElements;
702   if(!p.isUnpacking()){
703     MemElementPacker packer(p);
704     CKLOCMGR_LOOP(mgr->iterateLocal(packer););
705     packer.writeCheckpoint();
706   }
707 #endif
708 }
709
710 void CkMemCheckPT::startArrayCheckpoint(){
711 #if CMK_CHKP_ALL
712   int size;
713   {
714     PUP::sizer psizer;
715     pupAllElements(psizer);
716     size = psizer.size();
717   }
718   int packSize = size/sizeof(double)+1;
719   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
720   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
721   msg->len = size;
722   msg->cp_flag = 1;
723   int budPEs[2];
724   msg->bud1=CkMyPe();
725   msg->bud2=ChkptOnPe(CkMyPe());
726   {
727     PUP::toMem p(msg->packData);
728     pupAllElements(p);
729   }
730   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
731   if(chkpTable[0]) delete chkpTable[0];
732   chkpTable[0] = msg;
733   //send the checkpoint to my 
734   recvCount++;
735 #endif
736 }
737
738 void CkMemCheckPT::startCheckpoint(){
739 #if CMK_CONVERSE_MPI
740   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
741     CkPrintf("in start checkpointing!!!!\n");
742   int size;
743   {
744     PUP::sizer p;
745     _handleProcData(p,CmiFalse);
746     size = p.size();
747   }
748   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
749   procMsg->len = size;
750   procMsg->cp_flag = 1;
751   {
752     PUP::toMem p(procMsg->packData);
753     _handleProcData(p,CmiFalse);
754   }
755   int pointer = CpvAccess(curPointer);
756   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
757   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
758
759   {
760     PUP::sizer psizer;
761     pupAllElements(psizer);
762     size = psizer.size();
763   }
764   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
765   msg->len = size;
766   msg->cp_flag = 1;
767   int checksum;
768   {
769     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
770       PUP::checker p(msg->packData);
771       pupAllElements(p);
772       checksum = p.getChecksum();
773     }else{  
774 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
775       PUP::toMem p(msg->packData);
776       pupAllElements(p);
777     }
778   }
779   pointer = CpvAccess(curPointer);
780   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
781   CpvAccess(chkpBuf)[pointer] = msg;
782   if(CkMyPe()==0)
783     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
784   if(CkReplicaAlive()==1){
785     CpvAccess(recvdLocal) = 1;
786     if(CpvAccess(use_checksum)){
787       CpvAccess(localChecksum) = checksum;
788       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
789       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
790       //only one reaplica will send
791       if(CmiMyPartition()==0){
792         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
793         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
794       }
795     }else{
796       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
797       CkPackMessage(&env);
798       if(CmiMyPartition()==0){
799         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
800         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
801       }
802     }
803     if(CmiMyPartition()==0){
804       notifyReplica = 1;
805       thisProxy[CkMyPe()].doneComparison(true);
806     }
807   }
808   if(CpvAccess(recvdRemote)==1){
809     //only partition 1 will do it
810     //compare the checkpoint 
811     int size = CpvAccess(chkpBuf)[pointer]->len;
812     if(CpvAccess(use_checksum)){
813       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
814         thisProxy[CkMyPe()].doneComparison(true);
815       }
816       else{
817         thisProxy[CkMyPe()].doneComparison(false);
818       }
819     }else{
820       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
821         thisProxy[CkMyPe()].doneComparison(true);
822       }
823       else{
824         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
825         thisProxy[CkMyPe()].doneComparison(false);
826       }
827     }
828     if(CkMyPe()==0)
829       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
830   }
831   else{
832     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
833       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
834       if(CpvAccess(remoteReady)==1){
835         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
836         {       
837           int pointer = CpvAccess(curPointer);
838           //send the proc data
839           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
840           procMsg->pointer = pointer;
841           envelope * env = (envelope *)(UsrToEnv(procMsg));
842           CkPackMessage(&env);
843           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
844           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
845           if(CkMyPe() == CpvAccess(_remoteCrashedNode))
846             CkPrintf("[%d] sendProcdata at %lf\n",CkMyPe(),CmiWallTimer());
847         }
848         //send the array checkpoint data
849         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
850         msg->pointer = CpvAccess(curPointer);
851         envelope * env = (envelope *)(UsrToEnv(msg));
852         CkPackMessage(&env);
853         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
854         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
855         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
856           CkPrintf("[%d] sendArraydata at %lf\n",CkMyPe(),CmiWallTimer());
857       }else{
858         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
859           int pointer = CpvAccess(curPointer);
860           CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
861           (CpvAccess(recoverProcBuf))->pointer = pointer;
862         }
863         CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
864         (CpvAccess(recoverArrayBuf))->pointer = CpvAccess(curPointer);
865         CpvAccess(localReady) = 1;
866       }
867       //can continue work, no need to wait for my replica
868       int _ret = 1;
869       notifyReplica = 1;
870       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
871       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
872     }
873   }
874 #endif
875 }
876
877 void CkMemCheckPT::doneComparison(bool ret){
878   int _ret = 1;
879   if(!ret){
880     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
881     _ret = 0;
882   }else{
883     _ret = 1;
884   }
885   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
886   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
887 }
888
889 void CkMemCheckPT::doneRComparison(int ret){
890   CkPrintf("[%d][%d] doneRComparison\n", CmiMyPartition(), CkMyPe());
891   //    if(CpvAccess(curPointer) == 0){
892   //if(ret==CkNumPes()){
893     CpvAccess(localChkpDone) = 1;
894     if(CpvAccess(remoteChkpDone) ==1){
895       thisProxy.doneBothComparison();
896     }else{
897       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
898     }
899   //}
900   /*else{
901     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
902     startTime = CmiWallTimer();
903     thisProxy.RollBack();
904   }*/
905   if(notifyReplica == 0){
906     //notify the replica am done
907     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
908     *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
909     CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
910     CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
911     notifyReplica = 1;
912   }
913 }
914
915 void CkMemCheckPT::doneBothComparison(){
916   CpvAccess(recvdRemote) = 0;
917   CpvAccess(remoteReady) = 0;
918   CpvAccess(localReady) = 0;
919   CpvAccess(recvdLocal) = 0;
920   CpvAccess(localChkpDone) = 0;
921   CpvAccess(remoteChkpDone) = 0;
922   CpvAccess(remoteStarted) = 0;
923   CpvAccess(localStarted) = 0;
924   CpvAccess(_remoteCrashedNode) = -1;
925   CkMemCheckPT::replicaAlive = 1;
926   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
927   CpvAccess(curPointer)^=1;
928   inCheckpointing = 0;
929   notifyReplica = 0;
930   if(CkMyPe() == 0){
931     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
932   }
933   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
934 }
935
936 void CkMemCheckPT::RollBack(){
937   //restore group data
938   checkpointed = 0;
939   inRestarting = 1;
940   int pointer = CpvAccess(curPointer)^1;//use the previous one
941   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
942   PUP::fromMem p(chkpMsg->packData);    
943
944   //destroy array elements
945   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
946   int numGroups = CkpvAccess(_groupIDTable)->size();
947   for(int i=0;i<numGroups;i++) {
948     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
949     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
950     obj->flushStates();
951     obj->ckJustMigrated();
952   }
953   //restore array elements
954
955   int numElements;
956   p|numElements;
957
958   if(p.isUnpacking()){
959     for(int i=0;i<numElements;i++){
960       CkGroupID gID;
961       CkArrayIndex idx;
962       p|gID;
963       p|idx;
964       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
965       CmiAssert(mgr);
966       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
967     }
968     }
969     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
970     contribute(cb);
971   }
972
973   void CkMemCheckPT::notifyReplicaDie(int pe){
974     replicaAlive = 0;
975     CpvAccess(_remoteCrashedNode) = pe;
976   }
977
978   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
979   {
980 #if CMK_CHKP_ALL
981     int idx = 1;
982     if(msg->bud1 == CkMyPe()){
983       idx = 0;
984     }
985     int isChkpting = msg->cp_flag;
986     if(isChkpting == 1){
987       if(chkpTable[idx]) delete chkpTable[idx];
988     }
989     chkpTable[idx] = msg;
990     if(isChkpting){
991       recvCount++;
992       if(recvCount == 2){
993         if (where == CkCheckPoint_inMEM) {
994           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
995         }
996         else if (where == CkCheckPoint_inDISK) {
997           // another barrier for finalize the writing using fsync
998           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
999           contribute(0,NULL,CkReduction::sum_int,localcb);
1000         }
1001         else
1002           CmiAbort("Unknown checkpoint scheme");
1003         recvCount = 0;
1004       }
1005     }
1006 #endif
1007   }
1008
1009   // don't handle array elements
1010   static inline void _handleProcData(PUP::er &p, CmiBool create)
1011   {
1012     // save readonlys, and callback BTW
1013     CkPupROData(p);
1014
1015     // save mainchares 
1016     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
1017
1018 #ifndef CMK_CHARE_USE_PTR
1019     // save non-migratable chare
1020     CkPupChareData(p);
1021 #endif
1022
1023     // save groups into Groups.dat
1024     CkPupGroupData(p,create);
1025
1026     // save nodegroups into NodeGroups.dat
1027     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
1028   }
1029
1030   void CkMemCheckPT::sendProcData()
1031   {
1032     // find out size of buffer
1033     int size;
1034     {
1035       PUP::sizer p;
1036       _handleProcData(p,CmiTrue);
1037       size = p.size();
1038     }
1039     int packSize = size;
1040     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
1041     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
1042     {
1043       PUP::toMem p(msg->packData);
1044       _handleProcData(p,CmiTrue);
1045     }
1046     msg->pe = CkMyPe();
1047     msg->len = size;
1048     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1049     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1050   }
1051
1052   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1053   {
1054     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1055     CpvAccess(procChkptBuf) = msg;
1056     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1057     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1058   }
1059
1060   // ArrayElement call this function to give us the checkpointed data
1061   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1062   {
1063     int len = ckTable.length();
1064     int idx;
1065     for (idx=0; idx<len; idx++) {
1066       CkCheckPTInfo *entry = ckTable[idx];
1067       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1068     }
1069     CkAssert(idx < len);
1070     int isChkpting = msg->cp_flag;
1071     ckTable[idx]->updateBuffer(msg);
1072     if (isChkpting) {
1073       // all my array elements have returned their inmem data
1074       // inform starter processor that I am done.
1075       recvCount ++;
1076       if (recvCount == ckTable.length()) {
1077         if (where == CkCheckPoint_inMEM) {
1078           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1079         }
1080         else if (where == CkCheckPoint_inDISK) {
1081           // another barrier for finalize the writing using fsync
1082           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1083           contribute(0,NULL,CkReduction::sum_int,localcb);
1084         }
1085         else
1086           CmiAbort("Unknown checkpoint scheme");
1087         recvCount = 0;
1088       } 
1089     }
1090   }
1091
1092   // only used in disk checkpointing
1093   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1094   {
1095     delete m;
1096 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1097     system("sync");
1098 #endif
1099     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1100   }
1101
1102   // only is called on cpStarter when checkpoint is done
1103   void CkMemCheckPT::cpFinish()
1104   {
1105     CmiAssert(CkMyPe() == cpStarter);
1106     peCount++;
1107     // now that all processors have finished, activate callback
1108     if (peCount == 2) 
1109     {
1110       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1111       peCount = 0;
1112       thisProxy.report();
1113     }
1114   }
1115
1116   // for debugging, report checkpoint info
1117   void CkMemCheckPT::report()
1118   {
1119     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1120     inCheckpointing = 0;
1121 #if !CMK_CHKP_ALL
1122     int objsize = 0;
1123     int len = ckTable.length();
1124     for (int i=0; i<len; i++) {
1125       CkCheckPTInfo *entry = ckTable[i];
1126       CmiAssert(entry);
1127       objsize += entry->getSize();
1128     }
1129     CmiAssert(CpvAccess(procChkptBuf));
1130     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1131 #else
1132     if(CkMyPe()==0)
1133       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1134 #endif
1135   }
1136
1137   /*****************************************************************************
1138     RESTART Procedure
1139    *****************************************************************************/
1140
1141   // master processor of two buddies
1142   inline int CkMemCheckPT::isMaster(int buddype)
1143   {
1144 #if 0
1145     int mype = CkMyPe();
1146     //CkPrintf("ismaster: %d %d\n", pe, mype);
1147     if (CkNumPes() - totalFailed() == 2) {
1148       return mype > buddype;
1149     }
1150     for (int i=1; i<CkNumPes(); i++) {
1151       int me = (buddype+i)%CkNumPes();
1152       if (isFailed(me)) continue;
1153       if (me == mype) return 1;
1154       else return 0;
1155     }
1156     return 0;
1157 #else
1158     // smaller one
1159     int mype = CkMyPe();
1160     //CkPrintf("ismaster: %d %d\n", pe, mype);
1161     if (CkNumPes() - totalFailed() == 2) {
1162       return mype < buddype;
1163     }
1164 #if NODE_CHECKPOINT
1165     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1166     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1167 #else
1168       for (int i=1; i<CkNumPes(); i++) {
1169 #endif
1170         int me = (mype+i)%CkNumPes();
1171         if (isFailed(me)) continue;
1172         if (me == buddype) return 1;
1173         else return 0;
1174       }
1175       return 0;
1176 #endif
1177     }
1178
1179
1180
1181 #if 0
1182     // helper class to pup all elements that belong to same ckLocMgr
1183     class ElementDestoryer : public CkLocIterator {
1184       private:
1185         CkLocMgr *locMgr;
1186       public:
1187         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1188         void addLocation(CkLocation &loc) {
1189           CkArrayIndex idx=loc.getIndex();
1190           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1191           loc.destroy();
1192         }
1193     };
1194 #endif
1195
1196     // restore the bitmap vector for LB
1197     void CkMemCheckPT::resetLB(int diepe)
1198     {
1199 #if CMK_LBDB_ON
1200       int i;
1201       char *bitmap = new char[CkNumPes()];
1202       // set processor available bitmap
1203       get_avail_vector(bitmap);
1204       for (i=0; i<failedPes.length(); i++)
1205         bitmap[failedPes[i]] = 0; 
1206       bitmap[diepe] = 0;
1207
1208 #if CK_NO_PROC_POOL
1209       set_avail_vector(bitmap);
1210 #endif
1211
1212       // if I am the crashed pe, rebuild my failedPEs array
1213       if (CkMyNode() == diepe)
1214         for (i=0; i<CkNumPes(); i++) 
1215           if (bitmap[i]==0) failed(i);
1216
1217       delete [] bitmap;
1218 #endif
1219     }
1220
1221     static double restartT;
1222
1223     // in case when failedPe dies, everybody go through its checkpoint table:
1224     // destory all array elements
1225     // recover lost buddies
1226     // reconstruct all array elements from check point data
1227     // called on all processors
1228     void CkMemCheckPT::restart(int diePe)
1229     {
1230 #if CMK_MEM_CHECKPOINT
1231       thisFailedPe = diePe;
1232       double curTime = CmiWallTimer();
1233       if (CkMyPe() == thisFailedPe){
1234         restartT = CmiWallTimer();
1235         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1236       }
1237       stage = (char*)"resetLB";
1238       startTime = curTime;
1239       if (CkMyPe() == thisFailedPe)
1240         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1241
1242 //#if CK_NO_PROC_POOL
1243 //      failed(diePe);  // add into the list of failed pes
1244 //#endif
1245
1246 //      if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1247
1248       inRestarting = 1;
1249
1250       // disable load balancer's barrier
1251       //if (CkMyPe() != diePe) resetLB(diePe);
1252
1253       CKLOCMGR_LOOP(mgr->startInserting(););
1254
1255
1256       if(CmiNumPartition()==1){
1257         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1258       }else{
1259         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1260         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1261       }
1262 #endif
1263     }
1264
1265     // loally remove all array elements
1266     void CkMemCheckPT::removeArrayElements()
1267     {
1268 #if CMK_MEM_CHECKPOINT
1269       int len = ckTable.length();
1270       double curTime = CmiWallTimer();
1271       if (CkMyPe() == thisFailedPe) 
1272         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1273       stage = (char*)"removeArrayElements";
1274       startTime = curTime;
1275
1276       //  if (cpCallback.isInvalid()) 
1277       //          CkPrintf("invalid pe %d\n",CkMyPe());
1278       //          CkAbort("Didn't set restart callback\n");;
1279       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1280
1281       // get rid of all buffering and remote recs
1282       // including destorying all array elements
1283 #if CK_NO_PROC_POOL  
1284       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1285 #else
1286       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1287 #endif
1288       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1289 #endif
1290     }
1291
1292     // flush state in reduction manager
1293     void CkMemCheckPT::resetReductionMgr()
1294     {
1295       if (CkMyPe() == thisFailedPe) 
1296         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr at %lf\n",CkMyPe(),CmiWallTimer());
1297       stage = (char *)"resetReductionMgr";
1298       int numGroups = CkpvAccess(_groupIDTable)->size();
1299       for(int i=0;i<numGroups;i++) {
1300         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1301         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1302         obj->flushStates();
1303         obj->ckJustMigrated();
1304       }
1305
1306       //reset CmiReduce
1307       CmiResetReductions();
1308
1309       // reset again
1310       //CpvAccess(_qd)->flushStates();
1311       if(CmiNumPartition()==1){
1312         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1313       }
1314       else{
1315         if (CkMyPe() == thisFailedPe) 
1316           CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr ends at %lf\n",CkMyPe(),CmiWallTimer());
1317         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1318       }
1319     }
1320
1321     // recover the lost buddies
1322     void CkMemCheckPT::recoverBuddies()
1323     {
1324       int idx;
1325       int len = ckTable.length();
1326       // ready to flush reduction manager
1327       // cannot be CkMemCheckPT::restart because destory will modify states
1328       double curTime = CmiWallTimer();
1329       if (CkMyPe() == thisFailedPe)
1330         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1331       stage = (char *)"recoverBuddies";
1332       if (CkMyPe() == thisFailedPe)
1333         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1334       startTime = curTime;
1335
1336       // recover buddies
1337       expectCount = 0;
1338 #if !CMK_CHKP_ALL
1339       for (idx=0; idx<len; idx++) {
1340         CkCheckPTInfo *entry = ckTable[idx];
1341         if (entry->pNo == thisFailedPe) {
1342 #if CK_NO_PROC_POOL
1343           // find a new buddy
1344           int budPe = BuddyPE(CkMyPe());
1345 #else
1346           int budPe = thisFailedPe;
1347 #endif
1348           CkArrayCheckPTMessage *msg = entry->getCopy();
1349           msg->bud1 = budPe;
1350           msg->bud2 = CkMyPe();
1351           msg->cp_flag = 0;            // not checkpointing
1352           thisProxy[budPe].recoverEntry(msg);
1353           expectCount ++;
1354         }
1355       }
1356 #else
1357       //send to failed pe
1358       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1359 #if CK_NO_PROC_POOL
1360         // find a new buddy
1361         int budPe = BuddyPE(CkMyPe());
1362 #else
1363         int budPe = thisFailedPe;
1364 #endif
1365         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1366         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1367         msg->cp_flag = 0;            // not checkpointing
1368         msg->bud1 = budPe;
1369         msg->bud2 = CkMyPe();
1370         thisProxy[budPe].recoverEntry(msg);
1371         expectCount ++;
1372       }
1373 #endif
1374
1375       if (expectCount == 0) {
1376         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1377       }
1378       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1379     }
1380
1381     void CkMemCheckPT::gotData()
1382     {
1383       ackCount ++;
1384       if (ackCount == expectCount) {
1385         ackCount = 0;
1386         expectCount = -1;
1387         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1388         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1389       }
1390     }
1391
1392     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1393     {
1394
1395       for (int i=0; i<n; i++) {
1396         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1397         mgr->updateLocation(idx[i], nowOnPe);
1398       }
1399       thisProxy[nowOnPe].gotReply();
1400     }
1401
1402     // restore array elements
1403     void CkMemCheckPT::recoverArrayElements()
1404     {
1405       double curTime = CmiWallTimer();
1406       int len = ckTable.length();
1407       if (CkMyPe() == thisFailedPe)
1408         CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds \n",CkMyPe(), stage, curTime-startTime);
1409       stage = (char *)"recoverArrayElements";
1410       startTime = curTime;
1411       int flag = 0;
1412       // recover all array elements
1413       int count = 0;
1414
1415 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1416       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1417       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1418 #endif
1419
1420 #if !CMK_CHKP_ALL
1421       for (int idx=0; idx<len; idx++)
1422       {
1423         CkCheckPTInfo *entry = ckTable[idx];
1424 #if CK_NO_PROC_POOL
1425         // the bigger one will do 
1426         //    if (CkMyPe() < entry->pNo) continue;
1427         if (!isMaster(entry->pNo)) continue;
1428 #else
1429         // smaller one do it, which has the original object
1430         if (CkMyPe() == entry->pNo+1 || 
1431             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1432 #endif
1433         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1434
1435         entry->updateBuddy(CkMyPe(), entry->pNo);
1436         CkArrayCheckPTMessage *msg = entry->getCopy();
1437         // gzheng
1438         //thisProxy[CkMyPe()].inmem_restore(msg);
1439         inmem_restore(msg);
1440 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1441         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1442         int homePe = mgr->homePe(msg->index);
1443         if (homePe != CkMyPe()) {
1444           gmap[homePe].push_back(msg->locMgr);
1445           imap[homePe].push_back(msg->index);
1446         }
1447 #endif
1448         CkFreeMsg(msg);
1449         count ++;
1450       }
1451 #else
1452       char * packData;
1453       if(CmiNumPartition()==1){
1454         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1455         packData = (char *)msg->packData;
1456       }
1457       else{
1458         int pointer = CpvAccess(curPointer);
1459         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1460         packData = msg->packData;
1461       }
1462 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1463       recoverAll(packData,gmap,imap);
1464 #else
1465       recoverAll(packData);
1466 #endif
1467 #endif
1468 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1469       for (int i=0; i<CkNumPes(); i++) {
1470         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1471           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1472           flag++;       
1473         }
1474       }
1475       delete [] imap;
1476       delete [] gmap;
1477 #endif
1478       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1479       //if (CkMyPe() == thisFailedPe)
1480
1481       CKLOCMGR_LOOP(mgr->doneInserting(););
1482
1483       // _crashedNode = -1;
1484       CpvAccess(_crashedNode) = -1;
1485 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1486       if (CkMyPe() == 0)
1487         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1488 #else
1489       if(flag == 0)
1490       {
1491         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1492       }
1493 #endif
1494     }
1495
1496     void CkMemCheckPT::gotReply(){
1497       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1498     }
1499
1500     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1501 #if CMK_CHKP_ALL
1502       PUP::fromMem p(packData);
1503       int numElements;
1504       p|numElements;
1505       if(p.isUnpacking()){
1506         for(int i=0;i<numElements;i++){
1507           CkGroupID gID;
1508           CkArrayIndex idx;
1509           p|gID;
1510           p|idx;
1511           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1512           int homePe = mgr->homePe(idx);
1513 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1514           mgr->resume(idx,p,CmiTrue,CmiTrue);
1515 #else
1516           if(CmiNumPartition()==1)
1517             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1518           else{
1519             if(CkMyPe()==thisFailedPe){
1520               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1521             }
1522             else{
1523               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1524             }
1525           }
1526 #endif
1527 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1528           homePe = mgr->homePe(idx);
1529           if (homePe != CkMyPe()) {
1530             gmap[homePe].push_back(gID);
1531             imap[homePe].push_back(idx);
1532           }
1533 #endif
1534         }
1535       }
1536 #endif
1537     }
1538
1539
1540     // on every processor
1541     // turn load balancer back on
1542     void CkMemCheckPT::finishUp()
1543     {
1544       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1545       //CKLOCMGR_LOOP(mgr->doneInserting(););
1546
1547       if (CkMyPe() == thisFailedPe)
1548       {
1549         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1550         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1551         fflush(stdout);
1552       }
1553
1554
1555       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1556       inRestarting = 0;
1557       maxIter = -1;
1558 #if CMK_CONVERSE_MPI    
1559       if(CmiNumPartition()!=1){
1560         CpvAccess(recvdProcChkp) = 0;
1561         CpvAccess(recvdArrayChkp) = 0;
1562         CpvAccess(curPointer)^=1;
1563         //notify my replica, restart is done
1564         if (CkMyPe() == 0&&CpvAccess(resilience)!=1){
1565           CpvAccess(remoteStarted) =0;
1566           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1567           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1568           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1569         }
1570       }
1571       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1572         lastPingTime = CmiWallTimer();
1573         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1574       }
1575       //inject next failure
1576       if(killFlag==2){
1577         if(CkMyPe()==0){
1578           replicaInjectFailure();
1579         }
1580         thisProxy[CkMyPe()].generateFailure();
1581       }
1582 #endif
1583
1584 #if CK_NO_PROC_POOL
1585 #if NODE_CHECKPOINT
1586       int numnodes = CmiNumPhysicalNodes();
1587 #else
1588       int numnodes = CkNumPes();
1589 #endif
1590       if (numnodes-totalFailed() <=2) {
1591         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1592         _memChkptOn = 0;
1593       }
1594 #endif
1595     }
1596
1597     void CkMemCheckPT::recoverFromSoftFailure()
1598     {
1599       inRestarting = 0;
1600       maxIter = -1;
1601       CpvAccess(recvdRemote) = 0;
1602       CpvAccess(recvdLocal) = 0;
1603       CpvAccess(localChkpDone) = 0;
1604       CpvAccess(remoteChkpDone) = 0;
1605       CpvAccess(remoteReady) = 0;
1606       CpvAccess(localReady) = 0;
1607       inCheckpointing = 0;
1608       notifyReplica = 0;
1609       CpvAccess(remoteStarted) = 0;
1610       CpvAccess(localStarted) = 0;
1611       CpvAccess(_remoteCrashedNode) = -1;
1612       CkMemCheckPT::replicaAlive = 1;
1613       inCheckpointing = 0;
1614       notifyReplica = 0;
1615       if(CkMyPe() == 0){
1616         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1617       }
1618       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1619     }
1620     // called only on 0
1621     void CkMemCheckPT::quiescence(CkCallback &cb)
1622     {
1623       static int pe_count = 0;
1624       pe_count ++;
1625       CmiAssert(CkMyPe() == 0);
1626       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1627       if (pe_count == CkNumPes()) {
1628         pe_count = 0;
1629         cb.send();
1630       }
1631     }
1632
1633     // User callable function - to start a checkpoint
1634     // callback cb is used to pass control back
1635     void CkStartMemCheckpoint(CkCallback &cb)
1636     {
1637 #if CMK_MEM_CHECKPOINT
1638       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1639       /*if (_memChkptOn == 0) {
1640         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1641         cb.send();
1642         return;
1643         }*/
1644       if (CkInRestarting()) {
1645         // trying to checkpointing during restart
1646         cb.send();
1647         return;
1648       }
1649       if(CkInCheckpointing())
1650         return;
1651       // store user callback and user data
1652       CkMemCheckPT::cpCallback = cb;
1653
1654
1655       //send to my replica that checkpoint begins 
1656       if(CkReplicaAlive()==1){
1657         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1658         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1659         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1660       }
1661       CpvAccess(localStarted) = 1;
1662       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1663       checkptMgr.chkpLocalStart();
1664       // broadcast to start check pointing
1665       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1666         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1667         checkptMgr.doItNow(CkMyPe());
1668       }
1669 #else
1670       // when mem checkpoint is disabled, invike cb immediately
1671       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1672       cb.send();
1673 #endif
1674     }
1675
1676     void CkRestartCheckPoint(int diePe)
1677     {
1678       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1679       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1680       // broadcast
1681       checkptMgr.restart(diePe);
1682     }
1683
1684     static int _diePE = -1;
1685
1686     // callback function used locally by ccs handler
1687     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1688     {
1689       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1690       CkRestartCheckPoint(_diePE);
1691     }
1692
1693
1694     // called on crashed PE
1695     static void restartBeginHandler(char *msg)
1696     {
1697       CmiFree(msg);
1698 #if CMK_MEM_CHECKPOINT
1699 #if CMK_USE_BARRIER
1700       if(CkMyPe()!=_diePE){
1701         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1702         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1703         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1704         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1705       }else{
1706         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1707         CkRestartCheckPointCallback(NULL, NULL);
1708       }
1709 #else
1710       static int count = 0;
1711       CmiAssert(CkMyPe() == _diePE);
1712       count ++;
1713       if (count == CkNumPes()||(CpvAccess(resilience)==1&&count==1)) {
1714         printf("restart begin on %d at %lf\n",CkMyPe(),CmiWallTimer());
1715         CkRestartCheckPointCallback(NULL, NULL);
1716         count = 0;
1717       }
1718 #endif
1719 #endif
1720     }
1721
1722     extern void _discard_charm_message();
1723     extern void _resume_charm_message();
1724
1725     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1726       return data;
1727     }
1728
1729     static void restartBcastHandler(char *msg)
1730     {
1731 #if CMK_MEM_CHECKPOINT
1732       // advance phase counter
1733       CkMemCheckPT::inRestarting = 1;
1734       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1735       // gzheng
1736       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1737
1738       if (CkMyPe()==_diePE)
1739         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1740
1741       // reset QD counters
1742       /*  gzheng
1743           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1744        */
1745
1746       /*  gzheng
1747           if (CkMyPe()==_diePE)
1748           CkRestartCheckPointCallback(NULL, NULL);
1749        */
1750       CmiFree(msg);
1751
1752       _resume_charm_message();
1753
1754       // reduction
1755       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1756       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1757 #if CMK_USE_BARRIER
1758       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1759 #else
1760       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1761 #endif 
1762       checkpointed = 0;
1763 #endif
1764     }
1765
1766     extern void _initDone();
1767
1768     bool compare(char * buf1, char *buf2){
1769       if(CkMyPe()==0)
1770         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1771       PUP::checker pchecker(buf1,buf2);
1772       pchecker.skip();
1773       int numElements;
1774       pchecker|numElements;
1775       for(int i=0;i<numElements;i++){
1776         CkGroupID gID;
1777         CkArrayIndex idx;
1778
1779         pchecker|gID;
1780         pchecker|idx;
1781
1782         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1783         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1784       }
1785       if(CkMyPe()==0)
1786         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1787       //int fault_num = pchecker.getFaultNum();
1788       bool result = pchecker.getResult();
1789       /*if(!result){
1790         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1791       }*/
1792       return result;
1793     }
1794     int getChecksum(char * buf){
1795       PUP::checker pchecker(buf);
1796       pchecker.skip();
1797       int numElements;
1798       pchecker|numElements;
1799 //      CkPrintf("num %d\n",numElements);
1800       for(int i=0;i<numElements;i++){
1801         CkGroupID gID;
1802         CkArrayIndex idx;
1803
1804         pchecker|gID;
1805         pchecker|idx;
1806
1807         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1808         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1809       }
1810       return pchecker.getChecksum();
1811     }
1812
1813     static void recvRemoteChkpHandler(char *msg){
1814       CpvAccess(remoteChkpDone) = 1;
1815       if(CpvAccess(use_checksum)){
1816         if(CkMyPe()==0)
1817           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1818         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1819         CpvAccess(recvdRemote) = 1;
1820         if(CpvAccess(recvdLocal)==1){
1821           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1822             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1823           }
1824           else{
1825             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1826           }
1827         }
1828       }else{
1829         envelope *env = (envelope *)msg;
1830         CkUnpackMessage(&env);
1831         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1832         if(CpvAccess(recvdLocal)==1){
1833           int pointer = CpvAccess(curPointer);
1834           int size = CpvAccess(chkpBuf)[pointer]->len;
1835           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1836             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1837           }else
1838           {
1839             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1840           }
1841           delete chkpMsg;
1842           if(CkMyPe()==0)
1843             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1844         }else{
1845           CpvAccess(recvdRemote) = 1;
1846           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1847           CpvAccess(buddyBuf) = chkpMsg;
1848         }
1849       }
1850     }
1851
1852     static void replicaRecoverHandler(char *msg){
1853       //fflush(stdout);
1854       CmiPrintf("[%d][%d]receive replica recover\n",CmiMyPartition(),CmiMyPe());
1855       CpvAccess(remoteChkpDone) = 1;
1856       if(CpvAccess(localChkpDone) == 1)
1857         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1858       else{  
1859         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1860         //CmiAbort("impossible state");
1861       }
1862       CmiFree(msg);
1863     }
1864
1865     
1866     static void replicaBeginFailureInjectionHandler(char * msg){
1867       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1868       checkptMgr.generateFailure();
1869       CmiFree(msg);
1870     }
1871     
1872     static void replicaChkpDoneHandler(char *msg){
1873       CmiPrintf("[%d][%d]receive replica checkpoint done\n",CmiMyPartition(),CmiMyPe());
1874       CpvAccess(remoteChkpDone) = 1;
1875       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1876       if(CpvAccess(localChkpDone) == 1)
1877         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1878       else  
1879         CmiPrintf("[%d]localChkpDone hasn't been finished\n",CmiMyPe());
1880       CmiFree(msg);
1881     }
1882
1883     static void replicaDieHandler(char * msg){
1884 #if CMK_CONVERSE_MPI
1885       //broadcast to every one in my replica
1886       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1887       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1888 #endif
1889     }
1890
1891     static void replicaChkpStartHandler(char * msg){
1892       if(CkMyPe()==0){
1893         CkPrintf("[%d][%d] my replica will begin to checkpoint at %lf\n", CmiMyPartition(), CkMyPe(), CmiWallTimer());
1894       }
1895       CpvAccess(remoteStarted) =1;
1896       if(CpvAccess(localStarted)==1){    
1897         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1898         checkptMgr.doItNow(0);
1899       }
1900     }
1901
1902     static void replicaDieBcastHandler(char *msg){
1903       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1904       CpvAccess(_remoteCrashedNode) = diePe;
1905       if(CkMyPe()==diePe){
1906         CmiPrintf("pe %d in replica world die\n",diePe);
1907         fflush(stdout);
1908       }
1909       
1910       if(CpvAccess(resilience)!=1||CkMyPe()!=diePe){
1911         find_spare_mpirank(diePe,CmiMyPartition()^1);
1912       }
1913
1914       //what if am already ready to checkpoint
1915       if(CpvAccess(resilience)!=1){
1916         CkMemCheckPT::replicaAlive = 0;
1917         if(CpvAccess(localStarted)==1){    
1918           if(CkMyPe()==0){
1919             CkPrintf("[%d][%d] start checkpoint after replica die\n", CmiMyPartition(),CkMyPe());
1920             CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1921             checkptMgr.doItNow(0);
1922           }
1923         }
1924         //broadcast to my partition to get local max iter
1925         else{
1926           if(CkMyPe()==diePe){
1927             CkPrintf("[%d][%d] begin to find the next checkpoint iteration\n", CmiMyPartition(),CkMyPe());
1928           }
1929           CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1930         }
1931       }
1932
1933       CmiFree(msg);
1934     }
1935
1936     static void recoverRemoteProcDataHandler(char *msg){
1937       envelope *env = (envelope *)msg;
1938       CkUnpackMessage(&env);
1939       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1940
1941       //store the checkpoint
1942       int pointer = procMsg->pointer;
1943
1944
1945       if(CkMyPe()==CpvAccess(_crashedNode)){
1946         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1947         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1948         PUP::fromMem p(procMsg->packData);
1949         _handleProcData(p,CmiTrue);
1950         _initDone();
1951         CkPrintf("[%d] receive recover proc data at %lf\n", CkMyPe(), CmiWallTimer());
1952       }
1953       else{
1954         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1955         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1956         //_handleProcData(p,CmiFalse);
1957       }
1958       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1959       CKLOCMGR_LOOP(mgr->startInserting(););
1960
1961       CpvAccess(recvdProcChkp) =1;
1962       if(CpvAccess(recvdArrayChkp)==1){
1963         _resume_charm_message();
1964         _diePE = CpvAccess(_crashedNode);
1965         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1966         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1967         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1968 #if CMK_USE_BARRIER
1969         if(CpvAccess(resilience)==1){
1970           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1971         }else
1972           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1973 #else
1974         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1975 #endif 
1976       }
1977     }
1978
1979     static void recoverRemoteArrayDataHandler(char *msg){
1980       envelope *env = (envelope *)msg;
1981       CkUnpackMessage(&env);
1982       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1983
1984       //store the checkpoint
1985       int pointer = chkpMsg->pointer;
1986       CpvAccess(curPointer) = pointer;
1987       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1988       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1989       CpvAccess(recvdArrayChkp) =1;
1990       CkMemCheckPT::inRestarting = 1;
1991
1992       if(CkMyPe()==CpvAccess(_crashedNode))
1993         CkPrintf("[%d] receive recover array data at %lf\n", CkMyPe(), CmiWallTimer());
1994       
1995       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1996         _resume_charm_message();
1997         _diePE = CpvAccess(_crashedNode);
1998         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1999         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2000         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
2001         //CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2002 #if CMK_USE_BARRIER
2003         if(CpvAccess(resilience)==1){
2004           CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2005         }else
2006           CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
2007 #else
2008         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
2009 #endif 
2010       }
2011     }
2012
2013     static void recvPhaseHandler(char * msg)
2014     {
2015       //decrease the phase number so that the crashed replica can communicate with the healthy one
2016       CpvAccess(_curRestartPhase)--;
2017       
2018       CkMemCheckPT::inRestarting = 1;
2019       CmiFree(msg);
2020       //notify the buddy in the replica now i can receive the checkpoint msg
2021       if(CpvAccess(resilience)!=1||CpvAccess(_crashedNode)==CmiMyPe()){
2022         char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2023         CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
2024         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
2025         //timer start
2026         if(CpvAccess(_crashedNode)==CkMyPe()){
2027           CkMemCheckPT::startTime = restartT = CmiWallTimer();
2028           CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf phase %d\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer(), CpvAccess(_curRestartPhase));
2029         }
2030       }else{
2031         CpvAccess(curPointer)^=1;
2032         CkMemCheckPT::inRestarting = 1;
2033         _resume_charm_message();
2034         _diePE = CpvAccess(_crashedNode);
2035       }
2036     }
2037     
2038     static void askRecoverDataHandler(char * msg){
2039       if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2040         CmiPrintf("[%d][%d] receive replica phase change at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
2041       if(CpvAccess(resilience)!=1){
2042         CpvAccess(remoteReady)=1;
2043         if(CpvAccess(localReady)==1){
2044           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2045           {     
2046             envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverProcBuf)));
2047             CkPackMessage(&env);
2048             CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2049             CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2050             CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2051           }
2052           //send the array checkpoint data
2053           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
2054           CkPackMessage(&env);
2055           CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
2056           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2057           if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
2058             CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2059         }
2060       }else{
2061         find_spare_mpirank(CkMyPe(),CmiMyPartition()^1);
2062         int pointer = CpvAccess(curPointer)^1;
2063         CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
2064         procMsg->pointer = pointer;
2065         envelope * env = (envelope *)(UsrToEnv(procMsg));
2066         CkPackMessage(&env);
2067         CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
2068         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
2069         CmiPrintf("[%d] sendProcdata after request at %lf\n",CmiMyPe(),CmiWallTimer());
2070         
2071         CkCheckPTMessage * arrayMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
2072         arrayMsg->pointer = pointer;
2073         envelope * env1 = (envelope *)(UsrToEnv(arrayMsg));
2074         CkPackMessage(&env1);
2075         CmiSetHandler(env1,recoverRemoteArrayDataHandlerIdx);
2076         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env1->getTotalsize(),(char *)env1);
2077         CmiPrintf("[%d] sendArraydata after request\n",CmiMyPe());
2078       }
2079     }
2080     // called on crashed processor
2081     static void recoverProcDataHandler(char *msg)
2082     {
2083 #if CMK_MEM_CHECKPOINT
2084       int i;
2085       envelope *env = (envelope *)msg;
2086       CkUnpackMessage(&env);
2087       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
2088       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
2089       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
2090       PUP::fromMem p(procMsg->packData);
2091       _handleProcData(p,CmiTrue);
2092
2093       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
2094       // gzheng
2095       CKLOCMGR_LOOP(mgr->startInserting(););
2096
2097       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2098       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
2099       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
2100       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
2101
2102       _initDone();
2103       //   CpvAccess(_qd)->flushStates();
2104       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
2105 #endif
2106     }
2107     //for replica, got the phase number from my neighbor processor in the current partition
2108     static void askPhaseHandler(char *msg)
2109     {
2110 #if CMK_MEM_CHECKPOINT
2111       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
2112       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2113       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
2114       CmiSetHandler(msg, recvPhaseHandlerIdx);
2115       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2116 #endif
2117     }
2118     // called on its backup processor
2119     // get backup message buffer and sent to crashed processor
2120     static void askProcDataHandler(char *msg)
2121     {
2122 #if CMK_MEM_CHECKPOINT
2123       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2124       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2125       if (CpvAccess(procChkptBuf) == NULL)  {
2126         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
2127         CkAbort("no checkpoint found");
2128       }
2129       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
2130       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
2131
2132       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
2133
2134       CkPackMessage(&env);
2135       CmiSetHandler(env, recoverProcDataHandlerIdx);
2136       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
2137       CpvAccess(procChkptBuf) = NULL;
2138       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
2139 #endif
2140     }
2141
2142     // called on PE 0
2143     void qd_callback(void *m)
2144     {
2145       CmiPrintf("[%d][%d] callback after QD for crashed node: %d. at %lf\n",CmiMyPartition(), CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
2146       fflush(stdout);
2147       CkFreeMsg(m);
2148       if(CmiNumPartition()==1){
2149 #ifdef CMK_SMP
2150         for(int i=0;i<CmiMyNodeSize();i++){
2151           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2152           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
2153           CmiSetHandler(msg, askProcDataHandlerIdx);
2154           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
2155           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2156         }
2157         return;
2158 #endif
2159         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2160         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2161         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2162         CmiSetHandler(msg, askProcDataHandlerIdx);
2163         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2164         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2165       }
2166       else{
2167         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2168         CmiSetHandler(msg, recvPhaseHandlerIdx);
2169         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2170       }
2171     }
2172
2173     // on crashed node
2174     void CkMemRestart(const char *dummy, CkArgMsg *args)
2175     {
2176 #if CMK_MEM_CHECKPOINT
2177       _diePE = CmiMyNode();
2178       CpvAccess( _crashedNode )= CmiMyNode();
2179       CkMemCheckPT::inRestarting = 1;
2180       _discard_charm_message();
2181
2182       /*if(CmiMyRank()==0){
2183         CkCallback cb(qd_callback);
2184         CkStartQD(cb);
2185         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2186         }*/
2187       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2188       if(CmiNumPartition()==1){
2189         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2190         restartT = CmiWallTimer();
2191         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2192         fflush(stdout);
2193         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2194         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2195         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2196         CmiSetHandler(msg, askProcDataHandlerIdx);
2197         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2198         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2199       }
2200       else{
2201         CkCallback cb(qd_callback);
2202         CkStartQD(cb);
2203       }
2204 #else
2205       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2206 #endif
2207     }
2208
2209     // can be called in other files
2210     // return true if it is in restarting
2211     extern "C"
2212       int CkInRestarting()
2213       {
2214 #if CMK_MEM_CHECKPOINT
2215         if (CpvAccess( _crashedNode)!=-1) return 1;
2216         // gzheng
2217         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2218         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2219         return CkMemCheckPT::inRestarting;
2220 #else
2221         return 0;
2222 #endif
2223       }
2224
2225     extern "C"
2226       int CkReplicaAlive()
2227       {
2228         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2229         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2230         return CkMemCheckPT::replicaAlive;
2231
2232         /*if(CkMemCheckPT::replicaDead==1)
2233           return 0;
2234           else          
2235           return 1;*/
2236       }
2237
2238     extern "C"
2239       int CkInCheckpointing()
2240       {
2241         return CkMemCheckPT::inCheckpointing;
2242       }
2243
2244     extern "C"
2245       void CkSetInLdb(){
2246 #if CMK_MEM_CHECKPOINT
2247         CkMemCheckPT::inLoadbalancing = 1;
2248 #endif
2249       }
2250
2251     extern "C"
2252       int CkInLdb(){
2253 #if CMK_MEM_CHECKPOINT
2254         return CkMemCheckPT::inLoadbalancing;
2255 #endif
2256         return 0;
2257       }
2258
2259     extern "C"
2260       void CkResetInLdb(){
2261 #if CMK_MEM_CHECKPOINT
2262         CkMemCheckPT::inLoadbalancing = 0;
2263 #endif
2264       }
2265
2266     /*****************************************************************************
2267       module initialization
2268      *****************************************************************************/
2269
2270     static int arg_where = CkCheckPoint_inMEM;
2271
2272 #if CMK_MEM_CHECKPOINT
2273     void init_memcheckpt(char **argv)
2274     {
2275       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2276         arg_where = CkCheckPoint_inDISK;
2277       }
2278       CpvInitialize(int, use_checksum);
2279       CpvInitialize(int, resilience);
2280       CpvAccess(use_checksum)=0;
2281       CpvAccess(resilience)=0;//TODO should set a default resilience level
2282       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2283         CpvAccess(use_checksum)=1;
2284       }
2285       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2286         CpvAccess(resilience)=1;
2287       }
2288       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2289         CpvAccess(resilience)=2;
2290       }
2291       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2292         CpvAccess(resilience)=3;
2293       }
2294       // initiliazing _crashedNode variable
2295       CpvInitialize(int, _crashedNode);
2296       CpvInitialize(int, _remoteCrashedNode);
2297       CpvAccess(_crashedNode) = -1;
2298       CpvAccess(_remoteCrashedNode) = -1;
2299       init_FI(argv);
2300     }
2301 #endif
2302
2303     class CkMemCheckPTInit: public Chare {
2304       public:
2305         CkMemCheckPTInit(CkArgMsg *m) {
2306 #if CMK_MEM_CHECKPOINT
2307           if (arg_where == CkCheckPoint_inDISK) {
2308             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2309           }
2310
2311           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2312           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2313 #endif
2314         }
2315     };
2316
2317     static void notifyHandler(char *msg)
2318     {
2319 #if CMK_MEM_CHECKPOINT
2320       CmiFree(msg);
2321       /* immediately increase restart phase to filter old messages */
2322       CpvAccess(_curRestartPhase) ++;
2323       CpvAccess(_qd)->flushStates();
2324       _discard_charm_message();
2325
2326 #endif
2327     }
2328
2329     extern "C"
2330       void notify_crash(int node)
2331       {
2332 #ifdef CMK_MEM_CHECKPOINT
2333         CpvAccess( _crashedNode) = node;
2334 #ifdef CMK_SMP
2335         for(int i=0;i<CkMyNodeSize();i++){
2336           CpvAccessOther(_crashedNode,i)=node;
2337         }
2338 #endif
2339         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2340         CkMemCheckPT::inRestarting = 1;
2341
2342         // this may be in interrupt handler, send a message to reset QD
2343         int pe = CmiNodeFirst(CkMyNode());
2344         for(int i=0;i<CkMyNodeSize();i++){
2345           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2346           CmiSetHandler(msg, notifyHandlerIdx);
2347           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2348         }
2349 #endif
2350       }
2351
2352     extern "C" void (*notify_crash_fn)(int node);
2353
2354
2355 #if CMK_CONVERSE_MPI
2356     void buddyDieHandler(char *msg)
2357     {
2358 #if CMK_MEM_CHECKPOINT
2359       // notify
2360       CkMemCheckPT::inRestarting = 1;
2361       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2362       notify_crash(diepe);
2363       // send message to crash pe to let it restart
2364       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2365       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2366       int buddy = obj->BuddyPE(CmiMyPe());
2367       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2368       if (buddy == diepe)  {
2369         mpi_restart_crashed(diepe, newrank);
2370       }
2371 #endif
2372     }
2373
2374     void pingHandler(void *msg)
2375     {
2376       lastPingTime = CmiWallTimer();
2377       CmiFree(msg);
2378     }
2379
2380     void pingCheckHandler()
2381     {
2382 #if CMK_MEM_CHECKPOINT
2383       double now = CmiWallTimer();
2384       if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2385         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2386         int i, pe, buddy;
2387         // tell everyone the buddy dies
2388         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2389         for (i = 1; i < CmiNumPes(); i++) {
2390           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2391           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2392         }
2393         buddy = pe;
2394         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2395         fflush(stdout);
2396         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2397         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2398         CmiSetHandler(msg, buddyDieHandlerIdx);
2399         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2400         //send to everyone in the other world
2401         if(CmiNumPartition()!=1){
2402           char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2403           *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2404           CmiSetHandler(rMsg, replicaDieHandlerIdx);
2405           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2406         }
2407       }
2408         else 
2409           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2410 #endif
2411       }
2412
2413       void pingBuddy()
2414       {
2415 #if CMK_MEM_CHECKPOINT
2416         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2417         if (obj) {
2418           int buddy = obj->BuddyPE(CkMyPe());
2419           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2420           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2421           CmiSetHandler(msg, pingHandlerIdx);
2422           CmiGetRestartPhase(msg) = 9999;
2423           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2424         }
2425         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2426 #endif
2427       }
2428 #endif
2429
2430       // initproc
2431       void CkRegisterRestartHandler( )
2432       {
2433 #if CMK_MEM_CHECKPOINT
2434         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2435         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2436         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2437         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2438         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2439         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2440
2441         //for replica
2442         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2443         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2444         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2445         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2446         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2447         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2448         replicaBeginFailureInjectionHandlerIdx = CkRegisterHandler((CmiHandler)replicaBeginFailureInjectionHandler);
2449         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2450         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2451         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2452         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2453
2454 #if CMK_CONVERSE_MPI
2455         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2456         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2457         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2458 #endif
2459
2460         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2461         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2462         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2463         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2464         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2465         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2466         CpvInitialize(int,curPointer);
2467         CpvInitialize(int,recvdLocal);
2468         CpvInitialize(int,localChkpDone);
2469         CpvInitialize(int,remoteChkpDone);
2470         CpvInitialize(int,remoteStarted);
2471         CpvInitialize(int,localStarted);
2472         CpvInitialize(int,localReady);
2473         CpvInitialize(int,remoteReady);
2474         CpvInitialize(int,recvdRemote);
2475         CpvInitialize(int,recvdProcChkp);
2476         CpvInitialize(int,localChecksum);
2477         CpvInitialize(int,remoteChecksum);
2478         CpvInitialize(int,recvdArrayChkp);
2479
2480         CpvAccess(procChkptBuf) = NULL;
2481         CpvAccess(buddyBuf) = NULL;
2482         CpvAccess(recoverProcBuf) = NULL;
2483         CpvAccess(recoverArrayBuf) = NULL;
2484         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2485         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2486         CpvAccess(chkpBuf)[0] = NULL;
2487         CpvAccess(chkpBuf)[1] = NULL;
2488         CpvAccess(localProcChkpBuf)[0] = NULL;
2489         CpvAccess(localProcChkpBuf)[1] = NULL;
2490
2491         CpvAccess(curPointer) = 0;
2492         CpvAccess(recvdLocal) = 0;
2493         CpvAccess(localChkpDone) = 0;
2494         CpvAccess(remoteChkpDone) = 0;
2495         CpvAccess(remoteStarted) = 0;
2496         CpvAccess(localStarted) = 0;
2497         CpvAccess(localReady) = 0;
2498         CpvAccess(remoteReady) = 0;
2499         CpvAccess(recvdRemote) = 0;
2500         CpvAccess(recvdProcChkp) = 0;
2501         CpvAccess(localChecksum) = 0;
2502         CpvAccess(remoteChecksum) = 0;
2503         CpvAccess(recvdArrayChkp) = 0;
2504
2505         notify_crash_fn = notify_crash;
2506
2507 #if ! CMK_CONVERSE_MPI
2508         // print pid to kill
2509         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2510         //  sleep(4);
2511 #endif
2512 #endif
2513       }
2514
2515
2516       extern "C"
2517         int CkHasCheckpoints()
2518         {
2519           return checkpointed;
2520         }
2521
2522       /// @todo: the following definitions should be moved to a separate file containing
2523       // structures and functions about fault tolerance strategies
2524
2525       /**
2526        *  * @brief: function for killing a process                                             
2527        *   */
2528 #ifdef CMK_MEM_CHECKPOINT
2529 #if CMK_HAS_GETPID
2530       void killLocal(void *_dummy,double curWallTime){
2531         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2532         if(CmiWallTimer()<killTime-1){
2533           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2534         }else{ 
2535 #if CMK_CONVERSE_MPI
2536           if(CkHasCheckpoints()&&!CkInCheckpointing()&&!CkInRestarting()){
2537             CkDieNow();
2538           }else{
2539             //next failure
2540             if(killFlag == 2){
2541               CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
2542               checkptMgr.generateFailure();
2543               CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->replicaInjectFailure();
2544             }
2545           }
2546 #else 
2547           kill(getpid(),SIGKILL);                                               
2548 #endif
2549         }              
2550       } 
2551 #else
2552       void killLocal(void *_dummy,double curWallTime){
2553         CmiAbort("kill() not supported!");
2554       }
2555 #endif
2556 #endif
2557
2558 #ifdef CMK_MEM_CHECKPOINT
2559       /**
2560        * @brief: reads the file with the kill information
2561        */
2562       void readKillFile(){
2563         FILE *fp=fopen(killFile,"r");
2564         if(!fp){
2565           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2566           return;
2567         }
2568         int proc;
2569         double sec;
2570         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2571           if(proc == CkMyNode() && CkMyRank() == 0){
2572             killTime = CmiWallTimer()+sec;
2573             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2574             CcdCallFnAfter(killLocal,NULL,sec*1000);
2575           }
2576         }
2577         fclose(fp);
2578       }
2579
2580
2581 #if ! CMK_CONVERSE_MPI
2582       void CkDieNow()
2583       {
2584 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2585         // ignored for non-mpi version
2586         CmiPrintf("[%d] die now.\n", CmiMyPe());
2587         killTime = CmiWallTimer()+0.001;
2588         CcdCallFnAfter(killLocal,NULL,1);
2589 #endif
2590       }
2591 #endif
2592
2593 #endif
2594
2595 #include "CkMemCheckpoint.def.h"
2596
2597