b1d9f6f7d3613f155f15edbf8e1c3a03fb788c10
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "charm++.h"
43 #include "ck.h"
44 #include "register.h"
45 #include "conv-ccs.h"
46
47 #define DEBUGF     // CkPrintf
48
49 // assume NO extra processors--1
50 // assume extra processors--0
51 #define CK_NO_PROC_POOL                         1
52
53 // static, so that it is accessible from Converse part
54 int CkMemCheckPT::inRestarting = 0;
55 double CkMemCheckPT::startTime;
56 char *CkMemCheckPT::stage;
57 CkCallback CkMemCheckPT::cpCallback;
58
59 int _memChkptOn = 1;                    // checkpoint is on or off
60
61 CkGroupID ckCheckPTGroupID;             // readonly
62
63 /// checkpoint buffer for processor system data
64 CpvStaticDeclare(CkProcCheckPTMessage*, procChkptBuf);
65
66 // compute the backup processor
67 // FIXME: avoid crashed processors
68 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
69
70 // called in array element constructor
71 // choose and register with 2 buddies for checkpoiting 
72 #if CMK_MEM_CHECKPOINT
73 void ArrayElement::init_checkpt() {
74         if (_memChkptOn == 0) return;
75         // only master init checkpoint
76         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
77
78         budPEs[0] = CkMyPe();
79         budPEs[1] = (CkMyPe()+1)%CkNumPes();
80         CmiAssert(budPEs[0] != budPEs[1]);
81         // inform checkPTMgr
82         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
83         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
84         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
85         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
86 }
87 #endif
88
89 // entry function invoked by checkpoint mgr asking for checkpoint data
90 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
91 #if CMK_MEM_CHECKPOINT
92   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
93   CkLocMgr *locMgr = thisArray->getLocMgr();
94   CmiAssert(myRec!=NULL);
95   int size;
96   {
97         PUP::sizer p;
98         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
99         size = p.size();
100   }
101   int packSize = size/sizeof(double) +1;
102   CkArrayCheckPTMessage *msg =
103                  new (packSize, 0) CkArrayCheckPTMessage;
104   msg->len = size;
105   msg->index =thisIndexMax;
106   msg->aid = thisArrayID;
107   msg->locMgr = locMgr->getGroupID();
108   msg->cp_flag = 1;
109   {
110         PUP::toMem p(msg->packData);
111         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
112   }
113
114   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
115   checkptMgr.recvData(msg, 2, budPEs);
116   delete m;
117 #endif
118 }
119
120 // checkpoint holder class - for memory checkpointing
121 class CkMemCheckPTInfo: public CkCheckPTInfo
122 {
123   CkArrayCheckPTMessage *ckBuffer;
124 public:
125   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
126                     CkCheckPTInfo(a, loc, idx, pno)
127   {
128     ckBuffer = NULL;
129   }
130   ~CkMemCheckPTInfo() 
131   {
132     if (ckBuffer) delete ckBuffer; 
133   }
134   inline void updateBuffer(CkArrayCheckPTMessage *data) 
135   {
136     if (ckBuffer) delete ckBuffer;
137     ckBuffer = data;
138   }    
139   inline CkArrayCheckPTMessage * getCopy()
140   {
141     if (ckBuffer == NULL) {
142       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
143       CmiAbort("Abort!");
144     }
145     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
146   }     
147   inline void updateBuddy(int b1, int b2) {
148      CmiAssert(ckBuffer);
149      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
150   }
151   inline int getSize() { 
152      CmiAssert(ckBuffer);
153      return ckBuffer->len; 
154   }
155 };
156
157 // checkpoint holder class - for in-disk checkpointing
158 class CkDiskCheckPTInfo: public CkCheckPTInfo 
159 {
160   char *fname;
161   int bud1, bud2;
162   int len;                      // checkpoint size
163 public:
164   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
165   {
166 #if CMK_USE_MKSTEMP
167     fname = new char[64];
168     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
169     mkstemp(fname);
170 #else
171     fname=tmpnam(NULL);
172 #endif
173     bud1 = bud2 = -1;
174     len = 0;
175   }
176   ~CkDiskCheckPTInfo() 
177   {
178     remove(fname);
179   }
180   inline void updateBuffer(CkArrayCheckPTMessage *data) 
181   {
182     double t = CmiWallTimer();
183     // unpack it
184     envelope *env = UsrToEnv(data);
185     CkUnpackMessage(&env);
186     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
187     FILE *f = fopen(fname,"wb");
188     PUP::toDisk p(f);
189     CkPupMessage(p, (void **)&data);
190     // delay sync to the end because otherwise the messages are blocked
191 //    fsync(fileno(f));
192     fclose(f);
193     bud1 = data->bud1;
194     bud2 = data->bud2;
195     len = data->len;
196     delete data;
197     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
198   }
199   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
200   {
201     CkArrayCheckPTMessage *data;
202     FILE *f = fopen(fname,"rb");
203     PUP::fromDisk p(f);
204     CkPupMessage(p, (void **)&data);
205     fclose(f);
206     data->bud1 = bud1;                          // update the buddies
207     data->bud2 = bud2;
208     return data;
209   }
210   inline void updateBuddy(int b1, int b2) {
211      bud1 = b1; bud2 = b2;
212   }
213   inline int getSize() { 
214      return len; 
215   }
216 };
217
218 CkMemCheckPT::CkMemCheckPT(int w)
219 {
220 #if CK_NO_PROC_POOL
221   if (CkNumPes() <= 2) {
222 #else
223   if (CkNumPes()  == 1) {
224 #endif
225     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
226     _memChkptOn = 0;
227   }
228   inRestarting = 0;
229   recvCount = peCount = 0;
230   where = w;
231 }
232
233 CkMemCheckPT::~CkMemCheckPT()
234 {
235   int len = ckTable.length();
236   for (int i=0; i<len; i++) {
237     delete ckTable[i];
238   }
239 }
240
241 void CkMemCheckPT::pup(PUP::er& p) 
242
243   CBase_CkMemCheckPT::pup(p); 
244   p|cpStarter;
245   p|thisFailedPe;
246   p|failedPes;
247   p|ckCheckPTGroupID;           // recover global variable
248   p|cpCallback;                 // store callback
249   p|where;                      // where to checkpoint
250   if (p.isUnpacking()) {
251     recvCount = peCount = 0;
252   }
253 }
254
255 // called by checkpoint mgr to restore an array element
256 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
257 {
258 #if CMK_MEM_CHECKPOINT
259   DEBUGF("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  
260   // m->index.print();
261   PUP::fromMem p(m->packData);
262   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
263   CmiAssert(mgr);
264   mgr->resume(m->index, p);
265
266   // find a list of array elements bound together
267   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
268   CmiAssert(elt);
269   CkLocRec_local *rec = elt->myRec;
270   CkVec<CkMigratable *> list;
271   mgr->migratableList(rec, list);
272   CmiAssert(list.length() > 0);
273   for (int l=0; l<list.length(); l++) {
274     elt = (ArrayElement *)list[l];
275     elt->budPEs[0] = m->bud1;
276     elt->budPEs[1] = m->bud2;
277     //    reset, may not needed now
278     // for now.
279     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
280       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
281       if (c) c->redNo = 0;
282     }
283   }
284 #endif
285 }
286
287 // return 1 if pe is a crashed processor
288 int CkMemCheckPT::isFailed(int pe)
289 {
290   for (int i=0; i<failedPes.length(); i++)
291     if (failedPes[i] == pe) return 1;
292   return 0;
293 }
294
295 // add pe into history list of all failed processors
296 void CkMemCheckPT::failed(int pe)
297 {
298   if (isFailed(pe)) return;
299   failedPes.push_back(pe);
300 }
301
302 int CkMemCheckPT::totalFailed()
303 {
304   return failedPes.length();
305 }
306
307 // create an checkpoint entry for array element of aid with index.
308 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
309 {
310   // error check, no duplicate
311   int idx, len = ckTable.size();
312   for (idx=0; idx<len; idx++) {
313     CkCheckPTInfo *entry = ckTable[idx];
314     if (index == entry->index) {
315       if (loc == entry->locMgr) {
316           // bindTo array elements
317           return;
318       }
319         // for array inheritance, the following check may fail
320         // because ArrayElement constructor of all superclasses are called
321       if (aid == entry->aid) {
322         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
323         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
324       }
325     }
326   }
327   CkCheckPTInfo *newEntry;
328   if (where == CkCheckPoint_inMEM)
329     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
330   else
331     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
332   ckTable.push_back(newEntry);
333   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
334 }
335
336 // loop through my checkpoint table and ask checkpointed array elements
337 // to send me checkpoint data.
338 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
339 {
340   cpCallback = cb;
341   cpStarter = starter;
342   if (CkMyPe() == cpStarter) {
343     startTime = CmiWallTimer();
344     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
345   }
346
347   int len = ckTable.length();
348   for (int i=0; i<len; i++) {
349     CkCheckPTInfo *entry = ckTable[i];
350       // always let the bigger number processor send request
351     if (CkMyPe() < entry->pNo) continue;
352       // call inmem_checkpoint to the array element, ask it to send
353       // back checkpoint data via recvData().
354     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
355     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
356   }
357     // if my table is empty, then I am done
358   if (len == 0) thisProxy[cpStarter].cpFinish();
359
360   // pack and send proc level data
361   sendProcData();
362 }
363
364 // don't handle array elements
365 static inline void _handleProcData(PUP::er &p)
366 {
367     // save readonlys, and callback BTW
368     CkPupROData(p);
369
370     // save mainchares into MainChares.dat
371     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
372         
373     // save groups into Groups.dat
374     CkPupGroupData(p);
375
376     // save nodegroups into NodeGroups.dat
377     if(CkMyRank()==0) CkPupNodeGroupData(p);
378 }
379
380 void CkMemCheckPT::sendProcData()
381 {
382   // find out size of buffer
383   int size;
384   {
385     PUP::sizer p;
386     _handleProcData(p);
387     size = p.size();
388   }
389   int packSize = size;
390   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
391   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
392   {
393     PUP::toMem p(msg->packData);
394     _handleProcData(p);
395   }
396   msg->pe = CkMyPe();
397   msg->len = size;
398   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
399   thisProxy[ChkptOnPe()].recvProcData(msg);
400 }
401
402 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
403 {
404   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
405   CpvAccess(procChkptBuf) = msg;
406 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
407   thisProxy[msg->reportPe].cpFinish();
408 }
409
410 // ArrayElement call this function to give us the checkpointed data
411 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
412 {
413   int len = ckTable.length();
414   int idx;
415   for (idx=0; idx<len; idx++) {
416     CkCheckPTInfo *entry = ckTable[idx];
417     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
418   }
419   CkAssert(idx < len);
420   int isChkpting = msg->cp_flag;
421   ckTable[idx]->updateBuffer(msg);
422   if (isChkpting) {
423       // all my array elements have returned their inmem data
424       // inform starter processor that I am done.
425     recvCount ++;
426     if (recvCount == ckTable.length()) {
427       if (where == CkCheckPoint_inMEM) {
428         thisProxy[cpStarter].cpFinish();
429       }
430       else if (where == CkCheckPoint_inDISK) {
431         // another barrier for finalize the writing using fsync
432         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
433         contribute(0,NULL,CkReduction::sum_int,localcb);
434       }
435       else
436         CmiAbort("Unknown checkpoint scheme");
437       recvCount = 0;
438     } 
439   }
440 }
441
442 // only used in disk checkpointing
443 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
444 {
445   delete m;
446 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
447   system("sync");
448 #endif
449   thisProxy[cpStarter].cpFinish();
450 }
451
452 // only is called on cpStarter when checkpoint is done
453 void CkMemCheckPT::cpFinish()
454 {
455   CmiAssert(CkMyPe() == cpStarter);
456   peCount++;
457     // now that all processors have finished, activate callback
458   if (peCount == 2*(CkNumPes())) {
459     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
460     cpCallback.send();
461     peCount = 0;
462     thisProxy.report();
463   }
464 }
465
466 // for debugging, report checkpoint info
467 void CkMemCheckPT::report()
468 {
469   int objsize = 0;
470   int len = ckTable.length();
471   for (int i=0; i<len; i++) {
472     CkCheckPTInfo *entry = ckTable[i];
473     CmiAssert(entry);
474     objsize += entry->getSize();
475   }
476   CmiAssert(CpvAccess(procChkptBuf));
477   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
478 }
479
480 /*****************************************************************************
481                         RESTART Procedure
482 *****************************************************************************/
483
484 // master processor of two buddies
485 inline int CkMemCheckPT::isMaster(int buddype)
486 {
487   int mype = CkMyPe();
488 //CkPrintf("ismaster: %d %d\n", pe, mype);
489   if (CkNumPes() - totalFailed() == 2) {
490     return mype > buddype;
491   }
492   for (int i=1; i<CkNumPes(); i++) {
493     int me = (buddype+i)%CkNumPes();
494     if (isFailed(me)) continue;
495     if (me == mype) return 1;
496     else return 0;
497   }
498   return 0;
499 }
500
501 #ifdef CKLOCMGR_LOOP
502 #undef CKLOCMGR_LOOP
503 #endif
504
505 // loop over all CkLocMgr and do "code"
506 #define  CKLOCMGR_LOOP(code)    {       \
507   int numGroups = CkpvAccess(_groupIDTable)->size();    \
508   for(int i=0;i<numGroups;i++) {        \
509     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
510     if(obj->isLocMgr())  {      \
511       CkLocMgr *mgr = (CkLocMgr*)obj;   \
512       code      \
513     }   \
514   }     \
515  }
516
517 #if 0
518 // helper class to pup all elements that belong to same ckLocMgr
519 class ElementDestoryer : public CkLocIterator {
520 private:
521         CkLocMgr *locMgr;
522 public:
523         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
524         void addLocation(CkLocation &loc) {
525                 CkArrayIndexMax idx=loc.getIndex();
526                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
527                 loc.destroy();
528         }
529 };
530 #endif
531
532 // restore the bitmap vector for LB
533 void CkMemCheckPT::resetLB(int diepe)
534 {
535 #if CMK_LBDB_ON
536   int i;
537   char *bitmap = new char[CkNumPes()];
538   // set processor available bitmap
539   get_avail_vector(bitmap);
540
541   for (i=0; i<failedPes.length(); i++)
542     bitmap[failedPes[i]] = 0; 
543   bitmap[diepe] = 0;
544
545 #if CK_NO_PROC_POOL
546   set_avail_vector(bitmap);
547 #endif
548
549   // if I am the crashed pe, rebuild my failedPEs array
550   if (CkMyPe() == diepe)
551     for (i=0; i<CkNumPes(); i++) 
552       if (bitmap[i]==0) failed(i);
553
554   delete [] bitmap;
555 #endif
556 }
557
558 // in case when failedPe dies, everybody go through its checkpoint table:
559 // destory all array elements
560 // recover lost buddies
561 // reconstruct all array elements from check point data
562 // called on all processors
563 void CkMemCheckPT::restart(int diePe)
564 {
565 #if CMK_MEM_CHECKPOINT
566   double curTime = CmiWallTimer();
567   if (CkMyPe() == diePe)
568     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
569   stage = "resetLB";
570   startTime = curTime;
571   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
572
573 #if CK_NO_PROC_POOL
574   failed(diePe);        // add into the list of failed pes
575 #endif
576   thisFailedPe = diePe;
577
578   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
579
580   inRestarting = 1;
581                                                                                 
582   // disable load balancer's barrier
583   if (CkMyPe() != diePe) resetLB(diePe);
584
585   CKLOCMGR_LOOP(mgr->startInserting(););
586
587   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
588 #endif
589 }
590
591 // loally remove all array elements
592 void CkMemCheckPT::removeArrayElements()
593 {
594 #if CMK_MEM_CHECKPOINT
595   int len = ckTable.length();
596   double curTime = CmiWallTimer();
597   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
598   stage = "removeArrayElements";
599   startTime = curTime;
600
601   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
602   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
603
604   // get rid of all buffering and remote recs
605   // including destorying all array elements
606   CKLOCMGR_LOOP(mgr->flushAllRecs(););
607
608 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
609
610   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
611 #endif
612 }
613
614 // flush state in reduction manager
615 void CkMemCheckPT::resetReductionMgr()
616 {
617   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
618   int numGroups = CkpvAccess(_groupIDTable)->size();
619   for(int i=0;i<numGroups;i++) {
620     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
621     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
622     obj->flushStates();
623     obj->ckJustMigrated();
624   }
625   // reset again
626   //CpvAccess(_qd)->flushStates();
627
628   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
629 }
630
631 // recover the lost buddies
632 void CkMemCheckPT::recoverBuddies()
633 {
634   int idx;
635   int len = ckTable.length();
636   // ready to flush reduction manager
637   // cannot be CkMemCheckPT::restart because destory will modify states
638   double curTime = CmiWallTimer();
639   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
640   stage = "recoverBuddies";
641   startTime = curTime;
642
643   // recover buddies
644   for (idx=0; idx<len; idx++) {
645     CkCheckPTInfo *entry = ckTable[idx];
646     if (entry->pNo == thisFailedPe) {
647 #if CK_NO_PROC_POOL
648       // find a new buddy
649       int budPe = CkMyPe();
650 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
651       while (budPe == CkMyPe() || isFailed(budPe)) 
652           budPe = (budPe+1)%CkNumPes();
653       entry->pNo = budPe;
654 #else
655       int budPe = thisFailedPe;
656 #endif
657       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
658       CkArrayCheckPTMessage *msg = entry->getCopy();
659       msg->cp_flag = 0;            // not checkpointing
660       thisProxy[budPe].recvData(msg);
661     }
662   }
663
664   if (CkMyPe() == 0)
665     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
666 }
667
668 // restore array elements
669 void CkMemCheckPT::recoverArrayElements()
670 {
671   double curTime = CmiWallTimer();
672   int len = ckTable.length();
673   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
674   stage = "recoverArrayElements";
675   startTime = curTime;
676
677   // recover all array elements
678   int count = 0;
679   for (int idx=0; idx<len; idx++)
680   {
681     CkCheckPTInfo *entry = ckTable[idx];
682 #if CK_NO_PROC_POOL
683     // the bigger one will do 
684 //    if (CkMyPe() < entry->pNo) continue;
685     if (!isMaster(entry->pNo)) continue;
686 #else
687     // smaller one do it, which has the original object
688     if (CkMyPe() == entry->pNo+1 || 
689         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
690 #endif
691 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
692
693     entry->updateBuddy(CkMyPe(), entry->pNo);
694     CkArrayCheckPTMessage *msg = entry->getCopy();
695     // gzheng
696     //checkptMgr[CkMyPe()].inmem_restore(msg);
697     inmem_restore(msg);
698     count ++;
699   }
700 //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
701
702   if (CkMyPe() == 0)
703     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
704 }
705
706 // on every processor
707 // turn load balancer back on
708 void CkMemCheckPT::finishUp()
709 {
710   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
711   CKLOCMGR_LOOP(mgr->doneInserting(););
712   
713   inRestarting = 0;
714
715   if (CkMyPe() == 0)
716   {
717        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
718        CkStartQD(cpCallback);
719   } 
720 #if CK_NO_PROC_POOL
721   if (CkNumPes()-totalFailed() <=2) {
722     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
723     _memChkptOn = 0;
724   }
725 #endif
726 }
727
728 // called only on 0
729 void CkMemCheckPT::quiescence(CkCallback &cb)
730 {
731   static int pe_count = 0;
732   pe_count ++;
733   CmiAssert(CkMyPe() == 0);
734   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
735   if (pe_count == CkNumPes()) {
736     pe_count = 0;
737     cb.send();
738   }
739 }
740
741 // User callable function - to start a checkpoint
742 // callback cb is used to pass control back
743 void CkStartMemCheckpoint(CkCallback &cb)
744 {
745 #if CMK_MEM_CHECKPOINT
746   if (_memChkptOn == 0) {
747     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
748     cb.send();
749     return;
750   }
751   if (CkInRestarting()) {
752       // trying to checkpointing during restart
753     cb.send();
754     return;
755   }
756     // store user callback and user data
757   CkMemCheckPT::cpCallback = cb;
758
759     // broadcast to start check pointing
760   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
761   checkptMgr.doItNow(CkMyPe(), cb);
762 #else
763   // when mem checkpoint is disabled, invike cb immediately
764   cb.send();
765 #endif
766 }
767
768 void CkRestartCheckPoint(int diePe)
769 {
770 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
771   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
772   // broadcast
773   checkptMgr.restart(diePe);
774 }
775
776 static int _diePE = -1;
777
778 // callback function used locally by ccs handler
779 static void CkRestartCheckPointCallback(void *ignore, void *msg)
780 {
781 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
782   CkRestartCheckPoint(_diePE);
783 }
784
785 // Converse function handles
786 static int askProcDataHandlerIdx;
787 static int restartBcastHandlerIdx;
788 static int recoverProcDataHandlerIdx;
789 static int restartBeginHandlerIdx;
790
791 static void restartBeginHandler(char *msg)
792 {
793 #if CMK_MEM_CHECKPOINT
794   static int count = 0;
795   CmiFree(msg);
796   CmiAssert(CkMyPe() == _diePE);
797   count ++;
798   if (count == CkNumPes()) {
799     CkRestartCheckPointCallback(NULL, NULL);
800     count = 0;
801   }
802 #endif
803 }
804
805 static void restartBcastHandler(char *msg)
806 {
807 #if CMK_MEM_CHECKPOINT
808   // advance phase counter
809   CkMemCheckPT::inRestarting = 1;
810   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
811   // gzheng
812   if (CkMyPe() != _diePE) cur_restart_phase ++;
813
814   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
815
816   // reset QD counters
817   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
818
819 /*  gzheng
820   if (CkMyPe()==_diePE)
821       CkRestartCheckPointCallback(NULL, NULL);
822 */
823   CmiFree(msg);
824
825   char restartmsg[CmiMsgHeaderSizeBytes];
826   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
827   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
828 #endif
829 }
830
831 extern void _initDone();
832
833 // called on crashed processor
834 static void recoverProcDataHandler(char *msg)
835 {
836 #if CMK_MEM_CHECKPOINT
837    int i;
838    envelope *env = (envelope *)msg;
839    CkUnpackMessage(&env);
840    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
841    cur_restart_phase = procMsg->cur_restart_phase;
842    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
843    cur_restart_phase ++;
844    CpvAccess(_qd)->flushStates();
845
846    // restore readonly, mainchare, group, nodegroup
847 //   int temp = cur_restart_phase;
848 //   cur_restart_phase = -1;
849    PUP::fromMem p(procMsg->packData);
850    _handleProcData(p);
851
852    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
853    // gzheng
854    CKLOCMGR_LOOP(mgr->startInserting(););
855
856    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
857    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
858    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
859    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
860    CmiFree(msg);
861
862    _initDone();
863 #endif
864 }
865
866 // called on its backup processor
867 // get backup message buffer and sent to crashed processor
868 static void askProcDataHandler(char *msg)
869 {
870 #if CMK_MEM_CHECKPOINT
871     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
872     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
873     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
874     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
875     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
876
877     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
878
879     CkPackMessage(&env);
880     CmiSetHandler(env, recoverProcDataHandlerIdx);
881     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
882     CpvAccess(procChkptBuf) = NULL;
883 #endif
884 }
885
886 void CkMemRestart(const char *dummy, const CkArgMsg *args)
887 {
888 #if CMK_MEM_CHECKPOINT
889    _diePE = CkMyPe();
890    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
891    CkMemCheckPT::startTime = CmiWallTimer();
892    CkMemCheckPT::inRestarting = 1;
893    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
894    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
895    cur_restart_phase = 9999;             // big enough to get it processed
896    CmiSetHandler(msg, askProcDataHandlerIdx);
897    int pe = ChkptOnPe();
898    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
899    cur_restart_phase=0;    // allow all message to come in
900 #else
901    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
902 #endif
903 }
904
905 // can be called in other files
906 // return true if it is in restarting
907 int CkInRestarting()
908 {
909 #if CMK_MEM_CHECKPOINT
910   // gzheng
911   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
912   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
913   return CkMemCheckPT::inRestarting;
914 #else
915   return 0;
916 #endif
917 }
918
919 /*****************************************************************************
920                 module initialization
921 *****************************************************************************/
922
923 class CkMemCheckPTInit: public Chare {
924 public:
925   CkMemCheckPTInit(CkArgMsg *m) {
926 #if CMK_MEM_CHECKPOINT
927     int  where = CkCheckPoint_inMEM;
928     if (CmiGetArgFlagDesc(m->argv, "+ftc_disk", "Double-disk Checkpointing")) {
929       where = CkCheckPoint_inDISK;
930       CkPrintf("Charm++> Double-disk Checkpointing. \n");
931     }
932     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(where);
933     CkPrintf("CkMemCheckPTInit main chare created!\n");
934 #endif
935   }
936 };
937
938 // initproc
939 void CkRegisterRestartHandler( )
940 {
941 #if CMK_MEM_CHECKPOINT
942   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
943   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
944   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
945   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
946
947   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
948   CpvAccess(procChkptBuf) = NULL;
949
950 #if 1
951   // for debugging
952   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
953 //  sleep(4);
954 #endif
955 #endif
956 }
957
958 #include "CkMemCheckpoint.def.h"
959
960