demonstrate inmem checkpoint restart on MPI layer:
[charm.git] / src / ck-core / ckmemcheckpoint.C
1
2 /*
3 Charm++ support for fault tolerance of
4 In memory synchronous checkpointing and restart
5
6 written by Gengbin Zheng, gzheng@uiuc.edu
7            Lixia Shi,     lixiashi@uiuc.edu
8
9 added 12/18/03:
10
11 To support fault tolerance while allowing migration, it uses double
12 checkpointing scheme for each array element (not a infallible scheme).
13 In this version, checkpointing is done based on array elements. 
14 Each array element individully sends its checkpoint data to two buddies.
15
16 In this implementation, assume only one failure happens at a time,
17 or two failures on two processors which are not buddy to each other;
18 also assume there is no failure during a checkpointing or restarting phase.
19
20 Restart phase contains two steps:
21 1. Converse level restart: the newly created process for the failed
22    processor recover its system data (no array elements) from 
23    its backup processor.
24 2. Charm++ level restart: CkMemCheckPT gets control and recover array 
25    elements and reset all states of system groups to be consistent.
26
27 added 3/14/04:
28 1. also support for double in-disk checkpoint/restart
29    set "where" to CkCheckPoint_inDISK/CkCheckPoint_inMEM in init()
30
31 added 4/16/04:
32 1. also support the case when there is a pool of extra processors.
33    set CK_NO_PROC_POOL to 0.
34
35 TODO:
36 1. checkpoint scheme can be reimplemented based on per processor scheme;
37  restart phase should restore/reset group table, etc on all processors, thus flushStates() can be eliminated.
38 2. crash at checkpointing phase currently is catastrophic, can be fixed by storing another transient checkpoints.
39
40 */
41
42 #include "unistd.h"
43
44 #include "charm++.h"
45 #include "ck.h"
46 #include "register.h"
47 #include "conv-ccs.h"
48 #include <signal.h>
49
50 void noopck(const char*, ...)
51 {}
52
53
54 //#define DEBUGF      // CkPrintf
55 #define DEBUGF noopck
56
57
58 // pick buddy processor from a different physical node
59 #define NODE_CHECKPOINT                        0
60
61 // assume NO extra processors--1
62 // assume extra processors--0
63 #if CMK_CONVERSE_MPI
64 #define CK_NO_PROC_POOL                         0
65 #else
66 #define CK_NO_PROC_POOL                         1
67 #endif
68
69 #define STREAMING_INFORMHOME                    1
70 CpvDeclare(int, _crashedNode);
71
72 // static, so that it is accessible from Converse part
73 int CkMemCheckPT::inRestarting = 0;
74 double CkMemCheckPT::startTime;
75 char *CkMemCheckPT::stage;
76 CkCallback CkMemCheckPT::cpCallback;
77
78 int _memChkptOn = 1;                    // checkpoint is on or off
79
80 CkGroupID ckCheckPTGroupID;             // readonly
81
82
83 /// @todo the following declarations should be moved into a separate file for all 
84 // fault tolerant strategies
85
86 #ifdef CMK_MEM_CHECKPOINT
87 // name of the kill file that contains processes to be killed 
88 char *killFile;                                               
89 // flag for the kill file         
90 int killFlag=0;
91 // variable for storing the killing time
92 double killTime=0.0;
93 #endif
94
95 /// checkpoint buffer for processor system data, remove static to make icpc 10.1 pass with -O
96 CpvDeclare(CkProcCheckPTMessage*, procChkptBuf);
97
98 // compute the backup processor
99 // FIXME: avoid crashed processors
100 inline int ChkptOnPe(int pe) { return (pe+CmiMyNodeSize())%CkNumPes(); }
101
102 inline int CkMemCheckPT::BuddyPE(int pe)
103 {
104   int budpe;
105 #if NODE_CHECKPOINT
106     // buddy is the processor with same rank on the next physical node
107   int r1 = CmiPhysicalRank(pe);
108   int budnode = CmiPhysicalNodeID(pe);
109   do {
110     budnode = (budnode+1)%CmiNumPhysicalNodes();
111     int *pelist;
112     int num;
113     CmiGetPesOnPhysicalNode(budnode, &pelist, &num);
114     budpe = pelist[r1 % num];
115   } while (isFailed(budpe));
116   if (budpe == pe) {
117     CmiPrintf("[%d] Error: failed to find a buddy processor on a different node.\n", pe);
118     CmiAbort("Failed to find a buddy processor");
119   }
120 #else
121   budpe = pe;
122   while (budpe == pe || isFailed(budpe)) 
123           budpe = (budpe+1)%CkNumPes();
124 #endif
125   return budpe;
126 }
127
128 // called in array element constructor
129 // choose and register with 2 buddies for checkpoiting 
130 #if CMK_MEM_CHECKPOINT
131 void ArrayElement::init_checkpt() {
132         if (_memChkptOn == 0) return;
133         if (CkInRestarting()) {
134           CkPrintf("[%d] Warning: init_checkpt called during restart, possible bug in migration constructor!\n");
135         }
136         // only master init checkpoint
137         if (thisArray->getLocMgr()->firstManager->mgr!=thisArray) return;
138
139         budPEs[0] = CkMyPe();
140         budPEs[1] = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->BuddyPE(CkMyPe());
141         CmiAssert(budPEs[0] != budPEs[1]);
142         // inform checkPTMgr
143         CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
144         //CmiPrintf("[%d] ArrayElement::init_checkpt array %d %p pe: %d %d\n", CkMyPe(), ((CkGroupID)thisArrayID).idx, this, budPEs[0], budPEs[1]);
145         checkptMgr[budPEs[0]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[1]);        
146         checkptMgr[budPEs[1]].createEntry(thisArrayID, thisArray->getLocMgr()->getGroupID(), thisIndexMax, budPEs[0]);
147 }
148 #endif
149
150 // entry function invoked by checkpoint mgr asking for checkpoint data
151 void ArrayElement::inmem_checkpoint(CkArrayCheckPTReqMessage *m) {
152 #if CMK_MEM_CHECKPOINT
153   //DEBUGF("[p%d] HERE checkpoint to %d %d \n", CkMyPe(), budPEs[0], budPEs[1]);
154 //char index[128];   thisIndexMax.sprint(index);
155 //printf("[%d] checkpointing %s\n", CkMyPe(), index);
156   CkLocMgr *locMgr = thisArray->getLocMgr();
157   CmiAssert(myRec!=NULL);
158   int size;
159   {
160         PUP::sizer p;
161         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
162         size = p.size();
163   }
164   int packSize = size/sizeof(double) +1;
165   CkArrayCheckPTMessage *msg =
166                  new (packSize, 0) CkArrayCheckPTMessage;
167   msg->len = size;
168   msg->index =thisIndexMax;
169   msg->aid = thisArrayID;
170   msg->locMgr = locMgr->getGroupID();
171   msg->cp_flag = 1;
172   {
173         PUP::toMem p(msg->packData);
174         locMgr->pupElementsFor (p, myRec, CkElementCreation_migrate);
175   }
176
177   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
178   checkptMgr.recvData(msg, 2, budPEs);
179   delete m;
180 #endif
181 }
182
183 // checkpoint holder class - for memory checkpointing
184 class CkMemCheckPTInfo: public CkCheckPTInfo
185 {
186   CkArrayCheckPTMessage *ckBuffer;
187 public:
188   CkMemCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno): 
189                     CkCheckPTInfo(a, loc, idx, pno)
190   {
191     ckBuffer = NULL;
192   }
193   ~CkMemCheckPTInfo() 
194   {
195     if (ckBuffer) delete ckBuffer; 
196   }
197   inline void updateBuffer(CkArrayCheckPTMessage *data) 
198   {
199     CmiAssert(data!=NULL);
200     if (ckBuffer) delete ckBuffer;
201     ckBuffer = data;
202   }    
203   inline CkArrayCheckPTMessage * getCopy()
204   {
205     if (ckBuffer == NULL) {
206       CmiPrintf("[%d] recoverArrayElements: element does not have checkpoint data.", CkMyPe());
207       CmiAbort("Abort!");
208     }
209     return (CkArrayCheckPTMessage *)CkCopyMsg((void **)&ckBuffer);
210   }     
211   inline void updateBuddy(int b1, int b2) {
212      CmiAssert(ckBuffer);
213      ckBuffer->bud1 = b1; ckBuffer->bud2 = b2;
214      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
215      CmiAssert(pNo != CkMyPe());
216   }
217   inline int getSize() { 
218      CmiAssert(ckBuffer);
219      return ckBuffer->len; 
220   }
221 };
222
223 // checkpoint holder class - for in-disk checkpointing
224 class CkDiskCheckPTInfo: public CkCheckPTInfo 
225 {
226   char *fname;
227   int bud1, bud2;
228   int len;                      // checkpoint size
229 public:
230   CkDiskCheckPTInfo(CkArrayID a, CkGroupID loc, CkArrayIndex idx, int pno, int myidx): CkCheckPTInfo(a, loc, idx, pno)
231   {
232 #if CMK_USE_MKSTEMP
233     fname = new char[64];
234     sprintf(fname, "/tmp/ckpt%d-%d-XXXXXX", CkMyPe(), myidx);
235     mkstemp(fname);
236 #else
237     fname=tmpnam(NULL);
238 #endif
239     bud1 = bud2 = -1;
240     len = 0;
241   }
242   ~CkDiskCheckPTInfo() 
243   {
244     remove(fname);
245   }
246   inline void updateBuffer(CkArrayCheckPTMessage *data) 
247   {
248     double t = CmiWallTimer();
249     // unpack it
250     envelope *env = UsrToEnv(data);
251     CkUnpackMessage(&env);
252     data = (CkArrayCheckPTMessage *)EnvToUsr(env);
253     FILE *f = fopen(fname,"wb");
254     PUP::toDisk p(f);
255     CkPupMessage(p, (void **)&data);
256     // delay sync to the end because otherwise the messages are blocked
257 //    fsync(fileno(f));
258     fclose(f);
259     bud1 = data->bud1;
260     bud2 = data->bud2;
261     len = data->len;
262     delete data;
263     //CmiPrintf("[%d] updateBuffer took %f seconds. \n", CkMyPe(), CmiWallTimer()-t);
264   }
265   inline CkArrayCheckPTMessage * getCopy()      // get a copy of checkpoint
266   {
267     CkArrayCheckPTMessage *data;
268     FILE *f = fopen(fname,"rb");
269     PUP::fromDisk p(f);
270     CkPupMessage(p, (void **)&data);
271     fclose(f);
272     data->bud1 = bud1;                          // update the buddies
273     data->bud2 = bud2;
274     return data;
275   }
276   inline void updateBuddy(int b1, int b2) {
277      bud1 = b1; bud2 = b2;
278      pNo = b1;  if (pNo == CkMyPe()) pNo = b2;
279      CmiAssert(pNo != CkMyPe());
280   }
281   inline int getSize() { 
282      return len; 
283   }
284 };
285
286 CkMemCheckPT::CkMemCheckPT(int w)
287 {
288   int numnodes = 0;
289 #if NODE_CHECKPOINT
290   numnodes = CmiNumPhysicalNodes();
291 #else
292   numnodes = CkNumPes();
293 #endif
294 #if CK_NO_PROC_POOL
295   if (numnodes <= 2)
296 #else
297   if (numnodes  == 1)
298 #endif
299   {
300     if (CkMyPe() == 0)  CkPrintf("Warning: CkMemCheckPT is disabled due to too few nodes.\n");
301     _memChkptOn = 0;
302   }
303   inRestarting = 0;
304   recvCount = peCount = 0;
305   ackCount = 0;
306   expectCount = -1;
307   where = w;
308
309 #if CMK_CONVERSE_MPI
310   void pingBuddy();
311   void pingCheckHandler();
312   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
313   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
314 #endif
315 }
316
317 CkMemCheckPT::~CkMemCheckPT()
318 {
319   int len = ckTable.length();
320   for (int i=0; i<len; i++) {
321     delete ckTable[i];
322   }
323 }
324
325 void CkMemCheckPT::pup(PUP::er& p) 
326
327   CBase_CkMemCheckPT::pup(p); 
328   p|cpStarter;
329   p|thisFailedPe;
330   p|failedPes;
331   p|ckCheckPTGroupID;           // recover global variable
332   p|cpCallback;                 // store callback
333   p|where;                      // where to checkpoint
334   if (p.isUnpacking()) {
335     recvCount = peCount = 0;
336 #if CMK_CONVERSE_MPI
337     void pingBuddy();
338     void pingCheckHandler();
339     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
340     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
341 #endif
342   }
343 }
344
345 // called by checkpoint mgr to restore an array element
346 void CkMemCheckPT::inmem_restore(CkArrayCheckPTMessage *m) 
347 {
348 #if CMK_MEM_CHECKPOINT
349   DEBUGF("[%d] inmem_restore restore: mgr: %d \n", CmiMyPe(), m->locMgr);  
350   // m->index.print();
351   PUP::fromMem p(m->packData);
352   CkLocMgr *mgr = CProxy_CkLocMgr(m->locMgr).ckLocalBranch();
353   CmiAssert(mgr);
354 #if STREAMING_INFORMHOME
355   mgr->resume(m->index, p, CmiFalse);     // optimize notifyHome
356 #else
357   mgr->resume(m->index, p, CmiTrue);     // optimize notifyHome
358 #endif
359
360   // find a list of array elements bound together
361   ArrayElement *elt = (ArrayElement *)mgr->lookup(m->index, m->aid);
362   CmiAssert(elt);
363   CkLocRec_local *rec = elt->myRec;
364   CkVec<CkMigratable *> list;
365   mgr->migratableList(rec, list);
366   CmiAssert(list.length() > 0);
367   for (int l=0; l<list.length(); l++) {
368     elt = (ArrayElement *)list[l];
369     elt->budPEs[0] = m->bud1;
370     elt->budPEs[1] = m->bud2;
371     //    reset, may not needed now
372     // for now.
373     for (int i=0; i<CK_ARRAYLISTENER_MAXLEN; i++) {
374       contributorInfo *c=(contributorInfo *)&elt->listenerData[i];
375       if (c) c->redNo = 0;
376     }
377   }
378 #endif
379 }
380
381 // return 1 if pe is a crashed processor
382 int CkMemCheckPT::isFailed(int pe)
383 {
384   for (int i=0; i<failedPes.length(); i++)
385     if (failedPes[i] == pe) return 1;
386   return 0;
387 }
388
389 // add pe into history list of all failed processors
390 void CkMemCheckPT::failed(int pe)
391 {
392   if (isFailed(pe)) return;
393   failedPes.push_back(pe);
394 }
395
396 int CkMemCheckPT::totalFailed()
397 {
398   return failedPes.length();
399 }
400
401 // create an checkpoint entry for array element of aid with index.
402 void CkMemCheckPT::createEntry(CkArrayID aid, CkGroupID loc, CkArrayIndex index, int buddy)
403 {
404   // error check, no duplicate
405   int idx, len = ckTable.size();
406   for (idx=0; idx<len; idx++) {
407     CkCheckPTInfo *entry = ckTable[idx];
408     if (index == entry->index) {
409       if (loc == entry->locMgr) {
410           // bindTo array elements
411           return;
412       }
413         // for array inheritance, the following check may fail
414         // because ArrayElement constructor of all superclasses are called
415       if (aid == entry->aid) {
416         CkPrintf("[%d] CkMemCheckPT::createEntry a duplciated entry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
417         CmiAbort("CkMemCheckPT::createEntry a duplciated entry");
418       }
419     }
420   }
421   CkCheckPTInfo *newEntry;
422   if (where == CkCheckPoint_inMEM)
423     newEntry = new CkMemCheckPTInfo(aid, loc, index, buddy);
424   else
425     newEntry = new CkDiskCheckPTInfo(aid, loc, index, buddy, len+1);
426   ckTable.push_back(newEntry);
427   //CkPrintf("[%d] CkMemCheckPT::createEntry for arrayID %d:", CkMyPe(), ((CkGroupID)aid).idx); index.print(); CkPrintf("\n");
428 }
429
430 void CkMemCheckPT::recoverEntry(CkArrayCheckPTMessage *msg)
431 {
432   int buddy = msg->bud1;
433   if (buddy == CkMyPe()) buddy = msg->bud2;
434   createEntry(msg->aid, msg->locMgr, msg->index, buddy);
435   recvData(msg);
436     // ack
437   thisProxy[buddy].gotData();
438 }
439
440 // loop through my checkpoint table and ask checkpointed array elements
441 // to send me checkpoint data.
442 void CkMemCheckPT::doItNow(int starter, CkCallback &cb)
443 {
444   cpCallback = cb;
445   cpStarter = starter;
446   if (CkMyPe() == cpStarter) {
447     startTime = CmiWallTimer();
448     CkPrintf("[%d] Start checkpointing  starter: %d... \n", CkMyPe(), cpStarter);
449   }
450
451   int len = ckTable.length();
452   for (int i=0; i<len; i++) {
453     CkCheckPTInfo *entry = ckTable[i];
454       // always let the bigger number processor send request
455     //if (CkMyPe() < entry->pNo) continue;
456       // always let the smaller number processor send request, may on same proc
457     if (!isMaster(entry->pNo)) continue;
458       // call inmem_checkpoint to the array element, ask it to send
459       // back checkpoint data via recvData().
460     CkArrayCheckPTReqMessage *msg = new CkArrayCheckPTReqMessage;
461     CkSendMsgArray(CkIndex_ArrayElement::inmem_checkpoint(NULL),(CkArrayMessage *)msg,entry->aid,entry->index);
462   }
463     // if my table is empty, then I am done
464   if (len == 0) thisProxy[cpStarter].cpFinish();
465
466   // pack and send proc level data
467   sendProcData();
468 }
469
470 // don't handle array elements
471 static inline void _handleProcData(PUP::er &p)
472 {
473     // save readonlys, and callback BTW
474     CkPupROData(p);
475
476     // save mainchares 
477     if(CkMyPe()==0) CkPupMainChareData(p, (CkArgMsg*)NULL);
478         
479 #ifndef CMK_CHARE_USE_PTR
480     // save non-migratable chare
481     CkPupChareData(p);
482 #endif
483
484     // save groups into Groups.dat
485     CkPupGroupData(p);
486
487     // save nodegroups into NodeGroups.dat
488     if(CkMyRank()==0) CkPupNodeGroupData(p);
489 }
490
491 void CkMemCheckPT::sendProcData()
492 {
493   // find out size of buffer
494   int size;
495   {
496     PUP::sizer p;
497     _handleProcData(p);
498     size = p.size();
499   }
500   int packSize = size;
501   CkProcCheckPTMessage *msg = new (packSize, 0) CkProcCheckPTMessage;
502   DEBUGF("[%d] CkMemCheckPT::sendProcData - size: %d\n", CkMyPe(), size);
503   {
504     PUP::toMem p(msg->packData);
505     _handleProcData(p);
506   }
507   msg->pe = CkMyPe();
508   msg->len = size;
509   msg->reportPe = cpStarter;  //in case other processor isn't in checkpoint mode
510   thisProxy[ChkptOnPe(CkMyPe())].recvProcData(msg);
511 }
512
513 void CkMemCheckPT::recvProcData(CkProcCheckPTMessage *msg)
514 {
515   if (CpvAccess(procChkptBuf)) delete CpvAccess(procChkptBuf);
516   CpvAccess(procChkptBuf) = msg;
517   DEBUGF("[%d] CkMemCheckPT::recvProcData report to %d\n", CkMyPe(), msg->reportPe);
518   thisProxy[msg->reportPe].cpFinish();
519 }
520
521 // ArrayElement call this function to give us the checkpointed data
522 void CkMemCheckPT::recvData(CkArrayCheckPTMessage *msg)
523 {
524   int len = ckTable.length();
525   int idx;
526   for (idx=0; idx<len; idx++) {
527     CkCheckPTInfo *entry = ckTable[idx];
528     if (msg->locMgr == entry->locMgr && msg->index == entry->index) break;
529   }
530   CkAssert(idx < len);
531   int isChkpting = msg->cp_flag;
532   ckTable[idx]->updateBuffer(msg);
533   if (isChkpting) {
534       // all my array elements have returned their inmem data
535       // inform starter processor that I am done.
536     recvCount ++;
537     if (recvCount == ckTable.length()) {
538       if (where == CkCheckPoint_inMEM) {
539         thisProxy[cpStarter].cpFinish();
540       }
541       else if (where == CkCheckPoint_inDISK) {
542         // another barrier for finalize the writing using fsync
543         CkCallback localcb(CkIndex_CkMemCheckPT::syncFiles(NULL),thisgroup);
544         contribute(0,NULL,CkReduction::sum_int,localcb);
545       }
546       else
547         CmiAbort("Unknown checkpoint scheme");
548       recvCount = 0;
549     } 
550   }
551 }
552
553 // only used in disk checkpointing
554 void CkMemCheckPT::syncFiles(CkReductionMsg *m)
555 {
556   delete m;
557 #if CMK_HAS_SYNC && ! CMK_DISABLE_SYNC
558   system("sync");
559 #endif
560   thisProxy[cpStarter].cpFinish();
561 }
562
563 // only is called on cpStarter when checkpoint is done
564 void CkMemCheckPT::cpFinish()
565 {
566   CmiAssert(CkMyPe() == cpStarter);
567   peCount++;
568     // now that all processors have finished, activate callback
569   if (peCount == 2*(CkNumPes())) {
570     CmiPrintf("[%d] Checkpoint finished in %f seconds, sending callback ... \n", CkMyPe(), CmiWallTimer()-startTime);
571     cpCallback.send();
572     peCount = 0;
573     thisProxy.report();
574   }
575 }
576
577 // for debugging, report checkpoint info
578 void CkMemCheckPT::report()
579 {
580   int objsize = 0;
581   int len = ckTable.length();
582   for (int i=0; i<len; i++) {
583     CkCheckPTInfo *entry = ckTable[i];
584     CmiAssert(entry);
585     objsize += entry->getSize();
586   }
587   CmiAssert(CpvAccess(procChkptBuf));
588   CkPrintf("[%d] Checkpoint object size: %d len: %d Processor data: %d \n", CkMyPe(), objsize, len, CpvAccess(procChkptBuf)->len);
589 }
590
591 /*****************************************************************************
592                         RESTART Procedure
593 *****************************************************************************/
594
595 // master processor of two buddies
596 inline int CkMemCheckPT::isMaster(int buddype)
597 {
598 #if 0
599   int mype = CkMyPe();
600 //CkPrintf("ismaster: %d %d\n", pe, mype);
601   if (CkNumPes() - totalFailed() == 2) {
602     return mype > buddype;
603   }
604   for (int i=1; i<CkNumPes(); i++) {
605     int me = (buddype+i)%CkNumPes();
606     if (isFailed(me)) continue;
607     if (me == mype) return 1;
608     else return 0;
609   }
610   return 0;
611 #else
612     // smaller one
613   int mype = CkMyPe();
614 //CkPrintf("ismaster: %d %d\n", pe, mype);
615   if (CkNumPes() - totalFailed() == 2) {
616     return mype < buddype;
617   }
618   for (int i=1; i<CkNumPes(); i++) {
619     int me = (mype+i)%CkNumPes();
620     if (isFailed(me)) continue;
621     if (me == buddype) return 1;
622     else return 0;
623   }
624   return 0;
625 #endif
626 }
627
628 #ifdef CKLOCMGR_LOOP
629 #undef CKLOCMGR_LOOP
630 #endif
631
632 // loop over all CkLocMgr and do "code"
633 #define  CKLOCMGR_LOOP(code)    {       \
634   int numGroups = CkpvAccess(_groupIDTable)->size();    \
635   for(int i=0;i<numGroups;i++) {        \
636     IrrGroup *obj = CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();    \
637     if(obj->isLocMgr())  {      \
638       CkLocMgr *mgr = (CkLocMgr*)obj;   \
639       code      \
640     }   \
641   }     \
642  }
643
644 #if 0
645 // helper class to pup all elements that belong to same ckLocMgr
646 class ElementDestoryer : public CkLocIterator {
647 private:
648         CkLocMgr *locMgr;
649 public:
650         ElementDestoryer(CkLocMgr* mgr_):locMgr(mgr_){};
651         void addLocation(CkLocation &loc) {
652                 CkArrayIndex idx=loc.getIndex();
653                 CkPrintf("[%d] destroy: ", CkMyPe()); idx.print();
654                 loc.destroy();
655         }
656 };
657 #endif
658
659 // restore the bitmap vector for LB
660 void CkMemCheckPT::resetLB(int diepe)
661 {
662 #if CMK_LBDB_ON
663   int i;
664   char *bitmap = new char[CkNumPes()];
665   // set processor available bitmap
666   get_avail_vector(bitmap);
667
668   for (i=0; i<failedPes.length(); i++)
669     bitmap[failedPes[i]] = 0; 
670   bitmap[diepe] = 0;
671
672 #if CK_NO_PROC_POOL
673   set_avail_vector(bitmap);
674 #endif
675
676   // if I am the crashed pe, rebuild my failedPEs array
677   if (CkMyPe() == diepe)
678     for (i=0; i<CkNumPes(); i++) 
679       if (bitmap[i]==0) failed(i);
680
681   delete [] bitmap;
682 #endif
683 }
684
685 // in case when failedPe dies, everybody go through its checkpoint table:
686 // destory all array elements
687 // recover lost buddies
688 // reconstruct all array elements from check point data
689 // called on all processors
690 void CkMemCheckPT::restart(int diePe)
691 {
692 #if CMK_MEM_CHECKPOINT
693   double curTime = CmiWallTimer();
694   if (CkMyPe() == diePe)
695     CkPrintf("[%d] Process data restored in %f seconds\n", CkMyPe(), curTime - startTime);
696   stage = (char*)"resetLB";
697   startTime = curTime;
698   if (CkMyPe() == diePe)
699     CkPrintf("[%d] CkMemCheckPT ----- restart.\n",CkMyPe());
700
701 #if CK_NO_PROC_POOL
702   failed(diePe);        // add into the list of failed pes
703 #endif
704   thisFailedPe = diePe;
705
706   if (CkMyPe() == diePe) CmiAssert(ckTable.length() == 0);
707
708   inRestarting = 1;
709                                                                                 
710   // disable load balancer's barrier
711   if (CkMyPe() != diePe) resetLB(diePe);
712
713   CKLOCMGR_LOOP(mgr->startInserting(););
714
715   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
716 /*
717   if (CkMyPe() == 0)
718     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::removeArrayElements(), thisProxy));
719 */
720 #endif
721 }
722
723 // loally remove all array elements
724 void CkMemCheckPT::removeArrayElements()
725 {
726 #if CMK_MEM_CHECKPOINT
727   int len = ckTable.length();
728   double curTime = CmiWallTimer();
729   if (CkMyPe() == thisFailedPe) 
730     CkPrintf("[%d] CkMemCheckPT ----- %s len:%d in %f seconds.\n",CkMyPe(),stage,len,curTime-startTime);
731   stage = (char*)"removeArrayElements";
732   startTime = curTime;
733
734   if (cpCallback.isInvalid()) CkAbort("Didn't set restart callback\n");;
735   if (CkMyPe()==thisFailedPe) CmiAssert(len == 0);
736
737   // get rid of all buffering and remote recs
738   // including destorying all array elements
739   CKLOCMGR_LOOP(mgr->flushAllRecs(););
740
741 //  CKLOCMGR_LOOP(ElementDestoryer chk(mgr); mgr->iterate(chk););
742
743   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::resetReductionMgr(), thisProxy));
744 #endif
745 }
746
747 // flush state in reduction manager
748 void CkMemCheckPT::resetReductionMgr()
749 {
750   //CkPrintf("[%d] CkMemCheckPT ----- resetReductionMgr\n",CkMyPe());
751   int numGroups = CkpvAccess(_groupIDTable)->size();
752   for(int i=0;i<numGroups;i++) {
753     CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
754     IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
755     obj->flushStates();
756     obj->ckJustMigrated();
757   }
758   // reset again
759   //CpvAccess(_qd)->flushStates();
760
761 #if 1
762   thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
763 #else
764   if (CkMyPe() == 0)
765     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverBuddies(), thisProxy));
766 #endif
767 }
768
769 // recover the lost buddies
770 void CkMemCheckPT::recoverBuddies()
771 {
772   int idx;
773   int len = ckTable.length();
774   // ready to flush reduction manager
775   // cannot be CkMemCheckPT::restart because destory will modify states
776   double curTime = CmiWallTimer();
777   if (CkMyPe() == thisFailedPe)
778   CkPrintf("[%d] CkMemCheckPT ----- %s  in %f seconds\n",CkMyPe(), stage, curTime-startTime);
779   stage = (char *)"recoverBuddies";
780   if (CkMyPe() == thisFailedPe)
781   CkPrintf("[%d] CkMemCheckPT ----- %s  starts at %f\n",CkMyPe(), stage, curTime);
782   startTime = curTime;
783
784   // recover buddies
785   expectCount = 0;
786   for (idx=0; idx<len; idx++) {
787     CkCheckPTInfo *entry = ckTable[idx];
788     if (entry->pNo == thisFailedPe) {
789 #if CK_NO_PROC_POOL
790       // find a new buddy
791 /*
792       int budPe = CkMyPe();
793 //      while (budPe == CkMyPe() || isFailed(budPe)) budPe = CrnRand()%CkNumPes();
794       while (budPe == CkMyPe() || isFailed(budPe)) 
795           budPe = (budPe+1)%CkNumPes();
796 */
797       int budPe = BuddyPE(CkMyPe());
798       //entry->pNo = budPe;
799 #else
800       int budPe = thisFailedPe;
801 #endif
802       entry->updateBuddy(CkMyPe(), budPe);
803 #if 0
804       thisProxy[budPe].createEntry(entry->aid, entry->locMgr, entry->index, CkMyPe());
805       CkArrayCheckPTMessage *msg = entry->getCopy();
806       msg->cp_flag = 0;            // not checkpointing
807       thisProxy[budPe].recvData(msg);
808 #else
809       CkArrayCheckPTMessage *msg = entry->getCopy();
810       msg->bud1 = budPe;
811       msg->bud2 = CkMyPe();
812       msg->cp_flag = 0;            // not checkpointing
813       thisProxy[budPe].recoverEntry(msg);
814 #endif
815       expectCount ++;
816     }
817   }
818
819 #if 1
820   if (expectCount == 0) {
821     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
822   }
823 #else
824   if (CkMyPe() == 0) {
825     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
826   }
827 #endif
828
829   //CkPrintf("[%d] CkMemCheckPT ----- recoverBuddies done  in %f seconds\n",CkMyPe(), curTime-startTime);
830 }
831
832 void CkMemCheckPT::gotData()
833 {
834   ackCount ++;
835   if (ackCount == expectCount) {
836     ackCount = 0;
837     expectCount = -1;
838     thisProxy[0].quiescence(CkCallback(CkIndex_CkMemCheckPT::recoverArrayElements(), thisProxy));
839   }
840 }
841
842 void CkMemCheckPT::updateLocations(int n, CkGroupID *g, CkArrayIndex *idx,int nowOnPe)
843 {
844   for (int i=0; i<n; i++) {
845     CkLocMgr *mgr = CProxy_CkLocMgr(g[i]).ckLocalBranch();
846     mgr->updateLocation(idx[i], nowOnPe);
847   }
848 }
849
850 // restore array elements
851 void CkMemCheckPT::recoverArrayElements()
852 {
853   double curTime = CmiWallTimer();
854   int len = ckTable.length();
855   CkPrintf("[%d] CkMemCheckPT ----- %s len: %d in %f seconds \n",CkMyPe(), stage, len, curTime-startTime);
856   stage = (char *)"recoverArrayElements";
857   if (CkMyPe() == thisFailedPe)
858   CkPrintf("[%d] CkMemCheckPT ----- %s starts at %f \n",CkMyPe(), stage, curTime);
859   startTime = curTime;
860
861   // recover all array elements
862   int count = 0;
863 #if STREAMING_INFORMHOME
864   CkVec<CkGroupID> * gmap = new CkVec<CkGroupID>[CkNumPes()];
865   CkVec<CkArrayIndex> * imap = new CkVec<CkArrayIndex>[CkNumPes()];
866 #endif
867   for (int idx=0; idx<len; idx++)
868   {
869     CkCheckPTInfo *entry = ckTable[idx];
870 #if CK_NO_PROC_POOL
871     // the bigger one will do 
872 //    if (CkMyPe() < entry->pNo) continue;
873     if (!isMaster(entry->pNo)) continue;
874 #else
875     // smaller one do it, which has the original object
876     if (CkMyPe() == entry->pNo+1 || 
877         CkMyPe()+CkNumPes() == entry->pNo+1) continue;
878 #endif
879 //CkPrintf("[%d] restore idx:%d aid:%d loc:%d ", CkMyPe(), idx, (CkGroupID)(entry->aid), entry->locMgr); entry->index.print();
880
881     entry->updateBuddy(CkMyPe(), entry->pNo);
882     CkArrayCheckPTMessage *msg = entry->getCopy();
883     // gzheng
884     //thisProxy[CkMyPe()].inmem_restore(msg);
885     inmem_restore(msg);
886 #if STREAMING_INFORMHOME
887     CkLocMgr *mgr = CProxy_CkLocMgr(msg->locMgr).ckLocalBranch();
888     int homePe = mgr->homePe(msg->index);
889     if (homePe != CkMyPe()) {
890       gmap[homePe].push_back(msg->locMgr);
891       imap[homePe].push_back(msg->index);
892     }
893 #endif
894     count ++;
895   }
896 #if STREAMING_INFORMHOME
897   for (int i=0; i<CkNumPes(); i++) {
898     if (gmap[i].size() && i!=CkMyPe()) {
899       thisProxy[i].updateLocations(gmap[i].size(), gmap[i].getVec(), imap[i].getVec(), CkMyPe());
900     }
901   }
902   delete [] imap;
903   delete [] gmap;
904 #endif
905   DEBUGF("[%d] recoverArrayElements restore %d objects\n", CkMyPe(), count);
906
907   CKLOCMGR_LOOP(mgr->doneInserting(););
908
909   inRestarting = 0;
910  // _crashedNode = -1;
911 CpvAccess(_crashedNode) = -1;
912
913   if (CkMyPe() == 0)
914     CkStartQD(CkCallback(CkIndex_CkMemCheckPT::finishUp(), thisProxy));
915 }
916
917 static double restartT;
918
919 // on every processor
920 // turn load balancer back on
921 void CkMemCheckPT::finishUp()
922 {
923   //CkPrintf("[%d] CkMemCheckPT::finishUp\n", CkMyPe());
924   //CKLOCMGR_LOOP(mgr->doneInserting(););
925   
926   if (CkMyPe() == thisFailedPe)
927   {
928        CkPrintf("[%d] CkMemCheckPT ----- %s in %f seconds, callback triggered\n",CkMyPe(), stage, CmiWallTimer()-startTime);
929        //CkStartQD(cpCallback);
930        cpCallback.send();
931        CkPrintf("[%d] Restart finished in %f seconds at %f.\n", CkMyPe(), CkWallTimer()-restartT, CkWallTimer());
932   }
933
934 #if CK_NO_PROC_POOL
935 #if NODE_CHECKPOINT
936   int numnodes = CmiNumPhysicalNodes();
937 #else
938   int numnodes = CkNumPes();
939 #endif
940   if (numnodes-totalFailed() <=2) {
941     if (CkMyPe()==0) CkPrintf("Warning: CkMemCheckPT disabled!\n");
942     _memChkptOn = 0;
943   }
944 #endif
945 }
946
947 // called only on 0
948 void CkMemCheckPT::quiescence(CkCallback &cb)
949 {
950   static int pe_count = 0;
951   pe_count ++;
952   CmiAssert(CkMyPe() == 0);
953   //CkPrintf("quiescence %d %d\n", pe_count, CkNumPes());
954   if (pe_count == CkNumPes()) {
955     pe_count = 0;
956     cb.send();
957   }
958 }
959
960 // User callable function - to start a checkpoint
961 // callback cb is used to pass control back
962 void CkStartMemCheckpoint(CkCallback &cb)
963 {
964 #if CMK_MEM_CHECKPOINT
965   if (_memChkptOn == 0) {
966     CkPrintf("Warning: In-Memory checkpoint has been disabled! \n");
967     cb.send();
968     return;
969   }
970   if (CkInRestarting()) {
971       // trying to checkpointing during restart
972     cb.send();
973     return;
974   }
975     // store user callback and user data
976   CkMemCheckPT::cpCallback = cb;
977
978     // broadcast to start check pointing
979   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
980   checkptMgr.doItNow(CkMyPe(), cb);
981 #else
982   // when mem checkpoint is disabled, invike cb immediately
983   cb.send();
984 #endif
985 }
986
987 void CkRestartCheckPoint(int diePe)
988 {
989 CkPrintf("CkRestartCheckPoint  CkMemCheckPT GID:%d at time %f\n", ckCheckPTGroupID.idx, CkWallTimer());
990   CProxy_CkMemCheckPT checkptMgr(ckCheckPTGroupID);
991   // broadcast
992   checkptMgr.restart(diePe);
993 }
994
995 static int _diePE = -1;
996
997 // callback function used locally by ccs handler
998 static void CkRestartCheckPointCallback(void *ignore, void *msg)
999 {
1000 CkPrintf("[%d] CkRestartCheckPointCallback activated for diePe: %d at %f\n", CkMyPe(), _diePE, CkWallTimer());
1001   CkRestartCheckPoint(_diePE);
1002 }
1003
1004 // Converse function handles
1005 static int askPhaseHandlerIdx;
1006 static int recvPhaseHandlerIdx;
1007 static int askProcDataHandlerIdx;
1008 static int restartBcastHandlerIdx;
1009 static int recoverProcDataHandlerIdx;
1010 static int restartBeginHandlerIdx;
1011 static int notifyHandlerIdx;
1012
1013 // called on crashed PE
1014 static void restartBeginHandler(char *msg)
1015 {
1016 #if CMK_MEM_CHECKPOINT
1017   static int count = 0;
1018   CmiFree(msg);
1019   CmiAssert(CkMyPe() == _diePE);
1020   count ++;
1021   if (count == CkNumPes()) {
1022     CkRestartCheckPointCallback(NULL, NULL);
1023     count = 0;
1024   }
1025 #endif
1026 }
1027
1028 extern void _discard_charm_message();
1029 extern void _resume_charm_message();
1030
1031 static void restartBcastHandler(char *msg)
1032 {
1033 #if CMK_MEM_CHECKPOINT
1034   // advance phase counter
1035   CkMemCheckPT::inRestarting = 1;
1036   _diePE = *(int *)(msg+CmiMsgHeaderSizeBytes);
1037   // gzheng
1038   //if (CkMyPe() != _diePE) cur_restart_phase ++;
1039
1040   if (CkMyPe()==_diePE)
1041     CkPrintf("[%d] restartBcastHandler cur_restart_phase=%d _diePE:%d at %f.\n", CkMyPe(), CpvAccess(_curRestartPhase), _diePE, CkWallTimer());
1042
1043   // reset QD counters
1044 /*  gzheng
1045   if (CkMyPe() != _diePE) CpvAccess(_qd)->flushStates();
1046 */
1047
1048 /*  gzheng
1049   if (CkMyPe()==_diePE)
1050       CkRestartCheckPointCallback(NULL, NULL);
1051 */
1052   CmiFree(msg);
1053
1054   _resume_charm_message();
1055
1056     // reduction
1057   char *restartmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1058   CmiSetHandler(restartmsg, restartBeginHandlerIdx);
1059   CmiSyncSendAndFree(_diePE, CmiMsgHeaderSizeBytes, (char *)restartmsg);
1060 #endif
1061 }
1062
1063 extern void _initDone();
1064
1065 // called on crashed processor
1066 static void recoverProcDataHandler(char *msg)
1067 {
1068 #if CMK_MEM_CHECKPOINT
1069    int i;
1070    envelope *env = (envelope *)msg;
1071    CkUnpackMessage(&env);
1072    CkProcCheckPTMessage* procMsg = (CkProcCheckPTMessage *)(EnvToUsr(env));
1073    CpvAccess(_curRestartPhase) = procMsg->cur_restart_phase;
1074    CmiPrintf("[%d] ----- recoverProcDataHandler  cur_restart_phase:%d at time: %f\n", CkMyPe(), CpvAccess(_curRestartPhase), CkWallTimer());
1075    //cur_restart_phase ++;
1076      // gzheng ?
1077    //CpvAccess(_qd)->flushStates();
1078
1079    // restore readonly, mainchare, group, nodegroup
1080 //   int temp = cur_restart_phase;
1081 //   cur_restart_phase = -1;
1082    PUP::fromMem p(procMsg->packData);
1083    _handleProcData(p);
1084
1085    CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->resetLB(CkMyPe());
1086    // gzheng
1087    CKLOCMGR_LOOP(mgr->startInserting(););
1088
1089    char *reqmsg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1090    *(int *)(reqmsg+CmiMsgHeaderSizeBytes) = CkMyPe();
1091    CmiSetHandler(reqmsg, restartBcastHandlerIdx);
1092    CmiSyncBroadcastAllAndFree(CmiMsgHeaderSizeBytes+sizeof(int), (char *)reqmsg);
1093
1094    _initDone();
1095 //   CpvAccess(_qd)->flushStates();
1096    CmiPrintf("[%d] ----- recoverProcDataHandler  done at %f\n", CkMyPe(), CkWallTimer());
1097 #endif
1098 }
1099
1100 // called on its backup processor
1101 // get backup message buffer and sent to crashed processor
1102 static void askProcDataHandler(char *msg)
1103 {
1104 #if CMK_MEM_CHECKPOINT
1105     int diePe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1106     CkPrintf("[%d] restartBcastHandler called with '%d' cur_restart_phase:%d at time %f.\n",CmiMyPe(),diePe, CpvAccess(_curRestartPhase), CkWallTimer());
1107     if (CpvAccess(procChkptBuf) == NULL) 
1108       CkPrintf("[%d] no checkpoint found for processor %d. This could be due to a crash before the first checkpointing.\n", CkMyPe(), diePe);
1109     CmiAssert(CpvAccess(procChkptBuf)!=NULL);
1110     envelope *env = (envelope *)(UsrToEnv(CpvAccess(procChkptBuf)));
1111     CmiAssert(CpvAccess(procChkptBuf)->pe == diePe);
1112
1113     CpvAccess(procChkptBuf)->cur_restart_phase = CpvAccess(_curRestartPhase);
1114
1115     CkPackMessage(&env);
1116     CmiSetHandler(env, recoverProcDataHandlerIdx);
1117     CmiSyncSendAndFree(CpvAccess(procChkptBuf)->pe, env->getTotalsize(), (char *)env);
1118     CpvAccess(procChkptBuf) = NULL;
1119 #endif
1120 }
1121
1122 // called on PE 0
1123 void qd_callback(void *m)
1124 {
1125    CmiPrintf("[%d] callback after QD for crashed node: %d. \n", CkMyPe(), CpvAccess(_crashedNode));
1126    CkFreeMsg(m);
1127 #ifdef CMK_SMP
1128    for(int i=0;i<CmiMyNodeSize();i++){
1129    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1130    *(int *)(msg+CmiMsgHeaderSizeBytes) =CpvAccess(_crashedNode);
1131         CmiSetHandler(msg, askProcDataHandlerIdx);
1132         int pe = ChkptOnPe(CpvAccess(_crashedNode)*CmiMyNodeSize()+i);    // FIXME ?
1133         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1134    }
1135    return;
1136 #endif
1137    char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1138    *(int *)(msg+CmiMsgHeaderSizeBytes) = CpvAccess(_crashedNode);
1139    // cur_restart_phase = RESTART_PHASE_MAX;             // big enough to get it processed, moved to machine.c
1140    CmiSetHandler(msg, askProcDataHandlerIdx);
1141    int pe = ChkptOnPe(CpvAccess(_crashedNode));
1142    CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1143
1144 }
1145
1146 // on crashed node
1147 void CkMemRestart(const char *dummy, CkArgMsg *args)
1148 {
1149 #if CMK_MEM_CHECKPOINT
1150    _diePE = CmiMyNode();
1151    CkMemCheckPT::startTime = restartT = CmiWallTimer();
1152    CmiPrintf("[%d] I am restarting  cur_restart_phase:%d at time: %f\n",CmiMyPe(), CpvAccess(_curRestartPhase), CkMemCheckPT::startTime);
1153    CkMemCheckPT::inRestarting = 1;
1154
1155   CpvAccess( _crashedNode )= CmiMyNode();
1156         
1157   _discard_charm_message();
1158   
1159 if(CmiMyRank()==0){
1160    CkCallback cb(qd_callback);
1161    CkStartQD(cb);
1162    CkPrintf("crash_node:%d\n",CpvAccess( _crashedNode));
1163  }
1164 #else
1165    CmiAbort("Fault tolerance is not support, rebuild charm++ with 'syncft' option");
1166 #endif
1167 }
1168
1169 // can be called in other files
1170 // return true if it is in restarting
1171 extern "C"
1172 int CkInRestarting()
1173 {
1174 #if CMK_MEM_CHECKPOINT
1175   if (CpvAccess( _crashedNode)!=-1) return 1;
1176   // gzheng
1177   //if (cur_restart_phase == RESTART_PHASE_MAX || cur_restart_phase == 0) return 1;
1178   //return CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch()->inRestarting;
1179   return CkMemCheckPT::inRestarting;
1180 #else
1181   return 0;
1182 #endif
1183 }
1184
1185 /*****************************************************************************
1186                 module initialization
1187 *****************************************************************************/
1188
1189 static int arg_where = CkCheckPoint_inMEM;
1190
1191 #if CMK_MEM_CHECKPOINT
1192 void init_memcheckpt(char **argv)
1193 {
1194     if (CmiGetArgFlagDesc(argv, "+ftc_disk", "Double-disk Checkpointing")) {
1195       arg_where = CkCheckPoint_inDISK;
1196     }
1197
1198         // initiliazing _crashedNode variable
1199         CpvInitialize(int, _crashedNode);
1200         CpvAccess(_crashedNode) = -1;
1201
1202 }
1203 #endif
1204
1205 class CkMemCheckPTInit: public Chare {
1206 public:
1207   CkMemCheckPTInit(CkArgMsg *m) {
1208 #if CMK_MEM_CHECKPOINT
1209     if (arg_where == CkCheckPoint_inDISK) {
1210       CkPrintf("Charm++> Double-disk Checkpointing. \n");
1211     }
1212     ckCheckPTGroupID = CProxy_CkMemCheckPT::ckNew(arg_where);
1213     CkPrintf("Charm++> CkMemCheckPTInit mainchare is created!\n");
1214 #endif
1215   }
1216 };
1217
1218 static void notifyHandler(char *msg)
1219 {
1220 #if CMK_MEM_CHECKPOINT
1221   CmiFree(msg);
1222       /* immediately increase restart phase to filter old messages */
1223   CpvAccess(_curRestartPhase) ++;
1224   CpvAccess(_qd)->flushStates();
1225   _discard_charm_message();
1226 #endif
1227 }
1228
1229 extern "C"
1230 void notify_crash(int node)
1231 {
1232 #ifdef CMK_MEM_CHECKPOINT
1233   CpvAccess( _crashedNode) = node;
1234 #ifdef CMK_SMP
1235   for(int i=0;i<CkMyNodeSize();i++){
1236         CpvAccessOther(_crashedNode,i)=node;
1237   }
1238 #endif
1239   CmiAssert(CmiMyNode() !=CpvAccess( _crashedNode));
1240   CkMemCheckPT::inRestarting = 1;
1241
1242 /*
1243 #ifdef CMK_SMP
1244 */
1245   int pe = CmiNodeFirst(CkMyNode());
1246   for(int i=0;i<CkMyNodeSize();i++){
1247         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1248         CmiSetHandler(msg, notifyHandlerIdx);
1249         CmiSyncSendAndFree(pe+i, CmiMsgHeaderSizeBytes, (char *)msg);
1250   }
1251 /*
1252  return;
1253 #else 
1254     // this may be in interrupt handler, send a message to reset QD
1255   char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes);
1256   CmiSetHandler(msg, notifyHandlerIdx);
1257   CmiSyncSendAndFree(CkMyPe(), CmiMsgHeaderSizeBytes, (char *)msg);
1258 #endif
1259 */
1260 #endif
1261 }
1262
1263 extern "C" void (*notify_crash_fn)(int node);
1264
1265 #if CMK_CONVERSE_MPI
1266 static int pingHandlerIdx;
1267 static int pingBackHandlerIdx;
1268 static int pingCheckHandlerIdx;
1269 static int buddyDieHandlerIdx;
1270 static double lastPingTime = -1;
1271 static int died = 0;
1272
1273 extern void _initCharm(int argc, char **argv);
1274 extern "C" void mpi_restart_crashed(int pe, int rank);
1275 extern "C" int  find_spare_mpirank(int pe);
1276 extern void CkDeleteChares();
1277
1278 void pingCheckHandler();
1279
1280 #if 0
1281 extern "C" void mpi_restart(int argc, char **argv)
1282 {
1283    died = 1;
1284
1285    CkDeleteChares();
1286
1287    _initCharm(argc, argv);
1288    CsdScheduler(-1);
1289    ConverseExit();
1290 }
1291 #endif
1292
1293 void buddyDieHandler(char *msg)
1294 {
1295    // notify
1296    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
1297    notify_crash(diepe);
1298    // send message to crash pe to let it restart
1299    CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1300    int newrank = find_spare_mpirank(diepe);
1301    int buddy = obj->BuddyPE(CmiMyPe());
1302    if (buddy == diepe)  {
1303      mpi_restart_crashed(diepe, newrank);
1304      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1305    }
1306 }
1307
1308 void pingHandler(void *msg)
1309 {
1310   lastPingTime = CmiWallTimer();
1311   CmiFree(msg);
1312 }
1313
1314 void pingBackHandler(char *msg)
1315 {
1316   CmiSetHandler(msg, pingHandlerIdx);
1317   int from = *(int *)(msg+CmiMsgHeaderSizeBytes);
1318   CmiSyncSendAndFree(from, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1319 }
1320
1321 void pingCheckHandler()
1322 {
1323   double now = CmiWallTimer();
1324   if (lastPingTime > 0 && now - lastPingTime > 4) {
1325     // tell everyone the buddy dies
1326     CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1327     int buddy = obj->BuddyPE(CkMyPe());
1328     CmiPrintf("[%d] detected buddy processor %d died. \n", CmiMyPe(), buddy);
1329     for (int pe = 0; pe < CmiNumPes(); pe++) {
1330       if (obj->isFailed(pe) || pe == buddy) continue;
1331       char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1332       *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
1333       CmiSetHandler(msg, buddyDieHandlerIdx);
1334       CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1335     }
1336   }
1337   else 
1338     CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)pingCheckHandler,NULL);
1339 }
1340
1341 void pingBuddy()
1342 {
1343   CkMemCheckPT *obj = CProxy_CkMemCheckPT(ckCheckPTGroupID).ckLocalBranch();
1344   if (obj) {
1345     int buddy = obj->BuddyPE(CkMyPe());
1346 //printf("[%d] pingBuddy %d\n", CmiMyPe(), buddy);
1347     char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
1348     *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
1349     CmiSetHandler(msg, pingBackHandlerIdx);
1350     CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
1351     CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)pingBuddy,NULL);
1352   }
1353 }
1354 #endif
1355
1356 // initproc
1357 void CkRegisterRestartHandler( )
1358 {
1359 #if CMK_MEM_CHECKPOINT
1360   notifyHandlerIdx = CkRegisterHandler((CmiHandler)notifyHandler);
1361   askProcDataHandlerIdx = CkRegisterHandler((CmiHandler)askProcDataHandler);
1362   recoverProcDataHandlerIdx = CkRegisterHandler((CmiHandler)recoverProcDataHandler);
1363   restartBcastHandlerIdx = CkRegisterHandler((CmiHandler)restartBcastHandler);
1364   restartBeginHandlerIdx = CkRegisterHandler((CmiHandler)restartBeginHandler);
1365
1366 #if CMK_CONVERSE_MPI
1367   pingHandlerIdx = CkRegisterHandler((CmiHandler)pingHandler);
1368   pingBackHandlerIdx = CkRegisterHandler((CmiHandler)pingBackHandler);
1369   pingCheckHandlerIdx = CkRegisterHandler((CmiHandler)pingCheckHandler);
1370   buddyDieHandlerIdx = CkRegisterHandler((CmiHandler)buddyDieHandler);
1371
1372 #endif
1373
1374   CpvInitialize(CkProcCheckPTMessage *, procChkptBuf);
1375   CpvAccess(procChkptBuf) = NULL;
1376
1377   notify_crash_fn = notify_crash;
1378
1379 #if 1
1380   // for debugging
1381   CkPrintf("[%d] PID %d \n", CkMyPe(), getpid());
1382 //  sleep(4);
1383 #endif
1384 #endif
1385 }
1386
1387 /// @todo: the following definitions should be moved to a separate file containing
1388 // structures and functions about fault tolerance strategies
1389
1390 /**
1391  *  * @brief: function for killing a process                                             
1392  *   */
1393 #ifdef CMK_MEM_CHECKPOINT
1394 #if CMK_HAS_GETPID
1395 void killLocal(void *_dummy,double curWallTime){
1396         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());          
1397         if(CmiWallTimer()<killTime-1){
1398                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);        
1399         }else{  
1400                 kill(getpid(),SIGKILL);                                               
1401         }              
1402
1403 #else
1404 void killLocal(void *_dummy,double curWallTime){
1405   CmiAbort("kill() not supported!");
1406 }
1407 #endif
1408 #endif
1409
1410 #ifdef CMK_MEM_CHECKPOINT
1411 /**
1412  * @brief: reads the file with the kill information
1413  */
1414 void readKillFile(){
1415         FILE *fp=fopen(killFile,"r");
1416         if(!fp){
1417                 return;
1418         }
1419         int proc;
1420         double sec;
1421         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
1422                 if(proc == CkMyNode() && CkMyRank() == 0){
1423                         killTime = CmiWallTimer()+sec;
1424                         printf("[%d] To be killed after %.6lf s (MEMCKPT) \n",CkMyPe(),sec);
1425                         CcdCallFnAfter(killLocal,NULL,sec*1000);
1426                 }
1427         }
1428         fclose(fp);
1429 }
1430 #endif
1431
1432 #include "CkMemCheckpoint.def.h"
1433
1434