when picking buddy processors, always find a processor on next *physical* processors...
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 // pick buddy processor from a different physical node
51 #define NODE_CHECKPOINT                        1
52
53 // assume NO extra processors--1
54 // assume extra processors--0
55 #define CK_NO_PROC_POOL                         1
56
57 #define DEBUGF      // CkPrintf
58
59 // static, so that it is accessible from Converse part
60 int CkMemCheckPT::inRestarting = 0;
61 double CkMemCheckPT::startTime;
62 char *CkMemCheckPT::stage;
63 CkCallback CkMemCheckPT::cpCallback;
64
65 int _memChkptOn = 1;                    // checkpoint is on or off
66
67 CkGroupID ckCheckPTGroupID;             // readonly
68
69
70 /// @todo the following declarations should be moved into a separate file for all 
71 // fault tolerant strategies
72
73 #ifdef CMK_MEM_CHECKPOINT
74 // name of the kill file that contains processes to be killed 
75 char *killFile;                                               
76 // flag for the kill file         
77 int killFlag=0;
78 // variable for storing the killing time
79 double killTime=0.0;
80 #endif
81
82 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
83 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
84
85 // compute the backup processor
86 // FIXME: avoid crashed processors
87 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
88
89 inline int CkMemCheckPT::BuddyPE(int pe)
90 {
91   int budpe;
92 #if NODE_CHECKPOINT
93     // buddy is the processor with same rank on next physical node
94   int r1 = CmiPhysicalRank(pe);
95   int budnode = CmiPhysicalNodeID(pe);
96 printf("CmiPhysicalNodeID %d rank: %d\n", CmiPhysicalNodeID(pe), CmiPhysicalRank(pe));
97   do {
98     budnode = (budnode+1)%CmiNumPhysicalNodes();
99     budpe = (CmiGetFirstPeOnPhysicalNode(budnode) + r1) % 
100                                    CmiNumPesOnPhysicalNode(budnode);
101 printf("CmiGetFirstPeOnPhysicalNode: %d CmiNumPesOnPhysicalNode: %d\n", CmiGetFirstPeOnPhysicalNode(budnode), CmiNumPesOnPhysicalNode(budnode));
102   } while (isFailed(budpe));
103   if (budpe == pe) {
104     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
105     CmiAbort("Failed to a buddy processor");
106   }
107 #else
108   budpe = pe;
109   while (budpe == pe || isFailed(budPe)) 
110           budPe = (budPe+1)%CkNumPes();
111 #endif
112   return budpe;
113 }
114
115 // called in array element constructor
116 // choose and register with 2 buddies for checkpoiting 
117 #if CMK_MEM_CHECKPOINT
118 void ArrayElement::init_checkpt() {
119         if (_memChkptOn == 0) return;
120         // only master init checkpoint
121         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
122
123         budPEs[0] = CkMyPe();
124         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
125         CmiAssert(budPEs[0] != budPEs[1]);
126         // inform checkPTMgr
127         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
128         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
129         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
130         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
131 }
132 #endif
133
134 // entry function invoked by checkpoint mgr asking for checkpoint data
135 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
136 #if CMK_MEM_CHECKPOINT
137   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
138   CkLocMgr *locMgr = thisArray->getLocMgr();
139   CmiAssert(myRec!=NULL);
140   int size;
141   {
142         PUP::sizer p;
143         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
144         size = p.size();
145   }
146   int packSize = size/sizeof(double) +1;
147   CkArrayCheckPTMessage *msg =
148                  new (packSize, 0) CkArrayCheckPTMessage;
149   msg->len = size;
150   msg->index =thisIndexMax;
151   msg->aid = thisArrayID;
152   msg->locMgr = locMgr->getGroupID();
153   msg->cp_flag = 1;
154   {
155         PUP::toMem p(msg->packData);
156         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
157   }
158
159   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
160   checkptMgr.recvData(msg, 2, budPEs);
161   delete m;
162 #endif
163 }
164
165 // checkpoint holder class - for memory checkpointing
166 class CkMemCheckPTInfo: public CkCheckPTInfo
167 {
168   CkArrayCheckPTMessage *ckBuffer;
169 public:
170   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
171                     CkCheckPTInfo(a, loc, idx, pno)
172   {
173     ckBuffer = NULL;
174   }
175   ~CkMemCheckPTInfo() 
176   {
177     if (ckBuffer) delete ckBuffer; 
178   }
179   inline void updateBuffer(CkArrayCheckPTMessage *data) 
180   {
181     if (ckBuffer) delete ckBuffer;
182     ckBuffer = data;
183   }    
184   inline CkArrayCheckPTMessage * getCopy()
185   {
186     if (ckBuffer == NULL) {
187       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
188       CmiAbort("Abort!");
189     }
190     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
191   }     
192   inline void updateBuddy(int b1, int b2) {
193      CmiAssert(ckBuffer);
194      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
195   }
196   inline int getSize() { 
197      CmiAssert(ckBuffer);
198      return ckBuffer->len; 
199   }
200 };
201
202 // checkpoint holder class - for in-disk checkpointing
203 class CkDiskCheckPTInfo: public CkCheckPTInfo 
204 {
205   char *fname;
206   int bud1, bud2;
207   int len;                      // checkpoint size
208 public:
209   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
210   {
211 #if CMK_USE_MKSTEMP
212     fname = new char[64];
213     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
214     mkstemp(fname);
215 #else
216     fname=tmpnam(NULL);
217 #endif
218     bud1 = bud2 = -1;
219     len = 0;
220   }
221   ~CkDiskCheckPTInfo() 
222   {
223     remove(fname);
224   }
225   inline void updateBuffer(CkArrayCheckPTMessage *data) 
226   {
227     double t = CmiWallTimer();
228     // unpack it
229     envelope *env = UsrToEnv(data);
230     CkUnpackMessage(&env);
231     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
232     FILE *f = fopen(fname,"wb");
233     PUP::toDisk p(f);
234     CkPupMessage(p, (void **)&data);
235     // delay sync to the end because otherwise the messages are blocked
236 //    fsync(fileno(f));
237     fclose(f);
238     bud1 = data->bud1;
239     bud2 = data->bud2;
240     len = data->len;
241     delete data;
242     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
243   }
244   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
245   {
246     CkArrayCheckPTMessage *data;
247     FILE *f = fopen(fname,"rb");
248     PUP::fromDisk p(f);
249     CkPupMessage(p, (void **)&data);
250     fclose(f);
251     data->bud1 = bud1;                          // update the buddies
252     data->bud2 = bud2;
253     return data;
254   }
255   inline void updateBuddy(int b1, int b2) {
256      bud1 = b1; bud2 = b2;
257   }
258   inline int getSize() { 
259      return len; 
260   }
261 };
262
263 CkMemCheckPT::CkMemCheckPT(int w)
264 {
265 #if CK_NO_PROC_POOL
266   if (CkNumPes() <= 2) {
267 #else
268   if (CkNumPes()  == 1) {
269 #endif
270     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
271     _memChkptOn = 0;
272   }
273   inRestarting = 0;
274   recvCount = peCount = 0;
275   where = w;
276 }
277
278 CkMemCheckPT::~CkMemCheckPT()
279 {
280   int len = ckTable.length();
281   for (int i=0; i<len; i++) {
282     delete ckTable[i];
283   }
284 }
285
286 void CkMemCheckPT::pup(PUP::er& p) 
287
288   CBase_CkMemCheckPT::pup(p); 
289   p|cpStarter;
290   p|thisFailedPe;
291   p|failedPes;
292   p|ckCheckPTGroupID;           // recover global variable
293   p|cpCallback;                 // store callback
294   p|where;                      // where to checkpoint
295   if (p.isUnpacking()) {
296     recvCount = peCount = 0;
297   }
298 }
299
300 // called by checkpoint mgr to restore an array element
301 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
302 {
303 #if CMK_MEM_CHECKPOINT
304   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
305   // m->index.print();
306   PUP::fromMem p(m->packData);
307   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
308   CmiAssert(mgr);
309   mgr->resume(m->index, p);
310
311   // find a list of array elements bound together
312   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
313   CmiAssert(elt);
314   CkLocRec_local *rec = elt->myRec;
315   CkVec<CkMigratable *> list;
316   mgr->migratableList(rec, list);
317   CmiAssert(list.length() > 0);
318   for (int l=0; l<list.length(); l++) {
319     elt = (ArrayElement *)list[l];
320     elt->budPEs[0] = m->bud1;
321     elt->budPEs[1] = m->bud2;
322     //    reset, may not needed now
323     // for now.
324     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
325       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
326       if (c) c->redNo = 0;
327     }
328   }
329 #endif
330 }
331
332 // return 1 if pe is a crashed processor
333 int CkMemCheckPT::isFailed(int pe)
334 {
335   for (int i=0; i<failedPes.length(); i++)
336     if (failedPes[i] == pe) return 1;
337   return 0;
338 }
339
340 // add pe into history list of all failed processors
341 void CkMemCheckPT::failed(int pe)
342 {
343   if (isFailed(pe)) return;
344   failedPes.push_back(pe);
345 }
346
347 int CkMemCheckPT::totalFailed()
348 {
349   return failedPes.length();
350 }
351
352 // create an checkpoint entry for array element of aid with index.
353 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
354 {
355   // error check, no duplicate
356   int idx, len = ckTable.size();
357   for (idx=0; idx<len; idx++) {
358     CkCheckPTInfo *entry = ckTable[idx];
359     if (index == entry->index) {
360       if (loc == entry->locMgr) {
361           // bindTo array elements
362           return;
363       }
364         // for array inheritance, the following check may fail
365         // because ArrayElement constructor of all superclasses are called
366       if (aid == entry->aid) {
367         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
368         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
369       }
370     }
371   }
372   CkCheckPTInfo *newEntry;
373   if (where == CkCheckPoint_inMEM)
374     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
375   else
376     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
377   ckTable.push_back(newEntry);
378   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
379 }
380
381 // loop through my checkpoint table and ask checkpointed array elements
382 // to send me checkpoint data.
383 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
384 {
385   cpCallback = cb;
386   cpStarter = starter;
387   if (CkMyPe() == cpStarter) {
388     startTime = CmiWallTimer();
389     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
390   }
391
392   int len = ckTable.length();
393   for (int i=0; i<len; i++) {
394     CkCheckPTInfo *entry = ckTable[i];
395       // always let the bigger number processor send request
396     if (CkMyPe() < entry->pNo) continue;
397       // call inmem_checkpoint to the array element, ask it to send
398       // back checkpoint data via recvData().
399     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
400     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
401   }
402     // if my table is empty, then I am done
403   if (len == 0) thisProxy[cpStarter].cpFinish();
404
405   // pack and send proc level data
406   sendProcData();
407 }
408
409 // don't handle array elements
410 static inline void _handleProcData(PUP::er &p)
411 {
412     // save readonlys, and callback BTW
413     CkPupROData(p);
414
415     // save mainchares into MainChares.dat
416     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
417         
418 #if CMK_FT_CHARE
419     // save non-migratable chare
420     CkPupChareData(p);
421 #endif
422
423     // save groups into Groups.dat
424     CkPupGroupData(p);
425
426     // save nodegroups into NodeGroups.dat
427     if(CkMyRank()==0) CkPupNodeGroupData(p);
428 }
429
430 void CkMemCheckPT::sendProcData()
431 {
432   // find out size of buffer
433   int size;
434   {
435     PUP::sizer p;
436     _handleProcData(p);
437     size = p.size();
438   }
439   int packSize = size;
440   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
441   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
442   {
443     PUP::toMem p(msg->packData);
444     _handleProcData(p);
445   }
446   msg->pe = CkMyPe();
447   msg->len = size;
448   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
449   thisProxy[ChkptOnPe()].recvProcData(msg);
450 }
451
452 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
453 {
454   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
455   CpvAccess(procChkptBuf) = msg;
456 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
457   thisProxy[msg->reportPe].cpFinish();
458 }
459
460 // ArrayElement call this function to give us the checkpointed data
461 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
462 {
463   int len = ckTable.length();
464   int idx;
465   for (idx=0; idx<len; idx++) {
466     CkCheckPTInfo *entry = ckTable[idx];
467     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
468   }
469   CkAssert(idx < len);
470   int isChkpting = msg->cp_flag;
471   ckTable[idx]->updateBuffer(msg);
472   if (isChkpting) {
473       // all my array elements have returned their inmem data
474       // inform starter processor that I am done.
475     recvCount ++;
476     if (recvCount == ckTable.length()) {
477       if (where == CkCheckPoint_inMEM) {
478         thisProxy[cpStarter].cpFinish();
479       }
480       else if (where == CkCheckPoint_inDISK) {
481         // another barrier for finalize the writing using fsync
482         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
483         contribute(0,NULL,CkReduction::sum_int,localcb);
484       }
485       else
486         CmiAbort("Unknown checkpoint scheme");
487       recvCount = 0;
488     } 
489   }
490 }
491
492 // only used in disk checkpointing
493 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
494 {
495   delete m;
496 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
497   system("sync");
498 #endif
499   thisProxy[cpStarter].cpFinish();
500 }
501
502 // only is called on cpStarter when checkpoint is done
503 void CkMemCheckPT::cpFinish()
504 {
505   CmiAssert(CkMyPe() == cpStarter);
506   peCount++;
507     // now that all processors have finished, activate callback
508   if (peCount == 2*(CkNumPes())) {
509     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
510     cpCallback.send();
511     peCount = 0;
512     thisProxy.report();
513   }
514 }
515
516 // for debugging, report checkpoint info
517 void CkMemCheckPT::report()
518 {
519   int objsize = 0;
520   int len = ckTable.length();
521   for (int i=0; i<len; i++) {
522     CkCheckPTInfo *entry = ckTable[i];
523     CmiAssert(entry);
524     objsize += entry->getSize();
525   }
526   CmiAssert(CpvAccess(procChkptBuf));
527   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
528 }
529
530 /*****************************************************************************
531                         RESTART Procedure
532 *****************************************************************************/
533
534 // master processor of two buddies
535 inline int CkMemCheckPT::isMaster(int buddype)
536 {
537   int mype = CkMyPe();
538 //CkPrintf("ismaster: %d %d\n", pe, mype);
539   if (CkNumPes() - totalFailed() == 2) {
540     return mype > buddype;
541   }
542   for (int i=1; i<CkNumPes(); i++) {
543     int me = (buddype+i)%CkNumPes();
544     if (isFailed(me)) continue;
545     if (me == mype) return 1;
546     else return 0;
547   }
548   return 0;
549 }
550
551 #ifdef CKLOCMGR_LOOP
552 #undef CKLOCMGR_LOOP
553 #endif
554
555 // loop over all CkLocMgr and do "code"
556 #define  CKLOCMGR_LOOP(code)    {       \
557   int numGroups = CkpvAccess(_groupIDTable)->size();    \
558   for(int i=0;i<numGroups;i++) {        \
559     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
560     if(obj->isLocMgr())  {      \
561       CkLocMgr *mgr = (CkLocMgr*)obj;   \
562       code      \
563     }   \
564   }     \
565  }
566
567 #if 0
568 // helper class to pup all elements that belong to same ckLocMgr
569 class ElementDestoryer : public CkLocIterator {
570 private:
571         CkLocMgr *locMgr;
572 public:
573         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
574         void addLocation(CkLocation &loc) {
575                 CkArrayIndexMax idx=loc.getIndex();
576                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
577                 loc.destroy();
578         }
579 };
580 #endif
581
582 // restore the bitmap vector for LB
583 void CkMemCheckPT::resetLB(int diepe)
584 {
585 #if CMK_LBDB_ON
586   int i;
587   char *bitmap = new char[CkNumPes()];
588   // set processor available bitmap
589   get_avail_vector(bitmap);
590
591   for (i=0; i<failedPes.length(); i++)
592     bitmap[failedPes[i]] = 0; 
593   bitmap[diepe] = 0;
594
595 #if CK_NO_PROC_POOL
596   set_avail_vector(bitmap);
597 #endif
598
599   // if I am the crashed pe, rebuild my failedPEs array
600   if (CkMyPe() == diepe)
601     for (i=0; i<CkNumPes(); i++) 
602       if (bitmap[i]==0) failed(i);
603
604   delete [] bitmap;
605 #endif
606 }
607
608 // in case when failedPe dies, everybody go through its checkpoint table:
609 // destory all array elements
610 // recover lost buddies
611 // reconstruct all array elements from check point data
612 // called on all processors
613 void CkMemCheckPT::restart(int diePe)
614 {
615 #if CMK_MEM_CHECKPOINT
616   double curTime = CmiWallTimer();
617   if (CkMyPe() == diePe)
618     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
619   stage = (char*)"resetLB";
620   startTime = curTime;
621   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
622
623 #if CK_NO_PROC_POOL
624   failed(diePe);        // add into the list of failed pes
625 #endif
626   thisFailedPe = diePe;
627
628   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
629
630   inRestarting = 1;
631                                                                                 
632   // disable load balancer's barrier
633   if (CkMyPe() != diePe) resetLB(diePe);
634
635   CKLOCMGR_LOOP(mgr->startInserting(););
636
637   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
638 #endif
639 }
640
641 // loally remove all array elements
642 void CkMemCheckPT::removeArrayElements()
643 {
644 #if CMK_MEM_CHECKPOINT
645   int len = ckTable.length();
646   double curTime = CmiWallTimer();
647   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
648   stage = (char*)"removeArrayElements";
649   startTime = curTime;
650
651   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
652   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
653
654   // get rid of all buffering and remote recs
655   // including destorying all array elements
656   CKLOCMGR_LOOP(mgr->flushAllRecs(););
657
658 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
659
660   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
661 #endif
662 }
663
664 // flush state in reduction manager
665 void CkMemCheckPT::resetReductionMgr()
666 {
667   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
668   int numGroups = CkpvAccess(_groupIDTable)->size();
669   for(int i=0;i<numGroups;i++) {
670     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
671     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
672     obj->flushStates();
673     obj->ckJustMigrated();
674   }
675   // reset again
676   //CpvAccess(_qd)->flushStates();
677
678   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
679 }
680
681 // recover the lost buddies
682 void CkMemCheckPT::recoverBuddies()
683 {
684   int idx;
685   int len = ckTable.length();
686   // ready to flush reduction manager
687   // cannot be CkMemCheckPT::restart because destory will modify states
688   double curTime = CmiWallTimer();
689   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
690   stage = (char *)"recoverBuddies";
691   startTime = curTime;
692
693   // recover buddies
694   for (idx=0; idx<len; idx++) {
695     CkCheckPTInfo *entry = ckTable[idx];
696     if (entry->pNo == thisFailedPe) {
697 #if CK_NO_PROC_POOL
698       // find a new buddy
699       int budPe = CkMyPe();
700 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
701       while (budPe == CkMyPe() || isFailed(budPe)) 
702           budPe = (budPe+1)%CkNumPes();
703       entry->pNo = budPe;
704 #else
705       int budPe = thisFailedPe;
706 #endif
707       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
708       CkArrayCheckPTMessage *msg = entry->getCopy();
709       msg->cp_flag = 0;            // not checkpointing
710       thisProxy[budPe].recvData(msg);
711     }
712   }
713
714   if (CkMyPe() == 0)
715     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
716 }
717
718 // restore array elements
719 void CkMemCheckPT::recoverArrayElements()
720 {
721   double curTime = CmiWallTimer();
722   int len = ckTable.length();
723   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
724   stage = (char *)"recoverArrayElements";
725   startTime = curTime;
726
727   // recover all array elements
728   int count = 0;
729   for (int idx=0; idx<len; idx++)
730   {
731     CkCheckPTInfo *entry = ckTable[idx];
732 #if CK_NO_PROC_POOL
733     // the bigger one will do 
734 //    if (CkMyPe() < entry->pNo) continue;
735     if (!isMaster(entry->pNo)) continue;
736 #else
737     // smaller one do it, which has the original object
738     if (CkMyPe() == entry->pNo+1 || 
739         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
740 #endif
741 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
742
743     entry->updateBuddy(CkMyPe(), entry->pNo);
744     CkArrayCheckPTMessage *msg = entry->getCopy();
745     // gzheng
746     //checkptMgr[CkMyPe()].inmem_restore(msg);
747     inmem_restore(msg);
748     count ++;
749   }
750 //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
751
752   if (CkMyPe() == 0)
753     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
754 }
755
756 // on every processor
757 // turn load balancer back on
758 void CkMemCheckPT::finishUp()
759 {
760   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
761   CKLOCMGR_LOOP(mgr->doneInserting(););
762   
763   inRestarting = 0;
764
765   if (CkMyPe() == 0)
766   {
767        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
768        CkStartQD(cpCallback);
769   } 
770 #if CK_NO_PROC_POOL
771   if (CkNumPes()-totalFailed() <=2) {
772     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
773     _memChkptOn = 0;
774   }
775 #endif
776 }
777
778 // called only on 0
779 void CkMemCheckPT::quiescence(CkCallback &cb)
780 {
781   static int pe_count = 0;
782   pe_count ++;
783   CmiAssert(CkMyPe() == 0);
784   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
785   if (pe_count == CkNumPes()) {
786     pe_count = 0;
787     cb.send();
788   }
789 }
790
791 // User callable function - to start a checkpoint
792 // callback cb is used to pass control back
793 void CkStartMemCheckpoint(CkCallback &cb)
794 {
795 #if CMK_MEM_CHECKPOINT
796   if (_memChkptOn == 0) {
797     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
798     cb.send();
799     return;
800   }
801   if (CkInRestarting()) {
802       // trying to checkpointing during restart
803     cb.send();
804     return;
805   }
806     // store user callback and user data
807   CkMemCheckPT::cpCallback = cb;
808
809     // broadcast to start check pointing
810   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
811   checkptMgr.doItNow(CkMyPe(), cb);
812 #else
813   // when mem checkpoint is disabled, invike cb immediately
814   cb.send();
815 #endif
816 }
817
818 void CkRestartCheckPoint(int diePe)
819 {
820 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
821   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
822   // broadcast
823   checkptMgr.restart(diePe);
824 }
825
826 static int _diePE = -1;
827
828 // callback function used locally by ccs handler
829 static void CkRestartCheckPointCallback(void *ignore, void *msg)
830 {
831 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
832   CkRestartCheckPoint(_diePE);
833 }
834
835 // Converse function handles
836 static int askProcDataHandlerIdx;
837 static int restartBcastHandlerIdx;
838 static int recoverProcDataHandlerIdx;
839 static int restartBeginHandlerIdx;
840
841 static void restartBeginHandler(char *msg)
842 {
843 #if CMK_MEM_CHECKPOINT
844   static int count = 0;
845   CmiFree(msg);
846   CmiAssert(CkMyPe() == _diePE);
847   count ++;
848   if (count == CkNumPes()) {
849     CkRestartCheckPointCallback(NULL, NULL);
850     count = 0;
851   }
852 #endif
853 }
854
855 static void restartBcastHandler(char *msg)
856 {
857 #if CMK_MEM_CHECKPOINT
858   // advance phase counter
859   CkMemCheckPT::inRestarting = 1;
860   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
861   // gzheng
862   if (CkMyPe() != _diePE) cur_restart_phase ++;
863
864   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
865
866   // reset QD counters
867   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
868
869 /*  gzheng
870   if (CkMyPe()==_diePE)
871       CkRestartCheckPointCallback(NULL, NULL);
872 */
873   CmiFree(msg);
874
875   char restartmsg[CmiMsgHeaderSizeBytes];
876   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
877   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
878 #endif
879 }
880
881 extern void _initDone();
882
883 // called on crashed processor
884 static void recoverProcDataHandler(char *msg)
885 {
886 #if CMK_MEM_CHECKPOINT
887    int i;
888    envelope *env = (envelope *)msg;
889    CkUnpackMessage(&env);
890    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
891    cur_restart_phase = procMsg->cur_restart_phase;
892    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
893    cur_restart_phase ++;
894    CpvAccess(_qd)->flushStates();
895
896    // restore readonly, mainchare, group, nodegroup
897 //   int temp = cur_restart_phase;
898 //   cur_restart_phase = -1;
899    PUP::fromMem p(procMsg->packData);
900    _handleProcData(p);
901
902    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
903    // gzheng
904    CKLOCMGR_LOOP(mgr->startInserting(););
905
906    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
907    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
908    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
909    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
910    CmiFree(msg);
911
912    _initDone();
913 #endif
914 }
915
916 // called on its backup processor
917 // get backup message buffer and sent to crashed processor
918 static void askProcDataHandler(char *msg)
919 {
920 #if CMK_MEM_CHECKPOINT
921     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
922     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
923     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
924     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
925     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
926
927     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
928
929     CkPackMessage(&env);
930     CmiSetHandler(env, recoverProcDataHandlerIdx);
931     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
932     CpvAccess(procChkptBuf) = NULL;
933 #endif
934 }
935
936 void CkMemRestart(const char *dummy, CkArgMsg *args)
937 {
938 #if CMK_MEM_CHECKPOINT
939    _diePE = CkMyPe();
940    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
941    CkMemCheckPT::startTime = CmiWallTimer();
942    CkMemCheckPT::inRestarting = 1;
943    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
944    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
945    cur_restart_phase = 9999;             // big enough to get it processed
946    CmiSetHandler(msg, askProcDataHandlerIdx);
947    int pe = ChkptOnPe();
948    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
949    cur_restart_phase=0;    // allow all message to come in
950 #else
951    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
952 #endif
953 }
954
955 // can be called in other files
956 // return true if it is in restarting
957 int CkInRestarting()
958 {
959 #if CMK_MEM_CHECKPOINT
960   // gzheng
961   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
962   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
963   return CkMemCheckPT::inRestarting;
964 #else
965   return 0;
966 #endif
967 }
968
969 /*****************************************************************************
970                 module initialization
971 *****************************************************************************/
972
973 static int arg_where = CkCheckPoint_inMEM;
974
975 #if CMK_MEM_CHECKPOINT
976 void init_memcheckpt(char **argv)
977 {
978     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
979       arg_where = CkCheckPoint_inDISK;
980     }
981 }
982 #endif
983
984 class CkMemCheckPTInit: public Chare {
985 public:
986   CkMemCheckPTInit(CkArgMsg *m) {
987 #if CMK_MEM_CHECKPOINT
988     if (arg_where == CkCheckPoint_inDISK) {
989       CkPrintf("Charm++> Double-disk Checkpointing. \n");
990     }
991     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
992     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
993 #endif
994   }
995 };
996
997 // initproc
998 void CkRegisterRestartHandler( )
999 {
1000 #if CMK_MEM_CHECKPOINT
1001   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1002   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1003   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1004   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1005
1006   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1007   CpvAccess(procChkptBuf) = NULL;
1008
1009 #if 1
1010   // for debugging
1011   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1012 //  sleep(4);
1013 #endif
1014 #endif
1015 }
1016
1017 /// @todo: the following definitions should be moved to a separate file containing
1018 // structures and functions about fault tolerance strategies
1019
1020 /**
1021  *  * @brief: function for killing a process                                             
1022  *   */
1023 #ifdef CMK_MEM_CHECKPOINT
1024 #if CMK_HAS_GETPID
1025 void killLocal(void *_dummy,double curWallTime){
1026         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1027         if(CmiWallTimer()<killTime-1){
1028                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1029         }else{  
1030                 kill(getpid(),SIGKILL);                                               
1031         }              
1032
1033 #else
1034 void killLocal(void *_dummy,double curWallTime){
1035   CmiAbort("kill() not supported!");
1036 }
1037 #endif
1038 #endif
1039
1040 #ifdef CMK_MEM_CHECKPOINT
1041 /**
1042  * @brief: reads the file with the kill information
1043  */
1044 void readKillFile(){
1045         FILE *fp=fopen(killFile,"r");
1046         if(!fp){
1047                 return;
1048         }
1049         int proc;
1050         double sec;
1051         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1052                 if(proc == CkMyPe()){
1053                         killTime = CmiWallTimer()+sec;
1054                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1055                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1056                 }
1057         }
1058         fclose(fp);
1059 }
1060 #endif
1061
1062 #include "CkMemCheckpoint.def.h"
1063
1064