f051066f95e34f1c1bd439d085e2bbb049b888b3
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03
10
11 To ensure fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element.
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume at a time only one failure happens,
17 and there is no failure during a checkpointing or restarting phase.
18
19 Restart phase contains two steps:
20 1. Converse level restart where only the newly created process for the failed
21    processor is working on restoring the system data (except array elements)
22    from its backup processor.
23 2. CkMemCheckPT gets control and recover array elements and reset all
24    states to be consistent.
25
26 TODO:
27  checkpoint scheme can be reimplemented based on per processor scheme;
28  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
29
30 */
31
32 #include "charm++.h"
33 #include "ck.h"
34 #include "register.h"
35 #include "conv-ccs.h"
36
37 #define DEBUGF     // CkPrintf
38
39 int CkMemCheckPT::inRestarting = 0;
40 double CkMemCheckPT::startTime;
41 char *CkMemCheckPT::stage;
42
43 int _memChkptOn = 1;
44
45 CkGroupID ckCheckPTGroupID;             // readonly
46
47 CkCallback CkMemCheckPT::cpCallback;    // static
48
49 CpvStaticDeclare(CkProcCheckPTMessage*, procChkptBuf);
50
51 // compute the backup processor
52 // FIXME: avoid crashed processors
53 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
54
55 // called in array element constructor
56 // choose and register with 2 buggies for checkpoiting 
57 #if CMK_MEM_CHECKPOINT
58 void ArrayElement::init_checkpt() {
59         if (_memChkptOn == 0) return;
60         // CmiPrintf("[%d] ArrayElement::init_checkpt %d\n", CkMyPe(), info.fromMigration);
61         // only masteriinit checkpoint
62         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
63
64 //        budPEs[0] = (CkMyPe()-1+CkNumPes())%CkNumPes();
65         budPEs[0] = CkMyPe();
66         budPEs[1] = (CkMyPe()+1)%CkNumPes();
67         CmiAssert(budPEs[0] != budPEs[1]);
68         // inform checkPTMgr
69         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
70         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
71         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
72 }
73 #endif
74
75 // entry function invoked by checkpoint mgr asking for checkpoint data
76 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
77 #if CMK_MEM_CHECKPOINT
78   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
79   CkLocMgr *locMgr = thisArray->getLocMgr();
80   CmiAssert(myRec!=NULL);
81   int size;
82   {
83         PUP::sizer p;
84         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
85         size = p.size();
86   }
87   int packSize = size/sizeof(double) +1;
88   CkArrayCheckPTMessage *msg =
89                  new (packSize, 0) CkArrayCheckPTMessage;
90   msg->len = size;
91   msg->index =thisIndexMax;
92   msg->aid = thisArrayID;
93   msg->locMgr = locMgr->getGroupID();
94   msg->cp_flag = 1;
95   {
96         PUP::toMem p(msg->packData);
97         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
98   }
99
100   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
101   checkptMgr.recvData(msg, 2, budPEs);
102   delete m;
103 #endif
104 }
105
106 // called by checkpoint mgr to restore an array element
107 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
108 {
109 #if CMK_MEM_CHECKPOINT
110   //DEBUGF("[%d] inmem_restore restore", CmiMyPe());  m->index.print();
111 //  CmiPrintf("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  m->index.print();
112   PUP::fromMem p(m->packData);
113   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
114   mgr->resume(m->index, p);
115
116   // find a list of array elements bound together
117   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
118   CkLocRec_local *rec = elt->myRec;
119   CkVec<CkMigratable *> list;
120   mgr->migratableList(rec, list);
121   CmiAssert(list.length() > 0);
122   for (int l=0; l<list.length(); l++) {
123     elt = (ArrayElement *)list[l];
124     elt->budPEs[0] = m->bud1;
125     elt->budPEs[1] = m->bud2;
126     //    reset, may not needed now
127     // for now.
128     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
129       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
130       if (c) c->redNo = 0;
131     }
132   }
133 #endif
134 }
135
136 CkMemCheckPT::CkMemCheckPT()
137 {
138   if (CkNumPes() <= 2) {
139     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
140     _memChkptOn = 0;
141   }
142   inRestarting = 0;
143   recvCount = peCount = 0;
144 }
145
146 CkMemCheckPT::~CkMemCheckPT()
147 {
148 }
149
150 void CkMemCheckPT::pup(PUP::er& p) 
151
152   CBase_CkMemCheckPT::pup(p); 
153   p|cpStarter;
154   p|thisFailedPe;
155   p|failedPes;
156   p|ckCheckPTGroupID;           // recover global variable
157   p|cpCallback;                 // store callback
158   if (p.isUnpacking()) {
159     recvCount = peCount = 0;
160   }
161 }
162
163 // return 1 is pe was a crashed processor
164 int CkMemCheckPT::isFailed(int pe)
165 {
166   for (int i=0; i<failedPes.length(); i++)
167     if (failedPes[i] == pe) return 1;
168   return 0;
169 }
170
171 // add pe into history list of all failed processors
172 void CkMemCheckPT::failed(int pe)
173 {
174   if (isFailed(pe)) return;
175   failedPes.push_back(pe);
176 }
177
178 int CkMemCheckPT::totalFailed()
179 {
180   return failedPes.length();
181 }
182
183 // create an checkpoint entry for array element of aid with index.
184 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
185 {
186   // error check, no duplicate
187   int idx, len = ckTable.size();
188   for (idx=0; idx<len; idx++) {
189     CkMemCheckPTInfo *entry = ckTable[idx];
190     if (index == entry->index) {
191       if (aid == entry->aid) {
192         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry. \n", CkMyPe());
193         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
194       }
195       if (loc == entry->locMgr) {
196         // bindTo array elements
197         return;
198       }
199     }
200   }
201   CkMemCheckPTInfo *newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
202   ckTable.push_back(newEntry);
203 }
204
205 // loop through my checkpoint table and ask checkpointed array elements
206 // to send me checkpoint data.
207 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
208 {
209   cpCallback = cb;
210   cpStarter = starter;
211   CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
212   if (CkMyPe() == cpStarter) startTime = CmiWallTimer();
213
214 //  if (iFailed()) return;
215   int len = ckTable.length();
216   for (int i=0; i<len; i++) {
217     CkMemCheckPTInfo *entry = ckTable[i];
218       // always let the bigger number processor send request
219     if (CkMyPe() < entry->pNo) continue;
220       // call inmem_checkpoint to the array element, ask it to send
221       // back checkpoint data via recvData().
222     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
223     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
224   }
225     // if my table is empty, then I am done
226   if (len == 0) thisProxy[cpStarter].cpFinish();
227
228   // pack and send proc level data
229   sendProcData();
230 }
231
232 // don't handle array elements
233 static void _handleProcData(PUP::er &p)
234 {
235     // save readonlys, and callback BTW
236     CkPupROData(p);
237
238     // save mainchares into MainChares.dat
239     if(CkMyPe()==0) CkPupMainChareData(p);
240         
241     // save groups into Groups.dat
242     CkPupGroupData(p);
243
244     // save nodegroups into NodeGroups.dat
245     if(CkMyRank()==0) CkPupNodeGroupData(p);
246 }
247
248 void CkMemCheckPT::sendProcData()
249 {
250   // find out size of buffer
251   int size;
252   {
253     PUP::sizer p;
254     _handleProcData(p);
255     size = p.size();
256   }
257   int packSize = size;
258   CkProcCheckPTMessage *msg =
259                  new (packSize, 0) CkProcCheckPTMessage;
260   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
261   {
262     PUP::toMem p(msg->packData);
263     _handleProcData(p);
264   }
265   msg->pe = CkMyPe();
266   msg->len = size;
267   msg->reportPe = cpStarter;    // in case other processor is not in checkpoint mode
268   thisProxy[ChkptOnPe()].recvProcData(msg);
269 }
270
271 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
272 {
273   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
274   CpvAccess(procChkptBuf) = msg;
275 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
276   thisProxy[msg->reportPe].cpFinish();
277 }
278
279 // ArrayElement call this function to give us the checkpointed data
280 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
281 {
282   int len = ckTable.length();
283   int idx;
284   for (idx=0; idx<len; idx++) {
285     CkMemCheckPTInfo *entry = ckTable[idx];
286     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
287   }
288   CkAssert(idx < len);
289   ckTable[idx]->updateBuffer(msg);
290     // all my array elements have returned their inmem data
291     // inform starter processor that I am done.
292   if (msg->cp_flag) {
293     recvCount ++;
294     if (recvCount == ckTable.length()) {
295       thisProxy[cpStarter].cpFinish();
296       recvCount = 0;
297     } 
298   }
299 }
300
301 // only is called on cpStarter when checkpoint is done
302 void CkMemCheckPT::cpFinish()
303 {
304   CmiAssert(CkMyPe() == cpStarter);
305   peCount++;
306     // now all processors have finished, activate callback
307 //CmiPrintf("peCount:%d\n",peCount);
308   if (peCount == 2*(CkNumPes())) {
309     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
310     cpCallback.send();
311     peCount = 0;
312     thisProxy.report();
313   }
314 }
315
316 void CkMemCheckPT::report()
317 {
318   int objsize = 0;
319   int len = ckTable.length();
320   for (int i=0; i<len; i++) {
321     CkMemCheckPTInfo *entry = ckTable[i];
322     CmiAssert(entry && entry->ckBuffer);
323     objsize += entry->ckBuffer->len;
324   }
325   CmiAssert(CpvAccess(procChkptBuf));
326   CkPrintf("[%d] Checkpointed Object size: %d Processor data: %d\n", CkMyPe(), objsize, CpvAccess(procChkptBuf)->len);
327 }
328
329 /*****************************************************************************
330                         RESTART Procedure
331 *****************************************************************************/
332
333 inline int CkMemCheckPT::isMaster(int pe)
334 {
335   int mype = CkMyPe(); 
336 //CkPrintf("ismaster: %d %d\n", pe, mype);
337   for (int i=1; i<CkNumPes(); i++) {
338     int me = (pe+i)%CkNumPes();
339     if (isFailed(me)) continue;
340     if (me == mype) return 1;
341     else return 0;
342   }
343   return 0;
344 }
345
346 // loop over all CkLocMgr and do "code"
347 #define  CKLOCMGR_LOOP(code)    {       \
348   int numGroups = CkpvAccess(_groupIDTable)->size();    \
349   for(int i=0;i<numGroups;i++) {        \
350     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
351     if(obj->isLocMgr())  {      \
352       CkLocMgr *mgr = (CkLocMgr*)obj;   \
353       code      \
354     }   \
355   }     \
356  }
357
358 #if 0
359 // helper class to pup all elements that belong to same ckLocMgr
360 class ElementDestoryer : public CkLocIterator {
361 private:
362         CkLocMgr *locMgr;
363 public:
364         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
365         void addLocation(CkLocation &loc) {
366                 CkArrayIndexMax idx=loc.getIndex();
367                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
368                 loc.destroy();
369         }
370 };
371 #endif
372
373 // restore the bitmap vector for LB
374 void CkMemCheckPT::resetLB(int diepe)
375 {
376 #if CMK_LBDB_ON
377   int i;
378   char *bitmap = new char[CkNumPes()];
379   // set processor available bitmap
380   get_avail_vector(bitmap);
381
382   for (i=0; i<failedPes.length(); i++)
383     bitmap[failedPes[i]] = 0; 
384   bitmap[diepe] = 0;
385   set_avail_vector(bitmap);
386
387   // if I am the crashed pe, rebuild my failedPEs array
388   if (CkMyPe() == diepe)
389     for (i=0; i<CkNumPes(); i++) 
390       if (bitmap[i]==0) failed(i);
391
392   delete [] bitmap;
393 #endif
394 }
395
396 // in case when failedPe dies, everybody go through its check point table:
397 // destory all array elements
398 // recover lost buddies
399 // reconstruct all array elements from check point data
400 // called on all processors
401 void CkMemCheckPT::restart(int diePe)
402 {
403 #if CMK_MEM_CHECKPOINT
404   double curTime = CmiWallTimer();
405   if (CkMyPe() == diePe)
406     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
407   stage = "resetLB";
408   startTime = curTime;
409   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
410
411   failed(diePe);        // add into the list of failed pes
412   thisFailedPe = diePe;
413
414   // clean array chkpt table
415   if (CkMyPe() == diePe) ckTable.length() = 0;
416
417   inRestarting = 1;
418                                                                                 
419   // disable load balancer's barrier
420   if (CkMyPe() != diePe) resetLB(diePe);
421
422   CKLOCMGR_LOOP(mgr->startInserting(););
423
424   // afterwards, the QD detection should work again
425   //if (CkMyPe() == 0)
426   //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
427   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
428 #endif
429 }
430
431 void CkMemCheckPT::removeArrayElements()
432 {
433 #if CMK_MEM_CHECKPOINT
434   int len = ckTable.length();
435   double curTime = CmiWallTimer();
436   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
437   stage = "removeArrayElements";
438   startTime = curTime;
439
440   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
441   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
442
443   // get rid of all buffering and remote recs
444   CKLOCMGR_LOOP(mgr->flushAllRecs(););
445
446 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
447
448 #if 0
449   // first phase: destroy all existing array elements
450   for (int idx=0; idx<len; idx++) {
451     CkMemCheckPTInfo *entry = ckTable[idx];
452     // the bigger number PE do the destory
453     if (CkMyPe() < entry->pNo && entry->pNo != thisFailedPe) continue;
454     CkArrayMessage *msg = (CkArrayMessage *)CkAllocSysMsg();
455     msg->array_setIfNotThere(CkArray_IfNotThere_buffer);
456     CkSendMsgArray(CkIndex_ArrayElement::ckDestroy(),msg,entry->aid,entry->index);
457     //CkCallback cb(CkIndex_ArrayElement::ckDestroy(), entry->index, entry->aid);
458     //cb.send(msg);
459 CkPrintf("[%d] Destory: ", CkMyPe()); entry->index.print();
460   }
461 #endif
462
463   //if (CkMyPe() == 0)
464   //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
465   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
466 #endif
467 }
468
469 // flush state in reduction manager
470 void CkMemCheckPT::resetReductionMgr()
471 {
472   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
473   int numGroups = CkpvAccess(_groupIDTable)->size();
474   for(int i=0;i<numGroups;i++) {
475     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
476     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
477     obj->flushStates();
478     obj->ckJustMigrated();
479   }
480   // reset again
481   //CpvAccess(_qd)->flushStates();
482
483   //if (CkMyPe() == 0)
484   //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
485   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
486 }
487
488 // recover the lost buddies
489 void CkMemCheckPT::recoverBuddies()
490 {
491   int idx;
492   int len = ckTable.length();
493   // ready to flush reduction manager
494   // cannot be CkMemCheckPT::restart because destory will modify states
495   double curTime = CmiWallTimer();
496   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
497   stage = "recoverBuddies";
498   startTime = curTime;
499
500   //if (iFailed()) return;   ??????
501
502   // recover buddies
503   for (idx=0; idx<len; idx++) {
504     CkMemCheckPTInfo *entry = ckTable[idx];
505     if (entry->pNo == thisFailedPe) {
506       int budPe = CkMyPe();
507 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
508       while (budPe == CkMyPe() || isFailed(budPe)) budPe = (budPe+1)%CkNumPes();
509       entry->pNo = budPe;
510       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
511       CmiAssert(entry->ckBuffer);
512       CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&entry->ckBuffer);
513       msg->cp_flag = 0;            // not checkpointing
514       thisProxy[budPe].recvData(msg);
515     }
516   }
517
518   if (CkMyPe() == 0)
519     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
520 }
521
522 // restore 
523 void CkMemCheckPT::recoverArrayElements()
524 {
525   double curTime = CmiWallTimer();
526   CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, curTime-startTime);
527   stage = "recoverArrayElements";
528   startTime = curTime;
529   //if (iFailed()) return;
530
531   // recover all array elements
532   int count = 0;
533   int len = ckTable.length();
534   for (int idx=0; idx<len; idx++)
535   {
536     CkMemCheckPTInfo *entry = ckTable[idx];
537     // the bigger one will do 
538 //    if (CkMyPe() < entry->pNo) continue;
539     if (!isMaster(entry->pNo)) continue;
540 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
541     if (entry->ckBuffer == NULL) CmiAbort("recoverArrayElements: element does not have checkpoint data.");
542     entry->ckBuffer->bud1 = CkMyPe(); entry->ckBuffer->bud2 = entry->pNo;
543     CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
544     CkArrayCheckPTMessage *msg = (CkArrayCheckPTMessage *)CkCopyMsg((void **)&entry->ckBuffer);
545     // gzheng
546     //checkptMgr[CkMyPe()].inmem_restore(msg);
547     inmem_restore(msg);
548     count ++;
549   }
550   //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
551
552   if (CkMyPe() == 0)
553     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
554 }
555
556 // on every processor
557 // turn load balancer back on
558 void CkMemCheckPT::finishUp()
559 {
560   int i;
561   int numGroups = CkpvAccess(_groupIDTable)->size();
562   for(i=0;i<numGroups;i++) {
563     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
564     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
565     if (obj->isLocMgr()) 
566       ((CkLocMgr *)obj)->doneInserting();
567   }
568   
569   inRestarting = 0;
570
571   if (CkMyPe() == 0)
572   {
573        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
574        CkStartQD(cpCallback);
575        if (CkNumPes()-totalFailed() <=2) {
576          if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
577          _memChkptOn = 0;
578        }
579   } 
580 }
581
582 // called only on 0
583 void CkMemCheckPT::quiescence(CkCallback &cb)
584 {
585   static int pe_count = 0;
586   pe_count ++;
587   CmiAssert(CkMyPe() == 0);
588 //  CkPrintf("quiescence %d\n", pe_count);
589   if (pe_count == CkNumPes()) {
590     pe_count = 0;
591     cb.send();
592   }
593 }
594
595 // function called by user to start a check point
596 // callback cb is used to pass control back
597 void CkStartMemCheckpoint(CkCallback &cb)
598 {
599 #if CMK_MEM_CHECKPOINT
600   if (_memChkptOn == 0) {
601     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
602     cb.send();
603     return;
604   }
605     // store user callback and user data
606   CkMemCheckPT::cpCallback = cb;
607
608     // broadcast to start check pointing
609   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
610   checkptMgr.doItNow(CkMyPe(), cb);
611 #else
612   // when mem checkpoint is not enabled, invike cb immediately
613   cb.send();
614 #endif
615 }
616
617 void CkRestartCheckPoint(int diePe)
618 {
619 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
620   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
621   // broadcast
622   checkptMgr.restart(diePe);
623 }
624
625 static int _diePE = -1;
626
627 // callback function used locally by ccs handler
628 static void CkRestartCheckPointCallback(void *ignore, void *msg)
629 {
630 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
631   CkRestartCheckPoint(_diePE);
632 }
633
634 static int askProcDataHandlerIdx;
635 static int restartBcastHandlerIdx;
636 static int recoverProcDataHandlerIdx;
637 static int restartBeginHandlerIdx;
638
639 static void restartBeginHandler(char *msg)
640 {
641 #if CMK_MEM_CHECKPOINT
642   static int count = 0;
643   CmiFree(msg);
644   CmiAssert(CkMyPe() == _diePE);
645   count ++;
646   if (count == CkNumPes()) {
647     CkRestartCheckPointCallback(NULL, NULL);
648     count = 0;
649   }
650 #endif
651 }
652
653 static void restartBcastHandler(char *msg)
654 {
655 #if CMK_MEM_CHECKPOINT
656   // advance phase counter
657   CkMemCheckPT::inRestarting = 1;
658   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
659   // gzheng
660   if (CkMyPe() != _diePE) cur_restart_phase ++;
661
662   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
663
664   // reset QD counters
665   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
666
667 /*  gzheng
668   if (CkMyPe()==_diePE)
669       CkRestartCheckPointCallback(NULL, NULL);
670 */
671   CmiFree(msg);
672
673   char restartmsg[CmiMsgHeaderSizeBytes];
674   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
675   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
676 #endif
677 }
678
679 extern void _initDone();
680
681 // called on crashed processor
682 static void recoverProcDataHandler(char *msg)
683 {
684 #if CMK_MEM_CHECKPOINT
685    int i;
686    envelope *env = (envelope *)msg;
687    CkUnpackMessage(&env);
688    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
689    cur_restart_phase = procMsg->cur_restart_phase;
690    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
691    cur_restart_phase ++;
692    CpvAccess(_qd)->flushStates();
693
694    // restore readonly, mainchare, group, nodegroup
695 //   int temp = cur_restart_phase;
696 //   cur_restart_phase = -1;
697    PUP::fromMem p(procMsg->packData);
698    _handleProcData(p);
699
700    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
701    // gzheng
702    CKLOCMGR_LOOP(mgr->startInserting(););
703 //   cur_restart_phase = temp;
704
705    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
706    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
707    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
708    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
709    CmiFree(msg);
710
711    _initDone();
712 #endif
713 }
714
715 // called on its backup processor
716 // get backup message buffer and sent to crashed processor
717 static void askProcDataHandler(char *msg)
718 {
719 #if CMK_MEM_CHECKPOINT
720     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
721     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
722     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
723     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
724     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
725
726     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
727
728     CkPackMessage(&env);
729     CmiSetHandler(env, recoverProcDataHandlerIdx);
730     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
731     CpvAccess(procChkptBuf) = NULL;
732 #endif
733 }
734
735 void CkMemRestart(const char *dummy)
736 {
737 #if CMK_MEM_CHECKPOINT
738    _diePE = CkMyPe();
739    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
740    CkMemCheckPT::startTime = CmiWallTimer();
741    CkMemCheckPT::inRestarting = 1;
742    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
743    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
744    cur_restart_phase = 9999;             // big enough to get it processed
745    CmiSetHandler(msg, askProcDataHandlerIdx);
746    int pe = ChkptOnPe();
747    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
748    cur_restart_phase=0;    // allow all message to come in
749 #else
750    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'ft' option");
751 #endif
752 }
753
754 // can be called in other files
755 // return true if it is in restarting
756 int CkInRestarting()
757 {
758 #if CMK_MEM_CHECKPOINT
759   // gzheng
760   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
761   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
762   return CkMemCheckPT::inRestarting;
763 #else
764   return 0;
765 #endif
766 }
767
768 /*****************************************************************************
769                 module initialization
770 *****************************************************************************/
771
772 class CkMemCheckPTInit: public Chare {
773 public:
774   CkMemCheckPTInit(CkArgMsg *m) {
775 #if CMK_MEM_CHECKPOINT
776     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew();
777     CkPrintf("CkMemCheckPTInit main chare created!\n");
778 #endif
779   }
780 };
781
782 // initproc
783 void CkRegisterRestartHandler( )
784 {
785 #if CMK_MEM_CHECKPOINT
786   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
787   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
788   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
789   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
790
791   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
792   CpvAccess(procChkptBuf) = NULL;
793
794 #if 1
795   // for debugging
796   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
797 //  sleep(4);
798 #endif
799 #endif
800 }
801
802 #include "CkMemCheckpoint.def.h"
803
804