Merge branch 'charm' of charmgit:charm into harshitha/adaptive_lb
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 //use CmiReduce in converse restart
58 #define CMK_USE_BARRIER 1       
59
60 // pick buddy processor from a different physical node
61 #define NODE_CHECKPOINT                        0
62
63 // assume NO extra processors--1
64 // assume extra processors--0
65 #if CMK_CONVERSE_MPI
66 #define CK_NO_PROC_POOL                         0
67 #else
68 #define CK_NO_PROC_POOL                         0
69 #endif
70
71 #define STREAMING_INFORMHOME                    1
72 CpvDeclare(int, _crashedNode);
73
74 // static, so that it is accessible from Converse part
75 int CkMemCheckPT::inRestarting = 0;
76 double CkMemCheckPT::startTime;
77 char *CkMemCheckPT::stage;
78 CkCallback CkMemCheckPT::cpCallback;
79
80 int _memChkptOn = 1;                    // checkpoint is on or off
81
82 CkGroupID ckCheckPTGroupID;             // readonly
83
84 static int checkpointed = 0;
85
86 /// @todo the following declarations should be moved into a separate file for all 
87 // fault tolerant strategies
88
89 #ifdef CMK_MEM_CHECKPOINT
90 // name of the kill file that contains processes to be killed 
91 char *killFile;                                               
92 // flag for the kill file         
93 int killFlag=0;
94 // variable for storing the killing time
95 double killTime=0.0;
96 #endif
97
98 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
99 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
100
101 // compute the backup processor
102 // FIXME: avoid crashed processors
103 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
104
105 inline int CkMemCheckPT::BuddyPE(int pe)
106 {
107   int budpe;
108 #if NODE_CHECKPOINT
109     // buddy is the processor with same rank on the next physical node
110   int r1 = CmiPhysicalRank(pe);
111   int budnode = CmiPhysicalNodeID(pe);
112   do {
113     budnode = (budnode+1)%CmiNumPhysicalNodes();
114     int *pelist;
115     int num;
116     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
117     budpe = pelist[r1 % num];
118   } while (isFailed(budpe));
119   if (budpe == pe) {
120     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
121     CmiAbort("Failed to find a buddy processor");
122   }
123 #else
124   budpe = pe;
125   while (budpe == pe || isFailed(budpe)) 
126           budpe = (budpe+1)%CkNumPes();
127 #endif
128   return budpe;
129 }
130
131 // called in array element constructor
132 // choose and register with 2 buddies for checkpoiting 
133 #if CMK_MEM_CHECKPOINT
134 void ArrayElement::init_checkpt() {
135         if (_memChkptOn == 0) return;
136         if (CkInRestarting()) {
137           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
138         }
139         // only master init checkpoint
140         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
141
142         budPEs[0] = CkMyPe();
143         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
144         CmiAssert(budPEs[0] != budPEs[1]);
145         // inform checkPTMgr
146         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
147         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
148         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
149         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
150 }
151 #endif
152
153 // entry function invoked by checkpoint mgr asking for checkpoint data
154 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
155 #if CMK_MEM_CHECKPOINT
156 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
157 //char index[128];   thisIndexMax.sprint(index);
158 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
159   CkLocMgr *locMgr = thisArray->getLocMgr();
160   CmiAssert(myRec!=NULL);
161   int size;
162   {
163         PUP::sizer p;
164         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
165         size = p.size();
166   }
167   int packSize = size/sizeof(double) +1;
168   CkArrayCheckPTMessage *msg =
169                  new (packSize, 0) CkArrayCheckPTMessage;
170   msg->len = size;
171   msg->index =thisIndexMax;
172   msg->aid = thisArrayID;
173   msg->locMgr = locMgr->getGroupID();
174   msg->cp_flag = 1;
175   {
176         PUP::toMem p(msg->packData);
177         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
178   }
179
180   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
181   checkptMgr.recvData(msg, 2, budPEs);
182   delete m;
183 #endif
184 }
185
186 // checkpoint holder class - for memory checkpointing
187 class CkMemCheckPTInfo: public CkCheckPTInfo
188 {
189   CkArrayCheckPTMessage *ckBuffer;
190 public:
191   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
192                     CkCheckPTInfo(a, loc, idx, pno)
193   {
194     ckBuffer = NULL;
195   }
196   ~CkMemCheckPTInfo() 
197   {
198     if (ckBuffer) delete ckBuffer; 
199   }
200   inline void updateBuffer(CkArrayCheckPTMessage *data) 
201   {
202     CmiAssert(data!=NULL);
203     if (ckBuffer) delete ckBuffer;
204     ckBuffer = data;
205   }    
206   inline CkArrayCheckPTMessage * getCopy()
207   {
208     if (ckBuffer == NULL) {
209       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
210       CmiAbort("Abort!");
211     }
212     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
213   }     
214   inline void updateBuddy(int b1, int b2) {
215      CmiAssert(ckBuffer);
216      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
217      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
218      CmiAssert(pNo != CkMyPe());
219   }
220   inline int getSize() { 
221      CmiAssert(ckBuffer);
222      return ckBuffer->len; 
223   }
224 };
225
226 // checkpoint holder class - for in-disk checkpointing
227 class CkDiskCheckPTInfo: public CkCheckPTInfo 
228 {
229   char *fname;
230   int bud1, bud2;
231   int len;                      // checkpoint size
232 public:
233   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
234   {
235 #if CMK_USE_MKSTEMP
236     fname = new char[64];
237     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
238     mkstemp(fname);
239 #else
240     fname=tmpnam(NULL);
241 #endif
242     bud1 = bud2 = -1;
243     len = 0;
244   }
245   ~CkDiskCheckPTInfo() 
246   {
247     remove(fname);
248   }
249   inline void updateBuffer(CkArrayCheckPTMessage *data) 
250   {
251     double t = CmiWallTimer();
252     // unpack it
253     envelope *env = UsrToEnv(data);
254     CkUnpackMessage(&env);
255     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
256     FILE *f = fopen(fname,"wb");
257     PUP::toDisk p(f);
258     CkPupMessage(p, (void **)&data);
259     // delay sync to the end because otherwise the messages are blocked
260 //    fsync(fileno(f));
261     fclose(f);
262     bud1 = data->bud1;
263     bud2 = data->bud2;
264     len = data->len;
265     delete data;
266     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
267   }
268   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
269   {
270     CkArrayCheckPTMessage *data;
271     FILE *f = fopen(fname,"rb");
272     PUP::fromDisk p(f);
273     CkPupMessage(p, (void **)&data);
274     fclose(f);
275     data->bud1 = bud1;                          // update the buddies
276     data->bud2 = bud2;
277     return data;
278   }
279   inline void updateBuddy(int b1, int b2) {
280      bud1 = b1; bud2 = b2;
281      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
282      CmiAssert(pNo != CkMyPe());
283   }
284   inline int getSize() { 
285      return len; 
286   }
287 };
288
289 CkMemCheckPT::CkMemCheckPT(int w)
290 {
291   int numnodes = 0;
292 #if NODE_CHECKPOINT
293   numnodes = CmiNumPhysicalNodes();
294 #else
295   numnodes = CkNumPes();
296 #endif
297 #if CK_NO_PROC_POOL
298   if (numnodes <= 2)
299 #else
300   if (numnodes  == 1)
301 #endif
302   {
303     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
304     _memChkptOn = 0;
305   }
306   inRestarting = 0;
307   recvCount = peCount = 0;
308   ackCount = 0;
309   expectCount = -1;
310   where = w;
311
312 #if CMK_CONVERSE_MPI
313   void pingBuddy();
314   void pingCheckHandler();
315   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
316   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
317 #endif
318 }
319
320 CkMemCheckPT::~CkMemCheckPT()
321 {
322   int len = ckTable.length();
323   for (int i=0; i<len; i++) {
324     delete ckTable[i];
325   }
326 }
327
328 void CkMemCheckPT::pup(PUP::er& p) 
329
330   CBase_CkMemCheckPT::pup(p); 
331   p|cpStarter;
332   p|thisFailedPe;
333   p|failedPes;
334   p|ckCheckPTGroupID;           // recover global variable
335   p|cpCallback;                 // store callback
336   p|where;                      // where to checkpoint
337   p|peCount;
338   if (p.isUnpacking()) {
339     recvCount = 0;
340 #if CMK_CONVERSE_MPI
341     void pingBuddy();
342     void pingCheckHandler();
343     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
344     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
345 #endif
346   }
347 }
348
349 // called by checkpoint mgr to restore an array element
350 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
351 {
352 #if CMK_MEM_CHECKPOINT
353   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
354   // m->index.print();
355   PUP::fromMem p(m->packData);
356   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
357   CmiAssert(mgr);
358 #if STREAMING_INFORMHOME
359   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
360 #else
361   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
362 #endif
363
364   // find a list of array elements bound together
365   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
366   CmiAssert(elt);
367   CkLocRec_local *rec = elt->myRec;
368   CkVec<CkMigratable *> list;
369   mgr->migratableList(rec, list);
370   CmiAssert(list.length() > 0);
371   for (int l=0; l<list.length(); l++) {
372     elt = (ArrayElement *)list[l];
373     elt->budPEs[0] = m->bud1;
374     elt->budPEs[1] = m->bud2;
375     //    reset, may not needed now
376     // for now.
377     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
378       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
379       if (c) c->redNo = 0;
380     }
381   }
382 #endif
383 }
384
385 // return 1 if pe is a crashed processor
386 int CkMemCheckPT::isFailed(int pe)
387 {
388   for (int i=0; i<failedPes.length(); i++)
389     if (failedPes[i] == pe) return 1;
390   return 0;
391 }
392
393 // add pe into history list of all failed processors
394 void CkMemCheckPT::failed(int pe)
395 {
396   if (isFailed(pe)) return;
397   failedPes.push_back(pe);
398 }
399
400 int CkMemCheckPT::totalFailed()
401 {
402   return failedPes.length();
403 }
404
405 // create an checkpoint entry for array element of aid with index.
406 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
407 {
408   // error check, no duplicate
409   int idx, len = ckTable.size();
410   for (idx=0; idx<len; idx++) {
411     CkCheckPTInfo *entry = ckTable[idx];
412     if (index == entry->index) {
413       if (loc == entry->locMgr) {
414           // bindTo array elements
415           return;
416       }
417         // for array inheritance, the following check may fail
418         // because ArrayElement constructor of all superclasses are called
419       if (aid == entry->aid) {
420         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
421         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
422       }
423     }
424   }
425   CkCheckPTInfo *newEntry;
426   if (where == CkCheckPoint_inMEM)
427     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
428   else
429     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
430   ckTable.push_back(newEntry);
431   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
432 }
433
434 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
435 {
436   int buddy = msg->bud1;
437   if (buddy == CkMyPe()) buddy = msg->bud2;
438   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
439   recvData(msg);
440     // ack
441   thisProxy[buddy].gotData();
442 }
443
444 // loop through my checkpoint table and ask checkpointed array elements
445 // to send me checkpoint data.
446 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
447 {
448   checkpointed = 1;
449   cpCallback = cb;
450   cpStarter = starter;
451   if (CkMyPe() == cpStarter) {
452     startTime = CmiWallTimer();
453     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
454   }
455
456   int len = ckTable.length();
457   for (int i=0; i<len; i++) {
458     CkCheckPTInfo *entry = ckTable[i];
459       // always let the bigger number processor send request
460     //if (CkMyPe() < entry->pNo) continue;
461       // always let the smaller number processor send request, may on same proc
462     if (!isMaster(entry->pNo)) continue;
463       // call inmem_checkpoint to the array element, ask it to send
464       // back checkpoint data via recvData().
465     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
466     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
467   }
468     // if my table is empty, then I am done
469   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
470
471   // pack and send proc level data
472   sendProcData();
473 }
474
475 // don't handle array elements
476 static inline void _handleProcData(PUP::er &p)
477 {
478     // save readonlys, and callback BTW
479     CkPupROData(p);
480
481     // save mainchares 
482     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
483         
484 #ifndef CMK_CHARE_USE_PTR
485     // save non-migratable chare
486     CkPupChareData(p);
487 #endif
488
489     // save groups into Groups.dat
490     CkPupGroupData(p);
491
492     // save nodegroups into NodeGroups.dat
493     if(CkMyRank()==0) CkPupNodeGroupData(p);
494 }
495
496 void CkMemCheckPT::sendProcData()
497 {
498   // find out size of buffer
499   int size;
500   {
501     PUP::sizer p;
502     _handleProcData(p);
503     size = p.size();
504   }
505   int packSize = size;
506   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
507   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
508   {
509     PUP::toMem p(msg->packData);
510     _handleProcData(p);
511   }
512   msg->pe = CkMyPe();
513   msg->len = size;
514   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
515   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
516 }
517
518 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
519 {
520   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
521   CpvAccess(procChkptBuf) = msg;
522   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
523   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
524 }
525
526 // ArrayElement call this function to give us the checkpointed data
527 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
528 {
529   int len = ckTable.length();
530   int idx;
531   for (idx=0; idx<len; idx++) {
532     CkCheckPTInfo *entry = ckTable[idx];
533     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
534   }
535   CkAssert(idx < len);
536   int isChkpting = msg->cp_flag;
537   ckTable[idx]->updateBuffer(msg);
538   if (isChkpting) {
539       // all my array elements have returned their inmem data
540       // inform starter processor that I am done.
541     recvCount ++;
542     if (recvCount == ckTable.length()) {
543       if (where == CkCheckPoint_inMEM) {
544         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
545       }
546       else if (where == CkCheckPoint_inDISK) {
547         // another barrier for finalize the writing using fsync
548         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
549         contribute(0,NULL,CkReduction::sum_int,localcb);
550       }
551       else
552         CmiAbort("Unknown checkpoint scheme");
553       recvCount = 0;
554     } 
555   }
556 }
557
558 // only used in disk checkpointing
559 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
560 {
561   delete m;
562 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
563   system("sync");
564 #endif
565   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
566 }
567
568 // only is called on cpStarter when checkpoint is done
569 void CkMemCheckPT::cpFinish()
570 {
571   CmiAssert(CkMyPe() == cpStarter);
572   peCount++;
573     // now that all processors have finished, activate callback
574   if (peCount == 2) {
575     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
576     cpCallback.send();
577     peCount = 0;
578     thisProxy.report();
579   }
580 }
581
582 // for debugging, report checkpoint info
583 void CkMemCheckPT::report()
584 {
585   int objsize = 0;
586   int len = ckTable.length();
587   for (int i=0; i<len; i++) {
588     CkCheckPTInfo *entry = ckTable[i];
589     CmiAssert(entry);
590     objsize += entry->getSize();
591   }
592   CmiAssert(CpvAccess(procChkptBuf));
593 //  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
594 }
595
596 /*****************************************************************************
597                         RESTART Procedure
598 *****************************************************************************/
599
600 // master processor of two buddies
601 inline int CkMemCheckPT::isMaster(int buddype)
602 {
603 #if 0
604   int mype = CkMyPe();
605 //CkPrintf("ismaster: %d %d\n", pe, mype);
606   if (CkNumPes() - totalFailed() == 2) {
607     return mype > buddype;
608   }
609   for (int i=1; i<CkNumPes(); i++) {
610     int me = (buddype+i)%CkNumPes();
611     if (isFailed(me)) continue;
612     if (me == mype) return 1;
613     else return 0;
614   }
615   return 0;
616 #else
617     // smaller one
618   int mype = CkMyPe();
619 //CkPrintf("ismaster: %d %d\n", pe, mype);
620   if (CkNumPes() - totalFailed() == 2) {
621     return mype < buddype;
622   }
623 #if NODE_CHECKPOINT
624   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
625   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
626 #else
627   for (int i=1; i<CkNumPes(); i++) {
628 #endif
629     int me = (mype+i)%CkNumPes();
630     if (isFailed(me)) continue;
631     if (me == buddype) return 1;
632     else return 0;
633   }
634   return 0;
635 #endif
636 }
637
638 #ifdef CKLOCMGR_LOOP
639 #undef CKLOCMGR_LOOP
640 #endif
641
642 // loop over all CkLocMgr and do "code"
643 #define  CKLOCMGR_LOOP(code)    {       \
644   int numGroups = CkpvAccess(_groupIDTable)->size();    \
645   for(int i=0;i<numGroups;i++) {        \
646     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
647     if(obj->isLocMgr())  {      \
648       CkLocMgr *mgr = (CkLocMgr*)obj;   \
649       code      \
650     }   \
651   }     \
652  }
653
654 #if 0
655 // helper class to pup all elements that belong to same ckLocMgr
656 class ElementDestoryer : public CkLocIterator {
657 private:
658         CkLocMgr *locMgr;
659 public:
660         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
661         void addLocation(CkLocation &loc) {
662                 CkArrayIndex idx=loc.getIndex();
663                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
664                 loc.destroy();
665         }
666 };
667 #endif
668
669 // restore the bitmap vector for LB
670 void CkMemCheckPT::resetLB(int diepe)
671 {
672 #if CMK_LBDB_ON
673   int i;
674   char *bitmap = new char[CkNumPes()];
675   // set processor available bitmap
676   get_avail_vector(bitmap);
677
678   for (i=0; i<failedPes.length(); i++)
679     bitmap[failedPes[i]] = 0; 
680   bitmap[diepe] = 0;
681
682 #if CK_NO_PROC_POOL
683   set_avail_vector(bitmap);
684 #endif
685
686   // if I am the crashed pe, rebuild my failedPEs array
687   if (CkMyNode() == diepe)
688     for (i=0; i<CkNumPes(); i++) 
689       if (bitmap[i]==0) failed(i);
690
691   delete [] bitmap;
692 #endif
693 }
694
695 // in case when failedPe dies, everybody go through its checkpoint table:
696 // destory all array elements
697 // recover lost buddies
698 // reconstruct all array elements from check point data
699 // called on all processors
700 void CkMemCheckPT::restart(int diePe)
701 {
702 #if CMK_MEM_CHECKPOINT
703   double curTime = CmiWallTimer();
704   if (CkMyPe() == diePe)
705     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
706   stage = (char*)"resetLB";
707   startTime = curTime;
708   if (CkMyPe() == diePe)
709     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
710
711 #if CK_NO_PROC_POOL
712   failed(diePe);        // add into the list of failed pes
713 #endif
714   thisFailedPe = diePe;
715
716   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
717
718   inRestarting = 1;
719                                                                                 
720   // disable load balancer's barrier
721   if (CkMyPe() != diePe) resetLB(diePe);
722
723   CKLOCMGR_LOOP(mgr->startInserting(););
724
725   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
726   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
727 /*
728   if (CkMyPe() == 0)
729     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
730 */
731 #endif
732 }
733
734 // loally remove all array elements
735 void CkMemCheckPT::removeArrayElements()
736 {
737 #if CMK_MEM_CHECKPOINT
738   int len = ckTable.length();
739   double curTime = CmiWallTimer();
740   if (CkMyPe() == thisFailedPe) 
741     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
742   stage = (char*)"removeArrayElements";
743   startTime = curTime;
744
745   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
746   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
747
748   // get rid of all buffering and remote recs
749   // including destorying all array elements
750   CKLOCMGR_LOOP(mgr->flushAllRecs(););
751
752 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
753
754   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
755   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
756 #endif
757 }
758
759 // flush state in reduction manager
760 void CkMemCheckPT::resetReductionMgr()
761 {
762   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
763   int numGroups = CkpvAccess(_groupIDTable)->size();
764   for(int i=0;i<numGroups;i++) {
765     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
766     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
767     obj->flushStates();
768     obj->ckJustMigrated();
769   }
770   // reset again
771   //CpvAccess(_qd)->flushStates();
772
773 #if 1
774   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
775   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
776 #else
777   if (CkMyPe() == 0)
778     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
779 #endif
780 }
781
782 // recover the lost buddies
783 void CkMemCheckPT::recoverBuddies()
784 {
785   int idx;
786   int len = ckTable.length();
787   // ready to flush reduction manager
788   // cannot be CkMemCheckPT::restart because destory will modify states
789   double curTime = CmiWallTimer();
790   if (CkMyPe() == thisFailedPe)
791   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
792   stage = (char *)"recoverBuddies";
793   if (CkMyPe() == thisFailedPe)
794   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
795   startTime = curTime;
796
797   // recover buddies
798   expectCount = 0;
799   for (idx=0; idx<len; idx++) {
800     CkCheckPTInfo *entry = ckTable[idx];
801     if (entry->pNo == thisFailedPe) {
802 #if CK_NO_PROC_POOL
803       // find a new buddy
804 /*
805       int budPe = CkMyPe();
806 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
807       while (budPe == CkMyPe() || isFailed(budPe)) 
808           budPe = (budPe+1)%CkNumPes();
809 */
810       int budPe = BuddyPE(CkMyPe());
811       //entry->pNo = budPe;
812 #else
813       int budPe = thisFailedPe;
814 #endif
815       entry->updateBuddy(CkMyPe(), budPe);
816 #if 0
817       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
818       CkArrayCheckPTMessage *msg = entry->getCopy();
819       msg->cp_flag = 0;            // not checkpointing
820       thisProxy[budPe].recvData(msg);
821 #else
822       CkArrayCheckPTMessage *msg = entry->getCopy();
823       msg->bud1 = budPe;
824       msg->bud2 = CkMyPe();
825       msg->cp_flag = 0;            // not checkpointing
826       thisProxy[budPe].recoverEntry(msg);
827 #endif
828       expectCount ++;
829     }
830   }
831
832 #if 1
833   if (expectCount == 0) {
834     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
835     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
836   }
837 #else
838   if (CkMyPe() == 0) {
839     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
840   }
841 #endif
842
843   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
844 }
845
846 void CkMemCheckPT::gotData()
847 {
848   ackCount ++;
849   if (ackCount == expectCount) {
850     ackCount = 0;
851     expectCount = -1;
852     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
853     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
854   }
855 }
856
857 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
858 {
859   for (int i=0; i<n; i++) {
860     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
861     mgr->updateLocation(idx[i], nowOnPe);
862   }
863 }
864
865 // restore array elements
866 void CkMemCheckPT::recoverArrayElements()
867 {
868   double curTime = CmiWallTimer();
869   int len = ckTable.length();
870   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
871   stage = (char *)"recoverArrayElements";
872   if (CkMyPe() == thisFailedPe)
873   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
874   startTime = curTime;
875
876   // recover all array elements
877   int count = 0;
878 #if STREAMING_INFORMHOME
879   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
880   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
881 #endif
882   for (int idx=0; idx<len; idx++)
883   {
884     CkCheckPTInfo *entry = ckTable[idx];
885 #if CK_NO_PROC_POOL
886     // the bigger one will do 
887 //    if (CkMyPe() < entry->pNo) continue;
888     if (!isMaster(entry->pNo)) continue;
889 #else
890     // smaller one do it, which has the original object
891     if (CkMyPe() == entry->pNo+1 || 
892         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
893 #endif
894 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
895
896     entry->updateBuddy(CkMyPe(), entry->pNo);
897     CkArrayCheckPTMessage *msg = entry->getCopy();
898     // gzheng
899     //thisProxy[CkMyPe()].inmem_restore(msg);
900     inmem_restore(msg);
901 #if STREAMING_INFORMHOME
902     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
903     int homePe = mgr->homePe(msg->index);
904     if (homePe != CkMyPe()) {
905       gmap[homePe].push_back(msg->locMgr);
906       imap[homePe].push_back(msg->index);
907     }
908 #endif
909     CkFreeMsg(msg);
910     count ++;
911   }
912 #if STREAMING_INFORMHOME
913   for (int i=0; i<CkNumPes(); i++) {
914     if (gmap[i].size() && i!=CkMyPe()) {
915       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
916     }
917   }
918   delete [] imap;
919   delete [] gmap;
920 #endif
921   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
922
923   CKLOCMGR_LOOP(mgr->doneInserting(););
924
925   inRestarting = 0;
926   // _crashedNode = -1;
927   CpvAccess(_crashedNode) = -1;
928
929   if (CkMyPe() == 0)
930     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
931 }
932
933 static double restartT;
934
935 // on every processor
936 // turn load balancer back on
937 void CkMemCheckPT::finishUp()
938 {
939   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
940   //CKLOCMGR_LOOP(mgr->doneInserting(););
941   
942   if (CkMyPe() == thisFailedPe)
943   {
944        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
945        //CkStartQD(cpCallback);
946        cpCallback.send();
947        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
948   }
949
950 #if CK_NO_PROC_POOL
951 #if NODE_CHECKPOINT
952   int numnodes = CmiNumPhysicalNodes();
953 #else
954   int numnodes = CkNumPes();
955 #endif
956   if (numnodes-totalFailed() <=2) {
957     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
958     _memChkptOn = 0;
959   }
960 #endif
961 }
962
963 // called only on 0
964 void CkMemCheckPT::quiescence(CkCallback &cb)
965 {
966   static int pe_count = 0;
967   pe_count ++;
968   CmiAssert(CkMyPe() == 0);
969   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
970   if (pe_count == CkNumPes()) {
971     pe_count = 0;
972     cb.send();
973   }
974 }
975
976 // User callable function - to start a checkpoint
977 // callback cb is used to pass control back
978 void CkStartMemCheckpoint(CkCallback &cb)
979 {
980 #if CMK_MEM_CHECKPOINT
981   if (_memChkptOn == 0) {
982     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
983     cb.send();
984     return;
985   }
986   if (CkInRestarting()) {
987       // trying to checkpointing during restart
988     cb.send();
989     return;
990   }
991     // store user callback and user data
992   CkMemCheckPT::cpCallback = cb;
993
994     // broadcast to start check pointing
995   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
996   checkptMgr.doItNow(CkMyPe(), cb);
997 #else
998   // when mem checkpoint is disabled, invike cb immediately
999   cb.send();
1000 #endif
1001 }
1002
1003 void CkRestartCheckPoint(int diePe)
1004 {
1005 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1006   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1007   // broadcast
1008   checkptMgr.restart(diePe);
1009 }
1010
1011 static int _diePE = -1;
1012
1013 // callback function used locally by ccs handler
1014 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1015 {
1016 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1017   CkRestartCheckPoint(_diePE);
1018 }
1019
1020 // Converse function handles
1021 static int askPhaseHandlerIdx;
1022 static int recvPhaseHandlerIdx;
1023 static int askProcDataHandlerIdx;
1024 static int restartBcastHandlerIdx;
1025 static int recoverProcDataHandlerIdx;
1026 static int restartBeginHandlerIdx;
1027 static int notifyHandlerIdx;
1028
1029 // called on crashed PE
1030 static void restartBeginHandler(char *msg)
1031 {
1032 #if CMK_MEM_CHECKPOINT
1033   CmiFree(msg);
1034 #if     CMK_USE_BARRIER
1035         if(CkMyPe()!=_diePE){
1036                 char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1037                 CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1038                 CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1039         }else{
1040                 CkRestartCheckPointCallback(NULL,NULL);
1041         }
1042 #else
1043   static int count = 0;
1044   CmiAssert(CkMyPe() == _diePE);
1045   count ++;
1046   if (count == CkNumPes()) {
1047     CkRestartCheckPointCallback(NULL, NULL);
1048     count = 0;
1049   }
1050 #endif
1051 #endif
1052 }
1053
1054 extern void _discard_charm_message();
1055 extern void _resume_charm_message();
1056
1057 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1058         return data;
1059 }
1060
1061 static void restartBcastHandler(char *msg)
1062 {
1063 #if CMK_MEM_CHECKPOINT
1064   // advance phase counter
1065   CkMemCheckPT::inRestarting = 1;
1066   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1067   // gzheng
1068   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1069
1070   if (CkMyPe()==_diePE)
1071     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1072
1073   // reset QD counters
1074 /*  gzheng
1075   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1076 */
1077
1078 /*  gzheng
1079   if (CkMyPe()==_diePE)
1080       CkRestartCheckPointCallback(NULL, NULL);
1081 */
1082   CmiFree(msg);
1083
1084   _resume_charm_message();
1085
1086     // reduction
1087   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1088   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1089 #if CMK_USE_BARRIER
1090         CmiReduce(restartmsg,CmiMsgHeaderSizeBytes,doNothingMsg);
1091 #else
1092   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1093 #endif
1094   checkpointed = 0;
1095 #endif
1096 }
1097
1098 extern void _initDone();
1099
1100 // called on crashed processor
1101 static void recoverProcDataHandler(char *msg)
1102 {
1103 #if CMK_MEM_CHECKPOINT
1104    int i;
1105    envelope *env = (envelope *)msg;
1106    CkUnpackMessage(&env);
1107    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1108    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1109    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1110    //cur_restart_phase ++;
1111      // gzheng ?
1112    //CpvAccess(_qd)->flushStates();
1113
1114    // restore readonly, mainchare, group, nodegroup
1115 //   int temp = cur_restart_phase;
1116 //   cur_restart_phase = -1;
1117    PUP::fromMem p(procMsg->packData);
1118    _handleProcData(p);
1119
1120    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1121    // gzheng
1122    CKLOCMGR_LOOP(mgr->startInserting(););
1123
1124    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1125    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1126    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1127    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1128
1129    _initDone();
1130 //   CpvAccess(_qd)->flushStates();
1131    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1132 #endif
1133 }
1134
1135 // called on its backup processor
1136 // get backup message buffer and sent to crashed processor
1137 static void askProcDataHandler(char *msg)
1138 {
1139 #if CMK_MEM_CHECKPOINT
1140     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1141     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1142     if (CpvAccess(procChkptBuf) == NULL)  {
1143       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1144       CkAbort("no checkpoint found");
1145     }
1146     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1147     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1148
1149     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1150
1151     CkPackMessage(&env);
1152     CmiSetHandler(env, recoverProcDataHandlerIdx);
1153     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1154     CpvAccess(procChkptBuf) = NULL;
1155 #endif
1156 }
1157
1158 // called on PE 0
1159 void qd_callback(void *m)
1160 {
1161    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1162    CkFreeMsg(m);
1163 #ifdef CMK_SMP
1164    for(int i=0;i<CmiMyNodeSize();i++){
1165    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1166    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1167         CmiSetHandler(msg, askProcDataHandlerIdx);
1168         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1169         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1170    }
1171    return;
1172 #endif
1173    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1174    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1175    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1176    CmiSetHandler(msg, askProcDataHandlerIdx);
1177    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1178    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1179
1180 }
1181
1182 // on crashed node
1183 void CkMemRestart(const char *dummy, CkArgMsg *args)
1184 {
1185 #if CMK_MEM_CHECKPOINT
1186    _diePE = CmiMyNode();
1187    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1188    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1189    CkMemCheckPT::inRestarting = 1;
1190
1191   CpvAccess( _crashedNode )= CmiMyNode();
1192         
1193   _discard_charm_message();
1194   
1195   if(CmiMyRank()==0){
1196     CkCallback cb(qd_callback);
1197     CkStartQD(cb);
1198     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1199   }
1200 #else
1201    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1202 #endif
1203 }
1204
1205 // can be called in other files
1206 // return true if it is in restarting
1207 extern "C"
1208 int CkInRestarting()
1209 {
1210 #if CMK_MEM_CHECKPOINT
1211   if (CpvAccess( _crashedNode)!=-1) return 1;
1212   // gzheng
1213   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1214   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1215   return CkMemCheckPT::inRestarting;
1216 #else
1217   return 0;
1218 #endif
1219 }
1220
1221 /*****************************************************************************
1222                 module initialization
1223 *****************************************************************************/
1224
1225 static int arg_where = CkCheckPoint_inMEM;
1226
1227 #if CMK_MEM_CHECKPOINT
1228 void init_memcheckpt(char **argv)
1229 {
1230     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1231       arg_where = CkCheckPoint_inDISK;
1232     }
1233
1234         // initiliazing _crashedNode variable
1235         CpvInitialize(int, _crashedNode);
1236         CpvAccess(_crashedNode) = -1;
1237
1238 }
1239 #endif
1240
1241 class CkMemCheckPTInit: public Chare {
1242 public:
1243   CkMemCheckPTInit(CkArgMsg *m) {
1244 #if CMK_MEM_CHECKPOINT
1245     if (arg_where == CkCheckPoint_inDISK) {
1246       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1247     }
1248     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1249     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1250 #endif
1251   }
1252 };
1253
1254 static void notifyHandler(char *msg)
1255 {
1256 #if CMK_MEM_CHECKPOINT
1257   CmiFree(msg);
1258       /* immediately increase restart phase to filter old messages */
1259   CpvAccess(_curRestartPhase) ++;
1260   CpvAccess(_qd)->flushStates();
1261   _discard_charm_message();
1262 #endif
1263 }
1264
1265 extern "C"
1266 void notify_crash(int node)
1267 {
1268 #ifdef CMK_MEM_CHECKPOINT
1269   CpvAccess( _crashedNode) = node;
1270 #ifdef CMK_SMP
1271   for(int i=0;i<CkMyNodeSize();i++){
1272         CpvAccessOther(_crashedNode,i)=node;
1273   }
1274 #endif
1275   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1276   CkMemCheckPT::inRestarting = 1;
1277
1278     // this may be in interrupt handler, send a message to reset QD
1279   int pe = CmiNodeFirst(CkMyNode());
1280   for(int i=0;i<CkMyNodeSize();i++){
1281         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1282         CmiSetHandler(msg, notifyHandlerIdx);
1283         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1284   }
1285 #endif
1286 }
1287
1288 extern "C" void (*notify_crash_fn)(int node);
1289
1290 #if CMK_CONVERSE_MPI
1291 static int pingHandlerIdx;
1292 static int pingCheckHandlerIdx;
1293 static int buddyDieHandlerIdx;
1294 static double lastPingTime = -1;
1295
1296 extern "C" void mpi_restart_crashed(int pe, int rank);
1297 extern "C" int  find_spare_mpirank(int pe);
1298
1299 void pingBuddy();
1300 void pingCheckHandler();
1301
1302 void buddyDieHandler(char *msg)
1303 {
1304 #if CMK_MEM_CHECKPOINT
1305    // notify
1306    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1307    notify_crash(diepe);
1308    // send message to crash pe to let it restart
1309    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1310    int newrank = find_spare_mpirank(diepe);
1311    int buddy = obj->BuddyPE(CmiMyPe());
1312    if (buddy == diepe)  {
1313      mpi_restart_crashed(diepe, newrank);
1314      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1315    }
1316 #endif
1317 }
1318
1319 void pingHandler(void *msg)
1320 {
1321   lastPingTime = CmiWallTimer();
1322   CmiFree(msg);
1323 }
1324
1325 void pingCheckHandler()
1326 {
1327 #if CMK_MEM_CHECKPOINT
1328   double now = CmiWallTimer();
1329   if (lastPingTime > 0 && now - lastPingTime > 4) {
1330     int i, pe, buddy;
1331     // tell everyone the buddy dies
1332     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1333     for (i = 1; i < CmiNumPes(); i++) {
1334        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1335        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1336     }
1337     buddy = pe;
1338     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1339     for (int pe = 0; pe < CmiNumPes(); pe++) {
1340       if (obj->isFailed(pe) || pe == buddy) continue;
1341       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1342       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1343       CmiSetHandler(msg, buddyDieHandlerIdx);
1344       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1345     }
1346   }
1347   else 
1348     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1349 #endif
1350 }
1351
1352 void pingBuddy()
1353 {
1354 #if CMK_MEM_CHECKPOINT
1355   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1356   if (obj) {
1357     int buddy = obj->BuddyPE(CkMyPe());
1358 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1359     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1360     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1361     CmiSetHandler(msg, pingHandlerIdx);
1362     CmiGetRestartPhase(msg) = 9999;
1363     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1364   }
1365   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1366 #endif
1367 }
1368 #endif
1369
1370 // initproc
1371 void CkRegisterRestartHandler( )
1372 {
1373 #if CMK_MEM_CHECKPOINT
1374   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1375   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1376   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1377   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1378   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1379
1380 #if CMK_CONVERSE_MPI
1381   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1382   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1383   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1384 #endif
1385
1386   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1387   CpvAccess(procChkptBuf) = NULL;
1388
1389   notify_crash_fn = notify_crash;
1390
1391 #if ! CMK_CONVERSE_MPI
1392   // print pid to kill
1393 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1394 //  sleep(4);
1395 #endif
1396 #endif
1397 }
1398
1399
1400 extern "C"
1401 int CkHasCheckpoints()
1402 {
1403   return checkpointed;
1404 }
1405
1406 /// @todo: the following definitions should be moved to a separate file containing
1407 // structures and functions about fault tolerance strategies
1408
1409 /**
1410  *  * @brief: function for killing a process                                             
1411  *   */
1412 #ifdef CMK_MEM_CHECKPOINT
1413 #if CMK_HAS_GETPID
1414 void killLocal(void *_dummy,double curWallTime){
1415         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1416         if(CmiWallTimer()<killTime-1){
1417                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1418         }else{ 
1419 #if CMK_CONVERSE_MPI
1420                                 CkDieNow();
1421 #else 
1422                 kill(getpid(),SIGKILL);                                               
1423 #endif
1424         }              
1425
1426 #else
1427 void killLocal(void *_dummy,double curWallTime){
1428   CmiAbort("kill() not supported!");
1429 }
1430 #endif
1431 #endif
1432
1433 #ifdef CMK_MEM_CHECKPOINT
1434 /**
1435  * @brief: reads the file with the kill information
1436  */
1437 void readKillFile(){
1438         FILE *fp=fopen(killFile,"r");
1439         if(!fp){
1440                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1441                 return;
1442         }
1443         int proc;
1444         double sec;
1445         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1446                 if(proc == CkMyNode() && CkMyRank() == 0){
1447                         killTime = CmiWallTimer()+sec;
1448                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1449                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1450                 }
1451         }
1452         fclose(fp);
1453 }
1454
1455 #if ! CMK_CONVERSE_MPI
1456 void CkDieNow()
1457 {
1458          // ignored for non-mpi version
1459         CmiPrintf("[%d] die now.\n", CmiMyPe());
1460         killTime = CmiWallTimer()+0.001;
1461         CcdCallFnAfter(killLocal,NULL,1);
1462 }
1463 #endif
1464
1465 #endif
1466
1467 #include "CkMemCheckpoint.def.h"
1468
1469