solve the memory leak after restart
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF      // CkPrintf
55 #define DEBUGF noopck
56
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         1
67 #endif
68
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 double CkMemCheckPT::startTime;
75 char *CkMemCheckPT::stage;
76 CkCallback CkMemCheckPT::cpCallback;
77
78 int _memChkptOn = 1;                    // checkpoint is on or off
79
80 CkGroupID ckCheckPTGroupID;             // readonly
81
82 static int checkpointed = 0;
83
84 /// @todo the following declarations should be moved into a separate file for all 
85 // fault tolerant strategies
86
87 #ifdef CMK_MEM_CHECKPOINT
88 // name of the kill file that contains processes to be killed 
89 char *killFile;                                               
90 // flag for the kill file         
91 int killFlag=0;
92 // variable for storing the killing time
93 double killTime=0.0;
94 #endif
95
96 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
97 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
98
99 // compute the backup processor
100 // FIXME: avoid crashed processors
101 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
102
103 inline int CkMemCheckPT::BuddyPE(int pe)
104 {
105   int budpe;
106 #if NODE_CHECKPOINT
107     // buddy is the processor with same rank on the next physical node
108   int r1 = CmiPhysicalRank(pe);
109   int budnode = CmiPhysicalNodeID(pe);
110   do {
111     budnode = (budnode+1)%CmiNumPhysicalNodes();
112     int *pelist;
113     int num;
114     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
115     budpe = pelist[r1 % num];
116   } while (isFailed(budpe));
117   if (budpe == pe) {
118     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
119     CmiAbort("Failed to find a buddy processor");
120   }
121 #else
122   budpe = pe;
123   while (budpe == pe || isFailed(budpe)) 
124           budpe = (budpe+1)%CkNumPes();
125 #endif
126   return budpe;
127 }
128
129 // called in array element constructor
130 // choose and register with 2 buddies for checkpoiting 
131 #if CMK_MEM_CHECKPOINT
132 void ArrayElement::init_checkpt() {
133         if (_memChkptOn == 0) return;
134         if (CkInRestarting()) {
135           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
136         }
137         // only master init checkpoint
138         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
139
140         budPEs[0] = CkMyPe();
141         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
142         CmiAssert(budPEs[0] != budPEs[1]);
143         // inform checkPTMgr
144         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
145         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
146         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
147         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
148 }
149 #endif
150
151 // entry function invoked by checkpoint mgr asking for checkpoint data
152 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
153 #if CMK_MEM_CHECKPOINT
154   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
155 //char index[128];   thisIndexMax.sprint(index);
156 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
157   CkLocMgr *locMgr = thisArray->getLocMgr();
158   CmiAssert(myRec!=NULL);
159   int size;
160   {
161         PUP::sizer p;
162         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
163         size = p.size();
164   }
165   int packSize = size/sizeof(double) +1;
166   CkArrayCheckPTMessage *msg =
167                  new (packSize, 0) CkArrayCheckPTMessage;
168   msg->len = size;
169   msg->index =thisIndexMax;
170   msg->aid = thisArrayID;
171   msg->locMgr = locMgr->getGroupID();
172   msg->cp_flag = 1;
173   {
174         PUP::toMem p(msg->packData);
175         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
176   }
177
178   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
179   checkptMgr.recvData(msg, 2, budPEs);
180   delete m;
181 #endif
182 }
183
184 // checkpoint holder class - for memory checkpointing
185 class CkMemCheckPTInfo: public CkCheckPTInfo
186 {
187   CkArrayCheckPTMessage *ckBuffer;
188 public:
189   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
190                     CkCheckPTInfo(a, loc, idx, pno)
191   {
192     ckBuffer = NULL;
193   }
194   ~CkMemCheckPTInfo() 
195   {
196     if (ckBuffer) delete ckBuffer; 
197   }
198   inline void updateBuffer(CkArrayCheckPTMessage *data) 
199   {
200     CmiAssert(data!=NULL);
201     if (ckBuffer) delete ckBuffer;
202     ckBuffer = data;
203   }    
204   inline CkArrayCheckPTMessage * getCopy()
205   {
206     if (ckBuffer == NULL) {
207       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
208       CmiAbort("Abort!");
209     }
210     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
211   }     
212   inline void updateBuddy(int b1, int b2) {
213      CmiAssert(ckBuffer);
214      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
215      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
216      CmiAssert(pNo != CkMyPe());
217   }
218   inline int getSize() { 
219      CmiAssert(ckBuffer);
220      return ckBuffer->len; 
221   }
222 };
223
224 // checkpoint holder class - for in-disk checkpointing
225 class CkDiskCheckPTInfo: public CkCheckPTInfo 
226 {
227   char *fname;
228   int bud1, bud2;
229   int len;                      // checkpoint size
230 public:
231   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
232   {
233 #if CMK_USE_MKSTEMP
234     fname = new char[64];
235     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
236     mkstemp(fname);
237 #else
238     fname=tmpnam(NULL);
239 #endif
240     bud1 = bud2 = -1;
241     len = 0;
242   }
243   ~CkDiskCheckPTInfo() 
244   {
245     remove(fname);
246   }
247   inline void updateBuffer(CkArrayCheckPTMessage *data) 
248   {
249     double t = CmiWallTimer();
250     // unpack it
251     envelope *env = UsrToEnv(data);
252     CkUnpackMessage(&env);
253     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
254     FILE *f = fopen(fname,"wb");
255     PUP::toDisk p(f);
256     CkPupMessage(p, (void **)&data);
257     // delay sync to the end because otherwise the messages are blocked
258 //    fsync(fileno(f));
259     fclose(f);
260     bud1 = data->bud1;
261     bud2 = data->bud2;
262     len = data->len;
263     delete data;
264     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
265   }
266   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
267   {
268     CkArrayCheckPTMessage *data;
269     FILE *f = fopen(fname,"rb");
270     PUP::fromDisk p(f);
271     CkPupMessage(p, (void **)&data);
272     fclose(f);
273     data->bud1 = bud1;                          // update the buddies
274     data->bud2 = bud2;
275     return data;
276   }
277   inline void updateBuddy(int b1, int b2) {
278      bud1 = b1; bud2 = b2;
279      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
280      CmiAssert(pNo != CkMyPe());
281   }
282   inline int getSize() { 
283      return len; 
284   }
285 };
286
287 CkMemCheckPT::CkMemCheckPT(int w)
288 {
289   int numnodes = 0;
290 #if NODE_CHECKPOINT
291   numnodes = CmiNumPhysicalNodes();
292 #else
293   numnodes = CkNumPes();
294 #endif
295 #if CK_NO_PROC_POOL
296   if (numnodes <= 2)
297 #else
298   if (numnodes  == 1)
299 #endif
300   {
301     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
302     _memChkptOn = 0;
303   }
304   inRestarting = 0;
305   recvCount = peCount = 0;
306   ackCount = 0;
307   expectCount = -1;
308   where = w;
309
310 #if CMK_CONVERSE_MPI
311   void pingBuddy();
312   void pingCheckHandler();
313   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
314   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
315 #endif
316 }
317
318 CkMemCheckPT::~CkMemCheckPT()
319 {
320   int len = ckTable.length();
321   for (int i=0; i<len; i++) {
322     delete ckTable[i];
323   }
324 }
325
326 void CkMemCheckPT::pup(PUP::er& p) 
327
328   CBase_CkMemCheckPT::pup(p); 
329   p|cpStarter;
330   p|thisFailedPe;
331   p|failedPes;
332   p|ckCheckPTGroupID;           // recover global variable
333   p|cpCallback;                 // store callback
334   p|where;                      // where to checkpoint
335   p|peCount;
336   if (p.isUnpacking()) {
337     recvCount = 0;
338 #if CMK_CONVERSE_MPI
339     void pingBuddy();
340     void pingCheckHandler();
341     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
342     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
343 #endif
344   }
345 }
346
347 // called by checkpoint mgr to restore an array element
348 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
349 {
350 #if CMK_MEM_CHECKPOINT
351   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
352   // m->index.print();
353   PUP::fromMem p(m->packData);
354   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
355   CmiAssert(mgr);
356 #if STREAMING_INFORMHOME
357   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
358 #else
359   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
360 #endif
361
362   // find a list of array elements bound together
363   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
364   CmiAssert(elt);
365   CkLocRec_local *rec = elt->myRec;
366   CkVec<CkMigratable *> list;
367   mgr->migratableList(rec, list);
368   CmiAssert(list.length() > 0);
369   for (int l=0; l<list.length(); l++) {
370     elt = (ArrayElement *)list[l];
371     elt->budPEs[0] = m->bud1;
372     elt->budPEs[1] = m->bud2;
373     //    reset, may not needed now
374     // for now.
375     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
376       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
377       if (c) c->redNo = 0;
378     }
379   }
380 #endif
381 }
382
383 // return 1 if pe is a crashed processor
384 int CkMemCheckPT::isFailed(int pe)
385 {
386   for (int i=0; i<failedPes.length(); i++)
387     if (failedPes[i] == pe) return 1;
388   return 0;
389 }
390
391 // add pe into history list of all failed processors
392 void CkMemCheckPT::failed(int pe)
393 {
394   if (isFailed(pe)) return;
395   failedPes.push_back(pe);
396 }
397
398 int CkMemCheckPT::totalFailed()
399 {
400   return failedPes.length();
401 }
402
403 // create an checkpoint entry for array element of aid with index.
404 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
405 {
406   // error check, no duplicate
407   int idx, len = ckTable.size();
408   for (idx=0; idx<len; idx++) {
409     CkCheckPTInfo *entry = ckTable[idx];
410     if (index == entry->index) {
411       if (loc == entry->locMgr) {
412           // bindTo array elements
413           return;
414       }
415         // for array inheritance, the following check may fail
416         // because ArrayElement constructor of all superclasses are called
417       if (aid == entry->aid) {
418         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
419         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
420       }
421     }
422   }
423   CkCheckPTInfo *newEntry;
424   if (where == CkCheckPoint_inMEM)
425     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
426   else
427     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
428   ckTable.push_back(newEntry);
429   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
430 }
431
432 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
433 {
434   int buddy = msg->bud1;
435   if (buddy == CkMyPe()) buddy = msg->bud2;
436   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
437   recvData(msg);
438     // ack
439   thisProxy[buddy].gotData();
440 }
441
442 // loop through my checkpoint table and ask checkpointed array elements
443 // to send me checkpoint data.
444 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
445 {
446   checkpointed = 1;
447   cpCallback = cb;
448   cpStarter = starter;
449   if (CkMyPe() == cpStarter) {
450     startTime = CmiWallTimer();
451     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
452   }
453
454   int len = ckTable.length();
455   for (int i=0; i<len; i++) {
456     CkCheckPTInfo *entry = ckTable[i];
457       // always let the bigger number processor send request
458     //if (CkMyPe() < entry->pNo) continue;
459       // always let the smaller number processor send request, may on same proc
460     if (!isMaster(entry->pNo)) continue;
461       // call inmem_checkpoint to the array element, ask it to send
462       // back checkpoint data via recvData().
463     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
464     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
465   }
466     // if my table is empty, then I am done
467   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
468
469   // pack and send proc level data
470   sendProcData();
471 }
472
473 // don't handle array elements
474 static inline void _handleProcData(PUP::er &p)
475 {
476     // save readonlys, and callback BTW
477     CkPupROData(p);
478
479     // save mainchares 
480     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
481         
482 #ifndef CMK_CHARE_USE_PTR
483     // save non-migratable chare
484     CkPupChareData(p);
485 #endif
486
487     // save groups into Groups.dat
488     CkPupGroupData(p);
489
490     // save nodegroups into NodeGroups.dat
491     if(CkMyRank()==0) CkPupNodeGroupData(p);
492 }
493
494 void CkMemCheckPT::sendProcData()
495 {
496   // find out size of buffer
497   int size;
498   {
499     PUP::sizer p;
500     _handleProcData(p);
501     size = p.size();
502   }
503   int packSize = size;
504   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
505   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
506   {
507     PUP::toMem p(msg->packData);
508     _handleProcData(p);
509   }
510   msg->pe = CkMyPe();
511   msg->len = size;
512   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
513   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
514 }
515
516 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
517 {
518   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
519   CpvAccess(procChkptBuf) = msg;
520   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
521   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
522 }
523
524 // ArrayElement call this function to give us the checkpointed data
525 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
526 {
527   int len = ckTable.length();
528   int idx;
529   for (idx=0; idx<len; idx++) {
530     CkCheckPTInfo *entry = ckTable[idx];
531     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
532   }
533   CkAssert(idx < len);
534   int isChkpting = msg->cp_flag;
535   ckTable[idx]->updateBuffer(msg);
536   if (isChkpting) {
537       // all my array elements have returned their inmem data
538       // inform starter processor that I am done.
539     recvCount ++;
540     if (recvCount == ckTable.length()) {
541       if (where == CkCheckPoint_inMEM) {
542         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
543       }
544       else if (where == CkCheckPoint_inDISK) {
545         // another barrier for finalize the writing using fsync
546         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
547         contribute(0,NULL,CkReduction::sum_int,localcb);
548       }
549       else
550         CmiAbort("Unknown checkpoint scheme");
551       recvCount = 0;
552     } 
553   }
554 }
555
556 // only used in disk checkpointing
557 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
558 {
559   delete m;
560 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
561   system("sync");
562 #endif
563   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
564 }
565
566 // only is called on cpStarter when checkpoint is done
567 void CkMemCheckPT::cpFinish()
568 {
569   CmiAssert(CkMyPe() == cpStarter);
570   peCount++;
571     // now that all processors have finished, activate callback
572   if (peCount == 2) {
573     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
574     cpCallback.send();
575     peCount = 0;
576     thisProxy.report();
577   }
578 }
579
580 // for debugging, report checkpoint info
581 void CkMemCheckPT::report()
582 {
583   int objsize = 0;
584   int len = ckTable.length();
585   for (int i=0; i<len; i++) {
586     CkCheckPTInfo *entry = ckTable[i];
587     CmiAssert(entry);
588     objsize += entry->getSize();
589   }
590   CmiAssert(CpvAccess(procChkptBuf));
591   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
592 }
593
594 /*****************************************************************************
595                         RESTART Procedure
596 *****************************************************************************/
597
598 // master processor of two buddies
599 inline int CkMemCheckPT::isMaster(int buddype)
600 {
601 #if 0
602   int mype = CkMyPe();
603 //CkPrintf("ismaster: %d %d\n", pe, mype);
604   if (CkNumPes() - totalFailed() == 2) {
605     return mype > buddype;
606   }
607   for (int i=1; i<CkNumPes(); i++) {
608     int me = (buddype+i)%CkNumPes();
609     if (isFailed(me)) continue;
610     if (me == mype) return 1;
611     else return 0;
612   }
613   return 0;
614 #else
615     // smaller one
616   int mype = CkMyPe();
617 //CkPrintf("ismaster: %d %d\n", pe, mype);
618   if (CkNumPes() - totalFailed() == 2) {
619     return mype < buddype;
620   }
621   for (int i=1; i<CkNumPes(); i++) {
622     int me = (mype+i)%CkNumPes();
623     if (isFailed(me)) continue;
624     if (me == buddype) return 1;
625     else return 0;
626   }
627   return 0;
628 #endif
629 }
630
631 #ifdef CKLOCMGR_LOOP
632 #undef CKLOCMGR_LOOP
633 #endif
634
635 // loop over all CkLocMgr and do "code"
636 #define  CKLOCMGR_LOOP(code)    {       \
637   int numGroups = CkpvAccess(_groupIDTable)->size();    \
638   for(int i=0;i<numGroups;i++) {        \
639     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
640     if(obj->isLocMgr())  {      \
641       CkLocMgr *mgr = (CkLocMgr*)obj;   \
642       code      \
643     }   \
644   }     \
645  }
646
647 #if 0
648 // helper class to pup all elements that belong to same ckLocMgr
649 class ElementDestoryer : public CkLocIterator {
650 private:
651         CkLocMgr *locMgr;
652 public:
653         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
654         void addLocation(CkLocation &loc) {
655                 CkArrayIndex idx=loc.getIndex();
656                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
657                 loc.destroy();
658         }
659 };
660 #endif
661
662 // restore the bitmap vector for LB
663 void CkMemCheckPT::resetLB(int diepe)
664 {
665 #if CMK_LBDB_ON
666   int i;
667   char *bitmap = new char[CkNumPes()];
668   // set processor available bitmap
669   get_avail_vector(bitmap);
670
671   for (i=0; i<failedPes.length(); i++)
672     bitmap[failedPes[i]] = 0; 
673   bitmap[diepe] = 0;
674
675 #if CK_NO_PROC_POOL
676   set_avail_vector(bitmap);
677 #endif
678
679   // if I am the crashed pe, rebuild my failedPEs array
680   if (CkMyPe() == diepe)
681     for (i=0; i<CkNumPes(); i++) 
682       if (bitmap[i]==0) failed(i);
683
684   delete [] bitmap;
685 #endif
686 }
687
688 // in case when failedPe dies, everybody go through its checkpoint table:
689 // destory all array elements
690 // recover lost buddies
691 // reconstruct all array elements from check point data
692 // called on all processors
693 void CkMemCheckPT::restart(int diePe)
694 {
695 #if CMK_MEM_CHECKPOINT
696   double curTime = CmiWallTimer();
697   if (CkMyPe() == diePe)
698     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
699   stage = (char*)"resetLB";
700   startTime = curTime;
701   if (CkMyPe() == diePe)
702     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
703
704 #if CK_NO_PROC_POOL
705   failed(diePe);        // add into the list of failed pes
706 #endif
707   thisFailedPe = diePe;
708
709   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
710
711   inRestarting = 1;
712                                                                                 
713   // disable load balancer's barrier
714   if (CkMyPe() != diePe) resetLB(diePe);
715
716   CKLOCMGR_LOOP(mgr->startInserting(););
717
718   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
719 /*
720   if (CkMyPe() == 0)
721     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
722 */
723 #endif
724 }
725
726 // loally remove all array elements
727 void CkMemCheckPT::removeArrayElements()
728 {
729 #if CMK_MEM_CHECKPOINT
730   int len = ckTable.length();
731   double curTime = CmiWallTimer();
732   if (CkMyPe() == thisFailedPe) 
733     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
734   stage = (char*)"removeArrayElements";
735   startTime = curTime;
736
737   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
738   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
739
740   // get rid of all buffering and remote recs
741   // including destorying all array elements
742   CKLOCMGR_LOOP(mgr->flushAllRecs(););
743
744 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
745
746   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
747 #endif
748 }
749
750 // flush state in reduction manager
751 void CkMemCheckPT::resetReductionMgr()
752 {
753   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
754   int numGroups = CkpvAccess(_groupIDTable)->size();
755   for(int i=0;i<numGroups;i++) {
756     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
757     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
758     obj->flushStates();
759     obj->ckJustMigrated();
760   }
761   // reset again
762   //CpvAccess(_qd)->flushStates();
763
764 #if 1
765   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
766 #else
767   if (CkMyPe() == 0)
768     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
769 #endif
770 }
771
772 // recover the lost buddies
773 void CkMemCheckPT::recoverBuddies()
774 {
775   int idx;
776   int len = ckTable.length();
777   // ready to flush reduction manager
778   // cannot be CkMemCheckPT::restart because destory will modify states
779   double curTime = CmiWallTimer();
780   if (CkMyPe() == thisFailedPe)
781   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
782   stage = (char *)"recoverBuddies";
783   if (CkMyPe() == thisFailedPe)
784   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
785   startTime = curTime;
786
787   // recover buddies
788   expectCount = 0;
789   for (idx=0; idx<len; idx++) {
790     CkCheckPTInfo *entry = ckTable[idx];
791     if (entry->pNo == thisFailedPe) {
792 #if CK_NO_PROC_POOL
793       // find a new buddy
794 /*
795       int budPe = CkMyPe();
796 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
797       while (budPe == CkMyPe() || isFailed(budPe)) 
798           budPe = (budPe+1)%CkNumPes();
799 */
800       int budPe = BuddyPE(CkMyPe());
801       //entry->pNo = budPe;
802 #else
803       int budPe = thisFailedPe;
804 #endif
805       entry->updateBuddy(CkMyPe(), budPe);
806 #if 0
807       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
808       CkArrayCheckPTMessage *msg = entry->getCopy();
809       msg->cp_flag = 0;            // not checkpointing
810       thisProxy[budPe].recvData(msg);
811 #else
812       CkArrayCheckPTMessage *msg = entry->getCopy();
813       msg->bud1 = budPe;
814       msg->bud2 = CkMyPe();
815       msg->cp_flag = 0;            // not checkpointing
816       thisProxy[budPe].recoverEntry(msg);
817 #endif
818       expectCount ++;
819     }
820   }
821
822 #if 1
823   if (expectCount == 0) {
824     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
825     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
826   }
827 #else
828   if (CkMyPe() == 0) {
829     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
830   }
831 #endif
832
833   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
834 }
835
836 void CkMemCheckPT::gotData()
837 {
838   ackCount ++;
839   if (ackCount == expectCount) {
840     ackCount = 0;
841     expectCount = -1;
842     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
843     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
844   }
845 }
846
847 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
848 {
849   for (int i=0; i<n; i++) {
850     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
851     mgr->updateLocation(idx[i], nowOnPe);
852   }
853 }
854
855 // restore array elements
856 void CkMemCheckPT::recoverArrayElements()
857 {
858   double curTime = CmiWallTimer();
859   int len = ckTable.length();
860   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
861   stage = (char *)"recoverArrayElements";
862   if (CkMyPe() == thisFailedPe)
863   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
864   startTime = curTime;
865
866   // recover all array elements
867   int count = 0;
868 #if STREAMING_INFORMHOME
869   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
870   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
871 #endif
872   for (int idx=0; idx<len; idx++)
873   {
874     CkCheckPTInfo *entry = ckTable[idx];
875 #if CK_NO_PROC_POOL
876     // the bigger one will do 
877 //    if (CkMyPe() < entry->pNo) continue;
878     if (!isMaster(entry->pNo)) continue;
879 #else
880     // smaller one do it, which has the original object
881     if (CkMyPe() == entry->pNo+1 || 
882         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
883 #endif
884 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
885
886     entry->updateBuddy(CkMyPe(), entry->pNo);
887     CkArrayCheckPTMessage *msg = entry->getCopy();
888     // gzheng
889     //thisProxy[CkMyPe()].inmem_restore(msg);
890     inmem_restore(msg);
891 #if STREAMING_INFORMHOME
892     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
893     int homePe = mgr->homePe(msg->index);
894     if (homePe != CkMyPe()) {
895       gmap[homePe].push_back(msg->locMgr);
896       imap[homePe].push_back(msg->index);
897     }
898 #endif
899         CkFreeMsg(msg);
900     count ++;
901   }
902 #if STREAMING_INFORMHOME
903   for (int i=0; i<CkNumPes(); i++) {
904     if (gmap[i].size() && i!=CkMyPe()) {
905       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
906     }
907   }
908   delete [] imap;
909   delete [] gmap;
910 #endif
911   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
912
913   CKLOCMGR_LOOP(mgr->doneInserting(););
914
915   inRestarting = 0;
916   // _crashedNode = -1;
917   CpvAccess(_crashedNode) = -1;
918
919   if (CkMyPe() == 0)
920     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
921 }
922
923 static double restartT;
924
925 // on every processor
926 // turn load balancer back on
927 void CkMemCheckPT::finishUp()
928 {
929   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
930   //CKLOCMGR_LOOP(mgr->doneInserting(););
931   
932   if (CkMyPe() == thisFailedPe)
933   {
934        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
935        //CkStartQD(cpCallback);
936        cpCallback.send();
937        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
938   }
939
940 #if CK_NO_PROC_POOL
941 #if NODE_CHECKPOINT
942   int numnodes = CmiNumPhysicalNodes();
943 #else
944   int numnodes = CkNumPes();
945 #endif
946   if (numnodes-totalFailed() <=2) {
947     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
948     _memChkptOn = 0;
949   }
950 #endif
951 }
952
953 // called only on 0
954 void CkMemCheckPT::quiescence(CkCallback &cb)
955 {
956   static int pe_count = 0;
957   pe_count ++;
958   CmiAssert(CkMyPe() == 0);
959   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
960   if (pe_count == CkNumPes()) {
961     pe_count = 0;
962     cb.send();
963   }
964 }
965
966 // User callable function - to start a checkpoint
967 // callback cb is used to pass control back
968 void CkStartMemCheckpoint(CkCallback &cb)
969 {
970 #if CMK_MEM_CHECKPOINT
971   if (_memChkptOn == 0) {
972     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
973     cb.send();
974     return;
975   }
976   if (CkInRestarting()) {
977       // trying to checkpointing during restart
978     cb.send();
979     return;
980   }
981     // store user callback and user data
982   CkMemCheckPT::cpCallback = cb;
983
984     // broadcast to start check pointing
985   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
986   checkptMgr.doItNow(CkMyPe(), cb);
987 #else
988   // when mem checkpoint is disabled, invike cb immediately
989   cb.send();
990 #endif
991 }
992
993 void CkRestartCheckPoint(int diePe)
994 {
995 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
996   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
997   // broadcast
998   checkptMgr.restart(diePe);
999 }
1000
1001 static int _diePE = -1;
1002
1003 // callback function used locally by ccs handler
1004 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1005 {
1006 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1007   CkRestartCheckPoint(_diePE);
1008 }
1009
1010 // Converse function handles
1011 static int askPhaseHandlerIdx;
1012 static int recvPhaseHandlerIdx;
1013 static int askProcDataHandlerIdx;
1014 static int restartBcastHandlerIdx;
1015 static int recoverProcDataHandlerIdx;
1016 static int restartBeginHandlerIdx;
1017 static int notifyHandlerIdx;
1018
1019 // called on crashed PE
1020 static void restartBeginHandler(char *msg)
1021 {
1022 #if CMK_MEM_CHECKPOINT
1023   static int count = 0;
1024   CmiFree(msg);
1025   CmiAssert(CkMyPe() == _diePE);
1026   count ++;
1027   if (count == CkNumPes()) {
1028     CkRestartCheckPointCallback(NULL, NULL);
1029     count = 0;
1030   }
1031 #endif
1032 }
1033
1034 extern void _discard_charm_message();
1035 extern void _resume_charm_message();
1036
1037 static void restartBcastHandler(char *msg)
1038 {
1039 #if CMK_MEM_CHECKPOINT
1040   // advance phase counter
1041   CkMemCheckPT::inRestarting = 1;
1042   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1043   // gzheng
1044   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1045
1046   if (CkMyPe()==_diePE)
1047     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1048
1049   // reset QD counters
1050 /*  gzheng
1051   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1052 */
1053
1054 /*  gzheng
1055   if (CkMyPe()==_diePE)
1056       CkRestartCheckPointCallback(NULL, NULL);
1057 */
1058   CmiFree(msg);
1059
1060   _resume_charm_message();
1061
1062     // reduction
1063   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1064   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1065   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1066
1067   checkpointed = 0;
1068 #endif
1069 }
1070
1071 extern void _initDone();
1072
1073 // called on crashed processor
1074 static void recoverProcDataHandler(char *msg)
1075 {
1076 #if CMK_MEM_CHECKPOINT
1077    int i;
1078    envelope *env = (envelope *)msg;
1079    CkUnpackMessage(&env);
1080    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1081    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1082    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1083    //cur_restart_phase ++;
1084      // gzheng ?
1085    //CpvAccess(_qd)->flushStates();
1086
1087    // restore readonly, mainchare, group, nodegroup
1088 //   int temp = cur_restart_phase;
1089 //   cur_restart_phase = -1;
1090    PUP::fromMem p(procMsg->packData);
1091    _handleProcData(p);
1092
1093    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1094    // gzheng
1095    CKLOCMGR_LOOP(mgr->startInserting(););
1096
1097    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1098    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1099    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1100    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1101
1102    _initDone();
1103 //   CpvAccess(_qd)->flushStates();
1104    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1105 #endif
1106 }
1107
1108 // called on its backup processor
1109 // get backup message buffer and sent to crashed processor
1110 static void askProcDataHandler(char *msg)
1111 {
1112 #if CMK_MEM_CHECKPOINT
1113     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1114     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1115     if (CpvAccess(procChkptBuf) == NULL) 
1116       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1117     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1118     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1119     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1120
1121     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1122
1123     CkPackMessage(&env);
1124     CmiSetHandler(env, recoverProcDataHandlerIdx);
1125     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1126     CpvAccess(procChkptBuf) = NULL;
1127 #endif
1128 }
1129
1130 // called on PE 0
1131 void qd_callback(void *m)
1132 {
1133    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1134    CkFreeMsg(m);
1135 #ifdef CMK_SMP
1136    for(int i=0;i<CmiMyNodeSize();i++){
1137    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1138    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1139         CmiSetHandler(msg, askProcDataHandlerIdx);
1140         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1141         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1142    }
1143    return;
1144 #endif
1145    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1146    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1147    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1148    CmiSetHandler(msg, askProcDataHandlerIdx);
1149    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1150    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1151
1152 }
1153
1154 // on crashed node
1155 void CkMemRestart(const char *dummy, CkArgMsg *args)
1156 {
1157 #if CMK_MEM_CHECKPOINT
1158    _diePE = CmiMyNode();
1159    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1160    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1161    CkMemCheckPT::inRestarting = 1;
1162
1163   CpvAccess( _crashedNode )= CmiMyNode();
1164         
1165   _discard_charm_message();
1166   
1167   if(CmiMyRank()==0){
1168     CkCallback cb(qd_callback);
1169     CkStartQD(cb);
1170     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1171   }
1172 #else
1173    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1174 #endif
1175 }
1176
1177 // can be called in other files
1178 // return true if it is in restarting
1179 extern "C"
1180 int CkInRestarting()
1181 {
1182 #if CMK_MEM_CHECKPOINT
1183   if (CpvAccess( _crashedNode)!=-1) return 1;
1184   // gzheng
1185   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1186   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1187   return CkMemCheckPT::inRestarting;
1188 #else
1189   return 0;
1190 #endif
1191 }
1192
1193 /*****************************************************************************
1194                 module initialization
1195 *****************************************************************************/
1196
1197 static int arg_where = CkCheckPoint_inMEM;
1198
1199 #if CMK_MEM_CHECKPOINT
1200 void init_memcheckpt(char **argv)
1201 {
1202     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1203       arg_where = CkCheckPoint_inDISK;
1204     }
1205
1206         // initiliazing _crashedNode variable
1207         CpvInitialize(int, _crashedNode);
1208         CpvAccess(_crashedNode) = -1;
1209
1210 }
1211 #endif
1212
1213 class CkMemCheckPTInit: public Chare {
1214 public:
1215   CkMemCheckPTInit(CkArgMsg *m) {
1216 #if CMK_MEM_CHECKPOINT
1217     if (arg_where == CkCheckPoint_inDISK) {
1218       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1219     }
1220     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1221     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1222 #endif
1223   }
1224 };
1225
1226 static void notifyHandler(char *msg)
1227 {
1228 #if CMK_MEM_CHECKPOINT
1229   CmiFree(msg);
1230       /* immediately increase restart phase to filter old messages */
1231   CpvAccess(_curRestartPhase) ++;
1232   CpvAccess(_qd)->flushStates();
1233   _discard_charm_message();
1234 #endif
1235 }
1236
1237 extern "C"
1238 void notify_crash(int node)
1239 {
1240 #ifdef CMK_MEM_CHECKPOINT
1241   CpvAccess( _crashedNode) = node;
1242 #ifdef CMK_SMP
1243   for(int i=0;i<CkMyNodeSize();i++){
1244         CpvAccessOther(_crashedNode,i)=node;
1245   }
1246 #endif
1247   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1248   CkMemCheckPT::inRestarting = 1;
1249
1250 /*
1251 #ifdef CMK_SMP
1252 */
1253   int pe = CmiNodeFirst(CkMyNode());
1254   for(int i=0;i<CkMyNodeSize();i++){
1255         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1256         CmiSetHandler(msg, notifyHandlerIdx);
1257         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1258   }
1259 /*
1260  return;
1261 #else 
1262     // this may be in interrupt handler, send a message to reset QD
1263   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1264   CmiSetHandler(msg, notifyHandlerIdx);
1265   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1266 #endif
1267 */
1268 #endif
1269 }
1270
1271 extern "C" void (*notify_crash_fn)(int node);
1272
1273 #if CMK_CONVERSE_MPI
1274 static int pingHandlerIdx;
1275 static int pingCheckHandlerIdx;
1276 static int buddyDieHandlerIdx;
1277 static double lastPingTime = -1;
1278
1279 extern "C" void mpi_restart_crashed(int pe, int rank);
1280 extern "C" int  find_spare_mpirank(int pe);
1281
1282 void pingBuddy();
1283 void pingCheckHandler();
1284
1285 void buddyDieHandler(char *msg)
1286 {
1287 #if CMK_MEM_CHECKPOINT
1288    // notify
1289    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1290    notify_crash(diepe);
1291    // send message to crash pe to let it restart
1292    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1293    int newrank = find_spare_mpirank(diepe);
1294    int buddy = obj->BuddyPE(CmiMyPe());
1295    if (buddy == diepe)  {
1296      mpi_restart_crashed(diepe, newrank);
1297      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1298    }
1299 #endif
1300 }
1301
1302 void pingHandler(void *msg)
1303 {
1304   lastPingTime = CmiWallTimer();
1305   CmiFree(msg);
1306 }
1307
1308 void pingCheckHandler()
1309 {
1310 #if CMK_MEM_CHECKPOINT
1311   double now = CmiWallTimer();
1312   if (lastPingTime > 0 && now - lastPingTime > 4) {
1313     int i, pe, buddy;
1314     // tell everyone the buddy dies
1315     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1316     for (i = 1; i < CmiNumPes(); i++) {
1317        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1318        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1319     }
1320     buddy = pe;
1321     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1322     for (int pe = 0; pe < CmiNumPes(); pe++) {
1323       if (obj->isFailed(pe) || pe == buddy) continue;
1324       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1325       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1326       CmiSetHandler(msg, buddyDieHandlerIdx);
1327       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1328     }
1329   }
1330   else 
1331     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1332 #endif
1333 }
1334
1335 void pingBuddy()
1336 {
1337 #if CMK_MEM_CHECKPOINT
1338   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1339   if (obj) {
1340     int buddy = obj->BuddyPE(CkMyPe());
1341 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1342     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1343     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1344     CmiSetHandler(msg, pingHandlerIdx);
1345     CmiGetRestartPhase(msg) = 9999;
1346     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1347   }
1348   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1349 #endif
1350 }
1351 #endif
1352
1353 // initproc
1354 void CkRegisterRestartHandler( )
1355 {
1356 #if CMK_MEM_CHECKPOINT
1357   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1358   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1359   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1360   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1361   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1362
1363 #if CMK_CONVERSE_MPI
1364   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1365   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1366   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1367 #endif
1368
1369   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1370   CpvAccess(procChkptBuf) = NULL;
1371
1372   notify_crash_fn = notify_crash;
1373
1374 #if ! CMK_CONVERSE_MPI
1375   // print pid to kill
1376   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1377 //  sleep(4);
1378 #endif
1379 #endif
1380 }
1381
1382
1383 extern "C"
1384 int CkHasCheckpoints()
1385 {
1386   return checkpointed;
1387 }
1388
1389 /// @todo: the following definitions should be moved to a separate file containing
1390 // structures and functions about fault tolerance strategies
1391
1392 /**
1393  *  * @brief: function for killing a process                                             
1394  *   */
1395 #ifdef CMK_MEM_CHECKPOINT
1396 #if CMK_HAS_GETPID
1397 void killLocal(void *_dummy,double curWallTime){
1398         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1399         if(CmiWallTimer()<killTime-1){
1400                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1401         }else{  
1402                 kill(getpid(),SIGKILL);                                               
1403         }              
1404
1405 #else
1406 void killLocal(void *_dummy,double curWallTime){
1407   CmiAbort("kill() not supported!");
1408 }
1409 #endif
1410 #endif
1411
1412 #ifdef CMK_MEM_CHECKPOINT
1413 /**
1414  * @brief: reads the file with the kill information
1415  */
1416 void readKillFile(){
1417         FILE *fp=fopen(killFile,"r");
1418         if(!fp){
1419                 return;
1420         }
1421         int proc;
1422         double sec;
1423         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1424                 if(proc == CkMyNode() && CkMyRank() == 0){
1425                         killTime = CmiWallTimer()+sec;
1426                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1427                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1428                 }
1429         }
1430         fclose(fp);
1431 }
1432
1433 #if ! CMK_CONVERSE_MPI
1434 void CkDieNow()
1435 {
1436          // ignored for non-mpi version
1437         CmiPrintf("[%d] die now.\n", CmiMyPe());
1438         killTime = CmiWallTimer()+0.001;
1439         CcdCallFnAfter(killLocal,NULL,1);
1440 }
1441 #endif
1442
1443 #endif
1444
1445 #include "CkMemCheckpoint.def.h"
1446
1447