09d8b02e7f3d729939d87ea3134bd30e1b616dcd
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 #define DEBUGF      // CkPrintf
51
52 // pick buddy processor from a different physical node
53 #define NODE_CHECKPOINT                        0
54
55 // assume NO extra processors--1
56 // assume extra processors--0
57 #define CK_NO_PROC_POOL                         1
58
59 #define STREAMING_INFORMHOME                    1
60 CpvDeclare(int, _crashedNode);
61
62 // static, so that it is accessible from Converse part
63 int CkMemCheckPT::inRestarting = 0;
64 double CkMemCheckPT::startTime;
65 char *CkMemCheckPT::stage;
66 CkCallback CkMemCheckPT::cpCallback;
67
68 int _memChkptOn = 1;                    // checkpoint is on or off
69
70 CkGroupID ckCheckPTGroupID;             // readonly
71
72
73 /// @todo the following declarations should be moved into a separate file for all 
74 // fault tolerant strategies
75
76 #ifdef CMK_MEM_CHECKPOINT
77 // name of the kill file that contains processes to be killed 
78 char *killFile;                                               
79 // flag for the kill file         
80 int killFlag=0;
81 // variable for storing the killing time
82 double killTime=0.0;
83 #endif
84
85 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
86 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
87
88 // compute the backup processor
89 // FIXME: avoid crashed processors
90 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
91
92 inline int CkMemCheckPT::BuddyPE(int pe)
93 {
94   int budpe;
95 #if NODE_CHECKPOINT
96     // buddy is the processor with same rank on the next physical node
97   int r1 = CmiPhysicalRank(pe);
98   int budnode = CmiPhysicalNodeID(pe);
99   do {
100     budnode = (budnode+1)%CmiNumPhysicalNodes();
101     int *pelist;
102     int num;
103     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
104     budpe = pelist[r1 % num];
105   } while (isFailed(budpe));
106   if (budpe == pe) {
107     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
108     CmiAbort("Failed to find a buddy processor");
109   }
110 #else
111   budpe = pe;
112   while (budpe == pe || isFailed(budpe)) 
113           budpe = (budpe+1)%CkNumPes();
114 #endif
115   return budpe;
116 }
117
118 // called in array element constructor
119 // choose and register with 2 buddies for checkpoiting 
120 #if CMK_MEM_CHECKPOINT
121 void ArrayElement::init_checkpt() {
122         if (_memChkptOn == 0) return;
123         if (CkInRestarting()) {
124           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
125         }
126         // only master init checkpoint
127         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
128
129         budPEs[0] = CkMyPe();
130         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
131         CmiAssert(budPEs[0] != budPEs[1]);
132         // inform checkPTMgr
133         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
134         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
135         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
136         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
137 }
138 #endif
139
140 // entry function invoked by checkpoint mgr asking for checkpoint data
141 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
142 #if CMK_MEM_CHECKPOINT
143   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
144 //char index[128];   thisIndexMax.sprint(index);
145 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
146   CkLocMgr *locMgr = thisArray->getLocMgr();
147   CmiAssert(myRec!=NULL);
148   int size;
149   {
150         PUP::sizer p;
151         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
152         size = p.size();
153   }
154   int packSize = size/sizeof(double) +1;
155   CkArrayCheckPTMessage *msg =
156                  new (packSize, 0) CkArrayCheckPTMessage;
157   msg->len = size;
158   msg->index =thisIndexMax;
159   msg->aid = thisArrayID;
160   msg->locMgr = locMgr->getGroupID();
161   msg->cp_flag = 1;
162   {
163         PUP::toMem p(msg->packData);
164         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
165   }
166
167   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
168   checkptMgr.recvData(msg, 2, budPEs);
169   delete m;
170 #endif
171 }
172
173 // checkpoint holder class - for memory checkpointing
174 class CkMemCheckPTInfo: public CkCheckPTInfo
175 {
176   CkArrayCheckPTMessage *ckBuffer;
177 public:
178   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
179                     CkCheckPTInfo(a, loc, idx, pno)
180   {
181     ckBuffer = NULL;
182   }
183   ~CkMemCheckPTInfo() 
184   {
185     if (ckBuffer) delete ckBuffer; 
186   }
187   inline void updateBuffer(CkArrayCheckPTMessage *data) 
188   {
189     CmiAssert(data!=NULL);
190     if (ckBuffer) delete ckBuffer;
191     ckBuffer = data;
192   }    
193   inline CkArrayCheckPTMessage * getCopy()
194   {
195     if (ckBuffer == NULL) {
196       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
197       CmiAbort("Abort!");
198     }
199     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
200   }     
201   inline void updateBuddy(int b1, int b2) {
202      CmiAssert(ckBuffer);
203      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
204      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
205      CmiAssert(pNo != CkMyPe());
206   }
207   inline int getSize() { 
208      CmiAssert(ckBuffer);
209      return ckBuffer->len; 
210   }
211 };
212
213 // checkpoint holder class - for in-disk checkpointing
214 class CkDiskCheckPTInfo: public CkCheckPTInfo 
215 {
216   char *fname;
217   int bud1, bud2;
218   int len;                      // checkpoint size
219 public:
220   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
221   {
222 #if CMK_USE_MKSTEMP
223     fname = new char[64];
224     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
225     mkstemp(fname);
226 #else
227     fname=tmpnam(NULL);
228 #endif
229     bud1 = bud2 = -1;
230     len = 0;
231   }
232   ~CkDiskCheckPTInfo() 
233   {
234     remove(fname);
235   }
236   inline void updateBuffer(CkArrayCheckPTMessage *data) 
237   {
238     double t = CmiWallTimer();
239     // unpack it
240     envelope *env = UsrToEnv(data);
241     CkUnpackMessage(&env);
242     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
243     FILE *f = fopen(fname,"wb");
244     PUP::toDisk p(f);
245     CkPupMessage(p, (void **)&data);
246     // delay sync to the end because otherwise the messages are blocked
247 //    fsync(fileno(f));
248     fclose(f);
249     bud1 = data->bud1;
250     bud2 = data->bud2;
251     len = data->len;
252     delete data;
253     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
254   }
255   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
256   {
257     CkArrayCheckPTMessage *data;
258     FILE *f = fopen(fname,"rb");
259     PUP::fromDisk p(f);
260     CkPupMessage(p, (void **)&data);
261     fclose(f);
262     data->bud1 = bud1;                          // update the buddies
263     data->bud2 = bud2;
264     return data;
265   }
266   inline void updateBuddy(int b1, int b2) {
267      bud1 = b1; bud2 = b2;
268      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
269      CmiAssert(pNo != CkMyPe());
270   }
271   inline int getSize() { 
272      return len; 
273   }
274 };
275
276 CkMemCheckPT::CkMemCheckPT(int w)
277 {
278   int numnodes = 0;
279 #if NODE_CHECKPOINT
280   numnodes = CmiNumPhysicalNodes();
281 #else
282   numnodes = CkNumPes();
283 #endif
284 #if CK_NO_PROC_POOL
285   if (numnodes <= 2)
286 #else
287   if (numnodes  == 1)
288 #endif
289   {
290     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
291     _memChkptOn = 0;
292   }
293   inRestarting = 0;
294   recvCount = peCount = 0;
295   ackCount = 0;
296   expectCount = -1;
297   where = w;
298 }
299
300 CkMemCheckPT::~CkMemCheckPT()
301 {
302   int len = ckTable.length();
303   for (int i=0; i<len; i++) {
304     delete ckTable[i];
305   }
306 }
307
308 void CkMemCheckPT::pup(PUP::er& p) 
309
310   CBase_CkMemCheckPT::pup(p); 
311   p|cpStarter;
312   p|thisFailedPe;
313   p|failedPes;
314   p|ckCheckPTGroupID;           // recover global variable
315   p|cpCallback;                 // store callback
316   p|where;                      // where to checkpoint
317   if (p.isUnpacking()) {
318     recvCount = peCount = 0;
319   }
320 }
321
322 // called by checkpoint mgr to restore an array element
323 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
324 {
325 #if CMK_MEM_CHECKPOINT
326   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
327   // m->index.print();
328   PUP::fromMem p(m->packData);
329   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
330   CmiAssert(mgr);
331 #if STREAMING_INFORMHOME
332   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
333 #else
334   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
335 #endif
336
337   // find a list of array elements bound together
338   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
339   CmiAssert(elt);
340   CkLocRec_local *rec = elt->myRec;
341   CkVec<CkMigratable *> list;
342   mgr->migratableList(rec, list);
343   CmiAssert(list.length() > 0);
344   for (int l=0; l<list.length(); l++) {
345     elt = (ArrayElement *)list[l];
346     elt->budPEs[0] = m->bud1;
347     elt->budPEs[1] = m->bud2;
348     //    reset, may not needed now
349     // for now.
350     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
351       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
352       if (c) c->redNo = 0;
353     }
354   }
355 #endif
356 }
357
358 // return 1 if pe is a crashed processor
359 int CkMemCheckPT::isFailed(int pe)
360 {
361   for (int i=0; i<failedPes.length(); i++)
362     if (failedPes[i] == pe) return 1;
363   return 0;
364 }
365
366 // add pe into history list of all failed processors
367 void CkMemCheckPT::failed(int pe)
368 {
369   if (isFailed(pe)) return;
370   failedPes.push_back(pe);
371 }
372
373 int CkMemCheckPT::totalFailed()
374 {
375   return failedPes.length();
376 }
377
378 // create an checkpoint entry for array element of aid with index.
379 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
380 {
381   // error check, no duplicate
382   int idx, len = ckTable.size();
383   for (idx=0; idx<len; idx++) {
384     CkCheckPTInfo *entry = ckTable[idx];
385     if (index == entry->index) {
386       if (loc == entry->locMgr) {
387           // bindTo array elements
388           return;
389       }
390         // for array inheritance, the following check may fail
391         // because ArrayElement constructor of all superclasses are called
392       if (aid == entry->aid) {
393         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
394         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
395       }
396     }
397   }
398   CkCheckPTInfo *newEntry;
399   if (where == CkCheckPoint_inMEM)
400     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
401   else
402     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
403   ckTable.push_back(newEntry);
404   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
405 }
406
407 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
408 {
409   int buddy = msg->bud1;
410   if (buddy == CkMyPe()) buddy = msg->bud2;
411   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
412   recvData(msg);
413     // ack
414   thisProxy[buddy].gotData();
415 }
416
417 // loop through my checkpoint table and ask checkpointed array elements
418 // to send me checkpoint data.
419 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
420 {
421   cpCallback = cb;
422   cpStarter = starter;
423   if (CkMyPe() == cpStarter) {
424     startTime = CmiWallTimer();
425     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
426   }
427
428   int len = ckTable.length();
429   for (int i=0; i<len; i++) {
430     CkCheckPTInfo *entry = ckTable[i];
431       // always let the bigger number processor send request
432     //if (CkMyPe() < entry->pNo) continue;
433       // always let the smaller number processor send request, may on same proc
434     if (!isMaster(entry->pNo)) continue;
435       // call inmem_checkpoint to the array element, ask it to send
436       // back checkpoint data via recvData().
437     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
438     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
439   }
440     // if my table is empty, then I am done
441   if (len == 0) thisProxy[cpStarter].cpFinish();
442
443   // pack and send proc level data
444   sendProcData();
445 }
446
447 // don't handle array elements
448 static inline void _handleProcData(PUP::er &p)
449 {
450     // save readonlys, and callback BTW
451     CkPupROData(p);
452
453     // save mainchares 
454     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
455         
456 #ifndef CMK_CHARE_USE_PTR
457     // save non-migratable chare
458     CkPupChareData(p);
459 #endif
460
461     // save groups into Groups.dat
462     CkPupGroupData(p);
463
464     // save nodegroups into NodeGroups.dat
465     if(CkMyRank()==0) CkPupNodeGroupData(p);
466 }
467
468 void CkMemCheckPT::sendProcData()
469 {
470   // find out size of buffer
471   int size;
472   {
473     PUP::sizer p;
474     _handleProcData(p);
475     size = p.size();
476   }
477   int packSize = size;
478   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
479   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
480   {
481     PUP::toMem p(msg->packData);
482     _handleProcData(p);
483   }
484   msg->pe = CkMyPe();
485   msg->len = size;
486   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
487   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
488 }
489
490 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
491 {
492   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
493   CpvAccess(procChkptBuf) = msg;
494   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
495   thisProxy[msg->reportPe].cpFinish();
496 }
497
498 // ArrayElement call this function to give us the checkpointed data
499 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
500 {
501   int len = ckTable.length();
502   int idx;
503   for (idx=0; idx<len; idx++) {
504     CkCheckPTInfo *entry = ckTable[idx];
505     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
506   }
507   CkAssert(idx < len);
508   int isChkpting = msg->cp_flag;
509   ckTable[idx]->updateBuffer(msg);
510   if (isChkpting) {
511       // all my array elements have returned their inmem data
512       // inform starter processor that I am done.
513     recvCount ++;
514     if (recvCount == ckTable.length()) {
515       if (where == CkCheckPoint_inMEM) {
516         thisProxy[cpStarter].cpFinish();
517       }
518       else if (where == CkCheckPoint_inDISK) {
519         // another barrier for finalize the writing using fsync
520         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
521         contribute(0,NULL,CkReduction::sum_int,localcb);
522       }
523       else
524         CmiAbort("Unknown checkpoint scheme");
525       recvCount = 0;
526     } 
527   }
528 }
529
530 // only used in disk checkpointing
531 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
532 {
533   delete m;
534 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
535   system("sync");
536 #endif
537   thisProxy[cpStarter].cpFinish();
538 }
539
540 // only is called on cpStarter when checkpoint is done
541 void CkMemCheckPT::cpFinish()
542 {
543   CmiAssert(CkMyPe() == cpStarter);
544   peCount++;
545     // now that all processors have finished, activate callback
546   if (peCount == 2*(CkNumPes())) {
547     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
548     cpCallback.send();
549     peCount = 0;
550     thisProxy.report();
551   }
552 }
553
554 // for debugging, report checkpoint info
555 void CkMemCheckPT::report()
556 {
557   int objsize = 0;
558   int len = ckTable.length();
559   for (int i=0; i<len; i++) {
560     CkCheckPTInfo *entry = ckTable[i];
561     CmiAssert(entry);
562     objsize += entry->getSize();
563   }
564   CmiAssert(CpvAccess(procChkptBuf));
565   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
566 }
567
568 /*****************************************************************************
569                         RESTART Procedure
570 *****************************************************************************/
571
572 // master processor of two buddies
573 inline int CkMemCheckPT::isMaster(int buddype)
574 {
575 #if 0
576   int mype = CkMyPe();
577 //CkPrintf("ismaster: %d %d\n", pe, mype);
578   if (CkNumPes() - totalFailed() == 2) {
579     return mype > buddype;
580   }
581   for (int i=1; i<CkNumPes(); i++) {
582     int me = (buddype+i)%CkNumPes();
583     if (isFailed(me)) continue;
584     if (me == mype) return 1;
585     else return 0;
586   }
587   return 0;
588 #else
589     // smaller one
590   int mype = CkMyPe();
591 //CkPrintf("ismaster: %d %d\n", pe, mype);
592   if (CkNumPes() - totalFailed() == 2) {
593     return mype < buddype;
594   }
595   for (int i=1; i<CkNumPes(); i++) {
596     int me = (mype+i)%CkNumPes();
597     if (isFailed(me)) continue;
598     if (me == buddype) return 1;
599     else return 0;
600   }
601   return 0;
602 #endif
603 }
604
605 #ifdef CKLOCMGR_LOOP
606 #undef CKLOCMGR_LOOP
607 #endif
608
609 // loop over all CkLocMgr and do "code"
610 #define  CKLOCMGR_LOOP(code)    {       \
611   int numGroups = CkpvAccess(_groupIDTable)->size();    \
612   for(int i=0;i<numGroups;i++) {        \
613     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
614     if(obj->isLocMgr())  {      \
615       CkLocMgr *mgr = (CkLocMgr*)obj;   \
616       code      \
617     }   \
618   }     \
619  }
620
621 #if 0
622 // helper class to pup all elements that belong to same ckLocMgr
623 class ElementDestoryer : public CkLocIterator {
624 private:
625         CkLocMgr *locMgr;
626 public:
627         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
628         void addLocation(CkLocation &loc) {
629                 CkArrayIndex idx=loc.getIndex();
630                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
631                 loc.destroy();
632         }
633 };
634 #endif
635
636 // restore the bitmap vector for LB
637 void CkMemCheckPT::resetLB(int diepe)
638 {
639 #if CMK_LBDB_ON
640   int i;
641   char *bitmap = new char[CkNumPes()];
642   // set processor available bitmap
643   get_avail_vector(bitmap);
644
645   for (i=0; i<failedPes.length(); i++)
646     bitmap[failedPes[i]] = 0; 
647   bitmap[diepe] = 0;
648
649 #if CK_NO_PROC_POOL
650   set_avail_vector(bitmap);
651 #endif
652
653   // if I am the crashed pe, rebuild my failedPEs array
654   if (CkMyPe() == diepe)
655     for (i=0; i<CkNumPes(); i++) 
656       if (bitmap[i]==0) failed(i);
657
658   delete [] bitmap;
659 #endif
660 }
661
662 // in case when failedPe dies, everybody go through its checkpoint table:
663 // destory all array elements
664 // recover lost buddies
665 // reconstruct all array elements from check point data
666 // called on all processors
667 void CkMemCheckPT::restart(int diePe)
668 {
669 #if CMK_MEM_CHECKPOINT
670   double curTime = CmiWallTimer();
671   if (CkMyPe() == diePe)
672     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
673   stage = (char*)"resetLB";
674   startTime = curTime;
675   if (CkMyPe() == diePe)
676     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
677
678 #if CK_NO_PROC_POOL
679   failed(diePe);        // add into the list of failed pes
680 #endif
681   thisFailedPe = diePe;
682
683   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
684
685   inRestarting = 1;
686                                                                                 
687   // disable load balancer's barrier
688   if (CkMyPe() != diePe) resetLB(diePe);
689
690   CKLOCMGR_LOOP(mgr->startInserting(););
691
692   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
693 /*
694   if (CkMyPe() == 0)
695     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
696 */
697 #endif
698 }
699
700 // loally remove all array elements
701 void CkMemCheckPT::removeArrayElements()
702 {
703 #if CMK_MEM_CHECKPOINT
704   int len = ckTable.length();
705   double curTime = CmiWallTimer();
706   if (CkMyPe() == thisFailedPe) 
707     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
708   stage = (char*)"removeArrayElements";
709   startTime = curTime;
710
711   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
712   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
713
714   // get rid of all buffering and remote recs
715   // including destorying all array elements
716   CKLOCMGR_LOOP(mgr->flushAllRecs(););
717
718 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
719
720   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
721 #endif
722 }
723
724 // flush state in reduction manager
725 void CkMemCheckPT::resetReductionMgr()
726 {
727   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
728   int numGroups = CkpvAccess(_groupIDTable)->size();
729   for(int i=0;i<numGroups;i++) {
730     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
731     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
732     obj->flushStates();
733     obj->ckJustMigrated();
734   }
735   // reset again
736   //CpvAccess(_qd)->flushStates();
737
738 #if 1
739   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
740 #else
741   if (CkMyPe() == 0)
742     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
743 #endif
744 }
745
746 // recover the lost buddies
747 void CkMemCheckPT::recoverBuddies()
748 {
749   int idx;
750   int len = ckTable.length();
751   // ready to flush reduction manager
752   // cannot be CkMemCheckPT::restart because destory will modify states
753   double curTime = CmiWallTimer();
754   if (CkMyPe() == thisFailedPe)
755   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
756   stage = (char *)"recoverBuddies";
757   if (CkMyPe() == thisFailedPe)
758   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
759   startTime = curTime;
760
761   // recover buddies
762   expectCount = 0;
763   for (idx=0; idx<len; idx++) {
764     CkCheckPTInfo *entry = ckTable[idx];
765     if (entry->pNo == thisFailedPe) {
766 #if CK_NO_PROC_POOL
767       // find a new buddy
768 /*
769       int budPe = CkMyPe();
770 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
771       while (budPe == CkMyPe() || isFailed(budPe)) 
772           budPe = (budPe+1)%CkNumPes();
773 */
774       int budPe = BuddyPE(CkMyPe());
775       //entry->pNo = budPe;
776 #else
777       int budPe = thisFailedPe;
778 #endif
779       entry->updateBuddy(CkMyPe(), budPe);
780 #if 0
781       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
782       CkArrayCheckPTMessage *msg = entry->getCopy();
783       msg->cp_flag = 0;            // not checkpointing
784       thisProxy[budPe].recvData(msg);
785 #else
786       CkArrayCheckPTMessage *msg = entry->getCopy();
787       msg->bud1 = budPe;
788       msg->bud2 = CkMyPe();
789       msg->cp_flag = 0;            // not checkpointing
790       thisProxy[budPe].recoverEntry(msg);
791 #endif
792       expectCount ++;
793     }
794   }
795
796 #if 1
797   if (expectCount == 0) {
798     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
799   }
800 #else
801   if (CkMyPe() == 0) {
802     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
803   }
804 #endif
805
806   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
807 }
808
809 void CkMemCheckPT::gotData()
810 {
811   ackCount ++;
812   if (ackCount == expectCount) {
813     ackCount = 0;
814     expectCount = -1;
815     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
816   }
817 }
818
819 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
820 {
821   for (int i=0; i<n; i++) {
822     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
823     mgr->updateLocation(idx[i], nowOnPe);
824   }
825 }
826
827 // restore array elements
828 void CkMemCheckPT::recoverArrayElements()
829 {
830   double curTime = CmiWallTimer();
831   int len = ckTable.length();
832   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
833   stage = (char *)"recoverArrayElements";
834   if (CkMyPe() == thisFailedPe)
835   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
836   startTime = curTime;
837
838   // recover all array elements
839   int count = 0;
840 #if STREAMING_INFORMHOME
841   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
842   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
843 #endif
844   for (int idx=0; idx<len; idx++)
845   {
846     CkCheckPTInfo *entry = ckTable[idx];
847 #if CK_NO_PROC_POOL
848     // the bigger one will do 
849 //    if (CkMyPe() < entry->pNo) continue;
850     if (!isMaster(entry->pNo)) continue;
851 #else
852     // smaller one do it, which has the original object
853     if (CkMyPe() == entry->pNo+1 || 
854         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
855 #endif
856 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
857
858     entry->updateBuddy(CkMyPe(), entry->pNo);
859     CkArrayCheckPTMessage *msg = entry->getCopy();
860     // gzheng
861     //thisProxy[CkMyPe()].inmem_restore(msg);
862     inmem_restore(msg);
863 #if STREAMING_INFORMHOME
864     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
865     int homePe = mgr->homePe(msg->index);
866     if (homePe != CkMyPe()) {
867       gmap[homePe].push_back(msg->locMgr);
868       imap[homePe].push_back(msg->index);
869     }
870 #endif
871     count ++;
872   }
873 #if STREAMING_INFORMHOME
874   for (int i=0; i<CkNumPes(); i++) {
875     if (gmap[i].size() && i!=CkMyPe()) {
876       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
877     }
878   }
879   delete [] imap;
880   delete [] gmap;
881 #endif
882   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
883
884   CKLOCMGR_LOOP(mgr->doneInserting(););
885
886   inRestarting = 0;
887  // _crashedNode = -1;
888 CpvAccess(_crashedNode) = -1;
889
890   if (CkMyPe() == 0)
891     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
892 }
893
894 static double restartT;
895
896 // on every processor
897 // turn load balancer back on
898 void CkMemCheckPT::finishUp()
899 {
900   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
901   //CKLOCMGR_LOOP(mgr->doneInserting(););
902   
903   if (CkMyPe() == thisFailedPe)
904   {
905        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
906        //CkStartQD(cpCallback);
907        cpCallback.send();
908        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
909   }
910
911 #if CK_NO_PROC_POOL
912 #if NODE_CHECKPOINT
913   int numnodes = CmiNumPhysicalNodes();
914 #else
915   int numnodes = CkNumPes();
916 #endif
917   if (numnodes-totalFailed() <=2) {
918     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
919     _memChkptOn = 0;
920   }
921 #endif
922 }
923
924 // called only on 0
925 void CkMemCheckPT::quiescence(CkCallback &cb)
926 {
927   static int pe_count = 0;
928   pe_count ++;
929   CmiAssert(CkMyPe() == 0);
930   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
931   if (pe_count == CkNumPes()) {
932     pe_count = 0;
933     cb.send();
934   }
935 }
936
937 // User callable function - to start a checkpoint
938 // callback cb is used to pass control back
939 void CkStartMemCheckpoint(CkCallback &cb)
940 {
941 #if CMK_MEM_CHECKPOINT
942   if (_memChkptOn == 0) {
943     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
944     cb.send();
945     return;
946   }
947   if (CkInRestarting()) {
948       // trying to checkpointing during restart
949     cb.send();
950     return;
951   }
952     // store user callback and user data
953   CkMemCheckPT::cpCallback = cb;
954
955     // broadcast to start check pointing
956   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
957   checkptMgr.doItNow(CkMyPe(), cb);
958 #else
959   // when mem checkpoint is disabled, invike cb immediately
960   cb.send();
961 #endif
962 }
963
964 void CkRestartCheckPoint(int diePe)
965 {
966 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
967   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
968   // broadcast
969   checkptMgr.restart(diePe);
970 }
971
972 static int _diePE = -1;
973
974 // callback function used locally by ccs handler
975 static void CkRestartCheckPointCallback(void *ignore, void *msg)
976 {
977 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
978   CkRestartCheckPoint(_diePE);
979 }
980
981 // Converse function handles
982 static int askPhaseHandlerIdx;
983 static int recvPhaseHandlerIdx;
984 static int askProcDataHandlerIdx;
985 static int restartBcastHandlerIdx;
986 static int recoverProcDataHandlerIdx;
987 static int restartBeginHandlerIdx;
988 static int notifyHandlerIdx;
989
990 // called on crashed PE
991 static void restartBeginHandler(char *msg)
992 {
993 #if CMK_MEM_CHECKPOINT
994   static int count = 0;
995   CmiFree(msg);
996   CmiAssert(CkMyPe() == _diePE);
997   count ++;
998   if (count == CkNumPes()) {
999     CkRestartCheckPointCallback(NULL, NULL);
1000     count = 0;
1001   }
1002 #endif
1003 }
1004
1005 extern void _discard_charm_message();
1006 extern void _resume_charm_message();
1007
1008 static void restartBcastHandler(char *msg)
1009 {
1010 #if CMK_MEM_CHECKPOINT
1011   // advance phase counter
1012   CkMemCheckPT::inRestarting = 1;
1013   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1014   // gzheng
1015   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1016
1017   if (CkMyPe()==_diePE)
1018     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1019
1020   // reset QD counters
1021 /*  gzheng
1022   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1023 */
1024
1025 /*  gzheng
1026   if (CkMyPe()==_diePE)
1027       CkRestartCheckPointCallback(NULL, NULL);
1028 */
1029   CmiFree(msg);
1030
1031   _resume_charm_message();
1032
1033     // reduction
1034   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1035   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1036   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1037 #endif
1038 }
1039
1040 extern void _initDone();
1041
1042 // called on crashed processor
1043 static void recoverProcDataHandler(char *msg)
1044 {
1045 #if CMK_MEM_CHECKPOINT
1046    int i;
1047    envelope *env = (envelope *)msg;
1048    CkUnpackMessage(&env);
1049    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1050    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1051    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1052    //cur_restart_phase ++;
1053      // gzheng ?
1054    //CpvAccess(_qd)->flushStates();
1055
1056    // restore readonly, mainchare, group, nodegroup
1057 //   int temp = cur_restart_phase;
1058 //   cur_restart_phase = -1;
1059    PUP::fromMem p(procMsg->packData);
1060    _handleProcData(p);
1061
1062    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1063    // gzheng
1064    CKLOCMGR_LOOP(mgr->startInserting(););
1065
1066    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1067    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1068    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1069    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1070
1071    _initDone();
1072 //   CpvAccess(_qd)->flushStates();
1073    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1074 #endif
1075 }
1076
1077 // called on its backup processor
1078 // get backup message buffer and sent to crashed processor
1079 static void askProcDataHandler(char *msg)
1080 {
1081 #if CMK_MEM_CHECKPOINT
1082     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1083     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1084     if (CpvAccess(procChkptBuf) == NULL) 
1085       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1086     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1087     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1088     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1089
1090     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1091
1092     CkPackMessage(&env);
1093     CmiSetHandler(env, recoverProcDataHandlerIdx);
1094     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1095     CpvAccess(procChkptBuf) = NULL;
1096 #endif
1097 }
1098
1099 // called on PE 0
1100 void qd_callback(void *m)
1101 {
1102    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1103    CkFreeMsg(m);
1104 #ifdef CMK_SMP
1105    for(int i=0;i<CmiMyNodeSize();i++){
1106    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1107    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1108         CmiSetHandler(msg, askProcDataHandlerIdx);
1109         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1110         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1111    }
1112    return;
1113 #endif
1114    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1115    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1116    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1117    CmiSetHandler(msg, askProcDataHandlerIdx);
1118    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1119    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1120
1121 }
1122
1123 // on crashed node
1124 void CkMemRestart(const char *dummy, CkArgMsg *args)
1125 {
1126 #if CMK_MEM_CHECKPOINT
1127    _diePE = CmiMyNode();
1128    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1129    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1130    CkMemCheckPT::inRestarting = 1;
1131
1132   CpvAccess( _crashedNode )= CmiMyNode();
1133         
1134   _discard_charm_message();
1135   
1136 if(CmiMyRank()==0){
1137    CkCallback cb(qd_callback);
1138    CkStartQD(cb);
1139    CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1140  }
1141 #else
1142    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1143 #endif
1144 }
1145
1146 // can be called in other files
1147 // return true if it is in restarting
1148 extern "C"
1149 int CkInRestarting()
1150 {
1151 #if CMK_MEM_CHECKPOINT
1152   if (CpvAccess( _crashedNode)!=-1) return 1;
1153   // gzheng
1154   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1155   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1156   return CkMemCheckPT::inRestarting;
1157 #else
1158   return 0;
1159 #endif
1160 }
1161
1162 /*****************************************************************************
1163                 module initialization
1164 *****************************************************************************/
1165
1166 static int arg_where = CkCheckPoint_inMEM;
1167
1168 #if CMK_MEM_CHECKPOINT
1169 void init_memcheckpt(char **argv)
1170 {
1171     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1172       arg_where = CkCheckPoint_inDISK;
1173     }
1174
1175         // initiliazing _crashedNode variable
1176         CpvInitialize(int, _crashedNode);
1177         CpvAccess(_crashedNode) = -1;
1178
1179 }
1180 #endif
1181
1182 class CkMemCheckPTInit: public Chare {
1183 public:
1184   CkMemCheckPTInit(CkArgMsg *m) {
1185 #if CMK_MEM_CHECKPOINT
1186     if (arg_where == CkCheckPoint_inDISK) {
1187       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1188     }
1189     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1190     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1191 #endif
1192   }
1193 };
1194
1195 static void notifyHandler(char *msg)
1196 {
1197 #if CMK_MEM_CHECKPOINT
1198   CmiFree(msg);
1199       /* immediately increase restart phase to filter old messages */
1200   CpvAccess(_curRestartPhase) ++;
1201   CpvAccess(_qd)->flushStates();
1202   _discard_charm_message();
1203 #endif
1204 }
1205
1206 extern "C"
1207 void notify_crash(int node)
1208 {
1209 #ifdef CMK_MEM_CHECKPOINT
1210   CpvAccess( _crashedNode) = node;
1211 #ifdef CMK_SMP
1212   for(int i=0;i<CkMyNodeSize();i++){
1213         CpvAccessOther(_crashedNode,i)=node;
1214   }
1215 #endif
1216   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1217   CkMemCheckPT::inRestarting = 1;
1218
1219 #ifdef CMK_SMP
1220   for(int i=0;i<CkMyNodeSize();i++){
1221         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1222         CmiSetHandler(msg, notifyHandlerIdx);
1223         CmiSyncSendAndFree(CkMyNode()*CkMyNodeSize()+i, CmiMsgHeaderSizeBytes, (char *)msg);
1224   }
1225  return;
1226 #else 
1227     // this may be in interrupt handler, send a message to reset QD
1228   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1229   CmiSetHandler(msg, notifyHandlerIdx);
1230   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1231 #endif
1232 #endif
1233 }
1234
1235 extern "C" void (*notify_crash_fn)(int node);
1236
1237 // initproc
1238 void CkRegisterRestartHandler( )
1239 {
1240 #if CMK_MEM_CHECKPOINT
1241   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1242   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1243   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1244   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1245   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1246
1247   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1248   CpvAccess(procChkptBuf) = NULL;
1249
1250   notify_crash_fn = notify_crash;
1251 #if 1
1252   // for debugging
1253   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1254 //  sleep(4);
1255 #endif
1256 #endif
1257 }
1258
1259 /// @todo: the following definitions should be moved to a separate file containing
1260 // structures and functions about fault tolerance strategies
1261
1262 /**
1263  *  * @brief: function for killing a process                                             
1264  *   */
1265 #ifdef CMK_MEM_CHECKPOINT
1266 #if CMK_HAS_GETPID
1267 void killLocal(void *_dummy,double curWallTime){
1268         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1269         if(CmiWallTimer()<killTime-1){
1270                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1271         }else{  
1272                 kill(getpid(),SIGKILL);                                               
1273         }              
1274
1275 #else
1276 void killLocal(void *_dummy,double curWallTime){
1277   CmiAbort("kill() not supported!");
1278 }
1279 #endif
1280 #endif
1281
1282 #ifdef CMK_MEM_CHECKPOINT
1283 /**
1284  * @brief: reads the file with the kill information
1285  */
1286 void readKillFile(){
1287         FILE *fp=fopen(killFile,"r");
1288         if(!fp){
1289                 return;
1290         }
1291         int proc;
1292         double sec;
1293         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1294                 if(proc == CkMyNode() && CkMyRank() == 0){
1295                         killTime = CmiWallTimer()+sec;
1296                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1297                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1298                 }
1299         }
1300         fclose(fp);
1301 }
1302 #endif
1303
1304 #include "CkMemCheckpoint.def.h"
1305
1306