Merge branch 'harshitha/adaptive_lb' of charmgit:charm into harshitha/adaptive_lb
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF      // CkPrintf
55 #define DEBUGF noopck
56
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         1
67 #endif
68
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 double CkMemCheckPT::startTime;
75 char *CkMemCheckPT::stage;
76 CkCallback CkMemCheckPT::cpCallback;
77
78 int _memChkptOn = 1;                    // checkpoint is on or off
79
80 CkGroupID ckCheckPTGroupID;             // readonly
81
82 static int checkpointed = 0;
83
84 /// @todo the following declarations should be moved into a separate file for all 
85 // fault tolerant strategies
86
87 #ifdef CMK_MEM_CHECKPOINT
88 // name of the kill file that contains processes to be killed 
89 char *killFile;                                               
90 // flag for the kill file         
91 int killFlag=0;
92 // variable for storing the killing time
93 double killTime=0.0;
94 #endif
95
96 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
97 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
98
99 // compute the backup processor
100 // FIXME: avoid crashed processors
101 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
102
103 inline int CkMemCheckPT::BuddyPE(int pe)
104 {
105   int budpe;
106 #if NODE_CHECKPOINT
107     // buddy is the processor with same rank on the next physical node
108   int r1 = CmiPhysicalRank(pe);
109   int budnode = CmiPhysicalNodeID(pe);
110   do {
111     budnode = (budnode+1)%CmiNumPhysicalNodes();
112     int *pelist;
113     int num;
114     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
115     budpe = pelist[r1 % num];
116   } while (isFailed(budpe));
117   if (budpe == pe) {
118     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
119     CmiAbort("Failed to find a buddy processor");
120   }
121 #else
122   budpe = pe;
123   while (budpe == pe || isFailed(budpe)) 
124           budpe = (budpe+1)%CkNumPes();
125 #endif
126   return budpe;
127 }
128
129 // called in array element constructor
130 // choose and register with 2 buddies for checkpoiting 
131 #if CMK_MEM_CHECKPOINT
132 void ArrayElement::init_checkpt() {
133         if (_memChkptOn == 0) return;
134         if (CkInRestarting()) {
135           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
136         }
137         // only master init checkpoint
138         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
139
140         budPEs[0] = CkMyPe();
141         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
142         CmiAssert(budPEs[0] != budPEs[1]);
143         // inform checkPTMgr
144         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
145         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
146         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
147         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
148 }
149 #endif
150
151 // entry function invoked by checkpoint mgr asking for checkpoint data
152 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
153 #if CMK_MEM_CHECKPOINT
154   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
155 //char index[128];   thisIndexMax.sprint(index);
156 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
157   CkLocMgr *locMgr = thisArray->getLocMgr();
158   CmiAssert(myRec!=NULL);
159   int size;
160   {
161         PUP::sizer p;
162         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
163         size = p.size();
164   }
165   int packSize = size/sizeof(double) +1;
166   CkArrayCheckPTMessage *msg =
167                  new (packSize, 0) CkArrayCheckPTMessage;
168   msg->len = size;
169   msg->index =thisIndexMax;
170   msg->aid = thisArrayID;
171   msg->locMgr = locMgr->getGroupID();
172   msg->cp_flag = 1;
173   {
174         PUP::toMem p(msg->packData);
175         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
176   }
177
178   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
179   checkptMgr.recvData(msg, 2, budPEs);
180   delete m;
181 #endif
182 }
183
184 // checkpoint holder class - for memory checkpointing
185 class CkMemCheckPTInfo: public CkCheckPTInfo
186 {
187   CkArrayCheckPTMessage *ckBuffer;
188 public:
189   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
190                     CkCheckPTInfo(a, loc, idx, pno)
191   {
192     ckBuffer = NULL;
193   }
194   ~CkMemCheckPTInfo() 
195   {
196     if (ckBuffer) delete ckBuffer; 
197   }
198   inline void updateBuffer(CkArrayCheckPTMessage *data) 
199   {
200     CmiAssert(data!=NULL);
201     if (ckBuffer) delete ckBuffer;
202     ckBuffer = data;
203   }    
204   inline CkArrayCheckPTMessage * getCopy()
205   {
206     if (ckBuffer == NULL) {
207       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
208       CmiAbort("Abort!");
209     }
210     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
211   }     
212   inline void updateBuddy(int b1, int b2) {
213      CmiAssert(ckBuffer);
214      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
215      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
216      CmiAssert(pNo != CkMyPe());
217   }
218   inline int getSize() { 
219      CmiAssert(ckBuffer);
220      return ckBuffer->len; 
221   }
222 };
223
224 // checkpoint holder class - for in-disk checkpointing
225 class CkDiskCheckPTInfo: public CkCheckPTInfo 
226 {
227   char *fname;
228   int bud1, bud2;
229   int len;                      // checkpoint size
230 public:
231   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
232   {
233 #if CMK_USE_MKSTEMP
234     fname = new char[64];
235     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
236     mkstemp(fname);
237 #else
238     fname=tmpnam(NULL);
239 #endif
240     bud1 = bud2 = -1;
241     len = 0;
242   }
243   ~CkDiskCheckPTInfo() 
244   {
245     remove(fname);
246   }
247   inline void updateBuffer(CkArrayCheckPTMessage *data) 
248   {
249     double t = CmiWallTimer();
250     // unpack it
251     envelope *env = UsrToEnv(data);
252     CkUnpackMessage(&env);
253     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
254     FILE *f = fopen(fname,"wb");
255     PUP::toDisk p(f);
256     CkPupMessage(p, (void **)&data);
257     // delay sync to the end because otherwise the messages are blocked
258 //    fsync(fileno(f));
259     fclose(f);
260     bud1 = data->bud1;
261     bud2 = data->bud2;
262     len = data->len;
263     delete data;
264     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
265   }
266   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
267   {
268     CkArrayCheckPTMessage *data;
269     FILE *f = fopen(fname,"rb");
270     PUP::fromDisk p(f);
271     CkPupMessage(p, (void **)&data);
272     fclose(f);
273     data->bud1 = bud1;                          // update the buddies
274     data->bud2 = bud2;
275     return data;
276   }
277   inline void updateBuddy(int b1, int b2) {
278      bud1 = b1; bud2 = b2;
279      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
280      CmiAssert(pNo != CkMyPe());
281   }
282   inline int getSize() { 
283      return len; 
284   }
285 };
286
287 CkMemCheckPT::CkMemCheckPT(int w)
288 {
289   int numnodes = 0;
290 #if NODE_CHECKPOINT
291   numnodes = CmiNumPhysicalNodes();
292 #else
293   numnodes = CkNumPes();
294 #endif
295 #if CK_NO_PROC_POOL
296   if (numnodes <= 2)
297 #else
298   if (numnodes  == 1)
299 #endif
300   {
301     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
302     _memChkptOn = 0;
303   }
304   inRestarting = 0;
305   recvCount = peCount = 0;
306   ackCount = 0;
307   expectCount = -1;
308   where = w;
309
310 #if CMK_CONVERSE_MPI
311   void pingBuddy();
312   void pingCheckHandler();
313   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
314   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
315 #endif
316 }
317
318 CkMemCheckPT::~CkMemCheckPT()
319 {
320   int len = ckTable.length();
321   for (int i=0; i<len; i++) {
322     delete ckTable[i];
323   }
324 }
325
326 void CkMemCheckPT::pup(PUP::er& p) 
327
328   CBase_CkMemCheckPT::pup(p); 
329   p|cpStarter;
330   p|thisFailedPe;
331   p|failedPes;
332   p|ckCheckPTGroupID;           // recover global variable
333   p|cpCallback;                 // store callback
334   p|where;                      // where to checkpoint
335   p|peCount;
336   if (p.isUnpacking()) {
337     recvCount = 0;
338 #if CMK_CONVERSE_MPI
339     void pingBuddy();
340     void pingCheckHandler();
341     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
342     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
343 #endif
344   }
345 }
346
347 // called by checkpoint mgr to restore an array element
348 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
349 {
350 #if CMK_MEM_CHECKPOINT
351   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
352   // m->index.print();
353   PUP::fromMem p(m->packData);
354   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
355   CmiAssert(mgr);
356 #if STREAMING_INFORMHOME
357   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
358 #else
359   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
360 #endif
361
362   // find a list of array elements bound together
363   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
364   CmiAssert(elt);
365   CkLocRec_local *rec = elt->myRec;
366   CkVec<CkMigratable *> list;
367   mgr->migratableList(rec, list);
368   CmiAssert(list.length() > 0);
369   for (int l=0; l<list.length(); l++) {
370     elt = (ArrayElement *)list[l];
371     elt->budPEs[0] = m->bud1;
372     elt->budPEs[1] = m->bud2;
373     //    reset, may not needed now
374     // for now.
375     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
376       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
377       if (c) c->redNo = 0;
378     }
379   }
380 #endif
381 }
382
383 // return 1 if pe is a crashed processor
384 int CkMemCheckPT::isFailed(int pe)
385 {
386   for (int i=0; i<failedPes.length(); i++)
387     if (failedPes[i] == pe) return 1;
388   return 0;
389 }
390
391 // add pe into history list of all failed processors
392 void CkMemCheckPT::failed(int pe)
393 {
394   if (isFailed(pe)) return;
395   failedPes.push_back(pe);
396 }
397
398 int CkMemCheckPT::totalFailed()
399 {
400   return failedPes.length();
401 }
402
403 // create an checkpoint entry for array element of aid with index.
404 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
405 {
406   // error check, no duplicate
407   int idx, len = ckTable.size();
408   for (idx=0; idx<len; idx++) {
409     CkCheckPTInfo *entry = ckTable[idx];
410     if (index == entry->index) {
411       if (loc == entry->locMgr) {
412           // bindTo array elements
413           return;
414       }
415         // for array inheritance, the following check may fail
416         // because ArrayElement constructor of all superclasses are called
417       if (aid == entry->aid) {
418         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
419         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
420       }
421     }
422   }
423   CkCheckPTInfo *newEntry;
424   if (where == CkCheckPoint_inMEM)
425     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
426   else
427     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
428   ckTable.push_back(newEntry);
429   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
430 }
431
432 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
433 {
434   int buddy = msg->bud1;
435   if (buddy == CkMyPe()) buddy = msg->bud2;
436   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
437   recvData(msg);
438     // ack
439   thisProxy[buddy].gotData();
440 }
441
442 // loop through my checkpoint table and ask checkpointed array elements
443 // to send me checkpoint data.
444 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
445 {
446   checkpointed = 1;
447   cpCallback = cb;
448   cpStarter = starter;
449   if (CkMyPe() == cpStarter) {
450     startTime = CmiWallTimer();
451     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
452   }
453
454   int len = ckTable.length();
455   for (int i=0; i<len; i++) {
456     CkCheckPTInfo *entry = ckTable[i];
457       // always let the bigger number processor send request
458     //if (CkMyPe() < entry->pNo) continue;
459       // always let the smaller number processor send request, may on same proc
460     if (!isMaster(entry->pNo)) continue;
461       // call inmem_checkpoint to the array element, ask it to send
462       // back checkpoint data via recvData().
463     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
464     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
465   }
466     // if my table is empty, then I am done
467   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
468
469   // pack and send proc level data
470   sendProcData();
471 }
472
473 // don't handle array elements
474 static inline void _handleProcData(PUP::er &p)
475 {
476     // save readonlys, and callback BTW
477     CkPupROData(p);
478
479     // save mainchares 
480     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
481         
482 #ifndef CMK_CHARE_USE_PTR
483     // save non-migratable chare
484     CkPupChareData(p);
485 #endif
486
487     // save groups into Groups.dat
488     CkPupGroupData(p);
489
490     // save nodegroups into NodeGroups.dat
491     if(CkMyRank()==0) CkPupNodeGroupData(p);
492 }
493
494 void CkMemCheckPT::sendProcData()
495 {
496   // find out size of buffer
497   int size;
498   {
499     PUP::sizer p;
500     _handleProcData(p);
501     size = p.size();
502   }
503   int packSize = size;
504   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
505   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
506   {
507     PUP::toMem p(msg->packData);
508     _handleProcData(p);
509   }
510   msg->pe = CkMyPe();
511   msg->len = size;
512   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
513   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
514 }
515
516 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
517 {
518   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
519   CpvAccess(procChkptBuf) = msg;
520   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
521   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
522 }
523
524 // ArrayElement call this function to give us the checkpointed data
525 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
526 {
527   int len = ckTable.length();
528   int idx;
529   for (idx=0; idx<len; idx++) {
530     CkCheckPTInfo *entry = ckTable[idx];
531     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
532   }
533   CkAssert(idx < len);
534   int isChkpting = msg->cp_flag;
535   ckTable[idx]->updateBuffer(msg);
536   if (isChkpting) {
537       // all my array elements have returned their inmem data
538       // inform starter processor that I am done.
539     recvCount ++;
540     if (recvCount == ckTable.length()) {
541       if (where == CkCheckPoint_inMEM) {
542         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
543       }
544       else if (where == CkCheckPoint_inDISK) {
545         // another barrier for finalize the writing using fsync
546         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
547         contribute(0,NULL,CkReduction::sum_int,localcb);
548       }
549       else
550         CmiAbort("Unknown checkpoint scheme");
551       recvCount = 0;
552     } 
553   }
554 }
555
556 // only used in disk checkpointing
557 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
558 {
559   delete m;
560 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
561   system("sync");
562 #endif
563   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
564 }
565
566 // only is called on cpStarter when checkpoint is done
567 void CkMemCheckPT::cpFinish()
568 {
569   CmiAssert(CkMyPe() == cpStarter);
570   peCount++;
571     // now that all processors have finished, activate callback
572   if (peCount == 2) {
573     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
574     cpCallback.send();
575     peCount = 0;
576     thisProxy.report();
577   }
578 }
579
580 // for debugging, report checkpoint info
581 void CkMemCheckPT::report()
582 {
583   int objsize = 0;
584   int len = ckTable.length();
585   for (int i=0; i<len; i++) {
586     CkCheckPTInfo *entry = ckTable[i];
587     CmiAssert(entry);
588     objsize += entry->getSize();
589   }
590   CmiAssert(CpvAccess(procChkptBuf));
591   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
592 }
593
594 /*****************************************************************************
595                         RESTART Procedure
596 *****************************************************************************/
597
598 // master processor of two buddies
599 inline int CkMemCheckPT::isMaster(int buddype)
600 {
601 #if 0
602   int mype = CkMyPe();
603 //CkPrintf("ismaster: %d %d\n", pe, mype);
604   if (CkNumPes() - totalFailed() == 2) {
605     return mype > buddype;
606   }
607   for (int i=1; i<CkNumPes(); i++) {
608     int me = (buddype+i)%CkNumPes();
609     if (isFailed(me)) continue;
610     if (me == mype) return 1;
611     else return 0;
612   }
613   return 0;
614 #else
615     // smaller one
616   int mype = CkMyPe();
617 //CkPrintf("ismaster: %d %d\n", pe, mype);
618   if (CkNumPes() - totalFailed() == 2) {
619     return mype < buddype;
620   }
621   for (int i=1; i<CkNumPes(); i++) {
622     int me = (mype+i)%CkNumPes();
623     if (isFailed(me)) continue;
624     if (me == buddype) return 1;
625     else return 0;
626   }
627   return 0;
628 #endif
629 }
630
631 #ifdef CKLOCMGR_LOOP
632 #undef CKLOCMGR_LOOP
633 #endif
634
635 // loop over all CkLocMgr and do "code"
636 #define  CKLOCMGR_LOOP(code)    {       \
637   int numGroups = CkpvAccess(_groupIDTable)->size();    \
638   for(int i=0;i<numGroups;i++) {        \
639     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
640     if(obj->isLocMgr())  {      \
641       CkLocMgr *mgr = (CkLocMgr*)obj;   \
642       code      \
643     }   \
644   }     \
645  }
646
647 #if 0
648 // helper class to pup all elements that belong to same ckLocMgr
649 class ElementDestoryer : public CkLocIterator {
650 private:
651         CkLocMgr *locMgr;
652 public:
653         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
654         void addLocation(CkLocation &loc) {
655                 CkArrayIndex idx=loc.getIndex();
656                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
657                 loc.destroy();
658         }
659 };
660 #endif
661
662 // restore the bitmap vector for LB
663 void CkMemCheckPT::resetLB(int diepe)
664 {
665 #if CMK_LBDB_ON
666   int i;
667   char *bitmap = new char[CkNumPes()];
668   // set processor available bitmap
669   get_avail_vector(bitmap);
670
671   for (i=0; i<failedPes.length(); i++)
672     bitmap[failedPes[i]] = 0; 
673   bitmap[diepe] = 0;
674
675 #if CK_NO_PROC_POOL
676   set_avail_vector(bitmap);
677 #endif
678
679   // if I am the crashed pe, rebuild my failedPEs array
680   if (CkMyPe() == diepe)
681     for (i=0; i<CkNumPes(); i++) 
682       if (bitmap[i]==0) failed(i);
683
684   delete [] bitmap;
685 #endif
686 }
687
688 // in case when failedPe dies, everybody go through its checkpoint table:
689 // destory all array elements
690 // recover lost buddies
691 // reconstruct all array elements from check point data
692 // called on all processors
693 void CkMemCheckPT::restart(int diePe)
694 {
695 #if CMK_MEM_CHECKPOINT
696   double curTime = CmiWallTimer();
697   if (CkMyPe() == diePe)
698     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
699   stage = (char*)"resetLB";
700   startTime = curTime;
701   if (CkMyPe() == diePe)
702     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
703
704 #if CK_NO_PROC_POOL
705   failed(diePe);        // add into the list of failed pes
706 #endif
707   thisFailedPe = diePe;
708
709   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
710
711   inRestarting = 1;
712                                                                                 
713   // disable load balancer's barrier
714   if (CkMyPe() != diePe) resetLB(diePe);
715
716   CKLOCMGR_LOOP(mgr->startInserting(););
717
718   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
719 /*
720   if (CkMyPe() == 0)
721     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
722 */
723 #endif
724 }
725
726 // loally remove all array elements
727 void CkMemCheckPT::removeArrayElements()
728 {
729 #if CMK_MEM_CHECKPOINT
730   int len = ckTable.length();
731   double curTime = CmiWallTimer();
732   if (CkMyPe() == thisFailedPe) 
733     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
734   stage = (char*)"removeArrayElements";
735   startTime = curTime;
736
737   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
738   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
739
740   // get rid of all buffering and remote recs
741   // including destorying all array elements
742   CKLOCMGR_LOOP(mgr->flushAllRecs(););
743
744 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
745
746   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
747 #endif
748 }
749
750 // flush state in reduction manager
751 void CkMemCheckPT::resetReductionMgr()
752 {
753   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
754   int numGroups = CkpvAccess(_groupIDTable)->size();
755   for(int i=0;i<numGroups;i++) {
756     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
757     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
758     obj->flushStates();
759     obj->ckJustMigrated();
760   }
761   // reset again
762   //CpvAccess(_qd)->flushStates();
763
764 #if 1
765   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
766 #else
767   if (CkMyPe() == 0)
768     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
769 #endif
770 }
771
772 // recover the lost buddies
773 void CkMemCheckPT::recoverBuddies()
774 {
775   int idx;
776   int len = ckTable.length();
777   // ready to flush reduction manager
778   // cannot be CkMemCheckPT::restart because destory will modify states
779   double curTime = CmiWallTimer();
780   if (CkMyPe() == thisFailedPe)
781   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
782   stage = (char *)"recoverBuddies";
783   if (CkMyPe() == thisFailedPe)
784   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
785   startTime = curTime;
786
787   // recover buddies
788   expectCount = 0;
789   for (idx=0; idx<len; idx++) {
790     CkCheckPTInfo *entry = ckTable[idx];
791     if (entry->pNo == thisFailedPe) {
792 #if CK_NO_PROC_POOL
793       // find a new buddy
794 /*
795       int budPe = CkMyPe();
796 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
797       while (budPe == CkMyPe() || isFailed(budPe)) 
798           budPe = (budPe+1)%CkNumPes();
799 */
800       int budPe = BuddyPE(CkMyPe());
801       //entry->pNo = budPe;
802 #else
803       int budPe = thisFailedPe;
804 #endif
805       entry->updateBuddy(CkMyPe(), budPe);
806 #if 0
807       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
808       CkArrayCheckPTMessage *msg = entry->getCopy();
809       msg->cp_flag = 0;            // not checkpointing
810       thisProxy[budPe].recvData(msg);
811 #else
812       CkArrayCheckPTMessage *msg = entry->getCopy();
813       msg->bud1 = budPe;
814       msg->bud2 = CkMyPe();
815       msg->cp_flag = 0;            // not checkpointing
816       thisProxy[budPe].recoverEntry(msg);
817 #endif
818       expectCount ++;
819     }
820   }
821
822 #if 1
823   if (expectCount == 0) {
824     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
825   }
826 #else
827   if (CkMyPe() == 0) {
828     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
829   }
830 #endif
831
832   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
833 }
834
835 void CkMemCheckPT::gotData()
836 {
837   ackCount ++;
838   if (ackCount == expectCount) {
839     ackCount = 0;
840     expectCount = -1;
841     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
842   }
843 }
844
845 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
846 {
847   for (int i=0; i<n; i++) {
848     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
849     mgr->updateLocation(idx[i], nowOnPe);
850   }
851 }
852
853 // restore array elements
854 void CkMemCheckPT::recoverArrayElements()
855 {
856   double curTime = CmiWallTimer();
857   int len = ckTable.length();
858   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
859   stage = (char *)"recoverArrayElements";
860   if (CkMyPe() == thisFailedPe)
861   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
862   startTime = curTime;
863
864   // recover all array elements
865   int count = 0;
866 #if STREAMING_INFORMHOME
867   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
868   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
869 #endif
870   for (int idx=0; idx<len; idx++)
871   {
872     CkCheckPTInfo *entry = ckTable[idx];
873 #if CK_NO_PROC_POOL
874     // the bigger one will do 
875 //    if (CkMyPe() < entry->pNo) continue;
876     if (!isMaster(entry->pNo)) continue;
877 #else
878     // smaller one do it, which has the original object
879     if (CkMyPe() == entry->pNo+1 || 
880         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
881 #endif
882 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
883
884     entry->updateBuddy(CkMyPe(), entry->pNo);
885     CkArrayCheckPTMessage *msg = entry->getCopy();
886     // gzheng
887     //thisProxy[CkMyPe()].inmem_restore(msg);
888     inmem_restore(msg);
889 #if STREAMING_INFORMHOME
890     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
891     int homePe = mgr->homePe(msg->index);
892     if (homePe != CkMyPe()) {
893       gmap[homePe].push_back(msg->locMgr);
894       imap[homePe].push_back(msg->index);
895     }
896 #endif
897     count ++;
898   }
899 #if STREAMING_INFORMHOME
900   for (int i=0; i<CkNumPes(); i++) {
901     if (gmap[i].size() && i!=CkMyPe()) {
902       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
903     }
904   }
905   delete [] imap;
906   delete [] gmap;
907 #endif
908   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
909
910   CKLOCMGR_LOOP(mgr->doneInserting(););
911
912   inRestarting = 0;
913   // _crashedNode = -1;
914   CpvAccess(_crashedNode) = -1;
915
916   if (CkMyPe() == 0)
917     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
918 }
919
920 static double restartT;
921
922 // on every processor
923 // turn load balancer back on
924 void CkMemCheckPT::finishUp()
925 {
926   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
927   //CKLOCMGR_LOOP(mgr->doneInserting(););
928   
929   if (CkMyPe() == thisFailedPe)
930   {
931        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
932        //CkStartQD(cpCallback);
933        cpCallback.send();
934        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
935   }
936
937 #if CK_NO_PROC_POOL
938 #if NODE_CHECKPOINT
939   int numnodes = CmiNumPhysicalNodes();
940 #else
941   int numnodes = CkNumPes();
942 #endif
943   if (numnodes-totalFailed() <=2) {
944     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
945     _memChkptOn = 0;
946   }
947 #endif
948 }
949
950 // called only on 0
951 void CkMemCheckPT::quiescence(CkCallback &cb)
952 {
953   static int pe_count = 0;
954   pe_count ++;
955   CmiAssert(CkMyPe() == 0);
956   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
957   if (pe_count == CkNumPes()) {
958     pe_count = 0;
959     cb.send();
960   }
961 }
962
963 // User callable function - to start a checkpoint
964 // callback cb is used to pass control back
965 void CkStartMemCheckpoint(CkCallback &cb)
966 {
967 #if CMK_MEM_CHECKPOINT
968   if (_memChkptOn == 0) {
969     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
970     cb.send();
971     return;
972   }
973   if (CkInRestarting()) {
974       // trying to checkpointing during restart
975     cb.send();
976     return;
977   }
978     // store user callback and user data
979   CkMemCheckPT::cpCallback = cb;
980
981     // broadcast to start check pointing
982   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
983   checkptMgr.doItNow(CkMyPe(), cb);
984 #else
985   // when mem checkpoint is disabled, invike cb immediately
986   cb.send();
987 #endif
988 }
989
990 void CkRestartCheckPoint(int diePe)
991 {
992 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
993   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
994   // broadcast
995   checkptMgr.restart(diePe);
996 }
997
998 static int _diePE = -1;
999
1000 // callback function used locally by ccs handler
1001 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1002 {
1003 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1004   CkRestartCheckPoint(_diePE);
1005 }
1006
1007 // Converse function handles
1008 static int askPhaseHandlerIdx;
1009 static int recvPhaseHandlerIdx;
1010 static int askProcDataHandlerIdx;
1011 static int restartBcastHandlerIdx;
1012 static int recoverProcDataHandlerIdx;
1013 static int restartBeginHandlerIdx;
1014 static int notifyHandlerIdx;
1015
1016 // called on crashed PE
1017 static void restartBeginHandler(char *msg)
1018 {
1019 #if CMK_MEM_CHECKPOINT
1020   static int count = 0;
1021   CmiFree(msg);
1022   CmiAssert(CkMyPe() == _diePE);
1023   count ++;
1024   if (count == CkNumPes()) {
1025     CkRestartCheckPointCallback(NULL, NULL);
1026     count = 0;
1027   }
1028 #endif
1029 }
1030
1031 extern void _discard_charm_message();
1032 extern void _resume_charm_message();
1033
1034 static void restartBcastHandler(char *msg)
1035 {
1036 #if CMK_MEM_CHECKPOINT
1037   // advance phase counter
1038   CkMemCheckPT::inRestarting = 1;
1039   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1040   // gzheng
1041   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1042
1043   if (CkMyPe()==_diePE)
1044     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1045
1046   // reset QD counters
1047 /*  gzheng
1048   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1049 */
1050
1051 /*  gzheng
1052   if (CkMyPe()==_diePE)
1053       CkRestartCheckPointCallback(NULL, NULL);
1054 */
1055   CmiFree(msg);
1056
1057   _resume_charm_message();
1058
1059     // reduction
1060   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1061   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1062   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1063
1064   checkpointed = 0;
1065 #endif
1066 }
1067
1068 extern void _initDone();
1069
1070 // called on crashed processor
1071 static void recoverProcDataHandler(char *msg)
1072 {
1073 #if CMK_MEM_CHECKPOINT
1074    int i;
1075    envelope *env = (envelope *)msg;
1076    CkUnpackMessage(&env);
1077    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1078    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1079    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1080    //cur_restart_phase ++;
1081      // gzheng ?
1082    //CpvAccess(_qd)->flushStates();
1083
1084    // restore readonly, mainchare, group, nodegroup
1085 //   int temp = cur_restart_phase;
1086 //   cur_restart_phase = -1;
1087    PUP::fromMem p(procMsg->packData);
1088    _handleProcData(p);
1089
1090    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1091    // gzheng
1092    CKLOCMGR_LOOP(mgr->startInserting(););
1093
1094    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1095    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1096    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1097    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1098
1099    _initDone();
1100 //   CpvAccess(_qd)->flushStates();
1101    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1102 #endif
1103 }
1104
1105 // called on its backup processor
1106 // get backup message buffer and sent to crashed processor
1107 static void askProcDataHandler(char *msg)
1108 {
1109 #if CMK_MEM_CHECKPOINT
1110     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1111     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1112     if (CpvAccess(procChkptBuf) == NULL) 
1113       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1114     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1115     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1116     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1117
1118     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1119
1120     CkPackMessage(&env);
1121     CmiSetHandler(env, recoverProcDataHandlerIdx);
1122     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1123     CpvAccess(procChkptBuf) = NULL;
1124 #endif
1125 }
1126
1127 // called on PE 0
1128 void qd_callback(void *m)
1129 {
1130    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1131    CkFreeMsg(m);
1132 #ifdef CMK_SMP
1133    for(int i=0;i<CmiMyNodeSize();i++){
1134    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1135    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1136         CmiSetHandler(msg, askProcDataHandlerIdx);
1137         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1138         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1139    }
1140    return;
1141 #endif
1142    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1143    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1144    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1145    CmiSetHandler(msg, askProcDataHandlerIdx);
1146    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1147    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1148
1149 }
1150
1151 // on crashed node
1152 void CkMemRestart(const char *dummy, CkArgMsg *args)
1153 {
1154 #if CMK_MEM_CHECKPOINT
1155    _diePE = CmiMyNode();
1156    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1157    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1158    CkMemCheckPT::inRestarting = 1;
1159
1160   CpvAccess( _crashedNode )= CmiMyNode();
1161         
1162   _discard_charm_message();
1163   
1164   if(CmiMyRank()==0){
1165     CkCallback cb(qd_callback);
1166     CkStartQD(cb);
1167     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1168   }
1169 #else
1170    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1171 #endif
1172 }
1173
1174 // can be called in other files
1175 // return true if it is in restarting
1176 extern "C"
1177 int CkInRestarting()
1178 {
1179 #if CMK_MEM_CHECKPOINT
1180   if (CpvAccess( _crashedNode)!=-1) return 1;
1181   // gzheng
1182   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1183   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1184   return CkMemCheckPT::inRestarting;
1185 #else
1186   return 0;
1187 #endif
1188 }
1189
1190 /*****************************************************************************
1191                 module initialization
1192 *****************************************************************************/
1193
1194 static int arg_where = CkCheckPoint_inMEM;
1195
1196 #if CMK_MEM_CHECKPOINT
1197 void init_memcheckpt(char **argv)
1198 {
1199     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1200       arg_where = CkCheckPoint_inDISK;
1201     }
1202
1203         // initiliazing _crashedNode variable
1204         CpvInitialize(int, _crashedNode);
1205         CpvAccess(_crashedNode) = -1;
1206
1207 }
1208 #endif
1209
1210 class CkMemCheckPTInit: public Chare {
1211 public:
1212   CkMemCheckPTInit(CkArgMsg *m) {
1213 #if CMK_MEM_CHECKPOINT
1214     if (arg_where == CkCheckPoint_inDISK) {
1215       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1216     }
1217     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1218     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1219 #endif
1220   }
1221 };
1222
1223 static void notifyHandler(char *msg)
1224 {
1225 #if CMK_MEM_CHECKPOINT
1226   CmiFree(msg);
1227       /* immediately increase restart phase to filter old messages */
1228   CpvAccess(_curRestartPhase) ++;
1229   CpvAccess(_qd)->flushStates();
1230   _discard_charm_message();
1231 #endif
1232 }
1233
1234 extern "C"
1235 void notify_crash(int node)
1236 {
1237 #ifdef CMK_MEM_CHECKPOINT
1238   CpvAccess( _crashedNode) = node;
1239 #ifdef CMK_SMP
1240   for(int i=0;i<CkMyNodeSize();i++){
1241         CpvAccessOther(_crashedNode,i)=node;
1242   }
1243 #endif
1244   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1245   CkMemCheckPT::inRestarting = 1;
1246
1247 /*
1248 #ifdef CMK_SMP
1249 */
1250   int pe = CmiNodeFirst(CkMyNode());
1251   for(int i=0;i<CkMyNodeSize();i++){
1252         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1253         CmiSetHandler(msg, notifyHandlerIdx);
1254         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1255   }
1256 /*
1257  return;
1258 #else 
1259     // this may be in interrupt handler, send a message to reset QD
1260   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1261   CmiSetHandler(msg, notifyHandlerIdx);
1262   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1263 #endif
1264 */
1265 #endif
1266 }
1267
1268 extern "C" void (*notify_crash_fn)(int node);
1269
1270 #if CMK_CONVERSE_MPI
1271 static int pingHandlerIdx;
1272 static int pingCheckHandlerIdx;
1273 static int buddyDieHandlerIdx;
1274 static double lastPingTime = -1;
1275
1276 extern "C" void mpi_restart_crashed(int pe, int rank);
1277 extern "C" int  find_spare_mpirank(int pe);
1278
1279 void pingBuddy();
1280 void pingCheckHandler();
1281
1282 void buddyDieHandler(char *msg)
1283 {
1284 #if CMK_MEM_CHECKPOINT
1285    // notify
1286    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1287    notify_crash(diepe);
1288    // send message to crash pe to let it restart
1289    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1290    int newrank = find_spare_mpirank(diepe);
1291    int buddy = obj->BuddyPE(CmiMyPe());
1292    if (buddy == diepe)  {
1293      mpi_restart_crashed(diepe, newrank);
1294      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1295    }
1296 #endif
1297 }
1298
1299 void pingHandler(void *msg)
1300 {
1301   lastPingTime = CmiWallTimer();
1302   CmiFree(msg);
1303 }
1304
1305 void pingCheckHandler()
1306 {
1307 #if CMK_MEM_CHECKPOINT
1308   double now = CmiWallTimer();
1309   if (lastPingTime > 0 && now - lastPingTime > 4) {
1310     int i, pe, buddy;
1311     // tell everyone the buddy dies
1312     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1313     for (i = 1; i < CmiNumPes(); i++) {
1314        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1315        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1316     }
1317     buddy = pe;
1318     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1319     for (int pe = 0; pe < CmiNumPes(); pe++) {
1320       if (obj->isFailed(pe) || pe == buddy) continue;
1321       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1322       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1323       CmiSetHandler(msg, buddyDieHandlerIdx);
1324       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1325     }
1326   }
1327   else 
1328     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1329 #endif
1330 }
1331
1332 void pingBuddy()
1333 {
1334 #if CMK_MEM_CHECKPOINT
1335   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1336   if (obj) {
1337     int buddy = obj->BuddyPE(CkMyPe());
1338 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1339     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1340     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1341     CmiSetHandler(msg, pingHandlerIdx);
1342     CmiGetRestartPhase(msg) = 9999;
1343     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1344   }
1345   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1346 #endif
1347 }
1348 #endif
1349
1350 // initproc
1351 void CkRegisterRestartHandler( )
1352 {
1353 #if CMK_MEM_CHECKPOINT
1354   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1355   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1356   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1357   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1358   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1359
1360 #if CMK_CONVERSE_MPI
1361   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1362   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1363   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1364 #endif
1365
1366   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1367   CpvAccess(procChkptBuf) = NULL;
1368
1369   notify_crash_fn = notify_crash;
1370
1371 #if ! CMK_CONVERSE_MPI
1372   // print pid to kill
1373   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1374 //  sleep(4);
1375 #endif
1376 #endif
1377 }
1378
1379
1380 extern "C"
1381 int CkHasCheckpoints()
1382 {
1383   return checkpointed;
1384 }
1385
1386 /// @todo: the following definitions should be moved to a separate file containing
1387 // structures and functions about fault tolerance strategies
1388
1389 /**
1390  *  * @brief: function for killing a process                                             
1391  *   */
1392 #ifdef CMK_MEM_CHECKPOINT
1393 #if CMK_HAS_GETPID
1394 void killLocal(void *_dummy,double curWallTime){
1395         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1396         if(CmiWallTimer()<killTime-1){
1397                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1398         }else{  
1399                 kill(getpid(),SIGKILL);                                               
1400         }              
1401
1402 #else
1403 void killLocal(void *_dummy,double curWallTime){
1404   CmiAbort("kill() not supported!");
1405 }
1406 #endif
1407 #endif
1408
1409 #ifdef CMK_MEM_CHECKPOINT
1410 /**
1411  * @brief: reads the file with the kill information
1412  */
1413 void readKillFile(){
1414         FILE *fp=fopen(killFile,"r");
1415         if(!fp){
1416                 return;
1417         }
1418         int proc;
1419         double sec;
1420         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1421                 if(proc == CkMyNode() && CkMyRank() == 0){
1422                         killTime = CmiWallTimer()+sec;
1423                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1424                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1425                 }
1426         }
1427         fclose(fp);
1428 }
1429
1430 #if ! CMK_CONVERSE_MPI
1431 void CkDieNow()
1432 {
1433          // ignored for non-mpi version
1434         CmiPrintf("[%d] die now.\n", CmiMyPe());
1435         killTime = CmiWallTimer()+0.001;
1436         CcdCallFnAfter(killLocal,NULL,1);
1437 }
1438 #endif
1439
1440 #endif
1441
1442 #include "CkMemCheckpoint.def.h"
1443
1444