10138dc88dbc370747a3db93c3252bdf0d4f13f9
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 #define DEBUGF      // CkPrintf
51
52 // assume NO extra processors--1
53 // assume extra processors--0
54 #define CK_NO_PROC_POOL                         1
55
56 // static, so that it is accessible from Converse part
57 int CkMemCheckPT::inRestarting = 0;
58 double CkMemCheckPT::startTime;
59 char *CkMemCheckPT::stage;
60 CkCallback CkMemCheckPT::cpCallback;
61
62 int _memChkptOn = 1;                    // checkpoint is on or off
63
64 CkGroupID ckCheckPTGroupID;             // readonly
65
66
67 /// @todo the following declarations should be moved into a separate file for all 
68 // fault tolerant strategies
69
70 #ifdef CMK_MEM_CHECKPOINT
71 // name of the kill file that contains processes to be killed 
72 char *killFile;                                               
73 // flag for the kill file         
74 int killFlag=0;
75 // variable for storing the killing time
76 double killTime=0.0;
77 #endif
78
79 /// checkpoint buffer for processor system data
80 CpvStaticDeclare(CkProcCheckPTMessage*, procChkptBuf);
81
82 // compute the backup processor
83 // FIXME: avoid crashed processors
84 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
85
86 // called in array element constructor
87 // choose and register with 2 buddies for checkpoiting 
88 #if CMK_MEM_CHECKPOINT
89 void ArrayElement::init_checkpt() {
90         if (_memChkptOn == 0) return;
91         // only master init checkpoint
92         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
93
94         budPEs[0] = CkMyPe();
95         budPEs[1] = (CkMyPe()+1)%CkNumPes();
96         CmiAssert(budPEs[0] != budPEs[1]);
97         // inform checkPTMgr
98         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
99         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
100         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
101         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
102 }
103 #endif
104
105 // entry function invoked by checkpoint mgr asking for checkpoint data
106 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
107 #if CMK_MEM_CHECKPOINT
108   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
109   CkLocMgr *locMgr = thisArray->getLocMgr();
110   CmiAssert(myRec!=NULL);
111   int size;
112   {
113         PUP::sizer p;
114         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
115         size = p.size();
116   }
117   int packSize = size/sizeof(double) +1;
118   CkArrayCheckPTMessage *msg =
119                  new (packSize, 0) CkArrayCheckPTMessage;
120   msg->len = size;
121   msg->index =thisIndexMax;
122   msg->aid = thisArrayID;
123   msg->locMgr = locMgr->getGroupID();
124   msg->cp_flag = 1;
125   {
126         PUP::toMem p(msg->packData);
127         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
128   }
129
130   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
131   checkptMgr.recvData(msg, 2, budPEs);
132   delete m;
133 #endif
134 }
135
136 // checkpoint holder class - for memory checkpointing
137 class CkMemCheckPTInfo: public CkCheckPTInfo
138 {
139   CkArrayCheckPTMessage *ckBuffer;
140 public:
141   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
142                     CkCheckPTInfo(a, loc, idx, pno)
143   {
144     ckBuffer = NULL;
145   }
146   ~CkMemCheckPTInfo() 
147   {
148     if (ckBuffer) delete ckBuffer; 
149   }
150   inline void updateBuffer(CkArrayCheckPTMessage *data) 
151   {
152     if (ckBuffer) delete ckBuffer;
153     ckBuffer = data;
154   }    
155   inline CkArrayCheckPTMessage * getCopy()
156   {
157     if (ckBuffer == NULL) {
158       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
159       CmiAbort("Abort!");
160     }
161     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
162   }     
163   inline void updateBuddy(int b1, int b2) {
164      CmiAssert(ckBuffer);
165      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
166   }
167   inline int getSize() { 
168      CmiAssert(ckBuffer);
169      return ckBuffer->len; 
170   }
171 };
172
173 // checkpoint holder class - for in-disk checkpointing
174 class CkDiskCheckPTInfo: public CkCheckPTInfo 
175 {
176   char *fname;
177   int bud1, bud2;
178   int len;                      // checkpoint size
179 public:
180   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
181   {
182 #if CMK_USE_MKSTEMP
183     fname = new char[64];
184     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
185     mkstemp(fname);
186 #else
187     fname=tmpnam(NULL);
188 #endif
189     bud1 = bud2 = -1;
190     len = 0;
191   }
192   ~CkDiskCheckPTInfo() 
193   {
194     remove(fname);
195   }
196   inline void updateBuffer(CkArrayCheckPTMessage *data) 
197   {
198     double t = CmiWallTimer();
199     // unpack it
200     envelope *env = UsrToEnv(data);
201     CkUnpackMessage(&env);
202     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
203     FILE *f = fopen(fname,"wb");
204     PUP::toDisk p(f);
205     CkPupMessage(p, (void **)&data);
206     // delay sync to the end because otherwise the messages are blocked
207 //    fsync(fileno(f));
208     fclose(f);
209     bud1 = data->bud1;
210     bud2 = data->bud2;
211     len = data->len;
212     delete data;
213     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
214   }
215   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
216   {
217     CkArrayCheckPTMessage *data;
218     FILE *f = fopen(fname,"rb");
219     PUP::fromDisk p(f);
220     CkPupMessage(p, (void **)&data);
221     fclose(f);
222     data->bud1 = bud1;                          // update the buddies
223     data->bud2 = bud2;
224     return data;
225   }
226   inline void updateBuddy(int b1, int b2) {
227      bud1 = b1; bud2 = b2;
228   }
229   inline int getSize() { 
230      return len; 
231   }
232 };
233
234 CkMemCheckPT::CkMemCheckPT(int w)
235 {
236 #if CK_NO_PROC_POOL
237   if (CkNumPes() <= 2) {
238 #else
239   if (CkNumPes()  == 1) {
240 #endif
241     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
242     _memChkptOn = 0;
243   }
244   inRestarting = 0;
245   recvCount = peCount = 0;
246   where = w;
247 }
248
249 CkMemCheckPT::~CkMemCheckPT()
250 {
251   int len = ckTable.length();
252   for (int i=0; i<len; i++) {
253     delete ckTable[i];
254   }
255 }
256
257 void CkMemCheckPT::pup(PUP::er& p) 
258
259   CBase_CkMemCheckPT::pup(p); 
260   p|cpStarter;
261   p|thisFailedPe;
262   p|failedPes;
263   p|ckCheckPTGroupID;           // recover global variable
264   p|cpCallback;                 // store callback
265   p|where;                      // where to checkpoint
266   if (p.isUnpacking()) {
267     recvCount = peCount = 0;
268   }
269 }
270
271 // called by checkpoint mgr to restore an array element
272 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
273 {
274 #if CMK_MEM_CHECKPOINT
275   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
276   // m->index.print();
277   PUP::fromMem p(m->packData);
278   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
279   CmiAssert(mgr);
280   mgr->resume(m->index, p);
281
282   // find a list of array elements bound together
283   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
284   CmiAssert(elt);
285   CkLocRec_local *rec = elt->myRec;
286   CkVec<CkMigratable *> list;
287   mgr->migratableList(rec, list);
288   CmiAssert(list.length() > 0);
289   for (int l=0; l<list.length(); l++) {
290     elt = (ArrayElement *)list[l];
291     elt->budPEs[0] = m->bud1;
292     elt->budPEs[1] = m->bud2;
293     //    reset, may not needed now
294     // for now.
295     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
296       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
297       if (c) c->redNo = 0;
298     }
299   }
300 #endif
301 }
302
303 // return 1 if pe is a crashed processor
304 int CkMemCheckPT::isFailed(int pe)
305 {
306   for (int i=0; i<failedPes.length(); i++)
307     if (failedPes[i] == pe) return 1;
308   return 0;
309 }
310
311 // add pe into history list of all failed processors
312 void CkMemCheckPT::failed(int pe)
313 {
314   if (isFailed(pe)) return;
315   failedPes.push_back(pe);
316 }
317
318 int CkMemCheckPT::totalFailed()
319 {
320   return failedPes.length();
321 }
322
323 // create an checkpoint entry for array element of aid with index.
324 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
325 {
326   // error check, no duplicate
327   int idx, len = ckTable.size();
328   for (idx=0; idx<len; idx++) {
329     CkCheckPTInfo *entry = ckTable[idx];
330     if (index == entry->index) {
331       if (loc == entry->locMgr) {
332           // bindTo array elements
333           return;
334       }
335         // for array inheritance, the following check may fail
336         // because ArrayElement constructor of all superclasses are called
337       if (aid == entry->aid) {
338         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
339         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
340       }
341     }
342   }
343   CkCheckPTInfo *newEntry;
344   if (where == CkCheckPoint_inMEM)
345     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
346   else
347     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
348   ckTable.push_back(newEntry);
349   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
350 }
351
352 // loop through my checkpoint table and ask checkpointed array elements
353 // to send me checkpoint data.
354 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
355 {
356   cpCallback = cb;
357   cpStarter = starter;
358   if (CkMyPe() == cpStarter) {
359     startTime = CmiWallTimer();
360     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
361   }
362
363   int len = ckTable.length();
364   for (int i=0; i<len; i++) {
365     CkCheckPTInfo *entry = ckTable[i];
366       // always let the bigger number processor send request
367     if (CkMyPe() < entry->pNo) continue;
368       // call inmem_checkpoint to the array element, ask it to send
369       // back checkpoint data via recvData().
370     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
371     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
372   }
373     // if my table is empty, then I am done
374   if (len == 0) thisProxy[cpStarter].cpFinish();
375
376   // pack and send proc level data
377   sendProcData();
378 }
379
380 // don't handle array elements
381 static inline void _handleProcData(PUP::er &p)
382 {
383     // save readonlys, and callback BTW
384     CkPupROData(p);
385
386     // save mainchares into MainChares.dat
387     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
388         
389     // save groups into Groups.dat
390     CkPupGroupData(p);
391
392     // save nodegroups into NodeGroups.dat
393     if(CkMyRank()==0) CkPupNodeGroupData(p);
394 }
395
396 void CkMemCheckPT::sendProcData()
397 {
398   // find out size of buffer
399   int size;
400   {
401     PUP::sizer p;
402     _handleProcData(p);
403     size = p.size();
404   }
405   int packSize = size;
406   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
407   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
408   {
409     PUP::toMem p(msg->packData);
410     _handleProcData(p);
411   }
412   msg->pe = CkMyPe();
413   msg->len = size;
414   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
415   thisProxy[ChkptOnPe()].recvProcData(msg);
416 }
417
418 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
419 {
420   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
421   CpvAccess(procChkptBuf) = msg;
422 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
423   thisProxy[msg->reportPe].cpFinish();
424 }
425
426 // ArrayElement call this function to give us the checkpointed data
427 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
428 {
429   int len = ckTable.length();
430   int idx;
431   for (idx=0; idx<len; idx++) {
432     CkCheckPTInfo *entry = ckTable[idx];
433     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
434   }
435   CkAssert(idx < len);
436   int isChkpting = msg->cp_flag;
437   ckTable[idx]->updateBuffer(msg);
438   if (isChkpting) {
439       // all my array elements have returned their inmem data
440       // inform starter processor that I am done.
441     recvCount ++;
442     if (recvCount == ckTable.length()) {
443       if (where == CkCheckPoint_inMEM) {
444         thisProxy[cpStarter].cpFinish();
445       }
446       else if (where == CkCheckPoint_inDISK) {
447         // another barrier for finalize the writing using fsync
448         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
449         contribute(0,NULL,CkReduction::sum_int,localcb);
450       }
451       else
452         CmiAbort("Unknown checkpoint scheme");
453       recvCount = 0;
454     } 
455   }
456 }
457
458 // only used in disk checkpointing
459 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
460 {
461   delete m;
462 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
463   system("sync");
464 #endif
465   thisProxy[cpStarter].cpFinish();
466 }
467
468 // only is called on cpStarter when checkpoint is done
469 void CkMemCheckPT::cpFinish()
470 {
471   CmiAssert(CkMyPe() == cpStarter);
472   peCount++;
473     // now that all processors have finished, activate callback
474   if (peCount == 2*(CkNumPes())) {
475     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
476     cpCallback.send();
477     peCount = 0;
478     thisProxy.report();
479   }
480 }
481
482 // for debugging, report checkpoint info
483 void CkMemCheckPT::report()
484 {
485   int objsize = 0;
486   int len = ckTable.length();
487   for (int i=0; i<len; i++) {
488     CkCheckPTInfo *entry = ckTable[i];
489     CmiAssert(entry);
490     objsize += entry->getSize();
491   }
492   CmiAssert(CpvAccess(procChkptBuf));
493   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
494 }
495
496 /*****************************************************************************
497                         RESTART Procedure
498 *****************************************************************************/
499
500 // master processor of two buddies
501 inline int CkMemCheckPT::isMaster(int buddype)
502 {
503   int mype = CkMyPe();
504 //CkPrintf("ismaster: %d %d\n", pe, mype);
505   if (CkNumPes() - totalFailed() == 2) {
506     return mype > buddype;
507   }
508   for (int i=1; i<CkNumPes(); i++) {
509     int me = (buddype+i)%CkNumPes();
510     if (isFailed(me)) continue;
511     if (me == mype) return 1;
512     else return 0;
513   }
514   return 0;
515 }
516
517 #ifdef CKLOCMGR_LOOP
518 #undef CKLOCMGR_LOOP
519 #endif
520
521 // loop over all CkLocMgr and do "code"
522 #define  CKLOCMGR_LOOP(code)    {       \
523   int numGroups = CkpvAccess(_groupIDTable)->size();    \
524   for(int i=0;i<numGroups;i++) {        \
525     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
526     if(obj->isLocMgr())  {      \
527       CkLocMgr *mgr = (CkLocMgr*)obj;   \
528       code      \
529     }   \
530   }     \
531  }
532
533 #if 0
534 // helper class to pup all elements that belong to same ckLocMgr
535 class ElementDestoryer : public CkLocIterator {
536 private:
537         CkLocMgr *locMgr;
538 public:
539         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
540         void addLocation(CkLocation &loc) {
541                 CkArrayIndexMax idx=loc.getIndex();
542                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
543                 loc.destroy();
544         }
545 };
546 #endif
547
548 // restore the bitmap vector for LB
549 void CkMemCheckPT::resetLB(int diepe)
550 {
551 #if CMK_LBDB_ON
552   int i;
553   char *bitmap = new char[CkNumPes()];
554   // set processor available bitmap
555   get_avail_vector(bitmap);
556
557   for (i=0; i<failedPes.length(); i++)
558     bitmap[failedPes[i]] = 0; 
559   bitmap[diepe] = 0;
560
561 #if CK_NO_PROC_POOL
562   set_avail_vector(bitmap);
563 #endif
564
565   // if I am the crashed pe, rebuild my failedPEs array
566   if (CkMyPe() == diepe)
567     for (i=0; i<CkNumPes(); i++) 
568       if (bitmap[i]==0) failed(i);
569
570   delete [] bitmap;
571 #endif
572 }
573
574 // in case when failedPe dies, everybody go through its checkpoint table:
575 // destory all array elements
576 // recover lost buddies
577 // reconstruct all array elements from check point data
578 // called on all processors
579 void CkMemCheckPT::restart(int diePe)
580 {
581 #if CMK_MEM_CHECKPOINT
582   double curTime = CmiWallTimer();
583   if (CkMyPe() == diePe)
584     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
585   stage = "resetLB";
586   startTime = curTime;
587   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
588
589 #if CK_NO_PROC_POOL
590   failed(diePe);        // add into the list of failed pes
591 #endif
592   thisFailedPe = diePe;
593
594   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
595
596   inRestarting = 1;
597                                                                                 
598   // disable load balancer's barrier
599   if (CkMyPe() != diePe) resetLB(diePe);
600
601   CKLOCMGR_LOOP(mgr->startInserting(););
602
603   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
604 #endif
605 }
606
607 // loally remove all array elements
608 void CkMemCheckPT::removeArrayElements()
609 {
610 #if CMK_MEM_CHECKPOINT
611   int len = ckTable.length();
612   double curTime = CmiWallTimer();
613   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
614   stage = "removeArrayElements";
615   startTime = curTime;
616
617   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
618   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
619
620   // get rid of all buffering and remote recs
621   // including destorying all array elements
622   CKLOCMGR_LOOP(mgr->flushAllRecs(););
623
624 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
625
626   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
627 #endif
628 }
629
630 // flush state in reduction manager
631 void CkMemCheckPT::resetReductionMgr()
632 {
633   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
634   int numGroups = CkpvAccess(_groupIDTable)->size();
635   for(int i=0;i<numGroups;i++) {
636     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
637     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
638     obj->flushStates();
639     obj->ckJustMigrated();
640   }
641   // reset again
642   //CpvAccess(_qd)->flushStates();
643
644   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
645 }
646
647 // recover the lost buddies
648 void CkMemCheckPT::recoverBuddies()
649 {
650   int idx;
651   int len = ckTable.length();
652   // ready to flush reduction manager
653   // cannot be CkMemCheckPT::restart because destory will modify states
654   double curTime = CmiWallTimer();
655   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
656   stage = (char *)"recoverBuddies";
657   startTime = curTime;
658
659   // recover buddies
660   for (idx=0; idx<len; idx++) {
661     CkCheckPTInfo *entry = ckTable[idx];
662     if (entry->pNo == thisFailedPe) {
663 #if CK_NO_PROC_POOL
664       // find a new buddy
665       int budPe = CkMyPe();
666 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
667       while (budPe == CkMyPe() || isFailed(budPe)) 
668           budPe = (budPe+1)%CkNumPes();
669       entry->pNo = budPe;
670 #else
671       int budPe = thisFailedPe;
672 #endif
673       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
674       CkArrayCheckPTMessage *msg = entry->getCopy();
675       msg->cp_flag = 0;            // not checkpointing
676       thisProxy[budPe].recvData(msg);
677     }
678   }
679
680   if (CkMyPe() == 0)
681     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
682 }
683
684 // restore array elements
685 void CkMemCheckPT::recoverArrayElements()
686 {
687   double curTime = CmiWallTimer();
688   int len = ckTable.length();
689   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
690   stage = (char *)"recoverArrayElements";
691   startTime = curTime;
692
693   // recover all array elements
694   int count = 0;
695   for (int idx=0; idx<len; idx++)
696   {
697     CkCheckPTInfo *entry = ckTable[idx];
698 #if CK_NO_PROC_POOL
699     // the bigger one will do 
700 //    if (CkMyPe() < entry->pNo) continue;
701     if (!isMaster(entry->pNo)) continue;
702 #else
703     // smaller one do it, which has the original object
704     if (CkMyPe() == entry->pNo+1 || 
705         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
706 #endif
707 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
708
709     entry->updateBuddy(CkMyPe(), entry->pNo);
710     CkArrayCheckPTMessage *msg = entry->getCopy();
711     // gzheng
712     //checkptMgr[CkMyPe()].inmem_restore(msg);
713     inmem_restore(msg);
714     count ++;
715   }
716 //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
717
718   if (CkMyPe() == 0)
719     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
720 }
721
722 // on every processor
723 // turn load balancer back on
724 void CkMemCheckPT::finishUp()
725 {
726   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
727   CKLOCMGR_LOOP(mgr->doneInserting(););
728   
729   inRestarting = 0;
730
731   if (CkMyPe() == 0)
732   {
733        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
734        CkStartQD(cpCallback);
735   } 
736 #if CK_NO_PROC_POOL
737   if (CkNumPes()-totalFailed() <=2) {
738     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
739     _memChkptOn = 0;
740   }
741 #endif
742 }
743
744 // called only on 0
745 void CkMemCheckPT::quiescence(CkCallback &cb)
746 {
747   static int pe_count = 0;
748   pe_count ++;
749   CmiAssert(CkMyPe() == 0);
750   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
751   if (pe_count == CkNumPes()) {
752     pe_count = 0;
753     cb.send();
754   }
755 }
756
757 // User callable function - to start a checkpoint
758 // callback cb is used to pass control back
759 void CkStartMemCheckpoint(CkCallback &cb)
760 {
761 #if CMK_MEM_CHECKPOINT
762   if (_memChkptOn == 0) {
763     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
764     cb.send();
765     return;
766   }
767   if (CkInRestarting()) {
768       // trying to checkpointing during restart
769     cb.send();
770     return;
771   }
772     // store user callback and user data
773   CkMemCheckPT::cpCallback = cb;
774
775     // broadcast to start check pointing
776   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
777   checkptMgr.doItNow(CkMyPe(), cb);
778 #else
779   // when mem checkpoint is disabled, invike cb immediately
780   cb.send();
781 #endif
782 }
783
784 void CkRestartCheckPoint(int diePe)
785 {
786 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
787   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
788   // broadcast
789   checkptMgr.restart(diePe);
790 }
791
792 static int _diePE = -1;
793
794 // callback function used locally by ccs handler
795 static void CkRestartCheckPointCallback(void *ignore, void *msg)
796 {
797 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
798   CkRestartCheckPoint(_diePE);
799 }
800
801 // Converse function handles
802 static int askProcDataHandlerIdx;
803 static int restartBcastHandlerIdx;
804 static int recoverProcDataHandlerIdx;
805 static int restartBeginHandlerIdx;
806
807 static void restartBeginHandler(char *msg)
808 {
809 #if CMK_MEM_CHECKPOINT
810   static int count = 0;
811   CmiFree(msg);
812   CmiAssert(CkMyPe() == _diePE);
813   count ++;
814   if (count == CkNumPes()) {
815     CkRestartCheckPointCallback(NULL, NULL);
816     count = 0;
817   }
818 #endif
819 }
820
821 static void restartBcastHandler(char *msg)
822 {
823 #if CMK_MEM_CHECKPOINT
824   // advance phase counter
825   CkMemCheckPT::inRestarting = 1;
826   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
827   // gzheng
828   if (CkMyPe() != _diePE) cur_restart_phase ++;
829
830   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
831
832   // reset QD counters
833   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
834
835 /*  gzheng
836   if (CkMyPe()==_diePE)
837       CkRestartCheckPointCallback(NULL, NULL);
838 */
839   CmiFree(msg);
840
841   char restartmsg[CmiMsgHeaderSizeBytes];
842   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
843   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
844 #endif
845 }
846
847 extern void _initDone();
848
849 // called on crashed processor
850 static void recoverProcDataHandler(char *msg)
851 {
852 #if CMK_MEM_CHECKPOINT
853    int i;
854    envelope *env = (envelope *)msg;
855    CkUnpackMessage(&env);
856    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
857    cur_restart_phase = procMsg->cur_restart_phase;
858    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
859    cur_restart_phase ++;
860    CpvAccess(_qd)->flushStates();
861
862    // restore readonly, mainchare, group, nodegroup
863 //   int temp = cur_restart_phase;
864 //   cur_restart_phase = -1;
865    PUP::fromMem p(procMsg->packData);
866    _handleProcData(p);
867
868    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
869    // gzheng
870    CKLOCMGR_LOOP(mgr->startInserting(););
871
872    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
873    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
874    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
875    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
876    CmiFree(msg);
877
878    _initDone();
879 #endif
880 }
881
882 // called on its backup processor
883 // get backup message buffer and sent to crashed processor
884 static void askProcDataHandler(char *msg)
885 {
886 #if CMK_MEM_CHECKPOINT
887     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
888     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
889     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
890     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
891     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
892
893     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
894
895     CkPackMessage(&env);
896     CmiSetHandler(env, recoverProcDataHandlerIdx);
897     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
898     CpvAccess(procChkptBuf) = NULL;
899 #endif
900 }
901
902 void CkMemRestart(const char *dummy, CkArgMsg *args)
903 {
904 #if CMK_MEM_CHECKPOINT
905    _diePE = CkMyPe();
906    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
907    CkMemCheckPT::startTime = CmiWallTimer();
908    CkMemCheckPT::inRestarting = 1;
909    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
910    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
911    cur_restart_phase = 9999;             // big enough to get it processed
912    CmiSetHandler(msg, askProcDataHandlerIdx);
913    int pe = ChkptOnPe();
914    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
915    cur_restart_phase=0;    // allow all message to come in
916 #else
917    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
918 #endif
919 }
920
921 // can be called in other files
922 // return true if it is in restarting
923 int CkInRestarting()
924 {
925 #if CMK_MEM_CHECKPOINT
926   // gzheng
927   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
928   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
929   return CkMemCheckPT::inRestarting;
930 #else
931   return 0;
932 #endif
933 }
934
935 /*****************************************************************************
936                 module initialization
937 *****************************************************************************/
938
939 static int arg_where = CkCheckPoint_inMEM;
940
941 #if CMK_MEM_CHECKPOINT
942 void init_memcheckpt(char **argv)
943 {
944     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
945       arg_where = CkCheckPoint_inDISK;
946     }
947 }
948 #endif
949
950 class CkMemCheckPTInit: public Chare {
951 public:
952   CkMemCheckPTInit(CkArgMsg *m) {
953 #if CMK_MEM_CHECKPOINT
954     if (arg_where == CkCheckPoint_inDISK) {
955       CkPrintf("Charm++> Double-disk Checkpointing. \n");
956     }
957     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
958     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
959 #endif
960   }
961 };
962
963 // initproc
964 void CkRegisterRestartHandler( )
965 {
966 #if CMK_MEM_CHECKPOINT
967   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
968   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
969   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
970   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
971
972   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
973   CpvAccess(procChkptBuf) = NULL;
974
975 #if 1
976   // for debugging
977   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
978 //  sleep(4);
979 #endif
980 #endif
981 }
982
983 /// @todo: the following definitions should be moved to a separate file containing
984 // structures and functions about fault tolerance strategies
985
986 /**
987  *  * @brief: function for killing a process                                             
988  *   */
989 #ifdef CMK_MEM_CHECKPOINT
990 #if CMK_HAS_GETPID
991 void killLocal(void *_dummy,double curWallTime){
992         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
993         if(CmiWallTimer()<killTime-1){
994                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
995         }else{  
996                 kill(getpid(),SIGKILL);                                               
997         }              
998
999 #else
1000 void killLocal(void *_dummy,double curWallTime){
1001   CmiAbort("kill() not supported!");
1002 }
1003 #endif
1004 #endif
1005
1006 #ifdef CMK_MEM_CHECKPOINT
1007 /**
1008  * @brief: reads the file with the kill information
1009  */
1010 void readKillFile(){
1011         FILE *fp=fopen(killFile,"r");
1012         if(!fp){
1013                 return;
1014         }
1015         int proc;
1016         double sec;
1017         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1018                 if(proc == CkMyPe()){
1019                         killTime = CmiWallTimer()+sec;
1020                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1021                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1022                 }
1023         }
1024         fclose(fp);
1025 }
1026 #endif
1027
1028 #include "CkMemCheckpoint.def.h"
1029
1030