fix another bug in previous fix.
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 // pick buddy processor from a different physical node
51 #define NODE_CHECKPOINT                        1
52
53 // assume NO extra processors--1
54 // assume extra processors--0
55 #define CK_NO_PROC_POOL                         1
56
57 #define DEBUGF      // CkPrintf
58
59 // static, so that it is accessible from Converse part
60 int CkMemCheckPT::inRestarting = 0;
61 double CkMemCheckPT::startTime;
62 char *CkMemCheckPT::stage;
63 CkCallback CkMemCheckPT::cpCallback;
64
65 int _memChkptOn = 1;                    // checkpoint is on or off
66
67 CkGroupID ckCheckPTGroupID;             // readonly
68
69
70 /// @todo the following declarations should be moved into a separate file for all 
71 // fault tolerant strategies
72
73 #ifdef CMK_MEM_CHECKPOINT
74 // name of the kill file that contains processes to be killed 
75 char *killFile;                                               
76 // flag for the kill file         
77 int killFlag=0;
78 // variable for storing the killing time
79 double killTime=0.0;
80 #endif
81
82 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
83 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
84
85 // compute the backup processor
86 // FIXME: avoid crashed processors
87 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
88
89 inline int CkMemCheckPT::BuddyPE(int pe)
90 {
91   int budpe;
92 #if NODE_CHECKPOINT
93     // buddy is the processor with same rank on the next physical node
94   int r1 = CmiPhysicalRank(pe);
95   int budnode = CmiPhysicalNodeID(pe);
96   do {
97     budnode = (budnode+1)%CmiNumPhysicalNodes();
98     int *pelist;
99     int num;
100     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
101     budpe = pelist[r1 % num];
102   } while (isFailed(budpe));
103   if (budpe == pe) {
104     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
105     CmiAbort("Failed to find a buddy processor");
106   }
107 #else
108   budpe = pe;
109   while (budpe == pe || isFailed(budPe)) 
110           budPe = (budPe+1)%CkNumPes();
111 #endif
112   return budpe;
113 }
114
115 // called in array element constructor
116 // choose and register with 2 buddies for checkpoiting 
117 #if CMK_MEM_CHECKPOINT
118 void ArrayElement::init_checkpt() {
119         if (_memChkptOn == 0) return;
120         // only master init checkpoint
121         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
122
123         budPEs[0] = CkMyPe();
124         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
125         CmiAssert(budPEs[0] != budPEs[1]);
126         // inform checkPTMgr
127         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
128         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
129         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
130         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
131 }
132 #endif
133
134 // entry function invoked by checkpoint mgr asking for checkpoint data
135 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
136 #if CMK_MEM_CHECKPOINT
137   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
138   CkLocMgr *locMgr = thisArray->getLocMgr();
139   CmiAssert(myRec!=NULL);
140   int size;
141   {
142         PUP::sizer p;
143         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
144         size = p.size();
145   }
146   int packSize = size/sizeof(double) +1;
147   CkArrayCheckPTMessage *msg =
148                  new (packSize, 0) CkArrayCheckPTMessage;
149   msg->len = size;
150   msg->index =thisIndexMax;
151   msg->aid = thisArrayID;
152   msg->locMgr = locMgr->getGroupID();
153   msg->cp_flag = 1;
154   {
155         PUP::toMem p(msg->packData);
156         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
157   }
158
159   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
160   checkptMgr.recvData(msg, 2, budPEs);
161   delete m;
162 #endif
163 }
164
165 // checkpoint holder class - for memory checkpointing
166 class CkMemCheckPTInfo: public CkCheckPTInfo
167 {
168   CkArrayCheckPTMessage *ckBuffer;
169 public:
170   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
171                     CkCheckPTInfo(a, loc, idx, pno)
172   {
173     ckBuffer = NULL;
174   }
175   ~CkMemCheckPTInfo() 
176   {
177     if (ckBuffer) delete ckBuffer; 
178   }
179   inline void updateBuffer(CkArrayCheckPTMessage *data) 
180   {
181     CmiAssert(data!=NULL);
182     if (ckBuffer) delete ckBuffer;
183     ckBuffer = data;
184   }    
185   inline CkArrayCheckPTMessage * getCopy()
186   {
187     if (ckBuffer == NULL) {
188       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
189       CmiAbort("Abort!");
190     }
191     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
192   }     
193   inline void updateBuddy(int b1, int b2) {
194      CmiAssert(ckBuffer);
195      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
196   }
197   inline int getSize() { 
198      CmiAssert(ckBuffer);
199      return ckBuffer->len; 
200   }
201 };
202
203 // checkpoint holder class - for in-disk checkpointing
204 class CkDiskCheckPTInfo: public CkCheckPTInfo 
205 {
206   char *fname;
207   int bud1, bud2;
208   int len;                      // checkpoint size
209 public:
210   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
211   {
212 #if CMK_USE_MKSTEMP
213     fname = new char[64];
214     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
215     mkstemp(fname);
216 #else
217     fname=tmpnam(NULL);
218 #endif
219     bud1 = bud2 = -1;
220     len = 0;
221   }
222   ~CkDiskCheckPTInfo() 
223   {
224     remove(fname);
225   }
226   inline void updateBuffer(CkArrayCheckPTMessage *data) 
227   {
228     double t = CmiWallTimer();
229     // unpack it
230     envelope *env = UsrToEnv(data);
231     CkUnpackMessage(&env);
232     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
233     FILE *f = fopen(fname,"wb");
234     PUP::toDisk p(f);
235     CkPupMessage(p, (void **)&data);
236     // delay sync to the end because otherwise the messages are blocked
237 //    fsync(fileno(f));
238     fclose(f);
239     bud1 = data->bud1;
240     bud2 = data->bud2;
241     len = data->len;
242     delete data;
243     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
244   }
245   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
246   {
247     CkArrayCheckPTMessage *data;
248     FILE *f = fopen(fname,"rb");
249     PUP::fromDisk p(f);
250     CkPupMessage(p, (void **)&data);
251     fclose(f);
252     data->bud1 = bud1;                          // update the buddies
253     data->bud2 = bud2;
254     return data;
255   }
256   inline void updateBuddy(int b1, int b2) {
257      bud1 = b1; bud2 = b2;
258   }
259   inline int getSize() { 
260      return len; 
261   }
262 };
263
264 CkMemCheckPT::CkMemCheckPT(int w)
265 {
266 #if CK_NO_PROC_POOL
267   if (CkNumPes() <= 2) {
268 #else
269   if (CkNumPes()  == 1) {
270 #endif
271     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
272     _memChkptOn = 0;
273   }
274   inRestarting = 0;
275   recvCount = peCount = 0;
276   where = w;
277 }
278
279 CkMemCheckPT::~CkMemCheckPT()
280 {
281   int len = ckTable.length();
282   for (int i=0; i<len; i++) {
283     delete ckTable[i];
284   }
285 }
286
287 void CkMemCheckPT::pup(PUP::er& p) 
288
289   CBase_CkMemCheckPT::pup(p); 
290   p|cpStarter;
291   p|thisFailedPe;
292   p|failedPes;
293   p|ckCheckPTGroupID;           // recover global variable
294   p|cpCallback;                 // store callback
295   p|where;                      // where to checkpoint
296   if (p.isUnpacking()) {
297     recvCount = peCount = 0;
298   }
299 }
300
301 // called by checkpoint mgr to restore an array element
302 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
303 {
304 #if CMK_MEM_CHECKPOINT
305   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
306   // m->index.print();
307   PUP::fromMem p(m->packData);
308   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
309   CmiAssert(mgr);
310   mgr->resume(m->index, p);
311
312   // find a list of array elements bound together
313   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
314   CmiAssert(elt);
315   CkLocRec_local *rec = elt->myRec;
316   CkVec<CkMigratable *> list;
317   mgr->migratableList(rec, list);
318   CmiAssert(list.length() > 0);
319   for (int l=0; l<list.length(); l++) {
320     elt = (ArrayElement *)list[l];
321     elt->budPEs[0] = m->bud1;
322     elt->budPEs[1] = m->bud2;
323     //    reset, may not needed now
324     // for now.
325     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
326       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
327       if (c) c->redNo = 0;
328     }
329   }
330 #endif
331 }
332
333 // return 1 if pe is a crashed processor
334 int CkMemCheckPT::isFailed(int pe)
335 {
336   for (int i=0; i<failedPes.length(); i++)
337     if (failedPes[i] == pe) return 1;
338   return 0;
339 }
340
341 // add pe into history list of all failed processors
342 void CkMemCheckPT::failed(int pe)
343 {
344   if (isFailed(pe)) return;
345   failedPes.push_back(pe);
346 }
347
348 int CkMemCheckPT::totalFailed()
349 {
350   return failedPes.length();
351 }
352
353 // create an checkpoint entry for array element of aid with index.
354 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
355 {
356   // error check, no duplicate
357   int idx, len = ckTable.size();
358   for (idx=0; idx<len; idx++) {
359     CkCheckPTInfo *entry = ckTable[idx];
360     if (index == entry->index) {
361       if (loc == entry->locMgr) {
362           // bindTo array elements
363           return;
364       }
365         // for array inheritance, the following check may fail
366         // because ArrayElement constructor of all superclasses are called
367       if (aid == entry->aid) {
368         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
369         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
370       }
371     }
372   }
373   CkCheckPTInfo *newEntry;
374   if (where == CkCheckPoint_inMEM)
375     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
376   else
377     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
378   ckTable.push_back(newEntry);
379   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
380 }
381
382 // loop through my checkpoint table and ask checkpointed array elements
383 // to send me checkpoint data.
384 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
385 {
386   cpCallback = cb;
387   cpStarter = starter;
388   if (CkMyPe() == cpStarter) {
389     startTime = CmiWallTimer();
390     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
391   }
392
393   int len = ckTable.length();
394   for (int i=0; i<len; i++) {
395     CkCheckPTInfo *entry = ckTable[i];
396       // always let the bigger number processor send request
397     if (CkMyPe() < entry->pNo) continue;
398       // call inmem_checkpoint to the array element, ask it to send
399       // back checkpoint data via recvData().
400     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
401     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
402   }
403     // if my table is empty, then I am done
404   if (len == 0) thisProxy[cpStarter].cpFinish();
405
406   // pack and send proc level data
407   sendProcData();
408 }
409
410 // don't handle array elements
411 static inline void _handleProcData(PUP::er &p)
412 {
413     // save readonlys, and callback BTW
414     CkPupROData(p);
415
416     // save mainchares into MainChares.dat
417     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
418         
419 #if CMK_FT_CHARE
420     // save non-migratable chare
421     CkPupChareData(p);
422 #endif
423
424     // save groups into Groups.dat
425     CkPupGroupData(p);
426
427     // save nodegroups into NodeGroups.dat
428     if(CkMyRank()==0) CkPupNodeGroupData(p);
429 }
430
431 void CkMemCheckPT::sendProcData()
432 {
433   // find out size of buffer
434   int size;
435   {
436     PUP::sizer p;
437     _handleProcData(p);
438     size = p.size();
439   }
440   int packSize = size;
441   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
442   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
443   {
444     PUP::toMem p(msg->packData);
445     _handleProcData(p);
446   }
447   msg->pe = CkMyPe();
448   msg->len = size;
449   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
450   thisProxy[ChkptOnPe()].recvProcData(msg);
451 }
452
453 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
454 {
455   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
456   CpvAccess(procChkptBuf) = msg;
457 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
458   thisProxy[msg->reportPe].cpFinish();
459 }
460
461 // ArrayElement call this function to give us the checkpointed data
462 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
463 {
464   int len = ckTable.length();
465   int idx;
466   for (idx=0; idx<len; idx++) {
467     CkCheckPTInfo *entry = ckTable[idx];
468     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
469   }
470   CkAssert(idx < len);
471   int isChkpting = msg->cp_flag;
472   ckTable[idx]->updateBuffer(msg);
473   if (isChkpting) {
474       // all my array elements have returned their inmem data
475       // inform starter processor that I am done.
476     recvCount ++;
477     if (recvCount == ckTable.length()) {
478       if (where == CkCheckPoint_inMEM) {
479         thisProxy[cpStarter].cpFinish();
480       }
481       else if (where == CkCheckPoint_inDISK) {
482         // another barrier for finalize the writing using fsync
483         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
484         contribute(0,NULL,CkReduction::sum_int,localcb);
485       }
486       else
487         CmiAbort("Unknown checkpoint scheme");
488       recvCount = 0;
489     } 
490   }
491 }
492
493 // only used in disk checkpointing
494 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
495 {
496   delete m;
497 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
498   system("sync");
499 #endif
500   thisProxy[cpStarter].cpFinish();
501 }
502
503 // only is called on cpStarter when checkpoint is done
504 void CkMemCheckPT::cpFinish()
505 {
506   CmiAssert(CkMyPe() == cpStarter);
507   peCount++;
508     // now that all processors have finished, activate callback
509   if (peCount == 2*(CkNumPes())) {
510     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
511     cpCallback.send();
512     peCount = 0;
513     thisProxy.report();
514   }
515 }
516
517 // for debugging, report checkpoint info
518 void CkMemCheckPT::report()
519 {
520   int objsize = 0;
521   int len = ckTable.length();
522   for (int i=0; i<len; i++) {
523     CkCheckPTInfo *entry = ckTable[i];
524     CmiAssert(entry);
525     objsize += entry->getSize();
526   }
527   CmiAssert(CpvAccess(procChkptBuf));
528   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
529 }
530
531 /*****************************************************************************
532                         RESTART Procedure
533 *****************************************************************************/
534
535 // master processor of two buddies
536 inline int CkMemCheckPT::isMaster(int buddype)
537 {
538   int mype = CkMyPe();
539 //CkPrintf("ismaster: %d %d\n", pe, mype);
540   if (CkNumPes() - totalFailed() == 2) {
541     return mype > buddype;
542   }
543   for (int i=1; i<CkNumPes(); i++) {
544     int me = (buddype+i)%CkNumPes();
545     if (isFailed(me)) continue;
546     if (me == mype) return 1;
547     else return 0;
548   }
549   return 0;
550 }
551
552 #ifdef CKLOCMGR_LOOP
553 #undef CKLOCMGR_LOOP
554 #endif
555
556 // loop over all CkLocMgr and do "code"
557 #define  CKLOCMGR_LOOP(code)    {       \
558   int numGroups = CkpvAccess(_groupIDTable)->size();    \
559   for(int i=0;i<numGroups;i++) {        \
560     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
561     if(obj->isLocMgr())  {      \
562       CkLocMgr *mgr = (CkLocMgr*)obj;   \
563       code      \
564     }   \
565   }     \
566  }
567
568 #if 0
569 // helper class to pup all elements that belong to same ckLocMgr
570 class ElementDestoryer : public CkLocIterator {
571 private:
572         CkLocMgr *locMgr;
573 public:
574         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
575         void addLocation(CkLocation &loc) {
576                 CkArrayIndexMax idx=loc.getIndex();
577                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
578                 loc.destroy();
579         }
580 };
581 #endif
582
583 // restore the bitmap vector for LB
584 void CkMemCheckPT::resetLB(int diepe)
585 {
586 #if CMK_LBDB_ON
587   int i;
588   char *bitmap = new char[CkNumPes()];
589   // set processor available bitmap
590   get_avail_vector(bitmap);
591
592   for (i=0; i<failedPes.length(); i++)
593     bitmap[failedPes[i]] = 0; 
594   bitmap[diepe] = 0;
595
596 #if CK_NO_PROC_POOL
597   set_avail_vector(bitmap);
598 #endif
599
600   // if I am the crashed pe, rebuild my failedPEs array
601   if (CkMyPe() == diepe)
602     for (i=0; i<CkNumPes(); i++) 
603       if (bitmap[i]==0) failed(i);
604
605   delete [] bitmap;
606 #endif
607 }
608
609 // in case when failedPe dies, everybody go through its checkpoint table:
610 // destory all array elements
611 // recover lost buddies
612 // reconstruct all array elements from check point data
613 // called on all processors
614 void CkMemCheckPT::restart(int diePe)
615 {
616 #if CMK_MEM_CHECKPOINT
617   double curTime = CmiWallTimer();
618   if (CkMyPe() == diePe)
619     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
620   stage = (char*)"resetLB";
621   startTime = curTime;
622   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
623
624 #if CK_NO_PROC_POOL
625   failed(diePe);        // add into the list of failed pes
626 #endif
627   thisFailedPe = diePe;
628
629   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
630
631   inRestarting = 1;
632                                                                                 
633   // disable load balancer's barrier
634   if (CkMyPe() != diePe) resetLB(diePe);
635
636   CKLOCMGR_LOOP(mgr->startInserting(););
637
638   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
639 #endif
640 }
641
642 // loally remove all array elements
643 void CkMemCheckPT::removeArrayElements()
644 {
645 #if CMK_MEM_CHECKPOINT
646   int len = ckTable.length();
647   double curTime = CmiWallTimer();
648   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
649   stage = (char*)"removeArrayElements";
650   startTime = curTime;
651
652   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
653   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
654
655   // get rid of all buffering and remote recs
656   // including destorying all array elements
657   CKLOCMGR_LOOP(mgr->flushAllRecs(););
658
659 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
660
661   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
662 #endif
663 }
664
665 // flush state in reduction manager
666 void CkMemCheckPT::resetReductionMgr()
667 {
668   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
669   int numGroups = CkpvAccess(_groupIDTable)->size();
670   for(int i=0;i<numGroups;i++) {
671     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
672     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
673     obj->flushStates();
674     obj->ckJustMigrated();
675   }
676   // reset again
677   //CpvAccess(_qd)->flushStates();
678
679   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
680 }
681
682 // recover the lost buddies
683 void CkMemCheckPT::recoverBuddies()
684 {
685   int idx;
686   int len = ckTable.length();
687   // ready to flush reduction manager
688   // cannot be CkMemCheckPT::restart because destory will modify states
689   double curTime = CmiWallTimer();
690   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
691   stage = (char *)"recoverBuddies";
692   startTime = curTime;
693
694   // recover buddies
695   for (idx=0; idx<len; idx++) {
696     CkCheckPTInfo *entry = ckTable[idx];
697     if (entry->pNo == thisFailedPe) {
698 #if CK_NO_PROC_POOL
699       // find a new buddy
700       int budPe = CkMyPe();
701 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
702       while (budPe == CkMyPe() || isFailed(budPe)) 
703           budPe = (budPe+1)%CkNumPes();
704       entry->pNo = budPe;
705 #else
706       int budPe = thisFailedPe;
707 #endif
708       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
709       CkArrayCheckPTMessage *msg = entry->getCopy();
710       msg->cp_flag = 0;            // not checkpointing
711       thisProxy[budPe].recvData(msg);
712     }
713   }
714
715   if (CkMyPe() == 0)
716     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
717 }
718
719 // restore array elements
720 void CkMemCheckPT::recoverArrayElements()
721 {
722   double curTime = CmiWallTimer();
723   int len = ckTable.length();
724   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
725   stage = (char *)"recoverArrayElements";
726   startTime = curTime;
727
728   // recover all array elements
729   int count = 0;
730   for (int idx=0; idx<len; idx++)
731   {
732     CkCheckPTInfo *entry = ckTable[idx];
733 #if CK_NO_PROC_POOL
734     // the bigger one will do 
735 //    if (CkMyPe() < entry->pNo) continue;
736     if (!isMaster(entry->pNo)) continue;
737 #else
738     // smaller one do it, which has the original object
739     if (CkMyPe() == entry->pNo+1 || 
740         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
741 #endif
742 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
743
744     entry->updateBuddy(CkMyPe(), entry->pNo);
745     CkArrayCheckPTMessage *msg = entry->getCopy();
746     // gzheng
747     //checkptMgr[CkMyPe()].inmem_restore(msg);
748     inmem_restore(msg);
749     count ++;
750   }
751 //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
752
753   if (CkMyPe() == 0)
754     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
755 }
756
757 // on every processor
758 // turn load balancer back on
759 void CkMemCheckPT::finishUp()
760 {
761   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
762   CKLOCMGR_LOOP(mgr->doneInserting(););
763   
764   inRestarting = 0;
765
766   if (CkMyPe() == 0)
767   {
768        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
769        CkStartQD(cpCallback);
770   } 
771 #if CK_NO_PROC_POOL
772   if (CkNumPes()-totalFailed() <=2) {
773     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
774     _memChkptOn = 0;
775   }
776 #endif
777 }
778
779 // called only on 0
780 void CkMemCheckPT::quiescence(CkCallback &cb)
781 {
782   static int pe_count = 0;
783   pe_count ++;
784   CmiAssert(CkMyPe() == 0);
785   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
786   if (pe_count == CkNumPes()) {
787     pe_count = 0;
788     cb.send();
789   }
790 }
791
792 // User callable function - to start a checkpoint
793 // callback cb is used to pass control back
794 void CkStartMemCheckpoint(CkCallback &cb)
795 {
796 #if CMK_MEM_CHECKPOINT
797   if (_memChkptOn == 0) {
798     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
799     cb.send();
800     return;
801   }
802   if (CkInRestarting()) {
803       // trying to checkpointing during restart
804     cb.send();
805     return;
806   }
807     // store user callback and user data
808   CkMemCheckPT::cpCallback = cb;
809
810     // broadcast to start check pointing
811   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
812   checkptMgr.doItNow(CkMyPe(), cb);
813 #else
814   // when mem checkpoint is disabled, invike cb immediately
815   cb.send();
816 #endif
817 }
818
819 void CkRestartCheckPoint(int diePe)
820 {
821 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
822   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
823   // broadcast
824   checkptMgr.restart(diePe);
825 }
826
827 static int _diePE = -1;
828
829 // callback function used locally by ccs handler
830 static void CkRestartCheckPointCallback(void *ignore, void *msg)
831 {
832 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
833   CkRestartCheckPoint(_diePE);
834 }
835
836 // Converse function handles
837 static int askProcDataHandlerIdx;
838 static int restartBcastHandlerIdx;
839 static int recoverProcDataHandlerIdx;
840 static int restartBeginHandlerIdx;
841
842 static void restartBeginHandler(char *msg)
843 {
844 #if CMK_MEM_CHECKPOINT
845   static int count = 0;
846   CmiFree(msg);
847   CmiAssert(CkMyPe() == _diePE);
848   count ++;
849   if (count == CkNumPes()) {
850     CkRestartCheckPointCallback(NULL, NULL);
851     count = 0;
852   }
853 #endif
854 }
855
856 static void restartBcastHandler(char *msg)
857 {
858 #if CMK_MEM_CHECKPOINT
859   // advance phase counter
860   CkMemCheckPT::inRestarting = 1;
861   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
862   // gzheng
863   if (CkMyPe() != _diePE) cur_restart_phase ++;
864
865   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
866
867   // reset QD counters
868   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
869
870 /*  gzheng
871   if (CkMyPe()==_diePE)
872       CkRestartCheckPointCallback(NULL, NULL);
873 */
874   CmiFree(msg);
875
876   char restartmsg[CmiMsgHeaderSizeBytes];
877   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
878   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
879 #endif
880 }
881
882 extern void _initDone();
883
884 // called on crashed processor
885 static void recoverProcDataHandler(char *msg)
886 {
887 #if CMK_MEM_CHECKPOINT
888    int i;
889    envelope *env = (envelope *)msg;
890    CkUnpackMessage(&env);
891    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
892    cur_restart_phase = procMsg->cur_restart_phase;
893    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
894    cur_restart_phase ++;
895    CpvAccess(_qd)->flushStates();
896
897    // restore readonly, mainchare, group, nodegroup
898 //   int temp = cur_restart_phase;
899 //   cur_restart_phase = -1;
900    PUP::fromMem p(procMsg->packData);
901    _handleProcData(p);
902
903    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
904    // gzheng
905    CKLOCMGR_LOOP(mgr->startInserting(););
906
907    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
908    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
909    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
910    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
911    CmiFree(msg);
912
913    _initDone();
914 #endif
915 }
916
917 // called on its backup processor
918 // get backup message buffer and sent to crashed processor
919 static void askProcDataHandler(char *msg)
920 {
921 #if CMK_MEM_CHECKPOINT
922     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
923     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
924     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
925     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
926     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
927
928     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
929
930     CkPackMessage(&env);
931     CmiSetHandler(env, recoverProcDataHandlerIdx);
932     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
933     CpvAccess(procChkptBuf) = NULL;
934 #endif
935 }
936
937 void CkMemRestart(const char *dummy, CkArgMsg *args)
938 {
939 #if CMK_MEM_CHECKPOINT
940    _diePE = CkMyPe();
941    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
942    CkMemCheckPT::startTime = CmiWallTimer();
943    CkMemCheckPT::inRestarting = 1;
944    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
945    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
946    cur_restart_phase = 9999;             // big enough to get it processed
947    CmiSetHandler(msg, askProcDataHandlerIdx);
948    int pe = ChkptOnPe();
949    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
950    cur_restart_phase=0;    // allow all message to come in
951 #else
952    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
953 #endif
954 }
955
956 // can be called in other files
957 // return true if it is in restarting
958 int CkInRestarting()
959 {
960 #if CMK_MEM_CHECKPOINT
961   // gzheng
962   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
963   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
964   return CkMemCheckPT::inRestarting;
965 #else
966   return 0;
967 #endif
968 }
969
970 /*****************************************************************************
971                 module initialization
972 *****************************************************************************/
973
974 static int arg_where = CkCheckPoint_inMEM;
975
976 #if CMK_MEM_CHECKPOINT
977 void init_memcheckpt(char **argv)
978 {
979     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
980       arg_where = CkCheckPoint_inDISK;
981     }
982 }
983 #endif
984
985 class CkMemCheckPTInit: public Chare {
986 public:
987   CkMemCheckPTInit(CkArgMsg *m) {
988 #if CMK_MEM_CHECKPOINT
989     if (arg_where == CkCheckPoint_inDISK) {
990       CkPrintf("Charm++> Double-disk Checkpointing. \n");
991     }
992     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
993     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
994 #endif
995   }
996 };
997
998 // initproc
999 void CkRegisterRestartHandler( )
1000 {
1001 #if CMK_MEM_CHECKPOINT
1002   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1003   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1004   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1005   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1006
1007   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1008   CpvAccess(procChkptBuf) = NULL;
1009
1010 #if 1
1011   // for debugging
1012   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1013 //  sleep(4);
1014 #endif
1015 #endif
1016 }
1017
1018 /// @todo: the following definitions should be moved to a separate file containing
1019 // structures and functions about fault tolerance strategies
1020
1021 /**
1022  *  * @brief: function for killing a process                                             
1023  *   */
1024 #ifdef CMK_MEM_CHECKPOINT
1025 #if CMK_HAS_GETPID
1026 void killLocal(void *_dummy,double curWallTime){
1027         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1028         if(CmiWallTimer()<killTime-1){
1029                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1030         }else{  
1031                 kill(getpid(),SIGKILL);                                               
1032         }              
1033
1034 #else
1035 void killLocal(void *_dummy,double curWallTime){
1036   CmiAbort("kill() not supported!");
1037 }
1038 #endif
1039 #endif
1040
1041 #ifdef CMK_MEM_CHECKPOINT
1042 /**
1043  * @brief: reads the file with the kill information
1044  */
1045 void readKillFile(){
1046         FILE *fp=fopen(killFile,"r");
1047         if(!fp){
1048                 return;
1049         }
1050         int proc;
1051         double sec;
1052         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1053                 if(proc == CkMyPe()){
1054                         killTime = CmiWallTimer()+sec;
1055                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1056                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1057                 }
1058         }
1059         fclose(fp);
1060 }
1061 #endif
1062
1063 #include "CkMemCheckpoint.def.h"
1064
1065