Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 #define DEBUGF      // CkPrintf
51
52 // pick buddy processor from a different physical node
53 #define NODE_CHECKPOINT                        0
54
55 // assume NO extra processors--1
56 // assume extra processors--0
57 #define CK_NO_PROC_POOL                         1
58
59 #define STREAMING_INFORMHOME                    1
60
61 int crashed_node = -1;
62
63 // static, so that it is accessible from Converse part
64 int CkMemCheckPT::inRestarting = 0;
65 double CkMemCheckPT::startTime;
66 char *CkMemCheckPT::stage;
67 CkCallback CkMemCheckPT::cpCallback;
68
69 int _memChkptOn = 1;                    // checkpoint is on or off
70
71 CkGroupID ckCheckPTGroupID;             // readonly
72
73
74 /// @todo the following declarations should be moved into a separate file for all 
75 // fault tolerant strategies
76
77 #ifdef CMK_MEM_CHECKPOINT
78 // name of the kill file that contains processes to be killed 
79 char *killFile;                                               
80 // flag for the kill file         
81 int killFlag=0;
82 // variable for storing the killing time
83 double killTime=0.0;
84 #endif
85
86 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
87 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
88
89 // compute the backup processor
90 // FIXME: avoid crashed processors
91 inline int ChkptOnPe(int pe) { return (pe+1)%CkNumPes(); }
92
93 inline int CkMemCheckPT::BuddyPE(int pe)
94 {
95   int budpe;
96 #if NODE_CHECKPOINT
97     // buddy is the processor with same rank on the next physical node
98   int r1 = CmiPhysicalRank(pe);
99   int budnode = CmiPhysicalNodeID(pe);
100   do {
101     budnode = (budnode+1)%CmiNumPhysicalNodes();
102     int *pelist;
103     int num;
104     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
105     budpe = pelist[r1 % num];
106   } while (isFailed(budpe));
107   if (budpe == pe) {
108     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
109     CmiAbort("Failed to find a buddy processor");
110   }
111 #else
112   budpe = pe;
113   while (budpe == pe || isFailed(budpe)) 
114           budpe = (budpe+1)%CkNumPes();
115 #endif
116   return budpe;
117 }
118
119 // called in array element constructor
120 // choose and register with 2 buddies for checkpoiting 
121 #if CMK_MEM_CHECKPOINT
122 void ArrayElement::init_checkpt() {
123         if (_memChkptOn == 0) return;
124         if (CkInRestarting()) {
125           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
126         }
127         // only master init checkpoint
128         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
129
130         budPEs[0] = CkMyPe();
131         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
132         CmiAssert(budPEs[0] != budPEs[1]);
133         // inform checkPTMgr
134         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
135         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
136         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
137         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
138 }
139 #endif
140
141 // entry function invoked by checkpoint mgr asking for checkpoint data
142 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
143 #if CMK_MEM_CHECKPOINT
144   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
145   CkLocMgr *locMgr = thisArray->getLocMgr();
146   CmiAssert(myRec!=NULL);
147   int size;
148   {
149         PUP::sizer p;
150         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
151         size = p.size();
152   }
153   int packSize = size/sizeof(double) +1;
154   CkArrayCheckPTMessage *msg =
155                  new (packSize, 0) CkArrayCheckPTMessage;
156   msg->len = size;
157   msg->index =thisIndexMax;
158   msg->aid = thisArrayID;
159   msg->locMgr = locMgr->getGroupID();
160   msg->cp_flag = 1;
161   {
162         PUP::toMem p(msg->packData);
163         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
164   }
165
166   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
167   checkptMgr.recvData(msg, 2, budPEs);
168   delete m;
169 #endif
170 }
171
172 // checkpoint holder class - for memory checkpointing
173 class CkMemCheckPTInfo: public CkCheckPTInfo
174 {
175   CkArrayCheckPTMessage *ckBuffer;
176 public:
177   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
178                     CkCheckPTInfo(a, loc, idx, pno)
179   {
180     ckBuffer = NULL;
181   }
182   ~CkMemCheckPTInfo() 
183   {
184     if (ckBuffer) delete ckBuffer; 
185   }
186   inline void updateBuffer(CkArrayCheckPTMessage *data) 
187   {
188     CmiAssert(data!=NULL);
189     if (ckBuffer) delete ckBuffer;
190     ckBuffer = data;
191   }    
192   inline CkArrayCheckPTMessage * getCopy()
193   {
194     if (ckBuffer == NULL) {
195       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
196       CmiAbort("Abort!");
197     }
198     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
199   }     
200   inline void updateBuddy(int b1, int b2) {
201      CmiAssert(ckBuffer);
202      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
203      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
204      CmiAssert(pNo != CkMyPe());
205   }
206   inline int getSize() { 
207      CmiAssert(ckBuffer);
208      return ckBuffer->len; 
209   }
210 };
211
212 // checkpoint holder class - for in-disk checkpointing
213 class CkDiskCheckPTInfo: public CkCheckPTInfo 
214 {
215   char *fname;
216   int bud1, bud2;
217   int len;                      // checkpoint size
218 public:
219   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
220   {
221 #if CMK_USE_MKSTEMP
222     fname = new char[64];
223     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
224     mkstemp(fname);
225 #else
226     fname=tmpnam(NULL);
227 #endif
228     bud1 = bud2 = -1;
229     len = 0;
230   }
231   ~CkDiskCheckPTInfo() 
232   {
233     remove(fname);
234   }
235   inline void updateBuffer(CkArrayCheckPTMessage *data) 
236   {
237     double t = CmiWallTimer();
238     // unpack it
239     envelope *env = UsrToEnv(data);
240     CkUnpackMessage(&env);
241     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
242     FILE *f = fopen(fname,"wb");
243     PUP::toDisk p(f);
244     CkPupMessage(p, (void **)&data);
245     // delay sync to the end because otherwise the messages are blocked
246 //    fsync(fileno(f));
247     fclose(f);
248     bud1 = data->bud1;
249     bud2 = data->bud2;
250     len = data->len;
251     delete data;
252     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
253   }
254   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
255   {
256     CkArrayCheckPTMessage *data;
257     FILE *f = fopen(fname,"rb");
258     PUP::fromDisk p(f);
259     CkPupMessage(p, (void **)&data);
260     fclose(f);
261     data->bud1 = bud1;                          // update the buddies
262     data->bud2 = bud2;
263     return data;
264   }
265   inline void updateBuddy(int b1, int b2) {
266      bud1 = b1; bud2 = b2;
267      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
268      CmiAssert(pNo != CkMyPe());
269   }
270   inline int getSize() { 
271      return len; 
272   }
273 };
274
275 CkMemCheckPT::CkMemCheckPT(int w)
276 {
277   int numnodes = 0;
278 #if NODE_CHECKPOINT
279   numnodes = CmiNumPhysicalNodes();
280 #else
281   numnodes = CkNumPes();
282 #endif
283 #if CK_NO_PROC_POOL
284   if (numnodes <= 2)
285 #else
286   if (numnodes  == 1)
287 #endif
288   {
289     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
290     _memChkptOn = 0;
291   }
292   inRestarting = 0;
293   recvCount = peCount = 0;
294   ackCount = 0;
295   expectCount = -1;
296   where = w;
297 }
298
299 CkMemCheckPT::~CkMemCheckPT()
300 {
301   int len = ckTable.length();
302   for (int i=0; i<len; i++) {
303     delete ckTable[i];
304   }
305 }
306
307 void CkMemCheckPT::pup(PUP::er& p) 
308
309   CBase_CkMemCheckPT::pup(p); 
310   p|cpStarter;
311   p|thisFailedPe;
312   p|failedPes;
313   p|ckCheckPTGroupID;           // recover global variable
314   p|cpCallback;                 // store callback
315   p|where;                      // where to checkpoint
316   if (p.isUnpacking()) {
317     recvCount = peCount = 0;
318   }
319 }
320
321 // called by checkpoint mgr to restore an array element
322 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
323 {
324 #if CMK_MEM_CHECKPOINT
325   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
326   // m->index.print();
327   PUP::fromMem p(m->packData);
328   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
329   CmiAssert(mgr);
330 #if STREAMING_INFORMHOME
331   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
332 #else
333   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
334 #endif
335
336   // find a list of array elements bound together
337   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
338   CmiAssert(elt);
339   CkLocRec_local *rec = elt->myRec;
340   CkVec<CkMigratable *> list;
341   mgr->migratableList(rec, list);
342   CmiAssert(list.length() > 0);
343   for (int l=0; l<list.length(); l++) {
344     elt = (ArrayElement *)list[l];
345     elt->budPEs[0] = m->bud1;
346     elt->budPEs[1] = m->bud2;
347     //    reset, may not needed now
348     // for now.
349     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
350       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
351       if (c) c->redNo = 0;
352     }
353   }
354 #endif
355 }
356
357 // return 1 if pe is a crashed processor
358 int CkMemCheckPT::isFailed(int pe)
359 {
360   for (int i=0; i<failedPes.length(); i++)
361     if (failedPes[i] == pe) return 1;
362   return 0;
363 }
364
365 // add pe into history list of all failed processors
366 void CkMemCheckPT::failed(int pe)
367 {
368   if (isFailed(pe)) return;
369   failedPes.push_back(pe);
370 }
371
372 int CkMemCheckPT::totalFailed()
373 {
374   return failedPes.length();
375 }
376
377 // create an checkpoint entry for array element of aid with index.
378 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
379 {
380   // error check, no duplicate
381   int idx, len = ckTable.size();
382   for (idx=0; idx<len; idx++) {
383     CkCheckPTInfo *entry = ckTable[idx];
384     if (index == entry->index) {
385       if (loc == entry->locMgr) {
386           // bindTo array elements
387           return;
388       }
389         // for array inheritance, the following check may fail
390         // because ArrayElement constructor of all superclasses are called
391       if (aid == entry->aid) {
392         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
393         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
394       }
395     }
396   }
397   CkCheckPTInfo *newEntry;
398   if (where == CkCheckPoint_inMEM)
399     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
400   else
401     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
402   ckTable.push_back(newEntry);
403   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
404 }
405
406 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
407 {
408   int buddy = msg->bud1;
409   if (buddy == CkMyPe()) buddy = msg->bud2;
410   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
411   recvData(msg);
412     // ack
413   thisProxy[buddy].gotData();
414 }
415
416 // loop through my checkpoint table and ask checkpointed array elements
417 // to send me checkpoint data.
418 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
419 {
420   cpCallback = cb;
421   cpStarter = starter;
422   if (CkMyPe() == cpStarter) {
423     startTime = CmiWallTimer();
424     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
425   }
426
427   int len = ckTable.length();
428   for (int i=0; i<len; i++) {
429     CkCheckPTInfo *entry = ckTable[i];
430       // always let the bigger number processor send request
431     if (CkMyPe() < entry->pNo) continue;
432       // call inmem_checkpoint to the array element, ask it to send
433       // back checkpoint data via recvData().
434     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
435     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
436   }
437     // if my table is empty, then I am done
438   if (len == 0) thisProxy[cpStarter].cpFinish();
439
440   // pack and send proc level data
441   sendProcData();
442 }
443
444 // don't handle array elements
445 static inline void _handleProcData(PUP::er &p)
446 {
447     // save readonlys, and callback BTW
448     CkPupROData(p);
449
450     // save mainchares into MainChares.dat
451     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
452         
453 #ifndef CMK_CHARE_USE_PTR
454     // save non-migratable chare
455     CkPupChareData(p);
456 #endif
457
458     // save groups into Groups.dat
459     CkPupGroupData(p);
460
461     // save nodegroups into NodeGroups.dat
462     if(CkMyRank()==0) CkPupNodeGroupData(p);
463 }
464
465 void CkMemCheckPT::sendProcData()
466 {
467   // find out size of buffer
468   int size;
469   {
470     PUP::sizer p;
471     _handleProcData(p);
472     size = p.size();
473   }
474   int packSize = size;
475   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
476   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
477   {
478     PUP::toMem p(msg->packData);
479     _handleProcData(p);
480   }
481   msg->pe = CkMyPe();
482   msg->len = size;
483   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
484   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
485 }
486
487 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
488 {
489   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
490   CpvAccess(procChkptBuf) = msg;
491   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
492   thisProxy[msg->reportPe].cpFinish();
493 }
494
495 // ArrayElement call this function to give us the checkpointed data
496 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
497 {
498   int len = ckTable.length();
499   int idx;
500   for (idx=0; idx<len; idx++) {
501     CkCheckPTInfo *entry = ckTable[idx];
502     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
503   }
504   CkAssert(idx < len);
505   int isChkpting = msg->cp_flag;
506   ckTable[idx]->updateBuffer(msg);
507   if (isChkpting) {
508       // all my array elements have returned their inmem data
509       // inform starter processor that I am done.
510     recvCount ++;
511     if (recvCount == ckTable.length()) {
512       if (where == CkCheckPoint_inMEM) {
513         thisProxy[cpStarter].cpFinish();
514       }
515       else if (where == CkCheckPoint_inDISK) {
516         // another barrier for finalize the writing using fsync
517         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
518         contribute(0,NULL,CkReduction::sum_int,localcb);
519       }
520       else
521         CmiAbort("Unknown checkpoint scheme");
522       recvCount = 0;
523     } 
524   }
525 }
526
527 // only used in disk checkpointing
528 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
529 {
530   delete m;
531 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
532   system("sync");
533 #endif
534   thisProxy[cpStarter].cpFinish();
535 }
536
537 // only is called on cpStarter when checkpoint is done
538 void CkMemCheckPT::cpFinish()
539 {
540   CmiAssert(CkMyPe() == cpStarter);
541   peCount++;
542     // now that all processors have finished, activate callback
543   if (peCount == 2*(CkNumPes())) {
544     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
545     cpCallback.send();
546     peCount = 0;
547     thisProxy.report();
548   }
549 }
550
551 // for debugging, report checkpoint info
552 void CkMemCheckPT::report()
553 {
554   int objsize = 0;
555   int len = ckTable.length();
556   for (int i=0; i<len; i++) {
557     CkCheckPTInfo *entry = ckTable[i];
558     CmiAssert(entry);
559     objsize += entry->getSize();
560   }
561   CmiAssert(CpvAccess(procChkptBuf));
562   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
563 }
564
565 /*****************************************************************************
566                         RESTART Procedure
567 *****************************************************************************/
568
569 // master processor of two buddies
570 inline int CkMemCheckPT::isMaster(int buddype)
571 {
572 #if 0
573   int mype = CkMyPe();
574 //CkPrintf("ismaster: %d %d\n", pe, mype);
575   if (CkNumPes() - totalFailed() == 2) {
576     return mype > buddype;
577   }
578   for (int i=1; i<CkNumPes(); i++) {
579     int me = (buddype+i)%CkNumPes();
580     if (isFailed(me)) continue;
581     if (me == mype) return 1;
582     else return 0;
583   }
584   return 0;
585 #else
586   int mype = CkMyPe();
587 //CkPrintf("ismaster: %d %d\n", pe, mype);
588   if (CkNumPes() - totalFailed() == 2) {
589     return mype < buddype;
590   }
591   for (int i=1; i<CkNumPes(); i++) {
592     int me = (mype+i)%CkNumPes();
593     if (isFailed(me)) continue;
594     if (me == buddype) return 1;
595     else return 0;
596   }
597   return 0;
598 #endif
599 }
600
601 #ifdef CKLOCMGR_LOOP
602 #undef CKLOCMGR_LOOP
603 #endif
604
605 // loop over all CkLocMgr and do "code"
606 #define  CKLOCMGR_LOOP(code)    {       \
607   int numGroups = CkpvAccess(_groupIDTable)->size();    \
608   for(int i=0;i<numGroups;i++) {        \
609     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
610     if(obj->isLocMgr())  {      \
611       CkLocMgr *mgr = (CkLocMgr*)obj;   \
612       code      \
613     }   \
614   }     \
615  }
616
617 #if 0
618 // helper class to pup all elements that belong to same ckLocMgr
619 class ElementDestoryer : public CkLocIterator {
620 private:
621         CkLocMgr *locMgr;
622 public:
623         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
624         void addLocation(CkLocation &loc) {
625                 CkArrayIndexMax idx=loc.getIndex();
626                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
627                 loc.destroy();
628         }
629 };
630 #endif
631
632 // restore the bitmap vector for LB
633 void CkMemCheckPT::resetLB(int diepe)
634 {
635 #if CMK_LBDB_ON
636   int i;
637   char *bitmap = new char[CkNumPes()];
638   // set processor available bitmap
639   get_avail_vector(bitmap);
640
641   for (i=0; i<failedPes.length(); i++)
642     bitmap[failedPes[i]] = 0; 
643   bitmap[diepe] = 0;
644
645 #if CK_NO_PROC_POOL
646   set_avail_vector(bitmap);
647 #endif
648
649   // if I am the crashed pe, rebuild my failedPEs array
650   if (CkMyPe() == diepe)
651     for (i=0; i<CkNumPes(); i++) 
652       if (bitmap[i]==0) failed(i);
653
654   delete [] bitmap;
655 #endif
656 }
657
658 // in case when failedPe dies, everybody go through its checkpoint table:
659 // destory all array elements
660 // recover lost buddies
661 // reconstruct all array elements from check point data
662 // called on all processors
663 void CkMemCheckPT::restart(int diePe)
664 {
665 #if CMK_MEM_CHECKPOINT
666   double curTime = CmiWallTimer();
667   if (CkMyPe() == diePe)
668     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
669   stage = (char*)"resetLB";
670   startTime = curTime;
671   if (CkMyPe() == diePe)
672     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
673
674 #if CK_NO_PROC_POOL
675   failed(diePe);        // add into the list of failed pes
676 #endif
677   thisFailedPe = diePe;
678
679   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
680
681   inRestarting = 1;
682                                                                                 
683   // disable load balancer's barrier
684   if (CkMyPe() != diePe) resetLB(diePe);
685
686   CKLOCMGR_LOOP(mgr->startInserting(););
687
688   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
689 /*
690   if (CkMyPe() == 0)
691     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
692 */
693 #endif
694 }
695
696 // loally remove all array elements
697 void CkMemCheckPT::removeArrayElements()
698 {
699 #if CMK_MEM_CHECKPOINT
700   int len = ckTable.length();
701   double curTime = CmiWallTimer();
702   if (CkMyPe() == thisFailedPe) 
703     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
704   stage = (char*)"removeArrayElements";
705   startTime = curTime;
706
707   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
708   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
709
710   // get rid of all buffering and remote recs
711   // including destorying all array elements
712   CKLOCMGR_LOOP(mgr->flushAllRecs(););
713
714 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
715
716   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
717 #endif
718 }
719
720 // flush state in reduction manager
721 void CkMemCheckPT::resetReductionMgr()
722 {
723   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
724   int numGroups = CkpvAccess(_groupIDTable)->size();
725   for(int i=0;i<numGroups;i++) {
726     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
727     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
728     obj->flushStates();
729     obj->ckJustMigrated();
730   }
731   // reset again
732   //CpvAccess(_qd)->flushStates();
733
734 #if 1
735   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
736 #else
737   if (CkMyPe() == 0)
738     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
739 #endif
740 }
741
742 // recover the lost buddies
743 void CkMemCheckPT::recoverBuddies()
744 {
745   int idx;
746   int len = ckTable.length();
747   // ready to flush reduction manager
748   // cannot be CkMemCheckPT::restart because destory will modify states
749   double curTime = CmiWallTimer();
750   if (CkMyPe() == thisFailedPe)
751   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
752   stage = (char *)"recoverBuddies";
753   if (CkMyPe() == thisFailedPe)
754   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
755   startTime = curTime;
756
757   // recover buddies
758   expectCount = 0;
759   for (idx=0; idx<len; idx++) {
760     CkCheckPTInfo *entry = ckTable[idx];
761     if (entry->pNo == thisFailedPe) {
762 #if CK_NO_PROC_POOL
763       // find a new buddy
764 /*
765       int budPe = CkMyPe();
766 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
767       while (budPe == CkMyPe() || isFailed(budPe)) 
768           budPe = (budPe+1)%CkNumPes();
769 */
770       int budPe = BuddyPE(CkMyPe());
771       //entry->pNo = budPe;
772 #else
773       int budPe = thisFailedPe;
774 #endif
775       entry->updateBuddy(CkMyPe(), budPe);
776 #if 0
777       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
778       CkArrayCheckPTMessage *msg = entry->getCopy();
779       msg->cp_flag = 0;            // not checkpointing
780       thisProxy[budPe].recvData(msg);
781 #else
782       CkArrayCheckPTMessage *msg = entry->getCopy();
783       msg->bud1 = budPe;
784       msg->bud2 = CkMyPe();
785       msg->cp_flag = 0;            // not checkpointing
786       thisProxy[budPe].recoverEntry(msg);
787 #endif
788       expectCount ++;
789     }
790   }
791
792 #if 1
793   if (expectCount == 0) {
794     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
795   }
796 #else
797   if (CkMyPe() == 0) {
798     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
799   }
800 #endif
801
802   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
803 }
804
805 void CkMemCheckPT::gotData()
806 {
807   ackCount ++;
808   if (ackCount == expectCount) {
809     ackCount = 0;
810     expectCount = -1;
811     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
812   }
813 }
814
815 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndexMax *idx,int nowOnPe)
816 {
817   for (int i=0; i<n; i++) {
818     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
819     mgr->updateLocation(idx[i], nowOnPe);
820   }
821 }
822
823 // restore array elements
824 void CkMemCheckPT::recoverArrayElements()
825 {
826   double curTime = CmiWallTimer();
827   int len = ckTable.length();
828   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
829   stage = (char *)"recoverArrayElements";
830   if (CkMyPe() == thisFailedPe)
831   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
832   startTime = curTime;
833
834   // recover all array elements
835   int count = 0;
836 #if STREAMING_INFORMHOME
837   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
838   CkVec<CkArrayIndexMax> * imap = new CkVec<CkArrayIndexMax>[CkNumPes()];
839 #endif
840   for (int idx=0; idx<len; idx++)
841   {
842     CkCheckPTInfo *entry = ckTable[idx];
843 #if CK_NO_PROC_POOL
844     // the bigger one will do 
845 //    if (CkMyPe() < entry->pNo) continue;
846     if (!isMaster(entry->pNo)) continue;
847 #else
848     // smaller one do it, which has the original object
849     if (CkMyPe() == entry->pNo+1 || 
850         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
851 #endif
852 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
853
854     entry->updateBuddy(CkMyPe(), entry->pNo);
855     CkArrayCheckPTMessage *msg = entry->getCopy();
856     // gzheng
857     //thisProxy[CkMyPe()].inmem_restore(msg);
858     inmem_restore(msg);
859 #if STREAMING_INFORMHOME
860     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
861     int homePe = mgr->homePe(msg->index);
862     if (homePe != CkMyPe()) {
863       gmap[homePe].push_back(msg->locMgr);
864       imap[homePe].push_back(msg->index);
865     }
866 #endif
867     count ++;
868   }
869 #if STREAMING_INFORMHOME
870   for (int i=0; i<CkNumPes(); i++) {
871     if (gmap[i].size() && i!=CkMyPe()) {
872       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
873     }
874   }
875   delete [] imap;
876   delete [] gmap;
877 #endif
878   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
879
880   CKLOCMGR_LOOP(mgr->doneInserting(););
881
882   inRestarting = 0;
883   crashed_node = -1;
884
885   if (CkMyPe() == 0)
886     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
887 }
888
889 static double restartT;
890
891 // on every processor
892 // turn load balancer back on
893 void CkMemCheckPT::finishUp()
894 {
895   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
896   //CKLOCMGR_LOOP(mgr->doneInserting(););
897   
898   if (CkMyPe() == thisFailedPe)
899   {
900        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
901        //CkStartQD(cpCallback);
902        cpCallback.send();
903        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
904   }
905
906 #if CK_NO_PROC_POOL
907 #if NODE_CHECKPOINT
908   int numnodes = CmiNumPhysicalNodes();
909 #else
910   int numnodes = CkNumPes();
911 #endif
912   if (numnodes-totalFailed() <=2) {
913     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
914     _memChkptOn = 0;
915   }
916 #endif
917 }
918
919 // called only on 0
920 void CkMemCheckPT::quiescence(CkCallback &cb)
921 {
922   static int pe_count = 0;
923   pe_count ++;
924   CmiAssert(CkMyPe() == 0);
925   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
926   if (pe_count == CkNumPes()) {
927     pe_count = 0;
928     cb.send();
929   }
930 }
931
932 // User callable function - to start a checkpoint
933 // callback cb is used to pass control back
934 void CkStartMemCheckpoint(CkCallback &cb)
935 {
936 #if CMK_MEM_CHECKPOINT
937   if (_memChkptOn == 0) {
938     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
939     cb.send();
940     return;
941   }
942   if (CkInRestarting()) {
943       // trying to checkpointing during restart
944     cb.send();
945     return;
946   }
947     // store user callback and user data
948   CkMemCheckPT::cpCallback = cb;
949
950     // broadcast to start check pointing
951   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
952   checkptMgr.doItNow(CkMyPe(), cb);
953 #else
954   // when mem checkpoint is disabled, invike cb immediately
955   cb.send();
956 #endif
957 }
958
959 void CkRestartCheckPoint(int diePe)
960 {
961 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
962   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
963   // broadcast
964   checkptMgr.restart(diePe);
965 }
966
967 static int _diePE = -1;
968
969 // callback function used locally by ccs handler
970 static void CkRestartCheckPointCallback(void *ignore, void *msg)
971 {
972 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
973   CkRestartCheckPoint(_diePE);
974 }
975
976 // Converse function handles
977 static int askPhaseHandlerIdx;
978 static int recvPhaseHandlerIdx;
979 static int askProcDataHandlerIdx;
980 static int restartBcastHandlerIdx;
981 static int recoverProcDataHandlerIdx;
982 static int restartBeginHandlerIdx;
983 static int notifyHandlerIdx;
984
985 // called on crashed PE
986 static void restartBeginHandler(char *msg)
987 {
988 #if CMK_MEM_CHECKPOINT
989   static int count = 0;
990   CmiFree(msg);
991   CmiAssert(CkMyPe() == _diePE);
992   count ++;
993   if (count == CkNumPes()) {
994     CkRestartCheckPointCallback(NULL, NULL);
995     count = 0;
996   }
997 #endif
998 }
999
1000 extern void _discard_charm_message();
1001 extern void _resume_charm_message();
1002
1003 static void restartBcastHandler(char *msg)
1004 {
1005 #if CMK_MEM_CHECKPOINT
1006   // advance phase counter
1007   CkMemCheckPT::inRestarting = 1;
1008   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1009   // gzheng
1010   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1011
1012   if (CkMyPe()==_diePE)
1013     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), cur_restart_phase, _diePE, CkWallTimer());
1014
1015   // reset QD counters
1016 /*  gzheng
1017   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1018 */
1019
1020 /*  gzheng
1021   if (CkMyPe()==_diePE)
1022       CkRestartCheckPointCallback(NULL, NULL);
1023 */
1024   CmiFree(msg);
1025
1026   _resume_charm_message();
1027
1028     // reduction
1029   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1030   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1031   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1032 #endif
1033 }
1034
1035 extern void _initDone();
1036
1037 // called on crashed processor
1038 static void recoverProcDataHandler(char *msg)
1039 {
1040 #if CMK_MEM_CHECKPOINT
1041    int i;
1042    envelope *env = (envelope *)msg;
1043    CkUnpackMessage(&env);
1044    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1045    cur_restart_phase = procMsg->cur_restart_phase;
1046    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), cur_restart_phase, CkWallTimer());
1047    //cur_restart_phase ++;
1048      // gzheng ?
1049    //CpvAccess(_qd)->flushStates();
1050
1051    // restore readonly, mainchare, group, nodegroup
1052 //   int temp = cur_restart_phase;
1053 //   cur_restart_phase = -1;
1054    PUP::fromMem p(procMsg->packData);
1055    _handleProcData(p);
1056
1057    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1058    // gzheng
1059    CKLOCMGR_LOOP(mgr->startInserting(););
1060
1061    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1062    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1063    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1064    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1065
1066    _initDone();
1067 //   CpvAccess(_qd)->flushStates();
1068    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1069 #endif
1070 }
1071
1072 // called on its backup processor
1073 // get backup message buffer and sent to crashed processor
1074 static void askProcDataHandler(char *msg)
1075 {
1076 #if CMK_MEM_CHECKPOINT
1077     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1078     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, cur_restart_phase, CkWallTimer());
1079     if (CpvAccess(procChkptBuf) == NULL) 
1080       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1081     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1082     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1083     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1084
1085     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
1086
1087     CkPackMessage(&env);
1088     CmiSetHandler(env, recoverProcDataHandlerIdx);
1089     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1090     CpvAccess(procChkptBuf) = NULL;
1091 #endif
1092 }
1093
1094 // called on PE 0
1095 void qd_callback(void *m)
1096 {
1097    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), crashed_node);
1098    CkFreeMsg(m);
1099    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1100    *(int *)(msg+CmiMsgHeaderSizeBytes) = crashed_node;
1101    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1102    CmiSetHandler(msg, askProcDataHandlerIdx);
1103    int pe = ChkptOnPe(crashed_node);    // FIXME ?
1104    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1105 }
1106
1107 // on crashed PE
1108 void CkMemRestart(const char *dummy, CkArgMsg *args)
1109 {
1110 #if CMK_MEM_CHECKPOINT
1111    _diePE = CkMyPe();
1112    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1113    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), cur_restart_phase, CkMemCheckPT::startTime);
1114    CkMemCheckPT::inRestarting = 1;
1115    crashed_node = CkMyPe();
1116
1117    _discard_charm_message();
1118    CkCallback cb(qd_callback);
1119    CkStartQD(cb);
1120 #else
1121    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1122 #endif
1123 }
1124
1125 // can be called in other files
1126 // return true if it is in restarting
1127 extern "C"
1128 int CkInRestarting()
1129 {
1130 #if CMK_MEM_CHECKPOINT
1131   if (crashed_node!=-1) return 1;
1132   // gzheng
1133   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1134   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1135   return CkMemCheckPT::inRestarting;
1136 #else
1137   return 0;
1138 #endif
1139 }
1140
1141 /*****************************************************************************
1142                 module initialization
1143 *****************************************************************************/
1144
1145 static int arg_where = CkCheckPoint_inMEM;
1146
1147 #if CMK_MEM_CHECKPOINT
1148 void init_memcheckpt(char **argv)
1149 {
1150     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1151       arg_where = CkCheckPoint_inDISK;
1152     }
1153 }
1154 #endif
1155
1156 class CkMemCheckPTInit: public Chare {
1157 public:
1158   CkMemCheckPTInit(CkArgMsg *m) {
1159 #if CMK_MEM_CHECKPOINT
1160     if (arg_where == CkCheckPoint_inDISK) {
1161       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1162     }
1163     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1164     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1165 #endif
1166   }
1167 };
1168
1169 static void notifyHandler(char *msg)
1170 {
1171 #if CMK_MEM_CHECKPOINT
1172   CmiFree(msg);
1173       /* immediately increase restart phase to filter old messages */
1174   cur_restart_phase ++;
1175   CpvAccess(_qd)->flushStates();
1176   _discard_charm_message();
1177 #endif
1178 }
1179
1180 extern "C"
1181 void notify_crash(int node)
1182 {
1183 #ifdef CMK_MEM_CHECKPOINT
1184   crashed_node = node;
1185   CmiAssert(CmiMyPe() != crashed_node);
1186   CkMemCheckPT::inRestarting = 1;
1187
1188     // this may be in interrupt handler, send a message to reset QD
1189   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1190   CmiSetHandler(msg, notifyHandlerIdx);
1191   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1192 #endif
1193 }
1194
1195
1196 // initproc
1197 void CkRegisterRestartHandler( )
1198 {
1199 #if CMK_MEM_CHECKPOINT
1200   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1201   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1202   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1203   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1204   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1205
1206   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1207   CpvAccess(procChkptBuf) = NULL;
1208
1209 #if 1
1210   // for debugging
1211   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1212 //  sleep(4);
1213 #endif
1214 #endif
1215 }
1216
1217 /// @todo: the following definitions should be moved to a separate file containing
1218 // structures and functions about fault tolerance strategies
1219
1220 /**
1221  *  * @brief: function for killing a process                                             
1222  *   */
1223 #ifdef CMK_MEM_CHECKPOINT
1224 #if CMK_HAS_GETPID
1225 void killLocal(void *_dummy,double curWallTime){
1226         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1227         if(CmiWallTimer()<killTime-1){
1228                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1229         }else{  
1230                 kill(getpid(),SIGKILL);                                               
1231         }              
1232
1233 #else
1234 void killLocal(void *_dummy,double curWallTime){
1235   CmiAbort("kill() not supported!");
1236 }
1237 #endif
1238 #endif
1239
1240 #ifdef CMK_MEM_CHECKPOINT
1241 /**
1242  * @brief: reads the file with the kill information
1243  */
1244 void readKillFile(){
1245         FILE *fp=fopen(killFile,"r");
1246         if(!fp){
1247                 return;
1248         }
1249         int proc;
1250         double sec;
1251         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1252                 if(proc == CkMyPe()){
1253                         killTime = CmiWallTimer()+sec;
1254                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1255                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1256                 }
1257         }
1258         fclose(fp);
1259 }
1260 #endif
1261
1262 #include "CkMemCheckpoint.def.h"
1263
1264