added a new function CkPupChareData() to save chare objects. This includes pupping...
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 #define DEBUGF      // CkPrintf
51
52 // assume NO extra processors--1
53 // assume extra processors--0
54 #define CK_NO_PROC_POOL                         1
55
56 // static, so that it is accessible from Converse part
57 int CkMemCheckPT::inRestarting = 0;
58 double CkMemCheckPT::startTime;
59 char *CkMemCheckPT::stage;
60 CkCallback CkMemCheckPT::cpCallback;
61
62 int _memChkptOn = 1;                    // checkpoint is on or off
63
64 CkGroupID ckCheckPTGroupID;             // readonly
65
66
67 /// @todo the following declarations should be moved into a separate file for all 
68 // fault tolerant strategies
69
70 #ifdef CMK_MEM_CHECKPOINT
71 // name of the kill file that contains processes to be killed 
72 char *killFile;                                               
73 // flag for the kill file         
74 int killFlag=0;
75 // variable for storing the killing time
76 double killTime=0.0;
77 #endif
78
79 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
80 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
81
82 // compute the backup processor
83 // FIXME: avoid crashed processors
84 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
85
86 // called in array element constructor
87 // choose and register with 2 buddies for checkpoiting 
88 #if CMK_MEM_CHECKPOINT
89 void ArrayElement::init_checkpt() {
90         if (_memChkptOn == 0) return;
91         // only master init checkpoint
92         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
93
94         budPEs[0] = CkMyPe();
95         budPEs[1] = (CkMyPe()+1)%CkNumPes();
96         CmiAssert(budPEs[0] != budPEs[1]);
97         // inform checkPTMgr
98         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
99         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
100         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
101         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
102 }
103 #endif
104
105 // entry function invoked by checkpoint mgr asking for checkpoint data
106 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
107 #if CMK_MEM_CHECKPOINT
108   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
109   CkLocMgr *locMgr = thisArray->getLocMgr();
110   CmiAssert(myRec!=NULL);
111   int size;
112   {
113         PUP::sizer p;
114         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
115         size = p.size();
116   }
117   int packSize = size/sizeof(double) +1;
118   CkArrayCheckPTMessage *msg =
119                  new (packSize, 0) CkArrayCheckPTMessage;
120   msg->len = size;
121   msg->index =thisIndexMax;
122   msg->aid = thisArrayID;
123   msg->locMgr = locMgr->getGroupID();
124   msg->cp_flag = 1;
125   {
126         PUP::toMem p(msg->packData);
127         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
128   }
129
130   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
131   checkptMgr.recvData(msg, 2, budPEs);
132   delete m;
133 #endif
134 }
135
136 // checkpoint holder class - for memory checkpointing
137 class CkMemCheckPTInfo: public CkCheckPTInfo
138 {
139   CkArrayCheckPTMessage *ckBuffer;
140 public:
141   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
142                     CkCheckPTInfo(a, loc, idx, pno)
143   {
144     ckBuffer = NULL;
145   }
146   ~CkMemCheckPTInfo() 
147   {
148     if (ckBuffer) delete ckBuffer; 
149   }
150   inline void updateBuffer(CkArrayCheckPTMessage *data) 
151   {
152     if (ckBuffer) delete ckBuffer;
153     ckBuffer = data;
154   }    
155   inline CkArrayCheckPTMessage * getCopy()
156   {
157     if (ckBuffer == NULL) {
158       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
159       CmiAbort("Abort!");
160     }
161     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
162   }     
163   inline void updateBuddy(int b1, int b2) {
164      CmiAssert(ckBuffer);
165      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
166   }
167   inline int getSize() { 
168      CmiAssert(ckBuffer);
169      return ckBuffer->len; 
170   }
171 };
172
173 // checkpoint holder class - for in-disk checkpointing
174 class CkDiskCheckPTInfo: public CkCheckPTInfo 
175 {
176   char *fname;
177   int bud1, bud2;
178   int len;                      // checkpoint size
179 public:
180   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
181   {
182 #if CMK_USE_MKSTEMP
183     fname = new char[64];
184     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
185     mkstemp(fname);
186 #else
187     fname=tmpnam(NULL);
188 #endif
189     bud1 = bud2 = -1;
190     len = 0;
191   }
192   ~CkDiskCheckPTInfo() 
193   {
194     remove(fname);
195   }
196   inline void updateBuffer(CkArrayCheckPTMessage *data) 
197   {
198     double t = CmiWallTimer();
199     // unpack it
200     envelope *env = UsrToEnv(data);
201     CkUnpackMessage(&env);
202     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
203     FILE *f = fopen(fname,"wb");
204     PUP::toDisk p(f);
205     CkPupMessage(p, (void **)&data);
206     // delay sync to the end because otherwise the messages are blocked
207 //    fsync(fileno(f));
208     fclose(f);
209     bud1 = data->bud1;
210     bud2 = data->bud2;
211     len = data->len;
212     delete data;
213     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
214   }
215   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
216   {
217     CkArrayCheckPTMessage *data;
218     FILE *f = fopen(fname,"rb");
219     PUP::fromDisk p(f);
220     CkPupMessage(p, (void **)&data);
221     fclose(f);
222     data->bud1 = bud1;                          // update the buddies
223     data->bud2 = bud2;
224     return data;
225   }
226   inline void updateBuddy(int b1, int b2) {
227      bud1 = b1; bud2 = b2;
228   }
229   inline int getSize() { 
230      return len; 
231   }
232 };
233
234 CkMemCheckPT::CkMemCheckPT(int w)
235 {
236 #if CK_NO_PROC_POOL
237   if (CkNumPes() <= 2) {
238 #else
239   if (CkNumPes()  == 1) {
240 #endif
241     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
242     _memChkptOn = 0;
243   }
244   inRestarting = 0;
245   recvCount = peCount = 0;
246   where = w;
247 }
248
249 CkMemCheckPT::~CkMemCheckPT()
250 {
251   int len = ckTable.length();
252   for (int i=0; i<len; i++) {
253     delete ckTable[i];
254   }
255 }
256
257 void CkMemCheckPT::pup(PUP::er& p) 
258
259   CBase_CkMemCheckPT::pup(p); 
260   p|cpStarter;
261   p|thisFailedPe;
262   p|failedPes;
263   p|ckCheckPTGroupID;           // recover global variable
264   p|cpCallback;                 // store callback
265   p|where;                      // where to checkpoint
266   if (p.isUnpacking()) {
267     recvCount = peCount = 0;
268   }
269 }
270
271 // called by checkpoint mgr to restore an array element
272 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
273 {
274 #if CMK_MEM_CHECKPOINT
275   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
276   // m->index.print();
277   PUP::fromMem p(m->packData);
278   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
279   CmiAssert(mgr);
280   mgr->resume(m->index, p);
281
282   // find a list of array elements bound together
283   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
284   CmiAssert(elt);
285   CkLocRec_local *rec = elt->myRec;
286   CkVec<CkMigratable *> list;
287   mgr->migratableList(rec, list);
288   CmiAssert(list.length() > 0);
289   for (int l=0; l<list.length(); l++) {
290     elt = (ArrayElement *)list[l];
291     elt->budPEs[0] = m->bud1;
292     elt->budPEs[1] = m->bud2;
293     //    reset, may not needed now
294     // for now.
295     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
296       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
297       if (c) c->redNo = 0;
298     }
299   }
300 #endif
301 }
302
303 // return 1 if pe is a crashed processor
304 int CkMemCheckPT::isFailed(int pe)
305 {
306   for (int i=0; i<failedPes.length(); i++)
307     if (failedPes[i] == pe) return 1;
308   return 0;
309 }
310
311 // add pe into history list of all failed processors
312 void CkMemCheckPT::failed(int pe)
313 {
314   if (isFailed(pe)) return;
315   failedPes.push_back(pe);
316 }
317
318 int CkMemCheckPT::totalFailed()
319 {
320   return failedPes.length();
321 }
322
323 // create an checkpoint entry for array element of aid with index.
324 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
325 {
326   // error check, no duplicate
327   int idx, len = ckTable.size();
328   for (idx=0; idx<len; idx++) {
329     CkCheckPTInfo *entry = ckTable[idx];
330     if (index == entry->index) {
331       if (loc == entry->locMgr) {
332           // bindTo array elements
333           return;
334       }
335         // for array inheritance, the following check may fail
336         // because ArrayElement constructor of all superclasses are called
337       if (aid == entry->aid) {
338         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
339         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
340       }
341     }
342   }
343   CkCheckPTInfo *newEntry;
344   if (where == CkCheckPoint_inMEM)
345     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
346   else
347     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
348   ckTable.push_back(newEntry);
349   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
350 }
351
352 // loop through my checkpoint table and ask checkpointed array elements
353 // to send me checkpoint data.
354 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
355 {
356   cpCallback = cb;
357   cpStarter = starter;
358   if (CkMyPe() == cpStarter) {
359     startTime = CmiWallTimer();
360     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
361   }
362
363   int len = ckTable.length();
364   for (int i=0; i<len; i++) {
365     CkCheckPTInfo *entry = ckTable[i];
366       // always let the bigger number processor send request
367     if (CkMyPe() < entry->pNo) continue;
368       // call inmem_checkpoint to the array element, ask it to send
369       // back checkpoint data via recvData().
370     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
371     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
372   }
373     // if my table is empty, then I am done
374   if (len == 0) thisProxy[cpStarter].cpFinish();
375
376   // pack and send proc level data
377   sendProcData();
378 }
379
380 // don't handle array elements
381 static inline void _handleProcData(PUP::er &p)
382 {
383     // save readonlys, and callback BTW
384     CkPupROData(p);
385
386     // save mainchares into MainChares.dat
387     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
388         
389 #if CMK_FT_CHARE
390     // save non-migratable chare
391     CkPupChareData(p);
392 #endif
393
394     // save groups into Groups.dat
395     CkPupGroupData(p);
396
397     // save nodegroups into NodeGroups.dat
398     if(CkMyRank()==0) CkPupNodeGroupData(p);
399 }
400
401 void CkMemCheckPT::sendProcData()
402 {
403   // find out size of buffer
404   int size;
405   {
406     PUP::sizer p;
407     _handleProcData(p);
408     size = p.size();
409   }
410   int packSize = size;
411   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
412   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
413   {
414     PUP::toMem p(msg->packData);
415     _handleProcData(p);
416   }
417   msg->pe = CkMyPe();
418   msg->len = size;
419   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
420   thisProxy[ChkptOnPe()].recvProcData(msg);
421 }
422
423 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
424 {
425   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
426   CpvAccess(procChkptBuf) = msg;
427 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
428   thisProxy[msg->reportPe].cpFinish();
429 }
430
431 // ArrayElement call this function to give us the checkpointed data
432 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
433 {
434   int len = ckTable.length();
435   int idx;
436   for (idx=0; idx<len; idx++) {
437     CkCheckPTInfo *entry = ckTable[idx];
438     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
439   }
440   CkAssert(idx < len);
441   int isChkpting = msg->cp_flag;
442   ckTable[idx]->updateBuffer(msg);
443   if (isChkpting) {
444       // all my array elements have returned their inmem data
445       // inform starter processor that I am done.
446     recvCount ++;
447     if (recvCount == ckTable.length()) {
448       if (where == CkCheckPoint_inMEM) {
449         thisProxy[cpStarter].cpFinish();
450       }
451       else if (where == CkCheckPoint_inDISK) {
452         // another barrier for finalize the writing using fsync
453         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
454         contribute(0,NULL,CkReduction::sum_int,localcb);
455       }
456       else
457         CmiAbort("Unknown checkpoint scheme");
458       recvCount = 0;
459     } 
460   }
461 }
462
463 // only used in disk checkpointing
464 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
465 {
466   delete m;
467 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
468   system("sync");
469 #endif
470   thisProxy[cpStarter].cpFinish();
471 }
472
473 // only is called on cpStarter when checkpoint is done
474 void CkMemCheckPT::cpFinish()
475 {
476   CmiAssert(CkMyPe() == cpStarter);
477   peCount++;
478     // now that all processors have finished, activate callback
479   if (peCount == 2*(CkNumPes())) {
480     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
481     cpCallback.send();
482     peCount = 0;
483     thisProxy.report();
484   }
485 }
486
487 // for debugging, report checkpoint info
488 void CkMemCheckPT::report()
489 {
490   int objsize = 0;
491   int len = ckTable.length();
492   for (int i=0; i<len; i++) {
493     CkCheckPTInfo *entry = ckTable[i];
494     CmiAssert(entry);
495     objsize += entry->getSize();
496   }
497   CmiAssert(CpvAccess(procChkptBuf));
498   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
499 }
500
501 /*****************************************************************************
502                         RESTART Procedure
503 *****************************************************************************/
504
505 // master processor of two buddies
506 inline int CkMemCheckPT::isMaster(int buddype)
507 {
508   int mype = CkMyPe();
509 //CkPrintf("ismaster: %d %d\n", pe, mype);
510   if (CkNumPes() - totalFailed() == 2) {
511     return mype > buddype;
512   }
513   for (int i=1; i<CkNumPes(); i++) {
514     int me = (buddype+i)%CkNumPes();
515     if (isFailed(me)) continue;
516     if (me == mype) return 1;
517     else return 0;
518   }
519   return 0;
520 }
521
522 #ifdef CKLOCMGR_LOOP
523 #undef CKLOCMGR_LOOP
524 #endif
525
526 // loop over all CkLocMgr and do "code"
527 #define  CKLOCMGR_LOOP(code)    {       \
528   int numGroups = CkpvAccess(_groupIDTable)->size();    \
529   for(int i=0;i<numGroups;i++) {        \
530     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
531     if(obj->isLocMgr())  {      \
532       CkLocMgr *mgr = (CkLocMgr*)obj;   \
533       code      \
534     }   \
535   }     \
536  }
537
538 #if 0
539 // helper class to pup all elements that belong to same ckLocMgr
540 class ElementDestoryer : public CkLocIterator {
541 private:
542         CkLocMgr *locMgr;
543 public:
544         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
545         void addLocation(CkLocation &loc) {
546                 CkArrayIndexMax idx=loc.getIndex();
547                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
548                 loc.destroy();
549         }
550 };
551 #endif
552
553 // restore the bitmap vector for LB
554 void CkMemCheckPT::resetLB(int diepe)
555 {
556 #if CMK_LBDB_ON
557   int i;
558   char *bitmap = new char[CkNumPes()];
559   // set processor available bitmap
560   get_avail_vector(bitmap);
561
562   for (i=0; i<failedPes.length(); i++)
563     bitmap[failedPes[i]] = 0; 
564   bitmap[diepe] = 0;
565
566 #if CK_NO_PROC_POOL
567   set_avail_vector(bitmap);
568 #endif
569
570   // if I am the crashed pe, rebuild my failedPEs array
571   if (CkMyPe() == diepe)
572     for (i=0; i<CkNumPes(); i++) 
573       if (bitmap[i]==0) failed(i);
574
575   delete [] bitmap;
576 #endif
577 }
578
579 // in case when failedPe dies, everybody go through its checkpoint table:
580 // destory all array elements
581 // recover lost buddies
582 // reconstruct all array elements from check point data
583 // called on all processors
584 void CkMemCheckPT::restart(int diePe)
585 {
586 #if CMK_MEM_CHECKPOINT
587   double curTime = CmiWallTimer();
588   if (CkMyPe() == diePe)
589     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
590   stage = "resetLB";
591   startTime = curTime;
592   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
593
594 #if CK_NO_PROC_POOL
595   failed(diePe);        // add into the list of failed pes
596 #endif
597   thisFailedPe = diePe;
598
599   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
600
601   inRestarting = 1;
602                                                                                 
603   // disable load balancer's barrier
604   if (CkMyPe() != diePe) resetLB(diePe);
605
606   CKLOCMGR_LOOP(mgr->startInserting(););
607
608   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
609 #endif
610 }
611
612 // loally remove all array elements
613 void CkMemCheckPT::removeArrayElements()
614 {
615 #if CMK_MEM_CHECKPOINT
616   int len = ckTable.length();
617   double curTime = CmiWallTimer();
618   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
619   stage = "removeArrayElements";
620   startTime = curTime;
621
622   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
623   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
624
625   // get rid of all buffering and remote recs
626   // including destorying all array elements
627   CKLOCMGR_LOOP(mgr->flushAllRecs(););
628
629 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
630
631   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
632 #endif
633 }
634
635 // flush state in reduction manager
636 void CkMemCheckPT::resetReductionMgr()
637 {
638   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
639   int numGroups = CkpvAccess(_groupIDTable)->size();
640   for(int i=0;i<numGroups;i++) {
641     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
642     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
643     obj->flushStates();
644     obj->ckJustMigrated();
645   }
646   // reset again
647   //CpvAccess(_qd)->flushStates();
648
649   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
650 }
651
652 // recover the lost buddies
653 void CkMemCheckPT::recoverBuddies()
654 {
655   int idx;
656   int len = ckTable.length();
657   // ready to flush reduction manager
658   // cannot be CkMemCheckPT::restart because destory will modify states
659   double curTime = CmiWallTimer();
660   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
661   stage = (char *)"recoverBuddies";
662   startTime = curTime;
663
664   // recover buddies
665   for (idx=0; idx<len; idx++) {
666     CkCheckPTInfo *entry = ckTable[idx];
667     if (entry->pNo == thisFailedPe) {
668 #if CK_NO_PROC_POOL
669       // find a new buddy
670       int budPe = CkMyPe();
671 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
672       while (budPe == CkMyPe() || isFailed(budPe)) 
673           budPe = (budPe+1)%CkNumPes();
674       entry->pNo = budPe;
675 #else
676       int budPe = thisFailedPe;
677 #endif
678       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
679       CkArrayCheckPTMessage *msg = entry->getCopy();
680       msg->cp_flag = 0;            // not checkpointing
681       thisProxy[budPe].recvData(msg);
682     }
683   }
684
685   if (CkMyPe() == 0)
686     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
687 }
688
689 // restore array elements
690 void CkMemCheckPT::recoverArrayElements()
691 {
692   double curTime = CmiWallTimer();
693   int len = ckTable.length();
694   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
695   stage = (char *)"recoverArrayElements";
696   startTime = curTime;
697
698   // recover all array elements
699   int count = 0;
700   for (int idx=0; idx<len; idx++)
701   {
702     CkCheckPTInfo *entry = ckTable[idx];
703 #if CK_NO_PROC_POOL
704     // the bigger one will do 
705 //    if (CkMyPe() < entry->pNo) continue;
706     if (!isMaster(entry->pNo)) continue;
707 #else
708     // smaller one do it, which has the original object
709     if (CkMyPe() == entry->pNo+1 || 
710         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
711 #endif
712 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
713
714     entry->updateBuddy(CkMyPe(), entry->pNo);
715     CkArrayCheckPTMessage *msg = entry->getCopy();
716     // gzheng
717     //checkptMgr[CkMyPe()].inmem_restore(msg);
718     inmem_restore(msg);
719     count ++;
720   }
721 //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
722
723   if (CkMyPe() == 0)
724     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
725 }
726
727 // on every processor
728 // turn load balancer back on
729 void CkMemCheckPT::finishUp()
730 {
731   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
732   CKLOCMGR_LOOP(mgr->doneInserting(););
733   
734   inRestarting = 0;
735
736   if (CkMyPe() == 0)
737   {
738        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
739        CkStartQD(cpCallback);
740   } 
741 #if CK_NO_PROC_POOL
742   if (CkNumPes()-totalFailed() <=2) {
743     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
744     _memChkptOn = 0;
745   }
746 #endif
747 }
748
749 // called only on 0
750 void CkMemCheckPT::quiescence(CkCallback &cb)
751 {
752   static int pe_count = 0;
753   pe_count ++;
754   CmiAssert(CkMyPe() == 0);
755   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
756   if (pe_count == CkNumPes()) {
757     pe_count = 0;
758     cb.send();
759   }
760 }
761
762 // User callable function - to start a checkpoint
763 // callback cb is used to pass control back
764 void CkStartMemCheckpoint(CkCallback &cb)
765 {
766 #if CMK_MEM_CHECKPOINT
767   if (_memChkptOn == 0) {
768     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
769     cb.send();
770     return;
771   }
772   if (CkInRestarting()) {
773       // trying to checkpointing during restart
774     cb.send();
775     return;
776   }
777     // store user callback and user data
778   CkMemCheckPT::cpCallback = cb;
779
780     // broadcast to start check pointing
781   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
782   checkptMgr.doItNow(CkMyPe(), cb);
783 #else
784   // when mem checkpoint is disabled, invike cb immediately
785   cb.send();
786 #endif
787 }
788
789 void CkRestartCheckPoint(int diePe)
790 {
791 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
792   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
793   // broadcast
794   checkptMgr.restart(diePe);
795 }
796
797 static int _diePE = -1;
798
799 // callback function used locally by ccs handler
800 static void CkRestartCheckPointCallback(void *ignore, void *msg)
801 {
802 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
803   CkRestartCheckPoint(_diePE);
804 }
805
806 // Converse function handles
807 static int askProcDataHandlerIdx;
808 static int restartBcastHandlerIdx;
809 static int recoverProcDataHandlerIdx;
810 static int restartBeginHandlerIdx;
811
812 static void restartBeginHandler(char *msg)
813 {
814 #if CMK_MEM_CHECKPOINT
815   static int count = 0;
816   CmiFree(msg);
817   CmiAssert(CkMyPe() == _diePE);
818   count ++;
819   if (count == CkNumPes()) {
820     CkRestartCheckPointCallback(NULL, NULL);
821     count = 0;
822   }
823 #endif
824 }
825
826 static void restartBcastHandler(char *msg)
827 {
828 #if CMK_MEM_CHECKPOINT
829   // advance phase counter
830   CkMemCheckPT::inRestarting = 1;
831   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
832   // gzheng
833   if (CkMyPe() != _diePE) cur_restart_phase ++;
834
835   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
836
837   // reset QD counters
838   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
839
840 /*  gzheng
841   if (CkMyPe()==_diePE)
842       CkRestartCheckPointCallback(NULL, NULL);
843 */
844   CmiFree(msg);
845
846   char restartmsg[CmiMsgHeaderSizeBytes];
847   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
848   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
849 #endif
850 }
851
852 extern void _initDone();
853
854 // called on crashed processor
855 static void recoverProcDataHandler(char *msg)
856 {
857 #if CMK_MEM_CHECKPOINT
858    int i;
859    envelope *env = (envelope *)msg;
860    CkUnpackMessage(&env);
861    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
862    cur_restart_phase = procMsg->cur_restart_phase;
863    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
864    cur_restart_phase ++;
865    CpvAccess(_qd)->flushStates();
866
867    // restore readonly, mainchare, group, nodegroup
868 //   int temp = cur_restart_phase;
869 //   cur_restart_phase = -1;
870    PUP::fromMem p(procMsg->packData);
871    _handleProcData(p);
872
873    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
874    // gzheng
875    CKLOCMGR_LOOP(mgr->startInserting(););
876
877    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
878    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
879    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
880    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
881    CmiFree(msg);
882
883    _initDone();
884 #endif
885 }
886
887 // called on its backup processor
888 // get backup message buffer and sent to crashed processor
889 static void askProcDataHandler(char *msg)
890 {
891 #if CMK_MEM_CHECKPOINT
892     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
893     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
894     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
895     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
896     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
897
898     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
899
900     CkPackMessage(&env);
901     CmiSetHandler(env, recoverProcDataHandlerIdx);
902     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
903     CpvAccess(procChkptBuf) = NULL;
904 #endif
905 }
906
907 void CkMemRestart(const char *dummy, CkArgMsg *args)
908 {
909 #if CMK_MEM_CHECKPOINT
910    _diePE = CkMyPe();
911    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
912    CkMemCheckPT::startTime = CmiWallTimer();
913    CkMemCheckPT::inRestarting = 1;
914    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
915    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
916    cur_restart_phase = 9999;             // big enough to get it processed
917    CmiSetHandler(msg, askProcDataHandlerIdx);
918    int pe = ChkptOnPe();
919    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
920    cur_restart_phase=0;    // allow all message to come in
921 #else
922    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
923 #endif
924 }
925
926 // can be called in other files
927 // return true if it is in restarting
928 int CkInRestarting()
929 {
930 #if CMK_MEM_CHECKPOINT
931   // gzheng
932   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
933   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
934   return CkMemCheckPT::inRestarting;
935 #else
936   return 0;
937 #endif
938 }
939
940 /*****************************************************************************
941                 module initialization
942 *****************************************************************************/
943
944 static int arg_where = CkCheckPoint_inMEM;
945
946 #if CMK_MEM_CHECKPOINT
947 void init_memcheckpt(char **argv)
948 {
949     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
950       arg_where = CkCheckPoint_inDISK;
951     }
952 }
953 #endif
954
955 class CkMemCheckPTInit: public Chare {
956 public:
957   CkMemCheckPTInit(CkArgMsg *m) {
958 #if CMK_MEM_CHECKPOINT
959     if (arg_where == CkCheckPoint_inDISK) {
960       CkPrintf("Charm++> Double-disk Checkpointing. \n");
961     }
962     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
963     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
964 #endif
965   }
966 };
967
968 // initproc
969 void CkRegisterRestartHandler( )
970 {
971 #if CMK_MEM_CHECKPOINT
972   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
973   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
974   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
975   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
976
977   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
978   CpvAccess(procChkptBuf) = NULL;
979
980 #if 1
981   // for debugging
982   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
983 //  sleep(4);
984 #endif
985 #endif
986 }
987
988 /// @todo: the following definitions should be moved to a separate file containing
989 // structures and functions about fault tolerance strategies
990
991 /**
992  *  * @brief: function for killing a process                                             
993  *   */
994 #ifdef CMK_MEM_CHECKPOINT
995 #if CMK_HAS_GETPID
996 void killLocal(void *_dummy,double curWallTime){
997         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
998         if(CmiWallTimer()<killTime-1){
999                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1000         }else{  
1001                 kill(getpid(),SIGKILL);                                               
1002         }              
1003
1004 #else
1005 void killLocal(void *_dummy,double curWallTime){
1006   CmiAbort("kill() not supported!");
1007 }
1008 #endif
1009 #endif
1010
1011 #ifdef CMK_MEM_CHECKPOINT
1012 /**
1013  * @brief: reads the file with the kill information
1014  */
1015 void readKillFile(){
1016         FILE *fp=fopen(killFile,"r");
1017         if(!fp){
1018                 return;
1019         }
1020         int proc;
1021         double sec;
1022         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1023                 if(proc == CkMyPe()){
1024                         killTime = CmiWallTimer()+sec;
1025                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1026                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1027                 }
1028         }
1029         fclose(fp);
1030 }
1031 #endif
1032
1033 #include "CkMemCheckpoint.def.h"
1034
1035