c3f8c235d466d091eb0ac48884bfd7ffbdf97b38
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF      // CkPrintf
55 #define DEBUGF noopck
56
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         1
67 #endif
68
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 double CkMemCheckPT::startTime;
75 char *CkMemCheckPT::stage;
76 CkCallback CkMemCheckPT::cpCallback;
77
78 int _memChkptOn = 1;                    // checkpoint is on or off
79
80 CkGroupID ckCheckPTGroupID;             // readonly
81
82 static int checkpointed = 0;
83
84 /// @todo the following declarations should be moved into a separate file for all 
85 // fault tolerant strategies
86
87 #ifdef CMK_MEM_CHECKPOINT
88 // name of the kill file that contains processes to be killed 
89 char *killFile;                                               
90 // flag for the kill file         
91 int killFlag=0;
92 // variable for storing the killing time
93 double killTime=0.0;
94 #endif
95
96 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
97 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
98
99 // compute the backup processor
100 // FIXME: avoid crashed processors
101 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
102
103 inline int CkMemCheckPT::BuddyPE(int pe)
104 {
105   int budpe;
106 #if NODE_CHECKPOINT
107     // buddy is the processor with same rank on the next physical node
108   int r1 = CmiPhysicalRank(pe);
109   int budnode = CmiPhysicalNodeID(pe);
110   do {
111     budnode = (budnode+1)%CmiNumPhysicalNodes();
112     int *pelist;
113     int num;
114     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
115     budpe = pelist[r1 % num];
116   } while (isFailed(budpe));
117   if (budpe == pe) {
118     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
119     CmiAbort("Failed to find a buddy processor");
120   }
121 #else
122   budpe = pe;
123   while (budpe == pe || isFailed(budpe)) 
124           budpe = (budpe+1)%CkNumPes();
125 #endif
126   return budpe;
127 }
128
129 // called in array element constructor
130 // choose and register with 2 buddies for checkpoiting 
131 #if CMK_MEM_CHECKPOINT
132 void ArrayElement::init_checkpt() {
133         if (_memChkptOn == 0) return;
134         if (CkInRestarting()) {
135           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
136         }
137         // only master init checkpoint
138         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
139
140         budPEs[0] = CkMyPe();
141         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
142         CmiAssert(budPEs[0] != budPEs[1]);
143         // inform checkPTMgr
144         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
145         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
146         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
147         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
148 }
149 #endif
150
151 // entry function invoked by checkpoint mgr asking for checkpoint data
152 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
153 #if CMK_MEM_CHECKPOINT
154   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
155 //char index[128];   thisIndexMax.sprint(index);
156 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
157   CkLocMgr *locMgr = thisArray->getLocMgr();
158   CmiAssert(myRec!=NULL);
159   int size;
160   {
161         PUP::sizer p;
162         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
163         size = p.size();
164   }
165   int packSize = size/sizeof(double) +1;
166   CkArrayCheckPTMessage *msg =
167                  new (packSize, 0) CkArrayCheckPTMessage;
168   msg->len = size;
169   msg->index =thisIndexMax;
170   msg->aid = thisArrayID;
171   msg->locMgr = locMgr->getGroupID();
172   msg->cp_flag = 1;
173   {
174         PUP::toMem p(msg->packData);
175         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
176   }
177
178   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
179   checkptMgr.recvData(msg, 2, budPEs);
180   delete m;
181 #endif
182 }
183
184 // checkpoint holder class - for memory checkpointing
185 class CkMemCheckPTInfo: public CkCheckPTInfo
186 {
187   CkArrayCheckPTMessage *ckBuffer;
188 public:
189   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
190                     CkCheckPTInfo(a, loc, idx, pno)
191   {
192     ckBuffer = NULL;
193   }
194   ~CkMemCheckPTInfo() 
195   {
196     if (ckBuffer) delete ckBuffer; 
197   }
198   inline void updateBuffer(CkArrayCheckPTMessage *data) 
199   {
200     CmiAssert(data!=NULL);
201     if (ckBuffer) delete ckBuffer;
202     ckBuffer = data;
203   }    
204   inline CkArrayCheckPTMessage * getCopy()
205   {
206     if (ckBuffer == NULL) {
207       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
208       CmiAbort("Abort!");
209     }
210     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
211   }     
212   inline void updateBuddy(int b1, int b2) {
213      CmiAssert(ckBuffer);
214      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
215      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
216      CmiAssert(pNo != CkMyPe());
217   }
218   inline int getSize() { 
219      CmiAssert(ckBuffer);
220      return ckBuffer->len; 
221   }
222 };
223
224 // checkpoint holder class - for in-disk checkpointing
225 class CkDiskCheckPTInfo: public CkCheckPTInfo 
226 {
227   char *fname;
228   int bud1, bud2;
229   int len;                      // checkpoint size
230 public:
231   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
232   {
233 #if CMK_USE_MKSTEMP
234     fname = new char[64];
235     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
236     mkstemp(fname);
237 #else
238     fname=tmpnam(NULL);
239 #endif
240     bud1 = bud2 = -1;
241     len = 0;
242   }
243   ~CkDiskCheckPTInfo() 
244   {
245     remove(fname);
246   }
247   inline void updateBuffer(CkArrayCheckPTMessage *data) 
248   {
249     double t = CmiWallTimer();
250     // unpack it
251     envelope *env = UsrToEnv(data);
252     CkUnpackMessage(&env);
253     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
254     FILE *f = fopen(fname,"wb");
255     PUP::toDisk p(f);
256     CkPupMessage(p, (void **)&data);
257     // delay sync to the end because otherwise the messages are blocked
258 //    fsync(fileno(f));
259     fclose(f);
260     bud1 = data->bud1;
261     bud2 = data->bud2;
262     len = data->len;
263     delete data;
264     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
265   }
266   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
267   {
268     CkArrayCheckPTMessage *data;
269     FILE *f = fopen(fname,"rb");
270     PUP::fromDisk p(f);
271     CkPupMessage(p, (void **)&data);
272     fclose(f);
273     data->bud1 = bud1;                          // update the buddies
274     data->bud2 = bud2;
275     return data;
276   }
277   inline void updateBuddy(int b1, int b2) {
278      bud1 = b1; bud2 = b2;
279      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
280      CmiAssert(pNo != CkMyPe());
281   }
282   inline int getSize() { 
283      return len; 
284   }
285 };
286
287 CkMemCheckPT::CkMemCheckPT(int w)
288 {
289   int numnodes = 0;
290 #if NODE_CHECKPOINT
291   numnodes = CmiNumPhysicalNodes();
292 #else
293   numnodes = CkNumPes();
294 #endif
295 #if CK_NO_PROC_POOL
296   if (numnodes <= 2)
297 #else
298   if (numnodes  == 1)
299 #endif
300   {
301     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
302     _memChkptOn = 0;
303   }
304   inRestarting = 0;
305   recvCount = peCount = 0;
306   ackCount = 0;
307   expectCount = -1;
308   where = w;
309
310 #if CMK_CONVERSE_MPI
311   void pingBuddy();
312   void pingCheckHandler();
313   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
314   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
315 #endif
316 }
317
318 CkMemCheckPT::~CkMemCheckPT()
319 {
320   int len = ckTable.length();
321   for (int i=0; i<len; i++) {
322     delete ckTable[i];
323   }
324 }
325
326 void CkMemCheckPT::pup(PUP::er& p) 
327
328   CBase_CkMemCheckPT::pup(p); 
329   p|cpStarter;
330   p|thisFailedPe;
331   p|failedPes;
332   p|ckCheckPTGroupID;           // recover global variable
333   p|cpCallback;                 // store callback
334   p|where;                      // where to checkpoint
335   if (p.isUnpacking()) {
336     recvCount = peCount = 0;
337 #if CMK_CONVERSE_MPI
338     void pingBuddy();
339     void pingCheckHandler();
340     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
341     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
342 #endif
343   }
344 }
345
346 // called by checkpoint mgr to restore an array element
347 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
348 {
349 #if CMK_MEM_CHECKPOINT
350   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
351   // m->index.print();
352   PUP::fromMem p(m->packData);
353   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
354   CmiAssert(mgr);
355 #if STREAMING_INFORMHOME
356   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
357 #else
358   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
359 #endif
360
361   // find a list of array elements bound together
362   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
363   CmiAssert(elt);
364   CkLocRec_local *rec = elt->myRec;
365   CkVec<CkMigratable *> list;
366   mgr->migratableList(rec, list);
367   CmiAssert(list.length() > 0);
368   for (int l=0; l<list.length(); l++) {
369     elt = (ArrayElement *)list[l];
370     elt->budPEs[0] = m->bud1;
371     elt->budPEs[1] = m->bud2;
372     //    reset, may not needed now
373     // for now.
374     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
375       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
376       if (c) c->redNo = 0;
377     }
378   }
379 #endif
380 }
381
382 // return 1 if pe is a crashed processor
383 int CkMemCheckPT::isFailed(int pe)
384 {
385   for (int i=0; i<failedPes.length(); i++)
386     if (failedPes[i] == pe) return 1;
387   return 0;
388 }
389
390 // add pe into history list of all failed processors
391 void CkMemCheckPT::failed(int pe)
392 {
393   if (isFailed(pe)) return;
394   failedPes.push_back(pe);
395 }
396
397 int CkMemCheckPT::totalFailed()
398 {
399   return failedPes.length();
400 }
401
402 // create an checkpoint entry for array element of aid with index.
403 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
404 {
405   // error check, no duplicate
406   int idx, len = ckTable.size();
407   for (idx=0; idx<len; idx++) {
408     CkCheckPTInfo *entry = ckTable[idx];
409     if (index == entry->index) {
410       if (loc == entry->locMgr) {
411           // bindTo array elements
412           return;
413       }
414         // for array inheritance, the following check may fail
415         // because ArrayElement constructor of all superclasses are called
416       if (aid == entry->aid) {
417         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
418         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
419       }
420     }
421   }
422   CkCheckPTInfo *newEntry;
423   if (where == CkCheckPoint_inMEM)
424     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
425   else
426     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
427   ckTable.push_back(newEntry);
428   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
429 }
430
431 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
432 {
433   int buddy = msg->bud1;
434   if (buddy == CkMyPe()) buddy = msg->bud2;
435   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
436   recvData(msg);
437     // ack
438   thisProxy[buddy].gotData();
439 }
440
441 // loop through my checkpoint table and ask checkpointed array elements
442 // to send me checkpoint data.
443 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
444 {
445   checkpointed = 1;
446   cpCallback = cb;
447   cpStarter = starter;
448   if (CkMyPe() == cpStarter) {
449     startTime = CmiWallTimer();
450     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
451   }
452
453   int len = ckTable.length();
454   for (int i=0; i<len; i++) {
455     CkCheckPTInfo *entry = ckTable[i];
456       // always let the bigger number processor send request
457     //if (CkMyPe() < entry->pNo) continue;
458       // always let the smaller number processor send request, may on same proc
459     if (!isMaster(entry->pNo)) continue;
460       // call inmem_checkpoint to the array element, ask it to send
461       // back checkpoint data via recvData().
462     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
463     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
464   }
465     // if my table is empty, then I am done
466   if (len == 0) thisProxy[cpStarter].cpFinish();
467
468   // pack and send proc level data
469   sendProcData();
470 }
471
472 // don't handle array elements
473 static inline void _handleProcData(PUP::er &p)
474 {
475     // save readonlys, and callback BTW
476     CkPupROData(p);
477
478     // save mainchares 
479     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
480         
481 #ifndef CMK_CHARE_USE_PTR
482     // save non-migratable chare
483     CkPupChareData(p);
484 #endif
485
486     // save groups into Groups.dat
487     CkPupGroupData(p);
488
489     // save nodegroups into NodeGroups.dat
490     if(CkMyRank()==0) CkPupNodeGroupData(p);
491 }
492
493 void CkMemCheckPT::sendProcData()
494 {
495   // find out size of buffer
496   int size;
497   {
498     PUP::sizer p;
499     _handleProcData(p);
500     size = p.size();
501   }
502   int packSize = size;
503   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
504   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
505   {
506     PUP::toMem p(msg->packData);
507     _handleProcData(p);
508   }
509   msg->pe = CkMyPe();
510   msg->len = size;
511   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
512   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
513 }
514
515 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
516 {
517   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
518   CpvAccess(procChkptBuf) = msg;
519   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
520   thisProxy[msg->reportPe].cpFinish();
521 }
522
523 // ArrayElement call this function to give us the checkpointed data
524 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
525 {
526   int len = ckTable.length();
527   int idx;
528   for (idx=0; idx<len; idx++) {
529     CkCheckPTInfo *entry = ckTable[idx];
530     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
531   }
532   CkAssert(idx < len);
533   int isChkpting = msg->cp_flag;
534   ckTable[idx]->updateBuffer(msg);
535   if (isChkpting) {
536       // all my array elements have returned their inmem data
537       // inform starter processor that I am done.
538     recvCount ++;
539     if (recvCount == ckTable.length()) {
540       if (where == CkCheckPoint_inMEM) {
541         thisProxy[cpStarter].cpFinish();
542       }
543       else if (where == CkCheckPoint_inDISK) {
544         // another barrier for finalize the writing using fsync
545         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
546         contribute(0,NULL,CkReduction::sum_int,localcb);
547       }
548       else
549         CmiAbort("Unknown checkpoint scheme");
550       recvCount = 0;
551     } 
552   }
553 }
554
555 // only used in disk checkpointing
556 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
557 {
558   delete m;
559 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
560   system("sync");
561 #endif
562   thisProxy[cpStarter].cpFinish();
563 }
564
565 // only is called on cpStarter when checkpoint is done
566 void CkMemCheckPT::cpFinish()
567 {
568   CmiAssert(CkMyPe() == cpStarter);
569   peCount++;
570     // now that all processors have finished, activate callback
571   if (peCount == 2*(CkNumPes())) {
572     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
573     cpCallback.send();
574     peCount = 0;
575     thisProxy.report();
576   }
577 }
578
579 // for debugging, report checkpoint info
580 void CkMemCheckPT::report()
581 {
582   int objsize = 0;
583   int len = ckTable.length();
584   for (int i=0; i<len; i++) {
585     CkCheckPTInfo *entry = ckTable[i];
586     CmiAssert(entry);
587     objsize += entry->getSize();
588   }
589   CmiAssert(CpvAccess(procChkptBuf));
590   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
591 }
592
593 /*****************************************************************************
594                         RESTART Procedure
595 *****************************************************************************/
596
597 // master processor of two buddies
598 inline int CkMemCheckPT::isMaster(int buddype)
599 {
600 #if 0
601   int mype = CkMyPe();
602 //CkPrintf("ismaster: %d %d\n", pe, mype);
603   if (CkNumPes() - totalFailed() == 2) {
604     return mype > buddype;
605   }
606   for (int i=1; i<CkNumPes(); i++) {
607     int me = (buddype+i)%CkNumPes();
608     if (isFailed(me)) continue;
609     if (me == mype) return 1;
610     else return 0;
611   }
612   return 0;
613 #else
614     // smaller one
615   int mype = CkMyPe();
616 //CkPrintf("ismaster: %d %d\n", pe, mype);
617   if (CkNumPes() - totalFailed() == 2) {
618     return mype < buddype;
619   }
620   for (int i=1; i<CkNumPes(); i++) {
621     int me = (mype+i)%CkNumPes();
622     if (isFailed(me)) continue;
623     if (me == buddype) return 1;
624     else return 0;
625   }
626   return 0;
627 #endif
628 }
629
630 #ifdef CKLOCMGR_LOOP
631 #undef CKLOCMGR_LOOP
632 #endif
633
634 // loop over all CkLocMgr and do "code"
635 #define  CKLOCMGR_LOOP(code)    {       \
636   int numGroups = CkpvAccess(_groupIDTable)->size();    \
637   for(int i=0;i<numGroups;i++) {        \
638     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
639     if(obj->isLocMgr())  {      \
640       CkLocMgr *mgr = (CkLocMgr*)obj;   \
641       code      \
642     }   \
643   }     \
644  }
645
646 #if 0
647 // helper class to pup all elements that belong to same ckLocMgr
648 class ElementDestoryer : public CkLocIterator {
649 private:
650         CkLocMgr *locMgr;
651 public:
652         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
653         void addLocation(CkLocation &loc) {
654                 CkArrayIndex idx=loc.getIndex();
655                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
656                 loc.destroy();
657         }
658 };
659 #endif
660
661 // restore the bitmap vector for LB
662 void CkMemCheckPT::resetLB(int diepe)
663 {
664 #if CMK_LBDB_ON
665   int i;
666   char *bitmap = new char[CkNumPes()];
667   // set processor available bitmap
668   get_avail_vector(bitmap);
669
670   for (i=0; i<failedPes.length(); i++)
671     bitmap[failedPes[i]] = 0; 
672   bitmap[diepe] = 0;
673
674 #if CK_NO_PROC_POOL
675   set_avail_vector(bitmap);
676 #endif
677
678   // if I am the crashed pe, rebuild my failedPEs array
679   if (CkMyPe() == diepe)
680     for (i=0; i<CkNumPes(); i++) 
681       if (bitmap[i]==0) failed(i);
682
683   delete [] bitmap;
684 #endif
685 }
686
687 // in case when failedPe dies, everybody go through its checkpoint table:
688 // destory all array elements
689 // recover lost buddies
690 // reconstruct all array elements from check point data
691 // called on all processors
692 void CkMemCheckPT::restart(int diePe)
693 {
694 #if CMK_MEM_CHECKPOINT
695   double curTime = CmiWallTimer();
696   if (CkMyPe() == diePe)
697     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
698   stage = (char*)"resetLB";
699   startTime = curTime;
700   if (CkMyPe() == diePe)
701     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
702
703 #if CK_NO_PROC_POOL
704   failed(diePe);        // add into the list of failed pes
705 #endif
706   thisFailedPe = diePe;
707
708   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
709
710   inRestarting = 1;
711                                                                                 
712   // disable load balancer's barrier
713   if (CkMyPe() != diePe) resetLB(diePe);
714
715   CKLOCMGR_LOOP(mgr->startInserting(););
716
717   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
718 /*
719   if (CkMyPe() == 0)
720     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
721 */
722 #endif
723 }
724
725 // loally remove all array elements
726 void CkMemCheckPT::removeArrayElements()
727 {
728 #if CMK_MEM_CHECKPOINT
729   int len = ckTable.length();
730   double curTime = CmiWallTimer();
731   if (CkMyPe() == thisFailedPe) 
732     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
733   stage = (char*)"removeArrayElements";
734   startTime = curTime;
735
736   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
737   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
738
739   // get rid of all buffering and remote recs
740   // including destorying all array elements
741   CKLOCMGR_LOOP(mgr->flushAllRecs(););
742
743 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
744
745   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
746 #endif
747 }
748
749 // flush state in reduction manager
750 void CkMemCheckPT::resetReductionMgr()
751 {
752   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
753   int numGroups = CkpvAccess(_groupIDTable)->size();
754   for(int i=0;i<numGroups;i++) {
755     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
756     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
757     obj->flushStates();
758     obj->ckJustMigrated();
759   }
760   // reset again
761   //CpvAccess(_qd)->flushStates();
762
763 #if 1
764   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
765 #else
766   if (CkMyPe() == 0)
767     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
768 #endif
769 }
770
771 // recover the lost buddies
772 void CkMemCheckPT::recoverBuddies()
773 {
774   int idx;
775   int len = ckTable.length();
776   // ready to flush reduction manager
777   // cannot be CkMemCheckPT::restart because destory will modify states
778   double curTime = CmiWallTimer();
779   if (CkMyPe() == thisFailedPe)
780   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
781   stage = (char *)"recoverBuddies";
782   if (CkMyPe() == thisFailedPe)
783   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
784   startTime = curTime;
785
786   // recover buddies
787   expectCount = 0;
788   for (idx=0; idx<len; idx++) {
789     CkCheckPTInfo *entry = ckTable[idx];
790     if (entry->pNo == thisFailedPe) {
791 #if CK_NO_PROC_POOL
792       // find a new buddy
793 /*
794       int budPe = CkMyPe();
795 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
796       while (budPe == CkMyPe() || isFailed(budPe)) 
797           budPe = (budPe+1)%CkNumPes();
798 */
799       int budPe = BuddyPE(CkMyPe());
800       //entry->pNo = budPe;
801 #else
802       int budPe = thisFailedPe;
803 #endif
804       entry->updateBuddy(CkMyPe(), budPe);
805 #if 0
806       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
807       CkArrayCheckPTMessage *msg = entry->getCopy();
808       msg->cp_flag = 0;            // not checkpointing
809       thisProxy[budPe].recvData(msg);
810 #else
811       CkArrayCheckPTMessage *msg = entry->getCopy();
812       msg->bud1 = budPe;
813       msg->bud2 = CkMyPe();
814       msg->cp_flag = 0;            // not checkpointing
815       thisProxy[budPe].recoverEntry(msg);
816 #endif
817       expectCount ++;
818     }
819   }
820
821 #if 1
822   if (expectCount == 0) {
823     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
824   }
825 #else
826   if (CkMyPe() == 0) {
827     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
828   }
829 #endif
830
831   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
832 }
833
834 void CkMemCheckPT::gotData()
835 {
836   ackCount ++;
837   if (ackCount == expectCount) {
838     ackCount = 0;
839     expectCount = -1;
840     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
841   }
842 }
843
844 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
845 {
846   for (int i=0; i<n; i++) {
847     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
848     mgr->updateLocation(idx[i], nowOnPe);
849   }
850 }
851
852 // restore array elements
853 void CkMemCheckPT::recoverArrayElements()
854 {
855   double curTime = CmiWallTimer();
856   int len = ckTable.length();
857   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
858   stage = (char *)"recoverArrayElements";
859   if (CkMyPe() == thisFailedPe)
860   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
861   startTime = curTime;
862
863   // recover all array elements
864   int count = 0;
865 #if STREAMING_INFORMHOME
866   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
867   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
868 #endif
869   for (int idx=0; idx<len; idx++)
870   {
871     CkCheckPTInfo *entry = ckTable[idx];
872 #if CK_NO_PROC_POOL
873     // the bigger one will do 
874 //    if (CkMyPe() < entry->pNo) continue;
875     if (!isMaster(entry->pNo)) continue;
876 #else
877     // smaller one do it, which has the original object
878     if (CkMyPe() == entry->pNo+1 || 
879         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
880 #endif
881 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
882
883     entry->updateBuddy(CkMyPe(), entry->pNo);
884     CkArrayCheckPTMessage *msg = entry->getCopy();
885     // gzheng
886     //thisProxy[CkMyPe()].inmem_restore(msg);
887     inmem_restore(msg);
888 #if STREAMING_INFORMHOME
889     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
890     int homePe = mgr->homePe(msg->index);
891     if (homePe != CkMyPe()) {
892       gmap[homePe].push_back(msg->locMgr);
893       imap[homePe].push_back(msg->index);
894     }
895 #endif
896     count ++;
897   }
898 #if STREAMING_INFORMHOME
899   for (int i=0; i<CkNumPes(); i++) {
900     if (gmap[i].size() && i!=CkMyPe()) {
901       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
902     }
903   }
904   delete [] imap;
905   delete [] gmap;
906 #endif
907   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
908
909   CKLOCMGR_LOOP(mgr->doneInserting(););
910
911   inRestarting = 0;
912  // _crashedNode = -1;
913 CpvAccess(_crashedNode) = -1;
914
915   if (CkMyPe() == 0)
916     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
917 }
918
919 static double restartT;
920
921 // on every processor
922 // turn load balancer back on
923 void CkMemCheckPT::finishUp()
924 {
925   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
926   //CKLOCMGR_LOOP(mgr->doneInserting(););
927   
928   if (CkMyPe() == thisFailedPe)
929   {
930        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
931        //CkStartQD(cpCallback);
932        cpCallback.send();
933        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
934   }
935
936 #if CK_NO_PROC_POOL
937 #if NODE_CHECKPOINT
938   int numnodes = CmiNumPhysicalNodes();
939 #else
940   int numnodes = CkNumPes();
941 #endif
942   if (numnodes-totalFailed() <=2) {
943     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
944     _memChkptOn = 0;
945   }
946 #endif
947 }
948
949 // called only on 0
950 void CkMemCheckPT::quiescence(CkCallback &cb)
951 {
952   static int pe_count = 0;
953   pe_count ++;
954   CmiAssert(CkMyPe() == 0);
955   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
956   if (pe_count == CkNumPes()) {
957     pe_count = 0;
958     cb.send();
959   }
960 }
961
962 // User callable function - to start a checkpoint
963 // callback cb is used to pass control back
964 void CkStartMemCheckpoint(CkCallback &cb)
965 {
966 #if CMK_MEM_CHECKPOINT
967   if (_memChkptOn == 0) {
968     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
969     cb.send();
970     return;
971   }
972   if (CkInRestarting()) {
973       // trying to checkpointing during restart
974     cb.send();
975     return;
976   }
977     // store user callback and user data
978   CkMemCheckPT::cpCallback = cb;
979
980     // broadcast to start check pointing
981   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
982   checkptMgr.doItNow(CkMyPe(), cb);
983 #else
984   // when mem checkpoint is disabled, invike cb immediately
985   cb.send();
986 #endif
987 }
988
989 void CkRestartCheckPoint(int diePe)
990 {
991 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
992   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
993   // broadcast
994   checkptMgr.restart(diePe);
995 }
996
997 static int _diePE = -1;
998
999 // callback function used locally by ccs handler
1000 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1001 {
1002 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1003   CkRestartCheckPoint(_diePE);
1004 }
1005
1006 // Converse function handles
1007 static int askPhaseHandlerIdx;
1008 static int recvPhaseHandlerIdx;
1009 static int askProcDataHandlerIdx;
1010 static int restartBcastHandlerIdx;
1011 static int recoverProcDataHandlerIdx;
1012 static int restartBeginHandlerIdx;
1013 static int notifyHandlerIdx;
1014
1015 // called on crashed PE
1016 static void restartBeginHandler(char *msg)
1017 {
1018 #if CMK_MEM_CHECKPOINT
1019   static int count = 0;
1020   CmiFree(msg);
1021   CmiAssert(CkMyPe() == _diePE);
1022   count ++;
1023   if (count == CkNumPes()) {
1024     CkRestartCheckPointCallback(NULL, NULL);
1025     count = 0;
1026   }
1027 #endif
1028 }
1029
1030 extern void _discard_charm_message();
1031 extern void _resume_charm_message();
1032
1033 static void restartBcastHandler(char *msg)
1034 {
1035 #if CMK_MEM_CHECKPOINT
1036   // advance phase counter
1037   CkMemCheckPT::inRestarting = 1;
1038   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1039   // gzheng
1040   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1041
1042   if (CkMyPe()==_diePE)
1043     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1044
1045   // reset QD counters
1046 /*  gzheng
1047   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1048 */
1049
1050 /*  gzheng
1051   if (CkMyPe()==_diePE)
1052       CkRestartCheckPointCallback(NULL, NULL);
1053 */
1054   CmiFree(msg);
1055
1056   _resume_charm_message();
1057
1058     // reduction
1059   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1060   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1061   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1062 #endif
1063 }
1064
1065 extern void _initDone();
1066
1067 // called on crashed processor
1068 static void recoverProcDataHandler(char *msg)
1069 {
1070 #if CMK_MEM_CHECKPOINT
1071    int i;
1072    envelope *env = (envelope *)msg;
1073    CkUnpackMessage(&env);
1074    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1075    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1076    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1077    //cur_restart_phase ++;
1078      // gzheng ?
1079    //CpvAccess(_qd)->flushStates();
1080
1081    // restore readonly, mainchare, group, nodegroup
1082 //   int temp = cur_restart_phase;
1083 //   cur_restart_phase = -1;
1084    PUP::fromMem p(procMsg->packData);
1085    _handleProcData(p);
1086
1087    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1088    // gzheng
1089    CKLOCMGR_LOOP(mgr->startInserting(););
1090
1091    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1092    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1093    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1094    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1095
1096    _initDone();
1097 //   CpvAccess(_qd)->flushStates();
1098    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1099 #endif
1100 }
1101
1102 // called on its backup processor
1103 // get backup message buffer and sent to crashed processor
1104 static void askProcDataHandler(char *msg)
1105 {
1106 #if CMK_MEM_CHECKPOINT
1107     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1108     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1109     if (CpvAccess(procChkptBuf) == NULL) 
1110       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1111     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1112     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1113     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1114
1115     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1116
1117     CkPackMessage(&env);
1118     CmiSetHandler(env, recoverProcDataHandlerIdx);
1119     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1120     CpvAccess(procChkptBuf) = NULL;
1121 #endif
1122 }
1123
1124 // called on PE 0
1125 void qd_callback(void *m)
1126 {
1127    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1128    CkFreeMsg(m);
1129 #ifdef CMK_SMP
1130    for(int i=0;i<CmiMyNodeSize();i++){
1131    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1132    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1133         CmiSetHandler(msg, askProcDataHandlerIdx);
1134         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1135         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1136    }
1137    return;
1138 #endif
1139    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1140    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1141    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1142    CmiSetHandler(msg, askProcDataHandlerIdx);
1143    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1144    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1145
1146 }
1147
1148 // on crashed node
1149 void CkMemRestart(const char *dummy, CkArgMsg *args)
1150 {
1151 #if CMK_MEM_CHECKPOINT
1152    _diePE = CmiMyNode();
1153    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1154    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1155    CkMemCheckPT::inRestarting = 1;
1156
1157   CpvAccess( _crashedNode )= CmiMyNode();
1158         
1159   _discard_charm_message();
1160   
1161   if(CmiMyRank()==0){
1162     CkCallback cb(qd_callback);
1163     CkStartQD(cb);
1164     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1165   }
1166 #else
1167    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1168 #endif
1169 }
1170
1171 // can be called in other files
1172 // return true if it is in restarting
1173 extern "C"
1174 int CkInRestarting()
1175 {
1176 #if CMK_MEM_CHECKPOINT
1177   if (CpvAccess( _crashedNode)!=-1) return 1;
1178   // gzheng
1179   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1180   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1181   return CkMemCheckPT::inRestarting;
1182 #else
1183   return 0;
1184 #endif
1185 }
1186
1187 /*****************************************************************************
1188                 module initialization
1189 *****************************************************************************/
1190
1191 static int arg_where = CkCheckPoint_inMEM;
1192
1193 #if CMK_MEM_CHECKPOINT
1194 void init_memcheckpt(char **argv)
1195 {
1196     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1197       arg_where = CkCheckPoint_inDISK;
1198     }
1199
1200         // initiliazing _crashedNode variable
1201         CpvInitialize(int, _crashedNode);
1202         CpvAccess(_crashedNode) = -1;
1203
1204 }
1205 #endif
1206
1207 class CkMemCheckPTInit: public Chare {
1208 public:
1209   CkMemCheckPTInit(CkArgMsg *m) {
1210 #if CMK_MEM_CHECKPOINT
1211     if (arg_where == CkCheckPoint_inDISK) {
1212       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1213     }
1214     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1215     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1216 #endif
1217   }
1218 };
1219
1220 static void notifyHandler(char *msg)
1221 {
1222 #if CMK_MEM_CHECKPOINT
1223   CmiFree(msg);
1224       /* immediately increase restart phase to filter old messages */
1225   CpvAccess(_curRestartPhase) ++;
1226   CpvAccess(_qd)->flushStates();
1227   _discard_charm_message();
1228 #endif
1229 }
1230
1231 extern "C"
1232 void notify_crash(int node)
1233 {
1234 #ifdef CMK_MEM_CHECKPOINT
1235   CpvAccess( _crashedNode) = node;
1236 #ifdef CMK_SMP
1237   for(int i=0;i<CkMyNodeSize();i++){
1238         CpvAccessOther(_crashedNode,i)=node;
1239   }
1240 #endif
1241   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1242   CkMemCheckPT::inRestarting = 1;
1243
1244 /*
1245 #ifdef CMK_SMP
1246 */
1247   int pe = CmiNodeFirst(CkMyNode());
1248   for(int i=0;i<CkMyNodeSize();i++){
1249         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1250         CmiSetHandler(msg, notifyHandlerIdx);
1251         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1252   }
1253 /*
1254  return;
1255 #else 
1256     // this may be in interrupt handler, send a message to reset QD
1257   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1258   CmiSetHandler(msg, notifyHandlerIdx);
1259   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1260 #endif
1261 */
1262 #endif
1263 }
1264
1265 extern "C" void (*notify_crash_fn)(int node);
1266
1267 #if CMK_CONVERSE_MPI
1268 static int pingHandlerIdx;
1269 static int pingCheckHandlerIdx;
1270 static int buddyDieHandlerIdx;
1271 static double lastPingTime = -1;
1272
1273 extern void _initCharm(int argc, char **argv);
1274 extern "C" void mpi_restart_crashed(int pe, int rank);
1275 extern "C" int  find_spare_mpirank(int pe);
1276 extern void CkDeleteChares();
1277
1278 void pingBuddy();
1279 void pingCheckHandler();
1280
1281 void buddyDieHandler(char *msg)
1282 {
1283    // notify
1284    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1285    notify_crash(diepe);
1286    // send message to crash pe to let it restart
1287    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1288    int newrank = find_spare_mpirank(diepe);
1289    int buddy = obj->BuddyPE(CmiMyPe());
1290    if (buddy == diepe)  {
1291      mpi_restart_crashed(diepe, newrank);
1292      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1293    }
1294 }
1295
1296 void pingHandler(void *msg)
1297 {
1298   lastPingTime = CmiWallTimer();
1299   CmiFree(msg);
1300 //if (CmiMyPe() == 5) printf("[%d] pingHandler %f\n", 5, lastPingTime);
1301 }
1302
1303 void pingCheckHandler()
1304 {
1305   double now = CmiWallTimer();
1306   if (lastPingTime > 0 && now - lastPingTime > 4) {
1307     int pe, buddy;
1308     // tell everyone the buddy dies
1309     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1310     for (pe = 0; pe < CmiNumPes(); pe++) {
1311        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1312     }
1313     buddy = pe;
1314     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1315     for (int pe = 0; pe < CmiNumPes(); pe++) {
1316       if (obj->isFailed(pe) || pe == buddy) continue;
1317       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1318       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1319       CmiSetHandler(msg, buddyDieHandlerIdx);
1320       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1321     }
1322   }
1323   else 
1324     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1325 }
1326
1327 void pingBuddy()
1328 {
1329   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1330   if (obj) {
1331     int buddy = obj->BuddyPE(CkMyPe());
1332 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1333     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1334     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1335     CmiSetHandler(msg, pingHandlerIdx);
1336     CmiGetRestartPhase(msg) = 9999;
1337     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1338   }
1339   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1340 }
1341 #endif
1342
1343 // initproc
1344 void CkRegisterRestartHandler( )
1345 {
1346 #if CMK_MEM_CHECKPOINT
1347   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1348   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1349   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1350   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1351   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1352
1353 #if CMK_CONVERSE_MPI
1354   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1355   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1356   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1357
1358 #endif
1359
1360   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1361   CpvAccess(procChkptBuf) = NULL;
1362
1363   notify_crash_fn = notify_crash;
1364
1365 #if 1
1366   // for debugging
1367   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1368 //  sleep(4);
1369 #endif
1370 #endif
1371 }
1372
1373
1374 extern "C"
1375 int CkHasCheckpoints()
1376 {
1377   return checkpointed;
1378 }
1379
1380 /// @todo: the following definitions should be moved to a separate file containing
1381 // structures and functions about fault tolerance strategies
1382
1383 /**
1384  *  * @brief: function for killing a process                                             
1385  *   */
1386 #ifdef CMK_MEM_CHECKPOINT
1387 #if CMK_HAS_GETPID
1388 void killLocal(void *_dummy,double curWallTime){
1389         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1390         if(CmiWallTimer()<killTime-1){
1391                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1392         }else{  
1393                 kill(getpid(),SIGKILL);                                               
1394         }              
1395
1396 #else
1397 void killLocal(void *_dummy,double curWallTime){
1398   CmiAbort("kill() not supported!");
1399 }
1400 #endif
1401 #endif
1402
1403 #ifdef CMK_MEM_CHECKPOINT
1404 /**
1405  * @brief: reads the file with the kill information
1406  */
1407 void readKillFile(){
1408         FILE *fp=fopen(killFile,"r");
1409         if(!fp){
1410                 return;
1411         }
1412         int proc;
1413         double sec;
1414         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1415                 if(proc == CkMyNode() && CkMyRank() == 0){
1416                         killTime = CmiWallTimer()+sec;
1417                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1418                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1419                 }
1420         }
1421         fclose(fp);
1422 }
1423 #endif
1424
1425 #include "CkMemCheckpoint.def.h"
1426
1427