Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         1
67 #endif
68
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 double CkMemCheckPT::startTime;
75 char *CkMemCheckPT::stage;
76 CkCallback CkMemCheckPT::cpCallback;
77
78 int _memChkptOn = 1;                    // checkpoint is on or off
79
80 CkGroupID ckCheckPTGroupID;             // readonly
81
82 static int checkpointed = 0;
83
84 /// @todo the following declarations should be moved into a separate file for all 
85 // fault tolerant strategies
86
87 #ifdef CMK_MEM_CHECKPOINT
88 // name of the kill file that contains processes to be killed 
89 char *killFile;                                               
90 // flag for the kill file         
91 int killFlag=0;
92 // variable for storing the killing time
93 double killTime=0.0;
94 #endif
95
96 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
97 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
98
99 // compute the backup processor
100 // FIXME: avoid crashed processors
101 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
102
103 inline int CkMemCheckPT::BuddyPE(int pe)
104 {
105   int budpe;
106 #if NODE_CHECKPOINT
107     // buddy is the processor with same rank on the next physical node
108   int r1 = CmiPhysicalRank(pe);
109   int budnode = CmiPhysicalNodeID(pe);
110   do {
111     budnode = (budnode+1)%CmiNumPhysicalNodes();
112     int *pelist;
113     int num;
114     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
115     budpe = pelist[r1 % num];
116   } while (isFailed(budpe));
117   if (budpe == pe) {
118     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
119     CmiAbort("Failed to find a buddy processor");
120   }
121 #else
122   budpe = pe;
123   while (budpe == pe || isFailed(budpe)) 
124           budpe = (budpe+1)%CkNumPes();
125 #endif
126   return budpe;
127 }
128
129 // called in array element constructor
130 // choose and register with 2 buddies for checkpoiting 
131 #if CMK_MEM_CHECKPOINT
132 void ArrayElement::init_checkpt() {
133         if (_memChkptOn == 0) return;
134         if (CkInRestarting()) {
135           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
136         }
137         // only master init checkpoint
138         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
139
140         budPEs[0] = CkMyPe();
141         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
142         CmiAssert(budPEs[0] != budPEs[1]);
143         // inform checkPTMgr
144         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
145         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
146         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
147         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
148 }
149 #endif
150
151 // entry function invoked by checkpoint mgr asking for checkpoint data
152 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
153 #if CMK_MEM_CHECKPOINT
154 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
155 //char index[128];   thisIndexMax.sprint(index);
156 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
157   CkLocMgr *locMgr = thisArray->getLocMgr();
158   CmiAssert(myRec!=NULL);
159   int size;
160   {
161         PUP::sizer p;
162         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
163         size = p.size();
164   }
165   int packSize = size/sizeof(double) +1;
166   CkArrayCheckPTMessage *msg =
167                  new (packSize, 0) CkArrayCheckPTMessage;
168   msg->len = size;
169   msg->index =thisIndexMax;
170   msg->aid = thisArrayID;
171   msg->locMgr = locMgr->getGroupID();
172   msg->cp_flag = 1;
173   {
174         PUP::toMem p(msg->packData);
175         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
176   }
177
178   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
179   checkptMgr.recvData(msg, 2, budPEs);
180   delete m;
181 #endif
182 }
183
184 // checkpoint holder class - for memory checkpointing
185 class CkMemCheckPTInfo: public CkCheckPTInfo
186 {
187   CkArrayCheckPTMessage *ckBuffer;
188 public:
189   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
190                     CkCheckPTInfo(a, loc, idx, pno)
191   {
192     ckBuffer = NULL;
193   }
194   ~CkMemCheckPTInfo() 
195   {
196     if (ckBuffer) delete ckBuffer; 
197   }
198   inline void updateBuffer(CkArrayCheckPTMessage *data) 
199   {
200     CmiAssert(data!=NULL);
201     if (ckBuffer) delete ckBuffer;
202     ckBuffer = data;
203   }    
204   inline CkArrayCheckPTMessage * getCopy()
205   {
206     if (ckBuffer == NULL) {
207       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
208       CmiAbort("Abort!");
209     }
210     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
211   }     
212   inline void updateBuddy(int b1, int b2) {
213      CmiAssert(ckBuffer);
214      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
215      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
216      CmiAssert(pNo != CkMyPe());
217   }
218   inline int getSize() { 
219      CmiAssert(ckBuffer);
220      return ckBuffer->len; 
221   }
222 };
223
224 // checkpoint holder class - for in-disk checkpointing
225 class CkDiskCheckPTInfo: public CkCheckPTInfo 
226 {
227   char *fname;
228   int bud1, bud2;
229   int len;                      // checkpoint size
230 public:
231   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
232   {
233 #if CMK_USE_MKSTEMP
234     fname = new char[64];
235     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
236     mkstemp(fname);
237 #else
238     fname=tmpnam(NULL);
239 #endif
240     bud1 = bud2 = -1;
241     len = 0;
242   }
243   ~CkDiskCheckPTInfo() 
244   {
245     remove(fname);
246   }
247   inline void updateBuffer(CkArrayCheckPTMessage *data) 
248   {
249     double t = CmiWallTimer();
250     // unpack it
251     envelope *env = UsrToEnv(data);
252     CkUnpackMessage(&env);
253     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
254     FILE *f = fopen(fname,"wb");
255     PUP::toDisk p(f);
256     CkPupMessage(p, (void **)&data);
257     // delay sync to the end because otherwise the messages are blocked
258 //    fsync(fileno(f));
259     fclose(f);
260     bud1 = data->bud1;
261     bud2 = data->bud2;
262     len = data->len;
263     delete data;
264     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
265   }
266   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
267   {
268     CkArrayCheckPTMessage *data;
269     FILE *f = fopen(fname,"rb");
270     PUP::fromDisk p(f);
271     CkPupMessage(p, (void **)&data);
272     fclose(f);
273     data->bud1 = bud1;                          // update the buddies
274     data->bud2 = bud2;
275     return data;
276   }
277   inline void updateBuddy(int b1, int b2) {
278      bud1 = b1; bud2 = b2;
279      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
280      CmiAssert(pNo != CkMyPe());
281   }
282   inline int getSize() { 
283      return len; 
284   }
285 };
286
287 CkMemCheckPT::CkMemCheckPT(int w)
288 {
289   int numnodes = 0;
290 #if NODE_CHECKPOINT
291   numnodes = CmiNumPhysicalNodes();
292 #else
293   numnodes = CkNumPes();
294 #endif
295 #if CK_NO_PROC_POOL
296   if (numnodes <= 2)
297 #else
298   if (numnodes  == 1)
299 #endif
300   {
301     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
302     _memChkptOn = 0;
303   }
304   inRestarting = 0;
305   recvCount = peCount = 0;
306   ackCount = 0;
307   expectCount = -1;
308   where = w;
309
310 #if CMK_CONVERSE_MPI
311   void pingBuddy();
312   void pingCheckHandler();
313   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
314   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
315 #endif
316 }
317
318 CkMemCheckPT::~CkMemCheckPT()
319 {
320   int len = ckTable.length();
321   for (int i=0; i<len; i++) {
322     delete ckTable[i];
323   }
324 }
325
326 void CkMemCheckPT::pup(PUP::er& p) 
327
328   CBase_CkMemCheckPT::pup(p); 
329   p|cpStarter;
330   p|thisFailedPe;
331   p|failedPes;
332   p|ckCheckPTGroupID;           // recover global variable
333   p|cpCallback;                 // store callback
334   p|where;                      // where to checkpoint
335   p|peCount;
336   if (p.isUnpacking()) {
337     recvCount = 0;
338 #if CMK_CONVERSE_MPI
339     void pingBuddy();
340     void pingCheckHandler();
341     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
342     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
343 #endif
344   }
345 }
346
347 // called by checkpoint mgr to restore an array element
348 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
349 {
350 #if CMK_MEM_CHECKPOINT
351   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
352   // m->index.print();
353   PUP::fromMem p(m->packData);
354   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
355   CmiAssert(mgr);
356 #if STREAMING_INFORMHOME
357   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
358 #else
359   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
360 #endif
361
362   // find a list of array elements bound together
363   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
364   CmiAssert(elt);
365   CkLocRec_local *rec = elt->myRec;
366   CkVec<CkMigratable *> list;
367   mgr->migratableList(rec, list);
368   CmiAssert(list.length() > 0);
369   for (int l=0; l<list.length(); l++) {
370     elt = (ArrayElement *)list[l];
371     elt->budPEs[0] = m->bud1;
372     elt->budPEs[1] = m->bud2;
373     //    reset, may not needed now
374     // for now.
375     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
376       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
377       if (c) c->redNo = 0;
378     }
379   }
380 #endif
381 }
382
383 // return 1 if pe is a crashed processor
384 int CkMemCheckPT::isFailed(int pe)
385 {
386   for (int i=0; i<failedPes.length(); i++)
387     if (failedPes[i] == pe) return 1;
388   return 0;
389 }
390
391 // add pe into history list of all failed processors
392 void CkMemCheckPT::failed(int pe)
393 {
394   if (isFailed(pe)) return;
395   failedPes.push_back(pe);
396 }
397
398 int CkMemCheckPT::totalFailed()
399 {
400   return failedPes.length();
401 }
402
403 // create an checkpoint entry for array element of aid with index.
404 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
405 {
406   // error check, no duplicate
407   int idx, len = ckTable.size();
408   for (idx=0; idx<len; idx++) {
409     CkCheckPTInfo *entry = ckTable[idx];
410     if (index == entry->index) {
411       if (loc == entry->locMgr) {
412           // bindTo array elements
413           return;
414       }
415         // for array inheritance, the following check may fail
416         // because ArrayElement constructor of all superclasses are called
417       if (aid == entry->aid) {
418         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
419         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
420       }
421     }
422   }
423   CkCheckPTInfo *newEntry;
424   if (where == CkCheckPoint_inMEM)
425     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
426   else
427     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
428   ckTable.push_back(newEntry);
429   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
430 }
431
432 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
433 {
434   int buddy = msg->bud1;
435   if (buddy == CkMyPe()) buddy = msg->bud2;
436   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
437   recvData(msg);
438     // ack
439   thisProxy[buddy].gotData();
440 }
441
442 // loop through my checkpoint table and ask checkpointed array elements
443 // to send me checkpoint data.
444 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
445 {
446   checkpointed = 1;
447   cpCallback = cb;
448   cpStarter = starter;
449   if (CkMyPe() == cpStarter) {
450     startTime = CmiWallTimer();
451     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
452   }
453
454   int len = ckTable.length();
455   for (int i=0; i<len; i++) {
456     CkCheckPTInfo *entry = ckTable[i];
457       // always let the bigger number processor send request
458     //if (CkMyPe() < entry->pNo) continue;
459       // always let the smaller number processor send request, may on same proc
460     if (!isMaster(entry->pNo)) continue;
461       // call inmem_checkpoint to the array element, ask it to send
462       // back checkpoint data via recvData().
463     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
464     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
465   }
466     // if my table is empty, then I am done
467   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
468
469   // pack and send proc level data
470   sendProcData();
471 }
472
473 // don't handle array elements
474 static inline void _handleProcData(PUP::er &p)
475 {
476     // save readonlys, and callback BTW
477     CkPupROData(p);
478
479     // save mainchares 
480     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
481         
482 #ifndef CMK_CHARE_USE_PTR
483     // save non-migratable chare
484     CkPupChareData(p);
485 #endif
486
487     // save groups into Groups.dat
488     CkPupGroupData(p);
489
490     // save nodegroups into NodeGroups.dat
491     if(CkMyRank()==0) CkPupNodeGroupData(p);
492 }
493
494 void CkMemCheckPT::sendProcData()
495 {
496   // find out size of buffer
497   int size;
498   {
499     PUP::sizer p;
500     _handleProcData(p);
501     size = p.size();
502   }
503   int packSize = size;
504   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
505   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
506   {
507     PUP::toMem p(msg->packData);
508     _handleProcData(p);
509   }
510   msg->pe = CkMyPe();
511   msg->len = size;
512   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
513   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
514 }
515
516 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
517 {
518   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
519   CpvAccess(procChkptBuf) = msg;
520   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
521   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
522 }
523
524 // ArrayElement call this function to give us the checkpointed data
525 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
526 {
527   int len = ckTable.length();
528   int idx;
529   for (idx=0; idx<len; idx++) {
530     CkCheckPTInfo *entry = ckTable[idx];
531     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
532   }
533   CkAssert(idx < len);
534   int isChkpting = msg->cp_flag;
535   ckTable[idx]->updateBuffer(msg);
536   if (isChkpting) {
537       // all my array elements have returned their inmem data
538       // inform starter processor that I am done.
539     recvCount ++;
540     if (recvCount == ckTable.length()) {
541       if (where == CkCheckPoint_inMEM) {
542         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
543       }
544       else if (where == CkCheckPoint_inDISK) {
545         // another barrier for finalize the writing using fsync
546         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
547         contribute(0,NULL,CkReduction::sum_int,localcb);
548       }
549       else
550         CmiAbort("Unknown checkpoint scheme");
551       recvCount = 0;
552     } 
553   }
554 }
555
556 // only used in disk checkpointing
557 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
558 {
559   delete m;
560 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
561   system("sync");
562 #endif
563   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
564 }
565
566 // only is called on cpStarter when checkpoint is done
567 void CkMemCheckPT::cpFinish()
568 {
569   CmiAssert(CkMyPe() == cpStarter);
570   peCount++;
571     // now that all processors have finished, activate callback
572   if (peCount == 2) {
573     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
574     cpCallback.send();
575     peCount = 0;
576     thisProxy.report();
577   }
578 }
579
580 // for debugging, report checkpoint info
581 void CkMemCheckPT::report()
582 {
583   int objsize = 0;
584   int len = ckTable.length();
585   for (int i=0; i<len; i++) {
586     CkCheckPTInfo *entry = ckTable[i];
587     CmiAssert(entry);
588     objsize += entry->getSize();
589   }
590   CmiAssert(CpvAccess(procChkptBuf));
591   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
592 }
593
594 /*****************************************************************************
595                         RESTART Procedure
596 *****************************************************************************/
597
598 // master processor of two buddies
599 inline int CkMemCheckPT::isMaster(int buddype)
600 {
601 #if 0
602   int mype = CkMyPe();
603 //CkPrintf("ismaster: %d %d\n", pe, mype);
604   if (CkNumPes() - totalFailed() == 2) {
605     return mype > buddype;
606   }
607   for (int i=1; i<CkNumPes(); i++) {
608     int me = (buddype+i)%CkNumPes();
609     if (isFailed(me)) continue;
610     if (me == mype) return 1;
611     else return 0;
612   }
613   return 0;
614 #else
615     // smaller one
616   int mype = CkMyPe();
617 //CkPrintf("ismaster: %d %d\n", pe, mype);
618   if (CkNumPes() - totalFailed() == 2) {
619     return mype < buddype;
620   }
621 #if NODE_CHECKPOINT
622   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
623   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
624 #else
625   for (int i=1; i<CkNumPes(); i++) {
626 #endif
627     int me = (mype+i)%CkNumPes();
628     if (isFailed(me)) continue;
629     if (me == buddype) return 1;
630     else return 0;
631   }
632   return 0;
633 #endif
634 }
635
636 #ifdef CKLOCMGR_LOOP
637 #undef CKLOCMGR_LOOP
638 #endif
639
640 // loop over all CkLocMgr and do "code"
641 #define  CKLOCMGR_LOOP(code)    {       \
642   int numGroups = CkpvAccess(_groupIDTable)->size();    \
643   for(int i=0;i<numGroups;i++) {        \
644     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
645     if(obj->isLocMgr())  {      \
646       CkLocMgr *mgr = (CkLocMgr*)obj;   \
647       code      \
648     }   \
649   }     \
650  }
651
652 #if 0
653 // helper class to pup all elements that belong to same ckLocMgr
654 class ElementDestoryer : public CkLocIterator {
655 private:
656         CkLocMgr *locMgr;
657 public:
658         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
659         void addLocation(CkLocation &loc) {
660                 CkArrayIndex idx=loc.getIndex();
661                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
662                 loc.destroy();
663         }
664 };
665 #endif
666
667 // restore the bitmap vector for LB
668 void CkMemCheckPT::resetLB(int diepe)
669 {
670 #if CMK_LBDB_ON
671   int i;
672   char *bitmap = new char[CkNumPes()];
673   // set processor available bitmap
674   get_avail_vector(bitmap);
675
676   for (i=0; i<failedPes.length(); i++)
677     bitmap[failedPes[i]] = 0; 
678   bitmap[diepe] = 0;
679
680 #if CK_NO_PROC_POOL
681   set_avail_vector(bitmap);
682 #endif
683
684   // if I am the crashed pe, rebuild my failedPEs array
685   if (CkMyNode() == diepe)
686     for (i=0; i<CkNumPes(); i++) 
687       if (bitmap[i]==0) failed(i);
688
689   delete [] bitmap;
690 #endif
691 }
692
693 // in case when failedPe dies, everybody go through its checkpoint table:
694 // destory all array elements
695 // recover lost buddies
696 // reconstruct all array elements from check point data
697 // called on all processors
698 void CkMemCheckPT::restart(int diePe)
699 {
700 #if CMK_MEM_CHECKPOINT
701   double curTime = CmiWallTimer();
702   if (CkMyPe() == diePe)
703     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
704   stage = (char*)"resetLB";
705   startTime = curTime;
706   if (CkMyPe() == diePe)
707     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
708
709 #if CK_NO_PROC_POOL
710   failed(diePe);        // add into the list of failed pes
711 #endif
712   thisFailedPe = diePe;
713
714   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
715
716   inRestarting = 1;
717                                                                                 
718   // disable load balancer's barrier
719   if (CkMyPe() != diePe) resetLB(diePe);
720
721   CKLOCMGR_LOOP(mgr->startInserting(););
722
723   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
724 /*
725   if (CkMyPe() == 0)
726     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
727 */
728 #endif
729 }
730
731 // loally remove all array elements
732 void CkMemCheckPT::removeArrayElements()
733 {
734 #if CMK_MEM_CHECKPOINT
735   int len = ckTable.length();
736   double curTime = CmiWallTimer();
737   if (CkMyPe() == thisFailedPe) 
738     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
739   stage = (char*)"removeArrayElements";
740   startTime = curTime;
741
742   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
743   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
744
745   // get rid of all buffering and remote recs
746   // including destorying all array elements
747   CKLOCMGR_LOOP(mgr->flushAllRecs(););
748
749 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
750
751   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
752 #endif
753 }
754
755 // flush state in reduction manager
756 void CkMemCheckPT::resetReductionMgr()
757 {
758   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
759   int numGroups = CkpvAccess(_groupIDTable)->size();
760   for(int i=0;i<numGroups;i++) {
761     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
762     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
763     obj->flushStates();
764     obj->ckJustMigrated();
765   }
766   // reset again
767   //CpvAccess(_qd)->flushStates();
768
769 #if 1
770   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
771 #else
772   if (CkMyPe() == 0)
773     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
774 #endif
775 }
776
777 // recover the lost buddies
778 void CkMemCheckPT::recoverBuddies()
779 {
780   int idx;
781   int len = ckTable.length();
782   // ready to flush reduction manager
783   // cannot be CkMemCheckPT::restart because destory will modify states
784   double curTime = CmiWallTimer();
785   if (CkMyPe() == thisFailedPe)
786   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
787   stage = (char *)"recoverBuddies";
788   if (CkMyPe() == thisFailedPe)
789   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
790   startTime = curTime;
791
792   // recover buddies
793   expectCount = 0;
794   for (idx=0; idx<len; idx++) {
795     CkCheckPTInfo *entry = ckTable[idx];
796     if (entry->pNo == thisFailedPe) {
797 #if CK_NO_PROC_POOL
798       // find a new buddy
799 /*
800       int budPe = CkMyPe();
801 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
802       while (budPe == CkMyPe() || isFailed(budPe)) 
803           budPe = (budPe+1)%CkNumPes();
804 */
805       int budPe = BuddyPE(CkMyPe());
806       //entry->pNo = budPe;
807 #else
808       int budPe = thisFailedPe;
809 #endif
810       entry->updateBuddy(CkMyPe(), budPe);
811 #if 0
812       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
813       CkArrayCheckPTMessage *msg = entry->getCopy();
814       msg->cp_flag = 0;            // not checkpointing
815       thisProxy[budPe].recvData(msg);
816 #else
817       CkArrayCheckPTMessage *msg = entry->getCopy();
818       msg->bud1 = budPe;
819       msg->bud2 = CkMyPe();
820       msg->cp_flag = 0;            // not checkpointing
821       thisProxy[budPe].recoverEntry(msg);
822 #endif
823       expectCount ++;
824     }
825   }
826
827 #if 1
828   if (expectCount == 0) {
829     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
830     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
831   }
832 #else
833   if (CkMyPe() == 0) {
834     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
835   }
836 #endif
837
838   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
839 }
840
841 void CkMemCheckPT::gotData()
842 {
843   ackCount ++;
844   if (ackCount == expectCount) {
845     ackCount = 0;
846     expectCount = -1;
847     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
848     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
849   }
850 }
851
852 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
853 {
854   for (int i=0; i<n; i++) {
855     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
856     mgr->updateLocation(idx[i], nowOnPe);
857   }
858 }
859
860 // restore array elements
861 void CkMemCheckPT::recoverArrayElements()
862 {
863   double curTime = CmiWallTimer();
864   int len = ckTable.length();
865   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
866   stage = (char *)"recoverArrayElements";
867   if (CkMyPe() == thisFailedPe)
868   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
869   startTime = curTime;
870
871   // recover all array elements
872   int count = 0;
873 #if STREAMING_INFORMHOME
874   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
875   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
876 #endif
877   for (int idx=0; idx<len; idx++)
878   {
879     CkCheckPTInfo *entry = ckTable[idx];
880 #if CK_NO_PROC_POOL
881     // the bigger one will do 
882 //    if (CkMyPe() < entry->pNo) continue;
883     if (!isMaster(entry->pNo)) continue;
884 #else
885     // smaller one do it, which has the original object
886     if (CkMyPe() == entry->pNo+1 || 
887         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
888 #endif
889 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
890
891     entry->updateBuddy(CkMyPe(), entry->pNo);
892     CkArrayCheckPTMessage *msg = entry->getCopy();
893     // gzheng
894     //thisProxy[CkMyPe()].inmem_restore(msg);
895     inmem_restore(msg);
896 #if STREAMING_INFORMHOME
897     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
898     int homePe = mgr->homePe(msg->index);
899     if (homePe != CkMyPe()) {
900       gmap[homePe].push_back(msg->locMgr);
901       imap[homePe].push_back(msg->index);
902     }
903 #endif
904     CkFreeMsg(msg);
905     count ++;
906   }
907 #if STREAMING_INFORMHOME
908   for (int i=0; i<CkNumPes(); i++) {
909     if (gmap[i].size() && i!=CkMyPe()) {
910       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
911     }
912   }
913   delete [] imap;
914   delete [] gmap;
915 #endif
916   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
917
918   CKLOCMGR_LOOP(mgr->doneInserting(););
919
920   inRestarting = 0;
921   // _crashedNode = -1;
922   CpvAccess(_crashedNode) = -1;
923
924   if (CkMyPe() == 0)
925     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
926 }
927
928 static double restartT;
929
930 // on every processor
931 // turn load balancer back on
932 void CkMemCheckPT::finishUp()
933 {
934   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
935   //CKLOCMGR_LOOP(mgr->doneInserting(););
936   
937   if (CkMyPe() == thisFailedPe)
938   {
939        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
940        //CkStartQD(cpCallback);
941        cpCallback.send();
942        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
943   }
944
945 #if CK_NO_PROC_POOL
946 #if NODE_CHECKPOINT
947   int numnodes = CmiNumPhysicalNodes();
948 #else
949   int numnodes = CkNumPes();
950 #endif
951   if (numnodes-totalFailed() <=2) {
952     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
953     _memChkptOn = 0;
954   }
955 #endif
956 }
957
958 // called only on 0
959 void CkMemCheckPT::quiescence(CkCallback &cb)
960 {
961   static int pe_count = 0;
962   pe_count ++;
963   CmiAssert(CkMyPe() == 0);
964   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
965   if (pe_count == CkNumPes()) {
966     pe_count = 0;
967     cb.send();
968   }
969 }
970
971 // User callable function - to start a checkpoint
972 // callback cb is used to pass control back
973 void CkStartMemCheckpoint(CkCallback &cb)
974 {
975 #if CMK_MEM_CHECKPOINT
976   if (_memChkptOn == 0) {
977     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
978     cb.send();
979     return;
980   }
981   if (CkInRestarting()) {
982       // trying to checkpointing during restart
983     cb.send();
984     return;
985   }
986     // store user callback and user data
987   CkMemCheckPT::cpCallback = cb;
988
989     // broadcast to start check pointing
990   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
991   checkptMgr.doItNow(CkMyPe(), cb);
992 #else
993   // when mem checkpoint is disabled, invike cb immediately
994   cb.send();
995 #endif
996 }
997
998 void CkRestartCheckPoint(int diePe)
999 {
1000 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1001   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1002   // broadcast
1003   checkptMgr.restart(diePe);
1004 }
1005
1006 static int _diePE = -1;
1007
1008 // callback function used locally by ccs handler
1009 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1010 {
1011 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1012   CkRestartCheckPoint(_diePE);
1013 }
1014
1015 // Converse function handles
1016 static int askPhaseHandlerIdx;
1017 static int recvPhaseHandlerIdx;
1018 static int askProcDataHandlerIdx;
1019 static int restartBcastHandlerIdx;
1020 static int recoverProcDataHandlerIdx;
1021 static int restartBeginHandlerIdx;
1022 static int notifyHandlerIdx;
1023
1024 // called on crashed PE
1025 static void restartBeginHandler(char *msg)
1026 {
1027 #if CMK_MEM_CHECKPOINT
1028   static int count = 0;
1029   CmiFree(msg);
1030   CmiAssert(CkMyPe() == _diePE);
1031   count ++;
1032   if (count == CkNumPes()) {
1033     CkRestartCheckPointCallback(NULL, NULL);
1034     count = 0;
1035   }
1036 #endif
1037 }
1038
1039 extern void _discard_charm_message();
1040 extern void _resume_charm_message();
1041
1042 static void restartBcastHandler(char *msg)
1043 {
1044 #if CMK_MEM_CHECKPOINT
1045   // advance phase counter
1046   CkMemCheckPT::inRestarting = 1;
1047   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1048   // gzheng
1049   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1050
1051   if (CkMyPe()==_diePE)
1052     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1053
1054   // reset QD counters
1055 /*  gzheng
1056   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1057 */
1058
1059 /*  gzheng
1060   if (CkMyPe()==_diePE)
1061       CkRestartCheckPointCallback(NULL, NULL);
1062 */
1063   CmiFree(msg);
1064
1065   _resume_charm_message();
1066
1067     // reduction
1068   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1069   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1070   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1071
1072   checkpointed = 0;
1073 #endif
1074 }
1075
1076 extern void _initDone();
1077
1078 // called on crashed processor
1079 static void recoverProcDataHandler(char *msg)
1080 {
1081 #if CMK_MEM_CHECKPOINT
1082    int i;
1083    envelope *env = (envelope *)msg;
1084    CkUnpackMessage(&env);
1085    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1086    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1087    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1088    //cur_restart_phase ++;
1089      // gzheng ?
1090    //CpvAccess(_qd)->flushStates();
1091
1092    // restore readonly, mainchare, group, nodegroup
1093 //   int temp = cur_restart_phase;
1094 //   cur_restart_phase = -1;
1095    PUP::fromMem p(procMsg->packData);
1096    _handleProcData(p);
1097
1098    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1099    // gzheng
1100    CKLOCMGR_LOOP(mgr->startInserting(););
1101
1102    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1103    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1104    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1105    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1106
1107    _initDone();
1108 //   CpvAccess(_qd)->flushStates();
1109    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1110 #endif
1111 }
1112
1113 // called on its backup processor
1114 // get backup message buffer and sent to crashed processor
1115 static void askProcDataHandler(char *msg)
1116 {
1117 #if CMK_MEM_CHECKPOINT
1118     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1119     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1120     if (CpvAccess(procChkptBuf) == NULL) 
1121       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1122     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1123     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1124     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1125
1126     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1127
1128     CkPackMessage(&env);
1129     CmiSetHandler(env, recoverProcDataHandlerIdx);
1130     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1131     CpvAccess(procChkptBuf) = NULL;
1132 #endif
1133 }
1134
1135 // called on PE 0
1136 void qd_callback(void *m)
1137 {
1138    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1139    CkFreeMsg(m);
1140 #ifdef CMK_SMP
1141    for(int i=0;i<CmiMyNodeSize();i++){
1142    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1143    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1144         CmiSetHandler(msg, askProcDataHandlerIdx);
1145         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1146         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1147    }
1148    return;
1149 #endif
1150    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1151    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1152    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1153    CmiSetHandler(msg, askProcDataHandlerIdx);
1154    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1155    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1156
1157 }
1158
1159 // on crashed node
1160 void CkMemRestart(const char *dummy, CkArgMsg *args)
1161 {
1162 #if CMK_MEM_CHECKPOINT
1163    _diePE = CmiMyNode();
1164    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1165    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1166    CkMemCheckPT::inRestarting = 1;
1167
1168   CpvAccess( _crashedNode )= CmiMyNode();
1169         
1170   _discard_charm_message();
1171   
1172   if(CmiMyRank()==0){
1173     CkCallback cb(qd_callback);
1174     CkStartQD(cb);
1175     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1176   }
1177 #else
1178    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1179 #endif
1180 }
1181
1182 // can be called in other files
1183 // return true if it is in restarting
1184 extern "C"
1185 int CkInRestarting()
1186 {
1187 #if CMK_MEM_CHECKPOINT
1188   if (CpvAccess( _crashedNode)!=-1) return 1;
1189   // gzheng
1190   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1191   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1192   return CkMemCheckPT::inRestarting;
1193 #else
1194   return 0;
1195 #endif
1196 }
1197
1198 /*****************************************************************************
1199                 module initialization
1200 *****************************************************************************/
1201
1202 static int arg_where = CkCheckPoint_inMEM;
1203
1204 #if CMK_MEM_CHECKPOINT
1205 void init_memcheckpt(char **argv)
1206 {
1207     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1208       arg_where = CkCheckPoint_inDISK;
1209     }
1210
1211         // initiliazing _crashedNode variable
1212         CpvInitialize(int, _crashedNode);
1213         CpvAccess(_crashedNode) = -1;
1214
1215 }
1216 #endif
1217
1218 class CkMemCheckPTInit: public Chare {
1219 public:
1220   CkMemCheckPTInit(CkArgMsg *m) {
1221 #if CMK_MEM_CHECKPOINT
1222     if (arg_where == CkCheckPoint_inDISK) {
1223       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1224     }
1225     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1226     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1227 #endif
1228   }
1229 };
1230
1231 static void notifyHandler(char *msg)
1232 {
1233 #if CMK_MEM_CHECKPOINT
1234   CmiFree(msg);
1235       /* immediately increase restart phase to filter old messages */
1236   CpvAccess(_curRestartPhase) ++;
1237   CpvAccess(_qd)->flushStates();
1238   _discard_charm_message();
1239 #endif
1240 }
1241
1242 extern "C"
1243 void notify_crash(int node)
1244 {
1245 #ifdef CMK_MEM_CHECKPOINT
1246   CpvAccess( _crashedNode) = node;
1247 #ifdef CMK_SMP
1248   for(int i=0;i<CkMyNodeSize();i++){
1249         CpvAccessOther(_crashedNode,i)=node;
1250   }
1251 #endif
1252   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1253   CkMemCheckPT::inRestarting = 1;
1254
1255     // this may be in interrupt handler, send a message to reset QD
1256   int pe = CmiNodeFirst(CkMyNode());
1257   for(int i=0;i<CkMyNodeSize();i++){
1258         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1259         CmiSetHandler(msg, notifyHandlerIdx);
1260         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1261   }
1262 #endif
1263 }
1264
1265 extern "C" void (*notify_crash_fn)(int node);
1266
1267 #if CMK_CONVERSE_MPI
1268 static int pingHandlerIdx;
1269 static int pingCheckHandlerIdx;
1270 static int buddyDieHandlerIdx;
1271 static double lastPingTime = -1;
1272
1273 extern "C" void mpi_restart_crashed(int pe, int rank);
1274 extern "C" int  find_spare_mpirank(int pe);
1275
1276 void pingBuddy();
1277 void pingCheckHandler();
1278
1279 void buddyDieHandler(char *msg)
1280 {
1281 #if CMK_MEM_CHECKPOINT
1282    // notify
1283    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1284    notify_crash(diepe);
1285    // send message to crash pe to let it restart
1286    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1287    int newrank = find_spare_mpirank(diepe);
1288    int buddy = obj->BuddyPE(CmiMyPe());
1289    if (buddy == diepe)  {
1290      mpi_restart_crashed(diepe, newrank);
1291      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1292    }
1293 #endif
1294 }
1295
1296 void pingHandler(void *msg)
1297 {
1298   lastPingTime = CmiWallTimer();
1299   CmiFree(msg);
1300 }
1301
1302 void pingCheckHandler()
1303 {
1304 #if CMK_MEM_CHECKPOINT
1305   double now = CmiWallTimer();
1306   if (lastPingTime > 0 && now - lastPingTime > 4) {
1307     int i, pe, buddy;
1308     // tell everyone the buddy dies
1309     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1310     for (i = 1; i < CmiNumPes(); i++) {
1311        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1312        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1313     }
1314     buddy = pe;
1315     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1316     for (int pe = 0; pe < CmiNumPes(); pe++) {
1317       if (obj->isFailed(pe) || pe == buddy) continue;
1318       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1319       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1320       CmiSetHandler(msg, buddyDieHandlerIdx);
1321       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1322     }
1323   }
1324   else 
1325     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1326 #endif
1327 }
1328
1329 void pingBuddy()
1330 {
1331 #if CMK_MEM_CHECKPOINT
1332   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1333   if (obj) {
1334     int buddy = obj->BuddyPE(CkMyPe());
1335 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1336     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1337     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1338     CmiSetHandler(msg, pingHandlerIdx);
1339     CmiGetRestartPhase(msg) = 9999;
1340     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1341   }
1342   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1343 #endif
1344 }
1345 #endif
1346
1347 // initproc
1348 void CkRegisterRestartHandler( )
1349 {
1350 #if CMK_MEM_CHECKPOINT
1351   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1352   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1353   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1354   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1355   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1356
1357 #if CMK_CONVERSE_MPI
1358   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1359   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1360   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1361 #endif
1362
1363   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1364   CpvAccess(procChkptBuf) = NULL;
1365
1366   notify_crash_fn = notify_crash;
1367
1368 #if ! CMK_CONVERSE_MPI
1369   // print pid to kill
1370   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1371 //  sleep(4);
1372 #endif
1373 #endif
1374 }
1375
1376
1377 extern "C"
1378 int CkHasCheckpoints()
1379 {
1380   return checkpointed;
1381 }
1382
1383 /// @todo: the following definitions should be moved to a separate file containing
1384 // structures and functions about fault tolerance strategies
1385
1386 /**
1387  *  * @brief: function for killing a process                                             
1388  *   */
1389 #ifdef CMK_MEM_CHECKPOINT
1390 #if CMK_HAS_GETPID
1391 void killLocal(void *_dummy,double curWallTime){
1392         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1393         if(CmiWallTimer()<killTime-1){
1394                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1395         }else{  
1396                 kill(getpid(),SIGKILL);                                               
1397         }              
1398
1399 #else
1400 void killLocal(void *_dummy,double curWallTime){
1401   CmiAbort("kill() not supported!");
1402 }
1403 #endif
1404 #endif
1405
1406 #ifdef CMK_MEM_CHECKPOINT
1407 /**
1408  * @brief: reads the file with the kill information
1409  */
1410 void readKillFile(){
1411         FILE *fp=fopen(killFile,"r");
1412         if(!fp){
1413                 return;
1414         }
1415         int proc;
1416         double sec;
1417         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1418                 if(proc == CkMyNode() && CkMyRank() == 0){
1419                         killTime = CmiWallTimer()+sec;
1420                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1421                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1422                 }
1423         }
1424         fclose(fp);
1425 }
1426
1427 #if ! CMK_CONVERSE_MPI
1428 void CkDieNow()
1429 {
1430          // ignored for non-mpi version
1431         CmiPrintf("[%d] die now.\n", CmiMyPe());
1432         killTime = CmiWallTimer()+0.001;
1433         CcdCallFnAfter(killLocal,NULL,1);
1434 }
1435 #endif
1436
1437 #endif
1438
1439 #include "CkMemCheckpoint.def.h"
1440
1441