disable CmiBarrier in TimerInit when doing in-mem restart from checkpoint. This won...
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 // pick buddy processor from a different physical node
51 #define NODE_CHECKPOINT                        0
52
53 // assume NO extra processors--1
54 // assume extra processors--0
55 #define CK_NO_PROC_POOL                         1
56
57 #define DEBUGF      // CkPrintf
58
59 // static, so that it is accessible from Converse part
60 int CkMemCheckPT::inRestarting = 0;
61 double CkMemCheckPT::startTime;
62 char *CkMemCheckPT::stage;
63 CkCallback CkMemCheckPT::cpCallback;
64
65 int _memChkptOn = 1;                    // checkpoint is on or off
66
67 CkGroupID ckCheckPTGroupID;             // readonly
68
69
70 /// @todo the following declarations should be moved into a separate file for all 
71 // fault tolerant strategies
72
73 #ifdef CMK_MEM_CHECKPOINT
74 // name of the kill file that contains processes to be killed 
75 char *killFile;                                               
76 // flag for the kill file         
77 int killFlag=0;
78 // variable for storing the killing time
79 double killTime=0.0;
80 #endif
81
82 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
83 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
84
85 // compute the backup processor
86 // FIXME: avoid crashed processors
87 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
88
89 inline int CkMemCheckPT::BuddyPE(int pe)
90 {
91   int budpe;
92 #if NODE_CHECKPOINT
93     // buddy is the processor with same rank on the next physical node
94   int r1 = CmiPhysicalRank(pe);
95   int budnode = CmiPhysicalNodeID(pe);
96   do {
97     budnode = (budnode+1)%CmiNumPhysicalNodes();
98     int *pelist;
99     int num;
100     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
101     budpe = pelist[r1 % num];
102   } while (isFailed(budpe));
103   if (budpe == pe) {
104     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
105     CmiAbort("Failed to find a buddy processor");
106   }
107 #else
108   budpe = pe;
109   while (budpe == pe || isFailed(budpe)) 
110           budpe = (budpe+1)%CkNumPes();
111 #endif
112   return budpe;
113 }
114
115 // called in array element constructor
116 // choose and register with 2 buddies for checkpoiting 
117 #if CMK_MEM_CHECKPOINT
118 void ArrayElement::init_checkpt() {
119         if (_memChkptOn == 0) return;
120         if (CkInRestarting()) {
121           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
122         }
123         // only master init checkpoint
124         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
125
126         budPEs[0] = CkMyPe();
127         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
128         CmiAssert(budPEs[0] != budPEs[1]);
129         // inform checkPTMgr
130         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
131         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
132         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
133         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
134 }
135 #endif
136
137 // entry function invoked by checkpoint mgr asking for checkpoint data
138 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
139 #if CMK_MEM_CHECKPOINT
140   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
141   CkLocMgr *locMgr = thisArray->getLocMgr();
142   CmiAssert(myRec!=NULL);
143   int size;
144   {
145         PUP::sizer p;
146         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
147         size = p.size();
148   }
149   int packSize = size/sizeof(double) +1;
150   CkArrayCheckPTMessage *msg =
151                  new (packSize, 0) CkArrayCheckPTMessage;
152   msg->len = size;
153   msg->index =thisIndexMax;
154   msg->aid = thisArrayID;
155   msg->locMgr = locMgr->getGroupID();
156   msg->cp_flag = 1;
157   {
158         PUP::toMem p(msg->packData);
159         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
160   }
161
162   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
163   checkptMgr.recvData(msg, 2, budPEs);
164   delete m;
165 #endif
166 }
167
168 // checkpoint holder class - for memory checkpointing
169 class CkMemCheckPTInfo: public CkCheckPTInfo
170 {
171   CkArrayCheckPTMessage *ckBuffer;
172 public:
173   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
174                     CkCheckPTInfo(a, loc, idx, pno)
175   {
176     ckBuffer = NULL;
177   }
178   ~CkMemCheckPTInfo() 
179   {
180     if (ckBuffer) delete ckBuffer; 
181   }
182   inline void updateBuffer(CkArrayCheckPTMessage *data) 
183   {
184     CmiAssert(data!=NULL);
185     if (ckBuffer) delete ckBuffer;
186     ckBuffer = data;
187   }    
188   inline CkArrayCheckPTMessage * getCopy()
189   {
190     if (ckBuffer == NULL) {
191       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
192       CmiAbort("Abort!");
193     }
194     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
195   }     
196   inline void updateBuddy(int b1, int b2) {
197      CmiAssert(ckBuffer);
198      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
199   }
200   inline int getSize() { 
201      CmiAssert(ckBuffer);
202      return ckBuffer->len; 
203   }
204 };
205
206 // checkpoint holder class - for in-disk checkpointing
207 class CkDiskCheckPTInfo: public CkCheckPTInfo 
208 {
209   char *fname;
210   int bud1, bud2;
211   int len;                      // checkpoint size
212 public:
213   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
214   {
215 #if CMK_USE_MKSTEMP
216     fname = new char[64];
217     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
218     mkstemp(fname);
219 #else
220     fname=tmpnam(NULL);
221 #endif
222     bud1 = bud2 = -1;
223     len = 0;
224   }
225   ~CkDiskCheckPTInfo() 
226   {
227     remove(fname);
228   }
229   inline void updateBuffer(CkArrayCheckPTMessage *data) 
230   {
231     double t = CmiWallTimer();
232     // unpack it
233     envelope *env = UsrToEnv(data);
234     CkUnpackMessage(&env);
235     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
236     FILE *f = fopen(fname,"wb");
237     PUP::toDisk p(f);
238     CkPupMessage(p, (void **)&data);
239     // delay sync to the end because otherwise the messages are blocked
240 //    fsync(fileno(f));
241     fclose(f);
242     bud1 = data->bud1;
243     bud2 = data->bud2;
244     len = data->len;
245     delete data;
246     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
247   }
248   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
249   {
250     CkArrayCheckPTMessage *data;
251     FILE *f = fopen(fname,"rb");
252     PUP::fromDisk p(f);
253     CkPupMessage(p, (void **)&data);
254     fclose(f);
255     data->bud1 = bud1;                          // update the buddies
256     data->bud2 = bud2;
257     return data;
258   }
259   inline void updateBuddy(int b1, int b2) {
260      bud1 = b1; bud2 = b2;
261   }
262   inline int getSize() { 
263      return len; 
264   }
265 };
266
267 CkMemCheckPT::CkMemCheckPT(int w)
268 {
269   int numnodes = 0;
270 #if NODE_CHECKPOINT
271   numnodes = CmiNumPhysicalNodes();
272 #else
273   numnodes = CkNumPes();
274 #endif
275 #if CK_NO_PROC_POOL
276   if (numnodes <= 2)
277 #else
278   if (numnodes  == 1)
279 #endif
280   {
281     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
282     _memChkptOn = 0;
283   }
284   inRestarting = 0;
285   recvCount = peCount = 0;
286   where = w;
287 }
288
289 CkMemCheckPT::~CkMemCheckPT()
290 {
291   int len = ckTable.length();
292   for (int i=0; i<len; i++) {
293     delete ckTable[i];
294   }
295 }
296
297 void CkMemCheckPT::pup(PUP::er& p) 
298
299   CBase_CkMemCheckPT::pup(p); 
300   p|cpStarter;
301   p|thisFailedPe;
302   p|failedPes;
303   p|ckCheckPTGroupID;           // recover global variable
304   p|cpCallback;                 // store callback
305   p|where;                      // where to checkpoint
306   if (p.isUnpacking()) {
307     recvCount = peCount = 0;
308   }
309 }
310
311 // called by checkpoint mgr to restore an array element
312 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
313 {
314 #if CMK_MEM_CHECKPOINT
315   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
316   // m->index.print();
317   PUP::fromMem p(m->packData);
318   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
319   CmiAssert(mgr);
320   mgr->resume(m->index, p);
321
322   // find a list of array elements bound together
323   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
324   CmiAssert(elt);
325   CkLocRec_local *rec = elt->myRec;
326   CkVec<CkMigratable *> list;
327   mgr->migratableList(rec, list);
328   CmiAssert(list.length() > 0);
329   for (int l=0; l<list.length(); l++) {
330     elt = (ArrayElement *)list[l];
331     elt->budPEs[0] = m->bud1;
332     elt->budPEs[1] = m->bud2;
333     //    reset, may not needed now
334     // for now.
335     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
336       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
337       if (c) c->redNo = 0;
338     }
339   }
340 #endif
341 }
342
343 // return 1 if pe is a crashed processor
344 int CkMemCheckPT::isFailed(int pe)
345 {
346   for (int i=0; i<failedPes.length(); i++)
347     if (failedPes[i] == pe) return 1;
348   return 0;
349 }
350
351 // add pe into history list of all failed processors
352 void CkMemCheckPT::failed(int pe)
353 {
354   if (isFailed(pe)) return;
355   failedPes.push_back(pe);
356 }
357
358 int CkMemCheckPT::totalFailed()
359 {
360   return failedPes.length();
361 }
362
363 // create an checkpoint entry for array element of aid with index.
364 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
365 {
366   // error check, no duplicate
367   int idx, len = ckTable.size();
368   for (idx=0; idx<len; idx++) {
369     CkCheckPTInfo *entry = ckTable[idx];
370     if (index == entry->index) {
371       if (loc == entry->locMgr) {
372           // bindTo array elements
373           return;
374       }
375         // for array inheritance, the following check may fail
376         // because ArrayElement constructor of all superclasses are called
377       if (aid == entry->aid) {
378         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
379         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
380       }
381     }
382   }
383   CkCheckPTInfo *newEntry;
384   if (where == CkCheckPoint_inMEM)
385     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
386   else
387     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
388   ckTable.push_back(newEntry);
389   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
390 }
391
392 // loop through my checkpoint table and ask checkpointed array elements
393 // to send me checkpoint data.
394 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
395 {
396   cpCallback = cb;
397   cpStarter = starter;
398   if (CkMyPe() == cpStarter) {
399     startTime = CmiWallTimer();
400     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
401   }
402
403   int len = ckTable.length();
404   for (int i=0; i<len; i++) {
405     CkCheckPTInfo *entry = ckTable[i];
406       // always let the bigger number processor send request
407     if (CkMyPe() < entry->pNo) continue;
408       // call inmem_checkpoint to the array element, ask it to send
409       // back checkpoint data via recvData().
410     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
411     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
412   }
413     // if my table is empty, then I am done
414   if (len == 0) thisProxy[cpStarter].cpFinish();
415
416   // pack and send proc level data
417   sendProcData();
418 }
419
420 // don't handle array elements
421 static inline void _handleProcData(PUP::er &p)
422 {
423     // save readonlys, and callback BTW
424     CkPupROData(p);
425
426     // save mainchares into MainChares.dat
427     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
428         
429 #ifndef CMK_CHARE_USE_PTR
430     // save non-migratable chare
431     CkPupChareData(p);
432 #endif
433
434     // save groups into Groups.dat
435     CkPupGroupData(p);
436
437     // save nodegroups into NodeGroups.dat
438     if(CkMyRank()==0) CkPupNodeGroupData(p);
439 }
440
441 void CkMemCheckPT::sendProcData()
442 {
443   // find out size of buffer
444   int size;
445   {
446     PUP::sizer p;
447     _handleProcData(p);
448     size = p.size();
449   }
450   int packSize = size;
451   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
452   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
453   {
454     PUP::toMem p(msg->packData);
455     _handleProcData(p);
456   }
457   msg->pe = CkMyPe();
458   msg->len = size;
459   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
460   thisProxy[ChkptOnPe()].recvProcData(msg);
461 }
462
463 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
464 {
465   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
466   CpvAccess(procChkptBuf) = msg;
467   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
468   thisProxy[msg->reportPe].cpFinish();
469 }
470
471 // ArrayElement call this function to give us the checkpointed data
472 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
473 {
474   int len = ckTable.length();
475   int idx;
476   for (idx=0; idx<len; idx++) {
477     CkCheckPTInfo *entry = ckTable[idx];
478     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
479   }
480   CkAssert(idx < len);
481   int isChkpting = msg->cp_flag;
482   ckTable[idx]->updateBuffer(msg);
483   if (isChkpting) {
484       // all my array elements have returned their inmem data
485       // inform starter processor that I am done.
486     recvCount ++;
487     if (recvCount == ckTable.length()) {
488       if (where == CkCheckPoint_inMEM) {
489         thisProxy[cpStarter].cpFinish();
490       }
491       else if (where == CkCheckPoint_inDISK) {
492         // another barrier for finalize the writing using fsync
493         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
494         contribute(0,NULL,CkReduction::sum_int,localcb);
495       }
496       else
497         CmiAbort("Unknown checkpoint scheme");
498       recvCount = 0;
499     } 
500   }
501 }
502
503 // only used in disk checkpointing
504 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
505 {
506   delete m;
507 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
508   system("sync");
509 #endif
510   thisProxy[cpStarter].cpFinish();
511 }
512
513 // only is called on cpStarter when checkpoint is done
514 void CkMemCheckPT::cpFinish()
515 {
516   CmiAssert(CkMyPe() == cpStarter);
517   peCount++;
518     // now that all processors have finished, activate callback
519   if (peCount == 2*(CkNumPes())) {
520     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
521     cpCallback.send();
522     peCount = 0;
523     thisProxy.report();
524   }
525 }
526
527 // for debugging, report checkpoint info
528 void CkMemCheckPT::report()
529 {
530   int objsize = 0;
531   int len = ckTable.length();
532   for (int i=0; i<len; i++) {
533     CkCheckPTInfo *entry = ckTable[i];
534     CmiAssert(entry);
535     objsize += entry->getSize();
536   }
537   CmiAssert(CpvAccess(procChkptBuf));
538   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
539 }
540
541 /*****************************************************************************
542                         RESTART Procedure
543 *****************************************************************************/
544
545 // master processor of two buddies
546 inline int CkMemCheckPT::isMaster(int buddype)
547 {
548   int mype = CkMyPe();
549 //CkPrintf("ismaster: %d %d\n", pe, mype);
550   if (CkNumPes() - totalFailed() == 2) {
551     return mype > buddype;
552   }
553   for (int i=1; i<CkNumPes(); i++) {
554     int me = (buddype+i)%CkNumPes();
555     if (isFailed(me)) continue;
556     if (me == mype) return 1;
557     else return 0;
558   }
559   return 0;
560 }
561
562 #ifdef CKLOCMGR_LOOP
563 #undef CKLOCMGR_LOOP
564 #endif
565
566 // loop over all CkLocMgr and do "code"
567 #define  CKLOCMGR_LOOP(code)    {       \
568   int numGroups = CkpvAccess(_groupIDTable)->size();    \
569   for(int i=0;i<numGroups;i++) {        \
570     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
571     if(obj->isLocMgr())  {      \
572       CkLocMgr *mgr = (CkLocMgr*)obj;   \
573       code      \
574     }   \
575   }     \
576  }
577
578 #if 0
579 // helper class to pup all elements that belong to same ckLocMgr
580 class ElementDestoryer : public CkLocIterator {
581 private:
582         CkLocMgr *locMgr;
583 public:
584         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
585         void addLocation(CkLocation &loc) {
586                 CkArrayIndexMax idx=loc.getIndex();
587                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
588                 loc.destroy();
589         }
590 };
591 #endif
592
593 // restore the bitmap vector for LB
594 void CkMemCheckPT::resetLB(int diepe)
595 {
596 #if CMK_LBDB_ON
597   int i;
598   char *bitmap = new char[CkNumPes()];
599   // set processor available bitmap
600   get_avail_vector(bitmap);
601
602   for (i=0; i<failedPes.length(); i++)
603     bitmap[failedPes[i]] = 0; 
604   bitmap[diepe] = 0;
605
606 #if CK_NO_PROC_POOL
607   set_avail_vector(bitmap);
608 #endif
609
610   // if I am the crashed pe, rebuild my failedPEs array
611   if (CkMyPe() == diepe)
612     for (i=0; i<CkNumPes(); i++) 
613       if (bitmap[i]==0) failed(i);
614
615   delete [] bitmap;
616 #endif
617 }
618
619 // in case when failedPe dies, everybody go through its checkpoint table:
620 // destory all array elements
621 // recover lost buddies
622 // reconstruct all array elements from check point data
623 // called on all processors
624 void CkMemCheckPT::restart(int diePe)
625 {
626 #if CMK_MEM_CHECKPOINT
627   double curTime = CmiWallTimer();
628   if (CkMyPe() == diePe)
629     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
630   stage = (char*)"resetLB";
631   startTime = curTime;
632   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
633
634 #if CK_NO_PROC_POOL
635   failed(diePe);        // add into the list of failed pes
636 #endif
637   thisFailedPe = diePe;
638
639   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
640
641   inRestarting = 1;
642                                                                                 
643   // disable load balancer's barrier
644   if (CkMyPe() != diePe) resetLB(diePe);
645
646   CKLOCMGR_LOOP(mgr->startInserting(););
647
648   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
649 #endif
650 }
651
652 // loally remove all array elements
653 void CkMemCheckPT::removeArrayElements()
654 {
655 #if CMK_MEM_CHECKPOINT
656   int len = ckTable.length();
657   double curTime = CmiWallTimer();
658   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
659   stage = (char*)"removeArrayElements";
660   startTime = curTime;
661
662   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
663   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
664
665   // get rid of all buffering and remote recs
666   // including destorying all array elements
667   CKLOCMGR_LOOP(mgr->flushAllRecs(););
668
669 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
670
671   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
672 #endif
673 }
674
675 // flush state in reduction manager
676 void CkMemCheckPT::resetReductionMgr()
677 {
678   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
679   int numGroups = CkpvAccess(_groupIDTable)->size();
680   for(int i=0;i<numGroups;i++) {
681     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
682     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
683     obj->flushStates();
684     obj->ckJustMigrated();
685   }
686   // reset again
687   //CpvAccess(_qd)->flushStates();
688
689   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
690 }
691
692 // recover the lost buddies
693 void CkMemCheckPT::recoverBuddies()
694 {
695   int idx;
696   int len = ckTable.length();
697   // ready to flush reduction manager
698   // cannot be CkMemCheckPT::restart because destory will modify states
699   double curTime = CmiWallTimer();
700   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
701   stage = (char *)"recoverBuddies";
702   startTime = curTime;
703
704   // recover buddies
705   for (idx=0; idx<len; idx++) {
706     CkCheckPTInfo *entry = ckTable[idx];
707     if (entry->pNo == thisFailedPe) {
708 #if CK_NO_PROC_POOL
709       // find a new buddy
710       int budPe = CkMyPe();
711 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
712       while (budPe == CkMyPe() || isFailed(budPe)) 
713           budPe = (budPe+1)%CkNumPes();
714       entry->pNo = budPe;
715 #else
716       int budPe = thisFailedPe;
717 #endif
718       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
719       CkArrayCheckPTMessage *msg = entry->getCopy();
720       msg->cp_flag = 0;            // not checkpointing
721       thisProxy[budPe].recvData(msg);
722     }
723   }
724
725   if (CkMyPe() == 0)
726     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
727 }
728
729 // restore array elements
730 void CkMemCheckPT::recoverArrayElements()
731 {
732   double curTime = CmiWallTimer();
733   int len = ckTable.length();
734   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
735   stage = (char *)"recoverArrayElements";
736   startTime = curTime;
737
738   // recover all array elements
739   int count = 0;
740   for (int idx=0; idx<len; idx++)
741   {
742     CkCheckPTInfo *entry = ckTable[idx];
743 #if CK_NO_PROC_POOL
744     // the bigger one will do 
745 //    if (CkMyPe() < entry->pNo) continue;
746     if (!isMaster(entry->pNo)) continue;
747 #else
748     // smaller one do it, which has the original object
749     if (CkMyPe() == entry->pNo+1 || 
750         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
751 #endif
752 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
753
754     entry->updateBuddy(CkMyPe(), entry->pNo);
755     CkArrayCheckPTMessage *msg = entry->getCopy();
756     // gzheng
757     //checkptMgr[CkMyPe()].inmem_restore(msg);
758     inmem_restore(msg);
759     count ++;
760   }
761   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
762
763   if (CkMyPe() == 0)
764     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
765 }
766
767 // on every processor
768 // turn load balancer back on
769 void CkMemCheckPT::finishUp()
770 {
771   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
772   CKLOCMGR_LOOP(mgr->doneInserting(););
773   
774   inRestarting = 0;
775
776   if (CkMyPe() == 0)
777   {
778        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
779        CkStartQD(cpCallback);
780   } 
781 #if CK_NO_PROC_POOL
782 #if NODE_CHECKPOINT
783   int numnodes = CmiNumPhysicalNodes();
784 #else
785   int numnodes = CkNumPes();
786 #endif
787   if (numnodes-totalFailed() <=2) {
788     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
789     _memChkptOn = 0;
790   }
791 #endif
792 }
793
794 // called only on 0
795 void CkMemCheckPT::quiescence(CkCallback &cb)
796 {
797   static int pe_count = 0;
798   pe_count ++;
799   CmiAssert(CkMyPe() == 0);
800   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
801   if (pe_count == CkNumPes()) {
802     pe_count = 0;
803     cb.send();
804   }
805 }
806
807 // User callable function - to start a checkpoint
808 // callback cb is used to pass control back
809 void CkStartMemCheckpoint(CkCallback &cb)
810 {
811 #if CMK_MEM_CHECKPOINT
812   if (_memChkptOn == 0) {
813     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
814     cb.send();
815     return;
816   }
817   if (CkInRestarting()) {
818       // trying to checkpointing during restart
819     cb.send();
820     return;
821   }
822     // store user callback and user data
823   CkMemCheckPT::cpCallback = cb;
824
825     // broadcast to start check pointing
826   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
827   checkptMgr.doItNow(CkMyPe(), cb);
828 #else
829   // when mem checkpoint is disabled, invike cb immediately
830   cb.send();
831 #endif
832 }
833
834 void CkRestartCheckPoint(int diePe)
835 {
836 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
837   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
838   // broadcast
839   checkptMgr.restart(diePe);
840 }
841
842 static int _diePE = -1;
843
844 // callback function used locally by ccs handler
845 static void CkRestartCheckPointCallback(void *ignore, void *msg)
846 {
847 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
848   CkRestartCheckPoint(_diePE);
849 }
850
851 // Converse function handles
852 static int askProcDataHandlerIdx;
853 static int restartBcastHandlerIdx;
854 static int recoverProcDataHandlerIdx;
855 static int restartBeginHandlerIdx;
856
857 static void restartBeginHandler(char *msg)
858 {
859 #if CMK_MEM_CHECKPOINT
860   static int count = 0;
861   CmiFree(msg);
862   CmiAssert(CkMyPe() == _diePE);
863   count ++;
864   if (count == CkNumPes()) {
865     CkRestartCheckPointCallback(NULL, NULL);
866     count = 0;
867   }
868 #endif
869 }
870
871 static void restartBcastHandler(char *msg)
872 {
873 #if CMK_MEM_CHECKPOINT
874   // advance phase counter
875   CkMemCheckPT::inRestarting = 1;
876   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
877   // gzheng
878   if (CkMyPe() != _diePE) cur_restart_phase ++;
879
880   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
881
882   // reset QD counters
883   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
884
885 /*  gzheng
886   if (CkMyPe()==_diePE)
887       CkRestartCheckPointCallback(NULL, NULL);
888 */
889   CmiFree(msg);
890
891   char restartmsg[CmiMsgHeaderSizeBytes];
892   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
893   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
894 #endif
895 }
896
897 extern void _initDone();
898
899 // called on crashed processor
900 static void recoverProcDataHandler(char *msg)
901 {
902 #if CMK_MEM_CHECKPOINT
903    int i;
904    envelope *env = (envelope *)msg;
905    CkUnpackMessage(&env);
906    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
907    cur_restart_phase = procMsg->cur_restart_phase;
908    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
909    cur_restart_phase ++;
910    CpvAccess(_qd)->flushStates();
911
912    // restore readonly, mainchare, group, nodegroup
913 //   int temp = cur_restart_phase;
914 //   cur_restart_phase = -1;
915    PUP::fromMem p(procMsg->packData);
916    _handleProcData(p);
917
918    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
919    // gzheng
920    CKLOCMGR_LOOP(mgr->startInserting(););
921
922    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
923    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
924    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
925    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
926    CmiFree(msg);
927
928    _initDone();
929 #endif
930 }
931
932 // called on its backup processor
933 // get backup message buffer and sent to crashed processor
934 static void askProcDataHandler(char *msg)
935 {
936 #if CMK_MEM_CHECKPOINT
937     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
938     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
939     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
940     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
941     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
942
943     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
944
945     CkPackMessage(&env);
946     CmiSetHandler(env, recoverProcDataHandlerIdx);
947     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
948     CpvAccess(procChkptBuf) = NULL;
949 #endif
950 }
951
952 void CkMemRestart(const char *dummy, CkArgMsg *args)
953 {
954 #if CMK_MEM_CHECKPOINT
955    _diePE = CkMyPe();
956    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
957    CkMemCheckPT::startTime = CmiWallTimer();
958    CkMemCheckPT::inRestarting = 1;
959    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
960    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
961    cur_restart_phase = 9999;             // big enough to get it processed
962    CmiSetHandler(msg, askProcDataHandlerIdx);
963    int pe = ChkptOnPe();
964    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
965    cur_restart_phase=0;    // allow all message to come in
966 #else
967    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
968 #endif
969 }
970
971 // can be called in other files
972 // return true if it is in restarting
973 int CkInRestarting()
974 {
975 #if CMK_MEM_CHECKPOINT
976   // gzheng
977   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
978   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
979   return CkMemCheckPT::inRestarting;
980 #else
981   return 0;
982 #endif
983 }
984
985 /*****************************************************************************
986                 module initialization
987 *****************************************************************************/
988
989 static int arg_where = CkCheckPoint_inMEM;
990
991 #if CMK_MEM_CHECKPOINT
992 void init_memcheckpt(char **argv)
993 {
994     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
995       arg_where = CkCheckPoint_inDISK;
996     }
997 }
998 #endif
999
1000 class CkMemCheckPTInit: public Chare {
1001 public:
1002   CkMemCheckPTInit(CkArgMsg *m) {
1003 #if CMK_MEM_CHECKPOINT
1004     if (arg_where == CkCheckPoint_inDISK) {
1005       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1006     }
1007     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1008     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1009 #endif
1010   }
1011 };
1012
1013 // initproc
1014 void CkRegisterRestartHandler( )
1015 {
1016 #if CMK_MEM_CHECKPOINT
1017   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1018   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1019   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1020   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1021
1022   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1023   CpvAccess(procChkptBuf) = NULL;
1024
1025 #if 1
1026   // for debugging
1027   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1028 //  sleep(4);
1029 #endif
1030 #endif
1031 }
1032
1033 /// @todo: the following definitions should be moved to a separate file containing
1034 // structures and functions about fault tolerance strategies
1035
1036 /**
1037  *  * @brief: function for killing a process                                             
1038  *   */
1039 #ifdef CMK_MEM_CHECKPOINT
1040 #if CMK_HAS_GETPID
1041 void killLocal(void *_dummy,double curWallTime){
1042         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1043         if(CmiWallTimer()<killTime-1){
1044                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1045         }else{  
1046                 kill(getpid(),SIGKILL);                                               
1047         }              
1048
1049 #else
1050 void killLocal(void *_dummy,double curWallTime){
1051   CmiAbort("kill() not supported!");
1052 }
1053 #endif
1054 #endif
1055
1056 #ifdef CMK_MEM_CHECKPOINT
1057 /**
1058  * @brief: reads the file with the kill information
1059  */
1060 void readKillFile(){
1061         FILE *fp=fopen(killFile,"r");
1062         if(!fp){
1063                 return;
1064         }
1065         int proc;
1066         double sec;
1067         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1068                 if(proc == CkMyPe()){
1069                         killTime = CmiWallTimer()+sec;
1070                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1071                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1072                 }
1073         }
1074         fclose(fp);
1075 }
1076 #endif
1077
1078 #include "CkMemCheckpoint.def.h"
1079
1080