Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF      // CkPrintf
55 #define DEBUGF noopck
56
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #define CK_NO_PROC_POOL                         1
64
65 #define STREAMING_INFORMHOME                    1
66 CpvDeclare(int, _crashedNode);
67
68 // static, so that it is accessible from Converse part
69 int CkMemCheckPT::inRestarting = 0;
70 double CkMemCheckPT::startTime;
71 char *CkMemCheckPT::stage;
72 CkCallback CkMemCheckPT::cpCallback;
73
74 int _memChkptOn = 1;                    // checkpoint is on or off
75
76 CkGroupID ckCheckPTGroupID;             // readonly
77
78
79 /// @todo the following declarations should be moved into a separate file for all 
80 // fault tolerant strategies
81
82 #ifdef CMK_MEM_CHECKPOINT
83 // name of the kill file that contains processes to be killed 
84 char *killFile;                                               
85 // flag for the kill file         
86 int killFlag=0;
87 // variable for storing the killing time
88 double killTime=0.0;
89 #endif
90
91 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
92 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
93
94 // compute the backup processor
95 // FIXME: avoid crashed processors
96 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
97
98 inline int CkMemCheckPT::BuddyPE(int pe)
99 {
100   int budpe;
101 #if NODE_CHECKPOINT
102     // buddy is the processor with same rank on the next physical node
103   int r1 = CmiPhysicalRank(pe);
104   int budnode = CmiPhysicalNodeID(pe);
105   do {
106     budnode = (budnode+1)%CmiNumPhysicalNodes();
107     int *pelist;
108     int num;
109     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
110     budpe = pelist[r1 % num];
111   } while (isFailed(budpe));
112   if (budpe == pe) {
113     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
114     CmiAbort("Failed to find a buddy processor");
115   }
116 #else
117   budpe = pe;
118   while (budpe == pe || isFailed(budpe)) 
119           budpe = (budpe+1)%CkNumPes();
120 #endif
121   return budpe;
122 }
123
124 // called in array element constructor
125 // choose and register with 2 buddies for checkpoiting 
126 #if CMK_MEM_CHECKPOINT
127 void ArrayElement::init_checkpt() {
128         if (_memChkptOn == 0) return;
129         if (CkInRestarting()) {
130           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
131         }
132         // only master init checkpoint
133         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
134
135         budPEs[0] = CkMyPe();
136         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
137         CmiAssert(budPEs[0] != budPEs[1]);
138         // inform checkPTMgr
139         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
140         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
141         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
142         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
143 }
144 #endif
145
146 // entry function invoked by checkpoint mgr asking for checkpoint data
147 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
148 #if CMK_MEM_CHECKPOINT
149   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
150 //char index[128];   thisIndexMax.sprint(index);
151 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
152   CkLocMgr *locMgr = thisArray->getLocMgr();
153   CmiAssert(myRec!=NULL);
154   int size;
155   {
156         PUP::sizer p;
157         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
158         size = p.size();
159   }
160   int packSize = size/sizeof(double) +1;
161   CkArrayCheckPTMessage *msg =
162                  new (packSize, 0) CkArrayCheckPTMessage;
163   msg->len = size;
164   msg->index =thisIndexMax;
165   msg->aid = thisArrayID;
166   msg->locMgr = locMgr->getGroupID();
167   msg->cp_flag = 1;
168   {
169         PUP::toMem p(msg->packData);
170         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
171   }
172
173   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
174   checkptMgr.recvData(msg, 2, budPEs);
175   delete m;
176 #endif
177 }
178
179 // checkpoint holder class - for memory checkpointing
180 class CkMemCheckPTInfo: public CkCheckPTInfo
181 {
182   CkArrayCheckPTMessage *ckBuffer;
183 public:
184   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
185                     CkCheckPTInfo(a, loc, idx, pno)
186   {
187     ckBuffer = NULL;
188   }
189   ~CkMemCheckPTInfo() 
190   {
191     if (ckBuffer) delete ckBuffer; 
192   }
193   inline void updateBuffer(CkArrayCheckPTMessage *data) 
194   {
195     CmiAssert(data!=NULL);
196     if (ckBuffer) delete ckBuffer;
197     ckBuffer = data;
198   }    
199   inline CkArrayCheckPTMessage * getCopy()
200   {
201     if (ckBuffer == NULL) {
202       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
203       CmiAbort("Abort!");
204     }
205     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
206   }     
207   inline void updateBuddy(int b1, int b2) {
208      CmiAssert(ckBuffer);
209      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
210      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
211      CmiAssert(pNo != CkMyPe());
212   }
213   inline int getSize() { 
214      CmiAssert(ckBuffer);
215      return ckBuffer->len; 
216   }
217 };
218
219 // checkpoint holder class - for in-disk checkpointing
220 class CkDiskCheckPTInfo: public CkCheckPTInfo 
221 {
222   char *fname;
223   int bud1, bud2;
224   int len;                      // checkpoint size
225 public:
226   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
227   {
228 #if CMK_USE_MKSTEMP
229     fname = new char[64];
230     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
231     mkstemp(fname);
232 #else
233     fname=tmpnam(NULL);
234 #endif
235     bud1 = bud2 = -1;
236     len = 0;
237   }
238   ~CkDiskCheckPTInfo() 
239   {
240     remove(fname);
241   }
242   inline void updateBuffer(CkArrayCheckPTMessage *data) 
243   {
244     double t = CmiWallTimer();
245     // unpack it
246     envelope *env = UsrToEnv(data);
247     CkUnpackMessage(&env);
248     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
249     FILE *f = fopen(fname,"wb");
250     PUP::toDisk p(f);
251     CkPupMessage(p, (void **)&data);
252     // delay sync to the end because otherwise the messages are blocked
253 //    fsync(fileno(f));
254     fclose(f);
255     bud1 = data->bud1;
256     bud2 = data->bud2;
257     len = data->len;
258     delete data;
259     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
260   }
261   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
262   {
263     CkArrayCheckPTMessage *data;
264     FILE *f = fopen(fname,"rb");
265     PUP::fromDisk p(f);
266     CkPupMessage(p, (void **)&data);
267     fclose(f);
268     data->bud1 = bud1;                          // update the buddies
269     data->bud2 = bud2;
270     return data;
271   }
272   inline void updateBuddy(int b1, int b2) {
273      bud1 = b1; bud2 = b2;
274      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
275      CmiAssert(pNo != CkMyPe());
276   }
277   inline int getSize() { 
278      return len; 
279   }
280 };
281
282 CkMemCheckPT::CkMemCheckPT(int w)
283 {
284   int numnodes = 0;
285 #if NODE_CHECKPOINT
286   numnodes = CmiNumPhysicalNodes();
287 #else
288   numnodes = CkNumPes();
289 #endif
290 #if CK_NO_PROC_POOL
291   if (numnodes <= 2)
292 #else
293   if (numnodes  == 1)
294 #endif
295   {
296     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
297     _memChkptOn = 0;
298   }
299   inRestarting = 0;
300   recvCount = peCount = 0;
301   ackCount = 0;
302   expectCount = -1;
303   where = w;
304 }
305
306 CkMemCheckPT::~CkMemCheckPT()
307 {
308   int len = ckTable.length();
309   for (int i=0; i<len; i++) {
310     delete ckTable[i];
311   }
312 }
313
314 void CkMemCheckPT::pup(PUP::er& p) 
315
316   CBase_CkMemCheckPT::pup(p); 
317   p|cpStarter;
318   p|thisFailedPe;
319   p|failedPes;
320   p|ckCheckPTGroupID;           // recover global variable
321   p|cpCallback;                 // store callback
322   p|where;                      // where to checkpoint
323   if (p.isUnpacking()) {
324     recvCount = peCount = 0;
325   }
326 }
327
328 // called by checkpoint mgr to restore an array element
329 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
330 {
331 #if CMK_MEM_CHECKPOINT
332   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
333   // m->index.print();
334   PUP::fromMem p(m->packData);
335   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
336   CmiAssert(mgr);
337 #if STREAMING_INFORMHOME
338   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
339 #else
340   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
341 #endif
342
343   // find a list of array elements bound together
344   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
345   CmiAssert(elt);
346   CkLocRec_local *rec = elt->myRec;
347   CkVec<CkMigratable *> list;
348   mgr->migratableList(rec, list);
349   CmiAssert(list.length() > 0);
350   for (int l=0; l<list.length(); l++) {
351     elt = (ArrayElement *)list[l];
352     elt->budPEs[0] = m->bud1;
353     elt->budPEs[1] = m->bud2;
354     //    reset, may not needed now
355     // for now.
356     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
357       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
358       if (c) c->redNo = 0;
359     }
360   }
361 #endif
362 }
363
364 // return 1 if pe is a crashed processor
365 int CkMemCheckPT::isFailed(int pe)
366 {
367   for (int i=0; i<failedPes.length(); i++)
368     if (failedPes[i] == pe) return 1;
369   return 0;
370 }
371
372 // add pe into history list of all failed processors
373 void CkMemCheckPT::failed(int pe)
374 {
375   if (isFailed(pe)) return;
376   failedPes.push_back(pe);
377 }
378
379 int CkMemCheckPT::totalFailed()
380 {
381   return failedPes.length();
382 }
383
384 // create an checkpoint entry for array element of aid with index.
385 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
386 {
387   // error check, no duplicate
388   int idx, len = ckTable.size();
389   for (idx=0; idx<len; idx++) {
390     CkCheckPTInfo *entry = ckTable[idx];
391     if (index == entry->index) {
392       if (loc == entry->locMgr) {
393           // bindTo array elements
394           return;
395       }
396         // for array inheritance, the following check may fail
397         // because ArrayElement constructor of all superclasses are called
398       if (aid == entry->aid) {
399         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
400         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
401       }
402     }
403   }
404   CkCheckPTInfo *newEntry;
405   if (where == CkCheckPoint_inMEM)
406     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
407   else
408     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
409   ckTable.push_back(newEntry);
410   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
411 }
412
413 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
414 {
415   int buddy = msg->bud1;
416   if (buddy == CkMyPe()) buddy = msg->bud2;
417   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
418   recvData(msg);
419     // ack
420   thisProxy[buddy].gotData();
421 }
422
423 // loop through my checkpoint table and ask checkpointed array elements
424 // to send me checkpoint data.
425 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
426 {
427   cpCallback = cb;
428   cpStarter = starter;
429   if (CkMyPe() == cpStarter) {
430     startTime = CmiWallTimer();
431     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
432   }
433
434   int len = ckTable.length();
435   for (int i=0; i<len; i++) {
436     CkCheckPTInfo *entry = ckTable[i];
437       // always let the bigger number processor send request
438     //if (CkMyPe() < entry->pNo) continue;
439       // always let the smaller number processor send request, may on same proc
440     if (!isMaster(entry->pNo)) continue;
441       // call inmem_checkpoint to the array element, ask it to send
442       // back checkpoint data via recvData().
443     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
444     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
445   }
446     // if my table is empty, then I am done
447   if (len == 0) thisProxy[cpStarter].cpFinish();
448
449   // pack and send proc level data
450   sendProcData();
451 }
452
453 // don't handle array elements
454 static inline void _handleProcData(PUP::er &p)
455 {
456     // save readonlys, and callback BTW
457     CkPupROData(p);
458
459     // save mainchares 
460     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
461         
462 #ifndef CMK_CHARE_USE_PTR
463     // save non-migratable chare
464     CkPupChareData(p);
465 #endif
466
467     // save groups into Groups.dat
468     CkPupGroupData(p);
469
470     // save nodegroups into NodeGroups.dat
471     if(CkMyRank()==0) CkPupNodeGroupData(p);
472 }
473
474 void CkMemCheckPT::sendProcData()
475 {
476   // find out size of buffer
477   int size;
478   {
479     PUP::sizer p;
480     _handleProcData(p);
481     size = p.size();
482   }
483   int packSize = size;
484   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
485   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
486   {
487     PUP::toMem p(msg->packData);
488     _handleProcData(p);
489   }
490   msg->pe = CkMyPe();
491   msg->len = size;
492   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
493   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
494 }
495
496 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
497 {
498   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
499   CpvAccess(procChkptBuf) = msg;
500   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
501   thisProxy[msg->reportPe].cpFinish();
502 }
503
504 // ArrayElement call this function to give us the checkpointed data
505 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
506 {
507   int len = ckTable.length();
508   int idx;
509   for (idx=0; idx<len; idx++) {
510     CkCheckPTInfo *entry = ckTable[idx];
511     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
512   }
513   CkAssert(idx < len);
514   int isChkpting = msg->cp_flag;
515   ckTable[idx]->updateBuffer(msg);
516   if (isChkpting) {
517       // all my array elements have returned their inmem data
518       // inform starter processor that I am done.
519     recvCount ++;
520     if (recvCount == ckTable.length()) {
521       if (where == CkCheckPoint_inMEM) {
522         thisProxy[cpStarter].cpFinish();
523       }
524       else if (where == CkCheckPoint_inDISK) {
525         // another barrier for finalize the writing using fsync
526         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
527         contribute(0,NULL,CkReduction::sum_int,localcb);
528       }
529       else
530         CmiAbort("Unknown checkpoint scheme");
531       recvCount = 0;
532     } 
533   }
534 }
535
536 // only used in disk checkpointing
537 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
538 {
539   delete m;
540 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
541   system("sync");
542 #endif
543   thisProxy[cpStarter].cpFinish();
544 }
545
546 // only is called on cpStarter when checkpoint is done
547 void CkMemCheckPT::cpFinish()
548 {
549   CmiAssert(CkMyPe() == cpStarter);
550   peCount++;
551     // now that all processors have finished, activate callback
552   if (peCount == 2*(CkNumPes())) {
553     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
554     cpCallback.send();
555     peCount = 0;
556     thisProxy.report();
557   }
558 }
559
560 // for debugging, report checkpoint info
561 void CkMemCheckPT::report()
562 {
563   int objsize = 0;
564   int len = ckTable.length();
565   for (int i=0; i<len; i++) {
566     CkCheckPTInfo *entry = ckTable[i];
567     CmiAssert(entry);
568     objsize += entry->getSize();
569   }
570   CmiAssert(CpvAccess(procChkptBuf));
571   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
572 }
573
574 /*****************************************************************************
575                         RESTART Procedure
576 *****************************************************************************/
577
578 // master processor of two buddies
579 inline int CkMemCheckPT::isMaster(int buddype)
580 {
581 #if 0
582   int mype = CkMyPe();
583 //CkPrintf("ismaster: %d %d\n", pe, mype);
584   if (CkNumPes() - totalFailed() == 2) {
585     return mype > buddype;
586   }
587   for (int i=1; i<CkNumPes(); i++) {
588     int me = (buddype+i)%CkNumPes();
589     if (isFailed(me)) continue;
590     if (me == mype) return 1;
591     else return 0;
592   }
593   return 0;
594 #else
595     // smaller one
596   int mype = CkMyPe();
597 //CkPrintf("ismaster: %d %d\n", pe, mype);
598   if (CkNumPes() - totalFailed() == 2) {
599     return mype < buddype;
600   }
601   for (int i=1; i<CkNumPes(); i++) {
602     int me = (mype+i)%CkNumPes();
603     if (isFailed(me)) continue;
604     if (me == buddype) return 1;
605     else return 0;
606   }
607   return 0;
608 #endif
609 }
610
611 #ifdef CKLOCMGR_LOOP
612 #undef CKLOCMGR_LOOP
613 #endif
614
615 // loop over all CkLocMgr and do "code"
616 #define  CKLOCMGR_LOOP(code)    {       \
617   int numGroups = CkpvAccess(_groupIDTable)->size();    \
618   for(int i=0;i<numGroups;i++) {        \
619     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
620     if(obj->isLocMgr())  {      \
621       CkLocMgr *mgr = (CkLocMgr*)obj;   \
622       code      \
623     }   \
624   }     \
625  }
626
627 #if 0
628 // helper class to pup all elements that belong to same ckLocMgr
629 class ElementDestoryer : public CkLocIterator {
630 private:
631         CkLocMgr *locMgr;
632 public:
633         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
634         void addLocation(CkLocation &loc) {
635                 CkArrayIndex idx=loc.getIndex();
636                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
637                 loc.destroy();
638         }
639 };
640 #endif
641
642 // restore the bitmap vector for LB
643 void CkMemCheckPT::resetLB(int diepe)
644 {
645 #if CMK_LBDB_ON
646   int i;
647   char *bitmap = new char[CkNumPes()];
648   // set processor available bitmap
649   get_avail_vector(bitmap);
650
651   for (i=0; i<failedPes.length(); i++)
652     bitmap[failedPes[i]] = 0; 
653   bitmap[diepe] = 0;
654
655 #if CK_NO_PROC_POOL
656   set_avail_vector(bitmap);
657 #endif
658
659   // if I am the crashed pe, rebuild my failedPEs array
660   if (CkMyPe() == diepe)
661     for (i=0; i<CkNumPes(); i++) 
662       if (bitmap[i]==0) failed(i);
663
664   delete [] bitmap;
665 #endif
666 }
667
668 // in case when failedPe dies, everybody go through its checkpoint table:
669 // destory all array elements
670 // recover lost buddies
671 // reconstruct all array elements from check point data
672 // called on all processors
673 void CkMemCheckPT::restart(int diePe)
674 {
675 #if CMK_MEM_CHECKPOINT
676   double curTime = CmiWallTimer();
677   if (CkMyPe() == diePe)
678     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
679   stage = (char*)"resetLB";
680   startTime = curTime;
681   if (CkMyPe() == diePe)
682     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
683
684 #if CK_NO_PROC_POOL
685   failed(diePe);        // add into the list of failed pes
686 #endif
687   thisFailedPe = diePe;
688
689   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
690
691   inRestarting = 1;
692                                                                                 
693   // disable load balancer's barrier
694   if (CkMyPe() != diePe) resetLB(diePe);
695
696   CKLOCMGR_LOOP(mgr->startInserting(););
697
698   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
699 /*
700   if (CkMyPe() == 0)
701     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
702 */
703 #endif
704 }
705
706 // loally remove all array elements
707 void CkMemCheckPT::removeArrayElements()
708 {
709 #if CMK_MEM_CHECKPOINT
710   int len = ckTable.length();
711   double curTime = CmiWallTimer();
712   if (CkMyPe() == thisFailedPe) 
713     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
714   stage = (char*)"removeArrayElements";
715   startTime = curTime;
716
717   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
718   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
719
720   // get rid of all buffering and remote recs
721   // including destorying all array elements
722   CKLOCMGR_LOOP(mgr->flushAllRecs(););
723
724 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
725
726   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
727 #endif
728 }
729
730 // flush state in reduction manager
731 void CkMemCheckPT::resetReductionMgr()
732 {
733   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
734   int numGroups = CkpvAccess(_groupIDTable)->size();
735   for(int i=0;i<numGroups;i++) {
736     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
737     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
738     obj->flushStates();
739     obj->ckJustMigrated();
740   }
741   // reset again
742   //CpvAccess(_qd)->flushStates();
743
744 #if 1
745   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
746 #else
747   if (CkMyPe() == 0)
748     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
749 #endif
750 }
751
752 // recover the lost buddies
753 void CkMemCheckPT::recoverBuddies()
754 {
755   int idx;
756   int len = ckTable.length();
757   // ready to flush reduction manager
758   // cannot be CkMemCheckPT::restart because destory will modify states
759   double curTime = CmiWallTimer();
760   if (CkMyPe() == thisFailedPe)
761   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
762   stage = (char *)"recoverBuddies";
763   if (CkMyPe() == thisFailedPe)
764   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
765   startTime = curTime;
766
767   // recover buddies
768   expectCount = 0;
769   for (idx=0; idx<len; idx++) {
770     CkCheckPTInfo *entry = ckTable[idx];
771     if (entry->pNo == thisFailedPe) {
772 #if CK_NO_PROC_POOL
773       // find a new buddy
774 /*
775       int budPe = CkMyPe();
776 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
777       while (budPe == CkMyPe() || isFailed(budPe)) 
778           budPe = (budPe+1)%CkNumPes();
779 */
780       int budPe = BuddyPE(CkMyPe());
781       //entry->pNo = budPe;
782 #else
783       int budPe = thisFailedPe;
784 #endif
785       entry->updateBuddy(CkMyPe(), budPe);
786 #if 0
787       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
788       CkArrayCheckPTMessage *msg = entry->getCopy();
789       msg->cp_flag = 0;            // not checkpointing
790       thisProxy[budPe].recvData(msg);
791 #else
792       CkArrayCheckPTMessage *msg = entry->getCopy();
793       msg->bud1 = budPe;
794       msg->bud2 = CkMyPe();
795       msg->cp_flag = 0;            // not checkpointing
796       thisProxy[budPe].recoverEntry(msg);
797 #endif
798       expectCount ++;
799     }
800   }
801
802 #if 1
803   if (expectCount == 0) {
804     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
805   }
806 #else
807   if (CkMyPe() == 0) {
808     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
809   }
810 #endif
811
812   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
813 }
814
815 void CkMemCheckPT::gotData()
816 {
817   ackCount ++;
818   if (ackCount == expectCount) {
819     ackCount = 0;
820     expectCount = -1;
821     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
822   }
823 }
824
825 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
826 {
827   for (int i=0; i<n; i++) {
828     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
829     mgr->updateLocation(idx[i], nowOnPe);
830   }
831 }
832
833 // restore array elements
834 void CkMemCheckPT::recoverArrayElements()
835 {
836   double curTime = CmiWallTimer();
837   int len = ckTable.length();
838   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
839   stage = (char *)"recoverArrayElements";
840   if (CkMyPe() == thisFailedPe)
841   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
842   startTime = curTime;
843
844   // recover all array elements
845   int count = 0;
846 #if STREAMING_INFORMHOME
847   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
848   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
849 #endif
850   for (int idx=0; idx<len; idx++)
851   {
852     CkCheckPTInfo *entry = ckTable[idx];
853 #if CK_NO_PROC_POOL
854     // the bigger one will do 
855 //    if (CkMyPe() < entry->pNo) continue;
856     if (!isMaster(entry->pNo)) continue;
857 #else
858     // smaller one do it, which has the original object
859     if (CkMyPe() == entry->pNo+1 || 
860         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
861 #endif
862 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
863
864     entry->updateBuddy(CkMyPe(), entry->pNo);
865     CkArrayCheckPTMessage *msg = entry->getCopy();
866     // gzheng
867     //thisProxy[CkMyPe()].inmem_restore(msg);
868     inmem_restore(msg);
869 #if STREAMING_INFORMHOME
870     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
871     int homePe = mgr->homePe(msg->index);
872     if (homePe != CkMyPe()) {
873       gmap[homePe].push_back(msg->locMgr);
874       imap[homePe].push_back(msg->index);
875     }
876 #endif
877     count ++;
878   }
879 #if STREAMING_INFORMHOME
880   for (int i=0; i<CkNumPes(); i++) {
881     if (gmap[i].size() && i!=CkMyPe()) {
882       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
883     }
884   }
885   delete [] imap;
886   delete [] gmap;
887 #endif
888   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
889
890   CKLOCMGR_LOOP(mgr->doneInserting(););
891
892   inRestarting = 0;
893  // _crashedNode = -1;
894 CpvAccess(_crashedNode) = -1;
895
896   if (CkMyPe() == 0)
897     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
898 }
899
900 static double restartT;
901
902 // on every processor
903 // turn load balancer back on
904 void CkMemCheckPT::finishUp()
905 {
906   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
907   //CKLOCMGR_LOOP(mgr->doneInserting(););
908   
909   if (CkMyPe() == thisFailedPe)
910   {
911        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
912        //CkStartQD(cpCallback);
913        cpCallback.send();
914        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
915   }
916
917 #if CK_NO_PROC_POOL
918 #if NODE_CHECKPOINT
919   int numnodes = CmiNumPhysicalNodes();
920 #else
921   int numnodes = CkNumPes();
922 #endif
923   if (numnodes-totalFailed() <=2) {
924     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
925     _memChkptOn = 0;
926   }
927 #endif
928 }
929
930 // called only on 0
931 void CkMemCheckPT::quiescence(CkCallback &cb)
932 {
933   static int pe_count = 0;
934   pe_count ++;
935   CmiAssert(CkMyPe() == 0);
936   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
937   if (pe_count == CkNumPes()) {
938     pe_count = 0;
939     cb.send();
940   }
941 }
942
943 // User callable function - to start a checkpoint
944 // callback cb is used to pass control back
945 void CkStartMemCheckpoint(CkCallback &cb)
946 {
947 #if CMK_MEM_CHECKPOINT
948   if (_memChkptOn == 0) {
949     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
950     cb.send();
951     return;
952   }
953   if (CkInRestarting()) {
954       // trying to checkpointing during restart
955     cb.send();
956     return;
957   }
958     // store user callback and user data
959   CkMemCheckPT::cpCallback = cb;
960
961     // broadcast to start check pointing
962   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
963   checkptMgr.doItNow(CkMyPe(), cb);
964 #else
965   // when mem checkpoint is disabled, invike cb immediately
966   cb.send();
967 #endif
968 }
969
970 void CkRestartCheckPoint(int diePe)
971 {
972 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
973   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
974   // broadcast
975   checkptMgr.restart(diePe);
976 }
977
978 static int _diePE = -1;
979
980 // callback function used locally by ccs handler
981 static void CkRestartCheckPointCallback(void *ignore, void *msg)
982 {
983 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
984   CkRestartCheckPoint(_diePE);
985 }
986
987 // Converse function handles
988 static int askPhaseHandlerIdx;
989 static int recvPhaseHandlerIdx;
990 static int askProcDataHandlerIdx;
991 static int restartBcastHandlerIdx;
992 static int recoverProcDataHandlerIdx;
993 static int restartBeginHandlerIdx;
994 static int notifyHandlerIdx;
995
996 // called on crashed PE
997 static void restartBeginHandler(char *msg)
998 {
999 #if CMK_MEM_CHECKPOINT
1000   static int count = 0;
1001   CmiFree(msg);
1002   CmiAssert(CkMyPe() == _diePE);
1003   count ++;
1004   if (count == CkNumPes()) {
1005     CkRestartCheckPointCallback(NULL, NULL);
1006     count = 0;
1007   }
1008 #endif
1009 }
1010
1011 extern void _discard_charm_message();
1012 extern void _resume_charm_message();
1013
1014 static void restartBcastHandler(char *msg)
1015 {
1016 #if CMK_MEM_CHECKPOINT
1017   // advance phase counter
1018   CkMemCheckPT::inRestarting = 1;
1019   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1020   // gzheng
1021   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1022
1023   if (CkMyPe()==_diePE)
1024     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1025
1026   // reset QD counters
1027 /*  gzheng
1028   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1029 */
1030
1031 /*  gzheng
1032   if (CkMyPe()==_diePE)
1033       CkRestartCheckPointCallback(NULL, NULL);
1034 */
1035   CmiFree(msg);
1036
1037   _resume_charm_message();
1038
1039     // reduction
1040   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1041   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1042   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1043 #endif
1044 }
1045
1046 extern void _initDone();
1047
1048 // called on crashed processor
1049 static void recoverProcDataHandler(char *msg)
1050 {
1051 #if CMK_MEM_CHECKPOINT
1052    int i;
1053    envelope *env = (envelope *)msg;
1054    CkUnpackMessage(&env);
1055    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1056    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1057    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1058    //cur_restart_phase ++;
1059      // gzheng ?
1060    //CpvAccess(_qd)->flushStates();
1061
1062    // restore readonly, mainchare, group, nodegroup
1063 //   int temp = cur_restart_phase;
1064 //   cur_restart_phase = -1;
1065    PUP::fromMem p(procMsg->packData);
1066    _handleProcData(p);
1067
1068    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1069    // gzheng
1070    CKLOCMGR_LOOP(mgr->startInserting(););
1071
1072    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1073    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1074    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1075    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1076
1077    _initDone();
1078 //   CpvAccess(_qd)->flushStates();
1079    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1080 #endif
1081 }
1082
1083 // called on its backup processor
1084 // get backup message buffer and sent to crashed processor
1085 static void askProcDataHandler(char *msg)
1086 {
1087 #if CMK_MEM_CHECKPOINT
1088     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1089     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1090     if (CpvAccess(procChkptBuf) == NULL) 
1091       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1092     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1093     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1094     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1095
1096     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1097
1098     CkPackMessage(&env);
1099     CmiSetHandler(env, recoverProcDataHandlerIdx);
1100     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1101     CpvAccess(procChkptBuf) = NULL;
1102 #endif
1103 }
1104
1105 // called on PE 0
1106 void qd_callback(void *m)
1107 {
1108    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1109    CkFreeMsg(m);
1110 #ifdef CMK_SMP
1111    for(int i=0;i<CmiMyNodeSize();i++){
1112    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1113    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1114         CmiSetHandler(msg, askProcDataHandlerIdx);
1115         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1116         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1117    }
1118    return;
1119 #endif
1120    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1121    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1122    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1123    CmiSetHandler(msg, askProcDataHandlerIdx);
1124    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1125    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1126
1127 }
1128
1129 // on crashed node
1130 void CkMemRestart(const char *dummy, CkArgMsg *args)
1131 {
1132 #if CMK_MEM_CHECKPOINT
1133    _diePE = CmiMyNode();
1134    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1135    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1136    CkMemCheckPT::inRestarting = 1;
1137
1138   CpvAccess( _crashedNode )= CmiMyNode();
1139         
1140   _discard_charm_message();
1141   
1142 if(CmiMyRank()==0){
1143    CkCallback cb(qd_callback);
1144    CkStartQD(cb);
1145    CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1146  }
1147 #else
1148    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1149 #endif
1150 }
1151
1152 // can be called in other files
1153 // return true if it is in restarting
1154 extern "C"
1155 int CkInRestarting()
1156 {
1157 #if CMK_MEM_CHECKPOINT
1158   if (CpvAccess( _crashedNode)!=-1) return 1;
1159   // gzheng
1160   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1161   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1162   return CkMemCheckPT::inRestarting;
1163 #else
1164   return 0;
1165 #endif
1166 }
1167
1168 /*****************************************************************************
1169                 module initialization
1170 *****************************************************************************/
1171
1172 static int arg_where = CkCheckPoint_inMEM;
1173
1174 #if CMK_MEM_CHECKPOINT
1175 void init_memcheckpt(char **argv)
1176 {
1177     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1178       arg_where = CkCheckPoint_inDISK;
1179     }
1180
1181         // initiliazing _crashedNode variable
1182         CpvInitialize(int, _crashedNode);
1183         CpvAccess(_crashedNode) = -1;
1184
1185 }
1186 #endif
1187
1188 class CkMemCheckPTInit: public Chare {
1189 public:
1190   CkMemCheckPTInit(CkArgMsg *m) {
1191 #if CMK_MEM_CHECKPOINT
1192     if (arg_where == CkCheckPoint_inDISK) {
1193       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1194     }
1195     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1196     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1197 #endif
1198   }
1199 };
1200
1201 static void notifyHandler(char *msg)
1202 {
1203 #if CMK_MEM_CHECKPOINT
1204   CmiFree(msg);
1205       /* immediately increase restart phase to filter old messages */
1206   CpvAccess(_curRestartPhase) ++;
1207   CpvAccess(_qd)->flushStates();
1208   _discard_charm_message();
1209 #endif
1210 }
1211
1212 extern "C"
1213 void notify_crash(int node)
1214 {
1215 #ifdef CMK_MEM_CHECKPOINT
1216   CpvAccess( _crashedNode) = node;
1217 #ifdef CMK_SMP
1218   for(int i=0;i<CkMyNodeSize();i++){
1219         CpvAccessOther(_crashedNode,i)=node;
1220   }
1221 #endif
1222   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1223   CkMemCheckPT::inRestarting = 1;
1224
1225 #ifdef CMK_SMP
1226   for(int i=0;i<CkMyNodeSize();i++){
1227         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1228         CmiSetHandler(msg, notifyHandlerIdx);
1229         CmiSyncSendAndFree(CkMyNode()*CkMyNodeSize()+i, CmiMsgHeaderSizeBytes, (char *)msg);
1230   }
1231  return;
1232 #else 
1233     // this may be in interrupt handler, send a message to reset QD
1234   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1235   CmiSetHandler(msg, notifyHandlerIdx);
1236   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1237 #endif
1238 #endif
1239 }
1240
1241 extern "C" void (*notify_crash_fn)(int node);
1242
1243 // initproc
1244 void CkRegisterRestartHandler( )
1245 {
1246 #if CMK_MEM_CHECKPOINT
1247   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1248   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1249   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1250   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1251   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1252
1253   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1254   CpvAccess(procChkptBuf) = NULL;
1255
1256   notify_crash_fn = notify_crash;
1257 #if 1
1258   // for debugging
1259   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1260 //  sleep(4);
1261 #endif
1262 #endif
1263 }
1264
1265 /// @todo: the following definitions should be moved to a separate file containing
1266 // structures and functions about fault tolerance strategies
1267
1268 /**
1269  *  * @brief: function for killing a process                                             
1270  *   */
1271 #ifdef CMK_MEM_CHECKPOINT
1272 #if CMK_HAS_GETPID
1273 void killLocal(void *_dummy,double curWallTime){
1274         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1275         if(CmiWallTimer()<killTime-1){
1276                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1277         }else{  
1278                 kill(getpid(),SIGKILL);                                               
1279         }              
1280
1281 #else
1282 void killLocal(void *_dummy,double curWallTime){
1283   CmiAbort("kill() not supported!");
1284 }
1285 #endif
1286 #endif
1287
1288 #ifdef CMK_MEM_CHECKPOINT
1289 /**
1290  * @brief: reads the file with the kill information
1291  */
1292 void readKillFile(){
1293         FILE *fp=fopen(killFile,"r");
1294         if(!fp){
1295                 return;
1296         }
1297         int proc;
1298         double sec;
1299         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1300                 if(proc == CkMyNode() && CkMyRank() == 0){
1301                         killTime = CmiWallTimer()+sec;
1302                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1303                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1304                 }
1305         }
1306         fclose(fp);
1307 }
1308 #endif
1309
1310 #include "CkMemCheckpoint.def.h"
1311
1312