add phase change notification
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3    Charm++ support for fault tolerance of
4    In memory synchronous checkpointing and restart
5
6    written by Gengbin Zheng, gzheng@uiuc.edu
7    Lixia Shi,     lixiashi@uiuc.edu
8
9    added 12/18/03:
10
11    To support fault tolerance while allowing migration, it uses double
12    checkpointing scheme for each array element (not a infallible scheme).
13    In this version, checkpointing is done based on array elements. 
14    Each array element individully sends its checkpoint data to two buddies.
15
16    In this implementation, assume only one failure happens at a time,
17    or two failures on two processors which are not buddy to each other;
18    also assume there is no failure during a checkpointing or restarting phase.
19
20    Restart phase contains two steps:
21    1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24    2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27    added 3/14/04:
28    1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31    added 4/16/04:
32    1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37 restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40  */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ckfaultinjector.h"
46 #include "ck.h"
47 #include "register.h"
48 #include "conv-ccs.h"
49 #include <signal.h>
50 #include <map>
51 void noopck(const char*, ...)
52 {}
53
54
55 //#define DEBUGF       CkPrintf
56 #define DEBUGF noopck
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         0
67 #endif
68
69 #define CMK_CHKP_ALL            1
70 #define CMK_USE_BARRIER         0
71 #define CMK_USE_CHECKSUM                1
72
73 //stream remote records happned only if CK_NO_PROC_POOL =1 which means the chares to pe map will change
74 #define STREAMING_INFORMHOME                    1
75 CpvDeclare(int, _crashedNode);
76 CpvDeclare(int, use_checksum);
77 CpvDeclare(int, resilience);
78 CpvDeclare(int, _remoteCrashedNode);
79
80 // static, so that it is accessible from Converse part
81 int CkMemCheckPT::inRestarting = 0;
82 int CkMemCheckPT::inCheckpointing = 0;
83 int CkMemCheckPT::replicaAlive = 1;
84 int CkMemCheckPT::inLoadbalancing = 0;
85 double CkMemCheckPT::startTime;
86 char *CkMemCheckPT::stage;
87 CkCallback CkMemCheckPT::cpCallback;
88
89 int _memChkptOn = 1;                    // checkpoint is on or off
90
91 CkGroupID ckCheckPTGroupID;             // readonly
92
93 static int checkpointed = 0;
94
95 /// @todo the following declarations should be moved into a separate file for all 
96 // fault tolerant strategies
97
98 #ifdef CMK_MEM_CHECKPOINT
99 // name of the kill file that contains processes to be killed 
100 char *killFile;                                               
101 // flag for the kill file         
102 int killFlag=0;
103 // variable for storing the killing time
104 double killTime=0.0;
105 #endif
106
107 #ifdef CKLOCMGR_LOOP
108 #undef CKLOCMGR_LOOP
109 #endif
110 // loop over all CkLocMgr and do "code"
111 #define  CKLOCMGR_LOOP(code)    {       \
112   int numGroups = CkpvAccess(_groupIDTable)->size();    \
113   for(int i=0;i<numGroups;i++) {        \
114     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
115     if(obj->isLocMgr())  {      \
116       CkLocMgr *mgr = (CkLocMgr*)obj;   \
117       code      \
118     }   \
119   }     \
120 }
121
122 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
123 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
124 CpvDeclare(CkCheckPTMessage**, chkpBuf);
125 CpvDeclare(CkCheckPTMessage**, localProcChkpBuf);
126 //store the checkpoint of the buddy to compare
127 //do not need the whole msg, can be the checksum
128 CpvDeclare(CkCheckPTMessage*, buddyBuf);
129 CpvDeclare(CkCheckPTMessage*, recoverProcBuf);
130 CpvDeclare(CkCheckPTMessage*, recoverArrayBuf);
131 //pointer of the checkpoint going to be written
132 CpvDeclare(int, curPointer);
133 CpvDeclare(int, recvdRemote);
134 CpvDeclare(int, recvdLocal);
135 CpvDeclare(int, localChkpDone);
136 CpvDeclare(int, remoteChkpDone);
137 CpvDeclare(int, remoteStarted);
138 CpvDeclare(int, localStarted);
139 CpvDeclare(int, remoteReady);
140 CpvDeclare(int, localReady);
141 CpvDeclare(int, recvdArrayChkp);
142 CpvDeclare(int, recvdProcChkp);
143 CpvDeclare(int, localChecksum);
144 CpvDeclare(int, remoteChecksum);
145
146 bool compare(char * buf1, char * buf2);
147 int getChecksum(char * buf);
148 static inline void _handleProcData(PUP::er &p,CmiBool create= CmiTrue);
149 // Converse function handles
150 static int askPhaseHandlerIdx;
151 static int recvPhaseHandlerIdx;
152 static int askProcDataHandlerIdx;
153 static int askRecoverDataHandlerIdx;
154 static int restartBcastHandlerIdx;
155 static int recoverProcDataHandlerIdx;
156 static int restartBeginHandlerIdx;
157 static int recvRemoteChkpHandlerIdx;
158 static int replicaDieHandlerIdx;
159 static int replicaChkpStartHandlerIdx;
160 static int replicaDieBcastHandlerIdx;
161 static int replicaRecoverHandlerIdx;
162 static int replicaChkpDoneHandlerIdx;
163 static int recoverRemoteProcDataHandlerIdx;
164 static int recoverRemoteArrayDataHandlerIdx;
165 static int notifyHandlerIdx;
166 // compute the backup processor
167 // FIXME: avoid crashed processors
168 #if CMK_CONVERSE_MPI
169 static int pingHandlerIdx;
170 static int pingCheckHandlerIdx;
171 static int buddyDieHandlerIdx;
172 static double lastPingTime = -1;
173
174 extern "C" void mpi_restart_crashed(int pe, int rank);
175 extern "C" int  find_spare_mpirank(int pe,int partition);
176
177 void pingBuddy();
178 void pingCheckHandler();
179
180 #endif
181 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
182
183 inline int CkMemCheckPT::BuddyPE(int pe)
184 {
185   int budpe;
186 #if NODE_CHECKPOINT
187   // buddy is the processor with same rank on the next physical node
188   int r1 = CmiPhysicalRank(pe);
189   int budnode = CmiPhysicalNodeID(pe);
190   do {
191     budnode = (budnode+1)%CmiNumPhysicalNodes();
192     int *pelist;
193     int num;
194     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
195     budpe = pelist[r1 % num];
196   } while (isFailed(budpe));
197   if (budpe == pe) {
198     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
199     CmiAbort("Failed to find a buddy processor");
200   }
201 #else
202   budpe = pe;
203   budpe = (budpe+1)%CkNumPes();
204 #endif
205   return budpe;
206 }
207
208 // called in array element constructor
209 // choose and register with 2 buddies for checkpoiting 
210 #if CMK_MEM_CHECKPOINT
211 void ArrayElement::init_checkpt() {
212   if (_memChkptOn == 0) return;
213   if (CkInRestarting()) {
214     CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
215   }
216   // only master init checkpoint
217   if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
218
219   budPEs[0] = CkMyPe();
220   budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
221   CmiAssert(budPEs[0] != budPEs[1]);
222   // inform checkPTMgr
223   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
224   //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
225   checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
226   checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
227 }
228 #endif
229
230 // entry function invoked by checkpoint mgr asking for checkpoint data
231 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
232 #if CMK_MEM_CHECKPOINT
233   //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
234   //char index[128];   thisIndexMax.sprint(index);
235   //printf("[%d] checkpointing %s\n", CkMyPe(), index);
236   CkLocMgr *locMgr = thisArray->getLocMgr();
237   CmiAssert(myRec!=NULL);
238   int size;
239   {
240     PUP::sizer p;
241     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
242     size = p.size();
243   }
244   int packSize = size/sizeof(double) +1;
245   CkArrayCheckPTMessage *msg =
246     new (packSize, 0) CkArrayCheckPTMessage;
247   msg->len = size;
248   msg->index =thisIndexMax;
249   msg->aid = thisArrayID;
250   msg->locMgr = locMgr->getGroupID();
251   msg->cp_flag = 1;
252   {
253     PUP::toMem p(msg->packData);
254     locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
255   }
256
257   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
258   checkptMgr.recvData(msg, 2, budPEs);
259   delete m;
260 #endif
261 }
262
263 // checkpoint holder class - for memory checkpointing
264 class CkMemCheckPTInfo: public CkCheckPTInfo
265 {
266   CkArrayCheckPTMessage *ckBuffer;
267   public:
268   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
269     CkCheckPTInfo(a, loc, idx, pno)
270   {
271     ckBuffer = NULL;
272   }
273   ~CkMemCheckPTInfo() 
274   {
275     if (ckBuffer) delete ckBuffer; 
276   }
277   inline void updateBuffer(CkArrayCheckPTMessage *data) 
278   {
279     CmiAssert(data!=NULL);
280     if (ckBuffer) delete ckBuffer;
281     ckBuffer = data;
282   }    
283   inline CkArrayCheckPTMessage * getCopy()
284   {
285     if (ckBuffer == NULL) {
286       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
287       CmiAbort("Abort!");
288     }
289     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
290   }     
291   inline void updateBuddy(int b1, int b2) {
292     CmiAssert(ckBuffer);
293     ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
294     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
295     CmiAssert(pNo != CkMyPe());
296   }
297   inline int getSize() { 
298     CmiAssert(ckBuffer);
299     return ckBuffer->len; 
300   }
301 };
302
303 // checkpoint holder class - for in-disk checkpointing
304 class CkDiskCheckPTInfo: public CkCheckPTInfo 
305 {
306   char *fname;
307   int bud1, bud2;
308   int len;                      // checkpoint size
309   public:
310   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
311   {
312 #if CMK_USE_MKSTEMP
313     fname = new char[64];
314     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
315     mkstemp(fname);
316 #else
317     fname=tmpnam(NULL);
318 #endif
319     bud1 = bud2 = -1;
320     len = 0;
321   }
322   ~CkDiskCheckPTInfo() 
323   {
324     remove(fname);
325   }
326   inline void updateBuffer(CkArrayCheckPTMessage *data) 
327   {
328     double t = CmiWallTimer();
329     // unpack it
330     envelope *env = UsrToEnv(data);
331     CkUnpackMessage(&env);
332     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
333     FILE *f = fopen(fname,"wb");
334     PUP::toDisk p(f);
335     CkPupMessage(p, (void **)&data);
336     // delay sync to the end because otherwise the messages are blocked
337     //    fsync(fileno(f));
338     fclose(f);
339     bud1 = data->bud1;
340     bud2 = data->bud2;
341     len = data->len;
342     delete data;
343     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
344   }
345   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
346   {
347     CkArrayCheckPTMessage *data;
348     FILE *f = fopen(fname,"rb");
349     PUP::fromDisk p(f);
350     CkPupMessage(p, (void **)&data);
351     fclose(f);
352     data->bud1 = bud1;                          // update the buddies
353     data->bud2 = bud2;
354     return data;
355   }
356   inline void updateBuddy(int b1, int b2) {
357     bud1 = b1; bud2 = b2;
358     pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
359     CmiAssert(pNo != CkMyPe());
360   }
361   inline int getSize() { 
362     return len; 
363   }
364 };
365
366 CkMemCheckPT::CkMemCheckPT(int w)
367 {
368   int numnodes = 0;
369 #if NODE_CHECKPOINT
370   numnodes = CmiNumPhysicalNodes();
371 #else
372   numnodes = CkNumPes();
373 #endif
374 #if CK_NO_PROC_POOL
375   if (numnodes <= 2)
376 #else
377     if (numnodes  == 1)
378 #endif
379     {
380       if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
381       _memChkptOn = 0;
382     }
383   inRestarting = 0;
384   inCheckpointing = 0;
385   recvCount = peCount = 0;
386   ackCount = 0;
387   expectCount = -1;
388   where = w;
389   replicaAlive = 1;
390   notifyReplica = 0;
391 #if CMK_CONVERSE_MPI
392   void pingBuddy();
393   void pingCheckHandler();
394   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
395   CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
396 #endif
397   chkpTable[0] = NULL;
398   chkpTable[1] = NULL;
399   maxIter = -1;
400   recvIterCount = 0;
401   localDecided = false;
402 }
403
404 CkMemCheckPT::~CkMemCheckPT()
405 {
406   int len = ckTable.length();
407   for (int i=0; i<len; i++) {
408     delete ckTable[i];
409   }
410 }
411
412 void CkMemCheckPT::pup(PUP::er& p) 
413
414   CBase_CkMemCheckPT::pup(p); 
415   p|cpStarter;
416   p|thisFailedPe;
417   p|failedPes;
418   p|ckCheckPTGroupID;           // recover global variable
419   p|cpCallback;                 // store callback
420   p|where;                      // where to checkpoint
421   p|peCount;
422   if (p.isUnpacking()) {
423     recvCount = 0;
424 #if CMK_CONVERSE_MPI
425     void pingBuddy();
426     void pingCheckHandler();
427     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
428     CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
429 #endif
430     maxIter = -1;
431     recvIterCount = 0;
432     localDecided = false;
433   }
434 }
435
436 void CkMemCheckPT::getIter(){
437   localDecided = true;
438   localMaxIter = maxIter+1;
439   contribute(sizeof(int),&localMaxIter,CkReduction::max_int,CkCallback(CkReductionTarget(CkMemCheckPT,recvMaxIter),thisProxy));
440   int elemCount = CkCountChkpSyncElements();
441   if(CkMyPe()==0)
442     startTime = CmiWallTimer();
443   if(elemCount == 0){
444     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
445   }
446 }
447
448 //when one replica failes, decide the next chkp iter
449
450 void CkMemCheckPT::recvIter(int iter){
451   if(maxIter<iter){
452     maxIter=iter;
453   }
454 }
455
456 void CkMemCheckPT::recvMaxIter(int iter){
457   localDecided = false;
458   CKLOCMGR_LOOP(mgr->recvChkpIter(iter););
459 }
460
461 void CkMemCheckPT::reachChkpIter(){
462   recvIterCount++;
463   elemCount = CkCountChkpSyncElements();
464   if(recvIterCount == elemCount){
465     recvIterCount = 0;
466     contribute(CkCallback(CkReductionTarget(CkMemCheckPT,startChkp),thisProxy[0]));
467   }
468 }
469
470 void CkMemCheckPT::startChkp(){
471   CkPrintf("start checkpoint at %lf in %lf\n",CmiWallTimer(),CmiWallTimer()-startTime);
472   CkStartMemCheckpoint(cpCallback);
473 }
474
475 // called by checkpoint mgr to restore an array element
476 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
477 {
478 #if CMK_MEM_CHECKPOINT
479   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
480   // m->index.print();
481   PUP::fromMem p(m->packData);
482   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
483   CmiAssert(mgr);
484 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
485   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
486 #else
487   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
488 #endif
489
490   // find a list of array elements bound together
491   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
492   CmiAssert(elt);
493   CkLocRec_local *rec = elt->myRec;
494   CkVec<CkMigratable *> list;
495   mgr->migratableList(rec, list);
496   CmiAssert(list.length() > 0);
497   for (int l=0; l<list.length(); l++) {
498     elt = (ArrayElement *)list[l];
499     elt->budPEs[0] = m->bud1;
500     elt->budPEs[1] = m->bud2;
501     //    reset, may not needed now
502     // for now.
503     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
504       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
505       if (c) c->redNo = 0;
506     }
507   }
508 #endif
509 }
510
511 // return 1 if pe is a crashed processor
512 int CkMemCheckPT::isFailed(int pe)
513 {
514   for (int i=0; i<failedPes.length(); i++)
515     if (failedPes[i] == pe) return 1;
516   return 0;
517 }
518
519 // add pe into history list of all failed processors
520 void CkMemCheckPT::failed(int pe)
521 {
522   if (isFailed(pe)) return;
523   failedPes.push_back(pe);
524 }
525
526 int CkMemCheckPT::totalFailed()
527 {
528   return failedPes.length();
529 }
530
531 // create an checkpoint entry for array element of aid with index.
532 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
533 {
534   // error check, no duplicate
535   int idx, len = ckTable.size();
536   for (idx=0; idx<len; idx++) {
537     CkCheckPTInfo *entry = ckTable[idx];
538     if (index == entry->index) {
539       if (loc == entry->locMgr) {
540         // bindTo array elements
541         return;
542       }
543       // for array inheritance, the following check may fail
544       // because ArrayElement constructor of all superclasses are called
545       if (aid == entry->aid) {
546         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
547         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
548       }
549     }
550   }
551   CkCheckPTInfo *newEntry;
552   if (where == CkCheckPoint_inMEM)
553     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
554   else
555     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
556   ckTable.push_back(newEntry);
557   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
558 }
559
560 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
561 {
562 #if !CMK_CHKP_ALL       
563   int buddy = msg->bud1;
564   if (buddy == CkMyPe()) buddy = msg->bud2;
565   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
566   recvData(msg);
567   // ack
568   thisProxy[buddy].gotData();
569 #else
570   chkpTable[0] = NULL;
571   chkpTable[1] = NULL;
572   recvArrayCheckpoint(msg);
573   thisProxy[msg->bud2].gotData();
574 #endif
575 }
576
577 // loop through my checkpoint table and ask checkpointed array elements
578 // to send me checkpoint data.
579 //void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
580 void CkMemCheckPT::doItNow(int starter)
581 {
582   checkpointed = 1;
583   //cpCallback = cb;
584   cpStarter = starter;
585   inCheckpointing = 1;
586   if (CkMyPe() == cpStarter) {
587     startTime = CmiWallTimer();
588     CkPrintf("[%d] Start checkpointing  starter: %d... at %lf\n", CkMyPe(), cpStarter,startTime);
589   }
590 #if CMK_CONVERSE_MPI
591   if(CmiNumPartition()==1)
592 #endif    
593   {
594
595 #if !CMK_CHKP_ALL
596     int len = ckTable.length();
597     for (int i=0; i<len; i++) {
598       CkCheckPTInfo *entry = ckTable[i];
599       // always let the bigger number processor send request
600       //if (CkMyPe() < entry->pNo) continue;
601       // always let the smaller number processor send request, may on same proc
602       if (!isMaster(entry->pNo)) continue;
603       // call inmem_checkpoint to the array element, ask it to send
604       // back checkpoint data via recvData().
605       CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
606       CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
607     }
608     // if my table is empty, then I am done
609     if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
610 #else
611     startArrayCheckpoint();
612 #endif
613     sendProcData();
614   }
615 #if CMK_CONVERSE_MPI
616   else
617   {
618     startCheckpoint();
619   }
620 #endif
621   // pack and send proc level data
622 }
623
624 class MemElementPacker : public CkLocIterator{
625   private:
626     //    CkLocMgr *locMgr;
627     PUP::er &p;
628     std::map<CkHashCode,CkLocation> arrayMap;
629   public:
630     MemElementPacker(PUP::er &p_):p(p_){};
631     void addLocation(CkLocation &loc){
632       CkArrayIndexMax idx = loc.getIndex();
633       arrayMap[idx.hash()] = loc;
634     }
635     void writeCheckpoint(){
636       std::map<CkHashCode, CkLocation>::iterator it;
637       for(it = arrayMap.begin();it!=arrayMap.end();it++){
638         CkLocation loc = it->second;
639         CkLocMgr *locMgr = loc.getManager();
640         CkArrayIndexMax idx = loc.getIndex();
641         CkGroupID gID = locMgr->ckGetGroupID();
642         p|gID;
643         p|idx;
644         locMgr->pupElementsFor(p,loc.getLocalRecord(),CkElementCreation_migrate);
645       }
646     }
647 };
648
649 void pupAllElements(PUP::er &p){
650 #if CMK_CHKP_ALL && CMK_MEM_CHECKPOINT
651   int numElements;
652   if(!p.isUnpacking()){
653     numElements = CkCountArrayElements();
654   }
655   p | numElements;
656   if(!p.isUnpacking()){
657     MemElementPacker packer(p);
658     CKLOCMGR_LOOP(mgr->iterate(packer););
659     packer.writeCheckpoint();
660   }
661 #endif
662 }
663
664 void CkMemCheckPT::startArrayCheckpoint(){
665 #if CMK_CHKP_ALL
666   int size;
667   {
668     PUP::sizer psizer;
669     pupAllElements(psizer);
670     size = psizer.size();
671   }
672   int packSize = size/sizeof(double)+1;
673   // CkPrintf("[%d]checkpoint size :%d\n",CkMyPe(),packSize);
674   CkArrayCheckPTMessage * msg = new (packSize,0) CkArrayCheckPTMessage;
675   msg->len = size;
676   msg->cp_flag = 1;
677   int budPEs[2];
678   msg->bud1=CkMyPe();
679   msg->bud2=ChkptOnPe(CkMyPe());
680   {
681     PUP::toMem p(msg->packData);
682     pupAllElements(p);
683   }
684   thisProxy[msg->bud2].recvArrayCheckpoint((CkArrayCheckPTMessage *)CkCopyMsg((void **)&msg));
685   if(chkpTable[0]) delete chkpTable[0];
686   chkpTable[0] = msg;
687   //send the checkpoint to my 
688   recvCount++;
689 #endif
690 }
691
692 void CkMemCheckPT::startCheckpoint(){
693 #if CMK_CONVERSE_MPI
694   if(CkMyPe() == CpvAccess(_remoteCrashedNode))
695     CkPrintf("in start checkpointing!!!!\n");
696   int size;
697   {
698     PUP::sizer p;
699     _handleProcData(p,CmiFalse);
700     size = p.size();
701   }
702   CkCheckPTMessage * procMsg = new (size,0) CkCheckPTMessage;
703   procMsg->len = size;
704   procMsg->cp_flag = 1;
705   {
706     PUP::toMem p(procMsg->packData);
707     _handleProcData(p,CmiFalse);
708   }
709   int pointer = CpvAccess(curPointer);
710   if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
711   CpvAccess(localProcChkpBuf)[pointer] = procMsg;
712
713   {
714     PUP::sizer psizer;
715     pupAllElements(psizer);
716     size = psizer.size();
717   }
718   CkCheckPTMessage * msg = new (size,0) CkCheckPTMessage;
719   msg->len = size;
720   msg->cp_flag = 1;
721   int checksum;
722   {
723     if(CpvAccess(use_checksum)&&CkReplicaAlive()==1){
724       PUP::checker p(msg->packData);
725       pupAllElements(p);
726       checksum = p.getChecksum();
727     }else{  
728 //    CmiPrintf("[%d][%d] checksum %d\n",CmiMyPartition(),CkMyPe(),checksum);
729       PUP::toMem p(msg->packData);
730       pupAllElements(p);
731     }
732   }
733   pointer = CpvAccess(curPointer);
734   if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
735   CpvAccess(chkpBuf)[pointer] = msg;
736   if(CkMyPe()==0)
737     CmiPrintf("[%d][%d] local checkpoint done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
738   if(CkReplicaAlive()==1){
739     CpvAccess(recvdLocal) = 1;
740     if(CpvAccess(use_checksum)){
741       CpvAccess(localChecksum) = checksum;
742       char *chkpMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
743       *(int *)(chkpMsg+CmiMsgHeaderSizeBytes) = CpvAccess(localChecksum);
744       //only one reaplica will send
745       if(CmiMyPartition()==0){
746         CmiSetHandler(chkpMsg,recvRemoteChkpHandlerIdx);
747         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),chkpMsg);
748       }
749     }else{
750       envelope * env = (envelope *)(UsrToEnv((CkCheckPTMessage *)CkCopyMsg((void **)&msg)));
751       CkPackMessage(&env);
752       if(CmiMyPartition()==0){
753         CmiSetHandler(env,recvRemoteChkpHandlerIdx);
754         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
755       }
756     }
757     if(CmiMyPartition()==0){
758       notifyReplica = 1;
759       thisProxy[CkMyPe()].doneComparison(true);
760     }
761   }
762   if(CpvAccess(recvdRemote)==1){
763     //only partition 1 will do it
764     //compare the checkpoint 
765     int size = CpvAccess(chkpBuf)[pointer]->len;
766     if(CpvAccess(use_checksum)){
767       if(CpvAccess(localChecksum) == CpvAccess(remoteChecksum)){
768         thisProxy[CkMyPe()].doneComparison(true);
769       }
770       else{
771         thisProxy[CkMyPe()].doneComparison(false);
772       }
773     }else{
774       if(CpvAccess(buddyBuf)->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(CpvAccess(buddyBuf)->packData))){
775         thisProxy[CkMyPe()].doneComparison(true);
776       }
777       else{
778         //CkPrintf("[%d][%d] failed the test pointer %d \n",CmiMyPartition(),CkMyPe(),pointer);
779         thisProxy[CkMyPe()].doneComparison(false);
780       }
781     }
782     if(CkMyPe()==0)
783       CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
784   }
785   else{
786     if(CkReplicaAlive()==0){//TODO add flag if sent already but the replica hasn't recovered when the next checkpoint
787       //control when the message can be sent: after the crashed replica change the phase number, otherwise buffer it
788       if(CpvAccess(remoteReady)==1){
789         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
790         {       
791           int pointer = CpvAccess(curPointer);
792           //send the proc data
793           CkCheckPTMessage * procMsg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
794           procMsg->pointer = pointer;
795           envelope * env = (envelope *)(UsrToEnv(procMsg));
796           CkPackMessage(&env);
797           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
798           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
799           if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
800             CkPrintf("[%d] sendProcdata\n",CkMyPe());
801           }
802         }
803         //send the array checkpoint data
804         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
805         msg->pointer = CpvAccess(curPointer);
806         envelope * env = (envelope *)(UsrToEnv(msg));
807         CkPackMessage(&env);
808         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
809         CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
810         if(CkMyPe() == CpvAccess(_remoteCrashedNode))
811           CkPrintf("[%d] sendArraydata\n",CkMyPe());
812       }else{
813         if(CkMyPe() == CpvAccess(_remoteCrashedNode)){
814           int pointer = CpvAccess(curPointer);
815           CpvAccess(recoverArrayBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(localProcChkpBuf)[pointer]);
816           (CpvAccess(recoverArrayBuf))->pointer = pointer;
817         }
818         CpvAccess(recoverProcBuf) = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
819         (CpvAccess(recoverProcBuf))->pointer = CpvAccess(curPointer);
820         CpvAccess(localReady) = 1;
821       }
822       //can continue work, no need to wait for my replica
823       int _ret = 1;
824       notifyReplica = 1;
825       CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
826       contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
827     }
828   }
829 #endif
830 }
831
832 void CkMemCheckPT::doneComparison(bool ret){
833   int _ret = 1;
834   if(!ret){
835     //CkPrintf("[%d][%d] fail in doneComparison \n", CmiMyPartition(),CkMyPe());
836     _ret = 0;
837   }else{
838     _ret = 1;
839   }
840   CkCallback cb(CkReductionTarget(CkMemCheckPT,doneRComparison),thisProxy[0]);
841   contribute(sizeof(int),&_ret,CkReduction::sum_int,cb);
842 }
843
844 void CkMemCheckPT::doneRComparison(int ret){
845   //    if(CpvAccess(curPointer) == 0){
846   //if(ret==CkNumPes()){
847     CpvAccess(localChkpDone) = 1;
848     if(CpvAccess(remoteChkpDone) ==1){
849       thisProxy.doneBothComparison();
850     }else{
851       CmiPrintf("[%d][%d] Local checkpoint finished in %f seconds at %lf, waiting for replica ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer());
852     }
853     if(notifyReplica == 0){
854       //notify the replica am done
855       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
856       *(int *)(msg+CmiMsgHeaderSizeBytes) = ret;
857       CmiSetHandler(msg,replicaChkpDoneHandlerIdx);
858       CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)msg);
859       notifyReplica = 1;
860     }
861  // }
862  /* else{
863     CkPrintf("[%d][%d] going to RollBack %d at %lf checkpoint in %lf\n", CmiMyPartition(),CkMyPe(),ret,CmiWallTimer(), CmiWallTimer()-startTime);
864     startTime = CmiWallTimer();
865     thisProxy.RollBack();
866   }*/
867 }
868
869 void CkMemCheckPT::doneBothComparison(){
870   CpvAccess(recvdRemote) = 0;
871   CpvAccess(remoteReady) = 0;
872   CpvAccess(localReady) = 0;
873   CpvAccess(recvdLocal) = 0;
874   CpvAccess(localChkpDone) = 0;
875   CpvAccess(remoteChkpDone) = 0;
876   CpvAccess(remoteStarted) = 0;
877   CpvAccess(localStarted) = 0;
878   CpvAccess(_remoteCrashedNode) = -1;
879   CkMemCheckPT::replicaAlive = 1;
880   int size = CpvAccess(chkpBuf)[CpvAccess(curPointer)]->len;
881   CpvAccess(curPointer)^=1;
882   inCheckpointing = 0;
883   notifyReplica = 0;
884   if(CkMyPe() == 0){
885     CmiPrintf("[%d][%d] Checkpoint finished in %f seconds at %lf, checkpoint size %d, sending callback ... \n", CmiMyPartition(),CkMyPe(), CmiWallTimer()-startTime,CmiWallTimer(),size);
886   }
887   CKLOCMGR_LOOP(mgr->resumeFromChkp(););//TODO wait until the replica finish the checkpoint
888 }
889
890 void CkMemCheckPT::RollBack(){
891   //restore group data
892   checkpointed = 0;
893   CkMemCheckPT::inRestarting = 1;
894   int pointer = CpvAccess(curPointer)^1;//use the previous one
895   CkCheckPTMessage* chkpMsg = CpvAccess(chkpBuf)[pointer];
896   PUP::fromMem p(chkpMsg->packData);    
897
898   //destroy array elements
899   CKLOCMGR_LOOP(mgr->flushLocalRecs(););
900   int numGroups = CkpvAccess(_groupIDTable)->size();
901   for(int i=0;i<numGroups;i++) {
902     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
903     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
904     obj->flushStates();
905     obj->ckJustMigrated();
906   }
907   //restore array elements
908
909   int numElements;
910   p|numElements;
911
912   if(p.isUnpacking()){
913     for(int i=0;i<numElements;i++){
914       CkGroupID gID;
915       CkArrayIndex idx;
916       p|gID;
917       p|idx;
918       CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
919       CmiAssert(mgr);
920       mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
921     }
922     }
923     CkCallback cb(CkReductionTarget(CkMemCheckPT,recoverFromSoftFailure),thisProxy);
924     contribute(cb);
925   }
926
927   void CkMemCheckPT::notifyReplicaDie(int pe){
928     replicaAlive = 0;
929     CpvAccess(_remoteCrashedNode) = pe;
930   }
931
932   void CkMemCheckPT::recvArrayCheckpoint(CkArrayCheckPTMessage *msg)
933   {
934 #if CMK_CHKP_ALL
935     int idx = 1;
936     if(msg->bud1 == CkMyPe()){
937       idx = 0;
938     }
939     int isChkpting = msg->cp_flag;
940     if(isChkpting == 1){
941       if(chkpTable[idx]) delete chkpTable[idx];
942     }
943     chkpTable[idx] = msg;
944     if(isChkpting){
945       recvCount++;
946       if(recvCount == 2){
947         if (where == CkCheckPoint_inMEM) {
948           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
949         }
950         else if (where == CkCheckPoint_inDISK) {
951           // another barrier for finalize the writing using fsync
952           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
953           contribute(0,NULL,CkReduction::sum_int,localcb);
954         }
955         else
956           CmiAbort("Unknown checkpoint scheme");
957         recvCount = 0;
958       }
959     }
960 #endif
961   }
962
963   // don't handle array elements
964   static inline void _handleProcData(PUP::er &p, CmiBool create)
965   {
966     // save readonlys, and callback BTW
967     CkPupROData(p);
968
969     // save mainchares 
970     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
971
972 #ifndef CMK_CHARE_USE_PTR
973     // save non-migratable chare
974     CkPupChareData(p);
975 #endif
976
977     // save groups into Groups.dat
978     CkPupGroupData(p,create);
979
980     // save nodegroups into NodeGroups.dat
981     if(CkMyRank()==0) CkPupNodeGroupData(p,create);
982   }
983
984   void CkMemCheckPT::sendProcData()
985   {
986     // find out size of buffer
987     int size;
988     {
989       PUP::sizer p;
990       _handleProcData(p,CmiTrue);
991       size = p.size();
992     }
993     int packSize = size;
994     CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
995     DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
996     {
997       PUP::toMem p(msg->packData);
998       _handleProcData(p,CmiTrue);
999     }
1000     msg->pe = CkMyPe();
1001     msg->len = size;
1002     msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
1003     thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
1004   }
1005
1006   void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
1007   {
1008     if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
1009     CpvAccess(procChkptBuf) = msg;
1010     DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
1011     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
1012   }
1013
1014   // ArrayElement call this function to give us the checkpointed data
1015   void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
1016   {
1017     int len = ckTable.length();
1018     int idx;
1019     for (idx=0; idx<len; idx++) {
1020       CkCheckPTInfo *entry = ckTable[idx];
1021       if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
1022     }
1023     CkAssert(idx < len);
1024     int isChkpting = msg->cp_flag;
1025     ckTable[idx]->updateBuffer(msg);
1026     if (isChkpting) {
1027       // all my array elements have returned their inmem data
1028       // inform starter processor that I am done.
1029       recvCount ++;
1030       if (recvCount == ckTable.length()) {
1031         if (where == CkCheckPoint_inMEM) {
1032           contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1033         }
1034         else if (where == CkCheckPoint_inDISK) {
1035           // another barrier for finalize the writing using fsync
1036           CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
1037           contribute(0,NULL,CkReduction::sum_int,localcb);
1038         }
1039         else
1040           CmiAbort("Unknown checkpoint scheme");
1041         recvCount = 0;
1042       } 
1043     }
1044   }
1045
1046   // only used in disk checkpointing
1047   void CkMemCheckPT::syncFiles(CkReductionMsg *m)
1048   {
1049     delete m;
1050 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
1051     system("sync");
1052 #endif
1053     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
1054   }
1055
1056   // only is called on cpStarter when checkpoint is done
1057   void CkMemCheckPT::cpFinish()
1058   {
1059     CmiAssert(CkMyPe() == cpStarter);
1060     peCount++;
1061     // now that all processors have finished, activate callback
1062     if (peCount == 2) 
1063     {
1064       CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
1065       peCount = 0;
1066       thisProxy.report();
1067     }
1068   }
1069
1070   // for debugging, report checkpoint info
1071   void CkMemCheckPT::report()
1072   {
1073     CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1074     inCheckpointing = 0;
1075 #if !CMK_CHKP_ALL
1076     int objsize = 0;
1077     int len = ckTable.length();
1078     for (int i=0; i<len; i++) {
1079       CkCheckPTInfo *entry = ckTable[i];
1080       CmiAssert(entry);
1081       objsize += entry->getSize();
1082     }
1083     CmiAssert(CpvAccess(procChkptBuf));
1084     //CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
1085 #else
1086     if(CkMyPe()==0)
1087       CkPrintf("[%d] Checkpoint Processor data: %d \n", CkMyPe(), CpvAccess(procChkptBuf)->len);
1088 #endif
1089   }
1090
1091   /*****************************************************************************
1092     RESTART Procedure
1093    *****************************************************************************/
1094
1095   // master processor of two buddies
1096   inline int CkMemCheckPT::isMaster(int buddype)
1097   {
1098 #if 0
1099     int mype = CkMyPe();
1100     //CkPrintf("ismaster: %d %d\n", pe, mype);
1101     if (CkNumPes() - totalFailed() == 2) {
1102       return mype > buddype;
1103     }
1104     for (int i=1; i<CkNumPes(); i++) {
1105       int me = (buddype+i)%CkNumPes();
1106       if (isFailed(me)) continue;
1107       if (me == mype) return 1;
1108       else return 0;
1109     }
1110     return 0;
1111 #else
1112     // smaller one
1113     int mype = CkMyPe();
1114     //CkPrintf("ismaster: %d %d\n", pe, mype);
1115     if (CkNumPes() - totalFailed() == 2) {
1116       return mype < buddype;
1117     }
1118 #if NODE_CHECKPOINT
1119     int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
1120     for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
1121 #else
1122       for (int i=1; i<CkNumPes(); i++) {
1123 #endif
1124         int me = (mype+i)%CkNumPes();
1125         if (isFailed(me)) continue;
1126         if (me == buddype) return 1;
1127         else return 0;
1128       }
1129       return 0;
1130 #endif
1131     }
1132
1133
1134
1135 #if 0
1136     // helper class to pup all elements that belong to same ckLocMgr
1137     class ElementDestoryer : public CkLocIterator {
1138       private:
1139         CkLocMgr *locMgr;
1140       public:
1141         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
1142         void addLocation(CkLocation &loc) {
1143           CkArrayIndex idx=loc.getIndex();
1144           CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
1145           loc.destroy();
1146         }
1147     };
1148 #endif
1149
1150     // restore the bitmap vector for LB
1151     void CkMemCheckPT::resetLB(int diepe)
1152     {
1153 #if CMK_LBDB_ON
1154       int i;
1155       char *bitmap = new char[CkNumPes()];
1156       // set processor available bitmap
1157       get_avail_vector(bitmap);
1158       for (i=0; i<failedPes.length(); i++)
1159         bitmap[failedPes[i]] = 0; 
1160       bitmap[diepe] = 0;
1161
1162 #if CK_NO_PROC_POOL
1163       set_avail_vector(bitmap);
1164 #endif
1165
1166       // if I am the crashed pe, rebuild my failedPEs array
1167       if (CkMyNode() == diepe)
1168         for (i=0; i<CkNumPes(); i++) 
1169           if (bitmap[i]==0) failed(i);
1170
1171       delete [] bitmap;
1172 #endif
1173     }
1174
1175     static double restartT;
1176
1177     // in case when failedPe dies, everybody go through its checkpoint table:
1178     // destory all array elements
1179     // recover lost buddies
1180     // reconstruct all array elements from check point data
1181     // called on all processors
1182     void CkMemCheckPT::restart(int diePe)
1183     {
1184 #if CMK_MEM_CHECKPOINT
1185       double curTime = CmiWallTimer();
1186       if (CkMyPe() == diePe){
1187         restartT = CmiWallTimer();
1188         CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
1189       }
1190       stage = (char*)"resetLB";
1191       startTime = curTime;
1192       if (CkMyPe() == diePe)
1193         CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
1194
1195 #if CK_NO_PROC_POOL
1196       failed(diePe);    // add into the list of failed pes
1197 #endif
1198       thisFailedPe = diePe;
1199
1200       if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
1201
1202       inRestarting = 1;
1203
1204       // disable load balancer's barrier
1205       //if (CkMyPe() != diePe) resetLB(diePe);
1206
1207       CKLOCMGR_LOOP(mgr->startInserting(););
1208
1209
1210       if(CmiNumPartition()==1){
1211         barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
1212       }else{
1213         CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1214         barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1215       }
1216 #endif
1217     }
1218
1219     // loally remove all array elements
1220     void CkMemCheckPT::removeArrayElements()
1221     {
1222 #if CMK_MEM_CHECKPOINT
1223       int len = ckTable.length();
1224       double curTime = CmiWallTimer();
1225       if (CkMyPe() == thisFailedPe) 
1226         CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
1227       stage = (char*)"removeArrayElements";
1228       startTime = curTime;
1229
1230       //  if (cpCallback.isInvalid()) 
1231       //          CkPrintf("invalid pe %d\n",CkMyPe());
1232       //          CkAbort("Didn't set restart callback\n");;
1233       if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
1234
1235       // get rid of all buffering and remote recs
1236       // including destorying all array elements
1237 #if CK_NO_PROC_POOL  
1238       CKLOCMGR_LOOP(mgr->flushAllRecs(););
1239 #else
1240       CKLOCMGR_LOOP(mgr->flushLocalRecs(););
1241 #endif
1242       barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
1243 #endif
1244     }
1245
1246     // flush state in reduction manager
1247     void CkMemCheckPT::resetReductionMgr()
1248     {
1249       if (CkMyPe() == thisFailedPe) 
1250         CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
1251       int numGroups = CkpvAccess(_groupIDTable)->size();
1252       for(int i=0;i<numGroups;i++) {
1253         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1254         IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1255         obj->flushStates();
1256         obj->ckJustMigrated();
1257       }
1258       // reset again
1259       //CpvAccess(_qd)->flushStates();
1260       if(CmiNumPartition()==1){
1261         barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
1262       }
1263       else
1264         barrier(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1265     }
1266
1267     // recover the lost buddies
1268     void CkMemCheckPT::recoverBuddies()
1269     {
1270       int idx;
1271       int len = ckTable.length();
1272       // ready to flush reduction manager
1273       // cannot be CkMemCheckPT::restart because destory will modify states
1274       double curTime = CmiWallTimer();
1275       if (CkMyPe() == thisFailedPe)
1276         CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
1277       stage = (char *)"recoverBuddies";
1278       if (CkMyPe() == thisFailedPe)
1279         CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
1280       startTime = curTime;
1281
1282       // recover buddies
1283       expectCount = 0;
1284 #if !CMK_CHKP_ALL
1285       for (idx=0; idx<len; idx++) {
1286         CkCheckPTInfo *entry = ckTable[idx];
1287         if (entry->pNo == thisFailedPe) {
1288 #if CK_NO_PROC_POOL
1289           // find a new buddy
1290           int budPe = BuddyPE(CkMyPe());
1291 #else
1292           int budPe = thisFailedPe;
1293 #endif
1294           CkArrayCheckPTMessage *msg = entry->getCopy();
1295           msg->bud1 = budPe;
1296           msg->bud2 = CkMyPe();
1297           msg->cp_flag = 0;            // not checkpointing
1298           thisProxy[budPe].recoverEntry(msg);
1299           expectCount ++;
1300         }
1301       }
1302 #else
1303       //send to failed pe
1304       if(CkMyPe()!=thisFailedPe&&chkpTable[1]->bud1==thisFailedPe){
1305 #if CK_NO_PROC_POOL
1306         // find a new buddy
1307         int budPe = BuddyPE(CkMyPe());
1308 #else
1309         int budPe = thisFailedPe;
1310 #endif
1311         CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[1]);
1312         CkPrintf("[%d]got message for crashed pe %d\n",CkMyPe(),thisFailedPe);
1313         msg->cp_flag = 0;            // not checkpointing
1314         msg->bud1 = budPe;
1315         msg->bud2 = CkMyPe();
1316         thisProxy[budPe].recoverEntry(msg);
1317         expectCount ++;
1318       }
1319 #endif
1320
1321       if (expectCount == 0) {
1322         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1323       }
1324       //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
1325     }
1326
1327     void CkMemCheckPT::gotData()
1328     {
1329       ackCount ++;
1330       if (ackCount == expectCount) {
1331         ackCount = 0;
1332         expectCount = -1;
1333         //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
1334         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
1335       }
1336     }
1337
1338     void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
1339     {
1340
1341       for (int i=0; i<n; i++) {
1342         CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
1343         mgr->updateLocation(idx[i], nowOnPe);
1344       }
1345       thisProxy[nowOnPe].gotReply();
1346     }
1347
1348     // restore array elements
1349     void CkMemCheckPT::recoverArrayElements()
1350     {
1351       double curTime = CmiWallTimer();
1352       int len = ckTable.length();
1353       //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
1354       stage = (char *)"recoverArrayElements";
1355       if (CkMyPe() == thisFailedPe)
1356         CmiPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
1357       startTime = curTime;
1358       int flag = 0;
1359       // recover all array elements
1360       int count = 0;
1361
1362 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1363       CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
1364       CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
1365 #endif
1366
1367 #if !CMK_CHKP_ALL
1368       for (int idx=0; idx<len; idx++)
1369       {
1370         CkCheckPTInfo *entry = ckTable[idx];
1371 #if CK_NO_PROC_POOL
1372         // the bigger one will do 
1373         //    if (CkMyPe() < entry->pNo) continue;
1374         if (!isMaster(entry->pNo)) continue;
1375 #else
1376         // smaller one do it, which has the original object
1377         if (CkMyPe() == entry->pNo+1 || 
1378             CkMyPe()+CkNumPes() == entry->pNo+1) continue;
1379 #endif
1380         //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
1381
1382         entry->updateBuddy(CkMyPe(), entry->pNo);
1383         CkArrayCheckPTMessage *msg = entry->getCopy();
1384         // gzheng
1385         //thisProxy[CkMyPe()].inmem_restore(msg);
1386         inmem_restore(msg);
1387 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1388         CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
1389         int homePe = mgr->homePe(msg->index);
1390         if (homePe != CkMyPe()) {
1391           gmap[homePe].push_back(msg->locMgr);
1392           imap[homePe].push_back(msg->index);
1393         }
1394 #endif
1395         CkFreeMsg(msg);
1396         count ++;
1397       }
1398 #else
1399       char * packData;
1400       if(CmiNumPartition()==1){
1401         CkArrayCheckPTMessage * msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&chkpTable[0]);
1402         packData = (char *)msg->packData;
1403       }
1404       else{
1405         int pointer = CpvAccess(curPointer);
1406         CkCheckPTMessage * msg = (CkCheckPTMessage *)CkCopyMsg((void **)&CpvAccess(chkpBuf)[pointer]);
1407         packData = msg->packData;
1408       }
1409 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1410       recoverAll(packData,gmap,imap);
1411 #else
1412       recoverAll(packData);
1413 #endif
1414 #endif
1415 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1416       for (int i=0; i<CkNumPes(); i++) {
1417         if (gmap[i].size() && i!=CkMyPe()&& i==thisFailedPe) {
1418           thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
1419           flag++;       
1420         }
1421       }
1422       delete [] imap;
1423       delete [] gmap;
1424 #endif
1425       DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
1426
1427       CKLOCMGR_LOOP(mgr->doneInserting(););
1428
1429       // _crashedNode = -1;
1430       CpvAccess(_crashedNode) = -1;
1431 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1432       if (CkMyPe() == 0)
1433         CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
1434 #else
1435       if(flag == 0)
1436       {
1437         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1438       }
1439 #endif
1440     }
1441
1442     void CkMemCheckPT::gotReply(){
1443       contribute(CkCallback(CkReductionTarget(CkMemCheckPT, finishUp), thisProxy));
1444     }
1445
1446     void CkMemCheckPT::recoverAll(char * packData,CkVec<CkGroupID> * gmap, CkVec<CkArrayIndex> * imap){
1447 #if CMK_CHKP_ALL
1448       PUP::fromMem p(packData);
1449       int numElements;
1450       p|numElements;
1451       if(p.isUnpacking()){
1452         for(int i=0;i<numElements;i++){
1453           CkGroupID gID;
1454           CkArrayIndex idx;
1455           p|gID;
1456           p|idx;
1457           CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1458           int homePe = mgr->homePe(idx);
1459 #if !STREAMING_INFORMHOME && CK_NO_PROC_POOL
1460           mgr->resume(idx,p,CmiTrue,CmiTrue);
1461 #else
1462           if(CmiNumPartition()==1)
1463             mgr->resume(idx,p,CmiFalse,CmiTrue);        
1464           else{
1465             if(CkMyPe()==thisFailedPe){
1466               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1467             }
1468             else{
1469               mgr->resume(idx,p,CmiFalse,CmiTrue,CmiTrue);
1470             }
1471           }
1472 #endif
1473 #if STREAMING_INFORMHOME && CK_NO_PROC_POOL
1474           homePe = mgr->homePe(idx);
1475           if (homePe != CkMyPe()) {
1476             gmap[homePe].push_back(gID);
1477             imap[homePe].push_back(idx);
1478           }
1479 #endif
1480         }
1481       }
1482 #endif
1483     }
1484
1485
1486     // on every processor
1487     // turn load balancer back on
1488     void CkMemCheckPT::finishUp()
1489     {
1490       //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
1491       //CKLOCMGR_LOOP(mgr->doneInserting(););
1492
1493       if (CkMyPe() == thisFailedPe)
1494       {
1495         CmiPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
1496         CmiPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
1497         fflush(stdout);
1498       }
1499       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1500       inRestarting = 0;
1501
1502 #if CMK_CONVERSE_MPI    
1503       if(CmiNumPartition()!=1){
1504         CpvAccess(recvdProcChkp) = 0;
1505         CpvAccess(recvdArrayChkp) = 0;
1506         CpvAccess(curPointer)^=1;
1507         //notify my replica, restart is done
1508         if (CkMyPe() == 0){
1509           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1510           CmiSetHandler(msg,replicaRecoverHandlerIdx);
1511           CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)msg);
1512         }
1513       }
1514       if (CmiMyPe() == BuddyPE(thisFailedPe)) {
1515         CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
1516       }
1517 #endif
1518
1519 #if CK_NO_PROC_POOL
1520 #if NODE_CHECKPOINT
1521       int numnodes = CmiNumPhysicalNodes();
1522 #else
1523       int numnodes = CkNumPes();
1524 #endif
1525       if (numnodes-totalFailed() <=2) {
1526         if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
1527         _memChkptOn = 0;
1528       }
1529 #endif
1530     }
1531
1532     void CkMemCheckPT::recoverFromSoftFailure()
1533     {
1534       inRestarting = 0;
1535       CpvAccess(recvdRemote) = 0;
1536       CpvAccess(recvdLocal) = 0;
1537       CpvAccess(localChkpDone) = 0;
1538       CpvAccess(remoteChkpDone) = 0;
1539       inCheckpointing = 0;
1540       notifyReplica = 0;
1541       if(CkMyPe() == 0){
1542         CmiPrintf("[%d][%d] Recover From soft failures in %lf, sending callback ... \n", CmiMyPartition(),CkMyPe(),CmiWallTimer()-startTime);
1543       }
1544       CKLOCMGR_LOOP(mgr->resumeFromChkp(););
1545     }
1546     // called only on 0
1547     void CkMemCheckPT::quiescence(CkCallback &cb)
1548     {
1549       static int pe_count = 0;
1550       pe_count ++;
1551       CmiAssert(CkMyPe() == 0);
1552       //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
1553       if (pe_count == CkNumPes()) {
1554         pe_count = 0;
1555         cb.send();
1556       }
1557     }
1558
1559     // User callable function - to start a checkpoint
1560     // callback cb is used to pass control back
1561     void CkStartMemCheckpoint(CkCallback &cb)
1562     {
1563 #if CMK_MEM_CHECKPOINT
1564       CkPrintf("partition %d start checkpoint\n",CmiMyPartition());
1565       /*if (_memChkptOn == 0) {
1566         CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
1567         cb.send();
1568         return;
1569         }*/
1570       if (CkInRestarting()) {
1571         // trying to checkpointing during restart
1572         cb.send();
1573         return;
1574       }
1575       // store user callback and user data
1576       CkMemCheckPT::cpCallback = cb;
1577
1578
1579       //send to my replica that checkpoint begins 
1580       if(CkReplicaAlive()==1){
1581         char * msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1582         CmiSetHandler(msg, replicaChkpStartHandlerIdx);
1583         CmiRemoteSyncSendAndFree(0,CmiMyPartition()^1,CmiMsgHeaderSizeBytes,msg);
1584       }
1585       CpvAccess(localStarted) = 1;
1586       // broadcast to start check pointing
1587       if(CmiNumPartition()==1||(CmiNumPartition()==2&&CpvAccess(remoteStarted)==1)||CkReplicaAlive()==0){
1588         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1589         checkptMgr.doItNow(CkMyPe());
1590       }
1591 #else
1592       // when mem checkpoint is disabled, invike cb immediately
1593       CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
1594       cb.send();
1595 #endif
1596     }
1597
1598     void CkRestartCheckPoint(int diePe)
1599     {
1600       CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1601       CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1602       // broadcast
1603       checkptMgr.restart(diePe);
1604     }
1605
1606     static int _diePE = -1;
1607
1608     // callback function used locally by ccs handler
1609     static void CkRestartCheckPointCallback(void *ignore, void *msg)
1610     {
1611       CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1612       CkRestartCheckPoint(_diePE);
1613     }
1614
1615
1616     // called on crashed PE
1617     static void restartBeginHandler(char *msg)
1618     {
1619       CmiFree(msg);
1620 #if CMK_MEM_CHECKPOINT
1621 #if CMK_USE_BARRIER
1622       if(CkMyPe()!=_diePE){
1623         printf("restar begin on %d\n",CkMyPe());
1624         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1625         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1626         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1627       }else{
1628         CkPrintf("[%d] restartBeginHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1629         CkRestartCheckPointCallback(NULL, NULL);
1630       }
1631 #else
1632       static int count = 0;
1633       CmiAssert(CkMyPe() == _diePE);
1634       count ++;
1635       if (count == CkNumPes()) {
1636         printf("restart begin on %d\n",CkMyPe());
1637         CkRestartCheckPointCallback(NULL, NULL);
1638         count = 0;
1639       }
1640 #endif
1641 #endif
1642     }
1643
1644     extern void _discard_charm_message();
1645     extern void _resume_charm_message();
1646
1647     static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1648       return data;
1649     }
1650
1651     static void restartBcastHandler(char *msg)
1652     {
1653 #if CMK_MEM_CHECKPOINT
1654       // advance phase counter
1655       CkMemCheckPT::inRestarting = 1;
1656       _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1657       // gzheng
1658       //if (CkMyPe() != _diePE) cur_restart_phase ++;
1659
1660       if (CkMyPe()==_diePE)
1661         CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1662
1663       // reset QD counters
1664       /*  gzheng
1665           if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1666        */
1667
1668       /*  gzheng
1669           if (CkMyPe()==_diePE)
1670           CkRestartCheckPointCallback(NULL, NULL);
1671        */
1672       CmiFree(msg);
1673
1674       _resume_charm_message();
1675
1676       // reduction
1677       char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1678       CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1679 #if CMK_USE_BARRIER
1680       //CmiPrintf("before reduce\n");   
1681       CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1682       //CmiPrintf("after reduce\n");    
1683 #else
1684       CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1685 #endif 
1686       checkpointed = 0;
1687 #endif
1688     }
1689
1690     extern void _initDone();
1691
1692     bool compare(char * buf1, char *buf2){
1693       if(CkMyPe()==0)
1694         CmiPrintf("[%d][%d] comparison begin at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1695       PUP::checker pchecker(buf1,buf2);
1696       pchecker.skip();
1697       int numElements;
1698       pchecker|numElements;
1699       for(int i=0;i<numElements;i++){
1700         CkGroupID gID;
1701         CkArrayIndex idx;
1702
1703         pchecker|gID;
1704         pchecker|idx;
1705
1706         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1707         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1708       }
1709       if(CkMyPe()==0)
1710         CmiPrintf("[%d][%d]local comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1711       //int fault_num = pchecker.getFaultNum();
1712       bool result = pchecker.getResult();
1713       /*if(!result){
1714         CmiPrintf("[%d][%d]fault region %d\n",CmiMyPartition(),CkMyPe(),fault_num);
1715       }*/
1716       return result;
1717     }
1718     int getChecksum(char * buf){
1719       PUP::checker pchecker(buf);
1720       pchecker.skip();
1721       int numElements;
1722       pchecker|numElements;
1723 //      CkPrintf("num %d\n",numElements);
1724       for(int i=0;i<numElements;i++){
1725         CkGroupID gID;
1726         CkArrayIndex idx;
1727
1728         pchecker|gID;
1729         pchecker|idx;
1730
1731         CkLocMgr * mgr = (CkLocMgr *)CkpvAccess(_groupTable)->find(gID).getObj();
1732         mgr->resume(idx,pchecker,CmiFalse,CmiFalse,CmiFalse);
1733       }
1734       return pchecker.getChecksum();
1735     }
1736
1737     static void recvRemoteChkpHandler(char *msg){
1738       CpvAccess(remoteChkpDone) = 1;
1739       if(CpvAccess(use_checksum)){
1740         if(CkMyPe()==0)
1741           CmiPrintf("[%d][%d] receive checksum at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1742         CpvAccess(remoteChecksum) = *(int *)(msg+CmiMsgHeaderSizeBytes);
1743         CpvAccess(recvdRemote) = 1;
1744         if(CpvAccess(recvdLocal)==1){
1745           if(CpvAccess(remoteChecksum) == CpvAccess(localChecksum)){
1746             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1747           }
1748           else{
1749             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1750           }
1751         }
1752       }else{
1753         envelope *env = (envelope *)msg;
1754         CkUnpackMessage(&env);
1755         CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1756         if(CpvAccess(recvdLocal)==1){
1757           int pointer = CpvAccess(curPointer);
1758           int size = CpvAccess(chkpBuf)[pointer]->len;
1759           if(chkpMsg->len == size && compare((char *)(CpvAccess(chkpBuf)[pointer]->packData),(char *)(chkpMsg->packData))){
1760             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(true);
1761           }else
1762           {
1763             CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneComparison(false);
1764           }
1765           delete chkpMsg;
1766           if(CkMyPe()==0)
1767             CmiPrintf("[%d][%d] comparison done at %lf\n",CmiMyPartition(),CkMyPe(),CmiWallTimer());
1768         }else{
1769           CpvAccess(recvdRemote) = 1;
1770           if(CpvAccess(buddyBuf)) delete CpvAccess(buddyBuf);
1771           CpvAccess(buddyBuf) = chkpMsg;
1772         }
1773       }
1774     }
1775
1776     static void replicaRecoverHandler(char *msg){
1777       //fflush(stdout);
1778       //CmiPrintf("[%d]receive replica recover\n",CmiMyPe());
1779       CpvAccess(remoteChkpDone) = 1;
1780       if(CpvAccess(localChkpDone) == 1)
1781         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(CkNumPes());
1782       CmiFree(msg);
1783     }
1784
1785     static void replicaChkpDoneHandler(char *msg){
1786       CpvAccess(remoteChkpDone) = 1;
1787       int ret = *(int *)(msg+CmiMsgHeaderSizeBytes);
1788       if(CpvAccess(localChkpDone) == 1)
1789         CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->doneRComparison(ret);
1790       CmiFree(msg);
1791     }
1792
1793     static void replicaDieHandler(char * msg){
1794 #if CMK_CONVERSE_MPI
1795       //broadcast to every one in my replica
1796       CmiSetHandler(msg, replicaDieBcastHandlerIdx);
1797       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1798 #endif
1799     }
1800
1801     static void replicaChkpStartHandler(char * msg){
1802       CpvAccess(remoteStarted) =1;
1803       if(CpvAccess(localStarted)==1){    
1804         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1805         checkptMgr.doItNow(0);
1806       }
1807     }
1808
1809
1810     static void replicaDieBcastHandler(char *msg){
1811       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1812       CpvAccess(_remoteCrashedNode) = diePe;
1813       CkMemCheckPT::replicaAlive = 0;
1814       if(CkMyPe()==diePe){
1815         CmiPrintf("pe %d in replicad word die\n",diePe);
1816         CmiPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
1817         fflush(stdout);
1818       }
1819       find_spare_mpirank(diePe,CmiMyPartition()^1);
1820       //broadcast to my partition to get local max iter
1821       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->getIter();
1822       CmiFree(msg);
1823     }
1824
1825     static void recoverRemoteProcDataHandler(char *msg){
1826       envelope *env = (envelope *)msg;
1827       CkUnpackMessage(&env);
1828       CkCheckPTMessage* procMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1829
1830       //store the checkpoint
1831       int pointer = procMsg->pointer;
1832
1833
1834       if(CkMyPe()==CpvAccess(_crashedNode)){
1835         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1836         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1837         PUP::fromMem p(procMsg->packData);
1838         _handleProcData(p,CmiTrue);
1839         _initDone();
1840       }
1841       else{
1842         if(CpvAccess(localProcChkpBuf)[pointer]) delete CpvAccess(localProcChkpBuf)[pointer];
1843         CpvAccess(localProcChkpBuf)[pointer] = procMsg; 
1844         //_handleProcData(p,CmiFalse);
1845       }
1846       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1847       CKLOCMGR_LOOP(mgr->startInserting(););
1848
1849       CpvAccess(recvdProcChkp) =1;
1850       if(CpvAccess(recvdArrayChkp)==1){
1851         _resume_charm_message();
1852         _diePE = CpvAccess(_crashedNode);
1853         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1854         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1855         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1856       }
1857     }
1858
1859     static void recoverRemoteArrayDataHandler(char *msg){
1860       envelope *env = (envelope *)msg;
1861       CkUnpackMessage(&env);
1862       CkCheckPTMessage* chkpMsg = (CkCheckPTMessage *)(EnvToUsr(env));
1863
1864       //store the checkpoint
1865       int pointer = chkpMsg->pointer;
1866       CpvAccess(curPointer) = pointer;
1867       if(CpvAccess(chkpBuf)[pointer]) delete CpvAccess(chkpBuf)[pointer];
1868       CpvAccess(chkpBuf)[pointer] = chkpMsg;
1869       CpvAccess(recvdArrayChkp) =1;
1870       CkMemCheckPT::inRestarting = 1;
1871       if(CpvAccess(recvdProcChkp) == 1||CkMyPe()!= CpvAccess(_crashedNode)){
1872         _resume_charm_message();
1873         _diePE = CpvAccess(_crashedNode);
1874         //CmiPrintf("[%d] send to die pe %d\n",CkMyPe(),_diePE);
1875         char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1876         CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1877         CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1878       }
1879     }
1880
1881     static void recvPhaseHandler(char * msg)
1882     {
1883       CpvAccess(_curRestartPhase)--;
1884       CkMemCheckPT::inRestarting = 1;
1885       CmiFree(msg);
1886       //notify the buddy in the replica now i can receive the checkpoint msg
1887       char *rmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1888       CmiSetHandler(rmsg, askRecoverDataHandlerIdx);
1889       CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes,(char *)rmsg);
1890       //timer start
1891       if(CpvAccess( _crashedNode )==CkMyPe()){
1892         CkMemCheckPT::startTime = restartT = CmiWallTimer();
1893         CmiPrintf("[%d][%d] ask for checkpoint data in replica at %lf\n",CmiMyPartition(),CmiMyPe(),CmiWallTimer());
1894       }
1895     }
1896     
1897     static void askRecoverDataHandler(char * msg){
1898       CpvAccess(remoteReady)=1;
1899       if(CpvAccess(localReady)==1){
1900         if(CmiMyPe() == CpvAccess(_remoteCrashedNode))
1901         {       
1902           envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
1903           CkPackMessage(&env);
1904           CmiSetHandler(env,recoverRemoteProcDataHandlerIdx);
1905           CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1906           CmiPrintf("[%d] sendProcdata\n",CmiMyPe());
1907         }
1908         //send the array checkpoint data
1909         envelope * env = (envelope *)(UsrToEnv(CpvAccess(recoverArrayBuf)));
1910         CkPackMessage(&env);
1911         CmiSetHandler(env,recoverRemoteArrayDataHandlerIdx);
1912         CmiRemoteSyncSendAndFree(CmiMyPe(),CmiMyPartition()^1,env->getTotalsize(),(char *)env);
1913         
1914       }
1915     }
1916     // called on crashed processor
1917     static void recoverProcDataHandler(char *msg)
1918     {
1919 #if CMK_MEM_CHECKPOINT
1920       int i;
1921       envelope *env = (envelope *)msg;
1922       CkUnpackMessage(&env);
1923       CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1924       CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1925       CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1926       PUP::fromMem p(procMsg->packData);
1927       _handleProcData(p,CmiTrue);
1928
1929       CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1930       // gzheng
1931       CKLOCMGR_LOOP(mgr->startInserting(););
1932
1933       char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1934       *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1935       CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1936       CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1937
1938       _initDone();
1939       //   CpvAccess(_qd)->flushStates();
1940       CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1941 #endif
1942     }
1943     //for replica, got the phase number from my neighbor processor in the current partition
1944     static void askPhaseHandler(char *msg)
1945     {
1946 #if CMK_MEM_CHECKPOINT
1947       CmiPrintf("[%d] ---received asks for phase\n",CkMyPe());
1948       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1949       *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_curRestartPhase);
1950       CmiSetHandler(msg, recvPhaseHandlerIdx);
1951       CmiSyncSendAndFree(diePe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1952 #endif
1953     }
1954     // called on its backup processor
1955     // get backup message buffer and sent to crashed processor
1956     static void askProcDataHandler(char *msg)
1957     {
1958 #if CMK_MEM_CHECKPOINT
1959       int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1960       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1961       if (CpvAccess(procChkptBuf) == NULL)  {
1962         CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1963         CkAbort("no checkpoint found");
1964       }
1965       envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1966       CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1967
1968       CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1969
1970       CkPackMessage(&env);
1971       CmiSetHandler(env, recoverProcDataHandlerIdx);
1972       CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1973       CpvAccess(procChkptBuf) = NULL;
1974       CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d done at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1975 #endif
1976     }
1977
1978     // called on PE 0
1979     void qd_callback(void *m)
1980     {
1981       CmiPrintf("[%d] callback after QD for crashed node: %d. at %lf\n", CkMyPe(), CpvAccess(_crashedNode),CmiWallTimer());
1982       fflush(stdout);
1983       CkFreeMsg(m);
1984       if(CmiNumPartition()==1){
1985 #ifdef CMK_SMP
1986         for(int i=0;i<CmiMyNodeSize();i++){
1987           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1988           *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1989           CmiSetHandler(msg, askProcDataHandlerIdx);
1990           int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1991           CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1992         }
1993         return;
1994 #endif
1995         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1996         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1997         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1998         CmiSetHandler(msg, askProcDataHandlerIdx);
1999         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2000         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2001       }
2002       else{
2003         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2004         CmiSetHandler(msg, recvPhaseHandlerIdx);
2005         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes, (char *)msg);
2006       }
2007     }
2008
2009     // on crashed node
2010     void CkMemRestart(const char *dummy, CkArgMsg *args)
2011     {
2012 #if CMK_MEM_CHECKPOINT
2013       _diePE = CmiMyNode();
2014       CpvAccess( _crashedNode )= CmiMyNode();
2015       CkMemCheckPT::inRestarting = 1;
2016       _discard_charm_message();
2017
2018       /*if(CmiMyRank()==0){
2019         CkCallback cb(qd_callback);
2020         CkStartQD(cb);
2021         CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
2022         }*/
2023       CkMemCheckPT::startTime = restartT = CmiWallTimer();
2024       if(CmiNumPartition()==1){
2025         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
2026         restartT = CmiWallTimer();
2027         CmiPrintf("[%d] I am restarting  cur_restart_phase:%d discard charm message at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), restartT);
2028         fflush(stdout);
2029         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2030         *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
2031         // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
2032         CmiSetHandler(msg, askProcDataHandlerIdx);
2033         int pe = ChkptOnPe(CpvAccess(_crashedNode));
2034         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2035       }
2036       else{
2037         CkCallback cb(qd_callback);
2038         CkStartQD(cb);
2039       }
2040 #else
2041       CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
2042 #endif
2043     }
2044
2045     // can be called in other files
2046     // return true if it is in restarting
2047     extern "C"
2048       int CkInRestarting()
2049       {
2050 #if CMK_MEM_CHECKPOINT
2051         if (CpvAccess( _crashedNode)!=-1) return 1;
2052         // gzheng
2053         //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
2054         //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
2055         return CkMemCheckPT::inRestarting;
2056 #else
2057         return 0;
2058 #endif
2059       }
2060
2061     extern "C"
2062       int CkReplicaAlive()
2063       {
2064         //      if(CkMyPe() == CpvAccess(_remoteCrashedNode))
2065         //        CkPrintf("replicaAlive %d\n",CkMemCheckPT::replicaAlive);
2066         return CkMemCheckPT::replicaAlive;
2067
2068         /*if(CkMemCheckPT::replicaDead==1)
2069           return 0;
2070           else          
2071           return 1;*/
2072       }
2073
2074     extern "C"
2075       int CkInCheckpointing()
2076       {
2077         return CkMemCheckPT::inCheckpointing;
2078       }
2079
2080     extern "C"
2081       void CkSetInLdb(){
2082 #if CMK_MEM_CHECKPOINT
2083         CkMemCheckPT::inLoadbalancing = 1;
2084 #endif
2085       }
2086
2087     extern "C"
2088       int CkInLdb(){
2089 #if CMK_MEM_CHECKPOINT
2090         return CkMemCheckPT::inLoadbalancing;
2091 #endif
2092         return 0;
2093       }
2094
2095     extern "C"
2096       void CkResetInLdb(){
2097 #if CMK_MEM_CHECKPOINT
2098         CkMemCheckPT::inLoadbalancing = 0;
2099 #endif
2100       }
2101
2102     /*****************************************************************************
2103       module initialization
2104      *****************************************************************************/
2105
2106     static int arg_where = CkCheckPoint_inMEM;
2107
2108 #if CMK_MEM_CHECKPOINT
2109     void init_memcheckpt(char **argv)
2110     {
2111       if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
2112         arg_where = CkCheckPoint_inDISK;
2113       }
2114       CpvInitialize(int, use_checksum);
2115       CpvInitialize(int, resilience);
2116       CpvAccess(use_checksum)=0;
2117       CpvAccess(resilience)=0;
2118       if(CmiGetArgFlagDesc(argv, "+use_checksum", "use checksum strategy")){
2119         CpvAccess(use_checksum)=1;
2120       }
2121       if(CmiGetArgFlagDesc(argv, "+strong_resilience", "use strong resilience")){
2122         CpvAccess(resilience)=1;
2123       }
2124       if(CmiGetArgFlagDesc(argv, "+weak_resilience", "use strong resilience")){
2125         CpvAccess(resilience)=2;
2126       }
2127       if(CmiGetArgFlagDesc(argv, "+medium_resilience", "use strong resilience")){
2128         CpvAccess(resilience)=3;
2129       }
2130       // initiliazing _crashedNode variable
2131       CpvInitialize(int, _crashedNode);
2132       CpvInitialize(int, _remoteCrashedNode);
2133       CpvAccess(_crashedNode) = -1;
2134       CpvAccess(_remoteCrashedNode) = -1;
2135       init_FI(argv);
2136     }
2137 #endif
2138
2139     class CkMemCheckPTInit: public Chare {
2140       public:
2141         CkMemCheckPTInit(CkArgMsg *m) {
2142 #if CMK_MEM_CHECKPOINT
2143           if (arg_where == CkCheckPoint_inDISK) {
2144             CkPrintf("Charm++> Double-disk Checkpointing. \n");
2145           }
2146           ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
2147           CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
2148 #endif
2149         }
2150     };
2151
2152     static void notifyHandler(char *msg)
2153     {
2154 #if CMK_MEM_CHECKPOINT
2155       CmiFree(msg);
2156       /* immediately increase restart phase to filter old messages */
2157       CpvAccess(_curRestartPhase) ++;
2158       CpvAccess(_qd)->flushStates();
2159       _discard_charm_message();
2160
2161 #endif
2162     }
2163
2164     extern "C"
2165       void notify_crash(int node)
2166       {
2167 #ifdef CMK_MEM_CHECKPOINT
2168         CpvAccess( _crashedNode) = node;
2169 #ifdef CMK_SMP
2170         for(int i=0;i<CkMyNodeSize();i++){
2171           CpvAccessOther(_crashedNode,i)=node;
2172         }
2173 #endif
2174         //CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
2175         CkMemCheckPT::inRestarting = 1;
2176
2177         // this may be in interrupt handler, send a message to reset QD
2178         int pe = CmiNodeFirst(CkMyNode());
2179         for(int i=0;i<CkMyNodeSize();i++){
2180           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
2181           CmiSetHandler(msg, notifyHandlerIdx);
2182           CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
2183         }
2184 #endif
2185       }
2186
2187     extern "C" void (*notify_crash_fn)(int node);
2188
2189
2190 #if CMK_CONVERSE_MPI
2191     void buddyDieHandler(char *msg)
2192     {
2193 #if CMK_MEM_CHECKPOINT
2194       // notify
2195       CkMemCheckPT::inRestarting = 1;
2196       int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
2197       notify_crash(diepe);
2198       // send message to crash pe to let it restart
2199       CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2200       int newrank = find_spare_mpirank(diepe,CmiMyPartition());
2201       int buddy = obj->BuddyPE(CmiMyPe());
2202       //if (CmiMyPe() == obj->BuddyPE(diepe))  {
2203       if (buddy == diepe)  {
2204         mpi_restart_crashed(diepe, newrank);
2205       }
2206 #endif
2207     }
2208
2209     void pingHandler(void *msg)
2210     {
2211       lastPingTime = CmiWallTimer();
2212       CmiFree(msg);
2213     }
2214
2215     void pingCheckHandler()
2216     {
2217 #if CMK_MEM_CHECKPOINT
2218       double now = CmiWallTimer();
2219       if (lastPingTime > 0 && now - lastPingTime > 4 && !CkInLdb() && !CkInRestarting() && !CkInCheckpointing()) {
2220         //if (lastPingTime > 0 && now - lastPingTime > 2 && !CkInLdb()) {
2221         int i, pe, buddy;
2222         // tell everyone the buddy dies
2223         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2224         for (i = 1; i < CmiNumPes(); i++) {
2225           pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
2226           if (obj->BuddyPE(pe) == CmiMyPe()) break;
2227         }
2228         buddy = pe;
2229         CmiPrintf("[%d][%d] detected buddy processor %d died %f %f. \n",CmiMyPartition(), CmiMyPe(), buddy, now, lastPingTime);
2230         fflush(stdout);
2231         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2232         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
2233         CmiSetHandler(msg, buddyDieHandlerIdx);
2234         CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2235         //send to everyone in the other world
2236         if(CmiNumPartition()!=1){
2237           //for(int i=0;i<CmiNumPes();i++){
2238             char * rMsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2239             *(int *)(rMsg+CmiMsgHeaderSizeBytes) = buddy;
2240             CmiSetHandler(rMsg, replicaDieHandlerIdx);
2241             CmiRemoteSyncSendAndFree(CkMyPe(),CmiMyPartition()^1,CmiMsgHeaderSizeBytes+sizeof(int),(char *)rMsg);
2242           //}
2243         }
2244       }
2245         else 
2246           CcdCallOnCondition(CcdPERIODIC_1s,(CcdVoidFn)pingCheckHandler,NULL);
2247 #endif
2248       }
2249
2250       void pingBuddy()
2251       {
2252 #if CMK_MEM_CHECKPOINT
2253         CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
2254         if (obj) {
2255           int buddy = obj->BuddyPE(CkMyPe());
2256           char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
2257           *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
2258           CmiSetHandler(msg, pingHandlerIdx);
2259           CmiGetRestartPhase(msg) = 9999;
2260           CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
2261         }
2262         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
2263 #endif
2264       }
2265 #endif
2266
2267       // initproc
2268       void CkRegisterRestartHandler( )
2269       {
2270 #if CMK_MEM_CHECKPOINT
2271         notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
2272         askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
2273         askRecoverDataHandlerIdx = CkRegisterHandler((CmiHandler)askRecoverDataHandler);
2274         recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
2275         restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
2276         restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
2277
2278         //for replica
2279         recvRemoteChkpHandlerIdx = CkRegisterHandler((CmiHandler)recvRemoteChkpHandler);
2280         replicaDieHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieHandler);
2281         replicaChkpStartHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpStartHandler);
2282         replicaDieBcastHandlerIdx = CkRegisterHandler((CmiHandler)replicaDieBcastHandler);
2283         replicaRecoverHandlerIdx = CkRegisterHandler((CmiHandler)replicaRecoverHandler);
2284         replicaChkpDoneHandlerIdx = CkRegisterHandler((CmiHandler)replicaChkpDoneHandler);
2285         askPhaseHandlerIdx = CkRegisterHandler((CmiHandler)askPhaseHandler);
2286         recvPhaseHandlerIdx = CkRegisterHandler((CmiHandler)recvPhaseHandler);
2287         recoverRemoteProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteProcDataHandler);
2288         recoverRemoteArrayDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverRemoteArrayDataHandler);
2289
2290 #if CMK_CONVERSE_MPI
2291         pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
2292         pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
2293         buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
2294 #endif
2295
2296         CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
2297         CpvInitialize(CkCheckPTMessage **, chkpBuf);
2298         CpvInitialize(CkCheckPTMessage **, localProcChkpBuf);
2299         CpvInitialize(CkCheckPTMessage *, buddyBuf);
2300         CpvInitialize(CkCheckPTMessage *, recoverProcBuf);
2301         CpvInitialize(CkCheckPTMessage *, recoverArrayBuf);
2302         CpvInitialize(int,curPointer);
2303         CpvInitialize(int,recvdLocal);
2304         CpvInitialize(int,localChkpDone);
2305         CpvInitialize(int,remoteChkpDone);
2306         CpvInitialize(int,remoteStarted);
2307         CpvInitialize(int,localStarted);
2308         CpvInitialize(int,localReady);
2309         CpvInitialize(int,remoteReady);
2310         CpvInitialize(int,recvdRemote);
2311         CpvInitialize(int,recvdProcChkp);
2312         CpvInitialize(int,localChecksum);
2313         CpvInitialize(int,remoteChecksum);
2314         CpvInitialize(int,recvdArrayChkp);
2315
2316         CpvAccess(procChkptBuf) = NULL;
2317         CpvAccess(buddyBuf) = NULL;
2318         CpvAccess(recoverProcBuf) = NULL;
2319         CpvAccess(recoverArrayBuf) = NULL;
2320         CpvAccess(chkpBuf) = new CkCheckPTMessage *[2];
2321         CpvAccess(localProcChkpBuf) = new CkCheckPTMessage *[2];
2322         CpvAccess(chkpBuf)[0] = NULL;
2323         CpvAccess(chkpBuf)[1] = NULL;
2324         CpvAccess(localProcChkpBuf)[0] = NULL;
2325         CpvAccess(localProcChkpBuf)[1] = NULL;
2326
2327         CpvAccess(curPointer) = 0;
2328         CpvAccess(recvdLocal) = 0;
2329         CpvAccess(localChkpDone) = 0;
2330         CpvAccess(remoteChkpDone) = 0;
2331         CpvAccess(remoteStarted) = 0;
2332         CpvAccess(localStarted) = 0;
2333         CpvAccess(localReady) = 0;
2334         CpvAccess(remoteReady) = 0;
2335         CpvAccess(recvdRemote) = 0;
2336         CpvAccess(recvdProcChkp) = 0;
2337         CpvAccess(localChecksum) = 0;
2338         CpvAccess(remoteChecksum) = 0;
2339         CpvAccess(recvdArrayChkp) = 0;
2340
2341         notify_crash_fn = notify_crash;
2342
2343 #if ! CMK_CONVERSE_MPI
2344         // print pid to kill
2345         //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
2346         //  sleep(4);
2347 #endif
2348 #endif
2349       }
2350
2351
2352       extern "C"
2353         int CkHasCheckpoints()
2354         {
2355           return checkpointed;
2356         }
2357
2358       /// @todo: the following definitions should be moved to a separate file containing
2359       // structures and functions about fault tolerance strategies
2360
2361       /**
2362        *  * @brief: function for killing a process                                             
2363        *   */
2364 #ifdef CMK_MEM_CHECKPOINT
2365 #if CMK_HAS_GETPID
2366       void killLocal(void *_dummy,double curWallTime){
2367         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
2368         if(CmiWallTimer()<killTime-1){
2369           CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
2370         }else{ 
2371 #if CMK_CONVERSE_MPI
2372           CkDieNow();
2373 #else 
2374           kill(getpid(),SIGKILL);                                               
2375 #endif
2376         }              
2377       } 
2378 #else
2379       void killLocal(void *_dummy,double curWallTime){
2380         CmiAbort("kill() not supported!");
2381       }
2382 #endif
2383 #endif
2384
2385 #ifdef CMK_MEM_CHECKPOINT
2386       /**
2387        * @brief: reads the file with the kill information
2388        */
2389       void readKillFile(){
2390         FILE *fp=fopen(killFile,"r");
2391         if(!fp){
2392           printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
2393           return;
2394         }
2395         int proc;
2396         double sec;
2397         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
2398           if(proc == CkMyNode() && CkMyRank() == 0){
2399             killTime = CmiWallTimer()+sec;
2400             printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
2401             CcdCallFnAfter(killLocal,NULL,sec*1000);
2402           }
2403         }
2404         fclose(fp);
2405       }
2406
2407 #if ! CMK_CONVERSE_MPI
2408       void CkDieNow()
2409       {
2410 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
2411         // ignored for non-mpi version
2412         CmiPrintf("[%d] die now.\n", CmiMyPe());
2413         killTime = CmiWallTimer()+0.001;
2414         CcdCallFnAfter(killLocal,NULL,1);
2415 #endif
2416       }
2417 #endif
2418
2419 #endif
2420
2421 #include "CkMemCheckpoint.def.h"
2422
2423