tweaked a little on finding a buddy processor
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 // pick buddy processor from a different physical node
51 #define NODE_CHECKPOINT                        1
52
53 // assume NO extra processors--1
54 // assume extra processors--0
55 #define CK_NO_PROC_POOL                         1
56
57 #define DEBUGF      // CkPrintf
58
59 // static, so that it is accessible from Converse part
60 int CkMemCheckPT::inRestarting = 0;
61 double CkMemCheckPT::startTime;
62 char *CkMemCheckPT::stage;
63 CkCallback CkMemCheckPT::cpCallback;
64
65 int _memChkptOn = 1;                    // checkpoint is on or off
66
67 CkGroupID ckCheckPTGroupID;             // readonly
68
69
70 /// @todo the following declarations should be moved into a separate file for all 
71 // fault tolerant strategies
72
73 #ifdef CMK_MEM_CHECKPOINT
74 // name of the kill file that contains processes to be killed 
75 char *killFile;                                               
76 // flag for the kill file         
77 int killFlag=0;
78 // variable for storing the killing time
79 double killTime=0.0;
80 #endif
81
82 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
83 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
84
85 // compute the backup processor
86 // FIXME: avoid crashed processors
87 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
88
89 inline int CkMemCheckPT::BuddyPE(int pe)
90 {
91   int budpe;
92 #if NODE_CHECKPOINT
93     // buddy is the processor with same rank on next physical node
94   int r1 = CmiPhysicalRank(pe);
95   int budnode = CmiPhysicalNodeID(pe);
96   do {
97     budnode = (budnode+1)%CmiNumPhysicalNodes();
98     budpe = (CmiGetFirstPeOnPhysicalNode(budnode) + r1 % CmiNumPesOnPhysicalNode(budnode)) % CmiNumPes();
99   } while (isFailed(budpe));
100   if (budpe == pe) {
101     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
102     CmiAbort("Failed to a buddy processor");
103   }
104 #else
105   budpe = pe;
106   while (budpe == pe || isFailed(budPe)) 
107           budPe = (budPe+1)%CkNumPes();
108 #endif
109   return budpe;
110 }
111
112 // called in array element constructor
113 // choose and register with 2 buddies for checkpoiting 
114 #if CMK_MEM_CHECKPOINT
115 void ArrayElement::init_checkpt() {
116         if (_memChkptOn == 0) return;
117         // only master init checkpoint
118         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
119
120         budPEs[0] = CkMyPe();
121         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
122         CmiAssert(budPEs[0] != budPEs[1]);
123         // inform checkPTMgr
124         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
125         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
126         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
127         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
128 }
129 #endif
130
131 // entry function invoked by checkpoint mgr asking for checkpoint data
132 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
133 #if CMK_MEM_CHECKPOINT
134   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
135   CkLocMgr *locMgr = thisArray->getLocMgr();
136   CmiAssert(myRec!=NULL);
137   int size;
138   {
139         PUP::sizer p;
140         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
141         size = p.size();
142   }
143   int packSize = size/sizeof(double) +1;
144   CkArrayCheckPTMessage *msg =
145                  new (packSize, 0) CkArrayCheckPTMessage;
146   msg->len = size;
147   msg->index =thisIndexMax;
148   msg->aid = thisArrayID;
149   msg->locMgr = locMgr->getGroupID();
150   msg->cp_flag = 1;
151   {
152         PUP::toMem p(msg->packData);
153         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
154   }
155
156   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
157   checkptMgr.recvData(msg, 2, budPEs);
158   delete m;
159 #endif
160 }
161
162 // checkpoint holder class - for memory checkpointing
163 class CkMemCheckPTInfo: public CkCheckPTInfo
164 {
165   CkArrayCheckPTMessage *ckBuffer;
166 public:
167   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
168                     CkCheckPTInfo(a, loc, idx, pno)
169   {
170     ckBuffer = NULL;
171   }
172   ~CkMemCheckPTInfo() 
173   {
174     if (ckBuffer) delete ckBuffer; 
175   }
176   inline void updateBuffer(CkArrayCheckPTMessage *data) 
177   {
178     CmiAssert(data!=NULL);
179     if (ckBuffer) delete ckBuffer;
180     ckBuffer = data;
181   }    
182   inline CkArrayCheckPTMessage * getCopy()
183   {
184     if (ckBuffer == NULL) {
185       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
186       CmiAbort("Abort!");
187     }
188     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
189   }     
190   inline void updateBuddy(int b1, int b2) {
191      CmiAssert(ckBuffer);
192      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
193   }
194   inline int getSize() { 
195      CmiAssert(ckBuffer);
196      return ckBuffer->len; 
197   }
198 };
199
200 // checkpoint holder class - for in-disk checkpointing
201 class CkDiskCheckPTInfo: public CkCheckPTInfo 
202 {
203   char *fname;
204   int bud1, bud2;
205   int len;                      // checkpoint size
206 public:
207   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
208   {
209 #if CMK_USE_MKSTEMP
210     fname = new char[64];
211     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
212     mkstemp(fname);
213 #else
214     fname=tmpnam(NULL);
215 #endif
216     bud1 = bud2 = -1;
217     len = 0;
218   }
219   ~CkDiskCheckPTInfo() 
220   {
221     remove(fname);
222   }
223   inline void updateBuffer(CkArrayCheckPTMessage *data) 
224   {
225     double t = CmiWallTimer();
226     // unpack it
227     envelope *env = UsrToEnv(data);
228     CkUnpackMessage(&env);
229     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
230     FILE *f = fopen(fname,"wb");
231     PUP::toDisk p(f);
232     CkPupMessage(p, (void **)&data);
233     // delay sync to the end because otherwise the messages are blocked
234 //    fsync(fileno(f));
235     fclose(f);
236     bud1 = data->bud1;
237     bud2 = data->bud2;
238     len = data->len;
239     delete data;
240     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
241   }
242   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
243   {
244     CkArrayCheckPTMessage *data;
245     FILE *f = fopen(fname,"rb");
246     PUP::fromDisk p(f);
247     CkPupMessage(p, (void **)&data);
248     fclose(f);
249     data->bud1 = bud1;                          // update the buddies
250     data->bud2 = bud2;
251     return data;
252   }
253   inline void updateBuddy(int b1, int b2) {
254      bud1 = b1; bud2 = b2;
255   }
256   inline int getSize() { 
257      return len; 
258   }
259 };
260
261 CkMemCheckPT::CkMemCheckPT(int w)
262 {
263 #if CK_NO_PROC_POOL
264   if (CkNumPes() <= 2) {
265 #else
266   if (CkNumPes()  == 1) {
267 #endif
268     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
269     _memChkptOn = 0;
270   }
271   inRestarting = 0;
272   recvCount = peCount = 0;
273   where = w;
274 }
275
276 CkMemCheckPT::~CkMemCheckPT()
277 {
278   int len = ckTable.length();
279   for (int i=0; i<len; i++) {
280     delete ckTable[i];
281   }
282 }
283
284 void CkMemCheckPT::pup(PUP::er& p) 
285
286   CBase_CkMemCheckPT::pup(p); 
287   p|cpStarter;
288   p|thisFailedPe;
289   p|failedPes;
290   p|ckCheckPTGroupID;           // recover global variable
291   p|cpCallback;                 // store callback
292   p|where;                      // where to checkpoint
293   if (p.isUnpacking()) {
294     recvCount = peCount = 0;
295   }
296 }
297
298 // called by checkpoint mgr to restore an array element
299 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
300 {
301 #if CMK_MEM_CHECKPOINT
302   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
303   // m->index.print();
304   PUP::fromMem p(m->packData);
305   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
306   CmiAssert(mgr);
307   mgr->resume(m->index, p);
308
309   // find a list of array elements bound together
310   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
311   CmiAssert(elt);
312   CkLocRec_local *rec = elt->myRec;
313   CkVec<CkMigratable *> list;
314   mgr->migratableList(rec, list);
315   CmiAssert(list.length() > 0);
316   for (int l=0; l<list.length(); l++) {
317     elt = (ArrayElement *)list[l];
318     elt->budPEs[0] = m->bud1;
319     elt->budPEs[1] = m->bud2;
320     //    reset, may not needed now
321     // for now.
322     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
323       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
324       if (c) c->redNo = 0;
325     }
326   }
327 #endif
328 }
329
330 // return 1 if pe is a crashed processor
331 int CkMemCheckPT::isFailed(int pe)
332 {
333   for (int i=0; i<failedPes.length(); i++)
334     if (failedPes[i] == pe) return 1;
335   return 0;
336 }
337
338 // add pe into history list of all failed processors
339 void CkMemCheckPT::failed(int pe)
340 {
341   if (isFailed(pe)) return;
342   failedPes.push_back(pe);
343 }
344
345 int CkMemCheckPT::totalFailed()
346 {
347   return failedPes.length();
348 }
349
350 // create an checkpoint entry for array element of aid with index.
351 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
352 {
353   // error check, no duplicate
354   int idx, len = ckTable.size();
355   for (idx=0; idx<len; idx++) {
356     CkCheckPTInfo *entry = ckTable[idx];
357     if (index == entry->index) {
358       if (loc == entry->locMgr) {
359           // bindTo array elements
360           return;
361       }
362         // for array inheritance, the following check may fail
363         // because ArrayElement constructor of all superclasses are called
364       if (aid == entry->aid) {
365         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
366         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
367       }
368     }
369   }
370   CkCheckPTInfo *newEntry;
371   if (where == CkCheckPoint_inMEM)
372     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
373   else
374     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
375   ckTable.push_back(newEntry);
376   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
377 }
378
379 // loop through my checkpoint table and ask checkpointed array elements
380 // to send me checkpoint data.
381 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
382 {
383   cpCallback = cb;
384   cpStarter = starter;
385   if (CkMyPe() == cpStarter) {
386     startTime = CmiWallTimer();
387     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
388   }
389
390   int len = ckTable.length();
391   for (int i=0; i<len; i++) {
392     CkCheckPTInfo *entry = ckTable[i];
393       // always let the bigger number processor send request
394     if (CkMyPe() < entry->pNo) continue;
395       // call inmem_checkpoint to the array element, ask it to send
396       // back checkpoint data via recvData().
397     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
398     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
399   }
400     // if my table is empty, then I am done
401   if (len == 0) thisProxy[cpStarter].cpFinish();
402
403   // pack and send proc level data
404   sendProcData();
405 }
406
407 // don't handle array elements
408 static inline void _handleProcData(PUP::er &p)
409 {
410     // save readonlys, and callback BTW
411     CkPupROData(p);
412
413     // save mainchares into MainChares.dat
414     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
415         
416 #if CMK_FT_CHARE
417     // save non-migratable chare
418     CkPupChareData(p);
419 #endif
420
421     // save groups into Groups.dat
422     CkPupGroupData(p);
423
424     // save nodegroups into NodeGroups.dat
425     if(CkMyRank()==0) CkPupNodeGroupData(p);
426 }
427
428 void CkMemCheckPT::sendProcData()
429 {
430   // find out size of buffer
431   int size;
432   {
433     PUP::sizer p;
434     _handleProcData(p);
435     size = p.size();
436   }
437   int packSize = size;
438   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
439   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
440   {
441     PUP::toMem p(msg->packData);
442     _handleProcData(p);
443   }
444   msg->pe = CkMyPe();
445   msg->len = size;
446   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
447   thisProxy[ChkptOnPe()].recvProcData(msg);
448 }
449
450 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
451 {
452   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
453   CpvAccess(procChkptBuf) = msg;
454 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
455   thisProxy[msg->reportPe].cpFinish();
456 }
457
458 // ArrayElement call this function to give us the checkpointed data
459 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
460 {
461   int len = ckTable.length();
462   int idx;
463   for (idx=0; idx<len; idx++) {
464     CkCheckPTInfo *entry = ckTable[idx];
465     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
466   }
467   CkAssert(idx < len);
468   int isChkpting = msg->cp_flag;
469   ckTable[idx]->updateBuffer(msg);
470   if (isChkpting) {
471       // all my array elements have returned their inmem data
472       // inform starter processor that I am done.
473     recvCount ++;
474     if (recvCount == ckTable.length()) {
475       if (where == CkCheckPoint_inMEM) {
476         thisProxy[cpStarter].cpFinish();
477       }
478       else if (where == CkCheckPoint_inDISK) {
479         // another barrier for finalize the writing using fsync
480         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
481         contribute(0,NULL,CkReduction::sum_int,localcb);
482       }
483       else
484         CmiAbort("Unknown checkpoint scheme");
485       recvCount = 0;
486     } 
487   }
488 }
489
490 // only used in disk checkpointing
491 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
492 {
493   delete m;
494 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
495   system("sync");
496 #endif
497   thisProxy[cpStarter].cpFinish();
498 }
499
500 // only is called on cpStarter when checkpoint is done
501 void CkMemCheckPT::cpFinish()
502 {
503   CmiAssert(CkMyPe() == cpStarter);
504   peCount++;
505     // now that all processors have finished, activate callback
506   if (peCount == 2*(CkNumPes())) {
507     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
508     cpCallback.send();
509     peCount = 0;
510     thisProxy.report();
511   }
512 }
513
514 // for debugging, report checkpoint info
515 void CkMemCheckPT::report()
516 {
517   int objsize = 0;
518   int len = ckTable.length();
519   for (int i=0; i<len; i++) {
520     CkCheckPTInfo *entry = ckTable[i];
521     CmiAssert(entry);
522     objsize += entry->getSize();
523   }
524   CmiAssert(CpvAccess(procChkptBuf));
525   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
526 }
527
528 /*****************************************************************************
529                         RESTART Procedure
530 *****************************************************************************/
531
532 // master processor of two buddies
533 inline int CkMemCheckPT::isMaster(int buddype)
534 {
535   int mype = CkMyPe();
536 //CkPrintf("ismaster: %d %d\n", pe, mype);
537   if (CkNumPes() - totalFailed() == 2) {
538     return mype > buddype;
539   }
540   for (int i=1; i<CkNumPes(); i++) {
541     int me = (buddype+i)%CkNumPes();
542     if (isFailed(me)) continue;
543     if (me == mype) return 1;
544     else return 0;
545   }
546   return 0;
547 }
548
549 #ifdef CKLOCMGR_LOOP
550 #undef CKLOCMGR_LOOP
551 #endif
552
553 // loop over all CkLocMgr and do "code"
554 #define  CKLOCMGR_LOOP(code)    {       \
555   int numGroups = CkpvAccess(_groupIDTable)->size();    \
556   for(int i=0;i<numGroups;i++) {        \
557     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
558     if(obj->isLocMgr())  {      \
559       CkLocMgr *mgr = (CkLocMgr*)obj;   \
560       code      \
561     }   \
562   }     \
563  }
564
565 #if 0
566 // helper class to pup all elements that belong to same ckLocMgr
567 class ElementDestoryer : public CkLocIterator {
568 private:
569         CkLocMgr *locMgr;
570 public:
571         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
572         void addLocation(CkLocation &loc) {
573                 CkArrayIndexMax idx=loc.getIndex();
574                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
575                 loc.destroy();
576         }
577 };
578 #endif
579
580 // restore the bitmap vector for LB
581 void CkMemCheckPT::resetLB(int diepe)
582 {
583 #if CMK_LBDB_ON
584   int i;
585   char *bitmap = new char[CkNumPes()];
586   // set processor available bitmap
587   get_avail_vector(bitmap);
588
589   for (i=0; i<failedPes.length(); i++)
590     bitmap[failedPes[i]] = 0; 
591   bitmap[diepe] = 0;
592
593 #if CK_NO_PROC_POOL
594   set_avail_vector(bitmap);
595 #endif
596
597   // if I am the crashed pe, rebuild my failedPEs array
598   if (CkMyPe() == diepe)
599     for (i=0; i<CkNumPes(); i++) 
600       if (bitmap[i]==0) failed(i);
601
602   delete [] bitmap;
603 #endif
604 }
605
606 // in case when failedPe dies, everybody go through its checkpoint table:
607 // destory all array elements
608 // recover lost buddies
609 // reconstruct all array elements from check point data
610 // called on all processors
611 void CkMemCheckPT::restart(int diePe)
612 {
613 #if CMK_MEM_CHECKPOINT
614   double curTime = CmiWallTimer();
615   if (CkMyPe() == diePe)
616     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
617   stage = (char*)"resetLB";
618   startTime = curTime;
619   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
620
621 #if CK_NO_PROC_POOL
622   failed(diePe);        // add into the list of failed pes
623 #endif
624   thisFailedPe = diePe;
625
626   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
627
628   inRestarting = 1;
629                                                                                 
630   // disable load balancer's barrier
631   if (CkMyPe() != diePe) resetLB(diePe);
632
633   CKLOCMGR_LOOP(mgr->startInserting(););
634
635   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
636 #endif
637 }
638
639 // loally remove all array elements
640 void CkMemCheckPT::removeArrayElements()
641 {
642 #if CMK_MEM_CHECKPOINT
643   int len = ckTable.length();
644   double curTime = CmiWallTimer();
645   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
646   stage = (char*)"removeArrayElements";
647   startTime = curTime;
648
649   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
650   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
651
652   // get rid of all buffering and remote recs
653   // including destorying all array elements
654   CKLOCMGR_LOOP(mgr->flushAllRecs(););
655
656 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
657
658   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
659 #endif
660 }
661
662 // flush state in reduction manager
663 void CkMemCheckPT::resetReductionMgr()
664 {
665   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
666   int numGroups = CkpvAccess(_groupIDTable)->size();
667   for(int i=0;i<numGroups;i++) {
668     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
669     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
670     obj->flushStates();
671     obj->ckJustMigrated();
672   }
673   // reset again
674   //CpvAccess(_qd)->flushStates();
675
676   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
677 }
678
679 // recover the lost buddies
680 void CkMemCheckPT::recoverBuddies()
681 {
682   int idx;
683   int len = ckTable.length();
684   // ready to flush reduction manager
685   // cannot be CkMemCheckPT::restart because destory will modify states
686   double curTime = CmiWallTimer();
687   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
688   stage = (char *)"recoverBuddies";
689   startTime = curTime;
690
691   // recover buddies
692   for (idx=0; idx<len; idx++) {
693     CkCheckPTInfo *entry = ckTable[idx];
694     if (entry->pNo == thisFailedPe) {
695 #if CK_NO_PROC_POOL
696       // find a new buddy
697       int budPe = CkMyPe();
698 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
699       while (budPe == CkMyPe() || isFailed(budPe)) 
700           budPe = (budPe+1)%CkNumPes();
701       entry->pNo = budPe;
702 #else
703       int budPe = thisFailedPe;
704 #endif
705       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
706       CkArrayCheckPTMessage *msg = entry->getCopy();
707       msg->cp_flag = 0;            // not checkpointing
708       thisProxy[budPe].recvData(msg);
709     }
710   }
711
712   if (CkMyPe() == 0)
713     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
714 }
715
716 // restore array elements
717 void CkMemCheckPT::recoverArrayElements()
718 {
719   double curTime = CmiWallTimer();
720   int len = ckTable.length();
721   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
722   stage = (char *)"recoverArrayElements";
723   startTime = curTime;
724
725   // recover all array elements
726   int count = 0;
727   for (int idx=0; idx<len; idx++)
728   {
729     CkCheckPTInfo *entry = ckTable[idx];
730 #if CK_NO_PROC_POOL
731     // the bigger one will do 
732 //    if (CkMyPe() < entry->pNo) continue;
733     if (!isMaster(entry->pNo)) continue;
734 #else
735     // smaller one do it, which has the original object
736     if (CkMyPe() == entry->pNo+1 || 
737         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
738 #endif
739 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
740
741     entry->updateBuddy(CkMyPe(), entry->pNo);
742     CkArrayCheckPTMessage *msg = entry->getCopy();
743     // gzheng
744     //checkptMgr[CkMyPe()].inmem_restore(msg);
745     inmem_restore(msg);
746     count ++;
747   }
748 //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
749
750   if (CkMyPe() == 0)
751     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
752 }
753
754 // on every processor
755 // turn load balancer back on
756 void CkMemCheckPT::finishUp()
757 {
758   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
759   CKLOCMGR_LOOP(mgr->doneInserting(););
760   
761   inRestarting = 0;
762
763   if (CkMyPe() == 0)
764   {
765        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
766        CkStartQD(cpCallback);
767   } 
768 #if CK_NO_PROC_POOL
769   if (CkNumPes()-totalFailed() <=2) {
770     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
771     _memChkptOn = 0;
772   }
773 #endif
774 }
775
776 // called only on 0
777 void CkMemCheckPT::quiescence(CkCallback &cb)
778 {
779   static int pe_count = 0;
780   pe_count ++;
781   CmiAssert(CkMyPe() == 0);
782   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
783   if (pe_count == CkNumPes()) {
784     pe_count = 0;
785     cb.send();
786   }
787 }
788
789 // User callable function - to start a checkpoint
790 // callback cb is used to pass control back
791 void CkStartMemCheckpoint(CkCallback &cb)
792 {
793 #if CMK_MEM_CHECKPOINT
794   if (_memChkptOn == 0) {
795     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
796     cb.send();
797     return;
798   }
799   if (CkInRestarting()) {
800       // trying to checkpointing during restart
801     cb.send();
802     return;
803   }
804     // store user callback and user data
805   CkMemCheckPT::cpCallback = cb;
806
807     // broadcast to start check pointing
808   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
809   checkptMgr.doItNow(CkMyPe(), cb);
810 #else
811   // when mem checkpoint is disabled, invike cb immediately
812   cb.send();
813 #endif
814 }
815
816 void CkRestartCheckPoint(int diePe)
817 {
818 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
819   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
820   // broadcast
821   checkptMgr.restart(diePe);
822 }
823
824 static int _diePE = -1;
825
826 // callback function used locally by ccs handler
827 static void CkRestartCheckPointCallback(void *ignore, void *msg)
828 {
829 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
830   CkRestartCheckPoint(_diePE);
831 }
832
833 // Converse function handles
834 static int askProcDataHandlerIdx;
835 static int restartBcastHandlerIdx;
836 static int recoverProcDataHandlerIdx;
837 static int restartBeginHandlerIdx;
838
839 static void restartBeginHandler(char *msg)
840 {
841 #if CMK_MEM_CHECKPOINT
842   static int count = 0;
843   CmiFree(msg);
844   CmiAssert(CkMyPe() == _diePE);
845   count ++;
846   if (count == CkNumPes()) {
847     CkRestartCheckPointCallback(NULL, NULL);
848     count = 0;
849   }
850 #endif
851 }
852
853 static void restartBcastHandler(char *msg)
854 {
855 #if CMK_MEM_CHECKPOINT
856   // advance phase counter
857   CkMemCheckPT::inRestarting = 1;
858   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
859   // gzheng
860   if (CkMyPe() != _diePE) cur_restart_phase ++;
861
862   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
863
864   // reset QD counters
865   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
866
867 /*  gzheng
868   if (CkMyPe()==_diePE)
869       CkRestartCheckPointCallback(NULL, NULL);
870 */
871   CmiFree(msg);
872
873   char restartmsg[CmiMsgHeaderSizeBytes];
874   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
875   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
876 #endif
877 }
878
879 extern void _initDone();
880
881 // called on crashed processor
882 static void recoverProcDataHandler(char *msg)
883 {
884 #if CMK_MEM_CHECKPOINT
885    int i;
886    envelope *env = (envelope *)msg;
887    CkUnpackMessage(&env);
888    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
889    cur_restart_phase = procMsg->cur_restart_phase;
890    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
891    cur_restart_phase ++;
892    CpvAccess(_qd)->flushStates();
893
894    // restore readonly, mainchare, group, nodegroup
895 //   int temp = cur_restart_phase;
896 //   cur_restart_phase = -1;
897    PUP::fromMem p(procMsg->packData);
898    _handleProcData(p);
899
900    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
901    // gzheng
902    CKLOCMGR_LOOP(mgr->startInserting(););
903
904    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
905    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
906    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
907    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
908    CmiFree(msg);
909
910    _initDone();
911 #endif
912 }
913
914 // called on its backup processor
915 // get backup message buffer and sent to crashed processor
916 static void askProcDataHandler(char *msg)
917 {
918 #if CMK_MEM_CHECKPOINT
919     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
920     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
921     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
922     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
923     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
924
925     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
926
927     CkPackMessage(&env);
928     CmiSetHandler(env, recoverProcDataHandlerIdx);
929     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
930     CpvAccess(procChkptBuf) = NULL;
931 #endif
932 }
933
934 void CkMemRestart(const char *dummy, CkArgMsg *args)
935 {
936 #if CMK_MEM_CHECKPOINT
937    _diePE = CkMyPe();
938    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
939    CkMemCheckPT::startTime = CmiWallTimer();
940    CkMemCheckPT::inRestarting = 1;
941    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
942    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
943    cur_restart_phase = 9999;             // big enough to get it processed
944    CmiSetHandler(msg, askProcDataHandlerIdx);
945    int pe = ChkptOnPe();
946    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
947    cur_restart_phase=0;    // allow all message to come in
948 #else
949    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
950 #endif
951 }
952
953 // can be called in other files
954 // return true if it is in restarting
955 int CkInRestarting()
956 {
957 #if CMK_MEM_CHECKPOINT
958   // gzheng
959   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
960   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
961   return CkMemCheckPT::inRestarting;
962 #else
963   return 0;
964 #endif
965 }
966
967 /*****************************************************************************
968                 module initialization
969 *****************************************************************************/
970
971 static int arg_where = CkCheckPoint_inMEM;
972
973 #if CMK_MEM_CHECKPOINT
974 void init_memcheckpt(char **argv)
975 {
976     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
977       arg_where = CkCheckPoint_inDISK;
978     }
979 }
980 #endif
981
982 class CkMemCheckPTInit: public Chare {
983 public:
984   CkMemCheckPTInit(CkArgMsg *m) {
985 #if CMK_MEM_CHECKPOINT
986     if (arg_where == CkCheckPoint_inDISK) {
987       CkPrintf("Charm++> Double-disk Checkpointing. \n");
988     }
989     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
990     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
991 #endif
992   }
993 };
994
995 // initproc
996 void CkRegisterRestartHandler( )
997 {
998 #if CMK_MEM_CHECKPOINT
999   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1000   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1001   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1002   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1003
1004   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1005   CpvAccess(procChkptBuf) = NULL;
1006
1007 #if 1
1008   // for debugging
1009   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1010 //  sleep(4);
1011 #endif
1012 #endif
1013 }
1014
1015 /// @todo: the following definitions should be moved to a separate file containing
1016 // structures and functions about fault tolerance strategies
1017
1018 /**
1019  *  * @brief: function for killing a process                                             
1020  *   */
1021 #ifdef CMK_MEM_CHECKPOINT
1022 #if CMK_HAS_GETPID
1023 void killLocal(void *_dummy,double curWallTime){
1024         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1025         if(CmiWallTimer()<killTime-1){
1026                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1027         }else{  
1028                 kill(getpid(),SIGKILL);                                               
1029         }              
1030
1031 #else
1032 void killLocal(void *_dummy,double curWallTime){
1033   CmiAbort("kill() not supported!");
1034 }
1035 #endif
1036 #endif
1037
1038 #ifdef CMK_MEM_CHECKPOINT
1039 /**
1040  * @brief: reads the file with the kill information
1041  */
1042 void readKillFile(){
1043         FILE *fp=fopen(killFile,"r");
1044         if(!fp){
1045                 return;
1046         }
1047         int proc;
1048         double sec;
1049         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1050                 if(proc == CkMyPe()){
1051                         killTime = CmiWallTimer()+sec;
1052                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1053                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1054                 }
1055         }
1056         fclose(fp);
1057 }
1058 #endif
1059
1060 #include "CkMemCheckpoint.def.h"
1061
1062