Added some functionality for killing processes as specified in the kill file.
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "charm++.h"
43 #include "ck.h"
44 #include "register.h"
45 #include "conv-ccs.h"
46 #include <signal.h>
47
48 #define DEBUGF     // CkPrintf
49
50 // assume NO extra processors--1
51 // assume extra processors--0
52 #define CK_NO_PROC_POOL                         1
53
54 // static, so that it is accessible from Converse part
55 int CkMemCheckPT::inRestarting = 0;
56 double CkMemCheckPT::startTime;
57 char *CkMemCheckPT::stage;
58 CkCallback CkMemCheckPT::cpCallback;
59
60 int _memChkptOn = 1;                    // checkpoint is on or off
61
62 CkGroupID ckCheckPTGroupID;             // readonly
63
64
65 /// @todo the following declarations should be moved into a separate file for all 
66 // fault tolerant strategies
67
68 // name of the kill file that contains processes to be killed 
69 char *killFile;                                               
70 // flag for the kill file         
71 int killFlag=0;
72 // variable for storing the killing time
73 double killTime=0.0;
74
75 /// checkpoint buffer for processor system data
76 CpvStaticDeclare(CkProcCheckPTMessage*, procChkptBuf);
77
78 // compute the backup processor
79 // FIXME: avoid crashed processors
80 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
81
82 // called in array element constructor
83 // choose and register with 2 buddies for checkpoiting 
84 #if CMK_MEM_CHECKPOINT
85 void ArrayElement::init_checkpt() {
86         if (_memChkptOn == 0) return;
87         // only master init checkpoint
88         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
89
90         budPEs[0] = CkMyPe();
91         budPEs[1] = (CkMyPe()+1)%CkNumPes();
92         CmiAssert(budPEs[0] != budPEs[1]);
93         // inform checkPTMgr
94         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
95         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
96         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
97         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
98 }
99 #endif
100
101 // entry function invoked by checkpoint mgr asking for checkpoint data
102 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
103 #if CMK_MEM_CHECKPOINT
104   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
105   CkLocMgr *locMgr = thisArray->getLocMgr();
106   CmiAssert(myRec!=NULL);
107   int size;
108   {
109         PUP::sizer p;
110         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
111         size = p.size();
112   }
113   int packSize = size/sizeof(double) +1;
114   CkArrayCheckPTMessage *msg =
115                  new (packSize, 0) CkArrayCheckPTMessage;
116   msg->len = size;
117   msg->index =thisIndexMax;
118   msg->aid = thisArrayID;
119   msg->locMgr = locMgr->getGroupID();
120   msg->cp_flag = 1;
121   {
122         PUP::toMem p(msg->packData);
123         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
124   }
125
126   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
127   checkptMgr.recvData(msg, 2, budPEs);
128   delete m;
129 #endif
130 }
131
132 // checkpoint holder class - for memory checkpointing
133 class CkMemCheckPTInfo: public CkCheckPTInfo
134 {
135   CkArrayCheckPTMessage *ckBuffer;
136 public:
137   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
138                     CkCheckPTInfo(a, loc, idx, pno)
139   {
140     ckBuffer = NULL;
141   }
142   ~CkMemCheckPTInfo() 
143   {
144     if (ckBuffer) delete ckBuffer; 
145   }
146   inline void updateBuffer(CkArrayCheckPTMessage *data) 
147   {
148     if (ckBuffer) delete ckBuffer;
149     ckBuffer = data;
150   }    
151   inline CkArrayCheckPTMessage * getCopy()
152   {
153     if (ckBuffer == NULL) {
154       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
155       CmiAbort("Abort!");
156     }
157     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
158   }     
159   inline void updateBuddy(int b1, int b2) {
160      CmiAssert(ckBuffer);
161      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
162   }
163   inline int getSize() { 
164      CmiAssert(ckBuffer);
165      return ckBuffer->len; 
166   }
167 };
168
169 // checkpoint holder class - for in-disk checkpointing
170 class CkDiskCheckPTInfo: public CkCheckPTInfo 
171 {
172   char *fname;
173   int bud1, bud2;
174   int len;                      // checkpoint size
175 public:
176   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
177   {
178 #if CMK_USE_MKSTEMP
179     fname = new char[64];
180     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
181     mkstemp(fname);
182 #else
183     fname=tmpnam(NULL);
184 #endif
185     bud1 = bud2 = -1;
186     len = 0;
187   }
188   ~CkDiskCheckPTInfo() 
189   {
190     remove(fname);
191   }
192   inline void updateBuffer(CkArrayCheckPTMessage *data) 
193   {
194     double t = CmiWallTimer();
195     // unpack it
196     envelope *env = UsrToEnv(data);
197     CkUnpackMessage(&env);
198     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
199     FILE *f = fopen(fname,"wb");
200     PUP::toDisk p(f);
201     CkPupMessage(p, (void **)&data);
202     // delay sync to the end because otherwise the messages are blocked
203 //    fsync(fileno(f));
204     fclose(f);
205     bud1 = data->bud1;
206     bud2 = data->bud2;
207     len = data->len;
208     delete data;
209     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
210   }
211   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
212   {
213     CkArrayCheckPTMessage *data;
214     FILE *f = fopen(fname,"rb");
215     PUP::fromDisk p(f);
216     CkPupMessage(p, (void **)&data);
217     fclose(f);
218     data->bud1 = bud1;                          // update the buddies
219     data->bud2 = bud2;
220     return data;
221   }
222   inline void updateBuddy(int b1, int b2) {
223      bud1 = b1; bud2 = b2;
224   }
225   inline int getSize() { 
226      return len; 
227   }
228 };
229
230 CkMemCheckPT::CkMemCheckPT(int w)
231 {
232 #if CK_NO_PROC_POOL
233   if (CkNumPes() <= 2) {
234 #else
235   if (CkNumPes()  == 1) {
236 #endif
237     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT disabled!\n");
238     _memChkptOn = 0;
239   }
240   inRestarting = 0;
241   recvCount = peCount = 0;
242   where = w;
243 }
244
245 CkMemCheckPT::~CkMemCheckPT()
246 {
247   int len = ckTable.length();
248   for (int i=0; i<len; i++) {
249     delete ckTable[i];
250   }
251 }
252
253 void CkMemCheckPT::pup(PUP::er& p) 
254
255   CBase_CkMemCheckPT::pup(p); 
256   p|cpStarter;
257   p|thisFailedPe;
258   p|failedPes;
259   p|ckCheckPTGroupID;           // recover global variable
260   p|cpCallback;                 // store callback
261   p|where;                      // where to checkpoint
262   if (p.isUnpacking()) {
263     recvCount = peCount = 0;
264   }
265 }
266
267 // called by checkpoint mgr to restore an array element
268 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
269 {
270 #if CMK_MEM_CHECKPOINT
271   DEBUGF("[%d] inmem_restore restore: mgr: %d ", CmiMyPe(), m->locMgr);  
272   // m->index.print();
273   PUP::fromMem p(m->packData);
274   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
275   CmiAssert(mgr);
276   mgr->resume(m->index, p);
277
278   // find a list of array elements bound together
279   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
280   CmiAssert(elt);
281   CkLocRec_local *rec = elt->myRec;
282   CkVec<CkMigratable *> list;
283   mgr->migratableList(rec, list);
284   CmiAssert(list.length() > 0);
285   for (int l=0; l<list.length(); l++) {
286     elt = (ArrayElement *)list[l];
287     elt->budPEs[0] = m->bud1;
288     elt->budPEs[1] = m->bud2;
289     //    reset, may not needed now
290     // for now.
291     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
292       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
293       if (c) c->redNo = 0;
294     }
295   }
296 #endif
297 }
298
299 // return 1 if pe is a crashed processor
300 int CkMemCheckPT::isFailed(int pe)
301 {
302   for (int i=0; i<failedPes.length(); i++)
303     if (failedPes[i] == pe) return 1;
304   return 0;
305 }
306
307 // add pe into history list of all failed processors
308 void CkMemCheckPT::failed(int pe)
309 {
310   if (isFailed(pe)) return;
311   failedPes.push_back(pe);
312 }
313
314 int CkMemCheckPT::totalFailed()
315 {
316   return failedPes.length();
317 }
318
319 // create an checkpoint entry for array element of aid with index.
320 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
321 {
322   // error check, no duplicate
323   int idx, len = ckTable.size();
324   for (idx=0; idx<len; idx++) {
325     CkCheckPTInfo *entry = ckTable[idx];
326     if (index == entry->index) {
327       if (loc == entry->locMgr) {
328           // bindTo array elements
329           return;
330       }
331         // for array inheritance, the following check may fail
332         // because ArrayElement constructor of all superclasses are called
333       if (aid == entry->aid) {
334         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
335         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
336       }
337     }
338   }
339   CkCheckPTInfo *newEntry;
340   if (where == CkCheckPoint_inMEM)
341     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
342   else
343     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
344   ckTable.push_back(newEntry);
345   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
346 }
347
348 // loop through my checkpoint table and ask checkpointed array elements
349 // to send me checkpoint data.
350 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
351 {
352   cpCallback = cb;
353   cpStarter = starter;
354   if (CkMyPe() == cpStarter) {
355     startTime = CmiWallTimer();
356     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
357   }
358
359   int len = ckTable.length();
360   for (int i=0; i<len; i++) {
361     CkCheckPTInfo *entry = ckTable[i];
362       // always let the bigger number processor send request
363     if (CkMyPe() < entry->pNo) continue;
364       // call inmem_checkpoint to the array element, ask it to send
365       // back checkpoint data via recvData().
366     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
367     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
368   }
369     // if my table is empty, then I am done
370   if (len == 0) thisProxy[cpStarter].cpFinish();
371
372   // pack and send proc level data
373   sendProcData();
374 }
375
376 // don't handle array elements
377 static inline void _handleProcData(PUP::er &p)
378 {
379     // save readonlys, and callback BTW
380     CkPupROData(p);
381
382     // save mainchares into MainChares.dat
383     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
384         
385     // save groups into Groups.dat
386     CkPupGroupData(p);
387
388     // save nodegroups into NodeGroups.dat
389     if(CkMyRank()==0) CkPupNodeGroupData(p);
390 }
391
392 void CkMemCheckPT::sendProcData()
393 {
394   // find out size of buffer
395   int size;
396   {
397     PUP::sizer p;
398     _handleProcData(p);
399     size = p.size();
400   }
401   int packSize = size;
402   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
403   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
404   {
405     PUP::toMem p(msg->packData);
406     _handleProcData(p);
407   }
408   msg->pe = CkMyPe();
409   msg->len = size;
410   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
411   thisProxy[ChkptOnPe()].recvProcData(msg);
412 }
413
414 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
415 {
416   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
417   CpvAccess(procChkptBuf) = msg;
418 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
419   thisProxy[msg->reportPe].cpFinish();
420 }
421
422 // ArrayElement call this function to give us the checkpointed data
423 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
424 {
425   int len = ckTable.length();
426   int idx;
427   for (idx=0; idx<len; idx++) {
428     CkCheckPTInfo *entry = ckTable[idx];
429     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
430   }
431   CkAssert(idx < len);
432   int isChkpting = msg->cp_flag;
433   ckTable[idx]->updateBuffer(msg);
434   if (isChkpting) {
435       // all my array elements have returned their inmem data
436       // inform starter processor that I am done.
437     recvCount ++;
438     if (recvCount == ckTable.length()) {
439       if (where == CkCheckPoint_inMEM) {
440         thisProxy[cpStarter].cpFinish();
441       }
442       else if (where == CkCheckPoint_inDISK) {
443         // another barrier for finalize the writing using fsync
444         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
445         contribute(0,NULL,CkReduction::sum_int,localcb);
446       }
447       else
448         CmiAbort("Unknown checkpoint scheme");
449       recvCount = 0;
450     } 
451   }
452 }
453
454 // only used in disk checkpointing
455 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
456 {
457   delete m;
458 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
459   system("sync");
460 #endif
461   thisProxy[cpStarter].cpFinish();
462 }
463
464 // only is called on cpStarter when checkpoint is done
465 void CkMemCheckPT::cpFinish()
466 {
467   CmiAssert(CkMyPe() == cpStarter);
468   peCount++;
469     // now that all processors have finished, activate callback
470   if (peCount == 2*(CkNumPes())) {
471     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
472     cpCallback.send();
473     peCount = 0;
474     thisProxy.report();
475   }
476 }
477
478 // for debugging, report checkpoint info
479 void CkMemCheckPT::report()
480 {
481   int objsize = 0;
482   int len = ckTable.length();
483   for (int i=0; i<len; i++) {
484     CkCheckPTInfo *entry = ckTable[i];
485     CmiAssert(entry);
486     objsize += entry->getSize();
487   }
488   CmiAssert(CpvAccess(procChkptBuf));
489   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
490 }
491
492 /*****************************************************************************
493                         RESTART Procedure
494 *****************************************************************************/
495
496 // master processor of two buddies
497 inline int CkMemCheckPT::isMaster(int buddype)
498 {
499   int mype = CkMyPe();
500 //CkPrintf("ismaster: %d %d\n", pe, mype);
501   if (CkNumPes() - totalFailed() == 2) {
502     return mype > buddype;
503   }
504   for (int i=1; i<CkNumPes(); i++) {
505     int me = (buddype+i)%CkNumPes();
506     if (isFailed(me)) continue;
507     if (me == mype) return 1;
508     else return 0;
509   }
510   return 0;
511 }
512
513 #ifdef CKLOCMGR_LOOP
514 #undef CKLOCMGR_LOOP
515 #endif
516
517 // loop over all CkLocMgr and do "code"
518 #define  CKLOCMGR_LOOP(code)    {       \
519   int numGroups = CkpvAccess(_groupIDTable)->size();    \
520   for(int i=0;i<numGroups;i++) {        \
521     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
522     if(obj->isLocMgr())  {      \
523       CkLocMgr *mgr = (CkLocMgr*)obj;   \
524       code      \
525     }   \
526   }     \
527  }
528
529 #if 0
530 // helper class to pup all elements that belong to same ckLocMgr
531 class ElementDestoryer : public CkLocIterator {
532 private:
533         CkLocMgr *locMgr;
534 public:
535         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
536         void addLocation(CkLocation &loc) {
537                 CkArrayIndexMax idx=loc.getIndex();
538                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
539                 loc.destroy();
540         }
541 };
542 #endif
543
544 // restore the bitmap vector for LB
545 void CkMemCheckPT::resetLB(int diepe)
546 {
547 #if CMK_LBDB_ON
548   int i;
549   char *bitmap = new char[CkNumPes()];
550   // set processor available bitmap
551   get_avail_vector(bitmap);
552
553   for (i=0; i<failedPes.length(); i++)
554     bitmap[failedPes[i]] = 0; 
555   bitmap[diepe] = 0;
556
557 #if CK_NO_PROC_POOL
558   set_avail_vector(bitmap);
559 #endif
560
561   // if I am the crashed pe, rebuild my failedPEs array
562   if (CkMyPe() == diepe)
563     for (i=0; i<CkNumPes(); i++) 
564       if (bitmap[i]==0) failed(i);
565
566   delete [] bitmap;
567 #endif
568 }
569
570 // in case when failedPe dies, everybody go through its checkpoint table:
571 // destory all array elements
572 // recover lost buddies
573 // reconstruct all array elements from check point data
574 // called on all processors
575 void CkMemCheckPT::restart(int diePe)
576 {
577 #if CMK_MEM_CHECKPOINT
578   double curTime = CmiWallTimer();
579   if (CkMyPe() == diePe)
580     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
581   stage = "resetLB";
582   startTime = curTime;
583   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
584
585 #if CK_NO_PROC_POOL
586   failed(diePe);        // add into the list of failed pes
587 #endif
588   thisFailedPe = diePe;
589
590   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
591
592   inRestarting = 1;
593                                                                                 
594   // disable load balancer's barrier
595   if (CkMyPe() != diePe) resetLB(diePe);
596
597   CKLOCMGR_LOOP(mgr->startInserting(););
598
599   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
600 #endif
601 }
602
603 // loally remove all array elements
604 void CkMemCheckPT::removeArrayElements()
605 {
606 #if CMK_MEM_CHECKPOINT
607   int len = ckTable.length();
608   double curTime = CmiWallTimer();
609   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
610   stage = "removeArrayElements";
611   startTime = curTime;
612
613   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
614   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
615
616   // get rid of all buffering and remote recs
617   // including destorying all array elements
618   CKLOCMGR_LOOP(mgr->flushAllRecs(););
619
620 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
621
622   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
623 #endif
624 }
625
626 // flush state in reduction manager
627 void CkMemCheckPT::resetReductionMgr()
628 {
629   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
630   int numGroups = CkpvAccess(_groupIDTable)->size();
631   for(int i=0;i<numGroups;i++) {
632     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
633     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
634     obj->flushStates();
635     obj->ckJustMigrated();
636   }
637   // reset again
638   //CpvAccess(_qd)->flushStates();
639
640   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
641 }
642
643 // recover the lost buddies
644 void CkMemCheckPT::recoverBuddies()
645 {
646   int idx;
647   int len = ckTable.length();
648   // ready to flush reduction manager
649   // cannot be CkMemCheckPT::restart because destory will modify states
650   double curTime = CmiWallTimer();
651   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
652   stage = (char *)"recoverBuddies";
653   startTime = curTime;
654
655   // recover buddies
656   for (idx=0; idx<len; idx++) {
657     CkCheckPTInfo *entry = ckTable[idx];
658     if (entry->pNo == thisFailedPe) {
659 #if CK_NO_PROC_POOL
660       // find a new buddy
661       int budPe = CkMyPe();
662 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
663       while (budPe == CkMyPe() || isFailed(budPe)) 
664           budPe = (budPe+1)%CkNumPes();
665       entry->pNo = budPe;
666 #else
667       int budPe = thisFailedPe;
668 #endif
669       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
670       CkArrayCheckPTMessage *msg = entry->getCopy();
671       msg->cp_flag = 0;            // not checkpointing
672       thisProxy[budPe].recvData(msg);
673     }
674   }
675
676   if (CkMyPe() == 0)
677     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
678 }
679
680 // restore array elements
681 void CkMemCheckPT::recoverArrayElements()
682 {
683   double curTime = CmiWallTimer();
684   int len = ckTable.length();
685   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
686   stage = (char *)"recoverArrayElements";
687   startTime = curTime;
688
689   // recover all array elements
690   int count = 0;
691   for (int idx=0; idx<len; idx++)
692   {
693     CkCheckPTInfo *entry = ckTable[idx];
694 #if CK_NO_PROC_POOL
695     // the bigger one will do 
696 //    if (CkMyPe() < entry->pNo) continue;
697     if (!isMaster(entry->pNo)) continue;
698 #else
699     // smaller one do it, which has the original object
700     if (CkMyPe() == entry->pNo+1 || 
701         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
702 #endif
703 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
704
705     entry->updateBuddy(CkMyPe(), entry->pNo);
706     CkArrayCheckPTMessage *msg = entry->getCopy();
707     // gzheng
708     //checkptMgr[CkMyPe()].inmem_restore(msg);
709     inmem_restore(msg);
710     count ++;
711   }
712 //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
713
714   if (CkMyPe() == 0)
715     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
716 }
717
718 // on every processor
719 // turn load balancer back on
720 void CkMemCheckPT::finishUp()
721 {
722   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
723   CKLOCMGR_LOOP(mgr->doneInserting(););
724   
725   inRestarting = 0;
726
727   if (CkMyPe() == 0)
728   {
729        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
730        CkStartQD(cpCallback);
731   } 
732 #if CK_NO_PROC_POOL
733   if (CkNumPes()-totalFailed() <=2) {
734     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
735     _memChkptOn = 0;
736   }
737 #endif
738 }
739
740 // called only on 0
741 void CkMemCheckPT::quiescence(CkCallback &cb)
742 {
743   static int pe_count = 0;
744   pe_count ++;
745   CmiAssert(CkMyPe() == 0);
746   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
747   if (pe_count == CkNumPes()) {
748     pe_count = 0;
749     cb.send();
750   }
751 }
752
753 // User callable function - to start a checkpoint
754 // callback cb is used to pass control back
755 void CkStartMemCheckpoint(CkCallback &cb)
756 {
757 #if CMK_MEM_CHECKPOINT
758   if (_memChkptOn == 0) {
759     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
760     cb.send();
761     return;
762   }
763   if (CkInRestarting()) {
764       // trying to checkpointing during restart
765     cb.send();
766     return;
767   }
768     // store user callback and user data
769   CkMemCheckPT::cpCallback = cb;
770
771     // broadcast to start check pointing
772   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
773   checkptMgr.doItNow(CkMyPe(), cb);
774 #else
775   // when mem checkpoint is disabled, invike cb immediately
776   cb.send();
777 #endif
778 }
779
780 void CkRestartCheckPoint(int diePe)
781 {
782 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
783   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
784   // broadcast
785   checkptMgr.restart(diePe);
786 }
787
788 static int _diePE = -1;
789
790 // callback function used locally by ccs handler
791 static void CkRestartCheckPointCallback(void *ignore, void *msg)
792 {
793 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
794   CkRestartCheckPoint(_diePE);
795 }
796
797 // Converse function handles
798 static int askProcDataHandlerIdx;
799 static int restartBcastHandlerIdx;
800 static int recoverProcDataHandlerIdx;
801 static int restartBeginHandlerIdx;
802
803 static void restartBeginHandler(char *msg)
804 {
805 #if CMK_MEM_CHECKPOINT
806   static int count = 0;
807   CmiFree(msg);
808   CmiAssert(CkMyPe() == _diePE);
809   count ++;
810   if (count == CkNumPes()) {
811     CkRestartCheckPointCallback(NULL, NULL);
812     count = 0;
813   }
814 #endif
815 }
816
817 static void restartBcastHandler(char *msg)
818 {
819 #if CMK_MEM_CHECKPOINT
820   // advance phase counter
821   CkMemCheckPT::inRestarting = 1;
822   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
823   // gzheng
824   if (CkMyPe() != _diePE) cur_restart_phase ++;
825
826   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
827
828   // reset QD counters
829   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
830
831 /*  gzheng
832   if (CkMyPe()==_diePE)
833       CkRestartCheckPointCallback(NULL, NULL);
834 */
835   CmiFree(msg);
836
837   char restartmsg[CmiMsgHeaderSizeBytes];
838   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
839   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
840 #endif
841 }
842
843 extern void _initDone();
844
845 // called on crashed processor
846 static void recoverProcDataHandler(char *msg)
847 {
848 #if CMK_MEM_CHECKPOINT
849    int i;
850    envelope *env = (envelope *)msg;
851    CkUnpackMessage(&env);
852    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
853    cur_restart_phase = procMsg->cur_restart_phase;
854    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
855    cur_restart_phase ++;
856    CpvAccess(_qd)->flushStates();
857
858    // restore readonly, mainchare, group, nodegroup
859 //   int temp = cur_restart_phase;
860 //   cur_restart_phase = -1;
861    PUP::fromMem p(procMsg->packData);
862    _handleProcData(p);
863
864    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
865    // gzheng
866    CKLOCMGR_LOOP(mgr->startInserting(););
867
868    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
869    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
870    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
871    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
872    CmiFree(msg);
873
874    _initDone();
875 #endif
876 }
877
878 // called on its backup processor
879 // get backup message buffer and sent to crashed processor
880 static void askProcDataHandler(char *msg)
881 {
882 #if CMK_MEM_CHECKPOINT
883     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
884     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
885     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
886     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
887     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
888
889     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
890
891     CkPackMessage(&env);
892     CmiSetHandler(env, recoverProcDataHandlerIdx);
893     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
894     CpvAccess(procChkptBuf) = NULL;
895 #endif
896 }
897
898 void CkMemRestart(const char *dummy, CkArgMsg *args)
899 {
900 #if CMK_MEM_CHECKPOINT
901    _diePE = CkMyPe();
902    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
903    CkMemCheckPT::startTime = CmiWallTimer();
904    CkMemCheckPT::inRestarting = 1;
905    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
906    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
907    cur_restart_phase = 9999;             // big enough to get it processed
908    CmiSetHandler(msg, askProcDataHandlerIdx);
909    int pe = ChkptOnPe();
910    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
911    cur_restart_phase=0;    // allow all message to come in
912 #else
913    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
914 #endif
915 }
916
917 // can be called in other files
918 // return true if it is in restarting
919 int CkInRestarting()
920 {
921 #if CMK_MEM_CHECKPOINT
922   // gzheng
923   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
924   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
925   return CkMemCheckPT::inRestarting;
926 #else
927   return 0;
928 #endif
929 }
930
931 /*****************************************************************************
932                 module initialization
933 *****************************************************************************/
934
935 static int arg_where = CkCheckPoint_inMEM;
936
937 #if CMK_MEM_CHECKPOINT
938 void init_memcheckpt(char **argv)
939 {
940     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
941       arg_where = CkCheckPoint_inDISK;
942     }
943 }
944 #endif
945
946 class CkMemCheckPTInit: public Chare {
947 public:
948   CkMemCheckPTInit(CkArgMsg *m) {
949 #if CMK_MEM_CHECKPOINT
950     if (arg_where == CkCheckPoint_inDISK) {
951       CkPrintf("Charm++> Double-disk Checkpointing. \n");
952     }
953     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
954     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
955 #endif
956   }
957 };
958
959 // initproc
960 void CkRegisterRestartHandler( )
961 {
962 #if CMK_MEM_CHECKPOINT
963   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
964   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
965   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
966   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
967
968   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
969   CpvAccess(procChkptBuf) = NULL;
970
971 #if 1
972   // for debugging
973   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
974 //  sleep(4);
975 #endif
976 #endif
977 }
978
979 /// @todo: the following definitions should be moved to a separate file containing
980 // structures and functions about fault tolerance strategies
981
982 /**
983  *  * @brief: function for killing a process                                             
984  *   */
985 void killLocal(void *_dummy,double curWallTime){
986         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
987         if(CmiWallTimer()<killTime-1){
988                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
989         }else{  
990                 kill(getpid(),SIGKILL);                                               
991         }                                                                             
992
993
994
995 /**
996  * @brief: reads the file with the kill information
997  */
998 void readKillFile(){
999         FILE *fp=fopen(killFile,"r");
1000         if(!fp){
1001                 return;
1002         }
1003         int proc;
1004         double sec;
1005         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1006                 if(proc == CkMyPe()){
1007                         killTime = CmiWallTimer()+sec;
1008                         printf("[%d] To be killed after %.6lf s \n",CkMyPe(),sec);
1009                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1010                 }
1011         }
1012         fclose(fp);
1013 }
1014
1015
1016 #include "CkMemCheckpoint.def.h"
1017
1018