Merge branch 'charm' into development
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 // pick buddy processor from a different physical node
51 #define NODE_CHECKPOINT                        1
52
53 // assume NO extra processors--1
54 // assume extra processors--0
55 #define CK_NO_PROC_POOL                         1
56
57 #define DEBUGF      // CkPrintf
58
59 // static, so that it is accessible from Converse part
60 int CkMemCheckPT::inRestarting = 0;
61 double CkMemCheckPT::startTime;
62 char *CkMemCheckPT::stage;
63 CkCallback CkMemCheckPT::cpCallback;
64
65 int _memChkptOn = 1;                    // checkpoint is on or off
66
67 CkGroupID ckCheckPTGroupID;             // readonly
68
69
70 /// @todo the following declarations should be moved into a separate file for all 
71 // fault tolerant strategies
72
73 #ifdef CMK_MEM_CHECKPOINT
74 // name of the kill file that contains processes to be killed 
75 char *killFile;                                               
76 // flag for the kill file         
77 int killFlag=0;
78 // variable for storing the killing time
79 double killTime=0.0;
80 #endif
81
82 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
83 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
84
85 // compute the backup processor
86 // FIXME: avoid crashed processors
87 inline int ChkptOnPe() { return (CkMyPe()+1)%CkNumPes(); }
88
89 inline int CkMemCheckPT::BuddyPE(int pe)
90 {
91   int budpe;
92 #if NODE_CHECKPOINT
93     // buddy is the processor with same rank on the next physical node
94   int r1 = CmiPhysicalRank(pe);
95   int budnode = CmiPhysicalNodeID(pe);
96   do {
97     budnode = (budnode+1)%CmiNumPhysicalNodes();
98     int *pelist;
99     int num;
100     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
101     budpe = pelist[r1 % num];
102   } while (isFailed(budpe));
103   if (budpe == pe) {
104     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
105     CmiAbort("Failed to find a buddy processor");
106   }
107 #else
108   budpe = pe;
109   while (budpe == pe || isFailed(budPe)) 
110           budPe = (budPe+1)%CkNumPes();
111 #endif
112   return budpe;
113 }
114
115 // called in array element constructor
116 // choose and register with 2 buddies for checkpoiting 
117 #if CMK_MEM_CHECKPOINT
118 void ArrayElement::init_checkpt() {
119         if (_memChkptOn == 0) return;
120         // only master init checkpoint
121         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
122
123         budPEs[0] = CkMyPe();
124         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
125         CmiAssert(budPEs[0] != budPEs[1]);
126         // inform checkPTMgr
127         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
128         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
129         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
130         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
131 }
132 #endif
133
134 // entry function invoked by checkpoint mgr asking for checkpoint data
135 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
136 #if CMK_MEM_CHECKPOINT
137   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
138   CkLocMgr *locMgr = thisArray->getLocMgr();
139   CmiAssert(myRec!=NULL);
140   int size;
141   {
142         PUP::sizer p;
143         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
144         size = p.size();
145   }
146   int packSize = size/sizeof(double) +1;
147   CkArrayCheckPTMessage *msg =
148                  new (packSize, 0) CkArrayCheckPTMessage;
149   msg->len = size;
150   msg->index =thisIndexMax;
151   msg->aid = thisArrayID;
152   msg->locMgr = locMgr->getGroupID();
153   msg->cp_flag = 1;
154   {
155         PUP::toMem p(msg->packData);
156         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
157   }
158
159   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
160   checkptMgr.recvData(msg, 2, budPEs);
161   delete m;
162 #endif
163 }
164
165 // checkpoint holder class - for memory checkpointing
166 class CkMemCheckPTInfo: public CkCheckPTInfo
167 {
168   CkArrayCheckPTMessage *ckBuffer;
169 public:
170   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno): 
171                     CkCheckPTInfo(a, loc, idx, pno)
172   {
173     ckBuffer = NULL;
174   }
175   ~CkMemCheckPTInfo() 
176   {
177     if (ckBuffer) delete ckBuffer; 
178   }
179   inline void updateBuffer(CkArrayCheckPTMessage *data) 
180   {
181     CmiAssert(data!=NULL);
182     if (ckBuffer) delete ckBuffer;
183     ckBuffer = data;
184   }    
185   inline CkArrayCheckPTMessage * getCopy()
186   {
187     if (ckBuffer == NULL) {
188       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
189       CmiAbort("Abort!");
190     }
191     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
192   }     
193   inline void updateBuddy(int b1, int b2) {
194      CmiAssert(ckBuffer);
195      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
196   }
197   inline int getSize() { 
198      CmiAssert(ckBuffer);
199      return ckBuffer->len; 
200   }
201 };
202
203 // checkpoint holder class - for in-disk checkpointing
204 class CkDiskCheckPTInfo: public CkCheckPTInfo 
205 {
206   char *fname;
207   int bud1, bud2;
208   int len;                      // checkpoint size
209 public:
210   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndexMax idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
211   {
212 #if CMK_USE_MKSTEMP
213     fname = new char[64];
214     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
215     mkstemp(fname);
216 #else
217     fname=tmpnam(NULL);
218 #endif
219     bud1 = bud2 = -1;
220     len = 0;
221   }
222   ~CkDiskCheckPTInfo() 
223   {
224     remove(fname);
225   }
226   inline void updateBuffer(CkArrayCheckPTMessage *data) 
227   {
228     double t = CmiWallTimer();
229     // unpack it
230     envelope *env = UsrToEnv(data);
231     CkUnpackMessage(&env);
232     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
233     FILE *f = fopen(fname,"wb");
234     PUP::toDisk p(f);
235     CkPupMessage(p, (void **)&data);
236     // delay sync to the end because otherwise the messages are blocked
237 //    fsync(fileno(f));
238     fclose(f);
239     bud1 = data->bud1;
240     bud2 = data->bud2;
241     len = data->len;
242     delete data;
243     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
244   }
245   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
246   {
247     CkArrayCheckPTMessage *data;
248     FILE *f = fopen(fname,"rb");
249     PUP::fromDisk p(f);
250     CkPupMessage(p, (void **)&data);
251     fclose(f);
252     data->bud1 = bud1;                          // update the buddies
253     data->bud2 = bud2;
254     return data;
255   }
256   inline void updateBuddy(int b1, int b2) {
257      bud1 = b1; bud2 = b2;
258   }
259   inline int getSize() { 
260      return len; 
261   }
262 };
263
264 CkMemCheckPT::CkMemCheckPT(int w)
265 {
266   int numnodes = 0;
267 #if NODE_CHECKPOINT
268   numnodes = CmiNumPhysicalNodes();
269 #else
270   numnodes = CkNumPes();
271 #endif
272 #if CK_NO_PROC_POOL
273   if (numnodes <= 2)
274 #else
275   if (numnodes  == 1)
276 #endif
277   {
278     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
279     _memChkptOn = 0;
280   }
281   inRestarting = 0;
282   recvCount = peCount = 0;
283   where = w;
284 }
285
286 CkMemCheckPT::~CkMemCheckPT()
287 {
288   int len = ckTable.length();
289   for (int i=0; i<len; i++) {
290     delete ckTable[i];
291   }
292 }
293
294 void CkMemCheckPT::pup(PUP::er& p) 
295
296   CBase_CkMemCheckPT::pup(p); 
297   p|cpStarter;
298   p|thisFailedPe;
299   p|failedPes;
300   p|ckCheckPTGroupID;           // recover global variable
301   p|cpCallback;                 // store callback
302   p|where;                      // where to checkpoint
303   if (p.isUnpacking()) {
304     recvCount = peCount = 0;
305   }
306 }
307
308 // called by checkpoint mgr to restore an array element
309 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
310 {
311 #if CMK_MEM_CHECKPOINT
312   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
313   // m->index.print();
314   PUP::fromMem p(m->packData);
315   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
316   CmiAssert(mgr);
317   mgr->resume(m->index, p);
318
319   // find a list of array elements bound together
320   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
321   CmiAssert(elt);
322   CkLocRec_local *rec = elt->myRec;
323   CkVec<CkMigratable *> list;
324   mgr->migratableList(rec, list);
325   CmiAssert(list.length() > 0);
326   for (int l=0; l<list.length(); l++) {
327     elt = (ArrayElement *)list[l];
328     elt->budPEs[0] = m->bud1;
329     elt->budPEs[1] = m->bud2;
330     //    reset, may not needed now
331     // for now.
332     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
333       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
334       if (c) c->redNo = 0;
335     }
336   }
337 #endif
338 }
339
340 // return 1 if pe is a crashed processor
341 int CkMemCheckPT::isFailed(int pe)
342 {
343   for (int i=0; i<failedPes.length(); i++)
344     if (failedPes[i] == pe) return 1;
345   return 0;
346 }
347
348 // add pe into history list of all failed processors
349 void CkMemCheckPT::failed(int pe)
350 {
351   if (isFailed(pe)) return;
352   failedPes.push_back(pe);
353 }
354
355 int CkMemCheckPT::totalFailed()
356 {
357   return failedPes.length();
358 }
359
360 // create an checkpoint entry for array element of aid with index.
361 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndexMax index, int buddy)
362 {
363   // error check, no duplicate
364   int idx, len = ckTable.size();
365   for (idx=0; idx<len; idx++) {
366     CkCheckPTInfo *entry = ckTable[idx];
367     if (index == entry->index) {
368       if (loc == entry->locMgr) {
369           // bindTo array elements
370           return;
371       }
372         // for array inheritance, the following check may fail
373         // because ArrayElement constructor of all superclasses are called
374       if (aid == entry->aid) {
375         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
376         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
377       }
378     }
379   }
380   CkCheckPTInfo *newEntry;
381   if (where == CkCheckPoint_inMEM)
382     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
383   else
384     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
385   ckTable.push_back(newEntry);
386   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
387 }
388
389 // loop through my checkpoint table and ask checkpointed array elements
390 // to send me checkpoint data.
391 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
392 {
393   cpCallback = cb;
394   cpStarter = starter;
395   if (CkMyPe() == cpStarter) {
396     startTime = CmiWallTimer();
397     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
398   }
399
400   int len = ckTable.length();
401   for (int i=0; i<len; i++) {
402     CkCheckPTInfo *entry = ckTable[i];
403       // always let the bigger number processor send request
404     if (CkMyPe() < entry->pNo) continue;
405       // call inmem_checkpoint to the array element, ask it to send
406       // back checkpoint data via recvData().
407     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
408     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
409   }
410     // if my table is empty, then I am done
411   if (len == 0) thisProxy[cpStarter].cpFinish();
412
413   // pack and send proc level data
414   sendProcData();
415 }
416
417 // don't handle array elements
418 static inline void _handleProcData(PUP::er &p)
419 {
420     // save readonlys, and callback BTW
421     CkPupROData(p);
422
423     // save mainchares into MainChares.dat
424     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
425         
426 #ifndef CMK_CHARE_USE_PTR
427     // save non-migratable chare
428     CkPupChareData(p);
429 #endif
430
431     // save groups into Groups.dat
432     CkPupGroupData(p);
433
434     // save nodegroups into NodeGroups.dat
435     if(CkMyRank()==0) CkPupNodeGroupData(p);
436 }
437
438 void CkMemCheckPT::sendProcData()
439 {
440   // find out size of buffer
441   int size;
442   {
443     PUP::sizer p;
444     _handleProcData(p);
445     size = p.size();
446   }
447   int packSize = size;
448   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
449   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
450   {
451     PUP::toMem p(msg->packData);
452     _handleProcData(p);
453   }
454   msg->pe = CkMyPe();
455   msg->len = size;
456   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
457   thisProxy[ChkptOnPe()].recvProcData(msg);
458 }
459
460 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
461 {
462   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
463   CpvAccess(procChkptBuf) = msg;
464 //CmiPrintf("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
465   thisProxy[msg->reportPe].cpFinish();
466 }
467
468 // ArrayElement call this function to give us the checkpointed data
469 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
470 {
471   int len = ckTable.length();
472   int idx;
473   for (idx=0; idx<len; idx++) {
474     CkCheckPTInfo *entry = ckTable[idx];
475     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
476   }
477   CkAssert(idx < len);
478   int isChkpting = msg->cp_flag;
479   ckTable[idx]->updateBuffer(msg);
480   if (isChkpting) {
481       // all my array elements have returned their inmem data
482       // inform starter processor that I am done.
483     recvCount ++;
484     if (recvCount == ckTable.length()) {
485       if (where == CkCheckPoint_inMEM) {
486         thisProxy[cpStarter].cpFinish();
487       }
488       else if (where == CkCheckPoint_inDISK) {
489         // another barrier for finalize the writing using fsync
490         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
491         contribute(0,NULL,CkReduction::sum_int,localcb);
492       }
493       else
494         CmiAbort("Unknown checkpoint scheme");
495       recvCount = 0;
496     } 
497   }
498 }
499
500 // only used in disk checkpointing
501 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
502 {
503   delete m;
504 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
505   system("sync");
506 #endif
507   thisProxy[cpStarter].cpFinish();
508 }
509
510 // only is called on cpStarter when checkpoint is done
511 void CkMemCheckPT::cpFinish()
512 {
513   CmiAssert(CkMyPe() == cpStarter);
514   peCount++;
515     // now that all processors have finished, activate callback
516   if (peCount == 2*(CkNumPes())) {
517     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
518     cpCallback.send();
519     peCount = 0;
520     thisProxy.report();
521   }
522 }
523
524 // for debugging, report checkpoint info
525 void CkMemCheckPT::report()
526 {
527   int objsize = 0;
528   int len = ckTable.length();
529   for (int i=0; i<len; i++) {
530     CkCheckPTInfo *entry = ckTable[i];
531     CmiAssert(entry);
532     objsize += entry->getSize();
533   }
534   CmiAssert(CpvAccess(procChkptBuf));
535   CkPrintf("[%d] Checkpointed Object size: %d len: %d Processor data: %d\n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
536 }
537
538 /*****************************************************************************
539                         RESTART Procedure
540 *****************************************************************************/
541
542 // master processor of two buddies
543 inline int CkMemCheckPT::isMaster(int buddype)
544 {
545   int mype = CkMyPe();
546 //CkPrintf("ismaster: %d %d\n", pe, mype);
547   if (CkNumPes() - totalFailed() == 2) {
548     return mype > buddype;
549   }
550   for (int i=1; i<CkNumPes(); i++) {
551     int me = (buddype+i)%CkNumPes();
552     if (isFailed(me)) continue;
553     if (me == mype) return 1;
554     else return 0;
555   }
556   return 0;
557 }
558
559 #ifdef CKLOCMGR_LOOP
560 #undef CKLOCMGR_LOOP
561 #endif
562
563 // loop over all CkLocMgr and do "code"
564 #define  CKLOCMGR_LOOP(code)    {       \
565   int numGroups = CkpvAccess(_groupIDTable)->size();    \
566   for(int i=0;i<numGroups;i++) {        \
567     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
568     if(obj->isLocMgr())  {      \
569       CkLocMgr *mgr = (CkLocMgr*)obj;   \
570       code      \
571     }   \
572   }     \
573  }
574
575 #if 0
576 // helper class to pup all elements that belong to same ckLocMgr
577 class ElementDestoryer : public CkLocIterator {
578 private:
579         CkLocMgr *locMgr;
580 public:
581         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
582         void addLocation(CkLocation &loc) {
583                 CkArrayIndexMax idx=loc.getIndex();
584                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
585                 loc.destroy();
586         }
587 };
588 #endif
589
590 // restore the bitmap vector for LB
591 void CkMemCheckPT::resetLB(int diepe)
592 {
593 #if CMK_LBDB_ON
594   int i;
595   char *bitmap = new char[CkNumPes()];
596   // set processor available bitmap
597   get_avail_vector(bitmap);
598
599   for (i=0; i<failedPes.length(); i++)
600     bitmap[failedPes[i]] = 0; 
601   bitmap[diepe] = 0;
602
603 #if CK_NO_PROC_POOL
604   set_avail_vector(bitmap);
605 #endif
606
607   // if I am the crashed pe, rebuild my failedPEs array
608   if (CkMyPe() == diepe)
609     for (i=0; i<CkNumPes(); i++) 
610       if (bitmap[i]==0) failed(i);
611
612   delete [] bitmap;
613 #endif
614 }
615
616 // in case when failedPe dies, everybody go through its checkpoint table:
617 // destory all array elements
618 // recover lost buddies
619 // reconstruct all array elements from check point data
620 // called on all processors
621 void CkMemCheckPT::restart(int diePe)
622 {
623 #if CMK_MEM_CHECKPOINT
624   double curTime = CmiWallTimer();
625   if (CkMyPe() == diePe)
626     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
627   stage = (char*)"resetLB";
628   startTime = curTime;
629   CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
630
631 #if CK_NO_PROC_POOL
632   failed(diePe);        // add into the list of failed pes
633 #endif
634   thisFailedPe = diePe;
635
636   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
637
638   inRestarting = 1;
639                                                                                 
640   // disable load balancer's barrier
641   if (CkMyPe() != diePe) resetLB(diePe);
642
643   CKLOCMGR_LOOP(mgr->startInserting(););
644
645   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
646 #endif
647 }
648
649 // loally remove all array elements
650 void CkMemCheckPT::removeArrayElements()
651 {
652 #if CMK_MEM_CHECKPOINT
653   int len = ckTable.length();
654   double curTime = CmiWallTimer();
655   CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
656   stage = (char*)"removeArrayElements";
657   startTime = curTime;
658
659   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
660   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
661
662   // get rid of all buffering and remote recs
663   // including destorying all array elements
664   CKLOCMGR_LOOP(mgr->flushAllRecs(););
665
666 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
667
668   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
669 #endif
670 }
671
672 // flush state in reduction manager
673 void CkMemCheckPT::resetReductionMgr()
674 {
675   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
676   int numGroups = CkpvAccess(_groupIDTable)->size();
677   for(int i=0;i<numGroups;i++) {
678     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
679     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
680     obj->flushStates();
681     obj->ckJustMigrated();
682   }
683   // reset again
684   //CpvAccess(_qd)->flushStates();
685
686   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
687 }
688
689 // recover the lost buddies
690 void CkMemCheckPT::recoverBuddies()
691 {
692   int idx;
693   int len = ckTable.length();
694   // ready to flush reduction manager
695   // cannot be CkMemCheckPT::restart because destory will modify states
696   double curTime = CmiWallTimer();
697   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
698   stage = (char *)"recoverBuddies";
699   startTime = curTime;
700
701   // recover buddies
702   for (idx=0; idx<len; idx++) {
703     CkCheckPTInfo *entry = ckTable[idx];
704     if (entry->pNo == thisFailedPe) {
705 #if CK_NO_PROC_POOL
706       // find a new buddy
707       int budPe = CkMyPe();
708 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
709       while (budPe == CkMyPe() || isFailed(budPe)) 
710           budPe = (budPe+1)%CkNumPes();
711       entry->pNo = budPe;
712 #else
713       int budPe = thisFailedPe;
714 #endif
715       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
716       CkArrayCheckPTMessage *msg = entry->getCopy();
717       msg->cp_flag = 0;            // not checkpointing
718       thisProxy[budPe].recvData(msg);
719     }
720   }
721
722   if (CkMyPe() == 0)
723     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
724 }
725
726 // restore array elements
727 void CkMemCheckPT::recoverArrayElements()
728 {
729   double curTime = CmiWallTimer();
730   int len = ckTable.length();
731   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
732   stage = (char *)"recoverArrayElements";
733   startTime = curTime;
734
735   // recover all array elements
736   int count = 0;
737   for (int idx=0; idx<len; idx++)
738   {
739     CkCheckPTInfo *entry = ckTable[idx];
740 #if CK_NO_PROC_POOL
741     // the bigger one will do 
742 //    if (CkMyPe() < entry->pNo) continue;
743     if (!isMaster(entry->pNo)) continue;
744 #else
745     // smaller one do it, which has the original object
746     if (CkMyPe() == entry->pNo+1 || 
747         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
748 #endif
749 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
750
751     entry->updateBuddy(CkMyPe(), entry->pNo);
752     CkArrayCheckPTMessage *msg = entry->getCopy();
753     // gzheng
754     //checkptMgr[CkMyPe()].inmem_restore(msg);
755     inmem_restore(msg);
756     count ++;
757   }
758 //CkPrintf("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
759
760   if (CkMyPe() == 0)
761     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
762 }
763
764 // on every processor
765 // turn load balancer back on
766 void CkMemCheckPT::finishUp()
767 {
768   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
769   CKLOCMGR_LOOP(mgr->doneInserting(););
770   
771   inRestarting = 0;
772
773   if (CkMyPe() == 0)
774   {
775        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds\n",CkMyPe(), stage, CmiWallTimer()-startTime);
776        CkStartQD(cpCallback);
777   } 
778 #if CK_NO_PROC_POOL
779 #if NODE_CHECKPOINT
780   int numnodes = CmiNumPhysicalNodes();
781 #else
782   int numnodes = CkNumPes();
783 #endif
784   if (numnodes-totalFailed() <=2) {
785     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
786     _memChkptOn = 0;
787   }
788 #endif
789 }
790
791 // called only on 0
792 void CkMemCheckPT::quiescence(CkCallback &cb)
793 {
794   static int pe_count = 0;
795   pe_count ++;
796   CmiAssert(CkMyPe() == 0);
797   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
798   if (pe_count == CkNumPes()) {
799     pe_count = 0;
800     cb.send();
801   }
802 }
803
804 // User callable function - to start a checkpoint
805 // callback cb is used to pass control back
806 void CkStartMemCheckpoint(CkCallback &cb)
807 {
808 #if CMK_MEM_CHECKPOINT
809   if (_memChkptOn == 0) {
810     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
811     cb.send();
812     return;
813   }
814   if (CkInRestarting()) {
815       // trying to checkpointing during restart
816     cb.send();
817     return;
818   }
819     // store user callback and user data
820   CkMemCheckPT::cpCallback = cb;
821
822     // broadcast to start check pointing
823   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
824   checkptMgr.doItNow(CkMyPe(), cb);
825 #else
826   // when mem checkpoint is disabled, invike cb immediately
827   cb.send();
828 #endif
829 }
830
831 void CkRestartCheckPoint(int diePe)
832 {
833 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d\n", ckCheckPTGroupID.idx);
834   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
835   // broadcast
836   checkptMgr.restart(diePe);
837 }
838
839 static int _diePE = -1;
840
841 // callback function used locally by ccs handler
842 static void CkRestartCheckPointCallback(void *ignore, void *msg)
843 {
844 CkPrintf("CkRestartCheckPointCallback activated for diePe: %d\n", _diePE);
845   CkRestartCheckPoint(_diePE);
846 }
847
848 // Converse function handles
849 static int askProcDataHandlerIdx;
850 static int restartBcastHandlerIdx;
851 static int recoverProcDataHandlerIdx;
852 static int restartBeginHandlerIdx;
853
854 static void restartBeginHandler(char *msg)
855 {
856 #if CMK_MEM_CHECKPOINT
857   static int count = 0;
858   CmiFree(msg);
859   CmiAssert(CkMyPe() == _diePE);
860   count ++;
861   if (count == CkNumPes()) {
862     CkRestartCheckPointCallback(NULL, NULL);
863     count = 0;
864   }
865 #endif
866 }
867
868 static void restartBcastHandler(char *msg)
869 {
870 #if CMK_MEM_CHECKPOINT
871   // advance phase counter
872   CkMemCheckPT::inRestarting = 1;
873   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
874   // gzheng
875   if (CkMyPe() != _diePE) cur_restart_phase ++;
876
877   CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d.\n", CkMyPe(), cur_restart_phase, _diePE);
878
879   // reset QD counters
880   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
881
882 /*  gzheng
883   if (CkMyPe()==_diePE)
884       CkRestartCheckPointCallback(NULL, NULL);
885 */
886   CmiFree(msg);
887
888   char restartmsg[CmiMsgHeaderSizeBytes];
889   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
890   CmiSyncSend(_diePE, CmiMsgHeaderSizeBytes, (char *)&restartmsg);
891 #endif
892 }
893
894 extern void _initDone();
895
896 // called on crashed processor
897 static void recoverProcDataHandler(char *msg)
898 {
899 #if CMK_MEM_CHECKPOINT
900    int i;
901    envelope *env = (envelope *)msg;
902    CkUnpackMessage(&env);
903    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
904    cur_restart_phase = procMsg->cur_restart_phase;
905    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d\n", CkMyPe(), cur_restart_phase);
906    cur_restart_phase ++;
907    CpvAccess(_qd)->flushStates();
908
909    // restore readonly, mainchare, group, nodegroup
910 //   int temp = cur_restart_phase;
911 //   cur_restart_phase = -1;
912    PUP::fromMem p(procMsg->packData);
913    _handleProcData(p);
914
915    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
916    // gzheng
917    CKLOCMGR_LOOP(mgr->startInserting(););
918
919    char reqmsg[CmiMsgHeaderSizeBytes+sizeof(int)];
920    *(int *)(&reqmsg[CmiMsgHeaderSizeBytes]) = CkMyPe();
921    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
922    CmiSyncBroadcastAll(CmiMsgHeaderSizeBytes+sizeof(int), (char *)&reqmsg);
923    CmiFree(msg);
924
925    _initDone();
926 #endif
927 }
928
929 // called on its backup processor
930 // get backup message buffer and sent to crashed processor
931 static void askProcDataHandler(char *msg)
932 {
933 #if CMK_MEM_CHECKPOINT
934     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
935     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d.\n",CmiMyPe(),diePe, cur_restart_phase);
936     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
937     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
938     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
939
940     CpvAccess(procChkptBuf)->cur_restart_phase = cur_restart_phase;
941
942     CkPackMessage(&env);
943     CmiSetHandler(env, recoverProcDataHandlerIdx);
944     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
945     CpvAccess(procChkptBuf) = NULL;
946 #endif
947 }
948
949 void CkMemRestart(const char *dummy, CkArgMsg *args)
950 {
951 #if CMK_MEM_CHECKPOINT
952    _diePE = CkMyPe();
953    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d \n",CmiMyPe(), cur_restart_phase);
954    CkMemCheckPT::startTime = CmiWallTimer();
955    CkMemCheckPT::inRestarting = 1;
956    char msg[CmiMsgHeaderSizeBytes+sizeof(int)];
957    *(int *)(&msg[CmiMsgHeaderSizeBytes]) = CkMyPe();
958    cur_restart_phase = 9999;             // big enough to get it processed
959    CmiSetHandler(msg, askProcDataHandlerIdx);
960    int pe = ChkptOnPe();
961    CmiSyncSend(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)&msg);
962    cur_restart_phase=0;    // allow all message to come in
963 #else
964    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
965 #endif
966 }
967
968 // can be called in other files
969 // return true if it is in restarting
970 int CkInRestarting()
971 {
972 #if CMK_MEM_CHECKPOINT
973   // gzheng
974   if (cur_restart_phase == 9999 || cur_restart_phase == 0) return 1;
975   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
976   return CkMemCheckPT::inRestarting;
977 #else
978   return 0;
979 #endif
980 }
981
982 /*****************************************************************************
983                 module initialization
984 *****************************************************************************/
985
986 static int arg_where = CkCheckPoint_inMEM;
987
988 #if CMK_MEM_CHECKPOINT
989 void init_memcheckpt(char **argv)
990 {
991     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
992       arg_where = CkCheckPoint_inDISK;
993     }
994 }
995 #endif
996
997 class CkMemCheckPTInit: public Chare {
998 public:
999   CkMemCheckPTInit(CkArgMsg *m) {
1000 #if CMK_MEM_CHECKPOINT
1001     if (arg_where == CkCheckPoint_inDISK) {
1002       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1003     }
1004     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1005     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1006 #endif
1007   }
1008 };
1009
1010 // initproc
1011 void CkRegisterRestartHandler( )
1012 {
1013 #if CMK_MEM_CHECKPOINT
1014   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1015   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1016   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1017   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1018
1019   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1020   CpvAccess(procChkptBuf) = NULL;
1021
1022 #if 1
1023   // for debugging
1024   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1025 //  sleep(4);
1026 #endif
1027 #endif
1028 }
1029
1030 /// @todo: the following definitions should be moved to a separate file containing
1031 // structures and functions about fault tolerance strategies
1032
1033 /**
1034  *  * @brief: function for killing a process                                             
1035  *   */
1036 #ifdef CMK_MEM_CHECKPOINT
1037 #if CMK_HAS_GETPID
1038 void killLocal(void *_dummy,double curWallTime){
1039         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1040         if(CmiWallTimer()<killTime-1){
1041                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1042         }else{  
1043                 kill(getpid(),SIGKILL);                                               
1044         }              
1045
1046 #else
1047 void killLocal(void *_dummy,double curWallTime){
1048   CmiAbort("kill() not supported!");
1049 }
1050 #endif
1051 #endif
1052
1053 #ifdef CMK_MEM_CHECKPOINT
1054 /**
1055  * @brief: reads the file with the kill information
1056  */
1057 void readKillFile(){
1058         FILE *fp=fopen(killFile,"r");
1059         if(!fp){
1060                 return;
1061         }
1062         int proc;
1063         double sec;
1064         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1065                 if(proc == CkMyPe()){
1066                         killTime = CmiWallTimer()+sec;
1067                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1068                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1069                 }
1070         }
1071         fclose(fp);
1072 }
1073 #endif
1074
1075 #include "CkMemCheckpoint.def.h"
1076
1077