Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 // pick buddy processor from a different physical node
51 #define NODE_CHECKPOINT                        0
52
53 // assume NO extra processors--1
54 // assume extra processors--0
55 #define CK_NO_PROC_POOL                         1
56
57 #define DEBUGF      // CkPrintf
58
59 // static, so that it is accessible from Converse part
60 int CkMemCheckPT::inRestarting = 0;
61 double CkMemCheckPT::startTime;
62 char *CkMemCheckPT::stage;
63 CkCallback CkMemCheckPT::cpCallback;
64
65 int _memChkptOn = 1;                    // checkpoint is on or off
66
67 CkGroupID ckCheckPTGroupID;             // readonly
68
69
70 /// @todo the following declarations should be moved into a separate file for all 
71 // fault tolerant strategies
72
73 #ifdef CMK_MEM_CHECKPOINT
74 // name of the kill file that contains processes to be killed 
75 char *killFile;                                               
76 // flag for the kill file         
77 int killFlag=0;
78 // variable for storing the killing time
79 double killTime=0.0;
80 #endif
81
82 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
83 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
84
85 // compute the backup processor
86 // FIXME: avoid crashed processors
87 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
88
89 inline int CkMemCheckPT::BuddyPE(int pe)
90 {
91   int budpe;
92 #if NODE_CHECKPOINT
93     // buddy is the processor with same rank on the next physical node
94   int r1 = CmiPhysicalRank(pe);
95   int budnode = CmiPhysicalNodeID(pe);
96   do {
97     budnode = (budnode+1)%CmiNumPhysicalNodes();
98     int *pelist;
99     int num;
100     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
101     budpe = pelist[r1 % num];
102   } while (isFailed(budpe));
103   if (budpe == pe) {
104     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
105     CmiAbort("Failed to find a buddy processor");
106   }
107 #else
108   budpe = pe;
109   while (budpe == pe || isFailed(budpe)) 
110           budpe = (budpe+1)%CkNumPes();
111 #endif
112   return budpe;
113 }
114
115 // called in array element constructor
116 // choose and register with 2 buddies for checkpoiting 
117 #if CMK_MEM_CHECKPOINT
118 void ArrayElement::init_checkpt() {
119         if (_memChkptOn == 0) return;
120         if (CkInRestarting()) {
121           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
122         }
123         // only master init checkpoint
124         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
125
126         budPEs[0] = CkMyPe();
127         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
128         CmiAssert(budPEs[0] != budPEs[1]);
129         // inform checkPTMgr
130         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
131         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
132         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
133         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
134 }
135 #endif
136
137 // entry function invoked by checkpoint mgr asking for checkpoint data
138 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
139 #if CMK_MEM_CHECKPOINT
140   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
141   CkLocMgr *locMgr = thisArray->getLocMgr();
142   CmiAssert(myRec!=NULL);
143   int size;
144   {
145         PUP::sizer p;
146         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
147         size = p.size();
148   }
149   int packSize = size/sizeof(double) +1;
150   CkArrayCheckPTMessage *msg =
151                  new (packSize, 0) CkArrayCheckPTMessage;
152   msg->len = size;
153   msg->index =thisIndexMax;
154   msg->aid = thisArrayID;
155   msg->locMgr = locMgr->getGroupID();
156   msg->cp_flag = 1;
157   {
158         PUP::toMem p(msg->packData);
159         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
160   }
161
162   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
163   checkptMgr.recvData(msg, 2, budPEs);
164   delete m;
165 #endif
166 }
167
168 // checkpoint holder class - for memory checkpointing
169 class CkMemCheckPTInfo: public CkCheckPTInfo
170 {
171   CkArrayCheckPTMessage *ckBuffer;
172 public:
173   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
174                     CkCheckPTInfo(a, loc, idx, pno)
175   {
176     ckBuffer = NULL;
177   }
178   ~CkMemCheckPTInfo() 
179   {
180     if (ckBuffer) delete ckBuffer; 
181   }
182   inline void updateBuffer(CkArrayCheckPTMessage *data) 
183   {
184     CmiAssert(data!=NULL);
185     if (ckBuffer) delete ckBuffer;
186     ckBuffer = data;
187   }    
188   inline CkArrayCheckPTMessage * getCopy()
189   {
190     if (ckBuffer == NULL) {
191       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
192       CmiAbort("Abort!");
193     }
194     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
195   }     
196   inline void updateBuddy(int b1, int b2) {
197      CmiAssert(ckBuffer);
198      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
199   }
200   inline int getSize() { 
201      CmiAssert(ckBuffer);
202      return ckBuffer->len; 
203   }
204 };
205
206 // checkpoint holder class - for in-disk checkpointing
207 class CkDiskCheckPTInfo: public CkCheckPTInfo 
208 {
209   char *fname;
210   int bud1, bud2;
211   int len;                      // checkpoint size
212 public:
213   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
214   {
215 #if CMK_USE_MKSTEMP
216     fname = new char[64];
217     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
218     mkstemp(fname);
219 #else
220     fname=tmpnam(NULL);
221 #endif
222     bud1 = bud2 = -1;
223     len = 0;
224   }
225   ~CkDiskCheckPTInfo() 
226   {
227     remove(fname);
228   }
229   inline void updateBuffer(CkArrayCheckPTMessage *data) 
230   {
231     double t = CmiWallTimer();
232     // unpack it
233     envelope *env = UsrToEnv(data);
234     CkUnpackMessage(&env);
235     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
236     FILE *f = fopen(fname,"wb");
237     PUP::toDisk p(f);
238     CkPupMessage(p, (void **)&data);
239     // delay sync to the end because otherwise the messages are blocked
240 //    fsync(fileno(f));
241     fclose(f);
242     bud1 = data->bud1;
243     bud2 = data->bud2;
244     len = data->len;
245     delete data;
246     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
247   }
248   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
249   {
250     CkArrayCheckPTMessage *data;
251     FILE *f = fopen(fname,"rb");
252     PUP::fromDisk p(f);
253     CkPupMessage(p, (void **)&data);
254     fclose(f);
255     data->bud1 = bud1;                          // update the buddies
256     data->bud2 = bud2;
257     return data;
258   }
259   inline void updateBuddy(int b1, int b2) {
260      bud1 = b1; bud2 = b2;
261   }
262   inline int getSize() { 
263      return len; 
264   }
265 };
266
267 CkMemCheckPT::CkMemCheckPT(int w)
268 {
269   int numnodes = 0;
270 #if NODE_CHECKPOINT
271   numnodes = CmiNumPhysicalNodes();
272 #else
273   numnodes = CkNumPes();
274 #endif
275 #if CK_NO_PROC_POOL
276   if (numnodes <= 2)
277 #else
278   if (numnodes  == 1)
279 #endif
280   {
281     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
282     _memChkptOn = 0;
283   }
284   inRestarting = 0;
285   recvCount = peCount = 0;
286   where = w;
287 }
288
289 CkMemCheckPT::~CkMemCheckPT()
290 {
291   int len = ckTable.length();
292   for (int i=0; i<len; i++) {
293     delete ckTable[i];
294   }
295 }
296
297 void CkMemCheckPT::pup(PUP::er& p) 
298
299   CBase_CkMemCheckPT::pup(p); 
300   p|cpStarter;
301   p|thisFailedPe;
302   p|failedPes;
303   p|ckCheckPTGroupID;           // recover global variable
304   p|cpCallback;                 // store callback
305   p|where;                      // where to checkpoint
306   if (p.isUnpacking()) {
307     recvCount = peCount = 0;
308   }
309 }
310
311 // called by checkpoint mgr to restore an array element
312 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
313 {
314 #if CMK_MEM_CHECKPOINT
315   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
316   // m->index.print();
317   PUP::fromMem p(m->packData);
318   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
319   CmiAssert(mgr);
320   mgr->resume(m->index, p);
321
322   // find a list of array elements bound together
323   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
324   CmiAssert(elt);
325   CkLocRec_local *rec = elt->myRec;
326   CkVec<CkMigratable *> list;
327   mgr->migratableList(rec, list);
328   CmiAssert(list.length() > 0);
329   for (int l=0; l<list.length(); l++) {
330     elt = (ArrayElement *)list[l];
331     elt->budPEs[0] = m->bud1;
332     elt->budPEs[1] = m->bud2;
333     //    reset, may not needed now
334     // for now.
335     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
336       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
337       if (c) c->redNo = 0;
338     }
339   }
340 #endif
341 }
342
343 // return 1 if pe is a crashed processor
344 int CkMemCheckPT::isFailed(int pe)
345 {
346   for (int i=0; i<failedPes.length(); i++)
347     if (failedPes[i] == pe) return 1;
348   return 0;
349 }
350
351 // add pe into history list of all failed processors
352 void CkMemCheckPT::failed(int pe)
353 {
354   if (isFailed(pe)) return;
355   failedPes.push_back(pe);
356 }
357
358 int CkMemCheckPT::totalFailed()
359 {
360   return failedPes.length();
361 }
362
363 // create an checkpoint entry for array element of aid with index.
364 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
365 {
366   // error check, no duplicate
367   int idx, len = ckTable.size();
368   for (idx=0; idx<len; idx++) {
369     CkCheckPTInfo *entry = ckTable[idx];
370     if (index == entry->index) {
371       if (loc == entry->locMgr) {
372           // bindTo array elements
373           return;
374       }
375         // for array inheritance, the following check may fail
376         // because ArrayElement constructor of all superclasses are called
377       if (aid == entry->aid) {
378         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
379         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
380       }
381     }
382   }
383   CkCheckPTInfo *newEntry;
384   if (where == CkCheckPoint_inMEM)
385     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
386   else
387     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
388   ckTable.push_back(newEntry);
389   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
390 }
391
392 // loop through my checkpoint table and ask checkpointed array elements
393 // to send me checkpoint data.
394 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
395 {
396   cpCallback = cb;
397   cpStarter = starter;
398   if (CkMyPe() == cpStarter) {
399     startTime = CmiWallTimer();
400     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
401   }
402
403   int len = ckTable.length();
404   for (int i=0; i<len; i++) {
405     CkCheckPTInfo *entry = ckTable[i];
406       // always let the bigger number processor send request
407     if (CkMyPe() < entry->pNo) continue;
408       // call inmem_checkpoint to the array element, ask it to send
409       // back checkpoint data via recvData().
410     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
411     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
412   }
413     // if my table is empty, then I am done
414   if (len == 0) thisProxy[cpStarter].cpFinish();
415
416   // pack and send proc level data
417   sendProcData();
418 }
419
420 // don't handle array elements
421 static inline void _handleProcData(PUP::er &p)
422 {
423     // save readonlys, and callback BTW
424     CkPupROData(p);
425
426     // save mainchares into MainChares.dat
427     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
428         
429 #ifndef CMK_CHARE_USE_PTR
430     // save non-migratable chare
431     CkPupChareData(p);
432 #endif
433
434     // save groups into Groups.dat
435     CkPupGroupData(p);
436
437     // save nodegroups into NodeGroups.dat
438     if(CkMyRank()==0) CkPupNodeGroupData(p);
439 }
440
441 void CkMemCheckPT::sendProcData()
442 {
443   // find out size of buffer
444   int size;
445   {
446     PUP::sizer p;
447     _handleProcData(p);
448     size = p.size();
449   }
450   int packSize = size;
451   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
452   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
453   {
454     PUP::toMem p(msg->packData);
455     _handleProcData(p);
456   }
457   msg->pe = CkMyPe();
458   msg->len = size;
459   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
460   thisProxy[ChkptOnPe()].recvProcData(msg);
461 }
462
463 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
464 {
465   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
466   CpvAccess(procChkptBuf) = msg;
467   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
468   thisProxy[msg->reportPe].cpFinish();
469 }
470
471 // ArrayElement call this function to give us the checkpointed data
472 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
473 {
474   int len = ckTable.length();
475   int idx;
476   for (idx=0; idx<len; idx++) {
477     CkCheckPTInfo *entry = ckTable[idx];
478     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
479   }
480   CkAssert(idx < len);
481   int isChkpting = msg->cp_flag;
482   ckTable[idx]->updateBuffer(msg);
483   if (isChkpting) {
484       // all my array elements have returned their inmem data
485       // inform starter processor that I am done.
486     recvCount ++;
487     if (recvCount == ckTable.length()) {
488       if (where == CkCheckPoint_inMEM) {
489         thisProxy[cpStarter].cpFinish();
490       }
491       else if (where == CkCheckPoint_inDISK) {
492         // another barrier for finalize the writing using fsync
493         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
494         contribute(0,NULL,CkReduction::sum_int,localcb);
495       }
496       else
497         CmiAbort("Unknown checkpoint scheme");
498       recvCount = 0;
499     } 
500   }
501 }
502
503 // only used in disk checkpointing
504 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
505 {
506   delete m;
507 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
508   system("sync");
509 #endif
510   thisProxy[cpStarter].cpFinish();
511 }
512
513 // only is called on cpStarter when checkpoint is done
514 void CkMemCheckPT::cpFinish()
515 {
516   CmiAssert(CkMyPe() == cpStarter);
517   peCount++;
518     // now that all processors have finished, activate callback
519   if (peCount == 2*(CkNumPes())) {
520     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
521     cpCallback.send();
522     peCount = 0;
523     thisProxy.report();
524   }
525 }
526
527 // for debugging, report checkpoint info
528 void CkMemCheckPT::report()
529 {
530   int objsize = 0;
531   int len = ckTable.length();
532   for (int i=0; i<len; i++) {
533     CkCheckPTInfo *entry = ckTable[i];
534     CmiAssert(entry);
535     objsize += entry->getSize();
536   }
537   CmiAssert(CpvAccess(procChkptBuf));
538   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
539 }
540
541 /*****************************************************************************
542                         RESTART Procedure
543 *****************************************************************************/
544
545 // master processor of two buddies
546 inline int CkMemCheckPT::isMaster(int buddype)
547 {
548   int mype = CkMyPe();
549 //CkPrintf("ismaster: %d %d\n", pe, mype);
550   if (CkNumPes() - totalFailed() == 2) {
551     return mype > buddype;
552   }
553   for (int i=1; i<CkNumPes(); i++) {
554     int me = (buddype+i)%CkNumPes();
555     if (isFailed(me)) continue;
556     if (me == mype) return 1;
557     else return 0;
558   }
559   return 0;
560 }
561
562 #ifdef CKLOCMGR_LOOP
563 #undef CKLOCMGR_LOOP
564 #endif
565
566 // loop over all CkLocMgr and do "code"
567 #define  CKLOCMGR_LOOP(code)    {       \
568   int numGroups = CkpvAccess(_groupIDTable)->size();    \
569   for(int i=0;i<numGroups;i++) {        \
570     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
571     if(obj->isLocMgr())  {      \
572       CkLocMgr *mgr = (CkLocMgr*)obj;   \
573       code      \
574     }   \
575   }     \
576  }
577
578 #if 0
579 // helper class to pup all elements that belong to same ckLocMgr
580 class ElementDestoryer : public CkLocIterator {
581 private:
582         CkLocMgr *locMgr;
583 public:
584         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
585         void addLocation(CkLocation &loc) {
586                 CkArrayIndexMax idx=loc.getIndex();
587                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
588                 loc.destroy();
589         }
590 };
591 #endif
592
593 // restore the bitmap vector for LB
594 void CkMemCheckPT::resetLB(int diepe)
595 {
596 #if CMK_LBDB_ON
597   int i;
598   char *bitmap = new char[CkNumPes()];
599   // set processor available bitmap
600   get_avail_vector(bitmap);
601
602   for (i=0; i<failedPes.length(); i++)
603     bitmap[failedPes[i]] = 0; 
604   bitmap[diepe] = 0;
605
606 #if CK_NO_PROC_POOL
607   set_avail_vector(bitmap);
608 #endif
609
610   // if I am the crashed pe, rebuild my failedPEs array
611   if (CkMyPe() == diepe)
612     for (i=0; i<CkNumPes(); i++) 
613       if (bitmap[i]==0) failed(i);
614
615   delete [] bitmap;
616 #endif
617 }
618
619 // in case when failedPe dies, everybody go through its checkpoint table:
620 // destory all array elements
621 // recover lost buddies
622 // reconstruct all array elements from check point data
623 // called on all processors
624 void CkMemCheckPT::restart(int diePe)
625 {
626 #if CMK_MEM_CHECKPOINT
627   double curTime = CmiWallTimer();
628   if (CkMyPe() == diePe)
629     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
630   stage = (char*)"resetLB";
631   startTime = curTime;
632   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
633
634 #if CK_NO_PROC_POOL
635   failed(diePe);        // add into the list of failed pes
636 #endif
637   thisFailedPe = diePe;
638
639   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
640
641   inRestarting = 1;
642                                                                                 
643   // disable load balancer's barrier
644   if (CkMyPe() != diePe) resetLB(diePe);
645
646   CKLOCMGR_LOOP(mgr->startInserting(););
647
648   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
649 #endif
650 }
651
652 // loally remove all array elements
653 void CkMemCheckPT::removeArrayElements()
654 {
655 #if CMK_MEM_CHECKPOINT
656   int len = ckTable.length();
657   double curTime = CmiWallTimer();
658   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
659   stage = (char*)"removeArrayElements";
660   startTime = curTime;
661
662   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
663   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
664
665   // get rid of all buffering and remote recs
666   // including destorying all array elements
667   CKLOCMGR_LOOP(mgr->flushAllRecs(););
668
669 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
670
671   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
672 #endif
673 }
674
675 // flush state in reduction manager
676 void CkMemCheckPT::resetReductionMgr()
677 {
678   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
679   int numGroups = CkpvAccess(_groupIDTable)->size();
680   for(int i=0;i<numGroups;i++) {
681     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
682     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
683     obj->flushStates();
684     obj->ckJustMigrated();
685   }
686   // reset again
687   //CpvAccess(_qd)->flushStates();
688
689   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
690 }
691
692 // recover the lost buddies
693 void CkMemCheckPT::recoverBuddies()
694 {
695   int idx;
696   int len = ckTable.length();
697   // ready to flush reduction manager
698   // cannot be CkMemCheckPT::restart because destory will modify states
699   double curTime = CmiWallTimer();
700   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
701   stage = (char *)"recoverBuddies";
702   startTime = curTime;
703
704   // recover buddies
705   for (idx=0; idx<len; idx++) {
706     CkCheckPTInfo *entry = ckTable[idx];
707     if (entry->pNo == thisFailedPe) {
708 #if CK_NO_PROC_POOL
709       // find a new buddy
710       int budPe = CkMyPe();
711 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
712       while (budPe == CkMyPe() || isFailed(budPe)) 
713           budPe = (budPe+1)%CkNumPes();
714       entry->pNo = budPe;
715 #else
716       int budPe = thisFailedPe;
717 #endif
718       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
719       CkArrayCheckPTMessage *msg = entry->getCopy();
720       msg->cp_flag = 0;            // not checkpointing
721       thisProxy[budPe].recvData(msg);
722     }
723   }
724
725   if (CkMyPe() == 0)
726     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
727 }
728
729 // restore array elements
730 void CkMemCheckPT::recoverArrayElements()
731 {
732   double curTime = CmiWallTimer();
733   int len = ckTable.length();
734   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
735   stage = (char *)"recoverArrayElements";
736   startTime = curTime;
737
738   // recover all array elements
739   int count = 0;
740   for (int idx=0; idx<len; idx++)
741   {
742     CkCheckPTInfo *entry = ckTable[idx];
743 #if CK_NO_PROC_POOL
744     // the bigger one will do 
745 //    if (CkMyPe() < entry->pNo) continue;
746     if (!isMaster(entry->pNo)) continue;
747 #else
748     // smaller one do it, which has the original object
749     if (CkMyPe() == entry->pNo+1 || 
750         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
751 #endif
752 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
753
754     entry->updateBuddy(CkMyPe(), entry->pNo);
755     CkArrayCheckPTMessage *msg = entry->getCopy();
756     // gzheng
757     //checkptMgr[CkMyPe()].inmem_restore(msg);
758     inmem_restore(msg);
759     count ++;
760   }
761   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
762
763   if (CkMyPe() == 0)
764     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
765 }
766
767 static double restartT;
768
769 // on every processor
770 // turn load balancer back on
771 void CkMemCheckPT::finishUp()
772 {
773   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
774   CKLOCMGR_LOOP(mgr->doneInserting(););
775   
776   inRestarting = 0;
777
778   if (CkMyPe() == 0)
779   {
780        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
781        CkStartQD(cpCallback);
782   } 
783   if (CkMyPe() == thisFailedPe)
784        CkPrintf("[%d] Restart finished in %f seconds.\n", CkMyPe(), CkWallTimer()-restartT);
785
786 #if CK_NO_PROC_POOL
787 #if NODE_CHECKPOINT
788   int numnodes = CmiNumPhysicalNodes();
789 #else
790   int numnodes = CkNumPes();
791 #endif
792   if (numnodes-totalFailed() <=2) {
793     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
794     _memChkptOn = 0;
795   }
796 #endif
797 }
798
799 // called only on 0
800 void CkMemCheckPT::quiescence(CkCallback &cb)
801 {
802   static int pe_count = 0;
803   pe_count ++;
804   CmiAssert(CkMyPe() == 0);
805   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
806   if (pe_count == CkNumPes()) {
807     pe_count = 0;
808     cb.send();
809   }
810 }
811
812 // User callable function - to start a checkpoint
813 // callback cb is used to pass control back
814 void CkStartMemCheckpoint(CkCallback &cb)
815 {
816 #if CMK_MEM_CHECKPOINT
817   if (_memChkptOn == 0) {
818     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
819     cb.send();
820     return;
821   }
822   if (CkInRestarting()) {
823       // trying to checkpointing during restart
824     cb.send();
825     return;
826   }
827     // store user callback and user data
828   CkMemCheckPT::cpCallback = cb;
829
830     // broadcast to start check pointing
831   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
832   checkptMgr.doItNow(CkMyPe(), cb);
833 #else
834   // when mem checkpoint is disabled, invike cb immediately
835   cb.send();
836 #endif
837 }
838
839 void CkRestartCheckPoint(int diePe)
840 {
841 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
842   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
843   // broadcast
844   checkptMgr.restart(diePe);
845 }
846
847 static int _diePE = -1;
848
849 // callback function used locally by ccs handler
850 static void CkRestartCheckPointCallback(void *ignore, void *msg)
851 {
852 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
853   CkRestartCheckPoint(_diePE);
854 }
855
856 // Converse function handles
857 static int askProcDataHandlerIdx;
858 static int restartBcastHandlerIdx;
859 static int recoverProcDataHandlerIdx;
860 static int restartBeginHandlerIdx;
861
862 static void restartBeginHandler(char *msg)
863 {
864 #if CMK_MEM_CHECKPOINT
865   static int count = 0;
866   CmiFree(msg);
867   CmiAssert(CkMyPe() == _diePE);
868   count ++;
869   if (count == CkNumPes()) {
870     CkRestartCheckPointCallback(NULL, NULL);
871     count = 0;
872   }
873 #endif
874 }
875
876 static void restartBcastHandler(char *msg)
877 {
878 #if CMK_MEM_CHECKPOINT
879   // advance phase counter
880   CkMemCheckPT::inRestarting = 1;
881   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
882   // gzheng
883   if (CkMyPe() != _diePE) cur_restart_phase ++;
884
885   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
886
887   // reset QD counters
888   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
889
890 /*  gzheng
891   if (CkMyPe()==_diePE)
892       CkRestartCheckPointCallback(NULL, NULL);
893 */
894   CmiFree(msg);
895
896   char restartmsg[CmiMsgHeaderSizeBytes];
897   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
898   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
899 #endif
900 }
901
902 extern void _initDone();
903
904 // called on crashed processor
905 static void recoverProcDataHandler(char *msg)
906 {
907 #if CMK_MEM_CHECKPOINT
908    int i;
909    envelope *env = (envelope *)msg;
910    CkUnpackMessage(&env);
911    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
912    cur_restart_phase = procMsg->cur_restart_phase;
913    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
914    cur_restart_phase ++;
915    CpvAccess(_qd)->flushStates();
916
917    // restore readonly, mainchare, group, nodegroup
918 //   int temp = cur_restart_phase;
919 //   cur_restart_phase = -1;
920    PUP::fromMem p(procMsg->packData);
921    _handleProcData(p);
922
923    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
924    // gzheng
925    CKLOCMGR_LOOP(mgr->startInserting(););
926
927    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
928    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
929    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
930    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
931    CmiFree(msg);
932
933    _initDone();
934 #endif
935 }
936
937 // called on its backup processor
938 // get backup message buffer and sent to crashed processor
939 static void askProcDataHandler(char *msg)
940 {
941 #if CMK_MEM_CHECKPOINT
942     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
943     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
944     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
945     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
946     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
947
948     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
949
950     CkPackMessage(&env);
951     CmiSetHandler(env, recoverProcDataHandlerIdx);
952     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
953     CpvAccess(procChkptBuf) = NULL;
954 #endif
955 }
956
957 void CkMemRestart(const char *dummy, CkArgMsg *args)
958 {
959 #if CMK_MEM_CHECKPOINT
960    restartT = CkWallTimer();
961    _diePE = CkMyPe();
962    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
963    CkMemCheckPT::startTime = CmiWallTimer();
964    CkMemCheckPT::inRestarting = 1;
965    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
966    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
967    cur_restart_phase = 9999;             // big enough to get it processed
968    CmiSetHandler(msg, askProcDataHandlerIdx);
969    int pe = ChkptOnPe();
970    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
971    cur_restart_phase=0;    // allow all message to come in
972 #else
973    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
974 #endif
975 }
976
977 // can be called in other files
978 // return true if it is in restarting
979 int CkInRestarting()
980 {
981 #if CMK_MEM_CHECKPOINT
982   // gzheng
983   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
984   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
985   return CkMemCheckPT::inRestarting;
986 #else
987   return 0;
988 #endif
989 }
990
991 /*****************************************************************************
992                 module initialization
993 *****************************************************************************/
994
995 static int arg_where = CkCheckPoint_inMEM;
996
997 #if CMK_MEM_CHECKPOINT
998 void init_memcheckpt(char **argv)
999 {
1000     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1001       arg_where = CkCheckPoint_inDISK;
1002     }
1003 }
1004 #endif
1005
1006 class CkMemCheckPTInit: public Chare {
1007 public:
1008   CkMemCheckPTInit(CkArgMsg *m) {
1009 #if CMK_MEM_CHECKPOINT
1010     if (arg_where == CkCheckPoint_inDISK) {
1011       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1012     }
1013     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1014     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1015 #endif
1016   }
1017 };
1018
1019 // initproc
1020 void CkRegisterRestartHandler( )
1021 {
1022 #if CMK_MEM_CHECKPOINT
1023   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1024   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1025   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1026   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1027
1028   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1029   CpvAccess(procChkptBuf) = NULL;
1030
1031 #if 1
1032   // for debugging
1033   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1034 //  sleep(4);
1035 #endif
1036 #endif
1037 }
1038
1039 /// @todo: the following definitions should be moved to a separate file containing
1040 // structures and functions about fault tolerance strategies
1041
1042 /**
1043  *  * @brief: function for killing a process                                             
1044  *   */
1045 #ifdef CMK_MEM_CHECKPOINT
1046 #if CMK_HAS_GETPID
1047 void killLocal(void *_dummy,double curWallTime){
1048         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1049         if(CmiWallTimer()<killTime-1){
1050                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1051         }else{  
1052                 kill(getpid(),SIGKILL);                                               
1053         }              
1054
1055 #else
1056 void killLocal(void *_dummy,double curWallTime){
1057   CmiAbort("kill() not supported!");
1058 }
1059 #endif
1060 #endif
1061
1062 #ifdef CMK_MEM_CHECKPOINT
1063 /**
1064  * @brief: reads the file with the kill information
1065  */
1066 void readKillFile(){
1067         FILE *fp=fopen(killFile,"r");
1068         if(!fp){
1069                 return;
1070         }
1071         int proc;
1072         double sec;
1073         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1074                 if(proc == CkMyPe()){
1075                         killTime = CmiWallTimer()+sec;
1076                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1077                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1078                 }
1079         }
1080         fclose(fp);
1081 }
1082 #endif
1083
1084 #include "CkMemCheckpoint.def.h"
1085
1086