Merge branch 'charm' of charmgit:charm into isomalloc
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF      // CkPrintf
55 #define DEBUGF noopck
56
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         1
67 #endif
68
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 double CkMemCheckPT::startTime;
75 char *CkMemCheckPT::stage;
76 CkCallback CkMemCheckPT::cpCallback;
77
78 int _memChkptOn = 1;                    // checkpoint is on or off
79
80 CkGroupID ckCheckPTGroupID;             // readonly
81
82 static int checkpointed = 0;
83
84 /// @todo the following declarations should be moved into a separate file for all 
85 // fault tolerant strategies
86
87 #ifdef CMK_MEM_CHECKPOINT
88 // name of the kill file that contains processes to be killed 
89 char *killFile;                                               
90 // flag for the kill file         
91 int killFlag=0;
92 // variable for storing the killing time
93 double killTime=0.0;
94 #endif
95
96 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
97 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
98
99 // compute the backup processor
100 // FIXME: avoid crashed processors
101 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
102
103 inline int CkMemCheckPT::BuddyPE(int pe)
104 {
105   int budpe;
106 #if NODE_CHECKPOINT
107     // buddy is the processor with same rank on the next physical node
108   int r1 = CmiPhysicalRank(pe);
109   int budnode = CmiPhysicalNodeID(pe);
110   do {
111     budnode = (budnode+1)%CmiNumPhysicalNodes();
112     int *pelist;
113     int num;
114     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
115     budpe = pelist[r1 % num];
116   } while (isFailed(budpe));
117   if (budpe == pe) {
118     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
119     CmiAbort("Failed to find a buddy processor");
120   }
121 #else
122   budpe = pe;
123   while (budpe == pe || isFailed(budpe)) 
124           budpe = (budpe+1)%CkNumPes();
125 #endif
126   return budpe;
127 }
128
129 // called in array element constructor
130 // choose and register with 2 buddies for checkpoiting 
131 #if CMK_MEM_CHECKPOINT
132 void ArrayElement::init_checkpt() {
133         if (_memChkptOn == 0) return;
134         if (CkInRestarting()) {
135           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
136         }
137         // only master init checkpoint
138         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
139
140         budPEs[0] = CkMyPe();
141         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
142         CmiAssert(budPEs[0] != budPEs[1]);
143         // inform checkPTMgr
144         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
145         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
146         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
147         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
148 }
149 #endif
150
151 // entry function invoked by checkpoint mgr asking for checkpoint data
152 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
153 #if CMK_MEM_CHECKPOINT
154   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
155 //char index[128];   thisIndexMax.sprint(index);
156 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
157   CkLocMgr *locMgr = thisArray->getLocMgr();
158   CmiAssert(myRec!=NULL);
159   int size;
160   {
161         PUP::sizer p;
162         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
163         size = p.size();
164   }
165   int packSize = size/sizeof(double) +1;
166   CkArrayCheckPTMessage *msg =
167                  new (packSize, 0) CkArrayCheckPTMessage;
168   msg->len = size;
169   msg->index =thisIndexMax;
170   msg->aid = thisArrayID;
171   msg->locMgr = locMgr->getGroupID();
172   msg->cp_flag = 1;
173   {
174         PUP::toMem p(msg->packData);
175         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
176   }
177
178   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
179   checkptMgr.recvData(msg, 2, budPEs);
180   delete m;
181 #endif
182 }
183
184 // checkpoint holder class - for memory checkpointing
185 class CkMemCheckPTInfo: public CkCheckPTInfo
186 {
187   CkArrayCheckPTMessage *ckBuffer;
188 public:
189   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
190                     CkCheckPTInfo(a, loc, idx, pno)
191   {
192     ckBuffer = NULL;
193   }
194   ~CkMemCheckPTInfo() 
195   {
196     if (ckBuffer) delete ckBuffer; 
197   }
198   inline void updateBuffer(CkArrayCheckPTMessage *data) 
199   {
200     CmiAssert(data!=NULL);
201     if (ckBuffer) delete ckBuffer;
202     ckBuffer = data;
203   }    
204   inline CkArrayCheckPTMessage * getCopy()
205   {
206     if (ckBuffer == NULL) {
207       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
208       CmiAbort("Abort!");
209     }
210     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
211   }     
212   inline void updateBuddy(int b1, int b2) {
213      CmiAssert(ckBuffer);
214      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
215      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
216      CmiAssert(pNo != CkMyPe());
217   }
218   inline int getSize() { 
219      CmiAssert(ckBuffer);
220      return ckBuffer->len; 
221   }
222 };
223
224 // checkpoint holder class - for in-disk checkpointing
225 class CkDiskCheckPTInfo: public CkCheckPTInfo 
226 {
227   char *fname;
228   int bud1, bud2;
229   int len;                      // checkpoint size
230 public:
231   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
232   {
233 #if CMK_USE_MKSTEMP
234     fname = new char[64];
235     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
236     mkstemp(fname);
237 #else
238     fname=tmpnam(NULL);
239 #endif
240     bud1 = bud2 = -1;
241     len = 0;
242   }
243   ~CkDiskCheckPTInfo() 
244   {
245     remove(fname);
246   }
247   inline void updateBuffer(CkArrayCheckPTMessage *data) 
248   {
249     double t = CmiWallTimer();
250     // unpack it
251     envelope *env = UsrToEnv(data);
252     CkUnpackMessage(&env);
253     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
254     FILE *f = fopen(fname,"wb");
255     PUP::toDisk p(f);
256     CkPupMessage(p, (void **)&data);
257     // delay sync to the end because otherwise the messages are blocked
258 //    fsync(fileno(f));
259     fclose(f);
260     bud1 = data->bud1;
261     bud2 = data->bud2;
262     len = data->len;
263     delete data;
264     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
265   }
266   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
267   {
268     CkArrayCheckPTMessage *data;
269     FILE *f = fopen(fname,"rb");
270     PUP::fromDisk p(f);
271     CkPupMessage(p, (void **)&data);
272     fclose(f);
273     data->bud1 = bud1;                          // update the buddies
274     data->bud2 = bud2;
275     return data;
276   }
277   inline void updateBuddy(int b1, int b2) {
278      bud1 = b1; bud2 = b2;
279      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
280      CmiAssert(pNo != CkMyPe());
281   }
282   inline int getSize() { 
283      return len; 
284   }
285 };
286
287 CkMemCheckPT::CkMemCheckPT(int w)
288 {
289   int numnodes = 0;
290 #if NODE_CHECKPOINT
291   numnodes = CmiNumPhysicalNodes();
292 #else
293   numnodes = CkNumPes();
294 #endif
295 #if CK_NO_PROC_POOL
296   if (numnodes <= 2)
297 #else
298   if (numnodes  == 1)
299 #endif
300   {
301     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
302     _memChkptOn = 0;
303   }
304   inRestarting = 0;
305   recvCount = peCount = 0;
306   ackCount = 0;
307   expectCount = -1;
308   where = w;
309
310 #if CMK_CONVERSE_MPI
311   void pingBuddy();
312   void pingCheckHandler();
313   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
314   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
315 #endif
316 }
317
318 CkMemCheckPT::~CkMemCheckPT()
319 {
320   int len = ckTable.length();
321   for (int i=0; i<len; i++) {
322     delete ckTable[i];
323   }
324 }
325
326 void CkMemCheckPT::pup(PUP::er& p) 
327
328   CBase_CkMemCheckPT::pup(p); 
329   p|cpStarter;
330   p|thisFailedPe;
331   p|failedPes;
332   p|ckCheckPTGroupID;           // recover global variable
333   p|cpCallback;                 // store callback
334   p|where;                      // where to checkpoint
335   p|peCount;
336   if (p.isUnpacking()) {
337     recvCount = 0;
338 #if CMK_CONVERSE_MPI
339     void pingBuddy();
340     void pingCheckHandler();
341     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
342     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
343 #endif
344   }
345 }
346
347 // called by checkpoint mgr to restore an array element
348 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
349 {
350 #if CMK_MEM_CHECKPOINT
351   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
352   // m->index.print();
353   PUP::fromMem p(m->packData);
354   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
355   CmiAssert(mgr);
356 #if STREAMING_INFORMHOME
357   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
358 #else
359   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
360 #endif
361
362   // find a list of array elements bound together
363   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
364   CmiAssert(elt);
365   CkLocRec_local *rec = elt->myRec;
366   CkVec<CkMigratable *> list;
367   mgr->migratableList(rec, list);
368   CmiAssert(list.length() > 0);
369   for (int l=0; l<list.length(); l++) {
370     elt = (ArrayElement *)list[l];
371     elt->budPEs[0] = m->bud1;
372     elt->budPEs[1] = m->bud2;
373     //    reset, may not needed now
374     // for now.
375     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
376       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
377       if (c) c->redNo = 0;
378     }
379   }
380 #endif
381 }
382
383 // return 1 if pe is a crashed processor
384 int CkMemCheckPT::isFailed(int pe)
385 {
386   for (int i=0; i<failedPes.length(); i++)
387     if (failedPes[i] == pe) return 1;
388   return 0;
389 }
390
391 // add pe into history list of all failed processors
392 void CkMemCheckPT::failed(int pe)
393 {
394   if (isFailed(pe)) return;
395   failedPes.push_back(pe);
396 }
397
398 int CkMemCheckPT::totalFailed()
399 {
400   return failedPes.length();
401 }
402
403 // create an checkpoint entry for array element of aid with index.
404 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
405 {
406   // error check, no duplicate
407   int idx, len = ckTable.size();
408   for (idx=0; idx<len; idx++) {
409     CkCheckPTInfo *entry = ckTable[idx];
410     if (index == entry->index) {
411       if (loc == entry->locMgr) {
412           // bindTo array elements
413           return;
414       }
415         // for array inheritance, the following check may fail
416         // because ArrayElement constructor of all superclasses are called
417       if (aid == entry->aid) {
418         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
419         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
420       }
421     }
422   }
423   CkCheckPTInfo *newEntry;
424   if (where == CkCheckPoint_inMEM)
425     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
426   else
427     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
428   ckTable.push_back(newEntry);
429   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
430 }
431
432 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
433 {
434   int buddy = msg->bud1;
435   if (buddy == CkMyPe()) buddy = msg->bud2;
436   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
437   recvData(msg);
438     // ack
439   thisProxy[buddy].gotData();
440 }
441
442 // loop through my checkpoint table and ask checkpointed array elements
443 // to send me checkpoint data.
444 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
445 {
446   checkpointed = 1;
447   cpCallback = cb;
448   cpStarter = starter;
449   if (CkMyPe() == cpStarter) {
450     startTime = CmiWallTimer();
451     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
452   }
453
454   int len = ckTable.length();
455   for (int i=0; i<len; i++) {
456     CkCheckPTInfo *entry = ckTable[i];
457       // always let the bigger number processor send request
458     //if (CkMyPe() < entry->pNo) continue;
459       // always let the smaller number processor send request, may on same proc
460     if (!isMaster(entry->pNo)) continue;
461       // call inmem_checkpoint to the array element, ask it to send
462       // back checkpoint data via recvData().
463     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
464     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
465   }
466     // if my table is empty, then I am done
467   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
468
469   // pack and send proc level data
470   sendProcData();
471 }
472
473 // don't handle array elements
474 static inline void _handleProcData(PUP::er &p)
475 {
476     // save readonlys, and callback BTW
477     CkPupROData(p);
478
479     // save mainchares 
480     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
481         
482 #ifndef CMK_CHARE_USE_PTR
483     // save non-migratable chare
484     CkPupChareData(p);
485 #endif
486
487     // save groups into Groups.dat
488     CkPupGroupData(p);
489
490     // save nodegroups into NodeGroups.dat
491     if(CkMyRank()==0) CkPupNodeGroupData(p);
492 }
493
494 void CkMemCheckPT::sendProcData()
495 {
496   // find out size of buffer
497   int size;
498   {
499     PUP::sizer p;
500     _handleProcData(p);
501     size = p.size();
502   }
503   int packSize = size;
504   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
505   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
506   {
507     PUP::toMem p(msg->packData);
508     _handleProcData(p);
509   }
510   msg->pe = CkMyPe();
511   msg->len = size;
512   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
513   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
514 }
515
516 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
517 {
518   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
519   CpvAccess(procChkptBuf) = msg;
520   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
521   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
522 }
523
524 // ArrayElement call this function to give us the checkpointed data
525 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
526 {
527   int len = ckTable.length();
528   int idx;
529   for (idx=0; idx<len; idx++) {
530     CkCheckPTInfo *entry = ckTable[idx];
531     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
532   }
533   CkAssert(idx < len);
534   int isChkpting = msg->cp_flag;
535   ckTable[idx]->updateBuffer(msg);
536   if (isChkpting) {
537       // all my array elements have returned their inmem data
538       // inform starter processor that I am done.
539     recvCount ++;
540     if (recvCount == ckTable.length()) {
541       if (where == CkCheckPoint_inMEM) {
542         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
543       }
544       else if (where == CkCheckPoint_inDISK) {
545         // another barrier for finalize the writing using fsync
546         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
547         contribute(0,NULL,CkReduction::sum_int,localcb);
548       }
549       else
550         CmiAbort("Unknown checkpoint scheme");
551       recvCount = 0;
552     } 
553   }
554 }
555
556 // only used in disk checkpointing
557 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
558 {
559   delete m;
560 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
561   system("sync");
562 #endif
563   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
564 }
565
566 // only is called on cpStarter when checkpoint is done
567 void CkMemCheckPT::cpFinish()
568 {
569   CmiAssert(CkMyPe() == cpStarter);
570   peCount++;
571     // now that all processors have finished, activate callback
572   if (peCount == 2) {
573     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
574     cpCallback.send();
575     peCount = 0;
576     thisProxy.report();
577   }
578 }
579
580 // for debugging, report checkpoint info
581 void CkMemCheckPT::report()
582 {
583   int objsize = 0;
584   int len = ckTable.length();
585   for (int i=0; i<len; i++) {
586     CkCheckPTInfo *entry = ckTable[i];
587     CmiAssert(entry);
588     objsize += entry->getSize();
589   }
590   CmiAssert(CpvAccess(procChkptBuf));
591   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
592 }
593
594 /*****************************************************************************
595                         RESTART Procedure
596 *****************************************************************************/
597
598 // master processor of two buddies
599 inline int CkMemCheckPT::isMaster(int buddype)
600 {
601 #if 0
602   int mype = CkMyPe();
603 //CkPrintf("ismaster: %d %d\n", pe, mype);
604   if (CkNumPes() - totalFailed() == 2) {
605     return mype > buddype;
606   }
607   for (int i=1; i<CkNumPes(); i++) {
608     int me = (buddype+i)%CkNumPes();
609     if (isFailed(me)) continue;
610     if (me == mype) return 1;
611     else return 0;
612   }
613   return 0;
614 #else
615     // smaller one
616   int mype = CkMyPe();
617 //CkPrintf("ismaster: %d %d\n", pe, mype);
618   if (CkNumPes() - totalFailed() == 2) {
619     return mype < buddype;
620   }
621   for (int i=1; i<CkNumPes(); i++) {
622     int me = (mype+i)%CkNumPes();
623     if (isFailed(me)) continue;
624     if (me == buddype) return 1;
625     else return 0;
626   }
627   return 0;
628 #endif
629 }
630
631 #ifdef CKLOCMGR_LOOP
632 #undef CKLOCMGR_LOOP
633 #endif
634
635 // loop over all CkLocMgr and do "code"
636 #define  CKLOCMGR_LOOP(code)    {       \
637   int numGroups = CkpvAccess(_groupIDTable)->size();    \
638   for(int i=0;i<numGroups;i++) {        \
639     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
640     if(obj->isLocMgr())  {      \
641       CkLocMgr *mgr = (CkLocMgr*)obj;   \
642       code      \
643     }   \
644   }     \
645  }
646
647 #if 0
648 // helper class to pup all elements that belong to same ckLocMgr
649 class ElementDestoryer : public CkLocIterator {
650 private:
651         CkLocMgr *locMgr;
652 public:
653         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
654         void addLocation(CkLocation &loc) {
655                 CkArrayIndex idx=loc.getIndex();
656                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
657                 loc.destroy();
658         }
659 };
660 #endif
661
662 // restore the bitmap vector for LB
663 void CkMemCheckPT::resetLB(int diepe)
664 {
665 #if CMK_LBDB_ON
666   int i;
667   char *bitmap = new char[CkNumPes()];
668   // set processor available bitmap
669   get_avail_vector(bitmap);
670
671   for (i=0; i<failedPes.length(); i++)
672     bitmap[failedPes[i]] = 0; 
673   bitmap[diepe] = 0;
674
675 #if CK_NO_PROC_POOL
676   set_avail_vector(bitmap);
677 #endif
678
679   // if I am the crashed pe, rebuild my failedPEs array
680   if (CkMyPe() == diepe)
681     for (i=0; i<CkNumPes(); i++) 
682       if (bitmap[i]==0) failed(i);
683
684   delete [] bitmap;
685 #endif
686 }
687
688 // in case when failedPe dies, everybody go through its checkpoint table:
689 // destory all array elements
690 // recover lost buddies
691 // reconstruct all array elements from check point data
692 // called on all processors
693 void CkMemCheckPT::restart(int diePe)
694 {
695 #if CMK_MEM_CHECKPOINT
696   double curTime = CmiWallTimer();
697   if (CkMyPe() == diePe)
698     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
699   stage = (char*)"resetLB";
700   startTime = curTime;
701   if (CkMyPe() == diePe)
702     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
703
704 #if CK_NO_PROC_POOL
705   failed(diePe);        // add into the list of failed pes
706 #endif
707   thisFailedPe = diePe;
708
709   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
710
711   inRestarting = 1;
712                                                                                 
713   // disable load balancer's barrier
714   if (CkMyPe() != diePe) resetLB(diePe);
715
716   CKLOCMGR_LOOP(mgr->startInserting(););
717
718   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
719 /*
720   if (CkMyPe() == 0)
721     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
722 */
723 #endif
724 }
725
726 // loally remove all array elements
727 void CkMemCheckPT::removeArrayElements()
728 {
729 #if CMK_MEM_CHECKPOINT
730   int len = ckTable.length();
731   double curTime = CmiWallTimer();
732   if (CkMyPe() == thisFailedPe) 
733     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
734   stage = (char*)"removeArrayElements";
735   startTime = curTime;
736
737   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
738   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
739
740   // get rid of all buffering and remote recs
741   // including destorying all array elements
742   CKLOCMGR_LOOP(mgr->flushAllRecs(););
743
744 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
745
746   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
747 #endif
748 }
749
750 // flush state in reduction manager
751 void CkMemCheckPT::resetReductionMgr()
752 {
753   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
754   int numGroups = CkpvAccess(_groupIDTable)->size();
755   for(int i=0;i<numGroups;i++) {
756     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
757     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
758     obj->flushStates();
759     obj->ckJustMigrated();
760   }
761   // reset again
762   //CpvAccess(_qd)->flushStates();
763
764 #if 1
765   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
766 #else
767   if (CkMyPe() == 0)
768     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
769 #endif
770 }
771
772 // recover the lost buddies
773 void CkMemCheckPT::recoverBuddies()
774 {
775   int idx;
776   int len = ckTable.length();
777   // ready to flush reduction manager
778   // cannot be CkMemCheckPT::restart because destory will modify states
779   double curTime = CmiWallTimer();
780   if (CkMyPe() == thisFailedPe)
781   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
782   stage = (char *)"recoverBuddies";
783   if (CkMyPe() == thisFailedPe)
784   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
785   startTime = curTime;
786
787   // recover buddies
788   expectCount = 0;
789   for (idx=0; idx<len; idx++) {
790     CkCheckPTInfo *entry = ckTable[idx];
791     if (entry->pNo == thisFailedPe) {
792 #if CK_NO_PROC_POOL
793       // find a new buddy
794 /*
795       int budPe = CkMyPe();
796 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
797       while (budPe == CkMyPe() || isFailed(budPe)) 
798           budPe = (budPe+1)%CkNumPes();
799 */
800       int budPe = BuddyPE(CkMyPe());
801       //entry->pNo = budPe;
802 #else
803       int budPe = thisFailedPe;
804 #endif
805       entry->updateBuddy(CkMyPe(), budPe);
806 #if 0
807       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
808       CkArrayCheckPTMessage *msg = entry->getCopy();
809       msg->cp_flag = 0;            // not checkpointing
810       thisProxy[budPe].recvData(msg);
811 #else
812       CkArrayCheckPTMessage *msg = entry->getCopy();
813       msg->bud1 = budPe;
814       msg->bud2 = CkMyPe();
815       msg->cp_flag = 0;            // not checkpointing
816       thisProxy[budPe].recoverEntry(msg);
817 #endif
818       expectCount ++;
819     }
820   }
821
822 #if 1
823   if (expectCount == 0) {
824     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
825     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
826   }
827 #else
828   if (CkMyPe() == 0) {
829     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
830   }
831 #endif
832
833   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
834 }
835
836 void CkMemCheckPT::gotData()
837 {
838   ackCount ++;
839   if (ackCount == expectCount) {
840     ackCount = 0;
841     expectCount = -1;
842     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
843     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
844   }
845 }
846
847 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
848 {
849   for (int i=0; i<n; i++) {
850     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
851     mgr->updateLocation(idx[i], nowOnPe);
852   }
853 }
854
855 // restore array elements
856 void CkMemCheckPT::recoverArrayElements()
857 {
858   double curTime = CmiWallTimer();
859   int len = ckTable.length();
860   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
861   stage = (char *)"recoverArrayElements";
862   if (CkMyPe() == thisFailedPe)
863   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
864   startTime = curTime;
865
866   // recover all array elements
867   int count = 0;
868 #if STREAMING_INFORMHOME
869   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
870   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
871 #endif
872   for (int idx=0; idx<len; idx++)
873   {
874     CkCheckPTInfo *entry = ckTable[idx];
875 #if CK_NO_PROC_POOL
876     // the bigger one will do 
877 //    if (CkMyPe() < entry->pNo) continue;
878     if (!isMaster(entry->pNo)) continue;
879 #else
880     // smaller one do it, which has the original object
881     if (CkMyPe() == entry->pNo+1 || 
882         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
883 #endif
884 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
885
886     entry->updateBuddy(CkMyPe(), entry->pNo);
887     CkArrayCheckPTMessage *msg = entry->getCopy();
888     // gzheng
889     //thisProxy[CkMyPe()].inmem_restore(msg);
890     inmem_restore(msg);
891 #if STREAMING_INFORMHOME
892     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
893     int homePe = mgr->homePe(msg->index);
894     if (homePe != CkMyPe()) {
895       gmap[homePe].push_back(msg->locMgr);
896       imap[homePe].push_back(msg->index);
897     }
898 #endif
899     count ++;
900   }
901 #if STREAMING_INFORMHOME
902   for (int i=0; i<CkNumPes(); i++) {
903     if (gmap[i].size() && i!=CkMyPe()) {
904       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
905     }
906   }
907   delete [] imap;
908   delete [] gmap;
909 #endif
910   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
911
912   CKLOCMGR_LOOP(mgr->doneInserting(););
913
914   inRestarting = 0;
915   // _crashedNode = -1;
916   CpvAccess(_crashedNode) = -1;
917
918   if (CkMyPe() == 0)
919     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
920 }
921
922 static double restartT;
923
924 // on every processor
925 // turn load balancer back on
926 void CkMemCheckPT::finishUp()
927 {
928   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
929   //CKLOCMGR_LOOP(mgr->doneInserting(););
930   
931   if (CkMyPe() == thisFailedPe)
932   {
933        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
934        //CkStartQD(cpCallback);
935        cpCallback.send();
936        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
937   }
938
939 #if CK_NO_PROC_POOL
940 #if NODE_CHECKPOINT
941   int numnodes = CmiNumPhysicalNodes();
942 #else
943   int numnodes = CkNumPes();
944 #endif
945   if (numnodes-totalFailed() <=2) {
946     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
947     _memChkptOn = 0;
948   }
949 #endif
950 }
951
952 // called only on 0
953 void CkMemCheckPT::quiescence(CkCallback &cb)
954 {
955   static int pe_count = 0;
956   pe_count ++;
957   CmiAssert(CkMyPe() == 0);
958   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
959   if (pe_count == CkNumPes()) {
960     pe_count = 0;
961     cb.send();
962   }
963 }
964
965 // User callable function - to start a checkpoint
966 // callback cb is used to pass control back
967 void CkStartMemCheckpoint(CkCallback &cb)
968 {
969 #if CMK_MEM_CHECKPOINT
970   if (_memChkptOn == 0) {
971     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
972     cb.send();
973     return;
974   }
975   if (CkInRestarting()) {
976       // trying to checkpointing during restart
977     cb.send();
978     return;
979   }
980     // store user callback and user data
981   CkMemCheckPT::cpCallback = cb;
982
983     // broadcast to start check pointing
984   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
985   checkptMgr.doItNow(CkMyPe(), cb);
986 #else
987   // when mem checkpoint is disabled, invike cb immediately
988   cb.send();
989 #endif
990 }
991
992 void CkRestartCheckPoint(int diePe)
993 {
994 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
995   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
996   // broadcast
997   checkptMgr.restart(diePe);
998 }
999
1000 static int _diePE = -1;
1001
1002 // callback function used locally by ccs handler
1003 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1004 {
1005 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1006   CkRestartCheckPoint(_diePE);
1007 }
1008
1009 // Converse function handles
1010 static int askPhaseHandlerIdx;
1011 static int recvPhaseHandlerIdx;
1012 static int askProcDataHandlerIdx;
1013 static int restartBcastHandlerIdx;
1014 static int recoverProcDataHandlerIdx;
1015 static int restartBeginHandlerIdx;
1016 static int notifyHandlerIdx;
1017
1018 // called on crashed PE
1019 static void restartBeginHandler(char *msg)
1020 {
1021 #if CMK_MEM_CHECKPOINT
1022   static int count = 0;
1023   CmiFree(msg);
1024   CmiAssert(CkMyPe() == _diePE);
1025   count ++;
1026   if (count == CkNumPes()) {
1027     CkRestartCheckPointCallback(NULL, NULL);
1028     count = 0;
1029   }
1030 #endif
1031 }
1032
1033 extern void _discard_charm_message();
1034 extern void _resume_charm_message();
1035
1036 static void restartBcastHandler(char *msg)
1037 {
1038 #if CMK_MEM_CHECKPOINT
1039   // advance phase counter
1040   CkMemCheckPT::inRestarting = 1;
1041   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1042   // gzheng
1043   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1044
1045   if (CkMyPe()==_diePE)
1046     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1047
1048   // reset QD counters
1049 /*  gzheng
1050   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1051 */
1052
1053 /*  gzheng
1054   if (CkMyPe()==_diePE)
1055       CkRestartCheckPointCallback(NULL, NULL);
1056 */
1057   CmiFree(msg);
1058
1059   _resume_charm_message();
1060
1061     // reduction
1062   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1063   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1064   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1065
1066   checkpointed = 0;
1067 #endif
1068 }
1069
1070 extern void _initDone();
1071
1072 // called on crashed processor
1073 static void recoverProcDataHandler(char *msg)
1074 {
1075 #if CMK_MEM_CHECKPOINT
1076    int i;
1077    envelope *env = (envelope *)msg;
1078    CkUnpackMessage(&env);
1079    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1080    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1081    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1082    //cur_restart_phase ++;
1083      // gzheng ?
1084    //CpvAccess(_qd)->flushStates();
1085
1086    // restore readonly, mainchare, group, nodegroup
1087 //   int temp = cur_restart_phase;
1088 //   cur_restart_phase = -1;
1089    PUP::fromMem p(procMsg->packData);
1090    _handleProcData(p);
1091
1092    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1093    // gzheng
1094    CKLOCMGR_LOOP(mgr->startInserting(););
1095
1096    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1097    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1098    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1099    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1100
1101    _initDone();
1102 //   CpvAccess(_qd)->flushStates();
1103    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1104 #endif
1105 }
1106
1107 // called on its backup processor
1108 // get backup message buffer and sent to crashed processor
1109 static void askProcDataHandler(char *msg)
1110 {
1111 #if CMK_MEM_CHECKPOINT
1112     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1113     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1114     if (CpvAccess(procChkptBuf) == NULL) 
1115       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1116     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1117     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1118     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1119
1120     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1121
1122     CkPackMessage(&env);
1123     CmiSetHandler(env, recoverProcDataHandlerIdx);
1124     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1125     CpvAccess(procChkptBuf) = NULL;
1126 #endif
1127 }
1128
1129 // called on PE 0
1130 void qd_callback(void *m)
1131 {
1132    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1133    CkFreeMsg(m);
1134 #ifdef CMK_SMP
1135    for(int i=0;i<CmiMyNodeSize();i++){
1136    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1137    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1138         CmiSetHandler(msg, askProcDataHandlerIdx);
1139         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1140         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1141    }
1142    return;
1143 #endif
1144    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1145    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1146    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1147    CmiSetHandler(msg, askProcDataHandlerIdx);
1148    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1149    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1150
1151 }
1152
1153 // on crashed node
1154 void CkMemRestart(const char *dummy, CkArgMsg *args)
1155 {
1156 #if CMK_MEM_CHECKPOINT
1157    _diePE = CmiMyNode();
1158    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1159    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1160    CkMemCheckPT::inRestarting = 1;
1161
1162   CpvAccess( _crashedNode )= CmiMyNode();
1163         
1164   _discard_charm_message();
1165   
1166   if(CmiMyRank()==0){
1167     CkCallback cb(qd_callback);
1168     CkStartQD(cb);
1169     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1170   }
1171 #else
1172    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1173 #endif
1174 }
1175
1176 // can be called in other files
1177 // return true if it is in restarting
1178 extern "C"
1179 int CkInRestarting()
1180 {
1181 #if CMK_MEM_CHECKPOINT
1182   if (CpvAccess( _crashedNode)!=-1) return 1;
1183   // gzheng
1184   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1185   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1186   return CkMemCheckPT::inRestarting;
1187 #else
1188   return 0;
1189 #endif
1190 }
1191
1192 /*****************************************************************************
1193                 module initialization
1194 *****************************************************************************/
1195
1196 static int arg_where = CkCheckPoint_inMEM;
1197
1198 #if CMK_MEM_CHECKPOINT
1199 void init_memcheckpt(char **argv)
1200 {
1201     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1202       arg_where = CkCheckPoint_inDISK;
1203     }
1204
1205         // initiliazing _crashedNode variable
1206         CpvInitialize(int, _crashedNode);
1207         CpvAccess(_crashedNode) = -1;
1208
1209 }
1210 #endif
1211
1212 class CkMemCheckPTInit: public Chare {
1213 public:
1214   CkMemCheckPTInit(CkArgMsg *m) {
1215 #if CMK_MEM_CHECKPOINT
1216     if (arg_where == CkCheckPoint_inDISK) {
1217       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1218     }
1219     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1220     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1221 #endif
1222   }
1223 };
1224
1225 static void notifyHandler(char *msg)
1226 {
1227 #if CMK_MEM_CHECKPOINT
1228   CmiFree(msg);
1229       /* immediately increase restart phase to filter old messages */
1230   CpvAccess(_curRestartPhase) ++;
1231   CpvAccess(_qd)->flushStates();
1232   _discard_charm_message();
1233 #endif
1234 }
1235
1236 extern "C"
1237 void notify_crash(int node)
1238 {
1239 #ifdef CMK_MEM_CHECKPOINT
1240   CpvAccess( _crashedNode) = node;
1241 #ifdef CMK_SMP
1242   for(int i=0;i<CkMyNodeSize();i++){
1243         CpvAccessOther(_crashedNode,i)=node;
1244   }
1245 #endif
1246   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1247   CkMemCheckPT::inRestarting = 1;
1248
1249 /*
1250 #ifdef CMK_SMP
1251 */
1252   int pe = CmiNodeFirst(CkMyNode());
1253   for(int i=0;i<CkMyNodeSize();i++){
1254         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1255         CmiSetHandler(msg, notifyHandlerIdx);
1256         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1257   }
1258 /*
1259  return;
1260 #else 
1261     // this may be in interrupt handler, send a message to reset QD
1262   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1263   CmiSetHandler(msg, notifyHandlerIdx);
1264   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1265 #endif
1266 */
1267 #endif
1268 }
1269
1270 extern "C" void (*notify_crash_fn)(int node);
1271
1272 #if CMK_CONVERSE_MPI
1273 static int pingHandlerIdx;
1274 static int pingCheckHandlerIdx;
1275 static int buddyDieHandlerIdx;
1276 static double lastPingTime = -1;
1277
1278 extern "C" void mpi_restart_crashed(int pe, int rank);
1279 extern "C" int  find_spare_mpirank(int pe);
1280
1281 void pingBuddy();
1282 void pingCheckHandler();
1283
1284 void buddyDieHandler(char *msg)
1285 {
1286 #if CMK_MEM_CHECKPOINT
1287    // notify
1288    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1289    notify_crash(diepe);
1290    // send message to crash pe to let it restart
1291    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1292    int newrank = find_spare_mpirank(diepe);
1293    int buddy = obj->BuddyPE(CmiMyPe());
1294    if (buddy == diepe)  {
1295      mpi_restart_crashed(diepe, newrank);
1296      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1297    }
1298 #endif
1299 }
1300
1301 void pingHandler(void *msg)
1302 {
1303   lastPingTime = CmiWallTimer();
1304   CmiFree(msg);
1305 }
1306
1307 void pingCheckHandler()
1308 {
1309 #if CMK_MEM_CHECKPOINT
1310   double now = CmiWallTimer();
1311   if (lastPingTime > 0 && now - lastPingTime > 4) {
1312     int i, pe, buddy;
1313     // tell everyone the buddy dies
1314     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1315     for (i = 1; i < CmiNumPes(); i++) {
1316        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1317        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1318     }
1319     buddy = pe;
1320     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1321     for (int pe = 0; pe < CmiNumPes(); pe++) {
1322       if (obj->isFailed(pe) || pe == buddy) continue;
1323       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1324       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1325       CmiSetHandler(msg, buddyDieHandlerIdx);
1326       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1327     }
1328   }
1329   else 
1330     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1331 #endif
1332 }
1333
1334 void pingBuddy()
1335 {
1336 #if CMK_MEM_CHECKPOINT
1337   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1338   if (obj) {
1339     int buddy = obj->BuddyPE(CkMyPe());
1340 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1341     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1342     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1343     CmiSetHandler(msg, pingHandlerIdx);
1344     CmiGetRestartPhase(msg) = 9999;
1345     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1346   }
1347   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1348 #endif
1349 }
1350 #endif
1351
1352 // initproc
1353 void CkRegisterRestartHandler( )
1354 {
1355 #if CMK_MEM_CHECKPOINT
1356   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1357   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1358   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1359   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1360   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1361
1362 #if CMK_CONVERSE_MPI
1363   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1364   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1365   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1366 #endif
1367
1368   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1369   CpvAccess(procChkptBuf) = NULL;
1370
1371   notify_crash_fn = notify_crash;
1372
1373 #if ! CMK_CONVERSE_MPI
1374   // print pid to kill
1375   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1376 //  sleep(4);
1377 #endif
1378 #endif
1379 }
1380
1381
1382 extern "C"
1383 int CkHasCheckpoints()
1384 {
1385   return checkpointed;
1386 }
1387
1388 /// @todo: the following definitions should be moved to a separate file containing
1389 // structures and functions about fault tolerance strategies
1390
1391 /**
1392  *  * @brief: function for killing a process                                             
1393  *   */
1394 #ifdef CMK_MEM_CHECKPOINT
1395 #if CMK_HAS_GETPID
1396 void killLocal(void *_dummy,double curWallTime){
1397         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1398         if(CmiWallTimer()<killTime-1){
1399                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1400         }else{  
1401                 kill(getpid(),SIGKILL);                                               
1402         }              
1403
1404 #else
1405 void killLocal(void *_dummy,double curWallTime){
1406   CmiAbort("kill() not supported!");
1407 }
1408 #endif
1409 #endif
1410
1411 #ifdef CMK_MEM_CHECKPOINT
1412 /**
1413  * @brief: reads the file with the kill information
1414  */
1415 void readKillFile(){
1416         FILE *fp=fopen(killFile,"r");
1417         if(!fp){
1418                 return;
1419         }
1420         int proc;
1421         double sec;
1422         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1423                 if(proc == CkMyNode() && CkMyRank() == 0){
1424                         killTime = CmiWallTimer()+sec;
1425                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1426                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1427                 }
1428         }
1429         fclose(fp);
1430 }
1431
1432 #if ! CMK_CONVERSE_MPI
1433 void CkDieNow()
1434 {
1435          // ignored for non-mpi version
1436         CmiPrintf("[%d] die now.\n", CmiMyPe());
1437         killTime = CmiWallTimer()+0.001;
1438         CcdCallFnAfter(killLocal,NULL,1);
1439 }
1440 #endif
1441
1442 #endif
1443
1444 #include "CkMemCheckpoint.def.h"
1445
1446