fix the bug incurred by previous commit
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF       CkPrintf
55 #define DEBUGF noopck
56
57 // pick buddy processor from a different physical node
58 #define NODE_CHECKPOINT                        0
59
60 // assume NO extra processors--1
61 // assume extra processors--0
62 #if CMK_CONVERSE_MPI
63 #define CK_NO_PROC_POOL                         0
64 #else
65 #define CK_NO_PROC_POOL                         0
66 #endif
67
68 #define STREAMING_INFORMHOME                    1
69 CpvDeclare(int, _crashedNode);
70
71 // static, so that it is accessible from Converse part
72 int CkMemCheckPT::inRestarting = 0;
73 double CkMemCheckPT::startTime;
74 char *CkMemCheckPT::stage;
75 CkCallback CkMemCheckPT::cpCallback;
76
77 int _memChkptOn = 1;                    // checkpoint is on or off
78
79 CkGroupID ckCheckPTGroupID;             // readonly
80
81 static int checkpointed = 0;
82
83 /// @todo the following declarations should be moved into a separate file for all 
84 // fault tolerant strategies
85
86 #ifdef CMK_MEM_CHECKPOINT
87 // name of the kill file that contains processes to be killed 
88 char *killFile;                                               
89 // flag for the kill file         
90 int killFlag=0;
91 // variable for storing the killing time
92 double killTime=0.0;
93 #endif
94
95 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
96 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
97
98 // compute the backup processor
99 // FIXME: avoid crashed processors
100 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
101
102 inline int CkMemCheckPT::BuddyPE(int pe)
103 {
104   int budpe;
105 #if NODE_CHECKPOINT
106     // buddy is the processor with same rank on the next physical node
107   int r1 = CmiPhysicalRank(pe);
108   int budnode = CmiPhysicalNodeID(pe);
109   do {
110     budnode = (budnode+1)%CmiNumPhysicalNodes();
111     int *pelist;
112     int num;
113     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
114     budpe = pelist[r1 % num];
115   } while (isFailed(budpe));
116   if (budpe == pe) {
117     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
118     CmiAbort("Failed to find a buddy processor");
119   }
120 #else
121   budpe = pe;
122   while (budpe == pe || isFailed(budpe)) 
123           budpe = (budpe+1)%CkNumPes();
124 #endif
125   return budpe;
126 }
127
128 // called in array element constructor
129 // choose and register with 2 buddies for checkpoiting 
130 #if CMK_MEM_CHECKPOINT
131 void ArrayElement::init_checkpt() {
132         if (_memChkptOn == 0) return;
133         if (CkInRestarting()) {
134           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
135         }
136         // only master init checkpoint
137         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
138
139         budPEs[0] = CkMyPe();
140         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
141         CmiAssert(budPEs[0] != budPEs[1]);
142         // inform checkPTMgr
143         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
144         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
145         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
146         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
147 }
148 #endif
149
150 // entry function invoked by checkpoint mgr asking for checkpoint data
151 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
152 #if CMK_MEM_CHECKPOINT
153 //  DEBUGF("[p%d] HERE checkpoint to PE %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
154 //char index[128];   thisIndexMax.sprint(index);
155 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
156   CkLocMgr *locMgr = thisArray->getLocMgr();
157   CmiAssert(myRec!=NULL);
158   int size;
159   {
160         PUP::sizer p;
161         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
162         size = p.size();
163   }
164   int packSize = size/sizeof(double) +1;
165   CkArrayCheckPTMessage *msg =
166                  new (packSize, 0) CkArrayCheckPTMessage;
167   msg->len = size;
168   msg->index =thisIndexMax;
169   msg->aid = thisArrayID;
170   msg->locMgr = locMgr->getGroupID();
171   msg->cp_flag = 1;
172   {
173         PUP::toMem p(msg->packData);
174         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
175   }
176
177   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
178   checkptMgr.recvData(msg, 2, budPEs);
179   delete m;
180 #endif
181 }
182
183 // checkpoint holder class - for memory checkpointing
184 class CkMemCheckPTInfo: public CkCheckPTInfo
185 {
186   CkArrayCheckPTMessage *ckBuffer;
187 public:
188   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
189                     CkCheckPTInfo(a, loc, idx, pno)
190   {
191     ckBuffer = NULL;
192   }
193   ~CkMemCheckPTInfo() 
194   {
195     if (ckBuffer) delete ckBuffer; 
196   }
197   inline void updateBuffer(CkArrayCheckPTMessage *data) 
198   {
199     CmiAssert(data!=NULL);
200     if (ckBuffer) delete ckBuffer;
201     ckBuffer = data;
202   }    
203   inline CkArrayCheckPTMessage * getCopy()
204   {
205     if (ckBuffer == NULL) {
206       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
207       CmiAbort("Abort!");
208     }
209     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
210   }     
211   inline void updateBuddy(int b1, int b2) {
212      CmiAssert(ckBuffer);
213      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
214      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
215      CmiAssert(pNo != CkMyPe());
216   }
217   inline int getSize() { 
218      CmiAssert(ckBuffer);
219      return ckBuffer->len; 
220   }
221 };
222
223 // checkpoint holder class - for in-disk checkpointing
224 class CkDiskCheckPTInfo: public CkCheckPTInfo 
225 {
226   char *fname;
227   int bud1, bud2;
228   int len;                      // checkpoint size
229 public:
230   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
231   {
232 #if CMK_USE_MKSTEMP
233     fname = new char[64];
234     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
235     mkstemp(fname);
236 #else
237     fname=tmpnam(NULL);
238 #endif
239     bud1 = bud2 = -1;
240     len = 0;
241   }
242   ~CkDiskCheckPTInfo() 
243   {
244     remove(fname);
245   }
246   inline void updateBuffer(CkArrayCheckPTMessage *data) 
247   {
248     double t = CmiWallTimer();
249     // unpack it
250     envelope *env = UsrToEnv(data);
251     CkUnpackMessage(&env);
252     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
253     FILE *f = fopen(fname,"wb");
254     PUP::toDisk p(f);
255     CkPupMessage(p, (void **)&data);
256     // delay sync to the end because otherwise the messages are blocked
257 //    fsync(fileno(f));
258     fclose(f);
259     bud1 = data->bud1;
260     bud2 = data->bud2;
261     len = data->len;
262     delete data;
263     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
264   }
265   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
266   {
267     CkArrayCheckPTMessage *data;
268     FILE *f = fopen(fname,"rb");
269     PUP::fromDisk p(f);
270     CkPupMessage(p, (void **)&data);
271     fclose(f);
272     data->bud1 = bud1;                          // update the buddies
273     data->bud2 = bud2;
274     return data;
275   }
276   inline void updateBuddy(int b1, int b2) {
277      bud1 = b1; bud2 = b2;
278      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
279      CmiAssert(pNo != CkMyPe());
280   }
281   inline int getSize() { 
282      return len; 
283   }
284 };
285
286 CkMemCheckPT::CkMemCheckPT(int w)
287 {
288   int numnodes = 0;
289 #if NODE_CHECKPOINT
290   numnodes = CmiNumPhysicalNodes();
291 #else
292   numnodes = CkNumPes();
293 #endif
294 #if CK_NO_PROC_POOL
295   if (numnodes <= 2)
296 #else
297   if (numnodes  == 1)
298 #endif
299   {
300     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
301     _memChkptOn = 0;
302   }
303   inRestarting = 0;
304   recvCount = peCount = 0;
305   ackCount = 0;
306   expectCount = -1;
307   where = w;
308
309 #if CMK_CONVERSE_MPI
310   void pingBuddy();
311   void pingCheckHandler();
312   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
313   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
314 #endif
315 }
316
317 CkMemCheckPT::~CkMemCheckPT()
318 {
319   int len = ckTable.length();
320   for (int i=0; i<len; i++) {
321     delete ckTable[i];
322   }
323 }
324
325 void CkMemCheckPT::pup(PUP::er& p) 
326
327   CBase_CkMemCheckPT::pup(p); 
328   p|cpStarter;
329   p|thisFailedPe;
330   p|failedPes;
331   p|ckCheckPTGroupID;           // recover global variable
332   p|cpCallback;                 // store callback
333   p|where;                      // where to checkpoint
334   p|peCount;
335   if (p.isUnpacking()) {
336     recvCount = 0;
337 #if CMK_CONVERSE_MPI
338     void pingBuddy();
339     void pingCheckHandler();
340     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
341     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
342 #endif
343   }
344 }
345
346 // called by checkpoint mgr to restore an array element
347 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
348 {
349 #if CMK_MEM_CHECKPOINT
350   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
351   // m->index.print();
352   PUP::fromMem p(m->packData);
353   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
354   CmiAssert(mgr);
355 #if STREAMING_INFORMHOME
356   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
357 #else
358   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
359 #endif
360
361   // find a list of array elements bound together
362   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
363   CmiAssert(elt);
364   CkLocRec_local *rec = elt->myRec;
365   CkVec<CkMigratable *> list;
366   mgr->migratableList(rec, list);
367   CmiAssert(list.length() > 0);
368   for (int l=0; l<list.length(); l++) {
369     elt = (ArrayElement *)list[l];
370     elt->budPEs[0] = m->bud1;
371     elt->budPEs[1] = m->bud2;
372     //    reset, may not needed now
373     // for now.
374     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
375       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
376       if (c) c->redNo = 0;
377     }
378   }
379 #endif
380 }
381
382 // return 1 if pe is a crashed processor
383 int CkMemCheckPT::isFailed(int pe)
384 {
385   for (int i=0; i<failedPes.length(); i++)
386     if (failedPes[i] == pe) return 1;
387   return 0;
388 }
389
390 // add pe into history list of all failed processors
391 void CkMemCheckPT::failed(int pe)
392 {
393   if (isFailed(pe)) return;
394   failedPes.push_back(pe);
395 }
396
397 int CkMemCheckPT::totalFailed()
398 {
399   return failedPes.length();
400 }
401
402 // create an checkpoint entry for array element of aid with index.
403 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
404 {
405   // error check, no duplicate
406   int idx, len = ckTable.size();
407   for (idx=0; idx<len; idx++) {
408     CkCheckPTInfo *entry = ckTable[idx];
409     if (index == entry->index) {
410       if (loc == entry->locMgr) {
411           // bindTo array elements
412           return;
413       }
414         // for array inheritance, the following check may fail
415         // because ArrayElement constructor of all superclasses are called
416       if (aid == entry->aid) {
417         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
418         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
419       }
420     }
421   }
422   CkCheckPTInfo *newEntry;
423   if (where == CkCheckPoint_inMEM)
424     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
425   else
426     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
427   ckTable.push_back(newEntry);
428   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
429 }
430
431 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
432 {
433   int buddy = msg->bud1;
434   if (buddy == CkMyPe()) buddy = msg->bud2;
435   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
436   recvData(msg);
437     // ack
438   thisProxy[buddy].gotData();
439 }
440
441 // loop through my checkpoint table and ask checkpointed array elements
442 // to send me checkpoint data.
443 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
444 {
445   checkpointed = 1;
446   cpCallback = cb;
447   cpStarter = starter;
448   if (CkMyPe() == cpStarter) {
449     startTime = CmiWallTimer();
450     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
451   }
452
453   int len = ckTable.length();
454   for (int i=0; i<len; i++) {
455     CkCheckPTInfo *entry = ckTable[i];
456       // always let the bigger number processor send request
457     //if (CkMyPe() < entry->pNo) continue;
458       // always let the smaller number processor send request, may on same proc
459     if (!isMaster(entry->pNo)) continue;
460       // call inmem_checkpoint to the array element, ask it to send
461       // back checkpoint data via recvData().
462     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
463     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
464   }
465     // if my table is empty, then I am done
466   if (len == 0) contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
467
468   // pack and send proc level data
469   sendProcData();
470 }
471
472 // don't handle array elements
473 static inline void _handleProcData(PUP::er &p)
474 {
475     // save readonlys, and callback BTW
476     CkPupROData(p);
477
478     // save mainchares 
479     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
480         
481 #ifndef CMK_CHARE_USE_PTR
482     // save non-migratable chare
483     CkPupChareData(p);
484 #endif
485
486     // save groups into Groups.dat
487     CkPupGroupData(p);
488
489     // save nodegroups into NodeGroups.dat
490     if(CkMyRank()==0) CkPupNodeGroupData(p);
491 }
492
493 void CkMemCheckPT::sendProcData()
494 {
495   // find out size of buffer
496   int size;
497   {
498     PUP::sizer p;
499     _handleProcData(p);
500     size = p.size();
501   }
502   int packSize = size;
503   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
504   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d to %d\n", CkMyPe(), size, ChkptOnPe(CkMyPe()));
505   {
506     PUP::toMem p(msg->packData);
507     _handleProcData(p);
508   }
509   msg->pe = CkMyPe();
510   msg->len = size;
511   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
512   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
513 }
514
515 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
516 {
517   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
518   CpvAccess(procChkptBuf) = msg;
519   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
520   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[msg->reportPe]));
521 }
522
523 // ArrayElement call this function to give us the checkpointed data
524 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
525 {
526   int len = ckTable.length();
527   int idx;
528   for (idx=0; idx<len; idx++) {
529     CkCheckPTInfo *entry = ckTable[idx];
530     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
531   }
532   CkAssert(idx < len);
533   int isChkpting = msg->cp_flag;
534   ckTable[idx]->updateBuffer(msg);
535   if (isChkpting) {
536       // all my array elements have returned their inmem data
537       // inform starter processor that I am done.
538     recvCount ++;
539     if (recvCount == ckTable.length()) {
540       if (where == CkCheckPoint_inMEM) {
541         contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
542       }
543       else if (where == CkCheckPoint_inDISK) {
544         // another barrier for finalize the writing using fsync
545         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
546         contribute(0,NULL,CkReduction::sum_int,localcb);
547       }
548       else
549         CmiAbort("Unknown checkpoint scheme");
550       recvCount = 0;
551     } 
552   }
553 }
554
555 // only used in disk checkpointing
556 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
557 {
558   delete m;
559 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
560   system("sync");
561 #endif
562   contribute(CkCallback(CkReductionTarget(CkMemCheckPT, cpFinish), thisProxy[cpStarter]));
563 }
564
565 // only is called on cpStarter when checkpoint is done
566 void CkMemCheckPT::cpFinish()
567 {
568   CmiAssert(CkMyPe() == cpStarter);
569   peCount++;
570     // now that all processors have finished, activate callback
571   if (peCount == 2) {
572     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
573     cpCallback.send();
574     peCount = 0;
575     thisProxy.report();
576   }
577 }
578
579 // for debugging, report checkpoint info
580 void CkMemCheckPT::report()
581 {
582   int objsize = 0;
583   int len = ckTable.length();
584   for (int i=0; i<len; i++) {
585     CkCheckPTInfo *entry = ckTable[i];
586     CmiAssert(entry);
587     objsize += entry->getSize();
588   }
589   CmiAssert(CpvAccess(procChkptBuf));
590 //  CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
591 }
592
593 /*****************************************************************************
594                         RESTART Procedure
595 *****************************************************************************/
596
597 // master processor of two buddies
598 inline int CkMemCheckPT::isMaster(int buddype)
599 {
600 #if 0
601   int mype = CkMyPe();
602 //CkPrintf("ismaster: %d %d\n", pe, mype);
603   if (CkNumPes() - totalFailed() == 2) {
604     return mype > buddype;
605   }
606   for (int i=1; i<CkNumPes(); i++) {
607     int me = (buddype+i)%CkNumPes();
608     if (isFailed(me)) continue;
609     if (me == mype) return 1;
610     else return 0;
611   }
612   return 0;
613 #else
614     // smaller one
615   int mype = CkMyPe();
616 //CkPrintf("ismaster: %d %d\n", pe, mype);
617   if (CkNumPes() - totalFailed() == 2) {
618     return mype < buddype;
619   }
620 #if NODE_CHECKPOINT
621   int pe_per_node = CmiNumPesOnPhysicalNode(CmiPhysicalNodeID(mype));
622   for (int i=pe_per_node; i<CkNumPes(); i+=pe_per_node) {
623 #else
624   for (int i=1; i<CkNumPes(); i++) {
625 #endif
626     int me = (mype+i)%CkNumPes();
627     if (isFailed(me)) continue;
628     if (me == buddype) return 1;
629     else return 0;
630   }
631   return 0;
632 #endif
633 }
634
635 #ifdef CKLOCMGR_LOOP
636 #undef CKLOCMGR_LOOP
637 #endif
638
639 // loop over all CkLocMgr and do "code"
640 #define  CKLOCMGR_LOOP(code)    {       \
641   int numGroups = CkpvAccess(_groupIDTable)->size();    \
642   for(int i=0;i<numGroups;i++) {        \
643     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
644     if(obj->isLocMgr())  {      \
645       CkLocMgr *mgr = (CkLocMgr*)obj;   \
646       code      \
647     }   \
648   }     \
649  }
650
651 #if 0
652 // helper class to pup all elements that belong to same ckLocMgr
653 class ElementDestoryer : public CkLocIterator {
654 private:
655         CkLocMgr *locMgr;
656 public:
657         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
658         void addLocation(CkLocation &loc) {
659                 CkArrayIndex idx=loc.getIndex();
660                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
661                 loc.destroy();
662         }
663 };
664 #endif
665
666 // restore the bitmap vector for LB
667 void CkMemCheckPT::resetLB(int diepe)
668 {
669 #if CMK_LBDB_ON
670   int i;
671   char *bitmap = new char[CkNumPes()];
672   // set processor available bitmap
673   get_avail_vector(bitmap);
674
675   for (i=0; i<failedPes.length(); i++)
676     bitmap[failedPes[i]] = 0; 
677   bitmap[diepe] = 0;
678
679 #if CK_NO_PROC_POOL
680   set_avail_vector(bitmap);
681 #endif
682
683   // if I am the crashed pe, rebuild my failedPEs array
684   if (CkMyNode() == diepe)
685     for (i=0; i<CkNumPes(); i++) 
686       if (bitmap[i]==0) failed(i);
687
688   delete [] bitmap;
689 #endif
690 }
691
692 // in case when failedPe dies, everybody go through its checkpoint table:
693 // destory all array elements
694 // recover lost buddies
695 // reconstruct all array elements from check point data
696 // called on all processors
697 void CkMemCheckPT::restart(int diePe)
698 {
699 #if CMK_MEM_CHECKPOINT
700   double curTime = CmiWallTimer();
701   if (CkMyPe() == diePe)
702     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
703   stage = (char*)"resetLB";
704   startTime = curTime;
705   if (CkMyPe() == diePe)
706     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
707
708 #if CK_NO_PROC_POOL
709   failed(diePe);        // add into the list of failed pes
710 #endif
711   thisFailedPe = diePe;
712
713   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
714
715   inRestarting = 1;
716                                                                                 
717   // disable load balancer's barrier
718   if (CkMyPe() != diePe) resetLB(diePe);
719
720   CKLOCMGR_LOOP(mgr->startInserting(););
721
722   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
723   barrier(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
724 /*
725   if (CkMyPe() == 0)
726     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
727 */
728 #endif
729 }
730
731 // loally remove all array elements
732 void CkMemCheckPT::removeArrayElements()
733 {
734 #if CMK_MEM_CHECKPOINT
735   int len = ckTable.length();
736   double curTime = CmiWallTimer();
737   if (CkMyPe() == thisFailedPe) 
738     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
739   stage = (char*)"removeArrayElements";
740   startTime = curTime;
741
742   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
743   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
744
745   // get rid of all buffering and remote recs
746   // including destorying all array elements
747   CKLOCMGR_LOOP(mgr->flushAllRecs(););
748
749 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
750
751   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
752   barrier(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
753 #endif
754 }
755
756 // flush state in reduction manager
757 void CkMemCheckPT::resetReductionMgr()
758 {
759   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
760   int numGroups = CkpvAccess(_groupIDTable)->size();
761   for(int i=0;i<numGroups;i++) {
762     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
763     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
764     obj->flushStates();
765     obj->ckJustMigrated();
766   }
767   // reset again
768   //CpvAccess(_qd)->flushStates();
769
770 #if 1
771   //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
772   barrier(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
773 #else
774   if (CkMyPe() == 0)
775     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
776 #endif
777 }
778
779 // recover the lost buddies
780 void CkMemCheckPT::recoverBuddies()
781 {
782   int idx;
783   int len = ckTable.length();
784   // ready to flush reduction manager
785   // cannot be CkMemCheckPT::restart because destory will modify states
786   double curTime = CmiWallTimer();
787   if (CkMyPe() == thisFailedPe)
788   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
789   stage = (char *)"recoverBuddies";
790   if (CkMyPe() == thisFailedPe)
791   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
792   startTime = curTime;
793
794   // recover buddies
795   expectCount = 0;
796   for (idx=0; idx<len; idx++) {
797     CkCheckPTInfo *entry = ckTable[idx];
798     if (entry->pNo == thisFailedPe) {
799 #if CK_NO_PROC_POOL
800       // find a new buddy
801 /*
802       int budPe = CkMyPe();
803 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
804       while (budPe == CkMyPe() || isFailed(budPe)) 
805           budPe = (budPe+1)%CkNumPes();
806 */
807       int budPe = BuddyPE(CkMyPe());
808       //entry->pNo = budPe;
809 #else
810       int budPe = thisFailedPe;
811 #endif
812       entry->updateBuddy(CkMyPe(), budPe);
813 #if 0
814       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
815       CkArrayCheckPTMessage *msg = entry->getCopy();
816       msg->cp_flag = 0;            // not checkpointing
817       thisProxy[budPe].recvData(msg);
818 #else
819       CkArrayCheckPTMessage *msg = entry->getCopy();
820       msg->bud1 = budPe;
821       msg->bud2 = CkMyPe();
822       msg->cp_flag = 0;            // not checkpointing
823       thisProxy[budPe].recoverEntry(msg);
824 #endif
825       expectCount ++;
826     }
827   }
828
829 #if 1
830   if (expectCount == 0) {
831     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
832     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
833   }
834 #else
835   if (CkMyPe() == 0) {
836     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
837   }
838 #endif
839
840   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
841 }
842
843 void CkMemCheckPT::gotData()
844 {
845   ackCount ++;
846   if (ackCount == expectCount) {
847     ackCount = 0;
848     expectCount = -1;
849     //thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
850     contribute(CkCallback(CkReductionTarget(CkMemCheckPT, recoverArrayElements), thisProxy));
851   }
852 }
853
854 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
855 {
856   for (int i=0; i<n; i++) {
857     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
858     mgr->updateLocation(idx[i], nowOnPe);
859   }
860 }
861
862 // restore array elements
863 void CkMemCheckPT::recoverArrayElements()
864 {
865   double curTime = CmiWallTimer();
866   int len = ckTable.length();
867   //CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
868   stage = (char *)"recoverArrayElements";
869   if (CkMyPe() == thisFailedPe)
870   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
871   startTime = curTime;
872
873   // recover all array elements
874   int count = 0;
875 #if STREAMING_INFORMHOME
876   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
877   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
878 #endif
879   for (int idx=0; idx<len; idx++)
880   {
881     CkCheckPTInfo *entry = ckTable[idx];
882 #if CK_NO_PROC_POOL
883     // the bigger one will do 
884 //    if (CkMyPe() < entry->pNo) continue;
885     if (!isMaster(entry->pNo)) continue;
886 #else
887     // smaller one do it, which has the original object
888     if (CkMyPe() == entry->pNo+1 || 
889         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
890 #endif
891 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
892
893     entry->updateBuddy(CkMyPe(), entry->pNo);
894     CkArrayCheckPTMessage *msg = entry->getCopy();
895     // gzheng
896     //thisProxy[CkMyPe()].inmem_restore(msg);
897     inmem_restore(msg);
898 #if STREAMING_INFORMHOME
899     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
900     int homePe = mgr->homePe(msg->index);
901     if (homePe != CkMyPe()) {
902       gmap[homePe].push_back(msg->locMgr);
903       imap[homePe].push_back(msg->index);
904     }
905 #endif
906     CkFreeMsg(msg);
907     count ++;
908   }
909 #if STREAMING_INFORMHOME
910   for (int i=0; i<CkNumPes(); i++) {
911     if (gmap[i].size() && i!=CkMyPe()) {
912       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
913     }
914   }
915   delete [] imap;
916   delete [] gmap;
917 #endif
918   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
919
920   CKLOCMGR_LOOP(mgr->doneInserting(););
921
922   inRestarting = 0;
923   // _crashedNode = -1;
924   CpvAccess(_crashedNode) = -1;
925
926   if (CkMyPe() == 0)
927     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
928 }
929
930 static double restartT;
931
932 // on every processor
933 // turn load balancer back on
934 void CkMemCheckPT::finishUp()
935 {
936   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
937   //CKLOCMGR_LOOP(mgr->doneInserting(););
938   
939   if (CkMyPe() == thisFailedPe)
940   {
941        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
942        //CkStartQD(cpCallback);
943        cpCallback.send();
944        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
945   }
946
947 #if CK_NO_PROC_POOL
948 #if NODE_CHECKPOINT
949   int numnodes = CmiNumPhysicalNodes();
950 #else
951   int numnodes = CkNumPes();
952 #endif
953   if (numnodes-totalFailed() <=2) {
954     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
955     _memChkptOn = 0;
956   }
957 #endif
958 }
959
960 // called only on 0
961 void CkMemCheckPT::quiescence(CkCallback &cb)
962 {
963   static int pe_count = 0;
964   pe_count ++;
965   CmiAssert(CkMyPe() == 0);
966   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
967   if (pe_count == CkNumPes()) {
968     pe_count = 0;
969     cb.send();
970   }
971 }
972
973 // User callable function - to start a checkpoint
974 // callback cb is used to pass control back
975 void CkStartMemCheckpoint(CkCallback &cb)
976 {
977 #if CMK_MEM_CHECKPOINT
978   if (_memChkptOn == 0) {
979     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
980     cb.send();
981     return;
982   }
983   if (CkInRestarting()) {
984       // trying to checkpointing during restart
985     cb.send();
986     return;
987   }
988     // store user callback and user data
989   CkMemCheckPT::cpCallback = cb;
990
991     // broadcast to start check pointing
992   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
993   checkptMgr.doItNow(CkMyPe(), cb);
994 #else
995   // when mem checkpoint is disabled, invike cb immediately
996   cb.send();
997 #endif
998 }
999
1000 void CkRestartCheckPoint(int diePe)
1001 {
1002 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
1003   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
1004   // broadcast
1005   checkptMgr.restart(diePe);
1006 }
1007
1008 static int _diePE = -1;
1009
1010 // callback function used locally by ccs handler
1011 static void CkRestartCheckPointCallback(void *ignore, void *msg)
1012 {
1013 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1014   CkRestartCheckPoint(_diePE);
1015 }
1016
1017 // Converse function handles
1018 static int askPhaseHandlerIdx;
1019 static int recvPhaseHandlerIdx;
1020 static int askProcDataHandlerIdx;
1021 static int restartBcastHandlerIdx;
1022 static int recoverProcDataHandlerIdx;
1023 static int restartBeginHandlerIdx;
1024 static int notifyHandlerIdx;
1025
1026 // called on crashed PE
1027 static void restartBeginHandler(char *msg)
1028 {
1029 #if CMK_MEM_CHECKPOINT
1030   CmiFree(msg);
1031   static int count = 0;
1032   CmiAssert(CkMyPe() == _diePE);
1033   count ++;
1034   if (count == CkNumPes()) {
1035     CkRestartCheckPointCallback(NULL, NULL);
1036     count = 0;
1037   }
1038 #endif
1039 }
1040
1041 extern void _discard_charm_message();
1042 extern void _resume_charm_message();
1043
1044 static void * doNothingMsg(int * size, void * data, void ** remote, int count){
1045         return data;
1046 }
1047
1048 static void restartBcastHandler(char *msg)
1049 {
1050 #if CMK_MEM_CHECKPOINT
1051   // advance phase counter
1052   CkMemCheckPT::inRestarting = 1;
1053   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1054   // gzheng
1055   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1056
1057   if (CkMyPe()==_diePE)
1058     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1059
1060   // reset QD counters
1061 /*  gzheng
1062   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1063 */
1064
1065 /*  gzheng
1066   if (CkMyPe()==_diePE)
1067       CkRestartCheckPointCallback(NULL, NULL);
1068 */
1069   CmiFree(msg);
1070
1071   _resume_charm_message();
1072
1073     // reduction
1074   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1075   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1076   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1077   checkpointed = 0;
1078 #endif
1079 }
1080
1081 extern void _initDone();
1082
1083 // called on crashed processor
1084 static void recoverProcDataHandler(char *msg)
1085 {
1086 #if CMK_MEM_CHECKPOINT
1087    int i;
1088    envelope *env = (envelope *)msg;
1089    CkUnpackMessage(&env);
1090    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1091    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1092    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1093    //cur_restart_phase ++;
1094      // gzheng ?
1095    //CpvAccess(_qd)->flushStates();
1096
1097    // restore readonly, mainchare, group, nodegroup
1098 //   int temp = cur_restart_phase;
1099 //   cur_restart_phase = -1;
1100    PUP::fromMem p(procMsg->packData);
1101    _handleProcData(p);
1102
1103    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1104    // gzheng
1105    CKLOCMGR_LOOP(mgr->startInserting(););
1106
1107    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1108    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1109    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1110    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1111
1112    _initDone();
1113 //   CpvAccess(_qd)->flushStates();
1114    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1115 #endif
1116 }
1117
1118 // called on its backup processor
1119 // get backup message buffer and sent to crashed processor
1120 static void askProcDataHandler(char *msg)
1121 {
1122 #if CMK_MEM_CHECKPOINT
1123     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1124     CkPrintf("[%d] askProcDataHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1125     if (CpvAccess(procChkptBuf) == NULL)  {
1126       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1127       CkAbort("no checkpoint found");
1128     }
1129     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1130     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1131
1132     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1133
1134     CkPackMessage(&env);
1135     CmiSetHandler(env, recoverProcDataHandlerIdx);
1136     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1137     CpvAccess(procChkptBuf) = NULL;
1138 #endif
1139 }
1140
1141 // called on PE 0
1142 void qd_callback(void *m)
1143 {
1144    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1145    CkFreeMsg(m);
1146 #ifdef CMK_SMP
1147    for(int i=0;i<CmiMyNodeSize();i++){
1148    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1149    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1150         CmiSetHandler(msg, askProcDataHandlerIdx);
1151         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1152         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1153    }
1154    return;
1155 #endif
1156    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1157    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1158    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1159    CmiSetHandler(msg, askProcDataHandlerIdx);
1160    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1161    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1162
1163 }
1164
1165 // on crashed node
1166 void CkMemRestart(const char *dummy, CkArgMsg *args)
1167 {
1168 #if CMK_MEM_CHECKPOINT
1169    _diePE = CmiMyNode();
1170    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1171    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1172    CkMemCheckPT::inRestarting = 1;
1173
1174   CpvAccess( _crashedNode )= CmiMyNode();
1175         
1176   _discard_charm_message();
1177   
1178   if(CmiMyRank()==0){
1179     CkCallback cb(qd_callback);
1180     CkStartQD(cb);
1181     CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1182   }
1183 #else
1184    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1185 #endif
1186 }
1187
1188 // can be called in other files
1189 // return true if it is in restarting
1190 extern "C"
1191 int CkInRestarting()
1192 {
1193 #if CMK_MEM_CHECKPOINT
1194   if (CpvAccess( _crashedNode)!=-1) return 1;
1195   // gzheng
1196   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1197   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1198   return CkMemCheckPT::inRestarting;
1199 #else
1200   return 0;
1201 #endif
1202 }
1203
1204 /*****************************************************************************
1205                 module initialization
1206 *****************************************************************************/
1207
1208 static int arg_where = CkCheckPoint_inMEM;
1209
1210 #if CMK_MEM_CHECKPOINT
1211 void init_memcheckpt(char **argv)
1212 {
1213     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1214       arg_where = CkCheckPoint_inDISK;
1215     }
1216
1217         // initiliazing _crashedNode variable
1218         CpvInitialize(int, _crashedNode);
1219         CpvAccess(_crashedNode) = -1;
1220
1221 }
1222 #endif
1223
1224 class CkMemCheckPTInit: public Chare {
1225 public:
1226   CkMemCheckPTInit(CkArgMsg *m) {
1227 #if CMK_MEM_CHECKPOINT
1228     if (arg_where == CkCheckPoint_inDISK) {
1229       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1230     }
1231     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1232     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1233 #endif
1234   }
1235 };
1236
1237 static void notifyHandler(char *msg)
1238 {
1239 #if CMK_MEM_CHECKPOINT
1240   CmiFree(msg);
1241       /* immediately increase restart phase to filter old messages */
1242   CpvAccess(_curRestartPhase) ++;
1243   CpvAccess(_qd)->flushStates();
1244   _discard_charm_message();
1245 #endif
1246 }
1247
1248 extern "C"
1249 void notify_crash(int node)
1250 {
1251 #ifdef CMK_MEM_CHECKPOINT
1252   CpvAccess( _crashedNode) = node;
1253 #ifdef CMK_SMP
1254   for(int i=0;i<CkMyNodeSize();i++){
1255         CpvAccessOther(_crashedNode,i)=node;
1256   }
1257 #endif
1258   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1259   CkMemCheckPT::inRestarting = 1;
1260
1261     // this may be in interrupt handler, send a message to reset QD
1262   int pe = CmiNodeFirst(CkMyNode());
1263   for(int i=0;i<CkMyNodeSize();i++){
1264         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1265         CmiSetHandler(msg, notifyHandlerIdx);
1266         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1267   }
1268 #endif
1269 }
1270
1271 extern "C" void (*notify_crash_fn)(int node);
1272
1273 #if CMK_CONVERSE_MPI
1274 static int pingHandlerIdx;
1275 static int pingCheckHandlerIdx;
1276 static int buddyDieHandlerIdx;
1277 static double lastPingTime = -1;
1278
1279 extern "C" void mpi_restart_crashed(int pe, int rank);
1280 extern "C" int  find_spare_mpirank(int pe);
1281
1282 void pingBuddy();
1283 void pingCheckHandler();
1284
1285 void buddyDieHandler(char *msg)
1286 {
1287 #if CMK_MEM_CHECKPOINT
1288    // notify
1289    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1290    notify_crash(diepe);
1291    // send message to crash pe to let it restart
1292    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1293    int newrank = find_spare_mpirank(diepe);
1294    int buddy = obj->BuddyPE(CmiMyPe());
1295    if (buddy == diepe)  {
1296      mpi_restart_crashed(diepe, newrank);
1297      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1298    }
1299 #endif
1300 }
1301
1302 void pingHandler(void *msg)
1303 {
1304   lastPingTime = CmiWallTimer();
1305   CmiFree(msg);
1306 }
1307
1308 void pingCheckHandler()
1309 {
1310 #if CMK_MEM_CHECKPOINT
1311   double now = CmiWallTimer();
1312   if (lastPingTime > 0 && now - lastPingTime > 4) {
1313     int i, pe, buddy;
1314     // tell everyone the buddy dies
1315     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1316     for (i = 1; i < CmiNumPes(); i++) {
1317        pe = (CmiMyPe() - i + CmiNumPes()) % CmiNumPes();
1318        if (obj->BuddyPE(pe) == CmiMyPe()) break;
1319     }
1320     buddy = pe;
1321     CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
1322     for (int pe = 0; pe < CmiNumPes(); pe++) {
1323       if (obj->isFailed(pe) || pe == buddy) continue;
1324       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1325       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1326       CmiSetHandler(msg, buddyDieHandlerIdx);
1327       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1328     }
1329   }
1330   else 
1331     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1332 #endif
1333 }
1334
1335 void pingBuddy()
1336 {
1337 #if CMK_MEM_CHECKPOINT
1338   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1339   if (obj) {
1340     int buddy = obj->BuddyPE(CkMyPe());
1341 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1342     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1343     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1344     CmiSetHandler(msg, pingHandlerIdx);
1345     CmiGetRestartPhase(msg) = 9999;
1346     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1347   }
1348   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1349 #endif
1350 }
1351 #endif
1352
1353 // initproc
1354 void CkRegisterRestartHandler( )
1355 {
1356 #if CMK_MEM_CHECKPOINT
1357   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1358   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1359   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1360   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1361   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1362
1363 #if CMK_CONVERSE_MPI
1364   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1365   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1366   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1367 #endif
1368
1369   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1370   CpvAccess(procChkptBuf) = NULL;
1371
1372   notify_crash_fn = notify_crash;
1373
1374 #if ! CMK_CONVERSE_MPI
1375   // print pid to kill
1376 //  CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1377 //  sleep(4);
1378 #endif
1379 #endif
1380 }
1381
1382
1383 extern "C"
1384 int CkHasCheckpoints()
1385 {
1386   return checkpointed;
1387 }
1388
1389 /// @todo: the following definitions should be moved to a separate file containing
1390 // structures and functions about fault tolerance strategies
1391
1392 /**
1393  *  * @brief: function for killing a process                                             
1394  *   */
1395 #ifdef CMK_MEM_CHECKPOINT
1396 #if CMK_HAS_GETPID
1397 void killLocal(void *_dummy,double curWallTime){
1398         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1399         if(CmiWallTimer()<killTime-1){
1400                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1401         }else{ 
1402 #if CMK_CONVERSE_MPI
1403                                 CkDieNow();
1404 #else 
1405                 kill(getpid(),SIGKILL);                                               
1406 #endif
1407         }              
1408
1409 #else
1410 void killLocal(void *_dummy,double curWallTime){
1411   CmiAbort("kill() not supported!");
1412 }
1413 #endif
1414 #endif
1415
1416 #ifdef CMK_MEM_CHECKPOINT
1417 /**
1418  * @brief: reads the file with the kill information
1419  */
1420 void readKillFile(){
1421         FILE *fp=fopen(killFile,"r");
1422         if(!fp){
1423                 printf("[%d] Cannot open file %s (MEMCKPT) \n",CkMyPe(),killFile);
1424                 return;
1425         }
1426         int proc;
1427         double sec;
1428         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1429                 if(proc == CkMyNode() && CkMyRank() == 0){
1430                         killTime = CmiWallTimer()+sec;
1431                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1432                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1433                 }
1434         }
1435         fclose(fp);
1436 }
1437
1438 #if ! CMK_CONVERSE_MPI
1439 void CkDieNow()
1440 {
1441          // ignored for non-mpi version
1442         CmiPrintf("[%d] die now.\n", CmiMyPe());
1443         killTime = CmiWallTimer()+0.001;
1444         CcdCallFnAfter(killLocal,NULL,1);
1445 }
1446 #endif
1447
1448 #endif
1449
1450 #include "CkMemCheckpoint.def.h"
1451
1452