many greate changes. Restructured the checkpoint table to have separate class of...
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To ensure fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element.
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume at a time only one failure happens,
17 and there is no failure during a checkpointing or restarting phase.
18
19 Restart phase contains two steps:
20 1. Converse level restart where only the newly created process for the failed
21    processor is working on restoring the system data (except array elements)
22    from its backup processor.
23 2. CkMemCheckPT gets control and recover array elements and reset all
24    states to be consistent.
25
26 added 3/14/04:
27
28 1. also support for double in-disk checkpoint/restart
29    use macro CK_USE_MEM and CK_USE_DISK to switch.
30
31 TODO:
32  checkpoint scheme can be reimplemented based on per processor scheme;
33  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
34
35 */
36
37 #include "charm++.h"
38 #include "ck.h"
39 #include "register.h"
40 #include "conv-ccs.h"
41
42 #define DEBUGF     // CkPrintf
43
44 // choose to use in-memory or in-disk checkpointing
45 #define CK_USE_MEM                              1
46 #define CK_USE_DISK                             0       
47
48 int CkMemCheckPT::inRestarting = 0;
49 double CkMemCheckPT::startTime;
50 char *CkMemCheckPT::stage;
51
52 int _memChkptOn = 1;
53
54 CkGroupID ckCheckPTGroupID;             // readonly
55
56 CkCallback CkMemCheckPT::cpCallback;    // static
57
58 CpvStaticDeclare(CkProcCheckPTMessage*, procChkptBuf);
59
60 // compute the backup processor
61 // FIXME: avoid crashed processors
62 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
63
64 // called in array element constructor
65 // choose and register with 2 buggies for checkpoiting 
66 #if CMK_MEM_CHECKPOINT
67 void ArrayElement::init_checkpt() {
68         if (_memChkptOn == 0) return;
69         // CmiPrintf("[%d] ArrayElement::init_checkpt %d\n", CkMyPe(), info.fromMigration);
70         // only master init checkpoint
71         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
72
73 //        budPEs[0] = (CkMyPe()-1+CkNumPes())%CkNumPes();
74         budPEs[0] = CkMyPe();
75         budPEs[1] = (CkMyPe()+1)%CkNumPes();
76         CmiAssert(budPEs[0] != budPEs[1]);
77         // inform checkPTMgr
78         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
79         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
80         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
81 }
82 #endif
83
84 // entry function invoked by checkpoint mgr asking for checkpoint data
85 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
86 #if CMK_MEM_CHECKPOINT
87   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
88   CkLocMgr *locMgr = thisArray->getLocMgr();
89   CmiAssert(myRec!=NULL);
90   int size;
91   {
92         PUP::sizer p;
93         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
94         size = p.size();
95   }
96   int packSize = size/sizeof(double) +1;
97   CkArrayCheckPTMessage *msg =
98                  new (packSize, 0) CkArrayCheckPTMessage;
99   msg->len = size;
100   msg->index =thisIndexMax;
101   msg->aid = thisArrayID;
102   msg->locMgr = locMgr->getGroupID();
103   msg->cp_flag = 1;
104   {
105         PUP::toMem p(msg->packData);
106         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
107   }
108
109   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
110   checkptMgr.recvData(msg, 2, budPEs);
111   delete m;
112 #endif
113 }
114
115 // class for in memory checkpointing
116 class CkMemCheckPTInfo: public CkCheckPTInfo
117 {
118   CkArrayCheckPTMessage *ckBuffer;
119 public:
120   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
121                     CkCheckPTInfo(a, loc, idx, pno)
122   {
123     ckBuffer = NULL;
124   }
125   ~CkMemCheckPTInfo() 
126   {
127     if (ckBuffer) delete ckBuffer; 
128   }
129   inline void updateBuffer(CkArrayCheckPTMessage *data) 
130   {
131     if (ckBuffer) delete ckBuffer;
132     ckBuffer = data;
133   }    
134   inline CkArrayCheckPTMessage * getCopy()
135   {
136     if (ckBuffer == NULL) {
137       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
138       CmiAbort("Abort!");
139     }
140     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
141   }     
142   inline void updateBuddy(int b1, int b2) {
143      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
144   }
145   inline int getSize() { 
146      CmiAssert(ckBuffer);
147      return ckBuffer->len; 
148   }
149 };
150
151 // class for in-disk checkpointing
152 class CkDiskCheckPTInfo: public CkCheckPTInfo 
153 {
154   char fname[64];
155   int bud1, bud2;
156   int len;                      // checkpoint size
157 public:
158   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
159   {
160     sprintf(fname, "/tmp/ckpt%d.%d", getpid(), myidx);
161     bud1 = bud2 = -1;
162     len = 0;
163   }
164   ~CkDiskCheckPTInfo() 
165   {
166     remove(fname);
167   }
168   inline void updateBuffer(CkArrayCheckPTMessage *data) 
169   {
170     double t = CmiWallTimer();
171     FILE *f = fopen(fname,"wb");
172     PUP::toDisk p(f);
173     CkPupMessage(p, (void **)&data);
174     // delay sync to the end because otherwise the messages are delayed
175 //    fsync(fileno(f));
176     fclose(f);
177     bud1 = data->bud1;
178     bud2 = data->bud2;
179     len = data->len;
180     delete data;
181     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
182   }
183   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
184   {
185     CkArrayCheckPTMessage *data;
186     FILE *f = fopen(fname,"rb");
187     PUP::fromDisk p(f);
188     CkPupMessage(p, (void **)&data);
189     fclose(f);
190     data->bud1 = bud1;                          // update the buddies
191     data->bud2 = bud2;
192     return data;
193   }
194   inline void updateBuddy(int b1, int b2) {
195      bud1 = b1; bud2 = b2;
196   }
197   inline int getSize() { 
198      return len; 
199   }
200 };
201
202 CkMemCheckPT::CkMemCheckPT()
203 {
204   if (CkNumPes() <= 2) {
205     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
206     _memChkptOn = 0;
207   }
208   inRestarting = 0;
209   recvCount = peCount = 0;
210 }
211
212 CkMemCheckPT::~CkMemCheckPT()
213 {
214   int len = ckTable.length();
215   for (int i=0; i<len; i++) {
216     delete ckTable[i];
217   }
218 }
219
220 void CkMemCheckPT::pup(PUP::er& p) 
221
222   CBase_CkMemCheckPT::pup(p); 
223   p|cpStarter;
224   p|thisFailedPe;
225   p|failedPes;
226   p|ckCheckPTGroupID;           // recover global variable
227   p|cpCallback;                 // store callback
228   if (p.isUnpacking()) {
229     recvCount = peCount = 0;
230   }
231 }
232
233 // called by checkpoint mgr to restore an array element
234 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
235 {
236 #if CMK_MEM_CHECKPOINT
237   //DEBUGF("[%d] inmem_restore restore", CmiMyPe());  m->index.print();
238 //  CmiPrintf("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  m->index.print();
239   PUP::fromMem p(m->packData);
240   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
241   mgr->resume(m->index, p);
242
243   // find a list of array elements bound together
244   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
245   CkLocRec_local *rec = elt->myRec;
246   CkVec<CkMigratable *> list;
247   mgr->migratableList(rec, list);
248   CmiAssert(list.length() > 0);
249   for (int l=0; l<list.length(); l++) {
250     elt = (ArrayElement *)list[l];
251     elt->budPEs[0] = m->bud1;
252     elt->budPEs[1] = m->bud2;
253     //    reset, may not needed now
254     // for now.
255     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
256       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
257       if (c) c->redNo = 0;
258     }
259   }
260 #endif
261 }
262
263 // return 1 is pe was a crashed processor
264 int CkMemCheckPT::isFailed(int pe)
265 {
266   for (int i=0; i<failedPes.length(); i++)
267     if (failedPes[i] == pe) return 1;
268   return 0;
269 }
270
271 // add pe into history list of all failed processors
272 void CkMemCheckPT::failed(int pe)
273 {
274   if (isFailed(pe)) return;
275   failedPes.push_back(pe);
276 }
277
278 int CkMemCheckPT::totalFailed()
279 {
280   return failedPes.length();
281 }
282
283 // create an checkpoint entry for array element of aid with index.
284 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
285 {
286   // error check, no duplicate
287   int idx, len = ckTable.size();
288   for (idx=0; idx<len; idx++) {
289     CkCheckPTInfo *entry = ckTable[idx];
290     if (index == entry->index) {
291       if (aid == entry->aid) {
292         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry. \n", CkMyPe());
293         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
294       }
295       if (loc == entry->locMgr) {
296         // bindTo array elements
297         return;
298       }
299     }
300   }
301 #if CK_USE_MEM
302   CkMemCheckPTInfo *newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
303 #else
304   CkDiskCheckPTInfo *newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
305 #endif
306   ckTable.push_back(newEntry);
307 }
308
309 // loop through my checkpoint table and ask checkpointed array elements
310 // to send me checkpoint data.
311 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
312 {
313   cpCallback = cb;
314   cpStarter = starter;
315   if (CkMyPe() == cpStarter) {
316     startTime = CmiWallTimer();
317     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
318   }
319
320 //  if (iFailed()) return;
321   int len = ckTable.length();
322   for (int i=0; i<len; i++) {
323     CkCheckPTInfo *entry = ckTable[i];
324       // always let the bigger number processor send request
325     if (CkMyPe() < entry->pNo) continue;
326       // call inmem_checkpoint to the array element, ask it to send
327       // back checkpoint data via recvData().
328     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
329     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
330   }
331     // if my table is empty, then I am done
332   if (len == 0) thisProxy[cpStarter].cpFinish();
333
334   // pack and send proc level data
335   sendProcData();
336 }
337
338 // don't handle array elements
339 static inline void _handleProcData(PUP::er &p)
340 {
341     // save readonlys, and callback BTW
342     CkPupROData(p);
343
344     // save mainchares into MainChares.dat
345     if(CkMyPe()==0) CkPupMainChareData(p);
346         
347     // save groups into Groups.dat
348     CkPupGroupData(p);
349
350     // save nodegroups into NodeGroups.dat
351     if(CkMyRank()==0) CkPupNodeGroupData(p);
352 }
353
354 void CkMemCheckPT::sendProcData()
355 {
356   // find out size of buffer
357   int size;
358   {
359     PUP::sizer p;
360     _handleProcData(p);
361     size = p.size();
362   }
363   int packSize = size;
364   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
365   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
366   {
367     PUP::toMem p(msg->packData);
368     _handleProcData(p);
369   }
370   msg->pe = CkMyPe();
371   msg->len = size;
372   msg->reportPe = cpStarter;    // in case other processor is not in checkpoint mode
373   thisProxy[ChkptOnPe()].recvProcData(msg);
374 }
375
376 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
377 {
378   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
379   CpvAccess(procChkptBuf) = msg;
380 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
381   thisProxy[msg->reportPe].cpFinish();
382 }
383
384 // ArrayElement call this function to give us the checkpointed data
385 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
386 {
387   int len = ckTable.length();
388   int idx;
389   for (idx=0; idx<len; idx++) {
390     CkCheckPTInfo *entry = ckTable[idx];
391     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
392   }
393   CkAssert(idx < len);
394   int isChkpting = msg->cp_flag;
395   ckTable[idx]->updateBuffer(msg);
396     // all my array elements have returned their inmem data
397     // inform starter processor that I am done.
398   if (isChkpting) {
399     recvCount ++;
400     if (recvCount == ckTable.length()) {
401 #if CK_USE_MEM
402       thisProxy[cpStarter].cpFinish();
403 #else
404       // another barrier for finalize the writing using fsync
405       CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
406       contribute(0,NULL,CkReduction::sum_int,localcb);
407 #endif
408       recvCount = 0;
409     } 
410   }
411 }
412
413 // only used in disk
414 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
415 {
416   delete m;
417   system("sync");
418   thisProxy[cpStarter].cpFinish();
419 }
420
421 // only is called on cpStarter when checkpoint is done
422 void CkMemCheckPT::cpFinish()
423 {
424   CmiAssert(CkMyPe() == cpStarter);
425   peCount++;
426     // now all processors have finished, activate callback
427 //CmiPrintf("peCount:%d\n",peCount);
428   if (peCount == 2*(CkNumPes())) {
429     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
430     cpCallback.send();
431     peCount = 0;
432     thisProxy.report();
433   }
434 }
435
436 void CkMemCheckPT::report()
437 {
438   int objsize = 0;
439   int len = ckTable.length();
440   for (int i=0; i<len; i++) {
441     CkCheckPTInfo *entry = ckTable[i];
442     CmiAssert(entry);
443     objsize += entry->getSize();
444   }
445   CmiAssert(CpvAccess(procChkptBuf));
446   CkPrintf("[%d] Checkpointed Object size: %d Processor data: %d\n", CkMyPe(), objsize, CpvAccess(procChkptBuf)->len);
447 }
448
449 /*****************************************************************************
450                         RESTART Procedure
451 *****************************************************************************/
452
453 inline int CkMemCheckPT::isMaster(int pe)
454 {
455   int mype = CkMyPe(); 
456 //CkPrintf("ismaster: %d %d\n", pe, mype);
457   for (int i=1; i<CkNumPes(); i++) {
458     int me = (pe+i)%CkNumPes();
459     if (isFailed(me)) continue;
460     if (me == mype) return 1;
461     else return 0;
462   }
463   return 0;
464 }
465
466 // loop over all CkLocMgr and do "code"
467 #define  CKLOCMGR_LOOP(code)    {       \
468   int numGroups = CkpvAccess(_groupIDTable)->size();    \
469   for(int i=0;i<numGroups;i++) {        \
470     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
471     if(obj->isLocMgr())  {      \
472       CkLocMgr *mgr = (CkLocMgr*)obj;   \
473       code      \
474     }   \
475   }     \
476  }
477
478 #if 0
479 // helper class to pup all elements that belong to same ckLocMgr
480 class ElementDestoryer : public CkLocIterator {
481 private:
482         CkLocMgr *locMgr;
483 public:
484         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
485         void addLocation(CkLocation &loc) {
486                 CkArrayIndexMax idx=loc.getIndex();
487                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
488                 loc.destroy();
489         }
490 };
491 #endif
492
493 // restore the bitmap vector for LB
494 void CkMemCheckPT::resetLB(int diepe)
495 {
496 #if CMK_LBDB_ON
497   int i;
498   char *bitmap = new char[CkNumPes()];
499   // set processor available bitmap
500   get_avail_vector(bitmap);
501
502   for (i=0; i<failedPes.length(); i++)
503     bitmap[failedPes[i]] = 0; 
504   bitmap[diepe] = 0;
505   set_avail_vector(bitmap);
506
507   // if I am the crashed pe, rebuild my failedPEs array
508   if (CkMyPe() == diepe)
509     for (i=0; i<CkNumPes(); i++) 
510       if (bitmap[i]==0) failed(i);
511
512   delete [] bitmap;
513 #endif
514 }
515
516 // in case when failedPe dies, everybody go through its check point table:
517 // destory all array elements
518 // recover lost buddies
519 // reconstruct all array elements from check point data
520 // called on all processors
521 void CkMemCheckPT::restart(int diePe)
522 {
523 #if CMK_MEM_CHECKPOINT
524   double curTime = CmiWallTimer();
525   if (CkMyPe() == diePe)
526     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
527   stage = "resetLB";
528   startTime = curTime;
529   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
530
531   failed(diePe);        // add into the list of failed pes
532   thisFailedPe = diePe;
533
534   // clean array chkpt table
535   if (CkMyPe() == diePe) ckTable.length() = 0;
536
537   inRestarting = 1;
538                                                                                 
539   // disable load balancer's barrier
540   if (CkMyPe() != diePe) resetLB(diePe);
541
542   CKLOCMGR_LOOP(mgr->startInserting(););
543
544   // afterwards, the QD detection should work again
545   //if (CkMyPe() == 0)
546   //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
547   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
548 #endif
549 }
550
551 void CkMemCheckPT::removeArrayElements()
552 {
553 #if CMK_MEM_CHECKPOINT
554   int len = ckTable.length();
555   double curTime = CmiWallTimer();
556   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
557   stage = "removeArrayElements";
558   startTime = curTime;
559
560   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
561   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
562
563   // get rid of all buffering and remote recs
564   CKLOCMGR_LOOP(mgr->flushAllRecs(););
565
566 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
567
568 #if 0
569   // first phase: destroy all existing array elements
570   for (int idx=0; idx<len; idx++) {
571     CkMemCheckPTInfo *entry = ckTable[idx];
572     // the bigger number PE do the destory
573     if (CkMyPe() < entry->pNo && entry->pNo != thisFailedPe) continue;
574     CkArrayMessage *msg = (CkArrayMessage *)CkAllocSysMsg();
575     msg->array_setIfNotThere(CkArray_IfNotThere_buffer);
576     CkSendMsgArray(CkIndex_ArrayElement::ckDestroy(),msg,entry->aid,entry->index);
577     //CkCallback cb(CkIndex_ArrayElement::ckDestroy(), entry->index, entry->aid);
578     //cb.send(msg);
579 CkPrintf("[%d] Destory: ", CkMyPe()); entry->index.print();
580   }
581 #endif
582
583   //if (CkMyPe() == 0)
584   //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
585   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
586 #endif
587 }
588
589 // flush state in reduction manager
590 void CkMemCheckPT::resetReductionMgr()
591 {
592   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
593   int numGroups = CkpvAccess(_groupIDTable)->size();
594   for(int i=0;i<numGroups;i++) {
595     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
596     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
597     obj->flushStates();
598     obj->ckJustMigrated();
599   }
600   // reset again
601   //CpvAccess(_qd)->flushStates();
602
603   //if (CkMyPe() == 0)
604   //  CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
605   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
606 }
607
608 // recover the lost buddies
609 void CkMemCheckPT::recoverBuddies()
610 {
611   int idx;
612   int len = ckTable.length();
613   // ready to flush reduction manager
614   // cannot be CkMemCheckPT::restart because destory will modify states
615   double curTime = CmiWallTimer();
616   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
617   stage = "recoverBuddies";
618   startTime = curTime;
619
620   //if (iFailed()) return;   ??????
621
622   // recover buddies
623   for (idx=0; idx<len; idx++) {
624     CkCheckPTInfo *entry = ckTable[idx];
625     if (entry->pNo == thisFailedPe) {
626       int budPe = CkMyPe();
627 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
628       while (budPe == CkMyPe() || isFailed(budPe)) budPe = (budPe+1)%CkNumPes();
629       entry->pNo = budPe;
630       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
631       CkArrayCheckPTMessage *msg = entry->getCopy();
632       msg->cp_flag = 0;            // not checkpointing
633       thisProxy[budPe].recvData(msg);
634     }
635   }
636
637   if (CkMyPe() == 0)
638     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
639 }
640
641 // restore 
642 void CkMemCheckPT::recoverArrayElements()
643 {
644   double curTime = CmiWallTimer();
645   CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, curTime-startTime);
646   stage = "recoverArrayElements";
647   startTime = curTime;
648   //if (iFailed()) return;
649
650   // recover all array elements
651   int count = 0;
652   int len = ckTable.length();
653   for (int idx=0; idx<len; idx++)
654   {
655     CkCheckPTInfo *entry = ckTable[idx];
656     // the bigger one will do 
657 //    if (CkMyPe() < entry->pNo) continue;
658     if (!isMaster(entry->pNo)) continue;
659 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
660     entry->updateBuddy(CkMyPe(), entry->pNo);
661     CkArrayCheckPTMessage *msg = entry->getCopy();
662     // gzheng
663     //checkptMgr[CkMyPe()].inmem_restore(msg);
664     inmem_restore(msg);
665     count ++;
666   }
667   //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
668
669   if (CkMyPe() == 0)
670     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
671 }
672
673 // on every processor
674 // turn load balancer back on
675 void CkMemCheckPT::finishUp()
676 {
677   int i;
678   int numGroups = CkpvAccess(_groupIDTable)->size();
679   for(i=0;i<numGroups;i++) {
680     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
681     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
682     if (obj->isLocMgr()) 
683       ((CkLocMgr *)obj)->doneInserting();
684   }
685   
686   inRestarting = 0;
687
688   if (CkMyPe() == 0)
689   {
690        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
691        CkStartQD(cpCallback);
692        if (CkNumPes()-totalFailed() <=2) {
693          if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
694          _memChkptOn = 0;
695        }
696   } 
697 }
698
699 // called only on 0
700 void CkMemCheckPT::quiescence(CkCallback &cb)
701 {
702   static int pe_count = 0;
703   pe_count ++;
704   CmiAssert(CkMyPe() == 0);
705 //  CkPrintf("quiescence %d\n", pe_count);
706   if (pe_count == CkNumPes()) {
707     pe_count = 0;
708     cb.send();
709   }
710 }
711
712 // function called by user to start a check point
713 // callback cb is used to pass control back
714 void CkStartMemCheckpoint(CkCallback &cb)
715 {
716 #if CMK_MEM_CHECKPOINT
717   if (_memChkptOn == 0) {
718     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
719     cb.send();
720     return;
721   }
722     // store user callback and user data
723   CkMemCheckPT::cpCallback = cb;
724
725     // broadcast to start check pointing
726   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
727   checkptMgr.doItNow(CkMyPe(), cb);
728 #else
729   // when mem checkpoint is not enabled, invike cb immediately
730   cb.send();
731 #endif
732 }
733
734 void CkRestartCheckPoint(int diePe)
735 {
736 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
737   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
738   // broadcast
739   checkptMgr.restart(diePe);
740 }
741
742 static int _diePE = -1;
743
744 // callback function used locally by ccs handler
745 static void CkRestartCheckPointCallback(void *ignore, void *msg)
746 {
747 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
748   CkRestartCheckPoint(_diePE);
749 }
750
751 static int askProcDataHandlerIdx;
752 static int restartBcastHandlerIdx;
753 static int recoverProcDataHandlerIdx;
754 static int restartBeginHandlerIdx;
755
756 static void restartBeginHandler(char *msg)
757 {
758 #if CMK_MEM_CHECKPOINT
759   static int count = 0;
760   CmiFree(msg);
761   CmiAssert(CkMyPe() == _diePE);
762   count ++;
763   if (count == CkNumPes()) {
764     CkRestartCheckPointCallback(NULL, NULL);
765     count = 0;
766   }
767 #endif
768 }
769
770 static void restartBcastHandler(char *msg)
771 {
772 #if CMK_MEM_CHECKPOINT
773   // advance phase counter
774   CkMemCheckPT::inRestarting = 1;
775   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
776   // gzheng
777   if (CkMyPe() != _diePE) cur_restart_phase ++;
778
779   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
780
781   // reset QD counters
782   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
783
784 /*  gzheng
785   if (CkMyPe()==_diePE)
786       CkRestartCheckPointCallback(NULL, NULL);
787 */
788   CmiFree(msg);
789
790   char restartmsg[CmiMsgHeaderSizeBytes];
791   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
792   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
793 #endif
794 }
795
796 extern void _initDone();
797
798 // called on crashed processor
799 static void recoverProcDataHandler(char *msg)
800 {
801 #if CMK_MEM_CHECKPOINT
802    int i;
803    envelope *env = (envelope *)msg;
804    CkUnpackMessage(&env);
805    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
806    cur_restart_phase = procMsg->cur_restart_phase;
807    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
808    cur_restart_phase ++;
809    CpvAccess(_qd)->flushStates();
810
811    // restore readonly, mainchare, group, nodegroup
812 //   int temp = cur_restart_phase;
813 //   cur_restart_phase = -1;
814    PUP::fromMem p(procMsg->packData);
815    _handleProcData(p);
816
817    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
818    // gzheng
819    CKLOCMGR_LOOP(mgr->startInserting(););
820 //   cur_restart_phase = temp;
821
822    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
823    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
824    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
825    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
826    CmiFree(msg);
827
828    _initDone();
829 #endif
830 }
831
832 // called on its backup processor
833 // get backup message buffer and sent to crashed processor
834 static void askProcDataHandler(char *msg)
835 {
836 #if CMK_MEM_CHECKPOINT
837     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
838     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
839     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
840     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
841     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
842
843     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
844
845     CkPackMessage(&env);
846     CmiSetHandler(env, recoverProcDataHandlerIdx);
847     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
848     CpvAccess(procChkptBuf) = NULL;
849 #endif
850 }
851
852 void CkMemRestart(const char *dummy)
853 {
854 #if CMK_MEM_CHECKPOINT
855    _diePE = CkMyPe();
856    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
857    CkMemCheckPT::startTime = CmiWallTimer();
858    CkMemCheckPT::inRestarting = 1;
859    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
860    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
861    cur_restart_phase = 9999;             // big enough to get it processed
862    CmiSetHandler(msg, askProcDataHandlerIdx);
863    int pe = ChkptOnPe();
864    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
865    cur_restart_phase=0;    // allow all message to come in
866 #else
867    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'ft' option");
868 #endif
869 }
870
871 // can be called in other files
872 // return true if it is in restarting
873 int CkInRestarting()
874 {
875 #if CMK_MEM_CHECKPOINT
876   // gzheng
877   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
878   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
879   return CkMemCheckPT::inRestarting;
880 #else
881   return 0;
882 #endif
883 }
884
885 /*****************************************************************************
886                 module initialization
887 *****************************************************************************/
888
889 class CkMemCheckPTInit: public Chare {
890 public:
891   CkMemCheckPTInit(CkArgMsg *m) {
892 #if CMK_MEM_CHECKPOINT
893     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew();
894     CkPrintf("CkMemCheckPTInit main chare created!\n");
895 #endif
896   }
897 };
898
899 // initproc
900 void CkRegisterRestartHandler( )
901 {
902 #if CMK_MEM_CHECKPOINT
903   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
904   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
905   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
906   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
907
908   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
909   CpvAccess(procChkptBuf) = NULL;
910
911 #if 1
912   // for debugging
913   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
914 //  sleep(4);
915 #endif
916 #endif
917 }
918
919 #include "CkMemCheckpoint.def.h"
920
921