Merge branch 'charm' of charmgit:charm into harshitha/adaptive_lb
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define STREAMING_INFORMHOME                    1
69 CpvDeclare(int, _crashedNode);
70
71 // static, so that it is accessible from Converse part
72 int CkMemCheckPT::inRestarting = 0;
73 double CkMemCheckPT::startTime;
74 char *CkMemCheckPT::stage;
75 CkCallback CkMemCheckPT::cpCallback;
76
77 int _memChkptOn = 1;                    // checkpoint is on or off
78
79 CkGroupID ckCheckPTGroupID;             // readonly
80
81 static int checkpointed = 0;
82
83 /// @todo the following declarations should be moved into a separate file for all 
84 // fault tolerant strategies
85
86 #ifdef CMK_MEM_CHECKPOINT
87 // name of the kill file that contains processes to be killed 
88 char *killFile;                                               
89 // flag for the kill file         
90 int killFlag=0;
91 // variable for storing the killing time
92 double killTime=0.0;
93 #endif
94
95 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
96 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
97
98 // compute the backup processor
99 // FIXME: avoid crashed processors
100 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
101
102 inline int CkMemCheckPT::BuddyPE(int pe)
103 {
104   int budpe;
105 #if NODE_CHECKPOINT
106     // buddy is the processor with same rank on the next physical node
107   int r1 = CmiPhysicalRank(pe);
108   int budnode = CmiPhysicalNodeID(pe);
109   do {
110     budnode = (budnode+1)%CmiNumPhysicalNodes();
111     int *pelist;
112     int num;
113     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
114     budpe = pelist[r1 % num];
115   } while (isFailed(budpe));
116   if (budpe == pe) {
117     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
118     CmiAbort("Failed to find a buddy processor");
119   }
120 #else
121   budpe = pe;
122   while (budpe == pe || isFailed(budpe)) 
123           budpe = (budpe+1)%CkNumPes();
124 #endif
125   return budpe;
126 }
127
128 // called in array element constructor
129 // choose and register with 2 buddies for checkpoiting 
130 #if CMK_MEM_CHECKPOINT
131 void ArrayElement::init_checkpt() {
132         if (_memChkptOn == 0) return;
133         if (CkInRestarting()) {
134           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
135         }
136         // only master init checkpoint
137         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
138
139         budPEs[0] = CkMyPe();
140         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
141         CmiAssert(budPEs[0] != budPEs[1]);
142         // inform checkPTMgr
143         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
144         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
145         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
146         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
147 }
148 #endif
149
150 // entry function invoked by checkpoint mgr asking for checkpoint data
151 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
152 #if CMK_MEM_CHECKPOINT
153 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
154 //char index[128];   thisIndexMax.sprint(index);
155 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
156   CkLocMgr *locMgr = thisArray->getLocMgr();
157   CmiAssert(myRec!=NULL);
158   int size;
159   {
160         PUP::sizer p;
161         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
162         size = p.size();
163   }
164   int packSize = size/sizeof(double) +1;
165   CkArrayCheckPTMessage *msg =
166                  new (packSize, 0) CkArrayCheckPTMessage;
167   msg->len = size;
168   msg->index =thisIndexMax;
169   msg->aid = thisArrayID;
170   msg->locMgr = locMgr->getGroupID();
171   msg->cp_flag = 1;
172   {
173         PUP::toMem p(msg->packData);
174         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
175   }
176
177   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
178   checkptMgr.recvData(msg, 2, budPEs);
179   delete m;
180 #endif
181 }
182
183 // checkpoint holder class - for memory checkpointing
184 class CkMemCheckPTInfo: public CkCheckPTInfo
185 {
186   CkArrayCheckPTMessage *ckBuffer;
187 public:
188   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
189                     CkCheckPTInfo(a, loc, idx, pno)
190   {
191     ckBuffer = NULL;
192   }
193   ~CkMemCheckPTInfo() 
194   {
195     if (ckBuffer) delete ckBuffer; 
196   }
197   inline void updateBuffer(CkArrayCheckPTMessage *data) 
198   {
199     CmiAssert(data!=NULL);
200     if (ckBuffer) delete ckBuffer;
201     ckBuffer = data;
202   }    
203   inline CkArrayCheckPTMessage * getCopy()
204   {
205     if (ckBuffer == NULL) {
206       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
207       CmiAbort("Abort!");
208     }
209     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
210   }     
211   inline void updateBuddy(int b1, int b2) {
212      CmiAssert(ckBuffer);
213      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
214      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
215      CmiAssert(pNo != CkMyPe());
216   }
217   inline int getSize() { 
218      CmiAssert(ckBuffer);
219      return ckBuffer->len; 
220   }
221 };
222
223 // checkpoint holder class - for in-disk checkpointing
224 class CkDiskCheckPTInfo: public CkCheckPTInfo 
225 {
226   char *fname;
227   int bud1, bud2;
228   int len;                      // checkpoint size
229 public:
230   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
231   {
232 #if CMK_USE_MKSTEMP
233     fname = new char[64];
234     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
235     mkstemp(fname);
236 #else
237     fname=tmpnam(NULL);
238 #endif
239     bud1 = bud2 = -1;
240     len = 0;
241   }
242   ~CkDiskCheckPTInfo() 
243   {
244     remove(fname);
245   }
246   inline void updateBuffer(CkArrayCheckPTMessage *data) 
247   {
248     double t = CmiWallTimer();
249     // unpack it
250     envelope *env = UsrToEnv(data);
251     CkUnpackMessage(&env);
252     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
253     FILE *f = fopen(fname,"wb");
254     PUP::toDisk p(f);
255     CkPupMessage(p, (void **)&data);
256     // delay sync to the end because otherwise the messages are blocked
257 //    fsync(fileno(f));
258     fclose(f);
259     bud1 = data->bud1;
260     bud2 = data->bud2;
261     len = data->len;
262     delete data;
263     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
264   }
265   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
266   {
267     CkArrayCheckPTMessage *data;
268     FILE *f = fopen(fname,"rb");
269     PUP::fromDisk p(f);
270     CkPupMessage(p, (void **)&data);
271     fclose(f);
272     data->bud1 = bud1;                          // update the buddies
273     data->bud2 = bud2;
274     return data;
275   }
276   inline void updateBuddy(int b1, int b2) {
277      bud1 = b1; bud2 = b2;
278      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
279      CmiAssert(pNo != CkMyPe());
280   }
281   inline int getSize() { 
282      return len; 
283   }
284 };
285
286 CkMemCheckPT::CkMemCheckPT(int w)
287 {
288   int numnodes = 0;
289 #if NODE_CHECKPOINT
290   numnodes = CmiNumPhysicalNodes();
291 #else
292   numnodes = CkNumPes();
293 #endif
294 #if CK_NO_PROC_POOL
295   if (numnodes <= 2)
296 #else
297   if (numnodes  == 1)
298 #endif
299   {
300     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
301     _memChkptOn = 0;
302   }
303   inRestarting = 0;
304   recvCount = peCount = 0;
305   ackCount = 0;
306   expectCount = -1;
307   where = w;
308
309 #if CMK_CONVERSE_MPI
310   void pingBuddy();
311   void pingCheckHandler();
312   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
313   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
314 #endif
315 }
316
317 CkMemCheckPT::~CkMemCheckPT()
318 {
319   int len = ckTable.length();
320   for (int i=0; i<len; i++) {
321     delete ckTable[i];
322   }
323 }
324
325 void CkMemCheckPT::pup(PUP::er& p) 
326
327   CBase_CkMemCheckPT::pup(p); 
328   p|cpStarter;
329   p|thisFailedPe;
330   p|failedPes;
331   p|ckCheckPTGroupID;           // recover global variable
332   p|cpCallback;                 // store callback
333   p|where;                      // where to checkpoint
334   p|peCount;
335   if (p.isUnpacking()) {
336     recvCount = 0;
337 #if CMK_CONVERSE_MPI
338     void pingBuddy();
339     void pingCheckHandler();
340     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
341     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
342 #endif
343   }
344 }
345
346 // called by checkpoint mgr to restore an array element
347 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
348 {
349 #if CMK_MEM_CHECKPOINT
350   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
351   // m->index.print();
352   PUP::fromMem p(m->packData);
353   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
354   CmiAssert(mgr);
355 #if STREAMING_INFORMHOME
356   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
357 #else
358   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
359 #endif
360
361   // find a list of array elements bound together
362   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
363   CmiAssert(elt);
364   CkLocRec_local *rec = elt->myRec;
365   CkVec<CkMigratable *> list;
366   mgr->migratableList(rec, list);
367   CmiAssert(list.length() > 0);
368   for (int l=0; l<list.length(); l++) {
369     elt = (ArrayElement *)list[l];
370     elt->budPEs[0] = m->bud1;
371     elt->budPEs[1] = m->bud2;
372     //    reset, may not needed now
373     // for now.
374     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
375       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
376       if (c) c->redNo = 0;
377     }
378   }
379 #endif
380 }
381
382 // return 1 if pe is a crashed processor
383 int CkMemCheckPT::isFailed(int pe)
384 {
385   for (int i=0; i<failedPes.length(); i++)
386     if (failedPes[i] == pe) return 1;
387   return 0;
388 }
389
390 // add pe into history list of all failed processors
391 void CkMemCheckPT::failed(int pe)
392 {
393   if (isFailed(pe)) return;
394   failedPes.push_back(pe);
395 }
396
397 int CkMemCheckPT::totalFailed()
398 {
399   return failedPes.length();
400 }
401
402 // create an checkpoint entry for array element of aid with index.
403 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
404 {
405   // error check, no duplicate
406   int idx, len = ckTable.size();
407   for (idx=0; idx<len; idx++) {
408     CkCheckPTInfo *entry = ckTable[idx];
409     if (index == entry->index) {
410       if (loc == entry->locMgr) {
411           // bindTo array elements
412           return;
413       }
414         // for array inheritance, the following check may fail
415         // because ArrayElement constructor of all superclasses are called
416       if (aid == entry->aid) {
417         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
418         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
419       }
420     }
421   }
422   CkCheckPTInfo *newEntry;
423   if (where == CkCheckPoint_inMEM)
424     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
425   else
426     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
427   ckTable.push_back(newEntry);
428   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
429 }
430
431 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
432 {
433   int buddy = msg->bud1;
434   if (buddy == CkMyPe()) buddy = msg->bud2;
435   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
436   recvData(msg);
437     // ack
438   thisProxy[buddy].gotData();
439 }
440
441 // loop through my checkpoint table and ask checkpointed array elements
442 // to send me checkpoint data.
443 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
444 {
445   checkpointed = 1;
446   cpCallback = cb;
447   cpStarter = starter;
448   if (CkMyPe() == cpStarter) {
449     startTime = CmiWallTimer();
450     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
451   }
452
453   int len = ckTable.length();
454   for (int i=0; i<len; i++) {
455     CkCheckPTInfo *entry = ckTable[i];
456       // always let the bigger number processor send request
457     //if (CkMyPe() < entry->pNo) continue;
458       // always let the smaller number processor send request, may on same proc
459     if (!isMaster(entry->pNo)) continue;
460       // call inmem_checkpoint to the array element, ask it to send
461       // back checkpoint data via recvData().
462     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
463     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
464   }
465     // if my table is empty, then I am done
466   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
467
468   // pack and send proc level data
469   sendProcData();
470 }
471
472 // don't handle array elements
473 static inline void _handleProcData(PUP::er &p)
474 {
475     // save readonlys, and callback BTW
476     CkPupROData(p);
477
478     // save mainchares 
479     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
480         
481 #ifndef CMK_CHARE_USE_PTR
482     // save non-migratable chare
483     CkPupChareData(p);
484 #endif
485
486     // save groups into Groups.dat
487     CkPupGroupData(p);
488
489     // save nodegroups into NodeGroups.dat
490     if(CkMyRank()==0) CkPupNodeGroupData(p);
491 }
492
493 void CkMemCheckPT::sendProcData()
494 {
495   // find out size of buffer
496   int size;
497   {
498     PUP::sizer p;
499     _handleProcData(p);
500     size = p.size();
501   }
502   int packSize = size;
503   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
504   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
505   {
506     PUP::toMem p(msg->packData);
507     _handleProcData(p);
508   }
509   msg->pe = CkMyPe();
510   msg->len = size;
511   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
512   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
513 }
514
515 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
516 {
517   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
518   CpvAccess(procChkptBuf) = msg;
519   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
520   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
521 }
522
523 // ArrayElement call this function to give us the checkpointed data
524 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
525 {
526   int len = ckTable.length();
527   int idx;
528   for (idx=0; idx<len; idx++) {
529     CkCheckPTInfo *entry = ckTable[idx];
530     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
531   }
532   CkAssert(idx < len);
533   int isChkpting = msg->cp_flag;
534   ckTable[idx]->updateBuffer(msg);
535   if (isChkpting) {
536       // all my array elements have returned their inmem data
537       // inform starter processor that I am done.
538     recvCount ++;
539     if (recvCount == ckTable.length()) {
540       if (where == CkCheckPoint_inMEM) {
541         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
542       }
543       else if (where == CkCheckPoint_inDISK) {
544         // another barrier for finalize the writing using fsync
545         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
546         contribute(0,NULL,CkReduction::sum_int,localcb);
547       }
548       else
549         CmiAbort("Unknown checkpoint scheme");
550       recvCount = 0;
551     } 
552   }
553 }
554
555 // only used in disk checkpointing
556 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
557 {
558   delete m;
559 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
560   system("sync");
561 #endif
562   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
563 }
564
565 // only is called on cpStarter when checkpoint is done
566 void CkMemCheckPT::cpFinish()
567 {
568   CmiAssert(CkMyPe() == cpStarter);
569   peCount++;
570     // now that all processors have finished, activate callback
571   if (peCount == 2) {
572     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
573     cpCallback.send();
574     peCount = 0;
575     thisProxy.report();
576   }
577 }
578
579 // for debugging, report checkpoint info
580 void CkMemCheckPT::report()
581 {
582   int objsize = 0;
583   int len = ckTable.length();
584   for (int i=0; i<len; i++) {
585     CkCheckPTInfo *entry = ckTable[i];
586     CmiAssert(entry);
587     objsize += entry->getSize();
588   }
589   CmiAssert(CpvAccess(procChkptBuf));
590 //  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
591 }
592
593 /*****************************************************************************
594                         RESTART Procedure
595 *****************************************************************************/
596
597 // master processor of two buddies
598 inline int CkMemCheckPT::isMaster(int buddype)
599 {
600 #if 0
601   int mype = CkMyPe();
602 //CkPrintf("ismaster: %d %d\n", pe, mype);
603   if (CkNumPes() - totalFailed() == 2) {
604     return mype > buddype;
605   }
606   for (int i=1; i<CkNumPes(); i++) {
607     int me = (buddype+i)%CkNumPes();
608     if (isFailed(me)) continue;
609     if (me == mype) return 1;
610     else return 0;
611   }
612   return 0;
613 #else
614     // smaller one
615   int mype = CkMyPe();
616 //CkPrintf("ismaster: %d %d\n", pe, mype);
617   if (CkNumPes() - totalFailed() == 2) {
618     return mype < buddype;
619   }
620 #if NODE_CHECKPOINT
621   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
622   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
623 #else
624   for (int i=1; i<CkNumPes(); i++) {
625 #endif
626     int me = (mype+i)%CkNumPes();
627     if (isFailed(me)) continue;
628     if (me == buddype) return 1;
629     else return 0;
630   }
631   return 0;
632 #endif
633 }
634
635 #ifdef CKLOCMGR_LOOP
636 #undef CKLOCMGR_LOOP
637 #endif
638
639 // loop over all CkLocMgr and do "code"
640 #define  CKLOCMGR_LOOP(code)    {       \
641   int numGroups = CkpvAccess(_groupIDTable)->size();    \
642   for(int i=0;i<numGroups;i++) {        \
643     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
644     if(obj->isLocMgr())  {      \
645       CkLocMgr *mgr = (CkLocMgr*)obj;   \
646       code      \
647     }   \
648   }     \
649  }
650
651 #if 0
652 // helper class to pup all elements that belong to same ckLocMgr
653 class ElementDestoryer : public CkLocIterator {
654 private:
655         CkLocMgr *locMgr;
656 public:
657         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
658         void addLocation(CkLocation &loc) {
659                 CkArrayIndex idx=loc.getIndex();
660                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
661                 loc.destroy();
662         }
663 };
664 #endif
665
666 // restore the bitmap vector for LB
667 void CkMemCheckPT::resetLB(int diepe)
668 {
669 #if CMK_LBDB_ON
670   int i;
671   char *bitmap = new char[CkNumPes()];
672   // set processor available bitmap
673   get_avail_vector(bitmap);
674
675   for (i=0; i<failedPes.length(); i++)
676     bitmap[failedPes[i]] = 0; 
677   bitmap[diepe] = 0;
678
679 #if CK_NO_PROC_POOL
680   set_avail_vector(bitmap);
681 #endif
682
683   // if I am the crashed pe, rebuild my failedPEs array
684   if (CkMyNode() == diepe)
685     for (i=0; i<CkNumPes(); i++) 
686       if (bitmap[i]==0) failed(i);
687
688   delete [] bitmap;
689 #endif
690 }
691
692 // in case when failedPe dies, everybody go through its checkpoint table:
693 // destory all array elements
694 // recover lost buddies
695 // reconstruct all array elements from check point data
696 // called on all processors
697 void CkMemCheckPT::restart(int diePe)
698 {
699 #if CMK_MEM_CHECKPOINT
700   double curTime = CmiWallTimer();
701   if (CkMyPe() == diePe)
702     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
703   stage = (char*)"resetLB";
704   startTime = curTime;
705   if (CkMyPe() == diePe)
706     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
707
708 #if CK_NO_PROC_POOL
709   failed(diePe);        // add into the list of failed pes
710 #endif
711   thisFailedPe = diePe;
712
713   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
714
715   inRestarting = 1;
716                                                                                 
717   // disable load balancer's barrier
718   if (CkMyPe() != diePe) resetLB(diePe);
719
720   CKLOCMGR_LOOP(mgr->startInserting(););
721
722   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
723   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
724 /*
725   if (CkMyPe() == 0)
726     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
727 */
728 #endif
729 }
730
731 // loally remove all array elements
732 void CkMemCheckPT::removeArrayElements()
733 {
734 #if CMK_MEM_CHECKPOINT
735   int len = ckTable.length();
736   double curTime = CmiWallTimer();
737   if (CkMyPe() == thisFailedPe) 
738     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
739   stage = (char*)"removeArrayElements";
740   startTime = curTime;
741
742   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
743   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
744
745   // get rid of all buffering and remote recs
746   // including destorying all array elements
747   CKLOCMGR_LOOP(mgr->flushAllRecs(););
748
749 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
750
751   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
752   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
753 #endif
754 }
755
756 // flush state in reduction manager
757 void CkMemCheckPT::resetReductionMgr()
758 {
759   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
760   int numGroups = CkpvAccess(_groupIDTable)->size();
761   for(int i=0;i<numGroups;i++) {
762     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
763     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
764     obj->flushStates();
765     obj->ckJustMigrated();
766   }
767   // reset again
768   //CpvAccess(_qd)->flushStates();
769
770 #if 1
771   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
772   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
773 #else
774   if (CkMyPe() == 0)
775     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
776 #endif
777 }
778
779 // recover the lost buddies
780 void CkMemCheckPT::recoverBuddies()
781 {
782   int idx;
783   int len = ckTable.length();
784   // ready to flush reduction manager
785   // cannot be CkMemCheckPT::restart because destory will modify states
786   double curTime = CmiWallTimer();
787   if (CkMyPe() == thisFailedPe)
788   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
789   stage = (char *)"recoverBuddies";
790   if (CkMyPe() == thisFailedPe)
791   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
792   startTime = curTime;
793
794   // recover buddies
795   expectCount = 0;
796   for (idx=0; idx<len; idx++) {
797     CkCheckPTInfo *entry = ckTable[idx];
798     if (entry->pNo == thisFailedPe) {
799 #if CK_NO_PROC_POOL
800       // find a new buddy
801 /*
802       int budPe = CkMyPe();
803 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
804       while (budPe == CkMyPe() || isFailed(budPe)) 
805           budPe = (budPe+1)%CkNumPes();
806 */
807       int budPe = BuddyPE(CkMyPe());
808       //entry->pNo = budPe;
809 #else
810       int budPe = thisFailedPe;
811 #endif
812       entry->updateBuddy(CkMyPe(), budPe);
813 #if 0
814       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
815       CkArrayCheckPTMessage *msg = entry->getCopy();
816       msg->cp_flag = 0;            // not checkpointing
817       thisProxy[budPe].recvData(msg);
818 #else
819       CkArrayCheckPTMessage *msg = entry->getCopy();
820       msg->bud1 = budPe;
821       msg->bud2 = CkMyPe();
822       msg->cp_flag = 0;            // not checkpointing
823       thisProxy[budPe].recoverEntry(msg);
824 #endif
825       expectCount ++;
826     }
827   }
828
829 #if 1
830   if (expectCount == 0) {
831     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
832     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
833   }
834 #else
835   if (CkMyPe() == 0) {
836     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
837   }
838 #endif
839
840   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
841 }
842
843 void CkMemCheckPT::gotData()
844 {
845   ackCount ++;
846   if (ackCount == expectCount) {
847     ackCount = 0;
848     expectCount = -1;
849     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
850     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
851   }
852 }
853
854 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
855 {
856   for (int i=0; i<n; i++) {
857     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
858     mgr->updateLocation(idx[i], nowOnPe);
859   }
860 }
861
862 // restore array elements
863 void CkMemCheckPT::recoverArrayElements()
864 {
865   double curTime = CmiWallTimer();
866   int len = ckTable.length();
867   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
868   stage = (char *)"recoverArrayElements";
869   if (CkMyPe() == thisFailedPe)
870   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
871   startTime = curTime;
872
873   // recover all array elements
874   int count = 0;
875 #if STREAMING_INFORMHOME
876   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
877   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
878 #endif
879   for (int idx=0; idx<len; idx++)
880   {
881     CkCheckPTInfo *entry = ckTable[idx];
882 #if CK_NO_PROC_POOL
883     // the bigger one will do 
884 //    if (CkMyPe() < entry->pNo) continue;
885     if (!isMaster(entry->pNo)) continue;
886 #else
887     // smaller one do it, which has the original object
888     if (CkMyPe() == entry->pNo+1 || 
889         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
890 #endif
891 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
892
893     entry->updateBuddy(CkMyPe(), entry->pNo);
894     CkArrayCheckPTMessage *msg = entry->getCopy();
895     // gzheng
896     //thisProxy[CkMyPe()].inmem_restore(msg);
897     inmem_restore(msg);
898 #if STREAMING_INFORMHOME
899     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
900     int homePe = mgr->homePe(msg->index);
901     if (homePe != CkMyPe()) {
902       gmap[homePe].push_back(msg->locMgr);
903       imap[homePe].push_back(msg->index);
904     }
905 #endif
906     CkFreeMsg(msg);
907     count ++;
908   }
909 #if STREAMING_INFORMHOME
910   for (int i=0; i<CkNumPes(); i++) {
911     if (gmap[i].size() && i!=CkMyPe()) {
912       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
913     }
914   }
915   delete [] imap;
916   delete [] gmap;
917 #endif
918   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
919
920   CKLOCMGR_LOOP(mgr->doneInserting(););
921
922   inRestarting = 0;
923   // _crashedNode = -1;
924   CpvAccess(_crashedNode) = -1;
925
926   if (CkMyPe() == 0)
927     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
928 }
929
930 static double restartT;
931
932 // on every processor
933 // turn load balancer back on
934 void CkMemCheckPT::finishUp()
935 {
936   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
937   //CKLOCMGR_LOOP(mgr->doneInserting(););
938   
939   if (CkMyPe() == thisFailedPe)
940   {
941        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
942        //CkStartQD(cpCallback);
943        cpCallback.send();
944        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
945   }
946
947 #if CK_NO_PROC_POOL
948 #if NODE_CHECKPOINT
949   int numnodes = CmiNumPhysicalNodes();
950 #else
951   int numnodes = CkNumPes();
952 #endif
953   if (numnodes-totalFailed() <=2) {
954     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
955     _memChkptOn = 0;
956   }
957 #endif
958 }
959
960 // called only on 0
961 void CkMemCheckPT::quiescence(CkCallback &cb)
962 {
963   static int pe_count = 0;
964   pe_count ++;
965   CmiAssert(CkMyPe() == 0);
966   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
967   if (pe_count == CkNumPes()) {
968     pe_count = 0;
969     cb.send();
970   }
971 }
972
973 // User callable function - to start a checkpoint
974 // callback cb is used to pass control back
975 void CkStartMemCheckpoint(CkCallback &cb)
976 {
977 #if CMK_MEM_CHECKPOINT
978   if (_memChkptOn == 0) {
979     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
980     cb.send();
981     return;
982   }
983   if (CkInRestarting()) {
984       // trying to checkpointing during restart
985     cb.send();
986     return;
987   }
988     // store user callback and user data
989   CkMemCheckPT::cpCallback = cb;
990
991     // broadcast to start check pointing
992   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
993   checkptMgr.doItNow(CkMyPe(), cb);
994 #else
995   // when mem checkpoint is disabled, invike cb immediately
996   CkPrintf("Warning: In-Memory checkpoint has been disabled! Please use -syncft when build Charm++\n");
997   cb.send();
998 #endif
999 }
1000
1001 void CkRestartCheckPoint(int diePe)
1002 {
1003 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1004   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1005   // broadcast
1006   checkptMgr.restart(diePe);
1007 }
1008
1009 static int _diePE = -1;
1010
1011 // callback function used locally by ccs handler
1012 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1013 {
1014 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1015   CkRestartCheckPoint(_diePE);
1016 }
1017
1018 // Converse function handles
1019 static int askPhaseHandlerIdx;
1020 static int recvPhaseHandlerIdx;
1021 static int askProcDataHandlerIdx;
1022 static int restartBcastHandlerIdx;
1023 static int recoverProcDataHandlerIdx;
1024 static int restartBeginHandlerIdx;
1025 static int notifyHandlerIdx;
1026
1027 // called on crashed PE
1028 static void restartBeginHandler(char *msg)
1029 {
1030 #if CMK_MEM_CHECKPOINT
1031   CmiFree(msg);
1032   static int count = 0;
1033   CmiAssert(CkMyPe() == _diePE);
1034   count ++;
1035   if (count == CkNumPes()) {
1036     CkRestartCheckPointCallback(NULL, NULL);
1037     count = 0;
1038   }
1039 #endif
1040 }
1041
1042 extern void _discard_charm_message();
1043 extern void _resume_charm_message();
1044
1045 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1046         return data;
1047 }
1048
1049 static void restartBcastHandler(char *msg)
1050 {
1051 #if CMK_MEM_CHECKPOINT
1052   // advance phase counter
1053   CkMemCheckPT::inRestarting = 1;
1054   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1055   // gzheng
1056   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1057
1058   if (CkMyPe()==_diePE)
1059     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1060
1061   // reset QD counters
1062 /*  gzheng
1063   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1064 */
1065
1066 /*  gzheng
1067   if (CkMyPe()==_diePE)
1068       CkRestartCheckPointCallback(NULL, NULL);
1069 */
1070   CmiFree(msg);
1071
1072   _resume_charm_message();
1073
1074     // reduction
1075   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1076   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1077   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1078   checkpointed = 0;
1079 #endif
1080 }
1081
1082 extern void _initDone();
1083
1084 // called on crashed processor
1085 static void recoverProcDataHandler(char *msg)
1086 {
1087 #if CMK_MEM_CHECKPOINT
1088    int i;
1089    envelope *env = (envelope *)msg;
1090    CkUnpackMessage(&env);
1091    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1092    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1093    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1094    //cur_restart_phase ++;
1095      // gzheng ?
1096    //CpvAccess(_qd)->flushStates();
1097
1098    // restore readonly, mainchare, group, nodegroup
1099 //   int temp = cur_restart_phase;
1100 //   cur_restart_phase = -1;
1101    PUP::fromMem p(procMsg->packData);
1102    _handleProcData(p);
1103
1104    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1105    // gzheng
1106    CKLOCMGR_LOOP(mgr->startInserting(););
1107
1108    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1109    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1110    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1111    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1112
1113    _initDone();
1114 //   CpvAccess(_qd)->flushStates();
1115    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1116 #endif
1117 }
1118
1119 // called on its backup processor
1120 // get backup message buffer and sent to crashed processor
1121 static void askProcDataHandler(char *msg)
1122 {
1123 #if CMK_MEM_CHECKPOINT
1124     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1125     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1126     if (CpvAccess(procChkptBuf) == NULL)  {
1127       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1128       CkAbort("no checkpoint found");
1129     }
1130     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1131     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1132
1133     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1134
1135     CkPackMessage(&env);
1136     CmiSetHandler(env, recoverProcDataHandlerIdx);
1137     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1138     CpvAccess(procChkptBuf) = NULL;
1139 #endif
1140 }
1141
1142 // called on PE 0
1143 void qd_callback(void *m)
1144 {
1145    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1146    CkFreeMsg(m);
1147 #ifdef CMK_SMP
1148    for(int i=0;i<CmiMyNodeSize();i++){
1149    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1150    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1151         CmiSetHandler(msg, askProcDataHandlerIdx);
1152         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1153         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1154    }
1155    return;
1156 #endif
1157    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1158    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1159    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1160    CmiSetHandler(msg, askProcDataHandlerIdx);
1161    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1162    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1163
1164 }
1165
1166 // on crashed node
1167 void CkMemRestart(const char *dummy, CkArgMsg *args)
1168 {
1169 #if CMK_MEM_CHECKPOINT
1170    _diePE = CmiMyNode();
1171    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1172    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1173    CkMemCheckPT::inRestarting = 1;
1174
1175   CpvAccess( _crashedNode )= CmiMyNode();
1176         
1177   _discard_charm_message();
1178   
1179   if(CmiMyRank()==0){
1180     CkCallback cb(qd_callback);
1181     CkStartQD(cb);
1182     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1183   }
1184 #else
1185    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1186 #endif
1187 }
1188
1189 // can be called in other files
1190 // return true if it is in restarting
1191 extern "C"
1192 int CkInRestarting()
1193 {
1194 #if CMK_MEM_CHECKPOINT
1195   if (CpvAccess( _crashedNode)!=-1) return 1;
1196   // gzheng
1197   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1198   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1199   return CkMemCheckPT::inRestarting;
1200 #else
1201   return 0;
1202 #endif
1203 }
1204
1205 /*****************************************************************************
1206                 module initialization
1207 *****************************************************************************/
1208
1209 static int arg_where = CkCheckPoint_inMEM;
1210
1211 #if CMK_MEM_CHECKPOINT
1212 void init_memcheckpt(char **argv)
1213 {
1214     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1215       arg_where = CkCheckPoint_inDISK;
1216     }
1217
1218         // initiliazing _crashedNode variable
1219         CpvInitialize(int, _crashedNode);
1220         CpvAccess(_crashedNode) = -1;
1221
1222 }
1223 #endif
1224
1225 class CkMemCheckPTInit: public Chare {
1226 public:
1227   CkMemCheckPTInit(CkArgMsg *m) {
1228 #if CMK_MEM_CHECKPOINT
1229     if (arg_where == CkCheckPoint_inDISK) {
1230       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1231     }
1232     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1233     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1234 #endif
1235   }
1236 };
1237
1238 static void notifyHandler(char *msg)
1239 {
1240 #if CMK_MEM_CHECKPOINT
1241   CmiFree(msg);
1242       /* immediately increase restart phase to filter old messages */
1243   CpvAccess(_curRestartPhase) ++;
1244   CpvAccess(_qd)->flushStates();
1245   _discard_charm_message();
1246 #endif
1247 }
1248
1249 extern "C"
1250 void notify_crash(int node)
1251 {
1252 #ifdef CMK_MEM_CHECKPOINT
1253   CpvAccess( _crashedNode) = node;
1254 #ifdef CMK_SMP
1255   for(int i=0;i<CkMyNodeSize();i++){
1256         CpvAccessOther(_crashedNode,i)=node;
1257   }
1258 #endif
1259   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1260   CkMemCheckPT::inRestarting = 1;
1261
1262     // this may be in interrupt handler, send a message to reset QD
1263   int pe = CmiNodeFirst(CkMyNode());
1264   for(int i=0;i<CkMyNodeSize();i++){
1265         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1266         CmiSetHandler(msg, notifyHandlerIdx);
1267         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1268   }
1269 #endif
1270 }
1271
1272 extern "C" void (*notify_crash_fn)(int node);
1273
1274 #if CMK_CONVERSE_MPI
1275 static int pingHandlerIdx;
1276 static int pingCheckHandlerIdx;
1277 static int buddyDieHandlerIdx;
1278 static double lastPingTime = -1;
1279
1280 extern "C" void mpi_restart_crashed(int pe, int rank);
1281 extern "C" int  find_spare_mpirank(int pe);
1282
1283 void pingBuddy();
1284 void pingCheckHandler();
1285
1286 void buddyDieHandler(char *msg)
1287 {
1288 #if CMK_MEM_CHECKPOINT
1289    // notify
1290    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1291    notify_crash(diepe);
1292    // send message to crash pe to let it restart
1293    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1294    int newrank = find_spare_mpirank(diepe);
1295    int buddy = obj->BuddyPE(CmiMyPe());
1296    if (buddy == diepe)  {
1297      mpi_restart_crashed(diepe, newrank);
1298      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1299    }
1300 #endif
1301 }
1302
1303 void pingHandler(void *msg)
1304 {
1305   lastPingTime = CmiWallTimer();
1306   CmiFree(msg);
1307 }
1308
1309 void pingCheckHandler()
1310 {
1311 #if CMK_MEM_CHECKPOINT
1312   double now = CmiWallTimer();
1313   if (lastPingTime > 0 && now - lastPingTime > 4) {
1314     int i, pe, buddy;
1315     // tell everyone the buddy dies
1316     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1317     for (i = 1; i < CmiNumPes(); i++) {
1318        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1319        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1320     }
1321     buddy = pe;
1322     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1323     for (int pe = 0; pe < CmiNumPes(); pe++) {
1324       if (obj->isFailed(pe) || pe == buddy) continue;
1325       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1326       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1327       CmiSetHandler(msg, buddyDieHandlerIdx);
1328       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1329     }
1330   }
1331   else 
1332     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1333 #endif
1334 }
1335
1336 void pingBuddy()
1337 {
1338 #if CMK_MEM_CHECKPOINT
1339   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1340   if (obj) {
1341     int buddy = obj->BuddyPE(CkMyPe());
1342 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1343     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1344     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1345     CmiSetHandler(msg, pingHandlerIdx);
1346     CmiGetRestartPhase(msg) = 9999;
1347     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1348   }
1349   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1350 #endif
1351 }
1352 #endif
1353
1354 // initproc
1355 void CkRegisterRestartHandler( )
1356 {
1357 #if CMK_MEM_CHECKPOINT
1358   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1359   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1360   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1361   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1362   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1363
1364 #if CMK_CONVERSE_MPI
1365   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1366   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1367   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1368 #endif
1369
1370   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1371   CpvAccess(procChkptBuf) = NULL;
1372
1373   notify_crash_fn = notify_crash;
1374
1375 #if ! CMK_CONVERSE_MPI
1376   // print pid to kill
1377 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1378 //  sleep(4);
1379 #endif
1380 #endif
1381 }
1382
1383
1384 extern "C"
1385 int CkHasCheckpoints()
1386 {
1387   return checkpointed;
1388 }
1389
1390 /// @todo: the following definitions should be moved to a separate file containing
1391 // structures and functions about fault tolerance strategies
1392
1393 /**
1394  *  * @brief: function for killing a process                                             
1395  *   */
1396 #ifdef CMK_MEM_CHECKPOINT
1397 #if CMK_HAS_GETPID
1398 void killLocal(void *_dummy,double curWallTime){
1399         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1400         if(CmiWallTimer()<killTime-1){
1401                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1402         }else{ 
1403 #if CMK_CONVERSE_MPI
1404                                 CkDieNow();
1405 #else 
1406                 kill(getpid(),SIGKILL);                                               
1407 #endif
1408         }              
1409
1410 #else
1411 void killLocal(void *_dummy,double curWallTime){
1412   CmiAbort("kill() not supported!");
1413 }
1414 #endif
1415 #endif
1416
1417 #ifdef CMK_MEM_CHECKPOINT
1418 /**
1419  * @brief: reads the file with the kill information
1420  */
1421 void readKillFile(){
1422         FILE *fp=fopen(killFile,"r");
1423         if(!fp){
1424                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1425                 return;
1426         }
1427         int proc;
1428         double sec;
1429         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1430                 if(proc == CkMyNode() && CkMyRank() == 0){
1431                         killTime = CmiWallTimer()+sec;
1432                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1433                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1434                 }
1435         }
1436         fclose(fp);
1437 }
1438
1439 #if ! CMK_CONVERSE_MPI
1440 void CkDieNow()
1441 {
1442 #ifdef CMK_MEM_CHECKPOINT || (defined(_FAULT_MLOG_) || defined(_FAULT_CAUSAL_))
1443          // ignored for non-mpi version
1444         CmiPrintf("[%d] die now.\n", CmiMyPe());
1445         killTime = CmiWallTimer()+0.001;
1446         CcdCallFnAfter(killLocal,NULL,1);
1447 #endif
1448 }
1449 #endif
1450
1451 #endif
1452
1453 #include "CkMemCheckpoint.def.h"
1454
1455