Merge branch 'charm' of charmgit:charm into isomalloc
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF      // CkPrintf
55 #define DEBUGF noopck
56
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         1
67 #endif
68
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 double CkMemCheckPT::startTime;
75 char *CkMemCheckPT::stage;
76 CkCallback CkMemCheckPT::cpCallback;
77
78 int _memChkptOn = 1;                    // checkpoint is on or off
79
80 CkGroupID ckCheckPTGroupID;             // readonly
81
82 static int checkpointed = 0;
83
84 /// @todo the following declarations should be moved into a separate file for all 
85 // fault tolerant strategies
86
87 #ifdef CMK_MEM_CHECKPOINT
88 // name of the kill file that contains processes to be killed 
89 char *killFile;                                               
90 // flag for the kill file         
91 int killFlag=0;
92 // variable for storing the killing time
93 double killTime=0.0;
94 #endif
95
96 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
97 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
98
99 // compute the backup processor
100 // FIXME: avoid crashed processors
101 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
102
103 inline int CkMemCheckPT::BuddyPE(int pe)
104 {
105   int budpe;
106 #if NODE_CHECKPOINT
107     // buddy is the processor with same rank on the next physical node
108   int r1 = CmiPhysicalRank(pe);
109   int budnode = CmiPhysicalNodeID(pe);
110   do {
111     budnode = (budnode+1)%CmiNumPhysicalNodes();
112     int *pelist;
113     int num;
114     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
115     budpe = pelist[r1 % num];
116   } while (isFailed(budpe));
117   if (budpe == pe) {
118     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
119     CmiAbort("Failed to find a buddy processor");
120   }
121 #else
122   budpe = pe;
123   while (budpe == pe || isFailed(budpe)) 
124           budpe = (budpe+1)%CkNumPes();
125 #endif
126   return budpe;
127 }
128
129 // called in array element constructor
130 // choose and register with 2 buddies for checkpoiting 
131 #if CMK_MEM_CHECKPOINT
132 void ArrayElement::init_checkpt() {
133         if (_memChkptOn == 0) return;
134         if (CkInRestarting()) {
135           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
136         }
137         // only master init checkpoint
138         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
139
140         budPEs[0] = CkMyPe();
141         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
142         CmiAssert(budPEs[0] != budPEs[1]);
143         // inform checkPTMgr
144         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
145         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
146         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
147         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
148 }
149 #endif
150
151 // entry function invoked by checkpoint mgr asking for checkpoint data
152 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
153 #if CMK_MEM_CHECKPOINT
154   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
155 //char index[128];   thisIndexMax.sprint(index);
156 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
157   CkLocMgr *locMgr = thisArray->getLocMgr();
158   CmiAssert(myRec!=NULL);
159   int size;
160   {
161         PUP::sizer p;
162         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
163         size = p.size();
164   }
165   int packSize = size/sizeof(double) +1;
166   CkArrayCheckPTMessage *msg =
167                  new (packSize, 0) CkArrayCheckPTMessage;
168   msg->len = size;
169   msg->index =thisIndexMax;
170   msg->aid = thisArrayID;
171   msg->locMgr = locMgr->getGroupID();
172   msg->cp_flag = 1;
173   {
174         PUP::toMem p(msg->packData);
175         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
176   }
177
178   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
179   checkptMgr.recvData(msg, 2, budPEs);
180   delete m;
181 #endif
182 }
183
184 // checkpoint holder class - for memory checkpointing
185 class CkMemCheckPTInfo: public CkCheckPTInfo
186 {
187   CkArrayCheckPTMessage *ckBuffer;
188 public:
189   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
190                     CkCheckPTInfo(a, loc, idx, pno)
191   {
192     ckBuffer = NULL;
193   }
194   ~CkMemCheckPTInfo() 
195   {
196     if (ckBuffer) delete ckBuffer; 
197   }
198   inline void updateBuffer(CkArrayCheckPTMessage *data) 
199   {
200     CmiAssert(data!=NULL);
201     if (ckBuffer) delete ckBuffer;
202     ckBuffer = data;
203   }    
204   inline CkArrayCheckPTMessage * getCopy()
205   {
206     if (ckBuffer == NULL) {
207       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
208       CmiAbort("Abort!");
209     }
210     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
211   }     
212   inline void updateBuddy(int b1, int b2) {
213      CmiAssert(ckBuffer);
214      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
215      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
216      CmiAssert(pNo != CkMyPe());
217   }
218   inline int getSize() { 
219      CmiAssert(ckBuffer);
220      return ckBuffer->len; 
221   }
222 };
223
224 // checkpoint holder class - for in-disk checkpointing
225 class CkDiskCheckPTInfo: public CkCheckPTInfo 
226 {
227   char *fname;
228   int bud1, bud2;
229   int len;                      // checkpoint size
230 public:
231   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
232   {
233 #if CMK_USE_MKSTEMP
234     fname = new char[64];
235     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
236     mkstemp(fname);
237 #else
238     fname=tmpnam(NULL);
239 #endif
240     bud1 = bud2 = -1;
241     len = 0;
242   }
243   ~CkDiskCheckPTInfo() 
244   {
245     remove(fname);
246   }
247   inline void updateBuffer(CkArrayCheckPTMessage *data) 
248   {
249     double t = CmiWallTimer();
250     // unpack it
251     envelope *env = UsrToEnv(data);
252     CkUnpackMessage(&env);
253     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
254     FILE *f = fopen(fname,"wb");
255     PUP::toDisk p(f);
256     CkPupMessage(p, (void **)&data);
257     // delay sync to the end because otherwise the messages are blocked
258 //    fsync(fileno(f));
259     fclose(f);
260     bud1 = data->bud1;
261     bud2 = data->bud2;
262     len = data->len;
263     delete data;
264     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
265   }
266   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
267   {
268     CkArrayCheckPTMessage *data;
269     FILE *f = fopen(fname,"rb");
270     PUP::fromDisk p(f);
271     CkPupMessage(p, (void **)&data);
272     fclose(f);
273     data->bud1 = bud1;                          // update the buddies
274     data->bud2 = bud2;
275     return data;
276   }
277   inline void updateBuddy(int b1, int b2) {
278      bud1 = b1; bud2 = b2;
279      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
280      CmiAssert(pNo != CkMyPe());
281   }
282   inline int getSize() { 
283      return len; 
284   }
285 };
286
287 CkMemCheckPT::CkMemCheckPT(int w)
288 {
289   int numnodes = 0;
290 #if NODE_CHECKPOINT
291   numnodes = CmiNumPhysicalNodes();
292 #else
293   numnodes = CkNumPes();
294 #endif
295 #if CK_NO_PROC_POOL
296   if (numnodes <= 2)
297 #else
298   if (numnodes  == 1)
299 #endif
300   {
301     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
302     _memChkptOn = 0;
303   }
304   inRestarting = 0;
305   recvCount = peCount = 0;
306   ackCount = 0;
307   expectCount = -1;
308   where = w;
309
310 #if CMK_CONVERSE_MPI
311   void pingBuddy();
312   void pingCheckHandler();
313   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
314   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
315 #endif
316 }
317
318 CkMemCheckPT::~CkMemCheckPT()
319 {
320   int len = ckTable.length();
321   for (int i=0; i<len; i++) {
322     delete ckTable[i];
323   }
324 }
325
326 void CkMemCheckPT::pup(PUP::er& p) 
327
328   CBase_CkMemCheckPT::pup(p); 
329   p|cpStarter;
330   p|thisFailedPe;
331   p|failedPes;
332   p|ckCheckPTGroupID;           // recover global variable
333   p|cpCallback;                 // store callback
334   p|where;                      // where to checkpoint
335   if (p.isUnpacking()) {
336     recvCount = peCount = 0;
337 #if CMK_CONVERSE_MPI
338     void pingBuddy();
339     void pingCheckHandler();
340     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
341     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
342 #endif
343   }
344 }
345
346 // called by checkpoint mgr to restore an array element
347 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
348 {
349 #if CMK_MEM_CHECKPOINT
350   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
351   // m->index.print();
352   PUP::fromMem p(m->packData);
353   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
354   CmiAssert(mgr);
355 #if STREAMING_INFORMHOME
356   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
357 #else
358   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
359 #endif
360
361   // find a list of array elements bound together
362   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
363   CmiAssert(elt);
364   CkLocRec_local *rec = elt->myRec;
365   CkVec<CkMigratable *> list;
366   mgr->migratableList(rec, list);
367   CmiAssert(list.length() > 0);
368   for (int l=0; l<list.length(); l++) {
369     elt = (ArrayElement *)list[l];
370     elt->budPEs[0] = m->bud1;
371     elt->budPEs[1] = m->bud2;
372     //    reset, may not needed now
373     // for now.
374     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
375       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
376       if (c) c->redNo = 0;
377     }
378   }
379 #endif
380 }
381
382 // return 1 if pe is a crashed processor
383 int CkMemCheckPT::isFailed(int pe)
384 {
385   for (int i=0; i<failedPes.length(); i++)
386     if (failedPes[i] == pe) return 1;
387   return 0;
388 }
389
390 // add pe into history list of all failed processors
391 void CkMemCheckPT::failed(int pe)
392 {
393   if (isFailed(pe)) return;
394   failedPes.push_back(pe);
395 }
396
397 int CkMemCheckPT::totalFailed()
398 {
399   return failedPes.length();
400 }
401
402 // create an checkpoint entry for array element of aid with index.
403 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
404 {
405   // error check, no duplicate
406   int idx, len = ckTable.size();
407   for (idx=0; idx<len; idx++) {
408     CkCheckPTInfo *entry = ckTable[idx];
409     if (index == entry->index) {
410       if (loc == entry->locMgr) {
411           // bindTo array elements
412           return;
413       }
414         // for array inheritance, the following check may fail
415         // because ArrayElement constructor of all superclasses are called
416       if (aid == entry->aid) {
417         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
418         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
419       }
420     }
421   }
422   CkCheckPTInfo *newEntry;
423   if (where == CkCheckPoint_inMEM)
424     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
425   else
426     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
427   ckTable.push_back(newEntry);
428   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
429 }
430
431 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
432 {
433   int buddy = msg->bud1;
434   if (buddy == CkMyPe()) buddy = msg->bud2;
435   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
436   recvData(msg);
437     // ack
438   thisProxy[buddy].gotData();
439 }
440
441 // loop through my checkpoint table and ask checkpointed array elements
442 // to send me checkpoint data.
443 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
444 {
445   checkpointed = 1;
446   cpCallback = cb;
447   cpStarter = starter;
448   if (CkMyPe() == cpStarter) {
449     startTime = CmiWallTimer();
450     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
451   }
452
453   int len = ckTable.length();
454   for (int i=0; i<len; i++) {
455     CkCheckPTInfo *entry = ckTable[i];
456       // always let the bigger number processor send request
457     //if (CkMyPe() < entry->pNo) continue;
458       // always let the smaller number processor send request, may on same proc
459     if (!isMaster(entry->pNo)) continue;
460       // call inmem_checkpoint to the array element, ask it to send
461       // back checkpoint data via recvData().
462     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
463     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
464   }
465     // if my table is empty, then I am done
466   if (len == 0) thisProxy[cpStarter].cpFinish();
467
468   // pack and send proc level data
469   sendProcData();
470 }
471
472 // don't handle array elements
473 static inline void _handleProcData(PUP::er &p)
474 {
475     // save readonlys, and callback BTW
476     CkPupROData(p);
477
478     // save mainchares 
479     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
480         
481 #ifndef CMK_CHARE_USE_PTR
482     // save non-migratable chare
483     CkPupChareData(p);
484 #endif
485
486     // save groups into Groups.dat
487     CkPupGroupData(p);
488
489     // save nodegroups into NodeGroups.dat
490     if(CkMyRank()==0) CkPupNodeGroupData(p);
491 }
492
493 void CkMemCheckPT::sendProcData()
494 {
495   // find out size of buffer
496   int size;
497   {
498     PUP::sizer p;
499     _handleProcData(p);
500     size = p.size();
501   }
502   int packSize = size;
503   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
504   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
505   {
506     PUP::toMem p(msg->packData);
507     _handleProcData(p);
508   }
509   msg->pe = CkMyPe();
510   msg->len = size;
511   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
512   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
513 }
514
515 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
516 {
517   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
518   CpvAccess(procChkptBuf) = msg;
519   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
520   thisProxy[msg->reportPe].cpFinish();
521 }
522
523 // ArrayElement call this function to give us the checkpointed data
524 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
525 {
526   int len = ckTable.length();
527   int idx;
528   for (idx=0; idx<len; idx++) {
529     CkCheckPTInfo *entry = ckTable[idx];
530     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
531   }
532   CkAssert(idx < len);
533   int isChkpting = msg->cp_flag;
534   ckTable[idx]->updateBuffer(msg);
535   if (isChkpting) {
536       // all my array elements have returned their inmem data
537       // inform starter processor that I am done.
538     recvCount ++;
539     if (recvCount == ckTable.length()) {
540       if (where == CkCheckPoint_inMEM) {
541         thisProxy[cpStarter].cpFinish();
542       }
543       else if (where == CkCheckPoint_inDISK) {
544         // another barrier for finalize the writing using fsync
545         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
546         contribute(0,NULL,CkReduction::sum_int,localcb);
547       }
548       else
549         CmiAbort("Unknown checkpoint scheme");
550       recvCount = 0;
551     } 
552   }
553 }
554
555 // only used in disk checkpointing
556 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
557 {
558   delete m;
559 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
560   system("sync");
561 #endif
562   thisProxy[cpStarter].cpFinish();
563 }
564
565 // only is called on cpStarter when checkpoint is done
566 void CkMemCheckPT::cpFinish()
567 {
568   CmiAssert(CkMyPe() == cpStarter);
569   peCount++;
570     // now that all processors have finished, activate callback
571   if (peCount == 2*(CkNumPes())) {
572     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
573     cpCallback.send();
574     peCount = 0;
575     thisProxy.report();
576   }
577 }
578
579 // for debugging, report checkpoint info
580 void CkMemCheckPT::report()
581 {
582   int objsize = 0;
583   int len = ckTable.length();
584   for (int i=0; i<len; i++) {
585     CkCheckPTInfo *entry = ckTable[i];
586     CmiAssert(entry);
587     objsize += entry->getSize();
588   }
589   CmiAssert(CpvAccess(procChkptBuf));
590   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
591 }
592
593 /*****************************************************************************
594                         RESTART Procedure
595 *****************************************************************************/
596
597 // master processor of two buddies
598 inline int CkMemCheckPT::isMaster(int buddype)
599 {
600 #if 0
601   int mype = CkMyPe();
602 //CkPrintf("ismaster: %d %d\n", pe, mype);
603   if (CkNumPes() - totalFailed() == 2) {
604     return mype > buddype;
605   }
606   for (int i=1; i<CkNumPes(); i++) {
607     int me = (buddype+i)%CkNumPes();
608     if (isFailed(me)) continue;
609     if (me == mype) return 1;
610     else return 0;
611   }
612   return 0;
613 #else
614     // smaller one
615   int mype = CkMyPe();
616 //CkPrintf("ismaster: %d %d\n", pe, mype);
617   if (CkNumPes() - totalFailed() == 2) {
618     return mype < buddype;
619   }
620   for (int i=1; i<CkNumPes(); i++) {
621     int me = (mype+i)%CkNumPes();
622     if (isFailed(me)) continue;
623     if (me == buddype) return 1;
624     else return 0;
625   }
626   return 0;
627 #endif
628 }
629
630 #ifdef CKLOCMGR_LOOP
631 #undef CKLOCMGR_LOOP
632 #endif
633
634 // loop over all CkLocMgr and do "code"
635 #define  CKLOCMGR_LOOP(code)    {       \
636   int numGroups = CkpvAccess(_groupIDTable)->size();    \
637   for(int i=0;i<numGroups;i++) {        \
638     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
639     if(obj->isLocMgr())  {      \
640       CkLocMgr *mgr = (CkLocMgr*)obj;   \
641       code      \
642     }   \
643   }     \
644  }
645
646 #if 0
647 // helper class to pup all elements that belong to same ckLocMgr
648 class ElementDestoryer : public CkLocIterator {
649 private:
650         CkLocMgr *locMgr;
651 public:
652         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
653         void addLocation(CkLocation &loc) {
654                 CkArrayIndex idx=loc.getIndex();
655                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
656                 loc.destroy();
657         }
658 };
659 #endif
660
661 // restore the bitmap vector for LB
662 void CkMemCheckPT::resetLB(int diepe)
663 {
664 #if CMK_LBDB_ON
665   int i;
666   char *bitmap = new char[CkNumPes()];
667   // set processor available bitmap
668   get_avail_vector(bitmap);
669
670   for (i=0; i<failedPes.length(); i++)
671     bitmap[failedPes[i]] = 0; 
672   bitmap[diepe] = 0;
673
674 #if CK_NO_PROC_POOL
675   set_avail_vector(bitmap);
676 #endif
677
678   // if I am the crashed pe, rebuild my failedPEs array
679   if (CkMyPe() == diepe)
680     for (i=0; i<CkNumPes(); i++) 
681       if (bitmap[i]==0) failed(i);
682
683   delete [] bitmap;
684 #endif
685 }
686
687 // in case when failedPe dies, everybody go through its checkpoint table:
688 // destory all array elements
689 // recover lost buddies
690 // reconstruct all array elements from check point data
691 // called on all processors
692 void CkMemCheckPT::restart(int diePe)
693 {
694 #if CMK_MEM_CHECKPOINT
695   double curTime = CmiWallTimer();
696   if (CkMyPe() == diePe)
697     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
698   stage = (char*)"resetLB";
699   startTime = curTime;
700   if (CkMyPe() == diePe)
701     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
702
703 #if CK_NO_PROC_POOL
704   failed(diePe);        // add into the list of failed pes
705 #endif
706   thisFailedPe = diePe;
707
708   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
709
710   inRestarting = 1;
711                                                                                 
712   // disable load balancer's barrier
713   if (CkMyPe() != diePe) resetLB(diePe);
714
715   CKLOCMGR_LOOP(mgr->startInserting(););
716
717   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
718 /*
719   if (CkMyPe() == 0)
720     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
721 */
722 #endif
723 }
724
725 // loally remove all array elements
726 void CkMemCheckPT::removeArrayElements()
727 {
728 #if CMK_MEM_CHECKPOINT
729   int len = ckTable.length();
730   double curTime = CmiWallTimer();
731   if (CkMyPe() == thisFailedPe) 
732     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
733   stage = (char*)"removeArrayElements";
734   startTime = curTime;
735
736   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
737   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
738
739   // get rid of all buffering and remote recs
740   // including destorying all array elements
741   CKLOCMGR_LOOP(mgr->flushAllRecs(););
742
743 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
744
745   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
746 #endif
747 }
748
749 // flush state in reduction manager
750 void CkMemCheckPT::resetReductionMgr()
751 {
752   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
753   int numGroups = CkpvAccess(_groupIDTable)->size();
754   for(int i=0;i<numGroups;i++) {
755     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
756     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
757     obj->flushStates();
758     obj->ckJustMigrated();
759   }
760   // reset again
761   //CpvAccess(_qd)->flushStates();
762
763 #if 1
764   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
765 #else
766   if (CkMyPe() == 0)
767     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
768 #endif
769 }
770
771 // recover the lost buddies
772 void CkMemCheckPT::recoverBuddies()
773 {
774   int idx;
775   int len = ckTable.length();
776   // ready to flush reduction manager
777   // cannot be CkMemCheckPT::restart because destory will modify states
778   double curTime = CmiWallTimer();
779   if (CkMyPe() == thisFailedPe)
780   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
781   stage = (char *)"recoverBuddies";
782   if (CkMyPe() == thisFailedPe)
783   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
784   startTime = curTime;
785
786   // recover buddies
787   expectCount = 0;
788   for (idx=0; idx<len; idx++) {
789     CkCheckPTInfo *entry = ckTable[idx];
790     if (entry->pNo == thisFailedPe) {
791 #if CK_NO_PROC_POOL
792       // find a new buddy
793 /*
794       int budPe = CkMyPe();
795 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
796       while (budPe == CkMyPe() || isFailed(budPe)) 
797           budPe = (budPe+1)%CkNumPes();
798 */
799       int budPe = BuddyPE(CkMyPe());
800       //entry->pNo = budPe;
801 #else
802       int budPe = thisFailedPe;
803 #endif
804       entry->updateBuddy(CkMyPe(), budPe);
805 #if 0
806       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
807       CkArrayCheckPTMessage *msg = entry->getCopy();
808       msg->cp_flag = 0;            // not checkpointing
809       thisProxy[budPe].recvData(msg);
810 #else
811       CkArrayCheckPTMessage *msg = entry->getCopy();
812       msg->bud1 = budPe;
813       msg->bud2 = CkMyPe();
814       msg->cp_flag = 0;            // not checkpointing
815       thisProxy[budPe].recoverEntry(msg);
816 #endif
817       expectCount ++;
818     }
819   }
820
821 #if 1
822   if (expectCount == 0) {
823     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
824   }
825 #else
826   if (CkMyPe() == 0) {
827     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
828   }
829 #endif
830
831   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
832 }
833
834 void CkMemCheckPT::gotData()
835 {
836   ackCount ++;
837   if (ackCount == expectCount) {
838     ackCount = 0;
839     expectCount = -1;
840     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
841   }
842 }
843
844 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
845 {
846   for (int i=0; i<n; i++) {
847     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
848     mgr->updateLocation(idx[i], nowOnPe);
849   }
850 }
851
852 // restore array elements
853 void CkMemCheckPT::recoverArrayElements()
854 {
855   double curTime = CmiWallTimer();
856   int len = ckTable.length();
857   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
858   stage = (char *)"recoverArrayElements";
859   if (CkMyPe() == thisFailedPe)
860   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
861   startTime = curTime;
862
863   // recover all array elements
864   int count = 0;
865 #if STREAMING_INFORMHOME
866   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
867   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
868 #endif
869   for (int idx=0; idx<len; idx++)
870   {
871     CkCheckPTInfo *entry = ckTable[idx];
872 #if CK_NO_PROC_POOL
873     // the bigger one will do 
874 //    if (CkMyPe() < entry->pNo) continue;
875     if (!isMaster(entry->pNo)) continue;
876 #else
877     // smaller one do it, which has the original object
878     if (CkMyPe() == entry->pNo+1 || 
879         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
880 #endif
881 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
882
883     entry->updateBuddy(CkMyPe(), entry->pNo);
884     CkArrayCheckPTMessage *msg = entry->getCopy();
885     // gzheng
886     //thisProxy[CkMyPe()].inmem_restore(msg);
887     inmem_restore(msg);
888 #if STREAMING_INFORMHOME
889     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
890     int homePe = mgr->homePe(msg->index);
891     if (homePe != CkMyPe()) {
892       gmap[homePe].push_back(msg->locMgr);
893       imap[homePe].push_back(msg->index);
894     }
895 #endif
896     count ++;
897   }
898 #if STREAMING_INFORMHOME
899   for (int i=0; i<CkNumPes(); i++) {
900     if (gmap[i].size() && i!=CkMyPe()) {
901       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
902     }
903   }
904   delete [] imap;
905   delete [] gmap;
906 #endif
907   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
908
909   CKLOCMGR_LOOP(mgr->doneInserting(););
910
911   inRestarting = 0;
912  // _crashedNode = -1;
913 CpvAccess(_crashedNode) = -1;
914
915   if (CkMyPe() == 0)
916     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
917 }
918
919 static double restartT;
920
921 // on every processor
922 // turn load balancer back on
923 void CkMemCheckPT::finishUp()
924 {
925   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
926   //CKLOCMGR_LOOP(mgr->doneInserting(););
927   
928   if (CkMyPe() == thisFailedPe)
929   {
930        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
931        //CkStartQD(cpCallback);
932        cpCallback.send();
933        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
934   }
935
936 #if CK_NO_PROC_POOL
937 #if NODE_CHECKPOINT
938   int numnodes = CmiNumPhysicalNodes();
939 #else
940   int numnodes = CkNumPes();
941 #endif
942   if (numnodes-totalFailed() <=2) {
943     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
944     _memChkptOn = 0;
945   }
946 #endif
947 }
948
949 // called only on 0
950 void CkMemCheckPT::quiescence(CkCallback &cb)
951 {
952   static int pe_count = 0;
953   pe_count ++;
954   CmiAssert(CkMyPe() == 0);
955   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
956   if (pe_count == CkNumPes()) {
957     pe_count = 0;
958     cb.send();
959   }
960 }
961
962 // User callable function - to start a checkpoint
963 // callback cb is used to pass control back
964 void CkStartMemCheckpoint(CkCallback &cb)
965 {
966 #if CMK_MEM_CHECKPOINT
967   if (_memChkptOn == 0) {
968     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
969     cb.send();
970     return;
971   }
972   if (CkInRestarting()) {
973       // trying to checkpointing during restart
974     cb.send();
975     return;
976   }
977     // store user callback and user data
978   CkMemCheckPT::cpCallback = cb;
979
980     // broadcast to start check pointing
981   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
982   checkptMgr.doItNow(CkMyPe(), cb);
983 #else
984   // when mem checkpoint is disabled, invike cb immediately
985   cb.send();
986 #endif
987 }
988
989 void CkRestartCheckPoint(int diePe)
990 {
991 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
992   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
993   // broadcast
994   checkptMgr.restart(diePe);
995 }
996
997 static int _diePE = -1;
998
999 // callback function used locally by ccs handler
1000 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1001 {
1002 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1003   CkRestartCheckPoint(_diePE);
1004 }
1005
1006 // Converse function handles
1007 static int askPhaseHandlerIdx;
1008 static int recvPhaseHandlerIdx;
1009 static int askProcDataHandlerIdx;
1010 static int restartBcastHandlerIdx;
1011 static int recoverProcDataHandlerIdx;
1012 static int restartBeginHandlerIdx;
1013 static int notifyHandlerIdx;
1014
1015 // called on crashed PE
1016 static void restartBeginHandler(char *msg)
1017 {
1018 #if CMK_MEM_CHECKPOINT
1019   static int count = 0;
1020   CmiFree(msg);
1021   CmiAssert(CkMyPe() == _diePE);
1022   count ++;
1023   if (count == CkNumPes()) {
1024     CkRestartCheckPointCallback(NULL, NULL);
1025     count = 0;
1026   }
1027 #endif
1028 }
1029
1030 extern void _discard_charm_message();
1031 extern void _resume_charm_message();
1032
1033 static void restartBcastHandler(char *msg)
1034 {
1035 #if CMK_MEM_CHECKPOINT
1036   // advance phase counter
1037   CkMemCheckPT::inRestarting = 1;
1038   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1039   // gzheng
1040   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1041
1042   if (CkMyPe()==_diePE)
1043     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1044
1045   // reset QD counters
1046 /*  gzheng
1047   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1048 */
1049
1050 /*  gzheng
1051   if (CkMyPe()==_diePE)
1052       CkRestartCheckPointCallback(NULL, NULL);
1053 */
1054   CmiFree(msg);
1055
1056   _resume_charm_message();
1057
1058     // reduction
1059   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1060   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1061   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1062
1063   checkpointed = 0;
1064 #endif
1065 }
1066
1067 extern void _initDone();
1068
1069 // called on crashed processor
1070 static void recoverProcDataHandler(char *msg)
1071 {
1072 #if CMK_MEM_CHECKPOINT
1073    int i;
1074    envelope *env = (envelope *)msg;
1075    CkUnpackMessage(&env);
1076    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1077    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1078    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1079    //cur_restart_phase ++;
1080      // gzheng ?
1081    //CpvAccess(_qd)->flushStates();
1082
1083    // restore readonly, mainchare, group, nodegroup
1084 //   int temp = cur_restart_phase;
1085 //   cur_restart_phase = -1;
1086    PUP::fromMem p(procMsg->packData);
1087    _handleProcData(p);
1088
1089    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1090    // gzheng
1091    CKLOCMGR_LOOP(mgr->startInserting(););
1092
1093    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1094    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1095    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1096    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1097
1098    _initDone();
1099 //   CpvAccess(_qd)->flushStates();
1100    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1101 #endif
1102 }
1103
1104 // called on its backup processor
1105 // get backup message buffer and sent to crashed processor
1106 static void askProcDataHandler(char *msg)
1107 {
1108 #if CMK_MEM_CHECKPOINT
1109     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1110     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1111     if (CpvAccess(procChkptBuf) == NULL) 
1112       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1113     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1114     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1115     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1116
1117     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1118
1119     CkPackMessage(&env);
1120     CmiSetHandler(env, recoverProcDataHandlerIdx);
1121     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1122     CpvAccess(procChkptBuf) = NULL;
1123 #endif
1124 }
1125
1126 // called on PE 0
1127 void qd_callback(void *m)
1128 {
1129    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1130    CkFreeMsg(m);
1131 #ifdef CMK_SMP
1132    for(int i=0;i<CmiMyNodeSize();i++){
1133    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1134    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1135         CmiSetHandler(msg, askProcDataHandlerIdx);
1136         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1137         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1138    }
1139    return;
1140 #endif
1141    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1142    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1143    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1144    CmiSetHandler(msg, askProcDataHandlerIdx);
1145    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1146    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1147
1148 }
1149
1150 // on crashed node
1151 void CkMemRestart(const char *dummy, CkArgMsg *args)
1152 {
1153 #if CMK_MEM_CHECKPOINT
1154    _diePE = CmiMyNode();
1155    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1156    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1157    CkMemCheckPT::inRestarting = 1;
1158
1159   CpvAccess( _crashedNode )= CmiMyNode();
1160         
1161   _discard_charm_message();
1162   
1163   if(CmiMyRank()==0){
1164     CkCallback cb(qd_callback);
1165     CkStartQD(cb);
1166     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1167   }
1168 #else
1169    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1170 #endif
1171 }
1172
1173 // can be called in other files
1174 // return true if it is in restarting
1175 extern "C"
1176 int CkInRestarting()
1177 {
1178 #if CMK_MEM_CHECKPOINT
1179   if (CpvAccess( _crashedNode)!=-1) return 1;
1180   // gzheng
1181   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1182   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1183   return CkMemCheckPT::inRestarting;
1184 #else
1185   return 0;
1186 #endif
1187 }
1188
1189 /*****************************************************************************
1190                 module initialization
1191 *****************************************************************************/
1192
1193 static int arg_where = CkCheckPoint_inMEM;
1194
1195 #if CMK_MEM_CHECKPOINT
1196 void init_memcheckpt(char **argv)
1197 {
1198     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1199       arg_where = CkCheckPoint_inDISK;
1200     }
1201
1202         // initiliazing _crashedNode variable
1203         CpvInitialize(int, _crashedNode);
1204         CpvAccess(_crashedNode) = -1;
1205
1206 }
1207 #endif
1208
1209 class CkMemCheckPTInit: public Chare {
1210 public:
1211   CkMemCheckPTInit(CkArgMsg *m) {
1212 #if CMK_MEM_CHECKPOINT
1213     if (arg_where == CkCheckPoint_inDISK) {
1214       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1215     }
1216     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1217     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1218 #endif
1219   }
1220 };
1221
1222 static void notifyHandler(char *msg)
1223 {
1224 #if CMK_MEM_CHECKPOINT
1225   CmiFree(msg);
1226       /* immediately increase restart phase to filter old messages */
1227   CpvAccess(_curRestartPhase) ++;
1228   CpvAccess(_qd)->flushStates();
1229   _discard_charm_message();
1230 #endif
1231 }
1232
1233 extern "C"
1234 void notify_crash(int node)
1235 {
1236 #ifdef CMK_MEM_CHECKPOINT
1237   CpvAccess( _crashedNode) = node;
1238 #ifdef CMK_SMP
1239   for(int i=0;i<CkMyNodeSize();i++){
1240         CpvAccessOther(_crashedNode,i)=node;
1241   }
1242 #endif
1243   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1244   CkMemCheckPT::inRestarting = 1;
1245
1246 /*
1247 #ifdef CMK_SMP
1248 */
1249   int pe = CmiNodeFirst(CkMyNode());
1250   for(int i=0;i<CkMyNodeSize();i++){
1251         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1252         CmiSetHandler(msg, notifyHandlerIdx);
1253         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1254   }
1255 /*
1256  return;
1257 #else 
1258     // this may be in interrupt handler, send a message to reset QD
1259   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1260   CmiSetHandler(msg, notifyHandlerIdx);
1261   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1262 #endif
1263 */
1264 #endif
1265 }
1266
1267 extern "C" void (*notify_crash_fn)(int node);
1268
1269 #if CMK_CONVERSE_MPI
1270 static int pingHandlerIdx;
1271 static int pingCheckHandlerIdx;
1272 static int buddyDieHandlerIdx;
1273 static double lastPingTime = -1;
1274
1275 extern "C" void mpi_restart_crashed(int pe, int rank);
1276 extern "C" int  find_spare_mpirank(int pe);
1277
1278 void pingBuddy();
1279 void pingCheckHandler();
1280
1281 void buddyDieHandler(char *msg)
1282 {
1283 #if CMK_MEM_CHECKPOINT
1284    // notify
1285    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1286    notify_crash(diepe);
1287    // send message to crash pe to let it restart
1288    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1289    int newrank = find_spare_mpirank(diepe);
1290    int buddy = obj->BuddyPE(CmiMyPe());
1291    if (buddy == diepe)  {
1292      mpi_restart_crashed(diepe, newrank);
1293      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1294    }
1295 #endif
1296 }
1297
1298 void pingHandler(void *msg)
1299 {
1300   lastPingTime = CmiWallTimer();
1301   CmiFree(msg);
1302 }
1303
1304 void pingCheckHandler()
1305 {
1306 #if CMK_MEM_CHECKPOINT
1307   double now = CmiWallTimer();
1308   if (lastPingTime > 0 && now - lastPingTime > 4) {
1309     int i, pe, buddy;
1310     // tell everyone the buddy dies
1311     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1312     for (i = 1; i < CmiNumPes(); i++) {
1313        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1314        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1315     }
1316     buddy = pe;
1317     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1318     for (int pe = 0; pe < CmiNumPes(); pe++) {
1319       if (obj->isFailed(pe) || pe == buddy) continue;
1320       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1321       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1322       CmiSetHandler(msg, buddyDieHandlerIdx);
1323       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1324     }
1325   }
1326   else 
1327     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1328 #endif
1329 }
1330
1331 void pingBuddy()
1332 {
1333 #if CMK_MEM_CHECKPOINT
1334   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1335   if (obj) {
1336     int buddy = obj->BuddyPE(CkMyPe());
1337 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1338     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1339     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1340     CmiSetHandler(msg, pingHandlerIdx);
1341     CmiGetRestartPhase(msg) = 9999;
1342     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1343   }
1344   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1345 #endif
1346 }
1347 #endif
1348
1349 // initproc
1350 void CkRegisterRestartHandler( )
1351 {
1352 #if CMK_MEM_CHECKPOINT
1353   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1354   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1355   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1356   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1357   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1358
1359 #if CMK_CONVERSE_MPI
1360   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1361   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1362   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1363 #endif
1364
1365   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1366   CpvAccess(procChkptBuf) = NULL;
1367
1368   notify_crash_fn = notify_crash;
1369
1370 #if ! CMK_CONVERSE_MPI
1371   // print pid to kill
1372   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1373 //  sleep(4);
1374 #endif
1375 #endif
1376 }
1377
1378
1379 extern "C"
1380 int CkHasCheckpoints()
1381 {
1382   return checkpointed;
1383 }
1384
1385 /// @todo: the following definitions should be moved to a separate file containing
1386 // structures and functions about fault tolerance strategies
1387
1388 /**
1389  *  * @brief: function for killing a process                                             
1390  *   */
1391 #ifdef CMK_MEM_CHECKPOINT
1392 #if CMK_HAS_GETPID
1393 void killLocal(void *_dummy,double curWallTime){
1394         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1395         if(CmiWallTimer()<killTime-1){
1396                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1397         }else{  
1398                 kill(getpid(),SIGKILL);                                               
1399         }              
1400
1401 #else
1402 void killLocal(void *_dummy,double curWallTime){
1403   CmiAbort("kill() not supported!");
1404 }
1405 #endif
1406 #endif
1407
1408 #ifdef CMK_MEM_CHECKPOINT
1409 /**
1410  * @brief: reads the file with the kill information
1411  */
1412 void readKillFile(){
1413         FILE *fp=fopen(killFile,"r");
1414         if(!fp){
1415                 return;
1416         }
1417         int proc;
1418         double sec;
1419         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1420                 if(proc == CkMyNode() && CkMyRank() == 0){
1421                         killTime = CmiWallTimer()+sec;
1422                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1423                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1424                 }
1425         }
1426         fclose(fp);
1427 }
1428
1429 #if ! CMK_CONVERSE_MPI
1430 void CkDieNow()
1431 {
1432          // ignored for non-mpi version
1433         CmiPrintf("[%d] die now.\n", CmiMyPe());
1434         killTime = CmiWallTimer()+0.001;
1435         CcdCallFnAfter(killLocal,NULL,1);
1436 }
1437 #endif
1438
1439 #endif
1440
1441 #include "CkMemCheckpoint.def.h"
1442
1443