Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 #define DEBUGF      // CkPrintf
51
52 // pick buddy processor from a different physical node
53 #define NODE_CHECKPOINT                        0
54
55 // assume NO extra processors--1
56 // assume extra processors--0
57 #define CK_NO_PROC_POOL                         1
58
59 #define STREAMING_INFORMHOME                    1
60
61 int crashed_node = -1;
62
63 // static, so that it is accessible from Converse part
64 int CkMemCheckPT::inRestarting = 0;
65 double CkMemCheckPT::startTime;
66 char *CkMemCheckPT::stage;
67 CkCallback CkMemCheckPT::cpCallback;
68
69 int _memChkptOn = 1;                    // checkpoint is on or off
70
71 CkGroupID ckCheckPTGroupID;             // readonly
72
73
74 /// @todo the following declarations should be moved into a separate file for all 
75 // fault tolerant strategies
76
77 #ifdef CMK_MEM_CHECKPOINT
78 // name of the kill file that contains processes to be killed 
79 char *killFile;                                               
80 // flag for the kill file         
81 int killFlag=0;
82 // variable for storing the killing time
83 double killTime=0.0;
84 #endif
85
86 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
87 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
88
89 // compute the backup processor
90 // FIXME: avoid crashed processors
91 inline int ChkptOnPe(int pe) { return (pe+1)%CkNumPes(); }
92
93 inline int CkMemCheckPT::BuddyPE(int pe)
94 {
95   int budpe;
96 #if NODE_CHECKPOINT
97     // buddy is the processor with same rank on the next physical node
98   int r1 = CmiPhysicalRank(pe);
99   int budnode = CmiPhysicalNodeID(pe);
100   do {
101     budnode = (budnode+1)%CmiNumPhysicalNodes();
102     int *pelist;
103     int num;
104     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
105     budpe = pelist[r1 % num];
106   } while (isFailed(budpe));
107   if (budpe == pe) {
108     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
109     CmiAbort("Failed to find a buddy processor");
110   }
111 #else
112   budpe = pe;
113   while (budpe == pe || isFailed(budpe)) 
114           budpe = (budpe+1)%CkNumPes();
115 #endif
116   return budpe;
117 }
118
119 // called in array element constructor
120 // choose and register with 2 buddies for checkpoiting 
121 #if CMK_MEM_CHECKPOINT
122 void ArrayElement::init_checkpt() {
123         if (_memChkptOn == 0) return;
124         if (CkInRestarting()) {
125           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
126         }
127         // only master init checkpoint
128         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
129
130         budPEs[0] = CkMyPe();
131         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
132         CmiAssert(budPEs[0] != budPEs[1]);
133         // inform checkPTMgr
134         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
135         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
136         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
137         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
138 }
139 #endif
140
141 // entry function invoked by checkpoint mgr asking for checkpoint data
142 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
143 #if CMK_MEM_CHECKPOINT
144   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
145 //char index[128];   thisIndexMax.sprint(index);
146 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
147   CkLocMgr *locMgr = thisArray->getLocMgr();
148   CmiAssert(myRec!=NULL);
149   int size;
150   {
151         PUP::sizer p;
152         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
153         size = p.size();
154   }
155   int packSize = size/sizeof(double) +1;
156   CkArrayCheckPTMessage *msg =
157                  new (packSize, 0) CkArrayCheckPTMessage;
158   msg->len = size;
159   msg->index =thisIndexMax;
160   msg->aid = thisArrayID;
161   msg->locMgr = locMgr->getGroupID();
162   msg->cp_flag = 1;
163   {
164         PUP::toMem p(msg->packData);
165         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
166   }
167
168   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
169   checkptMgr.recvData(msg, 2, budPEs);
170   delete m;
171 #endif
172 }
173
174 // checkpoint holder class - for memory checkpointing
175 class CkMemCheckPTInfo: public CkCheckPTInfo
176 {
177   CkArrayCheckPTMessage *ckBuffer;
178 public:
179   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
180                     CkCheckPTInfo(a, loc, idx, pno)
181   {
182     ckBuffer = NULL;
183   }
184   ~CkMemCheckPTInfo() 
185   {
186     if (ckBuffer) delete ckBuffer; 
187   }
188   inline void updateBuffer(CkArrayCheckPTMessage *data) 
189   {
190     CmiAssert(data!=NULL);
191     if (ckBuffer) delete ckBuffer;
192     ckBuffer = data;
193   }    
194   inline CkArrayCheckPTMessage * getCopy()
195   {
196     if (ckBuffer == NULL) {
197       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
198       CmiAbort("Abort!");
199     }
200     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
201   }     
202   inline void updateBuddy(int b1, int b2) {
203      CmiAssert(ckBuffer);
204      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
205      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
206      CmiAssert(pNo != CkMyPe());
207   }
208   inline int getSize() { 
209      CmiAssert(ckBuffer);
210      return ckBuffer->len; 
211   }
212 };
213
214 // checkpoint holder class - for in-disk checkpointing
215 class CkDiskCheckPTInfo: public CkCheckPTInfo 
216 {
217   char *fname;
218   int bud1, bud2;
219   int len;                      // checkpoint size
220 public:
221   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
222   {
223 #if CMK_USE_MKSTEMP
224     fname = new char[64];
225     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
226     mkstemp(fname);
227 #else
228     fname=tmpnam(NULL);
229 #endif
230     bud1 = bud2 = -1;
231     len = 0;
232   }
233   ~CkDiskCheckPTInfo() 
234   {
235     remove(fname);
236   }
237   inline void updateBuffer(CkArrayCheckPTMessage *data) 
238   {
239     double t = CmiWallTimer();
240     // unpack it
241     envelope *env = UsrToEnv(data);
242     CkUnpackMessage(&env);
243     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
244     FILE *f = fopen(fname,"wb");
245     PUP::toDisk p(f);
246     CkPupMessage(p, (void **)&data);
247     // delay sync to the end because otherwise the messages are blocked
248 //    fsync(fileno(f));
249     fclose(f);
250     bud1 = data->bud1;
251     bud2 = data->bud2;
252     len = data->len;
253     delete data;
254     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
255   }
256   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
257   {
258     CkArrayCheckPTMessage *data;
259     FILE *f = fopen(fname,"rb");
260     PUP::fromDisk p(f);
261     CkPupMessage(p, (void **)&data);
262     fclose(f);
263     data->bud1 = bud1;                          // update the buddies
264     data->bud2 = bud2;
265     return data;
266   }
267   inline void updateBuddy(int b1, int b2) {
268      bud1 = b1; bud2 = b2;
269      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
270      CmiAssert(pNo != CkMyPe());
271   }
272   inline int getSize() { 
273      return len; 
274   }
275 };
276
277 CkMemCheckPT::CkMemCheckPT(int w)
278 {
279   int numnodes = 0;
280 #if NODE_CHECKPOINT
281   numnodes = CmiNumPhysicalNodes();
282 #else
283   numnodes = CkNumPes();
284 #endif
285 #if CK_NO_PROC_POOL
286   if (numnodes <= 2)
287 #else
288   if (numnodes  == 1)
289 #endif
290   {
291     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
292     _memChkptOn = 0;
293   }
294   inRestarting = 0;
295   recvCount = peCount = 0;
296   ackCount = 0;
297   expectCount = -1;
298   where = w;
299 }
300
301 CkMemCheckPT::~CkMemCheckPT()
302 {
303   int len = ckTable.length();
304   for (int i=0; i<len; i++) {
305     delete ckTable[i];
306   }
307 }
308
309 void CkMemCheckPT::pup(PUP::er& p) 
310
311   CBase_CkMemCheckPT::pup(p); 
312   p|cpStarter;
313   p|thisFailedPe;
314   p|failedPes;
315   p|ckCheckPTGroupID;           // recover global variable
316   p|cpCallback;                 // store callback
317   p|where;                      // where to checkpoint
318   if (p.isUnpacking()) {
319     recvCount = peCount = 0;
320   }
321 }
322
323 // called by checkpoint mgr to restore an array element
324 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
325 {
326 #if CMK_MEM_CHECKPOINT
327   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
328   // m->index.print();
329   PUP::fromMem p(m->packData);
330   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
331   CmiAssert(mgr);
332 #if STREAMING_INFORMHOME
333   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
334 #else
335   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
336 #endif
337
338   // find a list of array elements bound together
339   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
340   CmiAssert(elt);
341   CkLocRec_local *rec = elt->myRec;
342   CkVec<CkMigratable *> list;
343   mgr->migratableList(rec, list);
344   CmiAssert(list.length() > 0);
345   for (int l=0; l<list.length(); l++) {
346     elt = (ArrayElement *)list[l];
347     elt->budPEs[0] = m->bud1;
348     elt->budPEs[1] = m->bud2;
349     //    reset, may not needed now
350     // for now.
351     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
352       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
353       if (c) c->redNo = 0;
354     }
355   }
356 #endif
357 }
358
359 // return 1 if pe is a crashed processor
360 int CkMemCheckPT::isFailed(int pe)
361 {
362   for (int i=0; i<failedPes.length(); i++)
363     if (failedPes[i] == pe) return 1;
364   return 0;
365 }
366
367 // add pe into history list of all failed processors
368 void CkMemCheckPT::failed(int pe)
369 {
370   if (isFailed(pe)) return;
371   failedPes.push_back(pe);
372 }
373
374 int CkMemCheckPT::totalFailed()
375 {
376   return failedPes.length();
377 }
378
379 // create an checkpoint entry for array element of aid with index.
380 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
381 {
382   // error check, no duplicate
383   int idx, len = ckTable.size();
384   for (idx=0; idx<len; idx++) {
385     CkCheckPTInfo *entry = ckTable[idx];
386     if (index == entry->index) {
387       if (loc == entry->locMgr) {
388           // bindTo array elements
389           return;
390       }
391         // for array inheritance, the following check may fail
392         // because ArrayElement constructor of all superclasses are called
393       if (aid == entry->aid) {
394         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
395         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
396       }
397     }
398   }
399   CkCheckPTInfo *newEntry;
400   if (where == CkCheckPoint_inMEM)
401     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
402   else
403     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
404   ckTable.push_back(newEntry);
405   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
406 }
407
408 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
409 {
410   int buddy = msg->bud1;
411   if (buddy == CkMyPe()) buddy = msg->bud2;
412   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
413   recvData(msg);
414     // ack
415   thisProxy[buddy].gotData();
416 }
417
418 // loop through my checkpoint table and ask checkpointed array elements
419 // to send me checkpoint data.
420 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
421 {
422   cpCallback = cb;
423   cpStarter = starter;
424   if (CkMyPe() == cpStarter) {
425     startTime = CmiWallTimer();
426     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
427   }
428
429   int len = ckTable.length();
430   for (int i=0; i<len; i++) {
431     CkCheckPTInfo *entry = ckTable[i];
432       // always let the bigger number processor send request
433     //if (CkMyPe() < entry->pNo) continue;
434       // always let the smaller number processor send request, may on same proc
435     if (!isMaster(entry->pNo)) continue;
436       // call inmem_checkpoint to the array element, ask it to send
437       // back checkpoint data via recvData().
438     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
439     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
440   }
441     // if my table is empty, then I am done
442   if (len == 0) thisProxy[cpStarter].cpFinish();
443
444   // pack and send proc level data
445   sendProcData();
446 }
447
448 // don't handle array elements
449 static inline void _handleProcData(PUP::er &p)
450 {
451     // save readonlys, and callback BTW
452     CkPupROData(p);
453
454     // save mainchares 
455     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
456         
457 #ifndef CMK_CHARE_USE_PTR
458     // save non-migratable chare
459     CkPupChareData(p);
460 #endif
461
462     // save groups into Groups.dat
463     CkPupGroupData(p);
464
465     // save nodegroups into NodeGroups.dat
466     if(CkMyRank()==0) CkPupNodeGroupData(p);
467 }
468
469 void CkMemCheckPT::sendProcData()
470 {
471   // find out size of buffer
472   int size;
473   {
474     PUP::sizer p;
475     _handleProcData(p);
476     size = p.size();
477   }
478   int packSize = size;
479   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
480   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
481   {
482     PUP::toMem p(msg->packData);
483     _handleProcData(p);
484   }
485   msg->pe = CkMyPe();
486   msg->len = size;
487   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
488   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
489 }
490
491 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
492 {
493   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
494   CpvAccess(procChkptBuf) = msg;
495   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
496   thisProxy[msg->reportPe].cpFinish();
497 }
498
499 // ArrayElement call this function to give us the checkpointed data
500 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
501 {
502   int len = ckTable.length();
503   int idx;
504   for (idx=0; idx<len; idx++) {
505     CkCheckPTInfo *entry = ckTable[idx];
506     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
507   }
508   CkAssert(idx < len);
509   int isChkpting = msg->cp_flag;
510   ckTable[idx]->updateBuffer(msg);
511   if (isChkpting) {
512       // all my array elements have returned their inmem data
513       // inform starter processor that I am done.
514     recvCount ++;
515     if (recvCount == ckTable.length()) {
516       if (where == CkCheckPoint_inMEM) {
517         thisProxy[cpStarter].cpFinish();
518       }
519       else if (where == CkCheckPoint_inDISK) {
520         // another barrier for finalize the writing using fsync
521         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
522         contribute(0,NULL,CkReduction::sum_int,localcb);
523       }
524       else
525         CmiAbort("Unknown checkpoint scheme");
526       recvCount = 0;
527     } 
528   }
529 }
530
531 // only used in disk checkpointing
532 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
533 {
534   delete m;
535 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
536   system("sync");
537 #endif
538   thisProxy[cpStarter].cpFinish();
539 }
540
541 // only is called on cpStarter when checkpoint is done
542 void CkMemCheckPT::cpFinish()
543 {
544   CmiAssert(CkMyPe() == cpStarter);
545   peCount++;
546     // now that all processors have finished, activate callback
547   if (peCount == 2*(CkNumPes())) {
548     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
549     cpCallback.send();
550     peCount = 0;
551     thisProxy.report();
552   }
553 }
554
555 // for debugging, report checkpoint info
556 void CkMemCheckPT::report()
557 {
558   int objsize = 0;
559   int len = ckTable.length();
560   for (int i=0; i<len; i++) {
561     CkCheckPTInfo *entry = ckTable[i];
562     CmiAssert(entry);
563     objsize += entry->getSize();
564   }
565   CmiAssert(CpvAccess(procChkptBuf));
566   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
567 }
568
569 /*****************************************************************************
570                         RESTART Procedure
571 *****************************************************************************/
572
573 // master processor of two buddies
574 inline int CkMemCheckPT::isMaster(int buddype)
575 {
576 #if 0
577   int mype = CkMyPe();
578 //CkPrintf("ismaster: %d %d\n", pe, mype);
579   if (CkNumPes() - totalFailed() == 2) {
580     return mype > buddype;
581   }
582   for (int i=1; i<CkNumPes(); i++) {
583     int me = (buddype+i)%CkNumPes();
584     if (isFailed(me)) continue;
585     if (me == mype) return 1;
586     else return 0;
587   }
588   return 0;
589 #else
590     // smaller one
591   int mype = CkMyPe();
592 //CkPrintf("ismaster: %d %d\n", pe, mype);
593   if (CkNumPes() - totalFailed() == 2) {
594     return mype < buddype;
595   }
596   for (int i=1; i<CkNumPes(); i++) {
597     int me = (mype+i)%CkNumPes();
598     if (isFailed(me)) continue;
599     if (me == buddype) return 1;
600     else return 0;
601   }
602   return 0;
603 #endif
604 }
605
606 #ifdef CKLOCMGR_LOOP
607 #undef CKLOCMGR_LOOP
608 #endif
609
610 // loop over all CkLocMgr and do "code"
611 #define  CKLOCMGR_LOOP(code)    {       \
612   int numGroups = CkpvAccess(_groupIDTable)->size();    \
613   for(int i=0;i<numGroups;i++) {        \
614     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
615     if(obj->isLocMgr())  {      \
616       CkLocMgr *mgr = (CkLocMgr*)obj;   \
617       code      \
618     }   \
619   }     \
620  }
621
622 #if 0
623 // helper class to pup all elements that belong to same ckLocMgr
624 class ElementDestoryer : public CkLocIterator {
625 private:
626         CkLocMgr *locMgr;
627 public:
628         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
629         void addLocation(CkLocation &loc) {
630                 CkArrayIndexMax idx=loc.getIndex();
631                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
632                 loc.destroy();
633         }
634 };
635 #endif
636
637 // restore the bitmap vector for LB
638 void CkMemCheckPT::resetLB(int diepe)
639 {
640 #if CMK_LBDB_ON
641   int i;
642   char *bitmap = new char[CkNumPes()];
643   // set processor available bitmap
644   get_avail_vector(bitmap);
645
646   for (i=0; i<failedPes.length(); i++)
647     bitmap[failedPes[i]] = 0; 
648   bitmap[diepe] = 0;
649
650 #if CK_NO_PROC_POOL
651   set_avail_vector(bitmap);
652 #endif
653
654   // if I am the crashed pe, rebuild my failedPEs array
655   if (CkMyPe() == diepe)
656     for (i=0; i<CkNumPes(); i++) 
657       if (bitmap[i]==0) failed(i);
658
659   delete [] bitmap;
660 #endif
661 }
662
663 // in case when failedPe dies, everybody go through its checkpoint table:
664 // destory all array elements
665 // recover lost buddies
666 // reconstruct all array elements from check point data
667 // called on all processors
668 void CkMemCheckPT::restart(int diePe)
669 {
670 #if CMK_MEM_CHECKPOINT
671   double curTime = CmiWallTimer();
672   if (CkMyPe() == diePe)
673     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
674   stage = (char*)"resetLB";
675   startTime = curTime;
676   if (CkMyPe() == diePe)
677     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
678
679 #if CK_NO_PROC_POOL
680   failed(diePe);        // add into the list of failed pes
681 #endif
682   thisFailedPe = diePe;
683
684   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
685
686   inRestarting = 1;
687                                                                                 
688   // disable load balancer's barrier
689   if (CkMyPe() != diePe) resetLB(diePe);
690
691   CKLOCMGR_LOOP(mgr->startInserting(););
692
693   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
694 /*
695   if (CkMyPe() == 0)
696     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
697 */
698 #endif
699 }
700
701 // loally remove all array elements
702 void CkMemCheckPT::removeArrayElements()
703 {
704 #if CMK_MEM_CHECKPOINT
705   int len = ckTable.length();
706   double curTime = CmiWallTimer();
707   if (CkMyPe() == thisFailedPe) 
708     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
709   stage = (char*)"removeArrayElements";
710   startTime = curTime;
711
712   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
713   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
714
715   // get rid of all buffering and remote recs
716   // including destorying all array elements
717   CKLOCMGR_LOOP(mgr->flushAllRecs(););
718
719 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
720
721   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
722 #endif
723 }
724
725 // flush state in reduction manager
726 void CkMemCheckPT::resetReductionMgr()
727 {
728   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
729   int numGroups = CkpvAccess(_groupIDTable)->size();
730   for(int i=0;i<numGroups;i++) {
731     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
732     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
733     obj->flushStates();
734     obj->ckJustMigrated();
735   }
736   // reset again
737   //CpvAccess(_qd)->flushStates();
738
739 #if 1
740   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
741 #else
742   if (CkMyPe() == 0)
743     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
744 #endif
745 }
746
747 // recover the lost buddies
748 void CkMemCheckPT::recoverBuddies()
749 {
750   int idx;
751   int len = ckTable.length();
752   // ready to flush reduction manager
753   // cannot be CkMemCheckPT::restart because destory will modify states
754   double curTime = CmiWallTimer();
755   if (CkMyPe() == thisFailedPe)
756   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
757   stage = (char *)"recoverBuddies";
758   if (CkMyPe() == thisFailedPe)
759   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
760   startTime = curTime;
761
762   // recover buddies
763   expectCount = 0;
764   for (idx=0; idx<len; idx++) {
765     CkCheckPTInfo *entry = ckTable[idx];
766     if (entry->pNo == thisFailedPe) {
767 #if CK_NO_PROC_POOL
768       // find a new buddy
769 /*
770       int budPe = CkMyPe();
771 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
772       while (budPe == CkMyPe() || isFailed(budPe)) 
773           budPe = (budPe+1)%CkNumPes();
774 */
775       int budPe = BuddyPE(CkMyPe());
776       //entry->pNo = budPe;
777 #else
778       int budPe = thisFailedPe;
779 #endif
780       entry->updateBuddy(CkMyPe(), budPe);
781 #if 0
782       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
783       CkArrayCheckPTMessage *msg = entry->getCopy();
784       msg->cp_flag = 0;            // not checkpointing
785       thisProxy[budPe].recvData(msg);
786 #else
787       CkArrayCheckPTMessage *msg = entry->getCopy();
788       msg->bud1 = budPe;
789       msg->bud2 = CkMyPe();
790       msg->cp_flag = 0;            // not checkpointing
791       thisProxy[budPe].recoverEntry(msg);
792 #endif
793       expectCount ++;
794     }
795   }
796
797 #if 1
798   if (expectCount == 0) {
799     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
800   }
801 #else
802   if (CkMyPe() == 0) {
803     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
804   }
805 #endif
806
807   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
808 }
809
810 void CkMemCheckPT::gotData()
811 {
812   ackCount ++;
813   if (ackCount == expectCount) {
814     ackCount = 0;
815     expectCount = -1;
816     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
817   }
818 }
819
820 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndexMax *idx,int nowOnPe)
821 {
822   for (int i=0; i<n; i++) {
823     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
824     mgr->updateLocation(idx[i], nowOnPe);
825   }
826 }
827
828 // restore array elements
829 void CkMemCheckPT::recoverArrayElements()
830 {
831   double curTime = CmiWallTimer();
832   int len = ckTable.length();
833   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
834   stage = (char *)"recoverArrayElements";
835   if (CkMyPe() == thisFailedPe)
836   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
837   startTime = curTime;
838
839   // recover all array elements
840   int count = 0;
841 #if STREAMING_INFORMHOME
842   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
843   CkVec<CkArrayIndexMax> * imap = new CkVec<CkArrayIndexMax>[CkNumPes()];
844 #endif
845   for (int idx=0; idx<len; idx++)
846   {
847     CkCheckPTInfo *entry = ckTable[idx];
848 #if CK_NO_PROC_POOL
849     // the bigger one will do 
850 //    if (CkMyPe() < entry->pNo) continue;
851     if (!isMaster(entry->pNo)) continue;
852 #else
853     // smaller one do it, which has the original object
854     if (CkMyPe() == entry->pNo+1 || 
855         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
856 #endif
857 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
858
859     entry->updateBuddy(CkMyPe(), entry->pNo);
860     CkArrayCheckPTMessage *msg = entry->getCopy();
861     // gzheng
862     //thisProxy[CkMyPe()].inmem_restore(msg);
863     inmem_restore(msg);
864 #if STREAMING_INFORMHOME
865     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
866     int homePe = mgr->homePe(msg->index);
867     if (homePe != CkMyPe()) {
868       gmap[homePe].push_back(msg->locMgr);
869       imap[homePe].push_back(msg->index);
870     }
871 #endif
872     count ++;
873   }
874 #if STREAMING_INFORMHOME
875   for (int i=0; i<CkNumPes(); i++) {
876     if (gmap[i].size() && i!=CkMyPe()) {
877       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
878     }
879   }
880   delete [] imap;
881   delete [] gmap;
882 #endif
883   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
884
885   CKLOCMGR_LOOP(mgr->doneInserting(););
886
887   inRestarting = 0;
888   crashed_node = -1;
889
890   if (CkMyPe() == 0)
891     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
892 }
893
894 static double restartT;
895
896 // on every processor
897 // turn load balancer back on
898 void CkMemCheckPT::finishUp()
899 {
900   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
901   //CKLOCMGR_LOOP(mgr->doneInserting(););
902   
903   if (CkMyPe() == thisFailedPe)
904   {
905        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
906        //CkStartQD(cpCallback);
907        cpCallback.send();
908        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
909   }
910
911 #if CK_NO_PROC_POOL
912 #if NODE_CHECKPOINT
913   int numnodes = CmiNumPhysicalNodes();
914 #else
915   int numnodes = CkNumPes();
916 #endif
917   if (numnodes-totalFailed() <=2) {
918     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
919     _memChkptOn = 0;
920   }
921 #endif
922 }
923
924 // called only on 0
925 void CkMemCheckPT::quiescence(CkCallback &cb)
926 {
927   static int pe_count = 0;
928   pe_count ++;
929   CmiAssert(CkMyPe() == 0);
930   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
931   if (pe_count == CkNumPes()) {
932     pe_count = 0;
933     cb.send();
934   }
935 }
936
937 // User callable function - to start a checkpoint
938 // callback cb is used to pass control back
939 void CkStartMemCheckpoint(CkCallback &cb)
940 {
941 #if CMK_MEM_CHECKPOINT
942   if (_memChkptOn == 0) {
943     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
944     cb.send();
945     return;
946   }
947   if (CkInRestarting()) {
948       // trying to checkpointing during restart
949     cb.send();
950     return;
951   }
952     // store user callback and user data
953   CkMemCheckPT::cpCallback = cb;
954
955     // broadcast to start check pointing
956   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
957   checkptMgr.doItNow(CkMyPe(), cb);
958 #else
959   // when mem checkpoint is disabled, invike cb immediately
960   cb.send();
961 #endif
962 }
963
964 void CkRestartCheckPoint(int diePe)
965 {
966 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
967   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
968   // broadcast
969   checkptMgr.restart(diePe);
970 }
971
972 static int _diePE = -1;
973
974 // callback function used locally by ccs handler
975 static void CkRestartCheckPointCallback(void *ignore, void *msg)
976 {
977 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
978   CkRestartCheckPoint(_diePE);
979 }
980
981 // Converse function handles
982 static int askPhaseHandlerIdx;
983 static int recvPhaseHandlerIdx;
984 static int askProcDataHandlerIdx;
985 static int restartBcastHandlerIdx;
986 static int recoverProcDataHandlerIdx;
987 static int restartBeginHandlerIdx;
988 static int notifyHandlerIdx;
989
990 // called on crashed PE
991 static void restartBeginHandler(char *msg)
992 {
993 #if CMK_MEM_CHECKPOINT
994   static int count = 0;
995   CmiFree(msg);
996   CmiAssert(CkMyPe() == _diePE);
997   count ++;
998   if (count == CkNumPes()) {
999     CkRestartCheckPointCallback(NULL, NULL);
1000     count = 0;
1001   }
1002 #endif
1003 }
1004
1005 extern void _discard_charm_message();
1006 extern void _resume_charm_message();
1007
1008 static void restartBcastHandler(char *msg)
1009 {
1010 #if CMK_MEM_CHECKPOINT
1011   // advance phase counter
1012   CkMemCheckPT::inRestarting = 1;
1013   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1014   // gzheng
1015   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1016
1017   if (CkMyPe()==_diePE)
1018     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), cur_restart_phase, _diePE, CkWallTimer());
1019
1020   // reset QD counters
1021 /*  gzheng
1022   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1023 */
1024
1025 /*  gzheng
1026   if (CkMyPe()==_diePE)
1027       CkRestartCheckPointCallback(NULL, NULL);
1028 */
1029   CmiFree(msg);
1030
1031   _resume_charm_message();
1032
1033     // reduction
1034   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1035   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1036   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1037 #endif
1038 }
1039
1040 extern void _initDone();
1041
1042 // called on crashed processor
1043 static void recoverProcDataHandler(char *msg)
1044 {
1045 #if CMK_MEM_CHECKPOINT
1046    int i;
1047    envelope *env = (envelope *)msg;
1048    CkUnpackMessage(&env);
1049    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1050    cur_restart_phase = procMsg->cur_restart_phase;
1051    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), cur_restart_phase, CkWallTimer());
1052    //cur_restart_phase ++;
1053      // gzheng ?
1054    //CpvAccess(_qd)->flushStates();
1055
1056    // restore readonly, mainchare, group, nodegroup
1057 //   int temp = cur_restart_phase;
1058 //   cur_restart_phase = -1;
1059    PUP::fromMem p(procMsg->packData);
1060    _handleProcData(p);
1061
1062    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1063    // gzheng
1064    CKLOCMGR_LOOP(mgr->startInserting(););
1065
1066    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1067    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1068    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1069    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1070
1071    _initDone();
1072 //   CpvAccess(_qd)->flushStates();
1073    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1074 #endif
1075 }
1076
1077 // called on its backup processor
1078 // get backup message buffer and sent to crashed processor
1079 static void askProcDataHandler(char *msg)
1080 {
1081 #if CMK_MEM_CHECKPOINT
1082     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1083     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, cur_restart_phase, CkWallTimer());
1084     if (CpvAccess(procChkptBuf) == NULL) 
1085       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1086     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1087     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1088     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1089
1090     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
1091
1092     CkPackMessage(&env);
1093     CmiSetHandler(env, recoverProcDataHandlerIdx);
1094     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1095     CpvAccess(procChkptBuf) = NULL;
1096 #endif
1097 }
1098
1099 // called on PE 0
1100 void qd_callback(void *m)
1101 {
1102    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), crashed_node);
1103    CkFreeMsg(m);
1104    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1105    *(int *)(msg+CmiMsgHeaderSizeBytes) = crashed_node;
1106    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1107    CmiSetHandler(msg, askProcDataHandlerIdx);
1108    int pe = ChkptOnPe(crashed_node);    // FIXME ?
1109    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1110 }
1111
1112 // on crashed PE
1113 void CkMemRestart(const char *dummy, CkArgMsg *args)
1114 {
1115 #if CMK_MEM_CHECKPOINT
1116    _diePE = CkMyPe();
1117    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1118    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), cur_restart_phase, CkMemCheckPT::startTime);
1119    CkMemCheckPT::inRestarting = 1;
1120    crashed_node = CkMyPe();
1121
1122    _discard_charm_message();
1123    CkCallback cb(qd_callback);
1124    CkStartQD(cb);
1125 #else
1126    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1127 #endif
1128 }
1129
1130 // can be called in other files
1131 // return true if it is in restarting
1132 extern "C"
1133 int CkInRestarting()
1134 {
1135 #if CMK_MEM_CHECKPOINT
1136   if (crashed_node!=-1) return 1;
1137   // gzheng
1138   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1139   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1140   return CkMemCheckPT::inRestarting;
1141 #else
1142   return 0;
1143 #endif
1144 }
1145
1146 /*****************************************************************************
1147                 module initialization
1148 *****************************************************************************/
1149
1150 static int arg_where = CkCheckPoint_inMEM;
1151
1152 #if CMK_MEM_CHECKPOINT
1153 void init_memcheckpt(char **argv)
1154 {
1155     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1156       arg_where = CkCheckPoint_inDISK;
1157     }
1158 }
1159 #endif
1160
1161 class CkMemCheckPTInit: public Chare {
1162 public:
1163   CkMemCheckPTInit(CkArgMsg *m) {
1164 #if CMK_MEM_CHECKPOINT
1165     if (arg_where == CkCheckPoint_inDISK) {
1166       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1167     }
1168     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1169     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1170 #endif
1171   }
1172 };
1173
1174 static void notifyHandler(char *msg)
1175 {
1176 #if CMK_MEM_CHECKPOINT
1177   CmiFree(msg);
1178       /* immediately increase restart phase to filter old messages */
1179   cur_restart_phase ++;
1180   CpvAccess(_qd)->flushStates();
1181   _discard_charm_message();
1182 #endif
1183 }
1184
1185 extern "C"
1186 void notify_crash(int node)
1187 {
1188 #ifdef CMK_MEM_CHECKPOINT
1189   crashed_node = node;
1190   CmiAssert(CmiMyPe() != crashed_node);
1191   CkMemCheckPT::inRestarting = 1;
1192
1193     // this may be in interrupt handler, send a message to reset QD
1194   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1195   CmiSetHandler(msg, notifyHandlerIdx);
1196   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1197 #endif
1198 }
1199
1200 extern "C" void (*notify_crash_fn)(int node);
1201
1202 // initproc
1203 void CkRegisterRestartHandler( )
1204 {
1205 #if CMK_MEM_CHECKPOINT
1206   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1207   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1208   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1209   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1210   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1211
1212   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1213   CpvAccess(procChkptBuf) = NULL;
1214
1215   notify_crash_fn = notify_crash;
1216 #if 1
1217   // for debugging
1218   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1219 //  sleep(4);
1220 #endif
1221 #endif
1222 }
1223
1224 /// @todo: the following definitions should be moved to a separate file containing
1225 // structures and functions about fault tolerance strategies
1226
1227 /**
1228  *  * @brief: function for killing a process                                             
1229  *   */
1230 #ifdef CMK_MEM_CHECKPOINT
1231 #if CMK_HAS_GETPID
1232 void killLocal(void *_dummy,double curWallTime){
1233         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1234         if(CmiWallTimer()<killTime-1){
1235                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1236         }else{  
1237                 kill(getpid(),SIGKILL);                                               
1238         }              
1239
1240 #else
1241 void killLocal(void *_dummy,double curWallTime){
1242   CmiAbort("kill() not supported!");
1243 }
1244 #endif
1245 #endif
1246
1247 #ifdef CMK_MEM_CHECKPOINT
1248 /**
1249  * @brief: reads the file with the kill information
1250  */
1251 void readKillFile(){
1252         FILE *fp=fopen(killFile,"r");
1253         if(!fp){
1254                 return;
1255         }
1256         int proc;
1257         double sec;
1258         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1259                 if(proc == CkMyPe()){
1260                         killTime = CmiWallTimer()+sec;
1261                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1262                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1263                 }
1264         }
1265         fclose(fp);
1266 }
1267 #endif
1268
1269 #include "CkMemCheckpoint.def.h"
1270
1271